You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/07/09 00:32:33 UTC

[1/2] incubator-beam git commit: Closes #559

Repository: incubator-beam
Updated Branches:
  refs/heads/master 90abca193 -> 9d7002545


Closes #559


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9d700254
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9d700254
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9d700254

Branch: refs/heads/master
Commit: 9d700254530dea9a15772e744439ed80d6c25c49
Parents: 90abca1 4d6a102
Author: Dan Halperin <dh...@google.com>
Authored: Fri Jul 8 17:32:28 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Jul 8 17:32:28 2016 -0700

----------------------------------------------------------------------
 .../flink/streaming/StateSerializationTest.java | 20 ++++++++
 .../apache/beam/sdk/coders/DelegateCoder.java   | 26 +++++++++-
 .../beam/sdk/coders/StringDelegateCoder.java    | 51 +++++++++++++++++++-
 .../beam/sdk/coders/DelegateCoderTest.java      | 43 +++++++++++++++++
 4 files changed, 136 insertions(+), 4 deletions(-)
----------------------------------------------------------------------



[2/2] incubator-beam git commit: Provide equals and hashCode in DelegateCoder

Posted by dh...@apache.org.
Provide equals and hashCode in DelegateCoder


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4d6a1020
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4d6a1020
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4d6a1020

Branch: refs/heads/master
Commit: 4d6a10203df3ba6d3923efc2c8776576de5b0d38
Parents: 90abca1
Author: Pei He <pe...@google.com>
Authored: Wed Jun 29 14:21:42 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Jul 8 17:32:28 2016 -0700

----------------------------------------------------------------------
 .../flink/streaming/StateSerializationTest.java | 20 ++++++++
 .../apache/beam/sdk/coders/DelegateCoder.java   | 26 +++++++++-
 .../beam/sdk/coders/StringDelegateCoder.java    | 51 +++++++++++++++++++-
 .../beam/sdk/coders/DelegateCoderTest.java      | 43 +++++++++++++++++
 4 files changed, 136 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d6a1020/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/StateSerializationTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/StateSerializationTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/StateSerializationTest.java
index 44f4ecb..6635d32 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/StateSerializationTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/StateSerializationTest.java
@@ -99,6 +99,16 @@ public class StateSerializationTest {
       public Integer apply(int[] accumulator) {
         return accumulator[0];
       }
+
+      @Override
+      public boolean equals(Object o) {
+        return o != null && this.getClass() == o.getClass();
+      }
+
+     @Override
+      public int hashCode() {
+        return this.getClass().hashCode();
+      }
     },
     new DelegateCoder.CodingFunction<Integer, int[]>() {
       @Override
@@ -107,6 +117,16 @@ public class StateSerializationTest {
         a[0] = value;
         return a;
       }
+
+      @Override
+      public boolean equals(Object o) {
+        return o != null && this.getClass() == o.getClass();
+      }
+
+      @Override
+      public int hashCode() {
+        return this.getClass().hashCode();
+      }
     });
 
   private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR =

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d6a1020/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DelegateCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DelegateCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DelegateCoder.java
index 905178b..385c149 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DelegateCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DelegateCoder.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.coders;
 
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Objects;
 import com.google.common.collect.Lists;
 
 import java.io.IOException;
@@ -42,7 +44,7 @@ import java.util.List;
  * @param <T> The type of objects coded by this Coder.
  * @param <IntermediateT> The type of objects a {@code T} will be converted to for coding.
  */
