You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/07/09 00:32:34 UTC
[2/2] incubator-beam git commit: Provide equals and hashCode in DelegateCoder
Provide equals and hashCode in DelegateCoder
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4d6a1020
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4d6a1020
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4d6a1020
Branch: refs/heads/master
Commit: 4d6a10203df3ba6d3923efc2c8776576de5b0d38
Parents: 90abca1
Author: Pei He <pe...@google.com>
Authored: Wed Jun 29 14:21:42 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Jul 8 17:32:28 2016 -0700
----------------------------------------------------------------------
.../flink/streaming/StateSerializationTest.java | 20 ++++++++
.../apache/beam/sdk/coders/DelegateCoder.java | 26 +++++++++-
.../beam/sdk/coders/StringDelegateCoder.java | 51 +++++++++++++++++++-
.../beam/sdk/coders/DelegateCoderTest.java | 43 +++++++++++++++++
4 files changed, 136 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d6a1020/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/StateSerializationTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/StateSerializationTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/StateSerializationTest.java
index 44f4ecb..6635d32 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/StateSerializationTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/StateSerializationTest.java
@@ -99,6 +99,16 @@ public class StateSerializationTest {
public Integer apply(int[] accumulator) {
return accumulator[0];
}
+
+ @Override
+ public boolean equals(Object o) {
+ return o != null && this.getClass() == o.getClass();
+ }
+
+ @Override
+ public int hashCode() {
+ return this.getClass().hashCode();
+ }
},
new DelegateCoder.CodingFunction<Integer, int[]>() {
@Override
@@ -107,6 +117,16 @@ public class StateSerializationTest {
a[0] = value;
return a;
}
+
+ @Override
+ public boolean equals(Object o) {
+ return o != null && this.getClass() == o.getClass();
+ }
+
+ @Override
+ public int hashCode() {
+ return this.getClass().hashCode();
+ }
});
private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR =
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d6a1020/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DelegateCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DelegateCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DelegateCoder.java
index 905178b..385c149 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DelegateCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DelegateCoder.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.coders;
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Objects;
import com.google.common.collect.Lists;
import java.io.IOException;
@@ -42,7 +44,7 @@ import java.util.List;
* @param <T> The type of objects coded by this Coder.
* @param <IntermediateT> The type of objects a {@code T} will be converted to for coding.
*/
-public class DelegateCoder<T, IntermediateT> extends CustomCoder<T> {
+public final class DelegateCoder<T, IntermediateT> extends CustomCoder<T> {
/**
* A {@link DelegateCoder.CodingFunction CodingFunction<InputT, OutputT>} is a serializable
* function from {@code InputT} to {@code OutputT} that may throw any {@link Exception}.
@@ -101,8 +103,28 @@ public class DelegateCoder<T, IntermediateT> extends CustomCoder<T> {
}
@Override
+ public boolean equals(Object o) {
+ if (o == null || this.getClass() != o.getClass()) {
+ return false;
+ }
+ DelegateCoder<?, ?> that = (DelegateCoder<?, ?>) o;
+ return Objects.equal(this.coder, that.coder)
+ && Objects.equal(this.toFn, that.toFn)
+ && Objects.equal(this.fromFn, that.fromFn);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(this.coder, this.toFn, this.fromFn);
+ }
+
+ @Override
public String toString() {
- return "DelegateCoder(" + coder + ")";
+ return MoreObjects.toStringHelper(getClass())
+ .add("coder", coder)
+ .add("toFn", toFn)
+ .add("fromFn", fromFn)
+ .toString();
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d6a1020/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java
index 0e62311..c498a8a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java
@@ -17,9 +17,14 @@
*/
package org.apache.beam.sdk.coders;
+import org.apache.beam.sdk.coders.DelegateCoder.CodingFunction;
import org.apache.beam.sdk.coders.protobuf.ProtoCoder;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.lang.reflect.InvocationTargetException;
+import java.util.Collection;
/**
* A {@link Coder} that wraps a {@code Coder<String>}
@@ -43,7 +48,7 @@ import java.lang.reflect.InvocationTargetException;
*
* @param <T> The type of objects coded.
*/
-public class StringDelegateCoder<T> extends DelegateCoder<T, String> {
+public final class StringDelegateCoder<T> extends CustomCoder<T> {
public static <T> StringDelegateCoder<T> of(Class<T> clazz) {
return new StringDelegateCoder<T>(clazz);
}
@@ -53,10 +58,11 @@ public class StringDelegateCoder<T> extends DelegateCoder<T, String> {
return "StringDelegateCoder(" + clazz + ")";
}
+ private final DelegateCoder<T, String> delegateCoder;
private final Class<T> clazz;
protected StringDelegateCoder(final Class<T> clazz) {
- super(StringUtf8Coder.of(),
+ delegateCoder = DelegateCoder.of(StringUtf8Coder.of(),
new CodingFunction<T, String>() {
@Override
public String apply(T input) {
@@ -77,6 +83,41 @@ public class StringDelegateCoder<T> extends DelegateCoder<T, String> {
this.clazz = clazz;
}
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || this.getClass() != o.getClass()) {
+ return false;
+ }
+ StringDelegateCoder<?> that = (StringDelegateCoder<?>) o;
+ return this.clazz.equals(that.clazz);
+ }
+
+ @Override
+ public int hashCode() {
+ return this.clazz.hashCode();
+ }
+
+ @Override
+ public void encode(T value, OutputStream outStream, Context context)
+ throws CoderException, IOException {
+ delegateCoder.encode(value, outStream, context);
+ }
+
+ @Override
+ public T decode(InputStream inStream, Context context) throws CoderException, IOException {
+ return delegateCoder.decode(inStream, context);
+ }
+
+ @Override
+ public void verifyDeterministic() throws NonDeterministicException {
+ delegateCoder.verifyDeterministic();
+ }
+
+ @Override
+ public Object structuralValue(T value) throws Exception {
+ return delegateCoder.structuralValue(value);
+ }
+
/**
* The encoding id is the fully qualified name of the encoded/decoded class.
*/
@@ -84,4 +125,10 @@ public class StringDelegateCoder<T> extends DelegateCoder<T, String> {
public String getEncodingId() {
return clazz.getName();
}
+
+ @Override
+ public Collection<String> getAllowedEncodings() {
+ return delegateCoder.getAllowedEncodings();
+ }
}
+
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d6a1020/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DelegateCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DelegateCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DelegateCoderTest.java
index b40457c..cf770aa 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DelegateCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DelegateCoderTest.java
@@ -17,6 +17,9 @@
*/
package org.apache.beam.sdk.coders;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
import org.apache.beam.sdk.testing.CoderProperties;
import com.google.common.collect.Lists;
@@ -52,12 +55,32 @@ public class DelegateCoderTest implements Serializable {
public List<Integer> apply(Set<Integer> input) {
return Lists.newArrayList(input);
}
+
+ @Override
+ public boolean equals(Object o) {
+ return o != null && this.getClass() == o.getClass();
+ }
+
+ @Override
+ public int hashCode() {
+ return this.getClass().hashCode();
+ }
},
new DelegateCoder.CodingFunction<List<Integer>, Set<Integer>>() {
@Override
public Set<Integer> apply(List<Integer> input) {
return Sets.newHashSet(input);
}
+
+ @Override
+ public boolean equals(Object o) {
+ return o != null && this.getClass() == o.getClass();
+ }
+
+ @Override
+ public int hashCode() {
+ return this.getClass().hashCode();
+ }
});
@Test
@@ -140,4 +163,24 @@ public class DelegateCoderTest implements Serializable {
trivialDelegateCoder,
TestAllowedEncodingsCoder.class.getName() + ":" + TEST_ALLOWED_ENCODING);
}
+
+ @Test
+ public void testCoderEquals() throws Exception {
+ DelegateCoder.CodingFunction<Integer, Integer> identityFn =
+ new DelegateCoder.CodingFunction<Integer, Integer>() {
+ @Override
+ public Integer apply(Integer input) {
+ return input;
+ }
+ };
+ Coder<Integer> varIntCoder1 = DelegateCoder.of(VarIntCoder.of(), identityFn, identityFn);
+ Coder<Integer> varIntCoder2 = DelegateCoder.of(VarIntCoder.of(), identityFn, identityFn);
+ Coder<Integer> bigEndianIntegerCoder =
+ DelegateCoder.of(BigEndianIntegerCoder.of(), identityFn, identityFn);
+
+ assertEquals(varIntCoder1, varIntCoder2);
+ assertEquals(varIntCoder1.hashCode(), varIntCoder2.hashCode());
+ assertNotEquals(varIntCoder1, bigEndianIntegerCoder);
+ assertNotEquals(varIntCoder1.hashCode(), bigEndianIntegerCoder.hashCode());
+ }
}