You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2019/10/04 05:04:01 UTC
[beam] branch master updated: [BEAM-8111] Enable CloudObjectsTest$DefaultCoders (#9446)
This is an automated email from the ASF dual-hosted git repository.
kenn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new e14f3f8 [BEAM-8111] Enable CloudObjectsTest$DefaultCoders (#9446)
e14f3f8 is described below
commit e14f3f84ba0da6288db4795b92199e2905134a17
Author: Brian Hulette <bh...@google.com>
AuthorDate: Thu Oct 3 22:03:17 2019 -0700
[BEAM-8111] Enable CloudObjectsTest$DefaultCoders (#9446)
---
.../beam/runners/dataflow/util/CloudObjects.java | 2 +
.../runners/dataflow/util/CloudObjectsTest.java | 54 +++++++++++++++++-----
.../java/org/apache/beam/sdk/coders/RowCoder.java | 18 ++++++++
.../org/apache/beam/sdk/schemas/SchemaCoder.java | 48 +++++++++++++++++--
4 files changed, 108 insertions(+), 14 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java
index ec51351..ef47e1f 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java
@@ -29,6 +29,7 @@ import org.apache.beam.runners.core.construction.Timer;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.DoubleCoder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.LengthPrefixCoder;
@@ -49,6 +50,7 @@ public class CloudObjects {
ByteArrayCoder.class,
KvCoder.class,
VarLongCoder.class,
+ DoubleCoder.class,
IntervalWindowCoder.class,
IterableCoder.class,
Timer.Coder.class,
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
index 215567e..26b721c 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
@@ -17,11 +17,12 @@
*/
package org.apache.beam.runners.dataflow.util;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.emptyIterable;
import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
@@ -32,7 +33,6 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import org.apache.beam.runners.core.construction.ModelCoderRegistrar;
import org.apache.beam.runners.core.construction.SdkComponents;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.ByteArrayCoder;
@@ -50,7 +50,9 @@ import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.SetCoder;
import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.schemas.LogicalTypes;
import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.transforms.join.CoGbkResult.CoGbkResultCoder;
import org.apache.beam.sdk.transforms.join.CoGbkResultSchema;
@@ -62,7 +64,9 @@ import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList.Builder;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.junit.runners.Parameterized;
@@ -70,7 +74,22 @@ import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
/** Tests for {@link CloudObjects}. */
+@RunWith(Enclosed.class)
public class CloudObjectsTest {
+ private static final Schema TEST_SCHEMA =
+ Schema.builder()
+ .addBooleanField("bool")
+ .addByteField("int8")
+ .addInt16Field("int16")
+ .addInt32Field("int32")
+ .addInt64Field("int64")
+ .addFloatField("float")
+ .addDoubleField("double")
+ .addStringField("string")
+ .addArrayField("list_int32", FieldType.INT32)
+ .addLogicalTypeField("fixed_bytes", LogicalTypes.FixedBytes.of(4))
+ .build();
+
/** Tests that all of the Default Coders are tested. */
@RunWith(JUnit4.class)
public static class DefaultsPresentTest {
@@ -143,7 +162,8 @@ public class CloudObjectsTest {
CoGbkResultSchema.of(
ImmutableList.of(new TupleTag<Long>(), new TupleTag<byte[]>())),
UnionCoder.of(ImmutableList.of(VarLongCoder.of(), ByteArrayCoder.of()))))
- .add(SchemaCoder.of(Schema.builder().build()));
+ .add(SchemaCoder.of(Schema.builder().build()))
+ .add(SchemaCoder.of(TEST_SCHEMA));
for (Class<? extends Coder> atomicCoder :
DefaultCoderCloudObjectTranslatorRegistrar.KNOWN_ATOMIC_CODERS) {
dataBuilder.add(InstanceBuilder.ofType(atomicCoder).fromFactoryMethod("of").build());
@@ -177,21 +197,33 @@ public class CloudObjectsTest {
private static void checkPipelineProtoCoderIds(
Coder<?> coder, CloudObject cloudObject, SdkComponents sdkComponents) throws Exception {
- if (ModelCoderRegistrar.isKnownCoder(coder)) {
+ if (CloudObjects.DATAFLOW_KNOWN_CODERS.contains(coder.getClass())) {
assertFalse(cloudObject.containsKey(PropertyNames.PIPELINE_PROTO_CODER_ID));
} else {
assertTrue(cloudObject.containsKey(PropertyNames.PIPELINE_PROTO_CODER_ID));
assertEquals(
sdkComponents.registerCoder(coder),
- cloudObject.get(PropertyNames.PIPELINE_PROTO_CODER_ID));
+ ((CloudObject) cloudObject.get(PropertyNames.PIPELINE_PROTO_CODER_ID))
+ .get(PropertyNames.VALUE));
+ }
+ List<? extends Coder<?>> expectedComponents;
+ if (coder instanceof StructuredCoder) {
+ expectedComponents = ((StructuredCoder) coder).getComponents();
+ } else {
+ expectedComponents = coder.getCoderArguments();
}
- List<? extends Coder<?>> coderArguments = coder.getCoderArguments();
Object cloudComponentsObject = cloudObject.get(PropertyNames.COMPONENT_ENCODINGS);
- assertTrue(cloudComponentsObject instanceof List);
- List<CloudObject> cloudComponents = (List<CloudObject>) cloudComponentsObject;
- assertEquals(coderArguments.size(), cloudComponents.size());
- for (int i = 0; i < coderArguments.size(); i++) {
- checkPipelineProtoCoderIds(coderArguments.get(i), cloudComponents.get(i), sdkComponents);
+ List<CloudObject> cloudComponents;
+ if (cloudComponentsObject == null) {
+ cloudComponents = Lists.newArrayList();
+ } else {
+ assertThat(cloudComponentsObject, instanceOf(List.class));
+ cloudComponents = (List<CloudObject>) cloudComponentsObject;
+ }
+ assertEquals(expectedComponents.size(), cloudComponents.size());
+ for (int i = 0; i < expectedComponents.size(); i++) {
+ checkPipelineProtoCoderIds(
+ expectedComponents.get(i), cloudComponents.get(i), sdkComponents);
}
}
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java
index f6cfe6a..79faa91 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java
@@ -24,6 +24,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.UUID;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
@@ -247,4 +248,21 @@ public class RowCoder extends CustomCoder<Row> {
String string = "Schema: " + schema + " UUID: " + id + " delegateCoder: " + getDelegateCoder();
return string;
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ RowCoder rowCoder = (RowCoder) o;
+ return schema.equals(rowCoder.schema);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(schema);
+ }
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java
index 0199534..9e06b44 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java
@@ -20,12 +20,12 @@ package org.apache.beam.sdk.schemas;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.Objects;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.SerializableFunctions;
import org.apache.beam.sdk.values.Row;
/** {@link SchemaCoder} is used as the coder for types that have schemas registered. */
@@ -57,8 +57,7 @@ public class SchemaCoder<T> extends CustomCoder<T> {
/** Returns a {@link SchemaCoder} for {@link Row} classes. */
public static SchemaCoder<Row> of(Schema schema) {
- return new SchemaCoder<>(
- schema, SerializableFunctions.identity(), SerializableFunctions.identity());
+ return new SchemaCoder<>(schema, identity(), identity());
}
/** Returns the schema associated with this type. */
@@ -100,4 +99,47 @@ public class SchemaCoder<T> extends CustomCoder<T> {
public String toString() {
return "SchemaCoder: " + rowCoder.toString();
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ SchemaCoder<?> that = (SchemaCoder<?>) o;
+ return rowCoder.equals(that.rowCoder)
+ && toRowFunction.equals(that.toRowFunction)
+ && fromRowFunction.equals(that.fromRowFunction);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(rowCoder, toRowFunction, fromRowFunction);
+ }
+
+ private static RowIdentity identity() {
+ return new RowIdentity();
+ }
+
+ private static class RowIdentity implements SerializableFunction<Row, Row> {
+ @Override
+ public Row apply(Row input) {
+ return input;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(getClass());
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ return o != null && getClass() == o.getClass();
+ }
+ }
}