You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/03/15 17:33:13 UTC

[2/2] beam git commit: Add Create.TimestampedValues.withType

Add Create.TimestampedValues.withType

This brings Create.TimestampedValues to parity with Create.Values, which already supports withType.

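Update CreateTest so that Create's default coder inference would fail if it
ran (the test inputs now mix Record and Record2 elements), ensuring the tests
exercise the explicitly supplied coder or type descriptor.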


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/43825093
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/43825093
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/43825093

Branch: refs/heads/master
Commit: 43825093e233c25e5920eb2bacca98673e25de75
Parents: 7c71036
Author: Aviem Zur <av...@gmail.com>
Authored: Mon Mar 13 21:45:21 2017 +0200
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Mar 15 10:32:58 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/transforms/Create.java  | 71 +++++++++++++++-----
 .../apache/beam/sdk/transforms/CreateTest.java  | 32 ++++++++-
 2 files changed, 82 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/43825093/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
index 4f746d0..ffc2d8d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
@@ -207,7 +207,10 @@ public class Create<T> {
    * Otherwise, use {@link Create.TimestampedValues#withCoder} to set the coder explicitly.
    */
   public static <T> TimestampedValues<T> timestamped(Iterable<TimestampedValue<T>> elems) {
-    return new TimestampedValues<>(elems, Optional.<Coder<T>>absent());
+    return new TimestampedValues<>(
+        elems,
+        Optional.<Coder<T>>absent(),
+        Optional.<TypeDescriptor<T>>absent());
   }
 
   /**
@@ -495,27 +498,32 @@ public class Create<T> {
      * is used.
      */
     public TimestampedValues<T> withCoder(Coder<T> coder) {
-      return new TimestampedValues<>(timestampedElements, Optional.<Coder<T>>of(coder));
+      return new TimestampedValues<>(timestampedElements, Optional.of(coder), typeDescriptor);
+    }
+
+    /**
+     * Returns a {@link Create.TimestampedValues} PTransform like this one that uses the given
+     * {@code TypeDescriptor<T>} to determine the {@code Coder} to use to decode each of the
+     * objects into a value of type {@code T}. Note that a default coder must be registered for the
+     * class described in the {@code TypeDescriptor<T>}.
+     *
+     * <p>By default, {@code Create.TimestampedValues} can automatically determine the {@code Coder}
+     * to use if all elements have the same non-parameterized run-time class, and a default coder is
+     * registered for that class. See {@link CoderRegistry} for details on how defaults are
+     * determined.
+     *
+     * <p>Note that for {@link Create.TimestampedValues} with no elements, the {@link VoidCoder} is
+     * used.
+     */
+    public TimestampedValues<T> withType(TypeDescriptor<T> type) {
+      return new TimestampedValues<>(timestampedElements, elementCoder, Optional.of(type));
     }
 
     @Override
     public PCollection<T> expand(PBegin input) {
       try {
-        Iterable<T> rawElements =
-            Iterables.transform(
-                timestampedElements,
-                new Function<TimestampedValue<T>, T>() {
-                  @Override
-                  public T apply(TimestampedValue<T> input) {
-                    return input.getValue();
-                  }
-                });
-        Coder<T> coder;
-        if (elementCoder.isPresent()) {
-          coder = elementCoder.get();
-        } else {
-          coder = getDefaultCreateCoder(input.getPipeline().getCoderRegistry(), rawElements);
-        }
+        Coder<T> coder = getDefaultOutputCoder(input);
+
         PCollection<TimestampedValue<T>> intermediate = Pipeline.applyTransform(input,
             Create.of(timestampedElements).withCoder(TimestampedValueCoder.of(coder)));
 
@@ -533,12 +541,19 @@ public class Create<T> {
     /** The timestamped elements of the resulting PCollection. */
     private final transient Iterable<TimestampedValue<T>> timestampedElements;
 
+    /** The coder used to encode the values to and from a binary representation. */
     private final transient Optional<Coder<T>> elementCoder;
 
+    /** The value type. */
+    private final transient Optional<TypeDescriptor<T>> typeDescriptor;
+
     private TimestampedValues(
-        Iterable<TimestampedValue<T>> timestampedElements, Optional<Coder<T>> elementCoder) {
+        Iterable<TimestampedValue<T>> timestampedElements,
+        Optional<Coder<T>> elementCoder,
+        Optional<TypeDescriptor<T>> typeDescriptor) {
       this.timestampedElements = timestampedElements;
       this.elementCoder = elementCoder;
+      this.typeDescriptor = typeDescriptor;
     }
 
     private static class ConvertTimestamps<T> extends DoFn<TimestampedValue<T>, T> {
@@ -547,6 +562,26 @@ public class Create<T> {
         c.outputWithTimestamp(c.element().getValue(), c.element().getTimestamp());
       }
     }
+
+    @Override
+    public Coder<T> getDefaultOutputCoder(PBegin input) throws CannotProvideCoderException {
+      if (elementCoder.isPresent()) {
+        return elementCoder.get();
+      } else if (typeDescriptor.isPresent()) {
+        return input.getPipeline().getCoderRegistry().getDefaultCoder(typeDescriptor.get());
+      } else {
+        Iterable<T> rawElements =
+            Iterables.transform(
+                timestampedElements,
+                new Function<TimestampedValue<T>, T>() {
+                  @Override
+                  public T apply(TimestampedValue<T> input) {
+                    return input.getValue();
+                  }
+                });
+        return getDefaultCreateCoder(input.getPipeline().getCoderRegistry(), rawElements);
+      }
+    }
   }
 
   private static <T> Coder<T> getDefaultCreateCoder(CoderRegistry registry, Iterable<T> elems)

http://git-wip-us.apache.org/repos/asf/beam/blob/43825093/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
index af917cf..d21e502 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
@@ -303,6 +303,33 @@ public class CreateTest {
   }
 
   @Test
+  public void testCreateTimestampedDefaultOutputCoderUsingCoder() throws Exception {
+    Coder<Record> coder = new RecordCoder();
+    PBegin pBegin = PBegin.in(p);
+    Create.TimestampedValues<Record> values =
+        Create.timestamped(
+            TimestampedValue.of(new Record(), new Instant(0)),
+            TimestampedValue.<Record>of(new Record2(), new Instant(0)))
+            .withCoder(coder);
+    Coder<Record> defaultCoder = values.getDefaultOutputCoder(pBegin);
+    assertThat(defaultCoder, equalTo(coder));
+  }
+
+  @Test
+  public void testCreateTimestampedDefaultOutputCoderUsingTypeDescriptor() throws Exception {
+    Coder<Record> coder = new RecordCoder();
+    p.getCoderRegistry().registerCoder(Record.class, coder);
+    PBegin pBegin = PBegin.in(p);
+    Create.TimestampedValues<Record> values =
+        Create.timestamped(
+            TimestampedValue.of(new Record(), new Instant(0)),
+            TimestampedValue.<Record>of(new Record2(), new Instant(0)))
+            .withType(new TypeDescriptor<Record>() {});
+    Coder<Record> defaultCoder = values.getDefaultOutputCoder(pBegin);
+    assertThat(defaultCoder, equalTo(coder));
+  }
+
+  @Test
   @Category(RunnableOnService.class)
   public void testCreateWithVoidType() throws Exception {
     PCollection<Void> output = p.apply(Create.of((Void) null, (Void) null));
@@ -346,7 +373,7 @@ public class CreateTest {
     Coder<Record> coder = new RecordCoder();
     PBegin pBegin = PBegin.in(p);
     Create.Values<Record> values =
-        Create.of(new Record(), new Record(), new Record()).withCoder(coder);
+        Create.of(new Record(), new Record2()).withCoder(coder);
     Coder<Record> defaultCoder = values.getDefaultOutputCoder(pBegin);
     assertThat(defaultCoder, equalTo(coder));
   }
@@ -357,8 +384,7 @@ public class CreateTest {
     p.getCoderRegistry().registerCoder(Record.class, coder);
     PBegin pBegin = PBegin.in(p);
     Create.Values<Record> values =
-        Create.of(new Record(), new Record(), new Record())
-            .withType(new TypeDescriptor<Record>() {});
+        Create.of(new Record(), new Record2()).withType(new TypeDescriptor<Record>() {});
     Coder<Record> defaultCoder = values.getDefaultOutputCoder(pBegin);
     assertThat(defaultCoder, equalTo(coder));
   }