You are viewing a plain-text version of this content; the canonical hyperlink was removed during conversion to plain text.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/08/04 01:02:26 UTC
[1/6] beam git commit: Removes all overrides of
PTransform.getDefaultOutputCoder and deprecates it
Repository: beam
Updated Branches:
refs/heads/master 38f189063 -> 9e6530adb
Removes all overrides of PTransform.getDefaultOutputCoder and deprecates it
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/95e2a00a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/95e2a00a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/95e2a00a
Branch: refs/heads/master
Commit: 95e2a00a807caaf3b4a9532e29dc38fd9d32e700
Parents: bb1bf3c
Author: Eugene Kirpichov <ki...@google.com>
Authored: Wed Jul 26 16:33:36 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Aug 3 15:40:46 2017 -0700
----------------------------------------------------------------------
.../apache/beam/examples/complete/TfIdf.java | 14 ++--
.../core/construction/ForwardingPTransform.java | 18 ++---
.../construction/ForwardingPTransformTest.java | 17 ++++-
.../beam/runners/direct/MultiStepCombine.java | 10 +--
.../beam/runners/dataflow/AssignWindows.java | 8 +--
.../DataflowPipelineTranslatorTest.java | 10 ---
.../java/org/apache/beam/sdk/io/AvroIO.java | 6 --
.../sdk/io/BoundedReadFromUnboundedSource.java | 8 +--
.../main/java/org/apache/beam/sdk/io/Read.java | 6 --
.../java/org/apache/beam/sdk/io/TFRecordIO.java | 18 +----
.../java/org/apache/beam/sdk/io/TextIO.java | 12 ----
.../org/apache/beam/sdk/transforms/Combine.java | 22 ++----
.../org/apache/beam/sdk/transforms/Create.java | 70 +++++++++-----------
.../org/apache/beam/sdk/transforms/Filter.java | 26 ++++----
.../apache/beam/sdk/transforms/PTransform.java | 13 +++-
.../org/apache/beam/sdk/transforms/ParDo.java | 59 ++++++++---------
.../beam/sdk/transforms/windowing/Window.java | 6 --
.../beam/sdk/runners/TransformTreeTest.java | 7 --
.../apache/beam/sdk/transforms/CreateTest.java | 27 +++-----
.../beam/sdk/extensions/sorter/SortValues.java | 20 +++---
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 18 +----
.../io/gcp/bigquery/PassThroughThenCleanup.java | 2 +-
.../sdk/io/gcp/bigquery/StreamingInserts.java | 8 ---
.../apache/beam/sdk/io/gcp/pubsub/PubsubIO.java | 13 +---
24 files changed, 149 insertions(+), 269 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/95e2a00a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
index 435ffab..cfc413c 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
@@ -25,7 +25,6 @@ import java.net.URISyntaxException;
import java.util.HashSet;
import java.util.Set;
import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringDelegateCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -155,11 +154,6 @@ public class TfIdf {
}
@Override
- public Coder<?> getDefaultOutputCoder() {
- return KvCoder.of(StringDelegateCoder.of(URI.class), StringUtf8Coder.of());
- }
-
- @Override
public PCollection<KV<URI, String>> expand(PBegin input) {
Pipeline pipeline = input.getPipeline();
@@ -179,9 +173,11 @@ public class TfIdf {
uriString = uri.toString();
}
- PCollection<KV<URI, String>> oneUriToLines = pipeline
- .apply("TextIO.Read(" + uriString + ")", TextIO.read().from(uriString))
- .apply("WithKeys(" + uriString + ")", WithKeys.<URI, String>of(uri));
+ PCollection<KV<URI, String>> oneUriToLines =
+ pipeline
+ .apply("TextIO.Read(" + uriString + ")", TextIO.read().from(uriString))
+ .apply("WithKeys(" + uriString + ")", WithKeys.<URI, String>of(uri))
+ .setCoder(KvCoder.of(StringDelegateCoder.of(URI.class), StringUtf8Coder.of()));
urisToLines = urisToLines.and(oneUriToLines);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/95e2a00a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ForwardingPTransform.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ForwardingPTransform.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ForwardingPTransform.java
index ca25ba7..ccf41f3 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ForwardingPTransform.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ForwardingPTransform.java
@@ -18,7 +18,6 @@
package org.apache.beam.runners.core.construction;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
-import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -37,7 +36,16 @@ public abstract class ForwardingPTransform<InputT extends PInput, OutputT extend
@Override
public OutputT expand(InputT input) {
- return delegate().expand(input);
+ OutputT res = delegate().expand(input);
+ if (res instanceof PCollection) {
+ PCollection pc = (PCollection) res;
+ try {
+ pc.setCoder(delegate().getDefaultOutputCoder(input, pc));
+ } catch (CannotProvideCoderException e) {
+ // Let coder inference happen later.
+ }
+ }
+ return res;
}
@Override
@@ -51,12 +59,6 @@ public abstract class ForwardingPTransform<InputT extends PInput, OutputT extend
}
@Override
- public <T> Coder<T> getDefaultOutputCoder(InputT input, PCollection<T> output)
- throws CannotProvideCoderException {
- return delegate().getDefaultOutputCoder(input, output);
- }
-
- @Override
public void populateDisplayData(DisplayData.Builder builder) {
builder.delegate(delegate());
}
http://git-wip-us.apache.org/repos/asf/beam/blob/95e2a00a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ForwardingPTransformTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ForwardingPTransformTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ForwardingPTransformTest.java
index 74c056c..4741b6b 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ForwardingPTransformTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ForwardingPTransformTest.java
@@ -26,6 +26,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.WindowingStrategy;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -90,14 +91,24 @@ public class ForwardingPTransformTest {
@Test
public void getDefaultOutputCoderDelegates() throws Exception {
@SuppressWarnings("unchecked")
- PCollection<Integer> input = Mockito.mock(PCollection.class);
+ PCollection<Integer> input =
+ PCollection.createPrimitiveOutputInternal(
+ null /* pipeline */,
+ WindowingStrategy.globalDefault(),
+ PCollection.IsBounded.BOUNDED,
+ null /* coder */);
@SuppressWarnings("unchecked")
- PCollection<String> output = Mockito.mock(PCollection.class);
+ PCollection<String> output = PCollection.createPrimitiveOutputInternal(
+ null /* pipeline */,
+ WindowingStrategy.globalDefault(),
+ PCollection.IsBounded.BOUNDED,
+ null /* coder */);
@SuppressWarnings("unchecked")
Coder<String> outputCoder = Mockito.mock(Coder.class);
+ Mockito.when(delegate.expand(input)).thenReturn(output);
Mockito.when(delegate.getDefaultOutputCoder(input, output)).thenReturn(outputCoder);
- assertThat(forwarding.getDefaultOutputCoder(input, output), equalTo(outputCoder));
+ assertThat(forwarding.expand(input).getCoder(), equalTo(outputCoder));
}
@Test
http://git-wip-us.apache.org/repos/asf/beam/blob/95e2a00a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/MultiStepCombine.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/MultiStepCombine.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/MultiStepCombine.java
index 6f49e94..ae21b4d 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/MultiStepCombine.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/MultiStepCombine.java
@@ -213,8 +213,7 @@ class MultiStepCombine<K, InputT, AccumT, OutputT>
inputCoder.getKeyCoder())))
.setCoder(KvCoder.of(inputCoder.getKeyCoder(), accumulatorCoder))
.apply(GroupByKey.<K, AccumT>create())
- .apply(new MergeAndExtractAccumulatorOutput<K, AccumT, OutputT>(combineFn))
- .setCoder(outputCoder);
+ .apply(new MergeAndExtractAccumulatorOutput<>(combineFn, outputCoder));
}
private static class CombineInputs<K, InputT, AccumT> extends DoFn<KV<K, InputT>, KV<K, AccumT>> {
@@ -320,9 +319,12 @@ class MultiStepCombine<K, InputT, AccumT, OutputT>
static class MergeAndExtractAccumulatorOutput<K, AccumT, OutputT>
extends RawPTransform<PCollection<KV<K, Iterable<AccumT>>>, PCollection<KV<K, OutputT>>> {
private final CombineFn<?, AccumT, OutputT> combineFn;
+ private final Coder<KV<K, OutputT>> outputCoder;
- private MergeAndExtractAccumulatorOutput(CombineFn<?, AccumT, OutputT> combineFn) {
+ private MergeAndExtractAccumulatorOutput(
+ CombineFn<?, AccumT, OutputT> combineFn, Coder<KV<K, OutputT>> outputCoder) {
this.combineFn = combineFn;
+ this.outputCoder = outputCoder;
}
CombineFn<?, AccumT, OutputT> getCombineFn() {
@@ -332,7 +334,7 @@ class MultiStepCombine<K, InputT, AccumT, OutputT>
@Override
public PCollection<KV<K, OutputT>> expand(PCollection<KV<K, Iterable<AccumT>>> input) {
return PCollection.createPrimitiveOutputInternal(
- input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
+ input.getPipeline(), input.getWindowingStrategy(), input.isBounded(), outputCoder);
}
@Nullable
http://git-wip-us.apache.org/repos/asf/beam/blob/95e2a00a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java
index d015d2b..7d1dadb 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.runners.dataflow;
-import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
@@ -69,7 +68,7 @@ class AssignWindows<T> extends PTransform<PCollection<T>, PCollection<T>> {
public void processElement(ProcessContext c) throws Exception {
c.output(c.element());
}
- })).setWindowingStrategyInternal(outputStrategy);
+ })).setWindowingStrategyInternal(outputStrategy).setCoder(input.getCoder());
}
}
@@ -79,11 +78,6 @@ class AssignWindows<T> extends PTransform<PCollection<T>, PCollection<T>> {
}
@Override
- protected Coder<?> getDefaultOutputCoder(PCollection<T> input) {
- return input.getCoder();
- }
-
- @Override
protected String getKindString() {
return "Window.Into()";
}
http://git-wip-us.apache.org/repos/asf/beam/blob/95e2a00a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 7a99f75..f756065 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -606,11 +606,6 @@ public class DataflowPipelineTranslatorTest implements Serializable {
// Return a value unrelated to the input.
return input.getPipeline().apply(Create.of(1, 2, 3, 4));
}
-
- @Override
- protected Coder<?> getDefaultOutputCoder() {
- return VarIntCoder.of();
- }
}
/**
@@ -626,11 +621,6 @@ public class DataflowPipelineTranslatorTest implements Serializable {
return PDone.in(input.getPipeline());
}
-
- @Override
- protected Coder<?> getDefaultOutputCoder() {
- return VoidCoder.of();
- }
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/95e2a00a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 824f725..cd5857c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -38,7 +38,6 @@ import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.ValueProvider;
@@ -996,11 +995,6 @@ public class AvroIO {
DisplayData.item("tempDirectory", tempDirectory)
.withLabel("Directory for temporary files"));
}
-
- @Override
- protected Coder<Void> getDefaultOutputCoder() {
- return VoidCoder.of();
- }
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/95e2a00a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
index 8505ca4..80a03eb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
@@ -114,12 +114,8 @@ public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PColle
}
}));
}
- return read.apply("StripIds", ParDo.of(new ValueWithRecordId.StripIdsDoFn<T>()));
- }
-
- @Override
- protected Coder<T> getDefaultOutputCoder() {
- return source.getOutputCoder();
+ return read.apply("StripIds", ParDo.of(new ValueWithRecordId.StripIdsDoFn<T>()))
+ .setCoder(source.getOutputCoder());
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/95e2a00a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
index 574ba0c..9b273f8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
@@ -18,7 +18,6 @@
package org.apache.beam.sdk.io;
import javax.annotation.Nullable;
-import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.NameUtils;
@@ -160,11 +159,6 @@ public class Read {
}
@Override
- protected Coder<T> getDefaultOutputCoder() {
- return source.getOutputCoder();
- }
-
- @Override
public final PCollection<T> expand(PBegin input) {
source.validate();
return PCollection.createPrimitiveOutputInternal(
http://git-wip-us.apache.org/repos/asf/beam/blob/95e2a00a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
index 1b2e95b..c75051f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
@@ -35,8 +35,6 @@ import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.io.Read.Bounded;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.apache.beam.sdk.io.fs.ResourceId;
@@ -171,11 +169,7 @@ public class TFRecordIO {
}
}
- final Bounded<byte[]> read = org.apache.beam.sdk.io.Read.from(getSource());
- PCollection<byte[]> pcol = input.getPipeline().apply("Read", read);
- // Honor the default output coder that would have been used by this PTransform.
- pcol.setCoder(getDefaultOutputCoder());
- return pcol;
+ return input.apply("Read", org.apache.beam.sdk.io.Read.from(getSource()));
}
// Helper to create a source specific to the requested compression type.
@@ -212,11 +206,6 @@ public class TFRecordIO {
.addIfNotNull(DisplayData.item("filePattern", filepatternDisplay)
.withLabel("File Pattern"));
}
-
- @Override
- protected Coder<byte[]> getDefaultOutputCoder() {
- return ByteArrayCoder.of();
- }
}
/////////////////////////////////////////////////////////////////////////////
@@ -391,11 +380,6 @@ public class TFRecordIO {
.add(DisplayData.item("compressionType", getCompressionType().toString())
.withLabel("Compression Type"));
}
-
- @Override
- protected Coder<Void> getDefaultOutputCoder() {
- return VoidCoder.of();
- }
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/95e2a00a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 312dc07..9a14ad9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -30,9 +30,7 @@ import java.util.List;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
-import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.CompressedSource.CompressionMode;
import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params;
import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
@@ -337,11 +335,6 @@ public class TextIO {
.addIfNotNull(DisplayData.item("filePattern", filepatternDisplay)
.withLabel("File Pattern"));
}
-
- @Override
- protected Coder<String> getDefaultOutputCoder() {
- return StringUtf8Coder.of();
- }
}
/////////////////////////////////////////////////////////////////////////////
@@ -813,11 +806,6 @@ public class TextIO {
"writableByteChannelFactory", getWritableByteChannelFactory().toString())
.withLabel("Compression/Transformation Type"));
}
-
- @Override
- protected Coder<Void> getDefaultOutputCoder() {
- return VoidCoder.of();
- }
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/95e2a00a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index c195352..fab98f8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -2156,8 +2156,13 @@ public class Combine {
}).withSideInputs(sideInputs));
try {
- Coder<KV<K, OutputT>> outputCoder = getDefaultOutputCoder(input);
- output.setCoder(outputCoder);
+ KvCoder<K, InputT> kvCoder = getKvCoder(input.getCoder());
+ @SuppressWarnings("unchecked")
+ Coder<OutputT> outputValueCoder =
+ ((GlobalCombineFn<InputT, ?, OutputT>) fn)
+ .getDefaultOutputCoder(
+ input.getPipeline().getCoderRegistry(), kvCoder.getValueCoder());
+ output.setCoder(KvCoder.of(kvCoder.getKeyCoder(), outputValueCoder));
} catch (CannotProvideCoderException exc) {
// let coder inference happen later, if it can
}
@@ -2200,19 +2205,6 @@ public class Combine {
}
@Override
- public Coder<KV<K, OutputT>> getDefaultOutputCoder(
- PCollection<? extends KV<K, ? extends Iterable<InputT>>> input)
- throws CannotProvideCoderException {
- KvCoder<K, InputT> kvCoder = getKvCoder(input.getCoder());
- @SuppressWarnings("unchecked")
- Coder<OutputT> outputValueCoder =
- ((GlobalCombineFn<InputT, ?, OutputT>) fn)
- .getDefaultOutputCoder(
- input.getPipeline().getCoderRegistry(), kvCoder.getValueCoder());
- return KvCoder.of(kvCoder.getKeyCoder(), outputValueCoder);
- }
-
- @Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
Combine.populateDisplayData(builder, fn, fnDisplayData);
http://git-wip-us.apache.org/repos/asf/beam/blob/95e2a00a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
index a28e9b2..2635bc8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
@@ -315,29 +315,25 @@ public class Create<T> {
@Override
public PCollection<T> expand(PBegin input) {
+ Coder<T> coder;
try {
- Coder<T> coder = getDefaultOutputCoder(input);
- try {
- CreateSource<T> source = CreateSource.fromIterable(elems, coder);
- return input.getPipeline().apply(Read.from(source));
- } catch (IOException e) {
- throw new RuntimeException(
- String.format("Unable to apply Create %s using Coder %s.", this, coder), e);
- }
+ CoderRegistry registry = input.getPipeline().getCoderRegistry();
+ coder =
+ this.coder.isPresent()
+ ? this.coder.get()
+ : typeDescriptor.isPresent()
+ ? registry.getCoder(typeDescriptor.get())
+ : getDefaultCreateCoder(registry, elems);
} catch (CannotProvideCoderException e) {
throw new IllegalArgumentException("Unable to infer a coder and no Coder was specified. "
+ "Please set a coder by invoking Create.withCoder() explicitly.", e);
}
- }
-
- @Override
- public Coder<T> getDefaultOutputCoder(PBegin input) throws CannotProvideCoderException {
- if (coder.isPresent()) {
- return coder.get();
- } else if (typeDescriptor.isPresent()) {
- return input.getPipeline().getCoderRegistry().getCoder(typeDescriptor.get());
- } else {
- return getDefaultCreateCoder(input.getPipeline().getCoderRegistry(), elems);
+ try {
+ CreateSource<T> source = CreateSource.fromIterable(elems, coder);
+ return input.getPipeline().apply(Read.from(source));
+ } catch (IOException e) {
+ throw new RuntimeException(
+ String.format("Unable to apply Create %s using Coder %s.", this, coder), e);
}
}
@@ -570,7 +566,23 @@ public class Create<T> {
@Override
public PCollection<T> expand(PBegin input) {
try {
- Coder<T> coder = getDefaultOutputCoder(input);
+ Coder<T> coder;
+ if (elementCoder.isPresent()) {
+ coder = elementCoder.get();
+ } else if (typeDescriptor.isPresent()) {
+ coder = input.getPipeline().getCoderRegistry().getCoder(typeDescriptor.get());
+ } else {
+ Iterable<T> rawElements =
+ Iterables.transform(
+ timestampedElements,
+ new Function<TimestampedValue<T>, T>() {
+ @Override
+ public T apply(TimestampedValue<T> timestampedValue) {
+ return timestampedValue.getValue();
+ }
+ });
+ coder = getDefaultCreateCoder(input.getPipeline().getCoderRegistry(), rawElements);
+ }
PCollection<TimestampedValue<T>> intermediate = Pipeline.applyTransform(input,
Create.of(timestampedElements).withCoder(TimestampedValueCoder.of(coder)));
@@ -610,26 +622,6 @@ public class Create<T> {
c.outputWithTimestamp(c.element().getValue(), c.element().getTimestamp());
}
}
-
- @Override
- public Coder<T> getDefaultOutputCoder(PBegin input) throws CannotProvideCoderException {
- if (elementCoder.isPresent()) {
- return elementCoder.get();
- } else if (typeDescriptor.isPresent()) {
- return input.getPipeline().getCoderRegistry().getCoder(typeDescriptor.get());
- } else {
- Iterable<T> rawElements =
- Iterables.transform(
- timestampedElements,
- new Function<TimestampedValue<T>, T>() {
- @Override
- public T apply(TimestampedValue<T> input) {
- return input.getValue();
- }
- });
- return getDefaultCreateCoder(input.getPipeline().getCoderRegistry(), rawElements);
- }
- }
}
private static <T> Coder<T> getDefaultCreateCoder(CoderRegistry registry, Iterable<T> elems)
http://git-wip-us.apache.org/repos/asf/beam/blob/95e2a00a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
index d0314eb..2fd12de 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.sdk.transforms;
-import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PCollection;
@@ -229,19 +228,18 @@ public class Filter<T> extends PTransform<PCollection<T>, PCollection<T>> {
@Override
public PCollection<T> expand(PCollection<T> input) {
- return input.apply(ParDo.of(new DoFn<T, T>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- if (predicate.apply(c.element())) {
- c.output(c.element());
- }
- }
- }));
- }
-
- @Override
- protected Coder<T> getDefaultOutputCoder(PCollection<T> input) {
- return input.getCoder();
+ return input
+ .apply(
+ ParDo.of(
+ new DoFn<T, T>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ if (predicate.apply(c.element())) {
+ c.output(c.element());
+ }
+ }
+ }))
+ .setCoder(input.getCoder());
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/95e2a00a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
index 58051df..f5e7830 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
@@ -277,13 +277,16 @@ public abstract class PTransform<InputT extends PInput, OutputT extends POutput>
}
/**
- * Returns the default {@code Coder} to use for the output of this
- * single-output {@code PTransform}.
+ * Returns the default {@code Coder} to use for the output of this single-output {@code
+ * PTransform}.
*
* <p>By default, always throws
*
* @throws CannotProvideCoderException if no coder can be inferred
+ * @deprecated Instead, the PTransform should explicitly call {@link PCollection#setCoder} on the
+ * returned PCollection.
*/
+ @Deprecated
protected Coder<?> getDefaultOutputCoder() throws CannotProvideCoderException {
throw new CannotProvideCoderException("PTransform.getOutputCoder called.");
}
@@ -295,7 +298,10 @@ public abstract class PTransform<InputT extends PInput, OutputT extends POutput>
* <p>By default, always throws.
*
* @throws CannotProvideCoderException if none can be inferred.
+ * @deprecated Instead, the PTransform should explicitly call {@link PCollection#setCoder} on the
+ * returned PCollection.
*/
+ @Deprecated
protected Coder<?> getDefaultOutputCoder(@SuppressWarnings("unused") InputT input)
throws CannotProvideCoderException {
return getDefaultOutputCoder();
@@ -308,7 +314,10 @@ public abstract class PTransform<InputT extends PInput, OutputT extends POutput>
* <p>By default, always throws.
*
* @throws CannotProvideCoderException if none can be inferred.
+ * @deprecated Instead, the PTransform should explicitly call {@link PCollection#setCoder} on the
+ * returned PCollection.
*/
+ @Deprecated
public <T> Coder<T> getDefaultOutputCoder(
InputT input, @SuppressWarnings("unused") PCollection<T> output)
throws CannotProvideCoderException {
http://git-wip-us.apache.org/repos/asf/beam/blob/95e2a00a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index bc4f629..a0e1eb2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -636,19 +636,21 @@ public class ParDo {
@Override
public PCollection<OutputT> expand(PCollection<? extends InputT> input) {
- finishSpecifyingStateSpecs(fn, input.getPipeline().getCoderRegistry(), input.getCoder());
+ CoderRegistry registry = input.getPipeline().getCoderRegistry();
+ finishSpecifyingStateSpecs(fn, registry, input.getCoder());
TupleTag<OutputT> mainOutput = new TupleTag<>();
- return input.apply(withOutputTags(mainOutput, TupleTagList.empty())).get(mainOutput);
- }
-
- @Override
- @SuppressWarnings("unchecked")
- protected Coder<OutputT> getDefaultOutputCoder(PCollection<? extends InputT> input)
- throws CannotProvideCoderException {
- return input.getPipeline().getCoderRegistry().getCoder(
- getFn().getOutputTypeDescriptor(),
- getFn().getInputTypeDescriptor(),
- ((PCollection<InputT>) input).getCoder());
+ PCollection<OutputT> res =
+ input.apply(withOutputTags(mainOutput, TupleTagList.empty())).get(mainOutput);
+ try {
+ res.setCoder(
+ registry.getCoder(
+ getFn().getOutputTypeDescriptor(),
+ getFn().getInputTypeDescriptor(),
+ ((PCollection<InputT>) input).getCoder()));
+ } catch (CannotProvideCoderException e) {
+ // Ignore and leave coder unset.
+ }
+ return res;
}
@Override
@@ -757,7 +759,8 @@ public class ParDo {
validateWindowType(input, fn);
// Use coder registry to determine coders for all StateSpec defined in the fn signature.
- finishSpecifyingStateSpecs(fn, input.getPipeline().getCoderRegistry(), input.getCoder());
+ CoderRegistry registry = input.getPipeline().getCoderRegistry();
+ finishSpecifyingStateSpecs(fn, registry, input.getCoder());
DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
if (signature.usesState() || signature.usesTimers()) {
@@ -771,6 +774,18 @@ public class ParDo {
Collections.<TupleTag<?>, Coder<?>>emptyMap(),
input.getWindowingStrategy(),
input.isBounded());
+ @SuppressWarnings("unchecked")
+ Coder<InputT> inputCoder = ((PCollection<InputT>) input).getCoder();
+ for (PCollection<?> out : outputs.getAll().values()) {
+ try {
+ out.setCoder(
+ (Coder)
+ registry.getCoder(
+ out.getTypeDescriptor(), getFn().getInputTypeDescriptor(), inputCoder));
+ } catch (CannotProvideCoderException e) {
+ // Ignore and let coder inference happen later.
+ }
+ }
// The fn will likely be an instance of an anonymous subclass
// such as DoFn<Integer, String> { }, thus will have a high-fidelity
@@ -781,24 +796,6 @@ public class ParDo {
}
@Override
- protected Coder<OutputT> getDefaultOutputCoder() {
- throw new RuntimeException(
- "internal error: shouldn't be calling this on a multi-output ParDo");
- }
-
- @Override
- public <T> Coder<T> getDefaultOutputCoder(
- PCollection<? extends InputT> input, PCollection<T> output)
- throws CannotProvideCoderException {
- @SuppressWarnings("unchecked")
- Coder<InputT> inputCoder = ((PCollection<InputT>) input).getCoder();
- return input.getPipeline().getCoderRegistry().getCoder(
- output.getTypeDescriptor(),
- getFn().getInputTypeDescriptor(),
- inputCoder);
- }
-
- @Override
protected String getKindString() {
return String.format("ParMultiDo(%s)", NameUtils.approximateSimpleName(getFn()));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/95e2a00a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
index af583e5..2337798 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
@@ -23,7 +23,6 @@ import com.google.common.collect.Ordering;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
-import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
@@ -453,11 +452,6 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T
}
@Override
- protected Coder<?> getDefaultOutputCoder(PCollection<T> input) {
- return input.getCoder();
- }
-
- @Override
protected String getKindString() {
return "Window.Into()";
}
http://git-wip-us.apache.org/repos/asf/beam/blob/95e2a00a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
index bf06d78..9956d5c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
@@ -27,9 +27,7 @@ import java.io.File;
import java.util.Arrays;
import java.util.EnumSet;
import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.WriteFiles;
@@ -109,11 +107,6 @@ public class TransformTreeTest {
return PDone.in(input.getPipeline());
}
-
- @Override
- protected Coder<?> getDefaultOutputCoder() {
- return VoidCoder.of();
- }
}
@Test
http://git-wip-us.apache.org/repos/asf/beam/blob/95e2a00a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
index 6be6772..81ad947 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
@@ -62,7 +62,6 @@ import org.apache.beam.sdk.transforms.Create.Values.CreateSource;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TypeDescriptor;
@@ -314,28 +313,24 @@ public class CreateTest {
@Test
public void testCreateTimestampedDefaultOutputCoderUsingCoder() throws Exception {
Coder<Record> coder = new RecordCoder();
- PBegin pBegin = PBegin.in(p);
Create.TimestampedValues<Record> values =
Create.timestamped(
TimestampedValue.of(new Record(), new Instant(0)),
TimestampedValue.<Record>of(new Record2(), new Instant(0)))
.withCoder(coder);
- Coder<Record> defaultCoder = values.getDefaultOutputCoder(pBegin);
- assertThat(defaultCoder, equalTo(coder));
+ assertThat(p.apply(values).getCoder(), equalTo(coder));
}
@Test
public void testCreateTimestampedDefaultOutputCoderUsingTypeDescriptor() throws Exception {
Coder<Record> coder = new RecordCoder();
p.getCoderRegistry().registerCoderForClass(Record.class, coder);
- PBegin pBegin = PBegin.in(p);
Create.TimestampedValues<Record> values =
Create.timestamped(
TimestampedValue.of(new Record(), new Instant(0)),
TimestampedValue.<Record>of(new Record2(), new Instant(0)))
.withType(new TypeDescriptor<Record>() {});
- Coder<Record> defaultCoder = values.getDefaultOutputCoder(pBegin);
- assertThat(defaultCoder, equalTo(coder));
+ assertThat(p.apply(values).getCoder(), equalTo(coder));
}
@Test
@@ -417,31 +412,25 @@ public class CreateTest {
public void testCreateDefaultOutputCoderUsingInference() throws Exception {
Coder<Record> coder = new RecordCoder();
p.getCoderRegistry().registerCoderForClass(Record.class, coder);
- PBegin pBegin = PBegin.in(p);
- Create.Values<Record> values = Create.of(new Record(), new Record(), new Record());
- Coder<Record> defaultCoder = values.getDefaultOutputCoder(pBegin);
- assertThat(defaultCoder, equalTo(coder));
+ assertThat(
+ p.apply(Create.of(new Record(), new Record(), new Record())).getCoder(), equalTo(coder));
}
@Test
public void testCreateDefaultOutputCoderUsingCoder() throws Exception {
Coder<Record> coder = new RecordCoder();
- PBegin pBegin = PBegin.in(p);
- Create.Values<Record> values =
- Create.of(new Record(), new Record2()).withCoder(coder);
- Coder<Record> defaultCoder = values.getDefaultOutputCoder(pBegin);
- assertThat(defaultCoder, equalTo(coder));
+ assertThat(
+ p.apply(Create.of(new Record(), new Record2()).withCoder(coder)).getCoder(),
+ equalTo(coder));
}
@Test
public void testCreateDefaultOutputCoderUsingTypeDescriptor() throws Exception {
Coder<Record> coder = new RecordCoder();
p.getCoderRegistry().registerCoderForClass(Record.class, coder);
- PBegin pBegin = PBegin.in(p);
Create.Values<Record> values =
Create.of(new Record(), new Record2()).withType(new TypeDescriptor<Record>() {});
- Coder<Record> defaultCoder = values.getDefaultOutputCoder(pBegin);
- assertThat(defaultCoder, equalTo(coder));
+ assertThat(p.apply(values).getCoder(), equalTo(coder));
}
@Test
http://git-wip-us.apache.org/repos/asf/beam/blob/95e2a00a/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/SortValues.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/SortValues.java b/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/SortValues.java
index d1b4d07..cb9d984 100644
--- a/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/SortValues.java
+++ b/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/SortValues.java
@@ -76,18 +76,14 @@ public class SortValues<PrimaryKeyT, SecondaryKeyT, ValueT>
@Override
public PCollection<KV<PrimaryKeyT, Iterable<KV<SecondaryKeyT, ValueT>>>> expand(
PCollection<KV<PrimaryKeyT, Iterable<KV<SecondaryKeyT, ValueT>>>> input) {
- return input.apply(
- ParDo.of(
- new SortValuesDoFn<PrimaryKeyT, SecondaryKeyT, ValueT>(
- sorterOptions,
- getSecondaryKeyCoder(input.getCoder()),
- getValueCoder(input.getCoder()))));
- }
-
- @Override
- protected Coder<KV<PrimaryKeyT, Iterable<KV<SecondaryKeyT, ValueT>>>> getDefaultOutputCoder(
- PCollection<KV<PrimaryKeyT, Iterable<KV<SecondaryKeyT, ValueT>>>> input) {
- return input.getCoder();
+ return input
+ .apply(
+ ParDo.of(
+ new SortValuesDoFn<PrimaryKeyT, SecondaryKeyT, ValueT>(
+ sorterOptions,
+ getSecondaryKeyCoder(input.getCoder()),
+ getValueCoder(input.getCoder()))))
+ .setCoder(input.getCoder());
}
/** Retrieves the {@link Coder} for the secondary key-value pairs. */
http://git-wip-us.apache.org/repos/asf/beam/blob/95e2a00a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 7ca4ce2..6edbd06 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -50,7 +50,6 @@ import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.FileSystems;
@@ -543,9 +542,7 @@ public class BigQueryIO {
p.apply("TriggerIdCreation", Create.of(staticJobUuid))
.apply("ViewId", View.<String>asSingleton());
// Apply the traditional Source model.
- rows =
- p.apply(org.apache.beam.sdk.io.Read.from(createSource(staticJobUuid)))
- .setCoder(getDefaultOutputCoder());
+ rows = p.apply(org.apache.beam.sdk.io.Read.from(createSource(staticJobUuid)));
} else {
// Create a singleton job ID token at execution time.
jobIdTokenCollection =
@@ -625,7 +622,8 @@ public class BigQueryIO {
}
}
})
- .withSideInputs(schemaView, jobIdTokenView));
+ .withSideInputs(schemaView, jobIdTokenView))
+ .setCoder(TableRowJsonCoder.of());
}
PassThroughThenCleanup.CleanupOperation cleanupOperation =
new PassThroughThenCleanup.CleanupOperation() {
@@ -658,11 +656,6 @@ public class BigQueryIO {
}
@Override
- protected Coder<TableRow> getDefaultOutputCoder() {
- return TableRowJsonCoder.of();
- }
-
- @Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
builder
@@ -1141,11 +1134,6 @@ public class BigQueryIO {
}
@Override
- protected Coder<Void> getDefaultOutputCoder() {
- return VoidCoder.of();
- }
-
- @Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
http://git-wip-us.apache.org/repos/asf/beam/blob/95e2a00a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java
index de26c8d..2f7da08 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java
@@ -74,7 +74,7 @@ class PassThroughThenCleanup<T> extends PTransform<PCollection<T>, PCollection<T
})
.withSideInputs(jobIdSideInput, cleanupSignalView));
- return outputs.get(mainOutput);
+ return outputs.get(mainOutput).setCoder(input.getCoder());
}
private static class IdentityFn<T> extends DoFn<T, T> {
http://git-wip-us.apache.org/repos/asf/beam/blob/95e2a00a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java
index ba09cb3..747f2b0 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java
@@ -19,8 +19,6 @@
package org.apache.beam.sdk.io.gcp.bigquery;
import com.google.api.services.bigquery.model.TableRow;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.KV;
@@ -67,12 +65,6 @@ public class StreamingInserts<DestinationT>
return new StreamingInserts<>(
createDisposition, dynamicDestinations, bigQueryServices, retryPolicy); }
-
- @Override
- protected Coder<Void> getDefaultOutputCoder() {
- return VoidCoder.of();
- }
-
@Override
public WriteResult expand(PCollection<KV<DestinationT, TableRow>> input) {
PCollection<KV<TableDestination, TableRow>> writes =
http://git-wip-us.apache.org/repos/asf/beam/blob/95e2a00a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
index 4f33d61..46c2df4 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
@@ -36,7 +36,6 @@ import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.ProjectPath;
@@ -727,7 +726,7 @@ public class PubsubIO {
getTimestampAttribute(),
getIdAttribute(),
getNeedsAttributes());
- return input.apply(source).apply(MapElements.via(getParseFn()));
+ return input.apply(source).apply(MapElements.via(getParseFn())).setCoder(getCoder());
}
@Override
@@ -743,11 +742,6 @@ public class PubsubIO {
.withLabel("Pubsub Subscription"));
}
}
-
- @Override
- protected Coder<T> getDefaultOutputCoder() {
- return getCoder();
- }
}
/////////////////////////////////////////////////////////////////////////////
@@ -870,11 +864,6 @@ public class PubsubIO {
builder, getTimestampAttribute(), getIdAttribute(), getTopicProvider());
}
- @Override
- protected Coder<Void> getDefaultOutputCoder() {
- return VoidCoder.of();
- }
-
/**
* Writer to Pubsub which batches messages from bounded collections.
*
[6/6] beam git commit: This closes #3649: [BEAM-2686] Towards
deprecating PCollection.setCoder()
Posted by jk...@apache.org.
This closes #3649: [BEAM-2686] Towards deprecating PCollection.setCoder()
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9e6530ad
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9e6530ad
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9e6530ad
Branch: refs/heads/master
Commit: 9e6530adb00669b7cf0f01cb8b128be0a21fd721
Parents: 38f1890 48690bc
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Aug 3 17:45:41 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Aug 3 17:45:41 2017 -0700
----------------------------------------------------------------------
.../apache/beam/examples/complete/TfIdf.java | 14 +-
.../apache/beam/runners/apex/ApexRunner.java | 10 +-
.../FlattenPCollectionTranslator.java | 15 +-
.../apex/translation/ParDoTranslator.java | 7 +-
.../apex/translation/utils/ValuesSource.java | 2 +-
.../apex/examples/UnboundedTextSource.java | 2 +-
.../translation/ApexGroupByKeyOperatorTest.java | 6 +-
.../translation/GroupByKeyTranslatorTest.java | 2 +-
.../translation/utils/CollectionSource.java | 2 +-
.../core/construction/ForwardingPTransform.java | 18 ++-
.../construction/PCollectionTranslation.java | 8 +-
.../construction/PTransformReplacements.java | 6 +
.../core/construction/PrimitiveCreate.java | 14 +-
.../core/construction/SplittableParDo.java | 62 ++++----
.../UnboundedReadFromBoundedSource.java | 11 +-
.../construction/ForwardingPTransformTest.java | 17 ++-
.../construction/PTransformMatchersTest.java | 79 +++++-----
.../core/construction/ReadTranslationTest.java | 4 +-
.../construction/ReplacementOutputsTest.java | 14 +-
.../core/construction/SplittableParDoTest.java | 33 +++--
.../UnboundedReadFromBoundedSourceTest.java | 2 +-
.../core/GroupByKeyViaGroupByKeyOnly.java | 15 +-
.../core/SplittableParDoViaKeyedWorkItems.java | 10 +-
.../beam/runners/direct/DirectGroupByKey.java | 31 ++--
.../beam/runners/direct/MultiStepCombine.java | 10 +-
.../direct/ParDoMultiOverrideFactory.java | 5 +-
.../direct/TestStreamEvaluatorFactory.java | 8 +-
.../runners/direct/ViewOverrideFactory.java | 5 +-
.../direct/BoundedReadEvaluatorFactoryTest.java | 2 +-
.../runners/direct/CommittedResultTest.java | 26 +++-
.../beam/runners/direct/DirectRunnerTest.java | 4 +-
.../runners/direct/EvaluationContextTest.java | 8 +-
.../UnboundedReadEvaluatorFactoryTest.java | 2 +-
.../runners/flink/CreateStreamingFlinkView.java | 5 +-
.../streaming/io/UnboundedSocketSource.java | 2 +-
.../flink/streaming/TestCountingSource.java | 2 +-
.../beam/runners/dataflow/AssignWindows.java | 12 +-
.../runners/dataflow/BatchViewOverrides.java | 19 +--
.../runners/dataflow/CreateDataflowView.java | 5 +-
.../beam/runners/dataflow/DataflowRunner.java | 147 ++++++++++---------
.../dataflow/PrimitiveParDoSingleFactory.java | 12 +-
.../dataflow/SplittableParDoOverrides.java | 2 +-
.../DataflowPipelineTranslatorTest.java | 21 +--
.../runners/dataflow/DataflowRunnerTest.java | 11 +-
.../transforms/DataflowGroupByKeyTest.java | 12 +-
.../dataflow/transforms/DataflowViewTest.java | 14 +-
.../beam/runners/spark/io/CreateStream.java | 11 +-
.../beam/runners/spark/io/MicrobatchSource.java | 4 +-
.../runners/spark/io/SparkUnboundedSource.java | 2 +-
.../spark/stateful/StateSpecFunctions.java | 2 +-
.../translation/StorageLevelPTransform.java | 10 +-
.../util/SinglePrimitiveOutputPTransform.java | 51 -------
.../java/org/apache/beam/sdk/io/AvroIO.java | 6 -
.../java/org/apache/beam/sdk/io/AvroSource.java | 2 +-
.../sdk/io/BoundedReadFromUnboundedSource.java | 12 +-
.../apache/beam/sdk/io/CompressedSource.java | 6 +-
.../org/apache/beam/sdk/io/CountingSource.java | 4 +-
.../main/java/org/apache/beam/sdk/io/Read.java | 27 ++--
.../java/org/apache/beam/sdk/io/Source.java | 14 +-
.../java/org/apache/beam/sdk/io/TFRecordIO.java | 20 +--
.../java/org/apache/beam/sdk/io/TextIO.java | 12 --
.../java/org/apache/beam/sdk/io/TextSource.java | 2 +-
.../beam/sdk/testing/SourceTestUtils.java | 12 +-
.../org/apache/beam/sdk/testing/TestStream.java | 5 +-
.../org/apache/beam/sdk/transforms/Combine.java | 22 +--
.../org/apache/beam/sdk/transforms/Create.java | 72 ++++-----
.../org/apache/beam/sdk/transforms/Filter.java | 26 ++--
.../org/apache/beam/sdk/transforms/Flatten.java | 22 +--
.../apache/beam/sdk/transforms/GroupByKey.java | 12 +-
.../apache/beam/sdk/transforms/PTransform.java | 13 +-
.../org/apache/beam/sdk/transforms/ParDo.java | 61 ++++----
.../org/apache/beam/sdk/transforms/View.java | 5 +-
.../beam/sdk/transforms/windowing/Window.java | 8 +-
.../org/apache/beam/sdk/values/PCollection.java | 9 +-
.../beam/sdk/values/PCollectionTuple.java | 10 +-
.../beam/sdk/io/CompressedSourceTest.java | 2 +-
.../apache/beam/sdk/io/FileBasedSourceTest.java | 2 +-
.../beam/sdk/io/OffsetBasedSourceTest.java | 2 +-
.../java/org/apache/beam/sdk/io/ReadTest.java | 4 +-
.../sdk/runners/TransformHierarchyTest.java | 41 +++---
.../beam/sdk/runners/TransformTreeTest.java | 19 +--
.../runners/dataflow/TestCountingSource.java | 2 +-
.../apache/beam/sdk/transforms/CreateTest.java | 31 ++--
.../apache/beam/sdk/transforms/FlattenTest.java | 2 +-
.../beam/sdk/transforms/GroupByKeyTest.java | 11 +-
.../apache/beam/sdk/transforms/ViewTest.java | 11 +-
.../beam/sdk/values/PCollectionTupleTest.java | 7 +-
.../beam/sdk/extensions/sorter/SortValues.java | 20 +--
.../org/apache/beam/sdk/io/amqp/AmqpIO.java | 2 +-
.../beam/sdk/io/cassandra/CassandraIO.java | 2 +-
.../sdk/io/elasticsearch/ElasticsearchIO.java | 2 +-
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 18 +--
.../sdk/io/gcp/bigquery/BigQuerySourceBase.java | 4 +-
.../io/gcp/bigquery/PassThroughThenCleanup.java | 2 +-
.../sdk/io/gcp/bigquery/StreamingInserts.java | 8 -
.../beam/sdk/io/gcp/bigtable/BigtableIO.java | 2 +-
.../apache/beam/sdk/io/gcp/pubsub/PubsubIO.java | 13 +-
.../io/gcp/pubsub/PubsubUnboundedSource.java | 2 +-
.../hadoop/inputformat/HadoopInputFormatIO.java | 2 +-
.../org/apache/beam/sdk/io/hbase/HBaseIO.java | 2 +-
.../apache/beam/sdk/io/hcatalog/HCatalogIO.java | 2 +-
.../java/org/apache/beam/sdk/io/jms/JmsIO.java | 2 +-
.../org/apache/beam/sdk/io/kafka/KafkaIO.java | 2 +-
.../beam/sdk/io/kinesis/KinesisSource.java | 2 +-
.../beam/sdk/io/mongodb/MongoDbGridFSIO.java | 2 +-
.../apache/beam/sdk/io/mongodb/MongoDbIO.java | 2 +-
.../org/apache/beam/sdk/io/mqtt/MqttIO.java | 2 +-
.../org/apache/beam/sdk/io/xml/XmlSource.java | 2 +-
108 files changed, 631 insertions(+), 778 deletions(-)
----------------------------------------------------------------------
[4/6] beam git commit: Makes all Source classes override
getOutputCoder instead of getDefaultOutputCoder
Posted by jk...@apache.org.
Makes all Source classes override getOutputCoder instead of getDefaultOutputCoder
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e017a0ec
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e017a0ec
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e017a0ec
Branch: refs/heads/master
Commit: e017a0ec8a16b63828d0955f405b23bc9771bc9e
Parents: 38f1890
Author: Eugene Kirpichov <ki...@google.com>
Authored: Wed Jul 26 16:05:27 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Aug 3 15:40:46 2017 -0700
----------------------------------------------------------------------
.../runners/apex/translation/utils/ValuesSource.java | 2 +-
.../runners/apex/examples/UnboundedTextSource.java | 2 +-
.../apex/translation/GroupByKeyTranslatorTest.java | 2 +-
.../apex/translation/utils/CollectionSource.java | 2 +-
.../construction/UnboundedReadFromBoundedSource.java | 8 ++++----
.../core/construction/ReadTranslationTest.java | 4 ++--
.../UnboundedReadFromBoundedSourceTest.java | 2 +-
.../direct/BoundedReadEvaluatorFactoryTest.java | 2 +-
.../apache/beam/runners/direct/DirectRunnerTest.java | 4 ++--
.../direct/UnboundedReadEvaluatorFactoryTest.java | 2 +-
.../wrappers/streaming/io/UnboundedSocketSource.java | 2 +-
.../runners/flink/streaming/TestCountingSource.java | 2 +-
.../apache/beam/runners/dataflow/DataflowRunner.java | 6 +++---
.../beam/runners/spark/io/MicrobatchSource.java | 4 ++--
.../beam/runners/spark/io/SparkUnboundedSource.java | 2 +-
.../runners/spark/stateful/StateSpecFunctions.java | 2 +-
.../main/java/org/apache/beam/sdk/io/AvroSource.java | 2 +-
.../beam/sdk/io/BoundedReadFromUnboundedSource.java | 6 +++---
.../java/org/apache/beam/sdk/io/CompressedSource.java | 6 +++---
.../java/org/apache/beam/sdk/io/CountingSource.java | 4 ++--
.../src/main/java/org/apache/beam/sdk/io/Read.java | 4 ++--
.../src/main/java/org/apache/beam/sdk/io/Source.java | 14 ++++++++++----
.../main/java/org/apache/beam/sdk/io/TFRecordIO.java | 2 +-
.../main/java/org/apache/beam/sdk/io/TextSource.java | 2 +-
.../org/apache/beam/sdk/testing/SourceTestUtils.java | 12 ++++++------
.../java/org/apache/beam/sdk/transforms/Create.java | 2 +-
.../org/apache/beam/sdk/io/CompressedSourceTest.java | 2 +-
.../org/apache/beam/sdk/io/FileBasedSourceTest.java | 2 +-
.../org/apache/beam/sdk/io/OffsetBasedSourceTest.java | 2 +-
.../test/java/org/apache/beam/sdk/io/ReadTest.java | 4 ++--
.../beam/sdk/runners/dataflow/TestCountingSource.java | 2 +-
.../org/apache/beam/sdk/transforms/CreateTest.java | 4 ++--
.../main/java/org/apache/beam/sdk/io/amqp/AmqpIO.java | 2 +-
.../org/apache/beam/sdk/io/cassandra/CassandraIO.java | 2 +-
.../beam/sdk/io/elasticsearch/ElasticsearchIO.java | 2 +-
.../beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java | 4 ++--
.../apache/beam/sdk/io/gcp/bigtable/BigtableIO.java | 2 +-
.../beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java | 2 +-
.../io/hadoop/inputformat/HadoopInputFormatIO.java | 2 +-
.../java/org/apache/beam/sdk/io/hbase/HBaseIO.java | 2 +-
.../org/apache/beam/sdk/io/hcatalog/HCatalogIO.java | 2 +-
.../main/java/org/apache/beam/sdk/io/jms/JmsIO.java | 2 +-
.../java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 2 +-
.../org/apache/beam/sdk/io/kinesis/KinesisSource.java | 2 +-
.../apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java | 2 +-
.../org/apache/beam/sdk/io/mongodb/MongoDbIO.java | 2 +-
.../main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java | 2 +-
.../java/org/apache/beam/sdk/io/xml/XmlSource.java | 2 +-
48 files changed, 79 insertions(+), 73 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValuesSource.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValuesSource.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValuesSource.java
index 41f027f..4a00ff1 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValuesSource.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValuesSource.java
@@ -81,7 +81,7 @@ public class ValuesSource<T> extends UnboundedSource<T, UnboundedSource.Checkpoi
}
@Override
- public Coder<T> getDefaultOutputCoder() {
+ public Coder<T> getOutputCoder() {
return iterableCoder.getElemCoder();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java
index c590a2e..8f3e6bc 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java
@@ -59,7 +59,7 @@ public class UnboundedTextSource extends UnboundedSource<String, UnboundedSource
}
@Override
- public Coder<String> getDefaultOutputCoder() {
+ public Coder<String> getOutputCoder() {
return StringUtf8Coder.of();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java
index 9c61b47..58f33ae 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java
@@ -153,7 +153,7 @@ public class GroupByKeyTranslatorTest {
}
@Override
- public Coder<String> getDefaultOutputCoder() {
+ public Coder<String> getOutputCoder() {
return StringUtf8Coder.of();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/CollectionSource.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/CollectionSource.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/CollectionSource.java
index 288aade..01a2a85 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/CollectionSource.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/CollectionSource.java
@@ -67,7 +67,7 @@ public class CollectionSource<T> extends UnboundedSource<T, UnboundedSource.Chec
}
@Override
- public Coder<T> getDefaultOutputCoder() {
+ public Coder<T> getOutputCoder() {
return coder;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
index 24eb384..f35f4c3 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
@@ -92,7 +92,7 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle
@Override
protected Coder<T> getDefaultOutputCoder() {
- return source.getDefaultOutputCoder();
+ return source.getOutputCoder();
}
@Override
@@ -166,14 +166,14 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle
}
@Override
- public Coder<T> getDefaultOutputCoder() {
- return boundedSource.getDefaultOutputCoder();
+ public Coder<T> getOutputCoder() {
+ return boundedSource.getOutputCoder();
}
@SuppressWarnings({"rawtypes", "unchecked"})
@Override
public Coder<Checkpoint<T>> getCheckpointMarkCoder() {
- return new CheckpointCoder<>(boundedSource.getDefaultOutputCoder());
+ return new CheckpointCoder<>(boundedSource.getOutputCoder());
}
@VisibleForTesting
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReadTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReadTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReadTranslationTest.java
index 740b324..f85bd79 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReadTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReadTranslationTest.java
@@ -112,7 +112,7 @@ public class ReadTranslationTest {
public void validate() {}
@Override
- public Coder<String> getDefaultOutputCoder() {
+ public Coder<String> getOutputCoder() {
return StringUtf8Coder.of();
}
@@ -132,7 +132,7 @@ public class ReadTranslationTest {
public void validate() {}
@Override
- public Coder<byte[]> getDefaultOutputCoder() {
+ public Coder<byte[]> getOutputCoder() {
return ByteArrayCoder.of();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSourceTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSourceTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSourceTest.java
index 0e48a9d..62b06b7 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSourceTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSourceTest.java
@@ -320,7 +320,7 @@ public class UnboundedReadFromBoundedSourceTest {
}
@Override
- public Coder<Byte> getDefaultOutputCoder() {
+ public Coder<Byte> getOutputCoder() {
return SerializableCoder.of(Byte.class);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
index 6180d29..3d81884 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
@@ -395,7 +395,7 @@ public class BoundedReadEvaluatorFactoryTest {
}
@Override
- public Coder<T> getDefaultOutputCoder() {
+ public Coder<T> getOutputCoder() {
return coder;
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index 943d27c..d3f407a 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -573,8 +573,8 @@ public class DirectRunnerTest implements Serializable {
}
@Override
- public Coder<T> getDefaultOutputCoder() {
- return underlying.getDefaultOutputCoder();
+ public Coder<T> getOutputCoder() {
+ return underlying.getOutputCoder();
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
index 2a01db5..cc6847d 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
@@ -477,7 +477,7 @@ public class UnboundedReadEvaluatorFactoryTest {
public void validate() {}
@Override
- public Coder<T> getDefaultOutputCoder() {
+ public Coder<T> getOutputCoder() {
return coder;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java
index 910a33f..49e4ddc 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java
@@ -123,7 +123,7 @@ public class UnboundedSocketSource<CheckpointMarkT extends UnboundedSource.Check
}
@Override
- public Coder getDefaultOutputCoder() {
+ public Coder<String> getOutputCoder() {
return DEFAULT_SOCKET_CODER;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
index edf548a..fcb9282 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
@@ -238,7 +238,7 @@ public class TestCountingSource
public void validate() {}
@Override
- public Coder<KV<Integer, Integer>> getDefaultOutputCoder() {
+ public Coder<KV<Integer, Integer>> getOutputCoder() {
return KvCoder.of(VarIntCoder.of(), VarIntCoder.of());
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index f8d2c3c..8fce5b4 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -1176,7 +1176,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
@Override
protected Coder<T> getDefaultOutputCoder() {
- return source.getDefaultOutputCoder();
+ return source.getOutputCoder();
}
@Override
@@ -1212,7 +1212,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
@Override
protected Coder<ValueWithRecordId<T>> getDefaultOutputCoder() {
- return ValueWithRecordId.ValueWithRecordIdCoder.of(source.getDefaultOutputCoder());
+ return ValueWithRecordId.ValueWithRecordIdCoder.of(source.getOutputCoder());
}
@Override
@@ -1291,7 +1291,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
@Override
protected Coder<T> getDefaultOutputCoder() {
- return source.getDefaultOutputCoder();
+ return source.getOutputCoder();
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
index 3b48caf..ae873a3 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
@@ -140,8 +140,8 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo
}
@Override
- public Coder<T> getDefaultOutputCoder() {
- return source.getDefaultOutputCoder();
+ public Coder<T> getOutputCoder() {
+ return source.getOutputCoder();
}
public Coder<CheckpointMarkT> getCheckpointMarkCoder() {
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
index b31aa9f..26af0c0 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
@@ -116,7 +116,7 @@ public class SparkUnboundedSource {
// output the actual (deserialized) stream.
WindowedValue.FullWindowedValueCoder<T> coder =
WindowedValue.FullWindowedValueCoder.of(
- source.getDefaultOutputCoder(),
+ source.getOutputCoder(),
GlobalWindow.Coder.INSTANCE);
JavaDStream<WindowedValue<T>> readUnboundedStream =
mapWithStateDStream
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
index 1b54478..ca54715 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
@@ -161,7 +161,7 @@ public class StateSpecFunctions {
final List<byte[]> readValues = new ArrayList<>();
WindowedValue.FullWindowedValueCoder<T> coder =
WindowedValue.FullWindowedValueCoder.of(
- source.getDefaultOutputCoder(),
+ source.getOutputCoder(),
GlobalWindow.Coder.INSTANCE);
try {
// measure how long a read takes per-partition.
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
index d277503..8dd3125 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
@@ -315,7 +315,7 @@ public class AvroSource<T> extends BlockBasedSource<T> {
}
@Override
- public Coder<T> getDefaultOutputCoder() {
+ public Coder<T> getOutputCoder() {
return mode.getOutputCoder();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
index c882447..8505ca4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
@@ -119,7 +119,7 @@ public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PColle
@Override
protected Coder<T> getDefaultOutputCoder() {
- return source.getDefaultOutputCoder();
+ return source.getOutputCoder();
}
@Override
@@ -211,8 +211,8 @@ public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PColle
}
@Override
- public Coder<ValueWithRecordId<T>> getDefaultOutputCoder() {
- return ValueWithRecordId.ValueWithRecordIdCoder.of(getSource().getDefaultOutputCoder());
+ public Coder<ValueWithRecordId<T>> getOutputCoder() {
+ return ValueWithRecordId.ValueWithRecordIdCoder.of(getSource().getOutputCoder());
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
index ad81b61..6943a02 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
@@ -404,11 +404,11 @@ public class CompressedSource<T> extends FileBasedSource<T> {
}
/**
- * Returns the delegate source's default output coder.
+ * Returns the delegate source's output coder.
*/
@Override
- public final Coder<T> getDefaultOutputCoder() {
- return sourceDelegate.getDefaultOutputCoder();
+ public final Coder<T> getOutputCoder() {
+ return sourceDelegate.getOutputCoder();
}
public final DecompressingChannelFactory getChannelFactory() {
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
index 6202c2b..b47edc7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
@@ -188,7 +188,7 @@ public class CountingSource {
}
@Override
- public Coder<Long> getDefaultOutputCoder() {
+ public Coder<Long> getOutputCoder() {
return VarLongCoder.of();
}
@@ -364,7 +364,7 @@ public class CountingSource {
public void validate() {}
@Override
- public Coder<Long> getDefaultOutputCoder() {
+ public Coder<Long> getOutputCoder() {
return VarLongCoder.of();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
index a07fca8..6e6750d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
@@ -96,7 +96,7 @@ public class Read {
@Override
protected Coder<T> getDefaultOutputCoder() {
- return source.getDefaultOutputCoder();
+ return source.getOutputCoder();
}
@Override
@@ -164,7 +164,7 @@ public class Read {
@Override
protected Coder<T> getDefaultOutputCoder() {
- return source.getDefaultOutputCoder();
+ return source.getOutputCoder();
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Source.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Source.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Source.java
index 542d91c..32a7270 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Source.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Source.java
@@ -61,10 +61,16 @@ public abstract class Source<T> implements Serializable, HasDisplayData {
*/
public abstract void validate();
- /**
- * Returns the default {@code Coder} to use for the data read from this source.
- */
- public abstract Coder<T> getDefaultOutputCoder();
+ /** @deprecated Override {@link #getOutputCoder()} instead. */
+ @Deprecated
+ public Coder<T> getDefaultOutputCoder() {
+ throw new UnsupportedOperationException("Source needs to override getOutputCoder()");
+ }
+
+ /** Returns the {@code Coder} to use for the data read from this source. */
+ public Coder<T> getOutputCoder() {
+ return getDefaultOutputCoder();
+ }
/**
* {@inheritDoc}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
index 29b3e29..1b2e95b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
@@ -474,7 +474,7 @@ public class TFRecordIO {
}
@Override
- public Coder<byte[]> getDefaultOutputCoder() {
+ public Coder<byte[]> getOutputCoder() {
return DEFAULT_BYTE_ARRAY_CODER;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
index 4d9fa77..86c73d5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
@@ -69,7 +69,7 @@ class TextSource extends FileBasedSource<String> {
}
@Override
- public Coder<String> getDefaultOutputCoder() {
+ public Coder<String> getOutputCoder() {
return StringUtf8Coder.of();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java
index cde0b94..e147221 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java
@@ -212,7 +212,7 @@ public class SourceTestUtils {
List<? extends BoundedSource<T>> sources,
PipelineOptions options)
throws Exception {
- Coder<T> coder = referenceSource.getDefaultOutputCoder();
+ Coder<T> coder = referenceSource.getOutputCoder();
List<T> referenceRecords = readFromSource(referenceSource, options);
List<T> bundleRecords = new ArrayList<>();
for (BoundedSource<T> source : sources) {
@@ -221,7 +221,7 @@ public class SourceTestUtils {
+ source
+ " is not compatible with Coder type for referenceSource "
+ referenceSource,
- source.getDefaultOutputCoder(),
+ source.getOutputCoder(),
equalTo(coder));
List<T> elems = readFromSource(source, options);
bundleRecords.addAll(elems);
@@ -239,7 +239,7 @@ public class SourceTestUtils {
*/
public static <T> void assertUnstartedReaderReadsSameAsItsSource(
BoundedSource.BoundedReader<T> reader, PipelineOptions options) throws Exception {
- Coder<T> coder = reader.getCurrentSource().getDefaultOutputCoder();
+ Coder<T> coder = reader.getCurrentSource().getOutputCoder();
List<T> expected = readFromUnstartedReader(reader);
List<T> actual = readFromSource(reader.getCurrentSource(), options);
List<ReadableStructuralValue<T>> expectedStructural = createStructuralValues(coder, expected);
@@ -415,7 +415,7 @@ public class SourceTestUtils {
source,
primary,
residual);
- Coder<T> coder = primary.getDefaultOutputCoder();
+ Coder<T> coder = primary.getOutputCoder();
List<ReadableStructuralValue<T>> primaryValues =
createStructuralValues(coder, primaryItems);
List<ReadableStructuralValue<T>> currentValues =
@@ -728,8 +728,8 @@ public class SourceTestUtils {
}
@Override
- public Coder<T> getDefaultOutputCoder() {
- return boundedSource.getDefaultOutputCoder();
+ public Coder<T> getOutputCoder() {
+ return boundedSource.getOutputCoder();
}
private static class UnsplittableReader<T> extends BoundedReader<T> {
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
index 09e12ef..a28e9b2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
@@ -411,7 +411,7 @@ public class Create<T> {
public void validate() {}
@Override
- public Coder<T> getDefaultOutputCoder() {
+ public Coder<T> getOutputCoder() {
return coder;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
index fa28e4b..fe6f01f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
@@ -638,7 +638,7 @@ public class CompressedSourceTest {
}
@Override
- public Coder<Byte> getDefaultOutputCoder() {
+ public Coder<Byte> getOutputCoder() {
return SerializableCoder.of(Byte.class);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
index c15e667..1bdb915 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
@@ -107,7 +107,7 @@ public class FileBasedSourceTest {
public void validate() {}
@Override
- public Coder<String> getDefaultOutputCoder() {
+ public Coder<String> getOutputCoder() {
return StringUtf8Coder.of();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java
index 25168a3..feda355 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java
@@ -65,7 +65,7 @@ public class OffsetBasedSourceTest {
public void validate() {}
@Override
- public Coder<Integer> getDefaultOutputCoder() {
+ public Coder<Integer> getOutputCoder() {
return BigEndianIntegerCoder.of();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java
index 74acf18..4277dc3 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java
@@ -171,7 +171,7 @@ public class ReadTest implements Serializable{
public void validate() {}
@Override
- public Coder<String> getDefaultOutputCoder() {
+ public Coder<String> getOutputCoder() {
return StringUtf8Coder.of();
}
}
@@ -207,7 +207,7 @@ public class ReadTest implements Serializable{
public void validate() {}
@Override
- public Coder<String> getDefaultOutputCoder() {
+ public Coder<String> getOutputCoder() {
return StringUtf8Coder.of();
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/dataflow/TestCountingSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/dataflow/TestCountingSource.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/dataflow/TestCountingSource.java
index 9fcc3c5..338ea38 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/dataflow/TestCountingSource.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/dataflow/TestCountingSource.java
@@ -248,7 +248,7 @@ public class TestCountingSource
public void validate() {}
@Override
- public Coder<KV<Integer, Integer>> getDefaultOutputCoder() {
+ public Coder<KV<Integer, Integer>> getOutputCoder() {
return KvCoder.of(VarIntCoder.of(), VarIntCoder.of());
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
index 6a682ef..6be6772 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
@@ -487,12 +487,12 @@ public class CreateTest {
}
@Test
- public void testSourceGetDefaultOutputCoderReturnsConstructorCoder() throws Exception {
+ public void testSourceGetOutputCoderReturnsConstructorCoder() throws Exception {
Coder<Integer> coder = VarIntCoder.of();
CreateSource<Integer> source =
CreateSource.fromIterable(ImmutableList.of(1, 2, 3, 4, 5, 6, 7, 8), coder);
- Coder<Integer> defaultCoder = source.getDefaultOutputCoder();
+ Coder<Integer> defaultCoder = source.getOutputCoder();
assertThat(defaultCoder, equalTo(coder));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpIO.java b/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpIO.java
index 1f307b2..508373f 100644
--- a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpIO.java
+++ b/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpIO.java
@@ -246,7 +246,7 @@ public class AmqpIO {
}
@Override
- public Coder<Message> getDefaultOutputCoder() {
+ public Coder<Message> getOutputCoder() {
return new AmqpMessageCoder();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
index 32905b7..eacc3e4 100644
--- a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
+++ b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
@@ -289,7 +289,7 @@ public class CassandraIO {
}
@Override
- public Coder<T> getDefaultOutputCoder() {
+ public Coder<T> getOutputCoder() {
return spec.coder();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
index acc7f2f..5046888 100644
--- a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
+++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
@@ -484,7 +484,7 @@ public class ElasticsearchIO {
}
@Override
- public Coder<String> getDefaultOutputCoder() {
+ public Coder<String> getOutputCoder() {
return StringUtf8Coder.of();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
index 6c118a0..abe559c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
@@ -133,7 +133,7 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> {
}
@Override
- public Coder<TableRow> getDefaultOutputCoder() {
+ public Coder<TableRow> getOutputCoder() {
return TableRowJsonCoder.of();
}
@@ -184,7 +184,7 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> {
List<BoundedSource<TableRow>> avroSources = Lists.newArrayList();
for (ResourceId file : files) {
avroSources.add(
- AvroSource.from(file.toString()).withParseFn(function, getDefaultOutputCoder()));
+ AvroSource.from(file.toString()).withParseFn(function, getOutputCoder()));
}
return ImmutableList.copyOf(avroSources);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index 0a90dde..c5b0fbf 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -893,7 +893,7 @@ public class BigtableIO {
}
@Override
- public Coder<Row> getDefaultOutputCoder() {
+ public Coder<Row> getOutputCoder() {
return ProtoCoder.of(Row.class);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
index b7df804..8da6ff4 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
@@ -1164,7 +1164,7 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub
}
@Override
- public Coder<PubsubMessage> getDefaultOutputCoder() {
+ public Coder<PubsubMessage> getOutputCoder() {
return outer.getNeedsAttributes()
? PubsubMessageWithAttributesCoder.of()
: PubsubMessagePayloadOnlyCoder.of();
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java b/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
index 0b4c23f..20ca50a 100644
--- a/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
+++ b/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
@@ -552,7 +552,7 @@ public class HadoopInputFormatIO {
}
@Override
- public Coder<KV<K, V>> getDefaultOutputCoder() {
+ public Coder<KV<K, V>> getOutputCoder() {
return KvCoder.of(keyCoder, valueCoder);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
index 90ede4c..2ba6826 100644
--- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
@@ -457,7 +457,7 @@ public class HBaseIO {
}
@Override
- public Coder<Result> getDefaultOutputCoder() {
+ public Coder<Result> getOutputCoder() {
return HBaseResultCoder.of();
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java
index 4199b80..d8e462b 100644
--- a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java
+++ b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java
@@ -210,7 +210,7 @@ public class HCatalogIO {
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
- public Coder<HCatRecord> getDefaultOutputCoder() {
+ public Coder<HCatRecord> getOutputCoder() {
return (Coder) WritableCoder.of(DefaultHCatRecord.class);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
index f8cba5e..2af0ce9 100644
--- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
+++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
@@ -373,7 +373,7 @@ public class JmsIO {
}
@Override
- public Coder<JmsRecord> getDefaultOutputCoder() {
+ public Coder<JmsRecord> getOutputCoder() {
return SerializableCoder.of(JmsRecord.class);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 026313a..7fb4260 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -844,7 +844,7 @@ public class KafkaIO {
}
@Override
- public Coder<KafkaRecord<K, V>> getDefaultOutputCoder() {
+ public Coder<KafkaRecord<K, V>> getOutputCoder() {
return KafkaRecordCoder.of(spec.getKeyCoder(), spec.getValueCoder());
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
index 362792b..144bd80 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
@@ -107,7 +107,7 @@ class KinesisSource extends UnboundedSource<KinesisRecord, KinesisReaderCheckpoi
}
@Override
- public Coder<KinesisRecord> getDefaultOutputCoder() {
+ public Coder<KinesisRecord> getOutputCoder() {
return KinesisRecordCoder.of();
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
index 5b5412c..c612d52 100644
--- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
+++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
@@ -440,7 +440,7 @@ public class MongoDbGridFSIO {
}
@Override
- public Coder<ObjectId> getDefaultOutputCoder() {
+ public Coder<ObjectId> getOutputCoder() {
return SerializableCoder.of(ObjectId.class);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
index 3b14182..087123a 100644
--- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
+++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
@@ -277,7 +277,7 @@ public class MongoDbIO {
}
@Override
- public Coder<Document> getDefaultOutputCoder() {
+ public Coder<Document> getOutputCoder() {
return SerializableCoder.of(Document.class);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
index add5cb5..5aadb80 100644
--- a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
+++ b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
@@ -387,7 +387,7 @@ public class MqttIO {
}
@Override
- public Coder<byte[]> getDefaultOutputCoder() {
+ public Coder<byte[]> getOutputCoder() {
return ByteArrayCoder.of();
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e017a0ec/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSource.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSource.java
index 7aa42c5..b893d43 100644
--- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSource.java
+++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSource.java
@@ -85,7 +85,7 @@ public class XmlSource<T> extends FileBasedSource<T> {
}
@Override
- public Coder<T> getDefaultOutputCoder() {
+ public Coder<T> getOutputCoder() {
return JAXBCoder.of(spec.getRecordClass());
}
[5/6] beam git commit: Remembers the output coders of SplittableParDo
Posted by jk...@apache.org.
Remembers the output coders of SplittableParDo
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/48690bc6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/48690bc6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/48690bc6
Branch: refs/heads/master
Commit: 48690bc61673e767d4a1fa72e0499c32f160db39
Parents: 95e2a00
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Aug 3 17:35:03 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Aug 3 17:35:03 2017 -0700
----------------------------------------------------------------------
.../apache/beam/runners/apex/ApexRunner.java | 5 +-
.../core/construction/SplittableParDo.java | 65 ++++++++++----------
.../core/construction/SplittableParDoTest.java | 33 +++++-----
.../core/SplittableParDoViaKeyedWorkItems.java | 1 +
.../direct/ParDoMultiOverrideFactory.java | 2 +-
.../dataflow/SplittableParDoOverrides.java | 2 +-
6 files changed, 56 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/48690bc6/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index cee524e..57d2593 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -379,8 +379,9 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
public PTransformReplacement<PCollection<InputT>, PCollectionTuple> getReplacementTransform(
AppliedPTransform<PCollection<InputT>, PCollectionTuple, MultiOutput<InputT, OutputT>>
transform) {
- return PTransformReplacement.of(PTransformReplacements.getSingletonMainInput(transform),
- SplittableParDo.forJavaParDo(transform.getTransform()));
+ return PTransformReplacement.of(
+ PTransformReplacements.getSingletonMainInput(transform),
+ SplittableParDo.forAppliedParDo(transform));
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/48690bc6/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
index bcc5de8..32d3409 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
@@ -19,8 +19,8 @@ package org.apache.beam.runners.core.construction;
import static com.google.common.base.Preconditions.checkArgument;
+import com.google.common.collect.Maps;
import java.io.IOException;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -74,6 +74,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
private final List<PCollectionView<?>> sideInputs;
private final TupleTag<OutputT> mainOutputTag;
private final TupleTagList additionalOutputTags;
+ private final Map<TupleTag<?>, Coder<?>> outputTagsToCoders;
public static final String SPLITTABLE_PROCESS_URN =
"urn:beam:runners_core:transforms:splittable_process:v1";
@@ -86,34 +87,18 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
private SplittableParDo(
DoFn<InputT, OutputT> doFn,
- TupleTag<OutputT> mainOutputTag,
List<PCollectionView<?>> sideInputs,
- TupleTagList additionalOutputTags) {
+ TupleTag<OutputT> mainOutputTag,
+ TupleTagList additionalOutputTags,
+ Map<TupleTag<?>, Coder<?>> outputTagsToCoders) {
checkArgument(
DoFnSignatures.getSignature(doFn.getClass()).processElement().isSplittable(),
"fn must be a splittable DoFn");
this.doFn = doFn;
- this.mainOutputTag = mainOutputTag;
this.sideInputs = sideInputs;
+ this.mainOutputTag = mainOutputTag;
this.additionalOutputTags = additionalOutputTags;
- }
-
- /**
- * Creates a {@link SplittableParDo} from an original Java {@link ParDo}.
- *
- * @param parDo The splittable {@link ParDo} transform.
- */
- public static <InputT, OutputT> SplittableParDo<InputT, OutputT, ?> forJavaParDo(
- ParDo.MultiOutput<InputT, OutputT> parDo) {
- checkArgument(parDo != null, "parDo must not be null");
- checkArgument(
- DoFnSignatures.getSignature(parDo.getFn().getClass()).processElement().isSplittable(),
- "fn must be a splittable DoFn");
- return new SplittableParDo(
- parDo.getFn(),
- parDo.getMainOutputTag(),
- parDo.getSideInputs(),
- parDo.getAdditionalOutputTags());
+ this.outputTagsToCoders = outputTagsToCoders;
}
/**
@@ -122,15 +107,22 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
* <p>The input may generally be a deserialized transform so it may not actually be a {@link
* ParDo}. Instead {@link ParDoTranslation} will be used to extract fields.
*/
- public static SplittableParDo<?, ?, ?> forAppliedParDo(AppliedPTransform<?, ?, ?> parDo) {
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ public static <InputT, OutputT> SplittableParDo<InputT, OutputT, ?> forAppliedParDo(
+ AppliedPTransform<PCollection<InputT>, PCollectionTuple, ?> parDo) {
checkArgument(parDo != null, "parDo must not be null");
try {
- return new SplittableParDo<>(
+ Map<TupleTag<?>, Coder<?>> outputTagsToCoders = Maps.newHashMap();
+ for (Map.Entry<TupleTag<?>, PValue> entry : parDo.getOutputs().entrySet()) {
+ outputTagsToCoders.put(entry.getKey(), ((PCollection) entry.getValue()).getCoder());
+ }
+ return new SplittableParDo(
ParDoTranslation.getDoFn(parDo),
- (TupleTag) ParDoTranslation.getMainOutputTag(parDo),
ParDoTranslation.getSideInputs(parDo),
- ParDoTranslation.getAdditionalOutputTags(parDo));
+ ParDoTranslation.getMainOutputTag(parDo),
+ ParDoTranslation.getAdditionalOutputTags(parDo),
+ outputTagsToCoders);
} catch (IOException exc) {
throw new RuntimeException(exc);
}
@@ -169,7 +161,8 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
(WindowingStrategy<InputT, ?>) input.getWindowingStrategy(),
sideInputs,
mainOutputTag,
- additionalOutputTags));
+ additionalOutputTags,
+ outputTagsToCoders));
}
@Override
@@ -203,6 +196,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
private final List<PCollectionView<?>> sideInputs;
private final TupleTag<OutputT> mainOutputTag;
private final TupleTagList additionalOutputTags;
+ private final Map<TupleTag<?>, Coder<?>> outputTagsToCoders;
/**
* @param fn the splittable {@link DoFn}.
@@ -210,7 +204,8 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
* @param sideInputs list of side inputs that should be available to the {@link DoFn}.
* @param mainOutputTag {@link TupleTag Tag} of the {@link DoFn DoFn's} main output.
* @param additionalOutputTags {@link TupleTagList Tags} of the {@link DoFn DoFn's} additional
* outputs.
+ * @param outputTagsToCoders A map from output tag to the coder for that output, which should
+ * provide mappings for the main and all additional tags.
*/
public ProcessKeyedElements(
DoFn<InputT, OutputT> fn,
@@ -219,7 +214,8 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
WindowingStrategy<InputT, ?> windowingStrategy,
List<PCollectionView<?>> sideInputs,
TupleTag<OutputT> mainOutputTag,
- TupleTagList additionalOutputTags) {
+ TupleTagList additionalOutputTags,
+ Map<TupleTag<?>, Coder<?>> outputTagsToCoders) {
this.fn = fn;
this.elementCoder = elementCoder;
this.restrictionCoder = restrictionCoder;
@@ -227,6 +223,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
this.sideInputs = sideInputs;
this.mainOutputTag = mainOutputTag;
this.additionalOutputTags = additionalOutputTags;
+ this.outputTagsToCoders = outputTagsToCoders;
}
public DoFn<InputT, OutputT> getFn() {
@@ -257,10 +254,14 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
return additionalOutputTags;
}
+ public Map<TupleTag<?>, Coder<?>> getOutputTagsToCoders() {
+ return outputTagsToCoders;
+ }
+
@Override
public PCollectionTuple expand(PCollection<KV<String, KV<InputT, RestrictionT>>> input) {
return createPrimitiveOutputFor(
- input, fn, mainOutputTag, additionalOutputTags, windowingStrategy);
+ input, fn, mainOutputTag, additionalOutputTags, outputTagsToCoders, windowingStrategy);
}
public static <OutputT> PCollectionTuple createPrimitiveOutputFor(
@@ -268,14 +269,14 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
DoFn<?, OutputT> fn,
TupleTag<OutputT> mainOutputTag,
TupleTagList additionalOutputTags,
+ Map<TupleTag<?>, Coder<?>> outputTagsToCoders,
WindowingStrategy<?, ?> windowingStrategy) {
DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
PCollectionTuple outputs =
PCollectionTuple.ofPrimitiveOutputsInternal(
input.getPipeline(),
TupleTagList.of(mainOutputTag).and(additionalOutputTags.getAll()),
- // TODO
- Collections.<TupleTag<?>, Coder<?>>emptyMap(),
+ outputTagsToCoders,
windowingStrategy,
input.isBounded().and(signature.isBoundedPerElement()));
http://git-wip-us.apache.org/repos/asf/beam/blob/48690bc6/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java
index 267232c..05c471d 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
import java.io.Serializable;
import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
@@ -29,6 +30,7 @@ import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.junit.Rule;
@@ -106,12 +108,18 @@ public class SplittableParDoTest {
private static final TupleTag<String> MAIN_OUTPUT_TAG = new TupleTag<String>() {};
- private ParDo.MultiOutput<Integer, String> makeParDo(DoFn<Integer, String> fn) {
- return ParDo.of(fn).withOutputTags(MAIN_OUTPUT_TAG, TupleTagList.empty());
+ private PCollection<String> applySplittableParDo(
+ String name, PCollection<Integer> input, DoFn<Integer, String> fn) {
+ ParDo.MultiOutput<Integer, String> multiOutput =
+ ParDo.of(fn).withOutputTags(MAIN_OUTPUT_TAG, TupleTagList.empty());
+ PCollectionTuple output = multiOutput.expand(input);
+ output.get(MAIN_OUTPUT_TAG).setName("main");
+ AppliedPTransform<PCollection<Integer>, PCollectionTuple, ?> transform =
+ AppliedPTransform.of("ParDo", input.expand(), output.expand(), multiOutput, pipeline);
+ return input.apply(name, SplittableParDo.forAppliedParDo(transform)).get(MAIN_OUTPUT_TAG);
}
- @Rule
- public TestPipeline pipeline = TestPipeline.create();
+ @Rule public TestPipeline pipeline = TestPipeline.create();
@Test
public void testBoundednessForBoundedFn() {
@@ -121,16 +129,12 @@ public class SplittableParDoTest {
assertEquals(
"Applying a bounded SDF to a bounded collection produces a bounded collection",
PCollection.IsBounded.BOUNDED,
- makeBoundedCollection(pipeline)
- .apply("bounded to bounded", SplittableParDo.forJavaParDo(makeParDo(boundedFn)))
- .get(MAIN_OUTPUT_TAG)
+ applySplittableParDo("bounded to bounded", makeBoundedCollection(pipeline), boundedFn)
.isBounded());
assertEquals(
"Applying a bounded SDF to an unbounded collection produces an unbounded collection",
PCollection.IsBounded.UNBOUNDED,
- makeUnboundedCollection(pipeline)
- .apply("bounded to unbounded", SplittableParDo.forJavaParDo(makeParDo(boundedFn)))
- .get(MAIN_OUTPUT_TAG)
+ applySplittableParDo("bounded to unbounded", makeUnboundedCollection(pipeline), boundedFn)
.isBounded());
}
@@ -142,16 +146,13 @@ public class SplittableParDoTest {
assertEquals(
"Applying an unbounded SDF to a bounded collection produces an unbounded collection",
PCollection.IsBounded.UNBOUNDED,
- makeBoundedCollection(pipeline)
- .apply("unbounded to bounded", SplittableParDo.forJavaParDo(makeParDo(unboundedFn)))
- .get(MAIN_OUTPUT_TAG)
+ applySplittableParDo("unbounded to bounded", makeBoundedCollection(pipeline), unboundedFn)
.isBounded());
assertEquals(
"Applying an unbounded SDF to an unbounded collection produces an unbounded collection",
PCollection.IsBounded.UNBOUNDED,
- makeUnboundedCollection(pipeline)
- .apply("unbounded to unbounded", SplittableParDo.forJavaParDo(makeParDo(unboundedFn)))
- .get(MAIN_OUTPUT_TAG)
+ applySplittableParDo(
+ "unbounded to unbounded", makeUnboundedCollection(pipeline), unboundedFn)
.isBounded());
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/48690bc6/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
index af720fd..251260e 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
@@ -184,6 +184,7 @@ public class SplittableParDoViaKeyedWorkItems {
original.getFn(),
original.getMainOutputTag(),
original.getAdditionalOutputTags(),
+ original.getOutputTagsToCoders(),
original.getInputWindowingStrategy());
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/48690bc6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
index 3f04b56..26f30b0 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
@@ -96,7 +96,7 @@ class ParDoMultiOverrideFactory<InputT, OutputT>
DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
if (signature.processElement().isSplittable()) {
- return (PTransform) SplittableParDo.forAppliedParDo(application);
+ return SplittableParDo.forAppliedParDo((AppliedPTransform) application);
} else if (signature.stateDeclarations().size() > 0
|| signature.timerDeclarations().size() > 0) {
return new GbkThenStatefulParDo(
http://git-wip-us.apache.org/repos/asf/beam/blob/48690bc6/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/SplittableParDoOverrides.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/SplittableParDoOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/SplittableParDoOverrides.java
index fc010f8..7b65950 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/SplittableParDoOverrides.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/SplittableParDoOverrides.java
@@ -64,7 +64,7 @@ class SplittableParDoOverrides {
appliedTransform) {
return PTransformReplacement.of(
PTransformReplacements.getSingletonMainInput(appliedTransform),
- SplittableParDo.forJavaParDo(appliedTransform.getTransform()));
+ SplittableParDo.forAppliedParDo(appliedTransform));
}
@Override
[2/6] beam git commit: Requires specifying a Coder on
PCollection.createPrimitiveOutputInternal
Posted by jk...@apache.org.
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
index 4063d11..e8bf9b8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
@@ -366,10 +366,15 @@ public class PCollection<T> extends PValueBase implements PValue {
public static <T> PCollection<T> createPrimitiveOutputInternal(
Pipeline pipeline,
WindowingStrategy<?, ?> windowingStrategy,
- IsBounded isBounded) {
- return new PCollection<T>(pipeline)
+ IsBounded isBounded,
+ @Nullable Coder<T> coder) {
+ PCollection<T> res = new PCollection<T>(pipeline)
.setWindowingStrategyInternal(windowingStrategy)
.setIsBoundedInternal(isBounded);
+ if (coder != null) {
+ res.setCoder(coder);
+ }
+ return res;
}
private static class CoderOrFailure<T> {
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java
index 793994f..9799d0e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java
@@ -24,6 +24,7 @@ import java.util.Map;
import java.util.Objects;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection.IsBounded;
@@ -201,6 +202,7 @@ public class PCollectionTuple implements PInput, POutput {
public static PCollectionTuple ofPrimitiveOutputsInternal(
Pipeline pipeline,
TupleTagList outputTags,
+ Map<TupleTag<?>, Coder<?>> coders,
WindowingStrategy<?, ?> windowingStrategy,
IsBounded isBounded) {
Map<TupleTag<?>, PCollection<?>> pcollectionMap = new LinkedHashMap<>();
@@ -217,10 +219,10 @@ public class PCollectionTuple implements PInput, POutput {
// erasure as the correct type. When a transform adds
// elements to `outputCollection` they will be of type T.
@SuppressWarnings("unchecked")
- TypeDescriptor<Object> token = (TypeDescriptor<Object>) outputTag.getTypeDescriptor();
- PCollection<Object> outputCollection = PCollection
- .createPrimitiveOutputInternal(pipeline, windowingStrategy, isBounded)
- .setTypeDescriptor(token);
+ PCollection outputCollection =
+ PCollection.createPrimitiveOutputInternal(
+ pipeline, windowingStrategy, isBounded, coders.get(outputTag))
+ .setTypeDescriptor((TypeDescriptor) outputTag.getTypeDescriptor());
pcollectionMap.put(outputTag, outputCollection);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
index 93650dd..12fe633 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
@@ -35,6 +35,7 @@ import org.apache.beam.sdk.Pipeline.PipelineVisitor;
import org.apache.beam.sdk.Pipeline.PipelineVisitor.Defaults;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.io.CountingSource;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.Read;
@@ -110,7 +111,7 @@ public class TransformHierarchyTest implements Serializable {
public void emptyCompositeSucceeds() {
PCollection<Long> created =
PCollection.createPrimitiveOutputInternal(
- pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+ pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarLongCoder.of());
TransformHierarchy.Node node = hierarchy.pushNode("Create", PBegin.in(pipeline), Create.of(1));
hierarchy.setOutput(created);
hierarchy.popNode();
@@ -139,7 +140,7 @@ public class TransformHierarchyTest implements Serializable {
public void producingOwnAndOthersOutputsFails() {
PCollection<Long> created =
PCollection.createPrimitiveOutputInternal(
- pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+ pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarLongCoder.of());
hierarchy.pushNode("Create", PBegin.in(pipeline), Create.of(1));
hierarchy.setOutput(created);
hierarchy.popNode();
@@ -147,8 +148,11 @@ public class TransformHierarchyTest implements Serializable {
final PCollectionList<Long> appended =
pcList.and(
- PCollection.<Long>createPrimitiveOutputInternal(
- pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)
+ PCollection.createPrimitiveOutputInternal(
+ pipeline,
+ WindowingStrategy.globalDefault(),
+ IsBounded.BOUNDED,
+ VarLongCoder.of())
.setName("prim"));
hierarchy.pushNode(
"AddPc",
@@ -171,7 +175,7 @@ public class TransformHierarchyTest implements Serializable {
public void producingOwnOutputWithCompositeFails() {
final PCollection<Long> comp =
PCollection.createPrimitiveOutputInternal(
- pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+ pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarLongCoder.of());
PTransform<PBegin, PCollection<Long>> root =
new PTransform<PBegin, PCollection<Long>>() {
@Override
@@ -327,7 +331,7 @@ public class TransformHierarchyTest implements Serializable {
PCollection<Long> created =
PCollection.createPrimitiveOutputInternal(
- pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+ pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarLongCoder.of());
SingleOutput<Long, Long> pardo =
ParDo.of(
@@ -340,7 +344,7 @@ public class TransformHierarchyTest implements Serializable {
PCollection<Long> mapped =
PCollection.createPrimitiveOutputInternal(
- pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+ pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarLongCoder.of());
TransformHierarchy.Node compositeNode = hierarchy.pushNode("Create", begin, create);
hierarchy.finishSpecifyingInput();
@@ -499,13 +503,11 @@ public class TransformHierarchyTest implements Serializable {
@Test
public void visitIsTopologicallyOrdered() {
PCollection<String> one =
- PCollection.<String>createPrimitiveOutputInternal(
- pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)
- .setCoder(StringUtf8Coder.of());
+ PCollection.createPrimitiveOutputInternal(
+ pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, StringUtf8Coder.of());
final PCollection<Integer> two =
- PCollection.<Integer>createPrimitiveOutputInternal(
- pipeline, WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED)
- .setCoder(VarIntCoder.of());
+ PCollection.createPrimitiveOutputInternal(
+ pipeline, WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED, VarIntCoder.of());
final PDone done = PDone.in(pipeline);
final TupleTag<String> oneTag = new TupleTag<String>() {};
final TupleTag<Integer> twoTag = new TupleTag<Integer>() {};
@@ -617,13 +619,14 @@ public class TransformHierarchyTest implements Serializable {
@Test
public void visitDoesNotVisitSkippedNodes() {
PCollection<String> one =
- PCollection.<String>createPrimitiveOutputInternal(
- pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)
- .setCoder(StringUtf8Coder.of());
+ PCollection.createPrimitiveOutputInternal(
+ pipeline,
+ WindowingStrategy.globalDefault(),
+ IsBounded.BOUNDED,
+ StringUtf8Coder.of());
final PCollection<Integer> two =
- PCollection.<Integer>createPrimitiveOutputInternal(
- pipeline, WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED)
- .setCoder(VarIntCoder.of());
+ PCollection.createPrimitiveOutputInternal(
+ pipeline, WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED, VarIntCoder.of());
final PDone done = PDone.in(pipeline);
final TupleTag<String> oneTag = new TupleTag<String>() {};
final TupleTag<Integer> twoTag = new TupleTag<Integer>() {};
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
index e7b680a..bf06d78 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
@@ -28,6 +28,7 @@ import java.util.Arrays;
import java.util.EnumSet;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.TextIO;
@@ -85,10 +86,13 @@ public class TransformTreeTest {
// Issue below: PCollection.createPrimitiveOutput should not be used
// from within a composite transform.
return PCollectionList.of(
- Arrays.asList(result, PCollection.<String>createPrimitiveOutputInternal(
- b.getPipeline(),
- WindowingStrategy.globalDefault(),
- result.isBounded())));
+ Arrays.asList(
+ result,
+ PCollection.createPrimitiveOutputInternal(
+ b.getPipeline(),
+ WindowingStrategy.globalDefault(),
+ result.isBounded(),
+ StringUtf8Coder.of())));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
index a8cb843..5dbe176 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
@@ -228,7 +228,7 @@ public class FlattenTest implements Serializable {
public void testFlattenNoListsNoCoder() {
// not ValidatesRunner because it should fail at pipeline construction time anyhow.
thrown.expect(IllegalStateException.class);
- thrown.expectMessage("cannot provide a Coder for empty");
+ thrown.expectMessage("Unable to return a default Coder");
PCollectionList.<ClassWithoutCoder>empty(p)
.apply(Flatten.<ClassWithoutCoder>pCollections());
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
index 8fcb4c0..a76714c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
@@ -70,7 +70,6 @@ import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
-import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.hamcrest.Matcher;
import org.joda.time.Duration;
@@ -423,11 +422,11 @@ public class GroupByKeyTest implements Serializable {
new PTransform<PBegin, PCollection<KV<String, Integer>>>() {
@Override
public PCollection<KV<String, Integer>> expand(PBegin input) {
- return PCollection.<KV<String, Integer>>createPrimitiveOutputInternal(
- input.getPipeline(),
- WindowingStrategy.globalDefault(),
- PCollection.IsBounded.UNBOUNDED)
- .setTypeDescriptor(new TypeDescriptor<KV<String, Integer>>() {});
+ return PCollection.createPrimitiveOutputInternal(
+ input.getPipeline(),
+ WindowingStrategy.globalDefault(),
+ PCollection.IsBounded.UNBOUNDED,
+ KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
}
});
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
index cdd03d9..bfb8b5a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
@@ -60,7 +60,6 @@ import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TimestampedValue;
-import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
@@ -1340,11 +1339,11 @@ public class ViewTest implements Serializable {
new PTransform<PBegin, PCollection<KV<String, Integer>>>() {
@Override
public PCollection<KV<String, Integer>> expand(PBegin input) {
- return PCollection.<KV<String, Integer>>createPrimitiveOutputInternal(
- input.getPipeline(),
- WindowingStrategy.globalDefault(),
- PCollection.IsBounded.UNBOUNDED)
- .setTypeDescriptor(new TypeDescriptor<KV<String, Integer>>() {});
+ return PCollection.createPrimitiveOutputInternal(
+ input.getPipeline(),
+ WindowingStrategy.globalDefault(),
+ PCollection.IsBounded.UNBOUNDED,
+ KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
}
})
.apply(view);
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
index 58e2bbd..33503b6 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
@@ -31,6 +31,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
@@ -59,9 +60,9 @@ public final class PCollectionTupleTest implements Serializable {
@Test
public void testOfThenHas() {
- PCollection<Object> pCollection = PCollection.createPrimitiveOutputInternal(
- pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
- TupleTag<Object> tag = new TupleTag<>();
+ PCollection<Integer> pCollection = PCollection.createPrimitiveOutputInternal(
+ pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of());
+ TupleTag<Integer> tag = new TupleTag<>();
assertTrue(PCollectionTuple.of(tag, pCollection).has(tag));
}
[3/6] beam git commit: Requires specifying a Coder on
PCollection.createPrimitiveOutputInternal
Posted by jk...@apache.org.
Requires specifying a Coder on PCollection.createPrimitiveOutputInternal
The coder can still be null, in which case it is left unspecified.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bb1bf3c1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bb1bf3c1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bb1bf3c1
Branch: refs/heads/master
Commit: bb1bf3c19ca0baa2c04cec9863bfcaca2024f94e
Parents: e017a0e
Author: Eugene Kirpichov <ki...@google.com>
Authored: Wed Jul 26 17:14:58 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Aug 3 15:40:46 2017 -0700
----------------------------------------------------------------------
.../apache/beam/runners/apex/ApexRunner.java | 5 +-
.../FlattenPCollectionTranslator.java | 15 +-
.../apex/translation/ParDoTranslator.java | 7 +-
.../translation/ApexGroupByKeyOperatorTest.java | 6 +-
.../construction/PCollectionTranslation.java | 8 +-
.../construction/PTransformReplacements.java | 6 +
.../core/construction/PrimitiveCreate.java | 14 +-
.../core/construction/SplittableParDo.java | 3 +
.../UnboundedReadFromBoundedSource.java | 5 -
.../construction/PTransformMatchersTest.java | 79 +++++-----
.../construction/ReplacementOutputsTest.java | 14 +-
.../core/GroupByKeyViaGroupByKeyOnly.java | 15 +-
.../core/SplittableParDoViaKeyedWorkItems.java | 9 +-
.../beam/runners/direct/DirectGroupByKey.java | 31 ++--
.../direct/ParDoMultiOverrideFactory.java | 3 +
.../direct/TestStreamEvaluatorFactory.java | 8 +-
.../runners/direct/ViewOverrideFactory.java | 5 +-
.../runners/direct/CommittedResultTest.java | 26 +++-
.../runners/direct/EvaluationContextTest.java | 8 +-
.../runners/flink/CreateStreamingFlinkView.java | 5 +-
.../beam/runners/dataflow/AssignWindows.java | 4 +-
.../runners/dataflow/BatchViewOverrides.java | 19 +--
.../runners/dataflow/CreateDataflowView.java | 5 +-
.../beam/runners/dataflow/DataflowRunner.java | 147 ++++++++++---------
.../dataflow/PrimitiveParDoSingleFactory.java | 12 +-
.../DataflowPipelineTranslatorTest.java | 11 +-
.../runners/dataflow/DataflowRunnerTest.java | 11 +-
.../transforms/DataflowGroupByKeyTest.java | 12 +-
.../dataflow/transforms/DataflowViewTest.java | 14 +-
.../beam/runners/spark/io/CreateStream.java | 11 +-
.../translation/StorageLevelPTransform.java | 10 +-
.../util/SinglePrimitiveOutputPTransform.java | 51 -------
.../main/java/org/apache/beam/sdk/io/Read.java | 21 ++-
.../org/apache/beam/sdk/testing/TestStream.java | 5 +-
.../org/apache/beam/sdk/transforms/Flatten.java | 22 +--
.../apache/beam/sdk/transforms/GroupByKey.java | 12 +-
.../org/apache/beam/sdk/transforms/ParDo.java | 2 +
.../org/apache/beam/sdk/transforms/View.java | 5 +-
.../beam/sdk/transforms/windowing/Window.java | 2 +-
.../org/apache/beam/sdk/values/PCollection.java | 9 +-
.../beam/sdk/values/PCollectionTuple.java | 10 +-
.../sdk/runners/TransformHierarchyTest.java | 41 +++---
.../beam/sdk/runners/TransformTreeTest.java | 12 +-
.../apache/beam/sdk/transforms/FlattenTest.java | 2 +-
.../beam/sdk/transforms/GroupByKeyTest.java | 11 +-
.../apache/beam/sdk/transforms/ViewTest.java | 11 +-
.../beam/sdk/values/PCollectionTupleTest.java | 7 +-
47 files changed, 357 insertions(+), 394 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index fd0a1c9..cee524e 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -227,9 +227,8 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
@Override
public PCollection<ElemT> expand(PCollection<ElemT> input) {
- return PCollection.<ElemT>createPrimitiveOutputInternal(
- input.getPipeline(), input.getWindowingStrategy(), input.isBounded())
- .setCoder(input.getCoder());
+ return PCollection.createPrimitiveOutputInternal(
+ input.getPipeline(), input.getWindowingStrategy(), input.isBounded(), input.getCoder());
}
public PCollectionView<ViewT> getView() {
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java
index 440b801..189cb65 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java
@@ -110,8 +110,12 @@ class FlattenPCollectionTranslator<T> implements
}
if (collections.size() > 2) {
- PCollection<T> intermediateCollection = intermediateCollection(collection,
- collection.getCoder());
+ PCollection<T> intermediateCollection =
+ PCollection.createPrimitiveOutputInternal(
+ collection.getPipeline(),
+ collection.getWindowingStrategy(),
+ collection.isBounded(),
+ collection.getCoder());
context.addOperator(operator, operator.out, intermediateCollection);
remainingCollections.add(intermediateCollection);
} else {
@@ -135,11 +139,4 @@ class FlattenPCollectionTranslator<T> implements
}
}
- static <T> PCollection<T> intermediateCollection(PCollection<T> input, Coder<T> outputCoder) {
- PCollection<T> output = PCollection.createPrimitiveOutputInternal(input.getPipeline(),
- input.getWindowingStrategy(), input.isBounded());
- output.setCoder(outputCoder);
- return output;
- }
-
}
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
index e46687a..be11b02 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
@@ -241,8 +241,11 @@ class ParDoTranslator<InputT, OutputT>
}
PCollection<Object> resultCollection =
- FlattenPCollectionTranslator.intermediateCollection(
- firstSideInput, firstSideInput.getCoder());
+ PCollection.createPrimitiveOutputInternal(
+ firstSideInput.getPipeline(),
+ firstSideInput.getWindowingStrategy(),
+ firstSideInput.isBounded(),
+ firstSideInput.getCoder());
FlattenPCollectionTranslator.flattenCollections(
sourceCollections, unionTags, resultCollection, context);
return resultCollection;
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexGroupByKeyOperatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexGroupByKeyOperatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexGroupByKeyOperatorTest.java
index 206b430..63a218b 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexGroupByKeyOperatorTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexGroupByKeyOperatorTest.java
@@ -59,9 +59,9 @@ public class ApexGroupByKeyOperatorTest {
WindowingStrategy<?, ?> ws = WindowingStrategy.of(FixedWindows.of(
Duration.standardSeconds(10)));
- PCollection<KV<String, Integer>> input = PCollection.createPrimitiveOutputInternal(pipeline,
- ws, IsBounded.BOUNDED);
- input.setCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
+ PCollection<KV<String, Integer>> input =
+ PCollection.createPrimitiveOutputInternal(
+ pipeline, ws, IsBounded.BOUNDED, KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
ApexGroupByKeyOperator<String, Integer> operator = new ApexGroupByKeyOperator<>(options,
input, new ApexStateInternals.ApexStateBackend()
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
index c0a5acf..c256e4c 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
@@ -52,10 +52,10 @@ public class PCollectionTranslation {
Coder<?> coder = components.getCoder(pCollection.getCoderId());
return PCollection.createPrimitiveOutputInternal(
- pipeline,
- components.getWindowingStrategy(pCollection.getWindowingStrategyId()),
- fromProto(pCollection.getIsBounded()))
- .setCoder((Coder) coder);
+ pipeline,
+ components.getWindowingStrategy(pCollection.getWindowingStrategyId()),
+ fromProto(pCollection.getIsBounded()),
+ (Coder) coder);
}
public static IsBounded isBounded(RunnerApi.PCollection pCollection) {
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformReplacements.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformReplacements.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformReplacements.java
index 706a956..35bad15 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformReplacements.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformReplacements.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.core.construction;
import static com.google.common.base.Preconditions.checkArgument;
+import com.google.common.collect.Iterables;
import java.util.Map;
import java.util.Set;
import org.apache.beam.sdk.runners.AppliedPTransform;
@@ -66,4 +67,9 @@ public class PTransformReplacements {
ignoredTags);
return mainInput;
}
+
+ public static <T> PCollection<T> getSingletonMainOutput(
+ AppliedPTransform<?, PCollection<T>, ? extends PTransform<?, PCollection<T>>> transform) {
+ return ((PCollection<T>) Iterables.getOnlyElement(transform.getOutputs().values()));
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PrimitiveCreate.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PrimitiveCreate.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PrimitiveCreate.java
index f43d23b..62b6d0a 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PrimitiveCreate.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PrimitiveCreate.java
@@ -18,7 +18,9 @@
package org.apache.beam.runners.core.construction;
+import com.google.common.collect.Iterables;
import java.util.Map;
+import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.transforms.Create;
@@ -36,15 +38,17 @@ import org.apache.beam.sdk.values.WindowingStrategy;
*/
public class PrimitiveCreate<T> extends PTransform<PBegin, PCollection<T>> {
private final Create.Values<T> transform;
+ private final Coder<T> coder;
- private PrimitiveCreate(Create.Values<T> transform) {
+ private PrimitiveCreate(Create.Values<T> transform, Coder<T> coder) {
this.transform = transform;
+ this.coder = coder;
}
@Override
public PCollection<T> expand(PBegin input) {
return PCollection.createPrimitiveOutputInternal(
- input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+ input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.BOUNDED, coder);
}
public Iterable<T> getElements() {
@@ -60,7 +64,11 @@ public class PrimitiveCreate<T> extends PTransform<PBegin, PCollection<T>> {
public PTransformReplacement<PBegin, PCollection<T>> getReplacementTransform(
AppliedPTransform<PBegin, PCollection<T>, Values<T>> transform) {
return PTransformReplacement.of(
- transform.getPipeline().begin(), new PrimitiveCreate<T>(transform.getTransform()));
+ transform.getPipeline().begin(),
+ new PrimitiveCreate<T>(
+ transform.getTransform(),
+ ((PCollection<T>) Iterables.getOnlyElement(transform.getOutputs().values()))
+ .getCoder()));
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
index e71187b..bcc5de8 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.core.construction;
import static com.google.common.base.Preconditions.checkArgument;
import java.io.IOException;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -273,6 +274,8 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
PCollectionTuple.ofPrimitiveOutputsInternal(
input.getPipeline(),
TupleTagList.of(mainOutputTag).and(additionalOutputTags.getAll()),
+ // TODO
+ Collections.<TupleTag<?>, Coder<?>>emptyMap(),
windowingStrategy,
input.isBounded().and(signature.isBoundedPerElement()));
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
index f35f4c3..55f9519 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
@@ -91,11 +91,6 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle
}
@Override
- protected Coder<T> getDefaultOutputCoder() {
- return source.getOutputCoder();
- }
-
- @Override
public String getKindString() {
return String.format("Read(%s)", NameUtils.approximateSimpleName(source));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
index 1862699..fa7e1e9 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
@@ -100,15 +100,16 @@ public class PTransformMatchersTest implements Serializable {
private AppliedPTransform<?, ?, ?> getAppliedTransform(PTransform pardo) {
PCollection<KV<String, Integer>> input =
PCollection.createPrimitiveOutputInternal(
- p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+ p,
+ WindowingStrategy.globalDefault(),
+ IsBounded.BOUNDED,
+ KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
input.setName("dummy input");
- input.setCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
PCollection<Integer> output =
PCollection.createPrimitiveOutputInternal(
- p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+ p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of());
output.setName("dummy output");
- output.setCoder(VarIntCoder.of());
return AppliedPTransform.of("pardo", input.expand(), output.expand(), pardo, p);
}
@@ -133,7 +134,7 @@ public class PTransformMatchersTest implements Serializable {
@Override
public PCollection<Integer> expand(PCollection<KV<String, Integer>> input) {
return PCollection.createPrimitiveOutputInternal(
- input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
+ input.getPipeline(), input.getWindowingStrategy(), input.isBounded(), VarIntCoder.of());
}
}
PTransformMatcher matcher = PTransformMatchers.classEqualTo(MyPTransform.class);
@@ -425,14 +426,14 @@ public class PTransformMatchersTest implements Serializable {
public void emptyFlattenWithEmptyFlatten() {
AppliedPTransform application =
AppliedPTransform
- .<PCollectionList<Object>, PCollection<Object>, Flatten.PCollections<Object>>of(
+ .<PCollectionList<Integer>, PCollection<Integer>, Flatten.PCollections<Integer>>of(
"EmptyFlatten",
Collections.<TupleTag<?>, PValue>emptyMap(),
Collections.<TupleTag<?>, PValue>singletonMap(
- new TupleTag<Object>(),
+ new TupleTag<Integer>(),
PCollection.createPrimitiveOutputInternal(
- p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)),
- Flatten.pCollections(),
+ p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of())),
+ Flatten.<Integer>pCollections(),
p);
assertThat(PTransformMatchers.emptyFlatten().matches(application), is(true));
@@ -442,17 +443,17 @@ public class PTransformMatchersTest implements Serializable {
public void emptyFlattenWithNonEmptyFlatten() {
AppliedPTransform application =
AppliedPTransform
- .<PCollectionList<Object>, PCollection<Object>, Flatten.PCollections<Object>>of(
+ .<PCollectionList<Integer>, PCollection<Integer>, Flatten.PCollections<Integer>>of(
"Flatten",
Collections.<TupleTag<?>, PValue>singletonMap(
- new TupleTag<Object>(),
+ new TupleTag<Integer>(),
PCollection.createPrimitiveOutputInternal(
- p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)),
+ p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of())),
Collections.<TupleTag<?>, PValue>singletonMap(
- new TupleTag<Object>(),
+ new TupleTag<Integer>(),
PCollection.createPrimitiveOutputInternal(
- p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)),
- Flatten.pCollections(),
+ p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of())),
+ Flatten.<Integer>pCollections(),
p);
assertThat(PTransformMatchers.emptyFlatten().matches(application), is(false));
@@ -462,15 +463,15 @@ public class PTransformMatchersTest implements Serializable {
public void emptyFlattenWithNonFlatten() {
AppliedPTransform application =
AppliedPTransform
- .<PCollection<Iterable<Object>>, PCollection<Object>, Flatten.Iterables<Object>>of(
+ .<PCollection<Iterable<Integer>>, PCollection<Integer>, Flatten.Iterables<Integer>>of(
"EmptyFlatten",
Collections.<TupleTag<?>, PValue>emptyMap(),
Collections.<TupleTag<?>, PValue>singletonMap(
- new TupleTag<Object>(),
+ new TupleTag<Integer>(),
PCollection.createPrimitiveOutputInternal(
- p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)),
- Flatten.iterables() /* This isn't actually possible to construct,
- * but for the sake of example */,
+ p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of())),
+ /* This isn't actually possible to construct, but for the sake of example */
+ Flatten.<Integer>iterables(),
p);
assertThat(PTransformMatchers.emptyFlatten().matches(application), is(false));
@@ -480,17 +481,17 @@ public class PTransformMatchersTest implements Serializable {
public void flattenWithDuplicateInputsWithoutDuplicates() {
AppliedPTransform application =
AppliedPTransform
- .<PCollectionList<Object>, PCollection<Object>, Flatten.PCollections<Object>>of(
+ .<PCollectionList<Integer>, PCollection<Integer>, Flatten.PCollections<Integer>>of(
"Flatten",
Collections.<TupleTag<?>, PValue>singletonMap(
- new TupleTag<Object>(),
+ new TupleTag<Integer>(),
PCollection.createPrimitiveOutputInternal(
- p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)),
+ p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of())),
Collections.<TupleTag<?>, PValue>singletonMap(
- new TupleTag<Object>(),
+ new TupleTag<Integer>(),
PCollection.createPrimitiveOutputInternal(
- p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)),
- Flatten.pCollections(),
+ p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of())),
+ Flatten.<Integer>pCollections(),
p);
assertThat(PTransformMatchers.flattenWithDuplicateInputs().matches(application), is(false));
@@ -498,22 +499,22 @@ public class PTransformMatchersTest implements Serializable {
@Test
public void flattenWithDuplicateInputsWithDuplicates() {
- PCollection<Object> duplicate =
+ PCollection<Integer> duplicate =
PCollection.createPrimitiveOutputInternal(
- p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+ p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of());
AppliedPTransform application =
AppliedPTransform
- .<PCollectionList<Object>, PCollection<Object>, Flatten.PCollections<Object>>of(
+ .<PCollectionList<Integer>, PCollection<Integer>, Flatten.PCollections<Integer>>of(
"Flatten",
ImmutableMap.<TupleTag<?>, PValue>builder()
- .put(new TupleTag<Object>(), duplicate)
- .put(new TupleTag<Object>(), duplicate)
+ .put(new TupleTag<Integer>(), duplicate)
+ .put(new TupleTag<Integer>(), duplicate)
.build(),
Collections.<TupleTag<?>, PValue>singletonMap(
- new TupleTag<Object>(),
+ new TupleTag<Integer>(),
PCollection.createPrimitiveOutputInternal(
- p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)),
- Flatten.pCollections(),
+ p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of())),
+ Flatten.<Integer>pCollections(),
p);
assertThat(PTransformMatchers.flattenWithDuplicateInputs().matches(application), is(true));
@@ -523,15 +524,15 @@ public class PTransformMatchersTest implements Serializable {
public void flattenWithDuplicateInputsNonFlatten() {
AppliedPTransform application =
AppliedPTransform
- .<PCollection<Iterable<Object>>, PCollection<Object>, Flatten.Iterables<Object>>of(
+ .<PCollection<Iterable<Integer>>, PCollection<Integer>, Flatten.Iterables<Integer>>of(
"EmptyFlatten",
Collections.<TupleTag<?>, PValue>emptyMap(),
Collections.<TupleTag<?>, PValue>singletonMap(
- new TupleTag<Object>(),
+ new TupleTag<Integer>(),
PCollection.createPrimitiveOutputInternal(
- p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)),
- Flatten.iterables() /* This isn't actually possible to construct,
- * but for the sake of example */,
+ p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of())),
+ /* This isn't actually possible to construct, but for the sake of example */
+ Flatten.<Integer>iterables(),
p);
assertThat(PTransformMatchers.flattenWithDuplicateInputs().matches(application), is(false));
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReplacementOutputsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReplacementOutputsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReplacementOutputsTest.java
index f8d01e9..0165e4b 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReplacementOutputsTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReplacementOutputsTest.java
@@ -24,6 +24,8 @@ import static org.junit.Assert.assertThat;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import java.util.Map;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.runners.PTransformOverrideFactory.ReplacementOutput;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.values.PCollection;
@@ -50,23 +52,23 @@ public class ReplacementOutputsTest {
private PCollection<Integer> ints =
PCollection.createPrimitiveOutputInternal(
- p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+ p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of());
private PCollection<Integer> moreInts =
PCollection.createPrimitiveOutputInternal(
- p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+ p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of());
private PCollection<String> strs =
PCollection.createPrimitiveOutputInternal(
- p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+ p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, StringUtf8Coder.of());
private PCollection<Integer> replacementInts =
PCollection.createPrimitiveOutputInternal(
- p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+ p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of());
private PCollection<Integer> moreReplacementInts =
PCollection.createPrimitiveOutputInternal(
- p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+ p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of());
private PCollection<String> replacementStrs =
PCollection.createPrimitiveOutputInternal(
- p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+ p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, StringUtf8Coder.of());
@Test
public void singletonSucceeds() {
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java
index fca3c76..1fdf07c 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java
@@ -111,12 +111,10 @@ public class GroupByKeyViaGroupByKeyOnly<K, V>
@Override
public PCollection<KV<K, Iterable<WindowedValue<V>>>> expand(PCollection<KV<K, V>> input) {
return PCollection.createPrimitiveOutputInternal(
- input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
- }
-
- @Override
- public Coder<KV<K, Iterable<V>>> getDefaultOutputCoder(PCollection<KV<K, V>> input) {
- return GroupByKey.getOutputKvCoder(input.getCoder());
+ input.getPipeline(),
+ input.getWindowingStrategy(),
+ input.isBounded(),
+ (Coder) GroupByKey.getOutputKvCoder(input.getCoder()));
}
}
@@ -244,9 +242,8 @@ public class GroupByKeyViaGroupByKeyOnly<K, V>
Coder<Iterable<V>> outputValueCoder = IterableCoder.of(inputIterableElementValueCoder);
Coder<KV<K, Iterable<V>>> outputKvCoder = KvCoder.of(keyCoder, outputValueCoder);
- return PCollection.<KV<K, Iterable<V>>>createPrimitiveOutputInternal(
- input.getPipeline(), windowingStrategy, input.isBounded())
- .setCoder(outputKvCoder);
+ return PCollection.createPrimitiveOutputInternal(
+ input.getPipeline(), windowingStrategy, input.isBounded(), outputKvCoder);
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
index 6e97645..af720fd 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
@@ -72,8 +72,15 @@ public class SplittableParDoViaKeyedWorkItems {
PCollection<KV<KeyT, InputT>>, PCollection<KeyedWorkItem<KeyT, InputT>>> {
@Override
public PCollection<KeyedWorkItem<KeyT, InputT>> expand(PCollection<KV<KeyT, InputT>> input) {
+ KvCoder<KeyT, InputT> kvCoder = (KvCoder<KeyT, InputT>) input.getCoder();
return PCollection.createPrimitiveOutputInternal(
- input.getPipeline(), WindowingStrategy.globalDefault(), input.isBounded());
+ input.getPipeline(),
+ WindowingStrategy.globalDefault(),
+ input.isBounded(),
+ KeyedWorkItemCoder.of(
+ kvCoder.getKeyCoder(),
+ kvCoder.getValueCoder(),
+ input.getWindowingStrategy().getWindowFn().windowCoder()));
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
index 06b8e29..3ba04e7 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
@@ -24,7 +24,6 @@ import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.KeyedWorkItemCoder;
import org.apache.beam.runners.core.construction.ForwardingPTransform;
import org.apache.beam.runners.core.construction.PTransformTranslation;
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
@@ -78,22 +77,18 @@ class DirectGroupByKey<K, V>
@Override
public PCollection<KeyedWorkItem<K, V>> expand(PCollection<KV<K, V>> input) {
return PCollection.createPrimitiveOutputInternal(
- input.getPipeline(), WindowingStrategy.globalDefault(), input.isBounded());
+ input.getPipeline(),
+ WindowingStrategy.globalDefault(),
+ input.isBounded(),
+ KeyedWorkItemCoder.of(
+ GroupByKey.getKeyCoder(input.getCoder()),
+ GroupByKey.getInputValueCoder(input.getCoder()),
+ input.getWindowingStrategy().getWindowFn().windowCoder()));
}
DirectGroupByKeyOnly() {}
@Override
- protected Coder<?> getDefaultOutputCoder(
- @SuppressWarnings("unused") PCollection<KV<K, V>> input)
- throws CannotProvideCoderException {
- return KeyedWorkItemCoder.of(
- GroupByKey.getKeyCoder(input.getCoder()),
- GroupByKey.getInputValueCoder(input.getCoder()),
- input.getWindowingStrategy().getWindowFn().windowCoder());
- }
-
- @Override
public String getUrn() {
return DIRECT_GBKO_URN;
}
@@ -135,17 +130,11 @@ class DirectGroupByKey<K, V>
}
@Override
- protected Coder<?> getDefaultOutputCoder(
- @SuppressWarnings("unused") PCollection<KeyedWorkItem<K, V>> input)
- throws CannotProvideCoderException {
- KeyedWorkItemCoder<K, V> inputCoder = getKeyedWorkItemCoder(input.getCoder());
- return KvCoder.of(inputCoder.getKeyCoder(), IterableCoder.of(inputCoder.getElementCoder()));
- }
-
- @Override
public PCollection<KV<K, Iterable<V>>> expand(PCollection<KeyedWorkItem<K, V>> input) {
+ KeyedWorkItemCoder<K, V> inputCoder = getKeyedWorkItemCoder(input.getCoder());
return PCollection.createPrimitiveOutputInternal(
- input.getPipeline(), outputWindowingStrategy, input.isBounded());
+ input.getPipeline(), outputWindowingStrategy, input.isBounded(),
+ KvCoder.of(inputCoder.getKeyCoder(), IterableCoder.of(inputCoder.getElementCoder())));
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
index 891d102..3f04b56 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.direct;
import static com.google.common.base.Preconditions.checkState;
import java.io.IOException;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.core.KeyedWorkItem;
@@ -248,6 +249,8 @@ class ParDoMultiOverrideFactory<InputT, OutputT>
PCollectionTuple.ofPrimitiveOutputsInternal(
input.getPipeline(),
TupleTagList.of(getMainOutputTag()).and(getAdditionalOutputTags().getAll()),
+ // TODO
+ Collections.<TupleTag<?>, Coder<?>>emptyMap(),
input.getWindowingStrategy(),
input.isBounded());
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
index 16c8589..49e7be7 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
@@ -207,9 +207,11 @@ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory {
@Override
public PCollection<T> expand(PBegin input) {
runner.setClockSupplier(new TestClockSupplier());
- return PCollection.<T>createPrimitiveOutputInternal(
- input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED)
- .setCoder(original.getValueCoder());
+ return PCollection.createPrimitiveOutputInternal(
+ input.getPipeline(),
+ WindowingStrategy.globalDefault(),
+ IsBounded.UNBOUNDED,
+ original.getValueCoder());
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
index 5dcf016..c2255fe 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
@@ -115,9 +115,8 @@ class ViewOverrideFactory<ElemT, ViewT>
@Override
@SuppressWarnings("deprecation")
public PCollection<Iterable<ElemT>> expand(PCollection<Iterable<ElemT>> input) {
- return PCollection.<Iterable<ElemT>>createPrimitiveOutputInternal(
- input.getPipeline(), input.getWindowingStrategy(), input.isBounded())
- .setCoder(input.getCoder());
+ return PCollection.createPrimitiveOutputInternal(
+ input.getPipeline(), input.getWindowingStrategy(), input.isBounded(), input.getCoder());
}
@SuppressWarnings("deprecation")
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
index 8b95b34..29ed55d 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
@@ -27,6 +27,7 @@ import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import org.apache.beam.runners.direct.CommittedResult.OutputType;
+import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
@@ -113,13 +114,24 @@ public class CommittedResultTest implements Serializable {
@Test
public void getOutputsEqualInput() {
- List<? extends CommittedBundle<?>> outputs =
- ImmutableList.of(bundleFactory.createBundle(PCollection.createPrimitiveOutputInternal(p,
- WindowingStrategy.globalDefault(),
- PCollection.IsBounded.BOUNDED)).commit(Instant.now()),
- bundleFactory.createBundle(PCollection.createPrimitiveOutputInternal(p,
- WindowingStrategy.globalDefault(),
- PCollection.IsBounded.UNBOUNDED)).commit(Instant.now()));
+ List<? extends CommittedBundle<Integer>> outputs =
+ ImmutableList.of(
+ bundleFactory
+ .createBundle(
+ PCollection.createPrimitiveOutputInternal(
+ p,
+ WindowingStrategy.globalDefault(),
+ PCollection.IsBounded.BOUNDED,
+ VarIntCoder.of()))
+ .commit(Instant.now()),
+ bundleFactory
+ .createBundle(
+ PCollection.createPrimitiveOutputInternal(
+ p,
+ WindowingStrategy.globalDefault(),
+ PCollection.IsBounded.UNBOUNDED,
+ VarIntCoder.of()))
+ .commit(Instant.now()));
CommittedResult result =
CommittedResult.create(
StepTransformResult.withoutHold(transform).build(),
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
index 699a318..cc9ce60 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
@@ -40,6 +40,7 @@ import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
import org.apache.beam.runners.direct.WatermarkManager.FiredTimers;
import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.io.GenerateSequence;
@@ -127,8 +128,11 @@ public class EvaluationContextTest {
public void writeToViewWriterThenReadReads() {
PCollectionViewWriter<Integer, Iterable<Integer>> viewWriter =
context.createPCollectionViewWriter(
- PCollection.<Iterable<Integer>>createPrimitiveOutputInternal(
- p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED),
+ PCollection.createPrimitiveOutputInternal(
+ p,
+ WindowingStrategy.globalDefault(),
+ IsBounded.BOUNDED,
+ IterableCoder.of(VarIntCoder.of())),
view);
BoundedWindow window = new TestBoundedWindow(new Instant(1024L));
BoundedWindow second = new TestBoundedWindow(new Instant(899999L));
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java
index 0cc3aec..3114a6f 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java
@@ -120,9 +120,8 @@ class CreateStreamingFlinkView<ElemT, ViewT>
@Override
public PCollection<List<ElemT>> expand(PCollection<List<ElemT>> input) {
- return PCollection.<List<ElemT>>createPrimitiveOutputInternal(
- input.getPipeline(), input.getWindowingStrategy(), input.isBounded())
- .setCoder(input.getCoder());
+ return PCollection.createPrimitiveOutputInternal(
+ input.getPipeline(), input.getWindowingStrategy(), input.isBounded(), input.getCoder());
}
public PCollectionView<ViewT> getView() {
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java
index 572b005..d015d2b 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java
@@ -59,8 +59,8 @@ class AssignWindows<T> extends PTransform<PCollection<T>, PCollection<T>> {
transform.getOutputStrategyInternal(input.getWindowingStrategy());
if (transform.getWindowFn() != null) {
// If the windowFn changed, we create a primitive, and run the AssignWindows operation here.
- return PCollection.<T>createPrimitiveOutputInternal(
- input.getPipeline(), outputStrategy, input.isBounded());
+ return PCollection.createPrimitiveOutputInternal(
+ input.getPipeline(), outputStrategy, input.isBounded(), input.getCoder());
} else {
// If the windowFn didn't change, we just run a pass-through transform and then set the
// new windowing strategy.
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
index ad3faed..9a77b4b 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
@@ -1258,18 +1258,13 @@ class BatchViewOverrides {
@Override
public PCollection<KV<K1, Iterable<KV<K2, V>>>> expand(PCollection<KV<K1, KV<K2, V>>> input) {
- PCollection<KV<K1, Iterable<KV<K2, V>>>> rval =
- PCollection.<KV<K1, Iterable<KV<K2, V>>>>createPrimitiveOutputInternal(
- input.getPipeline(),
- WindowingStrategy.globalDefault(),
- IsBounded.BOUNDED);
-
- @SuppressWarnings({"unchecked", "rawtypes"})
- KvCoder<K1, KV<K2, V>> inputCoder = (KvCoder) input.getCoder();
- rval.setCoder(
- KvCoder.of(inputCoder.getKeyCoder(),
- IterableCoder.of(inputCoder.getValueCoder())));
- return rval;
+ @SuppressWarnings("unchecked")
+ KvCoder<K1, KV<K2, V>> inputCoder = (KvCoder<K1, KV<K2, V>>) input.getCoder();
+ return PCollection.createPrimitiveOutputInternal(
+ input.getPipeline(),
+ WindowingStrategy.globalDefault(),
+ IsBounded.BOUNDED,
+ KvCoder.of(inputCoder.getKeyCoder(), IterableCoder.of(inputCoder.getValueCoder())));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
index caad7f8..3b01d69 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
@@ -37,9 +37,8 @@ public class CreateDataflowView<ElemT, ViewT>
@Override
public PCollection<ElemT> expand(PCollection<ElemT> input) {
- return PCollection.<ElemT>createPrimitiveOutputInternal(
- input.getPipeline(), input.getWindowingStrategy(), input.isBounded())
- .setCoder(input.getCoder());
+ return PCollection.createPrimitiveOutputInternal(
+ input.getPipeline(), input.getWindowingStrategy(), input.isBounded(), input.getCoder());
}
public PCollectionView<ViewT> getView() {
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 8fce5b4..6999616 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -321,7 +321,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
overridesBuilder.add(
PTransformOverride.of(
PTransformMatchers.classEqualTo(PubsubUnboundedSource.class),
- new ReflectiveRootOverrideFactory(StreamingPubsubIORead.class, this)));
+ new StreamingPubsubIOReadOverrideFactory()));
}
if (!hasExperiment(options, "enable_custom_pubsub_sink")) {
overridesBuilder.add(
@@ -359,11 +359,11 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
// must precede it
PTransformOverride.of(
PTransformMatchers.classEqualTo(Read.Bounded.class),
- new ReflectiveRootOverrideFactory(StreamingBoundedRead.class, this)))
+ new StreamingBoundedReadOverrideFactory()))
.add(
PTransformOverride.of(
PTransformMatchers.classEqualTo(Read.Unbounded.class),
- new ReflectiveRootOverrideFactory(StreamingUnboundedRead.class, this)))
+ new StreamingUnboundedReadOverrideFactory()))
.add(
PTransformOverride.of(
PTransformMatchers.classEqualTo(View.CreatePCollectionView.class),
@@ -448,38 +448,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
}
- private static class ReflectiveRootOverrideFactory<T>
- implements PTransformOverrideFactory<
- PBegin, PCollection<T>, PTransform<PInput, PCollection<T>>> {
- private final Class<PTransform<PBegin, PCollection<T>>> replacement;
- private final DataflowRunner runner;
-
- private ReflectiveRootOverrideFactory(
- Class<PTransform<PBegin, PCollection<T>>> replacement, DataflowRunner runner) {
- this.replacement = replacement;
- this.runner = runner;
- }
-
- @Override
- public PTransformReplacement<PBegin, PCollection<T>> getReplacementTransform(
- AppliedPTransform<PBegin, PCollection<T>, PTransform<PInput, PCollection<T>>> transform) {
- PTransform<PInput, PCollection<T>> original = transform.getTransform();
- return PTransformReplacement.of(
- transform.getPipeline().begin(),
- InstanceBuilder.ofType(replacement)
- .withArg(DataflowRunner.class, runner)
- .withArg(
- (Class<? super PTransform<PInput, PCollection<T>>>) original.getClass(), original)
- .build());
- }
-
- @Override
- public Map<PValue, ReplacementOutput> mapOutputs(
- Map<TupleTag<?>, PValue> outputs, PCollection<T> newOutput) {
- return ReplacementOutputs.singleton(outputs, newOutput);
- }
- }
-
private String debuggerMessage(String projectId, String uniquifier) {
return String.format("To debug your job, visit Google Cloud Debugger at: "
+ "https://console.developers.google.com/debug?project=%s&dbgee=%s",
@@ -838,6 +806,24 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
// PubsubIO translations
// ================================================================================
+ private static class StreamingPubsubIOReadOverrideFactory
+ implements PTransformOverrideFactory<
+ PBegin, PCollection<PubsubMessage>, PubsubUnboundedSource> {
+ @Override
+ public PTransformReplacement<PBegin, PCollection<PubsubMessage>> getReplacementTransform(
+ AppliedPTransform<PBegin, PCollection<PubsubMessage>, PubsubUnboundedSource> transform) {
+ return PTransformReplacement.of(
+ transform.getPipeline().begin(), new StreamingPubsubIORead(transform.getTransform()));
+ }
+
+ @Override
+ public Map<PValue, ReplacementOutput> mapOutputs(
+ Map<TupleTag<?>, PValue> outputs, PCollection<PubsubMessage> newOutput) {
+ return ReplacementOutputs.singleton(outputs, newOutput);
+ }
+ }
+
+
/**
* Suppress application of {@link PubsubUnboundedSource#expand} in streaming mode so that we can
* instead defer to Windmill's implementation.
@@ -846,9 +832,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
extends PTransform<PBegin, PCollection<PubsubMessage>> {
private final PubsubUnboundedSource transform;
- /** Builds an instance of this class from the overridden transform. */
- @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
- public StreamingPubsubIORead(DataflowRunner runner, PubsubUnboundedSource transform) {
+ public StreamingPubsubIORead(PubsubUnboundedSource transform) {
this.transform = transform;
}
@@ -858,9 +842,11 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
@Override
public PCollection<PubsubMessage> expand(PBegin input) {
- return PCollection.<PubsubMessage>createPrimitiveOutputInternal(
- input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED)
- .setCoder(new PubsubMessageWithAttributesCoder());
+ return PCollection.createPrimitiveOutputInternal(
+ input.getPipeline(),
+ WindowingStrategy.globalDefault(),
+ IsBounded.UNBOUNDED,
+ new PubsubMessageWithAttributesCoder());
}
@Override
@@ -1129,12 +1115,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
@Override
public PCollection<byte[]> expand(PBegin input) {
return PCollection.createPrimitiveOutputInternal(
- input.getPipeline(), WindowingStrategy.globalDefault(), isBounded);
- }
-
- @Override
- protected Coder<?> getDefaultOutputCoder() {
- return ByteArrayCoder.of();
+ input.getPipeline(), WindowingStrategy.globalDefault(), isBounded, ByteArrayCoder.of());
}
private static class Translator implements TransformTranslator<Impulse> {
@@ -1157,6 +1138,22 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
}
+ private static class StreamingUnboundedReadOverrideFactory<T>
+ implements PTransformOverrideFactory<PBegin, PCollection<T>, Read.Unbounded<T>> {
+ @Override
+ public PTransformReplacement<PBegin, PCollection<T>> getReplacementTransform(
+ AppliedPTransform<PBegin, PCollection<T>, Read.Unbounded<T>> transform) {
+ return PTransformReplacement.of(
+ transform.getPipeline().begin(), new StreamingUnboundedRead<>(transform.getTransform()));
+ }
+
+ @Override
+ public Map<PValue, ReplacementOutput> mapOutputs(
+ Map<TupleTag<?>, PValue> outputs, PCollection<T> newOutput) {
+ return ReplacementOutputs.singleton(outputs, newOutput);
+ }
+ }
+
/**
* Specialized implementation for
* {@link org.apache.beam.sdk.io.Read.Unbounded Read.Unbounded} for the
@@ -1168,18 +1165,11 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
private static class StreamingUnboundedRead<T> extends PTransform<PBegin, PCollection<T>> {
private final UnboundedSource<T, ?> source;
- /** Builds an instance of this class from the overridden transform. */
- @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
- public StreamingUnboundedRead(DataflowRunner runner, Read.Unbounded<T> transform) {
+ public StreamingUnboundedRead(Read.Unbounded<T> transform) {
this.source = transform.getSource();
}
@Override
- protected Coder<T> getDefaultOutputCoder() {
- return source.getOutputCoder();
- }
-
- @Override
public final PCollection<T> expand(PBegin input) {
source.validate();
@@ -1206,13 +1196,9 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
@Override
public final PCollection<ValueWithRecordId<T>> expand(PInput input) {
- return PCollection.<ValueWithRecordId<T>>createPrimitiveOutputInternal(
- input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED);
- }
-
- @Override
- protected Coder<ValueWithRecordId<T>> getDefaultOutputCoder() {
- return ValueWithRecordId.ValueWithRecordIdCoder.of(source.getOutputCoder());
+ return PCollection.createPrimitiveOutputInternal(
+ input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED,
+ ValueWithRecordId.ValueWithRecordIdCoder.of(source.getOutputCoder()));
}
@Override
@@ -1276,6 +1262,22 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
}
+ private static class StreamingBoundedReadOverrideFactory<T>
+ implements PTransformOverrideFactory<PBegin, PCollection<T>, Read.Bounded<T>> {
+ @Override
+ public PTransformReplacement<PBegin, PCollection<T>> getReplacementTransform(
+ AppliedPTransform<PBegin, PCollection<T>, Read.Bounded<T>> transform) {
+ return PTransformReplacement.of(
+ transform.getPipeline().begin(), new StreamingBoundedRead<>(transform.getTransform()));
+ }
+
+ @Override
+ public Map<PValue, ReplacementOutput> mapOutputs(
+ Map<TupleTag<?>, PValue> outputs, PCollection<T> newOutput) {
+ return ReplacementOutputs.singleton(outputs, newOutput);
+ }
+ }
+
/**
* Specialized implementation for {@link org.apache.beam.sdk.io.Read.Bounded Read.Bounded} for the
* Dataflow runner in streaming mode.
@@ -1283,18 +1285,11 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
private static class StreamingBoundedRead<T> extends PTransform<PBegin, PCollection<T>> {
private final BoundedSource<T> source;
- /** Builds an instance of this class from the overridden transform. */
- @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
- public StreamingBoundedRead(DataflowRunner runner, Read.Bounded<T> transform) {
+ public StreamingBoundedRead(Read.Bounded<T> transform) {
this.source = transform.getSource();
}
@Override
- protected Coder<T> getDefaultOutputCoder() {
- return source.getOutputCoder();
- }
-
- @Override
public final PCollection<T> expand(PBegin input) {
source.validate();
@@ -1404,15 +1399,19 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
static class CombineGroupedValues<K, InputT, OutputT>
extends PTransform<PCollection<KV<K, Iterable<InputT>>>, PCollection<KV<K, OutputT>>> {
private final Combine.GroupedValues<K, InputT, OutputT> original;
+ private final Coder<KV<K, OutputT>> outputCoder;
- CombineGroupedValues(GroupedValues<K, InputT, OutputT> original) {
+ CombineGroupedValues(
+ GroupedValues<K, InputT, OutputT> original, Coder<KV<K, OutputT>> outputCoder) {
this.original = original;
+ this.outputCoder = outputCoder;
}
@Override
public PCollection<KV<K, OutputT>> expand(PCollection<KV<K, Iterable<InputT>>> input) {
return PCollection.createPrimitiveOutputInternal(
- input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
+ input.getPipeline(), input.getWindowingStrategy(), input.isBounded(),
+ outputCoder);
}
public Combine.GroupedValues<K, InputT, OutputT> getOriginalCombine() {
@@ -1433,7 +1432,9 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
transform) {
return PTransformReplacement.of(
PTransformReplacements.getSingletonMainInput(transform),
- new CombineGroupedValues<>(transform.getTransform()));
+ new CombineGroupedValues<>(
+ transform.getTransform(),
+ PTransformReplacements.getSingletonMainOutput(transform).getCoder()));
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java
index 8611d3c..9252c64 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java
@@ -22,6 +22,7 @@ import java.util.List;
import org.apache.beam.runners.core.construction.ForwardingPTransform;
import org.apache.beam.runners.core.construction.PTransformReplacements;
import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
+import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.common.runner.v1.RunnerApi.DisplayData;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
@@ -49,7 +50,9 @@ public class PrimitiveParDoSingleFactory<InputT, OutputT>
transform) {
return PTransformReplacement.of(
PTransformReplacements.getSingletonMainInput(transform),
- new ParDoSingle<>(transform.getTransform()));
+ new ParDoSingle<>(
+ transform.getTransform(),
+ PTransformReplacements.getSingletonMainOutput(transform).getCoder()));
}
/**
@@ -58,15 +61,18 @@ public class PrimitiveParDoSingleFactory<InputT, OutputT>
public static class ParDoSingle<InputT, OutputT>
extends ForwardingPTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
private final ParDo.SingleOutput<InputT, OutputT> original;
+ private final Coder<OutputT> outputCoder;
- private ParDoSingle(ParDo.SingleOutput<InputT, OutputT> original) {
+ private ParDoSingle(SingleOutput<InputT, OutputT> original, Coder<OutputT> outputCoder) {
this.original = original;
+ this.outputCoder = outputCoder;
}
@Override
public PCollection<OutputT> expand(PCollection<? extends InputT> input) {
return PCollection.createPrimitiveOutputInternal(
- input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
+ input.getPipeline(), input.getWindowingStrategy(), input.isBounded(),
+ outputCoder);
}
public DoFn<InputT, OutputT> getFn() {
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 9a0bdf8..7a99f75 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -650,10 +650,13 @@ public class DataflowPipelineTranslatorTest implements Serializable {
// Fails here when attempting to construct a tuple with an unbound object.
return PCollectionTuple.of(sumTag, sum)
- .and(doneTag, PCollection.<Void>createPrimitiveOutputInternal(
- input.getPipeline(),
- WindowingStrategy.globalDefault(),
- input.isBounded()));
+ .and(
+ doneTag,
+ PCollection.createPrimitiveOutputInternal(
+ input.getPipeline(),
+ WindowingStrategy.globalDefault(),
+ input.isBounded(),
+ VoidCoder.of()));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 9db73c6..55264a1 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -72,7 +72,6 @@ import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOption
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.Pipeline.PipelineVisitor;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
-import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.extensions.gcp.auth.NoopCredentialFactory;
import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
@@ -953,15 +952,11 @@ public class DataflowRunnerTest implements Serializable {
@Override
public PCollection<Integer> expand(PCollection<Integer> input) {
- return PCollection.<Integer>createPrimitiveOutputInternal(
+ return PCollection.createPrimitiveOutputInternal(
input.getPipeline(),
WindowingStrategy.globalDefault(),
- input.isBounded());
- }
-
- @Override
- protected Coder<?> getDefaultOutputCoder(PCollection<Integer> input) {
- return input.getCoder();
+ input.isBounded(),
+ input.getCoder());
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java
index 737b408..c198ebf 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java
@@ -26,6 +26,7 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.extensions.gcp.storage.NoopPathValidator;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
@@ -36,7 +37,6 @@ import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.joda.time.Duration;
import org.junit.Before;
@@ -105,11 +105,11 @@ public class DataflowGroupByKeyTest {
new PTransform<PBegin, PCollection<KV<String, Integer>>>() {
@Override
public PCollection<KV<String, Integer>> expand(PBegin input) {
- return PCollection.<KV<String, Integer>>createPrimitiveOutputInternal(
- input.getPipeline(),
- WindowingStrategy.globalDefault(),
- PCollection.IsBounded.UNBOUNDED)
- .setTypeDescriptor(new TypeDescriptor<KV<String, Integer>>() {});
+ return PCollection.createPrimitiveOutputInternal(
+ input.getPipeline(),
+ WindowingStrategy.globalDefault(),
+ PCollection.IsBounded.UNBOUNDED,
+ KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
}
});
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java
index dea96b9..e2e42a6 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java
@@ -21,6 +21,9 @@ import com.google.api.services.dataflow.Dataflow;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.extensions.gcp.storage.NoopPathValidator;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
@@ -33,7 +36,6 @@ import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
@@ -94,11 +96,11 @@ public class DataflowViewTest {
new PTransform<PBegin, PCollection<KV<String, Integer>>>() {
@Override
public PCollection<KV<String, Integer>> expand(PBegin input) {
- return PCollection.<KV<String, Integer>>createPrimitiveOutputInternal(
- input.getPipeline(),
- WindowingStrategy.globalDefault(),
- PCollection.IsBounded.UNBOUNDED)
- .setTypeDescriptor(new TypeDescriptor<KV<String, Integer>>() {});
+ return PCollection.createPrimitiveOutputInternal(
+ input.getPipeline(),
+ WindowingStrategy.globalDefault(),
+ PCollection.IsBounded.UNBOUNDED,
+ KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
}
})
.apply(view);
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
index fdcea99..d485d25 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
@@ -27,7 +27,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import org.apache.beam.runners.spark.util.GlobalWatermarkHolder.SparkWatermarks;
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -203,11 +202,9 @@ public final class CreateStream<T> extends PTransform<PBegin, PCollection<T>> {
@Override
public PCollection<T> expand(PBegin input) {
return PCollection.createPrimitiveOutputInternal(
- input.getPipeline(), WindowingStrategy.globalDefault(), PCollection.IsBounded.UNBOUNDED);
- }
-
- @Override
- protected Coder<T> getDefaultOutputCoder() throws CannotProvideCoderException {
- return coder;
+ input.getPipeline(),
+ WindowingStrategy.globalDefault(),
+ PCollection.IsBounded.UNBOUNDED,
+ coder);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/StorageLevelPTransform.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/StorageLevelPTransform.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/StorageLevelPTransform.java
index 0ecfa75..b236ce7 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/StorageLevelPTransform.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/StorageLevelPTransform.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.runners.spark.translation;
-import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
@@ -32,12 +31,7 @@ public final class StorageLevelPTransform extends PTransform<PCollection<?>, PCo
public PCollection<String> expand(PCollection<?> input) {
return PCollection.createPrimitiveOutputInternal(input.getPipeline(),
WindowingStrategy.globalDefault(),
- PCollection.IsBounded.BOUNDED);
+ PCollection.IsBounded.BOUNDED,
+ StringUtf8Coder.of());
}
-
- @Override
- public Coder getDefaultOutputCoder() {
- return StringUtf8Coder.of();
- }
-
}
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SinglePrimitiveOutputPTransform.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SinglePrimitiveOutputPTransform.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SinglePrimitiveOutputPTransform.java
deleted file mode 100644
index 299f5ba..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SinglePrimitiveOutputPTransform.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.spark.util;
-
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollection.IsBounded;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.WindowingStrategy;
-
-/**
- * A {@link PTransform} wrapping another transform.
- */
-public class SinglePrimitiveOutputPTransform<T> extends PTransform<PInput, PCollection<T>> {
- private PTransform<PInput, PCollection<T>> transform;
-
- public SinglePrimitiveOutputPTransform(PTransform<PInput, PCollection<T>> transform) {
- this.transform = transform;
- }
-
- @Override
- public PCollection<T> expand(PInput input) {
- try {
- PCollection<T> collection = PCollection.<T>createPrimitiveOutputInternal(
- input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
- collection.setCoder(transform.getDefaultOutputCoder(input, collection));
- return collection;
- } catch (CannotProvideCoderException e) {
- throw new IllegalArgumentException(
- "Unable to infer a coder and no Coder was specified. "
- + "Please set a coder by invoking Create.withCoder() explicitly.",
- e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
index 6e6750d..574ba0c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
@@ -95,17 +95,14 @@ public class Read {
}
@Override
- protected Coder<T> getDefaultOutputCoder() {
- return source.getOutputCoder();
- }
-
- @Override
public final PCollection<T> expand(PBegin input) {
source.validate();
- return PCollection.<T>createPrimitiveOutputInternal(input.getPipeline(),
- WindowingStrategy.globalDefault(), IsBounded.BOUNDED)
- .setCoder(getDefaultOutputCoder());
+ return PCollection.createPrimitiveOutputInternal(
+ input.getPipeline(),
+ WindowingStrategy.globalDefault(),
+ IsBounded.BOUNDED,
+ source.getOutputCoder());
}
/**
@@ -170,9 +167,11 @@ public class Read {
@Override
public final PCollection<T> expand(PBegin input) {
source.validate();
-
- return PCollection.<T>createPrimitiveOutputInternal(
- input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED);
+ return PCollection.createPrimitiveOutputInternal(
+ input.getPipeline(),
+ WindowingStrategy.globalDefault(),
+ IsBounded.UNBOUNDED,
+ source.getOutputCoder());
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
index d13fcf1..45f4413 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
@@ -253,9 +253,8 @@ public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> {
@Override
public PCollection<T> expand(PBegin input) {
- return PCollection.<T>createPrimitiveOutputInternal(
- input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED)
- .setCoder(coder);
+ return PCollection.createPrimitiveOutputInternal(
+ input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED, coder);
}
public Coder<T> getValueCoder() {
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
index 25d9c05..8247a58 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.sdk.transforms;
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableLikeCoder;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
@@ -129,25 +128,12 @@ public class Flatten {
windowingStrategy = WindowingStrategy.globalDefault();
}
- return PCollection.<T>createPrimitiveOutputInternal(
+ return PCollection.createPrimitiveOutputInternal(
inputs.getPipeline(),
windowingStrategy,
- isBounded);
- }
-
- @Override
- protected Coder<?> getDefaultOutputCoder(PCollectionList<T> input)
- throws CannotProvideCoderException {
-
- // Take coder from first collection
- for (PCollection<T> pCollection : input.getAll()) {
- return pCollection.getCoder();
- }
-
- // No inputs
- throw new CannotProvideCoderException(
- this.getClass().getSimpleName() + " cannot provide a Coder for"
- + " empty " + PCollectionList.class.getSimpleName());
+ isBounded,
+ // Take coder from first collection. If there are none, will be left unspecified.
+ inputs.getAll().isEmpty() ? null : inputs.get(0).getCoder());
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
index 7516b25..3cb0d23 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
@@ -217,13 +217,11 @@ public class GroupByKey<K, V>
// merging windows as needed, using the windows assigned to the
// key/value input elements and the window merge operation of the
// window function associated with the input PCollection.
- return PCollection.createPrimitiveOutputInternal(input.getPipeline(),
- updateWindowingStrategy(input.getWindowingStrategy()), input.isBounded());
- }
-
- @Override
- protected Coder<KV<K, Iterable<V>>> getDefaultOutputCoder(PCollection<KV<K, V>> input) {
- return getOutputKvCoder(input.getCoder());
+ return PCollection.createPrimitiveOutputInternal(
+ input.getPipeline(),
+ updateWindowingStrategy(input.getWindowingStrategy()),
+ input.isBounded(),
+ getOutputKvCoder(input.getCoder()));
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index 0d03835..bc4f629 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -767,6 +767,8 @@ public class ParDo {
PCollectionTuple outputs = PCollectionTuple.ofPrimitiveOutputsInternal(
input.getPipeline(),
TupleTagList.of(mainOutputTag).and(additionalOutputTags.getAll()),
+ // TODO
+ Collections.<TupleTag<?>, Coder<?>>emptyMap(),
input.getWindowingStrategy(),
input.isBounded());
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
index 57dccbc..f6f3af5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
@@ -509,9 +509,8 @@ public class View {
@Override
public PCollection<ElemT> expand(PCollection<ElemT> input) {
- return PCollection.<ElemT>createPrimitiveOutputInternal(
- input.getPipeline(), input.getWindowingStrategy(), input.isBounded())
- .setCoder(input.getCoder());
+ return PCollection.createPrimitiveOutputInternal(
+ input.getPipeline(), input.getWindowingStrategy(), input.isBounded(), input.getCoder());
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
index a12be6d..af583e5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
@@ -484,7 +484,7 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T
@Override
public PCollection<T> expand(PCollection<T> input) {
return PCollection.createPrimitiveOutputInternal(
- input.getPipeline(), updatedStrategy, input.isBounded());
+ input.getPipeline(), updatedStrategy, input.isBounded(), input.getCoder());
}
@Override