You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/08/04 01:02:27 UTC
[2/6] beam git commit: Requires specifying a Coder on PCollection.createPrimitiveOutputInternal
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
index 4063d11..e8bf9b8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
@@ -366,10 +366,15 @@ public class PCollection<T> extends PValueBase implements PValue {
public static <T> PCollection<T> createPrimitiveOutputInternal(
Pipeline pipeline,
WindowingStrategy<?, ?> windowingStrategy,
- IsBounded isBounded) {
- return new PCollection<T>(pipeline)
+ IsBounded isBounded,
+ @Nullable Coder<T> coder) {
+ PCollection<T> res = new PCollection<T>(pipeline)
.setWindowingStrategyInternal(windowingStrategy)
.setIsBoundedInternal(isBounded);
+ if (coder != null) {
+ res.setCoder(coder);
+ }
+ return res;
}
private static class CoderOrFailure<T> {
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java
index 793994f..9799d0e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java
@@ -24,6 +24,7 @@ import java.util.Map;
import java.util.Objects;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection.IsBounded;
@@ -201,6 +202,7 @@ public class PCollectionTuple implements PInput, POutput {
public static PCollectionTuple ofPrimitiveOutputsInternal(
Pipeline pipeline,
TupleTagList outputTags,
+ Map<TupleTag<?>, Coder<?>> coders,
WindowingStrategy<?, ?> windowingStrategy,
IsBounded isBounded) {
Map<TupleTag<?>, PCollection<?>> pcollectionMap = new LinkedHashMap<>();
@@ -217,10 +219,10 @@ public class PCollectionTuple implements PInput, POutput {
// erasure as the correct type. When a transform adds
// elements to `outputCollection` they will be of type T.
@SuppressWarnings("unchecked")
- TypeDescriptor<Object> token = (TypeDescriptor<Object>) outputTag.getTypeDescriptor();
- PCollection<Object> outputCollection = PCollection
- .createPrimitiveOutputInternal(pipeline, windowingStrategy, isBounded)
- .setTypeDescriptor(token);
+ PCollection outputCollection =
+ PCollection.createPrimitiveOutputInternal(
+ pipeline, windowingStrategy, isBounded, coders.get(outputTag))
+ .setTypeDescriptor((TypeDescriptor) outputTag.getTypeDescriptor());
pcollectionMap.put(outputTag, outputCollection);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
index 93650dd..12fe633 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
@@ -35,6 +35,7 @@ import org.apache.beam.sdk.Pipeline.PipelineVisitor;
import org.apache.beam.sdk.Pipeline.PipelineVisitor.Defaults;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.io.CountingSource;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.Read;
@@ -110,7 +111,7 @@ public class TransformHierarchyTest implements Serializable {
public void emptyCompositeSucceeds() {
PCollection<Long> created =
PCollection.createPrimitiveOutputInternal(
- pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+ pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarLongCoder.of());
TransformHierarchy.Node node = hierarchy.pushNode("Create", PBegin.in(pipeline), Create.of(1));
hierarchy.setOutput(created);
hierarchy.popNode();
@@ -139,7 +140,7 @@ public class TransformHierarchyTest implements Serializable {
public void producingOwnAndOthersOutputsFails() {
PCollection<Long> created =
PCollection.createPrimitiveOutputInternal(
- pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+ pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarLongCoder.of());
hierarchy.pushNode("Create", PBegin.in(pipeline), Create.of(1));
hierarchy.setOutput(created);
hierarchy.popNode();
@@ -147,8 +148,11 @@ public class TransformHierarchyTest implements Serializable {
final PCollectionList<Long> appended =
pcList.and(
- PCollection.<Long>createPrimitiveOutputInternal(
- pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)
+ PCollection.createPrimitiveOutputInternal(
+ pipeline,
+ WindowingStrategy.globalDefault(),
+ IsBounded.BOUNDED,
+ VarLongCoder.of())
.setName("prim"));
hierarchy.pushNode(
"AddPc",
@@ -171,7 +175,7 @@ public class TransformHierarchyTest implements Serializable {
public void producingOwnOutputWithCompositeFails() {
final PCollection<Long> comp =
PCollection.createPrimitiveOutputInternal(
- pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+ pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarLongCoder.of());
PTransform<PBegin, PCollection<Long>> root =
new PTransform<PBegin, PCollection<Long>>() {
@Override
@@ -327,7 +331,7 @@ public class TransformHierarchyTest implements Serializable {
PCollection<Long> created =
PCollection.createPrimitiveOutputInternal(
- pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+ pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarLongCoder.of());
SingleOutput<Long, Long> pardo =
ParDo.of(
@@ -340,7 +344,7 @@ public class TransformHierarchyTest implements Serializable {
PCollection<Long> mapped =
PCollection.createPrimitiveOutputInternal(
- pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+ pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarLongCoder.of());
TransformHierarchy.Node compositeNode = hierarchy.pushNode("Create", begin, create);
hierarchy.finishSpecifyingInput();
@@ -499,13 +503,11 @@ public class TransformHierarchyTest implements Serializable {
@Test
public void visitIsTopologicallyOrdered() {
PCollection<String> one =
- PCollection.<String>createPrimitiveOutputInternal(
- pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)
- .setCoder(StringUtf8Coder.of());
+ PCollection.createPrimitiveOutputInternal(
+ pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, StringUtf8Coder.of());
final PCollection<Integer> two =
- PCollection.<Integer>createPrimitiveOutputInternal(
- pipeline, WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED)
- .setCoder(VarIntCoder.of());
+ PCollection.createPrimitiveOutputInternal(
+ pipeline, WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED, VarIntCoder.of());
final PDone done = PDone.in(pipeline);
final TupleTag<String> oneTag = new TupleTag<String>() {};
final TupleTag<Integer> twoTag = new TupleTag<Integer>() {};
@@ -617,13 +619,14 @@ public class TransformHierarchyTest implements Serializable {
@Test
public void visitDoesNotVisitSkippedNodes() {
PCollection<String> one =
- PCollection.<String>createPrimitiveOutputInternal(
- pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)
- .setCoder(StringUtf8Coder.of());
+ PCollection.createPrimitiveOutputInternal(
+ pipeline,
+ WindowingStrategy.globalDefault(),
+ IsBounded.BOUNDED,
+ StringUtf8Coder.of());
final PCollection<Integer> two =
- PCollection.<Integer>createPrimitiveOutputInternal(
- pipeline, WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED)
- .setCoder(VarIntCoder.of());
+ PCollection.createPrimitiveOutputInternal(
+ pipeline, WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED, VarIntCoder.of());
final PDone done = PDone.in(pipeline);
final TupleTag<String> oneTag = new TupleTag<String>() {};
final TupleTag<Integer> twoTag = new TupleTag<Integer>() {};
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
index e7b680a..bf06d78 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
@@ -28,6 +28,7 @@ import java.util.Arrays;
import java.util.EnumSet;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.TextIO;
@@ -85,10 +86,13 @@ public class TransformTreeTest {
// Issue below: PCollection.createPrimitiveOutput should not be used
// from within a composite transform.
return PCollectionList.of(
- Arrays.asList(result, PCollection.<String>createPrimitiveOutputInternal(
- b.getPipeline(),
- WindowingStrategy.globalDefault(),
- result.isBounded())));
+ Arrays.asList(
+ result,
+ PCollection.createPrimitiveOutputInternal(
+ b.getPipeline(),
+ WindowingStrategy.globalDefault(),
+ result.isBounded(),
+ StringUtf8Coder.of())));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
index a8cb843..5dbe176 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
@@ -228,7 +228,7 @@ public class FlattenTest implements Serializable {
public void testFlattenNoListsNoCoder() {
// not ValidatesRunner because it should fail at pipeline construction time anyhow.
thrown.expect(IllegalStateException.class);
- thrown.expectMessage("cannot provide a Coder for empty");
+ thrown.expectMessage("Unable to return a default Coder");
PCollectionList.<ClassWithoutCoder>empty(p)
.apply(Flatten.<ClassWithoutCoder>pCollections());
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
index 8fcb4c0..a76714c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
@@ -70,7 +70,6 @@ import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
-import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.hamcrest.Matcher;
import org.joda.time.Duration;
@@ -423,11 +422,11 @@ public class GroupByKeyTest implements Serializable {
new PTransform<PBegin, PCollection<KV<String, Integer>>>() {
@Override
public PCollection<KV<String, Integer>> expand(PBegin input) {
- return PCollection.<KV<String, Integer>>createPrimitiveOutputInternal(
- input.getPipeline(),
- WindowingStrategy.globalDefault(),
- PCollection.IsBounded.UNBOUNDED)
- .setTypeDescriptor(new TypeDescriptor<KV<String, Integer>>() {});
+ return PCollection.createPrimitiveOutputInternal(
+ input.getPipeline(),
+ WindowingStrategy.globalDefault(),
+ PCollection.IsBounded.UNBOUNDED,
+ KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
}
});
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
index cdd03d9..bfb8b5a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
@@ -60,7 +60,6 @@ import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TimestampedValue;
-import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
@@ -1340,11 +1339,11 @@ public class ViewTest implements Serializable {
new PTransform<PBegin, PCollection<KV<String, Integer>>>() {
@Override
public PCollection<KV<String, Integer>> expand(PBegin input) {
- return PCollection.<KV<String, Integer>>createPrimitiveOutputInternal(
- input.getPipeline(),
- WindowingStrategy.globalDefault(),
- PCollection.IsBounded.UNBOUNDED)
- .setTypeDescriptor(new TypeDescriptor<KV<String, Integer>>() {});
+ return PCollection.createPrimitiveOutputInternal(
+ input.getPipeline(),
+ WindowingStrategy.globalDefault(),
+ PCollection.IsBounded.UNBOUNDED,
+ KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
}
})
.apply(view);
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
index 58e2bbd..33503b6 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
@@ -31,6 +31,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
@@ -59,9 +60,9 @@ public final class PCollectionTupleTest implements Serializable {
@Test
public void testOfThenHas() {
- PCollection<Object> pCollection = PCollection.createPrimitiveOutputInternal(
- pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
- TupleTag<Object> tag = new TupleTag<>();
+ PCollection<Integer> pCollection = PCollection.createPrimitiveOutputInternal(
+ pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of());
+ TupleTag<Integer> tag = new TupleTag<>();
assertTrue(PCollectionTuple.of(tag, pCollection).has(tag));
}