You are viewing a plain text version of this content. The canonical link for it was a hyperlink in the original page and is not preserved in this plain text copy.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/08/04 01:02:27 UTC

[2/6] beam git commit: Requires specifying a Coder on PCollection.createPrimitiveOutputInternal

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
index 4063d11..e8bf9b8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
@@ -366,10 +366,15 @@ public class PCollection<T> extends PValueBase implements PValue {
   public static <T> PCollection<T> createPrimitiveOutputInternal(
       Pipeline pipeline,
       WindowingStrategy<?, ?> windowingStrategy,
-      IsBounded isBounded) {
-    return new PCollection<T>(pipeline)
+      IsBounded isBounded,
+      @Nullable Coder<T> coder) {
+    PCollection<T> res = new PCollection<T>(pipeline)
         .setWindowingStrategyInternal(windowingStrategy)
         .setIsBoundedInternal(isBounded);
+    if (coder != null) {
+      res.setCoder(coder);
+    }
+    return res;
   }
 
   private static class CoderOrFailure<T> {

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java
index 793994f..9799d0e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import java.util.Objects;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
@@ -201,6 +202,7 @@ public class PCollectionTuple implements PInput, POutput {
   public static PCollectionTuple ofPrimitiveOutputsInternal(
       Pipeline pipeline,
       TupleTagList outputTags,
+      Map<TupleTag<?>, Coder<?>> coders,
       WindowingStrategy<?, ?> windowingStrategy,
       IsBounded isBounded) {
     Map<TupleTag<?>, PCollection<?>> pcollectionMap = new LinkedHashMap<>();
@@ -217,10 +219,10 @@ public class PCollectionTuple implements PInput, POutput {
       // erasure as the correct type. When a transform adds
       // elements to `outputCollection` they will be of type T.
       @SuppressWarnings("unchecked")
-      TypeDescriptor<Object> token = (TypeDescriptor<Object>) outputTag.getTypeDescriptor();
-      PCollection<Object> outputCollection = PCollection
-          .createPrimitiveOutputInternal(pipeline, windowingStrategy, isBounded)
-          .setTypeDescriptor(token);
+      PCollection outputCollection =
+          PCollection.createPrimitiveOutputInternal(
+                  pipeline, windowingStrategy, isBounded, coders.get(outputTag))
+              .setTypeDescriptor((TypeDescriptor) outputTag.getTypeDescriptor());
 
       pcollectionMap.put(outputTag, outputCollection);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
index 93650dd..12fe633 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
@@ -35,6 +35,7 @@ import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor.Defaults;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.io.CountingSource;
 import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.io.Read;
@@ -110,7 +111,7 @@ public class TransformHierarchyTest implements Serializable {
   public void emptyCompositeSucceeds() {
     PCollection<Long> created =
         PCollection.createPrimitiveOutputInternal(
-            pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+            pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarLongCoder.of());
     TransformHierarchy.Node node = hierarchy.pushNode("Create", PBegin.in(pipeline), Create.of(1));
     hierarchy.setOutput(created);
     hierarchy.popNode();
@@ -139,7 +140,7 @@ public class TransformHierarchyTest implements Serializable {
   public void producingOwnAndOthersOutputsFails() {
     PCollection<Long> created =
         PCollection.createPrimitiveOutputInternal(
-            pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+            pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarLongCoder.of());
     hierarchy.pushNode("Create", PBegin.in(pipeline), Create.of(1));
     hierarchy.setOutput(created);
     hierarchy.popNode();
@@ -147,8 +148,11 @@ public class TransformHierarchyTest implements Serializable {
 
     final PCollectionList<Long> appended =
         pcList.and(
-            PCollection.<Long>createPrimitiveOutputInternal(
-                    pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)
+            PCollection.createPrimitiveOutputInternal(
+                    pipeline,
+                    WindowingStrategy.globalDefault(),
+                    IsBounded.BOUNDED,
+                    VarLongCoder.of())
                 .setName("prim"));
     hierarchy.pushNode(
         "AddPc",
@@ -171,7 +175,7 @@ public class TransformHierarchyTest implements Serializable {
   public void producingOwnOutputWithCompositeFails() {
     final PCollection<Long> comp =
         PCollection.createPrimitiveOutputInternal(
-            pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+            pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarLongCoder.of());
     PTransform<PBegin, PCollection<Long>> root =
         new PTransform<PBegin, PCollection<Long>>() {
           @Override
@@ -327,7 +331,7 @@ public class TransformHierarchyTest implements Serializable {
 
     PCollection<Long> created =
         PCollection.createPrimitiveOutputInternal(
-            pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+            pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarLongCoder.of());
 
     SingleOutput<Long, Long> pardo =
         ParDo.of(
@@ -340,7 +344,7 @@ public class TransformHierarchyTest implements Serializable {
 
     PCollection<Long> mapped =
         PCollection.createPrimitiveOutputInternal(
-            pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+            pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarLongCoder.of());
 
     TransformHierarchy.Node compositeNode = hierarchy.pushNode("Create", begin, create);
     hierarchy.finishSpecifyingInput();
@@ -499,13 +503,11 @@ public class TransformHierarchyTest implements Serializable {
   @Test
   public void visitIsTopologicallyOrdered() {
     PCollection<String> one =
-        PCollection.<String>createPrimitiveOutputInternal(
-                pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)
-            .setCoder(StringUtf8Coder.of());
+        PCollection.createPrimitiveOutputInternal(
+            pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, StringUtf8Coder.of());
     final PCollection<Integer> two =
-        PCollection.<Integer>createPrimitiveOutputInternal(
-                pipeline, WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED)
-            .setCoder(VarIntCoder.of());
+        PCollection.createPrimitiveOutputInternal(
+            pipeline, WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED, VarIntCoder.of());
     final PDone done = PDone.in(pipeline);
     final TupleTag<String> oneTag = new TupleTag<String>() {};
     final TupleTag<Integer> twoTag = new TupleTag<Integer>() {};
@@ -617,13 +619,14 @@ public class TransformHierarchyTest implements Serializable {
   @Test
   public void visitDoesNotVisitSkippedNodes() {
     PCollection<String> one =
-        PCollection.<String>createPrimitiveOutputInternal(
-                pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)
-            .setCoder(StringUtf8Coder.of());
+        PCollection.createPrimitiveOutputInternal(
+                pipeline,
+                WindowingStrategy.globalDefault(),
+                IsBounded.BOUNDED,
+                StringUtf8Coder.of());
     final PCollection<Integer> two =
-        PCollection.<Integer>createPrimitiveOutputInternal(
-                pipeline, WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED)
-            .setCoder(VarIntCoder.of());
+        PCollection.createPrimitiveOutputInternal(
+                pipeline, WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED, VarIntCoder.of());
     final PDone done = PDone.in(pipeline);
     final TupleTag<String> oneTag = new TupleTag<String>() {};
     final TupleTag<Integer> twoTag = new TupleTag<Integer>() {};

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
index e7b680a..bf06d78 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
@@ -28,6 +28,7 @@ import java.util.Arrays;
 import java.util.EnumSet;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.TextIO;
@@ -85,10 +86,13 @@ public class TransformTreeTest {
       // Issue below: PCollection.createPrimitiveOutput should not be used
       // from within a composite transform.
       return PCollectionList.of(
-          Arrays.asList(result, PCollection.<String>createPrimitiveOutputInternal(
-              b.getPipeline(),
-              WindowingStrategy.globalDefault(),
-              result.isBounded())));
+          Arrays.asList(
+              result,
+              PCollection.createPrimitiveOutputInternal(
+                  b.getPipeline(),
+                  WindowingStrategy.globalDefault(),
+                  result.isBounded(),
+                  StringUtf8Coder.of())));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
index a8cb843..5dbe176 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
@@ -228,7 +228,7 @@ public class FlattenTest implements Serializable {
   public void testFlattenNoListsNoCoder() {
     // not ValidatesRunner because it should fail at pipeline construction time anyhow.
     thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("cannot provide a Coder for empty");
+    thrown.expectMessage("Unable to return a default Coder");
 
     PCollectionList.<ClassWithoutCoder>empty(p)
         .apply(Flatten.<ClassWithoutCoder>pCollections());

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
index 8fcb4c0..a76714c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
@@ -70,7 +70,6 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TimestampedValue;
-import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.hamcrest.Matcher;
 import org.joda.time.Duration;
@@ -423,11 +422,11 @@ public class GroupByKeyTest implements Serializable {
             new PTransform<PBegin, PCollection<KV<String, Integer>>>() {
               @Override
               public PCollection<KV<String, Integer>> expand(PBegin input) {
-                return PCollection.<KV<String, Integer>>createPrimitiveOutputInternal(
-                        input.getPipeline(),
-                        WindowingStrategy.globalDefault(),
-                        PCollection.IsBounded.UNBOUNDED)
-                    .setTypeDescriptor(new TypeDescriptor<KV<String, Integer>>() {});
+                return PCollection.createPrimitiveOutputInternal(
+                    input.getPipeline(),
+                    WindowingStrategy.globalDefault(),
+                    PCollection.IsBounded.UNBOUNDED,
+                    KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
               }
             });
 

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
index cdd03d9..bfb8b5a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
@@ -60,7 +60,6 @@ import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TimestampedValue;
-import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.hamcrest.Matchers;
 import org.joda.time.Duration;
@@ -1340,11 +1339,11 @@ public class ViewTest implements Serializable {
             new PTransform<PBegin, PCollection<KV<String, Integer>>>() {
               @Override
               public PCollection<KV<String, Integer>> expand(PBegin input) {
-                return PCollection.<KV<String, Integer>>createPrimitiveOutputInternal(
-                        input.getPipeline(),
-                        WindowingStrategy.globalDefault(),
-                        PCollection.IsBounded.UNBOUNDED)
-                    .setTypeDescriptor(new TypeDescriptor<KV<String, Integer>>() {});
+                return PCollection.createPrimitiveOutputInternal(
+                    input.getPipeline(),
+                    WindowingStrategy.globalDefault(),
+                    PCollection.IsBounded.UNBOUNDED,
+                    KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
               }
             })
         .apply(view);

http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
index 58e2bbd..33503b6 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
@@ -31,6 +31,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -59,9 +60,9 @@ public final class PCollectionTupleTest implements Serializable {
   @Test
   public void testOfThenHas() {
 
-    PCollection<Object> pCollection = PCollection.createPrimitiveOutputInternal(
-        pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
-    TupleTag<Object> tag = new TupleTag<>();
+    PCollection<Integer> pCollection = PCollection.createPrimitiveOutputInternal(
+        pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of());
+    TupleTag<Integer> tag = new TupleTag<>();
 
     assertTrue(PCollectionTuple.of(tag, pCollection).has(tag));
   }