-public class DelegateCoder<T, IntermediateT> extends CustomCoder<T> {
+public final class DelegateCoder<T, IntermediateT> extends CustomCoder<T> {
   /**
    * A {@link DelegateCoder.CodingFunction CodingFunction&lt;InputT, OutputT&gt;} is a serializable
    * function from {@code InputT} to {@code OutputT} that may throw any {@link Exception}.
@@ -101,8 +103,28 @@ public class DelegateCoder<T, IntermediateT> extends CustomCoder<T> {
   }
 
   @Override
+  public boolean equals(Object o) {
+    if (o == null || this.getClass() != o.getClass()) {
+      return false;
+    }
+    DelegateCoder<?, ?> that = (DelegateCoder<?, ?>) o;
+    return Objects.equal(this.coder, that.coder)
+        && Objects.equal(this.toFn, that.toFn)
+        && Objects.equal(this.fromFn, that.fromFn);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(this.coder, this.toFn, this.fromFn);
+  }
+
+  @Override
   public String toString() {
-    return "DelegateCoder(" + coder + ")";
+    return MoreObjects.toStringHelper(getClass())
+        .add("coder", coder)
+        .add("toFn", toFn)
+        .add("fromFn", fromFn)
+        .toString();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d6a1020/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java
index 0e62311..c498a8a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java
@@ -17,9 +17,14 @@
  */
 package org.apache.beam.sdk.coders;
 
+import org.apache.beam.sdk.coders.DelegateCoder.CodingFunction;
 import org.apache.beam.sdk.coders.protobuf.ProtoCoder;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.lang.reflect.InvocationTargetException;
+import java.util.Collection;
 
 /**
  * A {@link Coder} that wraps a {@code Coder<String>}
@@ -43,7 +48,7 @@ import java.lang.reflect.InvocationTargetException;
  *
  * @param <T> The type of objects coded.
  */
-public class StringDelegateCoder<T> extends DelegateCoder<T, String> {
+public final class StringDelegateCoder<T> extends CustomCoder<T> {
   public static <T> StringDelegateCoder<T> of(Class<T> clazz) {
     return new StringDelegateCoder<T>(clazz);
   }
@@ -53,10 +58,11 @@ public class StringDelegateCoder<T> extends DelegateCoder<T, String> {
     return "StringDelegateCoder(" + clazz + ")";
   }
 
+  private final DelegateCoder<T, String> delegateCoder;
   private final Class<T> clazz;
 
   protected StringDelegateCoder(final Class<T> clazz) {
-    super(StringUtf8Coder.of(),
+    delegateCoder = DelegateCoder.of(StringUtf8Coder.of(),
       new CodingFunction<T, String>() {
         @Override
         public String apply(T input) {
@@ -77,6 +83,41 @@ public class StringDelegateCoder<T> extends DelegateCoder<T, String> {
     this.clazz = clazz;
   }
 
+  @Override
+  public boolean equals(Object o) {
+    if (o == null || this.getClass() != o.getClass()) {
+      return false;
+    }
+    StringDelegateCoder<?> that = (StringDelegateCoder<?>) o;
+    return this.clazz.equals(that.clazz);
+  }
+
+  @Override
+  public int hashCode() {
+    return this.clazz.hashCode();
+  }
+
+  @Override
+  public void encode(T value, OutputStream outStream, Context context)
+      throws CoderException, IOException {
+    delegateCoder.encode(value, outStream, context);
+  }
+
+  @Override
+  public T decode(InputStream inStream, Context context) throws CoderException, IOException {
+    return delegateCoder.decode(inStream, context);
+  }
+
+  @Override
+  public void verifyDeterministic() throws NonDeterministicException {
+    delegateCoder.verifyDeterministic();
+  }
+
+  @Override
+  public Object structuralValue(T value) throws Exception {
+    return delegateCoder.structuralValue(value);
+  }
+
   /**
    * The encoding id is the fully qualified name of the encoded/decoded class.
    */
@@ -84,4 +125,10 @@ public class StringDelegateCoder<T> extends DelegateCoder<T, String> {
   public String getEncodingId() {
     return clazz.getName();
   }
+
+  @Override
+  public Collection<String> getAllowedEncodings() {
+    return delegateCoder.getAllowedEncodings();
+  }
 }
+

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d6a1020/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DelegateCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DelegateCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DelegateCoderTest.java
index b40457c..cf770aa 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DelegateCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DelegateCoderTest.java
@@ -17,6 +17,9 @@
  */
 package org.apache.beam.sdk.coders;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
 import org.apache.beam.sdk.testing.CoderProperties;
 
 import com.google.common.collect.Lists;
@@ -52,12 +55,32 @@ public class DelegateCoderTest implements Serializable {
         public List<Integer> apply(Set<Integer> input) {
           return Lists.newArrayList(input);
         }
+
+        @Override
+        public boolean equals(Object o) {
+          return o != null && this.getClass() == o.getClass();
+        }
+
+        @Override
+        public int hashCode() {
+          return this.getClass().hashCode();
+        }
       },
       new DelegateCoder.CodingFunction<List<Integer>, Set<Integer>>() {
         @Override
         public Set<Integer> apply(List<Integer> input) {
           return Sets.newHashSet(input);
         }
+
+        @Override
+        public boolean equals(Object o) {
+          return o != null && this.getClass() == o.getClass();
+        }
+
+        @Override
+        public int hashCode() {
+          return this.getClass().hashCode();
+        }
       });
 
   @Test
@@ -140,4 +163,24 @@ public class DelegateCoderTest implements Serializable {
         trivialDelegateCoder,
         TestAllowedEncodingsCoder.class.getName() + ":" + TEST_ALLOWED_ENCODING);
   }
+
+  @Test
+  public void testCoderEquals() throws Exception {
+    DelegateCoder.CodingFunction<Integer, Integer> identityFn =
+        new DelegateCoder.CodingFunction<Integer, Integer>() {
+          @Override
+          public Integer apply(Integer input) {
+            return input;
+          }
+        };
+    Coder<Integer> varIntCoder1 = DelegateCoder.of(VarIntCoder.of(), identityFn, identityFn);
+    Coder<Integer> varIntCoder2 = DelegateCoder.of(VarIntCoder.of(), identityFn, identityFn);
+    Coder<Integer> bigEndianIntegerCoder =
+        DelegateCoder.of(BigEndianIntegerCoder.of(), identityFn, identityFn);
+
+    assertEquals(varIntCoder1, varIntCoder2);
+    assertEquals(varIntCoder1.hashCode(), varIntCoder2.hashCode());
+    assertNotEquals(varIntCoder1, bigEndianIntegerCoder);
+    assertNotEquals(varIntCoder1.hashCode(), bigEndianIntegerCoder.hashCode());
+  }
 }