You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/04/13 16:08:01 UTC

[1/2] beam git commit: Test all Known Coders to ensure they Serialize via URN

Repository: beam
Updated Branches:
  refs/heads/master 21a2b96a1 -> 4c4fdf244


Test all Known Coders to ensure they Serialize via URN

Fix Coders so that VarIntCoders are no longer encoded as VarLongCoders,
and so that VarLongCoders are encoded as known coders.

Ensure that all known coders are tested.

Use getComponents rather than getCoderArguments for known coders.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/71204403
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/71204403
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/71204403

Branch: refs/heads/master
Commit: 71204403aa8b880549dc8a533efe46e67870dbd6
Parents: 21a2b96
Author: Thomas Groh <tg...@google.com>
Authored: Wed Apr 12 16:31:53 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu Apr 13 09:07:07 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/core/construction/Coders.java  |  26 ++--
 .../runners/core/construction/CodersTest.java   | 144 +++++++++++++------
 2 files changed, 122 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/71204403/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java
index 7b96240..043a010 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java
@@ -18,7 +18,10 @@
 
 package org.apache.beam.runners.core.construction;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.BiMap;
 import com.google.common.collect.ImmutableBiMap;
 import com.google.protobuf.Any;
@@ -32,7 +35,7 @@ import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.StandardCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
@@ -54,11 +57,12 @@ public class Coders {
   public static final String CUSTOM_CODER_URN = "urn:beam:coders:javasdk:0.1";
 
   // The URNs for coders which are shared across languages
-  private static final BiMap<Class<? extends Coder>, String> KNOWN_CODER_URNS =
-      ImmutableBiMap.<Class<? extends Coder>, String>builder()
+  @VisibleForTesting
+  static final BiMap<Class<? extends StandardCoder>, String> KNOWN_CODER_URNS =
+      ImmutableBiMap.<Class<? extends StandardCoder>, String>builder()
           .put(ByteArrayCoder.class, "urn:beam:coders:bytes:0.1")
           .put(KvCoder.class, "urn:beam:coders:kv:0.1")
-          .put(VarIntCoder.class, "urn:beam:coders:varint:0.1")
+          .put(VarLongCoder.class, "urn:beam:coders:varint:0.1")
           .put(IntervalWindowCoder.class, "urn:beam:coders:interval_window:0.1")
           .put(IterableCoder.class, "urn:beam:coders:stream:0.1")
           .put(GlobalWindow.Coder.class, "urn:beam:coders:global_window:0.1")
@@ -75,11 +79,17 @@ public class Coders {
 
   private static RunnerApi.Coder toKnownCoder(Coder<?> coder, SdkComponents components)
       throws IOException {
+    checkArgument(
+        coder instanceof StandardCoder,
+        "A Known %s must implement %s, but %s of class %s does not",
+        Coder.class.getSimpleName(),
+        StandardCoder.class.getSimpleName(),
+        coder,
+        coder.getClass().getName());
+    StandardCoder<?> stdCoder = (StandardCoder<?>) coder;
     List<String> componentIds = new ArrayList<>();
-    if (coder.getCoderArguments() != null) {
-      for (Coder<?> componentCoder : coder.getCoderArguments()) {
-        componentIds.add(components.registerCoder(componentCoder));
-      }
+    for (Coder<?> componentCoder : stdCoder.getComponents()) {
+      componentIds.add(components.registerCoder(componentCoder));
     }
     return RunnerApi.Coder.newBuilder()
         .addAllComponentCoderIds(componentIds)

http://git-wip-us.apache.org/repos/asf/beam/blob/71204403/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
index 1a657b2..b2b9955 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
@@ -18,82 +18,146 @@
 
 package org.apache.beam.runners.core.construction;
 
+import static com.google.common.base.Preconditions.checkState;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertThat;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Set;
 import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.StandardCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder;
+import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
 import org.hamcrest.Matchers;
 import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
 import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameter;
 import org.junit.runners.Parameterized.Parameters;
 
-/**
- * Tests for {@link Coders}.
- */
-@RunWith(Parameterized.class)
+/** Tests for {@link Coders}. */
+@RunWith(Enclosed.class)
 public class CodersTest {
-  @Parameters(name = "{index}: {0}")
-  public static Iterable<Coder<?>> data() {
-    return ImmutableList.<Coder<?>>of(
-        StringUtf8Coder.of(),
-        IterableCoder.of(VarLongCoder.of()),
-        KvCoder.of(StringUtf8Coder.of(), ListCoder.of(VarLongCoder.of())),
-        SerializableCoder.of(Record.class),
-        new RecordCoder(),
-        KvCoder.of(new RecordCoder(), AvroCoder.of(Record.class)));
+  private static final Set<StandardCoder<?>> KNOWN_CODERS =
+      ImmutableSet.<StandardCoder<?>>builder()
+          .add(ByteArrayCoder.of())
+          .add(KvCoder.of(VarLongCoder.of(), VarLongCoder.of()))
+          .add(VarLongCoder.of())
+          .add(IntervalWindowCoder.of())
+          .add(IterableCoder.of(ByteArrayCoder.of()))
+          .add(GlobalWindow.Coder.INSTANCE)
+          .add(
+              FullWindowedValueCoder.of(
+                  IterableCoder.of(VarLongCoder.of()), IntervalWindowCoder.of()))
+          .build();
+
+  /**
+   * Tests that all known coders are present in the parameters that will be used by
+   * {@link ToFromProtoTest}.
+   */
+  @RunWith(JUnit4.class)
+  public static class ValidateKnownCodersPresentTest {
+    @Test
+    public void validateKnownCoders() {
+      // Validates that every known coder in the Coders class is represented in the "Known
+      // Coder" tests, which demonstrate that they are serialized via components and specified
+      // URNs rather than being Java-serialized.
+      Set<Class<? extends StandardCoder>> knownCoderClasses = Coders.KNOWN_CODER_URNS.keySet();
+      Set<Class<? extends StandardCoder>> knownCoderTests = new HashSet<>();
+      for (StandardCoder<?> coder : KNOWN_CODERS) {
+        knownCoderTests.add(coder.getClass());
+      }
+      Set<Class<? extends StandardCoder>> missingKnownCoders = new HashSet<>(knownCoderClasses);
+      missingKnownCoders.removeAll(knownCoderTests);
+      checkState(
+          missingKnownCoders.isEmpty(),
+          "Missing validation of known coder %s in %s",
+          missingKnownCoders,
+          CodersTest.class.getSimpleName());
+    }
   }
 
-  @Parameter(0)
-  public Coder<?> coder;
 
-  @Test
-  public void toAndFromProto() throws Exception {
-    SdkComponents componentsBuilder = SdkComponents.create();
-    RunnerApi.Coder coderProto = Coders.toProto(coder, componentsBuilder);
+  /**
+   * Tests round-trip coder encodings for both known and unknown {@link Coder coders}.
+   */
+  @RunWith(Parameterized.class)
+  public static class ToFromProtoTest {
+    @Parameters(name = "{index}: {0}")
+    public static Iterable<Coder<?>> data() {
+      return ImmutableList.<Coder<?>>builder()
+          .addAll(KNOWN_CODERS)
+          .add(
+              StringUtf8Coder.of(),
+              SerializableCoder.of(Record.class),
+              new RecordCoder(),
+              KvCoder.of(new RecordCoder(), AvroCoder.of(Record.class)))
+          .build();
+    }
 
-    Components encodedComponents = componentsBuilder.toComponents();
-    Coder<?> decodedCoder = Coders.fromProto(coderProto, encodedComponents);
-    assertThat(decodedCoder, Matchers.<Coder<?>>equalTo(coder));
-  }
+    @Parameter(0)
+    public Coder<?> coder;
 
-  static class Record implements Serializable {
-  }
+    @Test
+    public void toAndFromProto() throws Exception {
+      SdkComponents componentsBuilder = SdkComponents.create();
+      RunnerApi.Coder coderProto = Coders.toProto(coder, componentsBuilder);
 
-  private static class RecordCoder extends CustomCoder<Record> {
-    @Override
-    public void encode(Record value, OutputStream outStream, Context context)
-        throws CoderException, IOException {}
+      Components encodedComponents = componentsBuilder.toComponents();
+      Coder<?> decodedCoder = Coders.fromProto(coderProto, encodedComponents);
+      assertThat(decodedCoder, Matchers.<Coder<?>>equalTo(coder));
 
-    @Override
-    public Record decode(InputStream inStream, Context context) throws CoderException, IOException {
-      return new Record();
+      if (KNOWN_CODERS.contains(coder)) {
+        for (RunnerApi.Coder encodedCoder : encodedComponents.getCodersMap().values()) {
+          assertThat(
+              encodedCoder.getSpec().getSpec().getUrn(), not(equalTo(Coders.CUSTOM_CODER_URN)));
+        }
+      }
     }
 
-    @Override
-    public boolean equals(Object other) {
-      return other != null && getClass().equals(other.getClass());
-    }
+    static class Record implements Serializable {}
+
+    private static class RecordCoder extends CustomCoder<Record> {
+      @Override
+      public void encode(Record value, OutputStream outStream, Context context)
+          throws CoderException, IOException {}
+
+      @Override
+      public Record decode(InputStream inStream, Context context)
+          throws CoderException, IOException {
+        return new Record();
+      }
+
+      @Override
+      public boolean equals(Object other) {
+        return other != null && getClass().equals(other.getClass());
+      }
 
-    @Override
-    public int hashCode() {
-      return getClass().hashCode();
+      @Override
+      public int hashCode() {
+        return getClass().hashCode();
+      }
     }
   }
 }


[2/2] beam git commit: Test all Known Coders to ensure they Serialize via URN

Posted by lc...@apache.org.
Test all Known Coders to ensure they Serialize via URN

This closes #2517


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4c4fdf24
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4c4fdf24
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4c4fdf24

Branch: refs/heads/master
Commit: 4c4fdf24460df91b9fd423606cf67cddfe091453
Parents: 21a2b96 7120440
Author: Luke Cwik <lc...@google.com>
Authored: Thu Apr 13 09:07:42 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu Apr 13 09:07:42 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/core/construction/Coders.java  |  26 ++--
 .../runners/core/construction/CodersTest.java   | 144 +++++++++++++------
 2 files changed, 122 insertions(+), 48 deletions(-)
----------------------------------------------------------------------