You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bh...@apache.org on 2020/08/11 20:45:34 UTC

[beam] branch master updated: [BEAM-8125] Add verifyDeterministic test to SchemaCoderTest (#12521)

This is an automated email from the ASF dual-hosted git repository.

bhulette pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new b751c17  [BEAM-8125] Add verifyDeterministic test to SchemaCoderTest (#12521)
b751c17 is described below

commit b751c170d4f675fd189714684360e5d06af25247
Author: Brian Hulette <bh...@google.com>
AuthorDate: Tue Aug 11 13:45:16 2020 -0700

    [BEAM-8125] Add verifyDeterministic test to SchemaCoderTest (#12521)
    
    * Add verifyDeterministic test to SchemaCoderTest
    
    * Use verifyDeterministic
---
 .../apache/beam/sdk/schemas/SchemaCoderTest.java   | 152 +++++++++++++++++----
 1 file changed, 123 insertions(+), 29 deletions(-)

diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaCoderTest.java
index a28f6eb..8d71180 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaCoderTest.java
@@ -18,18 +18,28 @@
 package org.apache.beam.sdk.schemas;
 
 import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
 
 import com.google.auto.value.AutoValue;
+import java.time.Instant;
 import java.util.Collection;
 import java.util.Objects;
+import java.util.function.Supplier;
 import org.apache.avro.reflect.AvroSchema;
+import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
 import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.logicaltypes.NanosInstant;
 import org.apache.beam.sdk.schemas.utils.SchemaTestUtils;
 import org.apache.beam.sdk.testing.CoderProperties;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.checkerframework.checker.nullness.qual.Nullable;
+import org.hamcrest.Matchers;
 import org.joda.time.DateTime;
 import org.junit.Test;
 import org.junit.experimental.runners.Enclosed;
@@ -40,8 +50,13 @@ import org.junit.runners.Parameterized;
 /** Unit tests for {@link SchemaCoder}. */
 @RunWith(Enclosed.class)
 public class SchemaCoderTest {
+
   private static final Schema INT32_SCHEMA =
       Schema.builder().addInt32Field("a").addInt32Field("b").build();
+  public static final Schema LOGICAL_NANOS_SCHEMA =
+      Schema.of(Field.of("logicalNanos", FieldType.logicalType(new NanosInstant())));
+  public static final Schema FLOATING_POINT_SCHEMA =
+      Schema.of(Field.of("float", FieldType.FLOAT), Field.of("double", FieldType.DOUBLE));
 
   @RunWith(JUnit4.class)
   public static class SingletonTests {
@@ -226,54 +241,113 @@ public class SchemaCoderTest {
   }
 
   @RunWith(Parameterized.class)
-  @DefaultSchema(AutoValueSchema.class)
-  public static class SchemaProviderTests {
+  public static class ParameterizedTests {
+
     @Parameterized.Parameter(0)
     public SchemaCoder coder;
 
     @Parameterized.Parameter(1)
-    public ImmutableList<Object> testValues;
+    public ImmutableList<Supplier<Object>> testValues;
+
+    @Parameterized.Parameter(2)
+    public boolean expectDeterministic;
 
     @Parameterized.Parameters(name = "{index}: coder = {0}")
     public static Collection<Object[]> data() throws NoSuchSchemaException {
       return ImmutableList.of(
           new Object[] {
             SchemaCoder.of(INT32_SCHEMA),
-            ImmutableList.of(
-                Row.withSchema(INT32_SCHEMA).addValues(9001, 9002).build(),
-                Row.withSchema(INT32_SCHEMA).addValues(3, 4).build())
+            ImmutableList.<Supplier<Object>>of(
+                () -> Row.withSchema(INT32_SCHEMA).addValues(9001, 9002).build(),
+                () -> Row.withSchema(INT32_SCHEMA).addValues(3, 4).build()),
+            true
           },
           new Object[] {
             coderFrom(TypeDescriptor.of(SimpleAutoValue.class)),
-            ImmutableList.of(
-                SimpleAutoValue.of(
-                    "foo", 9001, 0L, new DateTime().withDate(1979, 3, 14).withTime(10, 30, 0, 0)),
-                SimpleAutoValue.of(
-                    "bar", 9002, 1L, new DateTime().withDate(1989, 3, 14).withTime(10, 30, 0, 0)))
+            ImmutableList.<Supplier<Object>>of(
+                () ->
+                    SimpleAutoValue.of(
+                        "foo",
+                        9001,
+                        0L,
+                        new DateTime().withDate(1979, 3, 14).withTime(10, 30, 0, 0)),
+                () ->
+                    SimpleAutoValue.of(
+                        "bar",
+                        9002,
+                        1L,
+                        new DateTime().withDate(1989, 3, 14).withTime(10, 30, 0, 0))),
+            true
           },
           new Object[] {
             coderFrom(TypeDescriptor.of(SimpleBean.class)),
-            ImmutableList.of(
-                new SimpleBean(
-                    "foo", 9001, 0L, new DateTime().withDate(1979, 3, 14).withTime(10, 30, 0, 0)),
-                new SimpleBean(
-                    "bar", 9002, 1L, new DateTime().withDate(1989, 3, 14).withTime(10, 30, 0, 0)))
+            ImmutableList.<Supplier<Object>>of(
+                () ->
+                    new SimpleBean(
+                        "foo",
+                        9001,
+                        0L,
+                        new DateTime().withDate(1979, 3, 14).withTime(10, 30, 0, 0)),
+                () ->
+                    new SimpleBean(
+                        "bar",
+                        9002,
+                        1L,
+                        new DateTime().withDate(1989, 3, 14).withTime(10, 30, 0, 0))),
+            true
           },
           new Object[] {
             coderFrom(TypeDescriptor.of(SimplePojo.class)),
-            ImmutableList.of(
-                new SimplePojo(
-                    "foo", 9001, 0L, new DateTime().withDate(1979, 3, 14).withTime(10, 30, 0, 0)),
-                new SimplePojo(
-                    "bar", 9002, 1L, new DateTime().withDate(1989, 3, 14).withTime(10, 30, 0, 0)))
+            ImmutableList.<Supplier<Object>>of(
+                () ->
+                    new SimplePojo(
+                        "foo",
+                        9001,
+                        0L,
+                        new DateTime().withDate(1979, 3, 14).withTime(10, 30, 0, 0)),
+                () ->
+                    new SimplePojo(
+                        "bar",
+                        9002,
+                        1L,
+                        new DateTime().withDate(1989, 3, 14).withTime(10, 30, 0, 0))),
+            true
           },
           new Object[] {
             coderFrom(TypeDescriptor.of(SimpleAvro.class)),
-            ImmutableList.of(
-                new SimpleAvro(
-                    "foo", 9001, 0L, new DateTime().withDate(1979, 3, 14).withTime(10, 30, 0, 0)),
-                new SimpleAvro(
-                    "bar", 9002, 1L, new DateTime().withDate(1989, 3, 14).withTime(10, 30, 0, 0)))
+            ImmutableList.<Supplier<Object>>of(
+                () ->
+                    new SimpleAvro(
+                        "foo",
+                        9001,
+                        0L,
+                        new DateTime().withDate(1979, 3, 14).withTime(10, 30, 0, 0)),
+                () ->
+                    new SimpleAvro(
+                        "bar",
+                        9002,
+                        1L,
+                        new DateTime().withDate(1989, 3, 14).withTime(10, 30, 0, 0))),
+            true
+          },
+          new Object[] {
+            RowCoder.of(LOGICAL_NANOS_SCHEMA),
+            ImmutableList.<Supplier<Object>>of(
+                () ->
+                    Row.withSchema(LOGICAL_NANOS_SCHEMA)
+                        .withFieldValue("logicalNanos", Instant.ofEpochMilli(9001))
+                        .build()),
+            true
+          },
+          new Object[] {
+            RowCoder.of(FLOATING_POINT_SCHEMA),
+            ImmutableList.<Supplier<Object>>of(
+                () ->
+                    Row.withSchema(FLOATING_POINT_SCHEMA)
+                        .withFieldValue("float", (float) 1.0)
+                        .withFieldValue("double", 2.0)
+                        .build()),
+            false
           });
     }
 
@@ -284,11 +358,31 @@ public class SchemaCoderTest {
 
     @Test
     public void coderConsistentWithEquals() throws Exception {
-      for (Object testValueA : testValues) {
-        for (Object testValueB : testValues) {
-          CoderProperties.coderConsistentWithEquals(coder, testValueA, testValueB);
+      for (Supplier<Object> testValueA : testValues) {
+        for (Supplier<Object> testValueB : testValues) {
+          CoderProperties.coderConsistentWithEquals(coder, testValueA.get(), testValueB.get());
         }
       }
     }
+
+    @Test
+    public void verifyDeterministic() throws Exception {
+      if (expectDeterministic) {
+        for (Supplier<Object> testValue : testValues) {
+          CoderProperties.coderDeterministic(coder, testValue.get(), testValue.get());
+        }
+      } else {
+        assertNonDeterministic(coder);
+      }
+    }
+  }
+
+  private static void assertNonDeterministic(SchemaCoder<?> coder) {
+    try {
+      coder.verifyDeterministic();
+      fail("Expected " + coder + " to be non-deterministic.");
+    } catch (NonDeterministicException e) {
+      assertThat(e.getReasons(), Matchers.iterableWithSize(1));
+    }
   }
 }