You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/04/21 22:00:26 UTC

[1/2] beam git commit: This closes #2625

Repository: beam
Updated Branches:
  refs/heads/master 552ddb4ad -> bebee2a72


This closes #2625


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bebee2a7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bebee2a7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bebee2a7

Branch: refs/heads/master
Commit: bebee2a7284ca3c463ce69dee217e4dfa998be57
Parents: 552ddb4 f8a10ff
Author: Thomas Groh <tg...@google.com>
Authored: Fri Apr 21 15:00:07 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Apr 21 15:00:07 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/runners/dataflow/internal/IsmFormat.java  |  2 +-
 .../src/main/java/org/apache/beam/sdk/coders/Coder.java   |  2 +-
 .../java/org/apache/beam/sdk/coders/DelegateCoder.java    | 10 ++++++++--
 .../src/main/java/org/apache/beam/sdk/coders/KvCoder.java |  2 +-
 .../java/org/apache/beam/sdk/coders/NullableCoder.java    |  2 +-
 .../java/org/apache/beam/sdk/coders/StandardCoder.java    |  2 +-
 .../org/apache/beam/sdk/coders/StringDelegateCoder.java   |  2 +-
 .../org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java    |  2 +-
 8 files changed, 15 insertions(+), 9 deletions(-)
----------------------------------------------------------------------



[2/2] beam git commit: Coder.structuralValue(T) should never throw

Posted by tg...@apache.org.
Coder.structuralValue(T) should never throw

In the worst case, computing a structural value means encoding the element to a byte array, and that should never fail due to I/O.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f8a10ffd
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f8a10ffd
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f8a10ffd

Branch: refs/heads/master
Commit: f8a10ffd7af93705d8011d5287eb2225e540a1fd
Parents: 552ddb4
Author: Thomas Groh <tg...@google.com>
Authored: Thu Apr 20 20:00:07 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Apr 21 15:00:07 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/runners/dataflow/internal/IsmFormat.java  |  2 +-
 .../src/main/java/org/apache/beam/sdk/coders/Coder.java   |  2 +-
 .../java/org/apache/beam/sdk/coders/DelegateCoder.java    | 10 ++++++++--
 .../src/main/java/org/apache/beam/sdk/coders/KvCoder.java |  2 +-
 .../java/org/apache/beam/sdk/coders/NullableCoder.java    |  2 +-
 .../java/org/apache/beam/sdk/coders/StandardCoder.java    |  2 +-
 .../org/apache/beam/sdk/coders/StringDelegateCoder.java   |  2 +-
 .../org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java    |  2 +-
 8 files changed, 15 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f8a10ffd/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
index 6daddc6..33c27f8 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
@@ -403,7 +403,7 @@ public class IsmFormat {
     }
 
     @Override
-    public Object structuralValue(IsmRecord<V> record) throws Exception {
+    public Object structuralValue(IsmRecord<V> record) {
       checkNotNull(record);
       checkState(record.getKeyComponents().size() == keyComponentCoders.size(),
           "Expected the number of key component coders %s "

http://git-wip-us.apache.org/repos/asf/beam/blob/f8a10ffd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
index 39efaf2..779961e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
@@ -198,7 +198,7 @@ public interface Coder<T> extends Serializable {
    *
    * <p>See also {@link #consistentWithEquals()}.
    */
-  Object structuralValue(T value) throws Exception;
+  Object structuralValue(T value);
 
   /**
    * Returns whether {@link #registerByteSizeObserver} cheap enough to

http://git-wip-us.apache.org/repos/asf/beam/blob/f8a10ffd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DelegateCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DelegateCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DelegateCoder.java
index 1762243..7e1154a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DelegateCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DelegateCoder.java
@@ -107,8 +107,14 @@ public final class DelegateCoder<T, IntermediateT> extends CustomCoder<T> {
    *         coder.
    */
   @Override
-  public Object structuralValue(T value) throws Exception {
-    return coder.structuralValue(toFn.apply(value));
+  public Object structuralValue(T value) {
+    try {
+      IntermediateT intermediate = toFn.apply(value);
+      return coder.structuralValue(intermediate);
+    } catch (Exception exn) {
+      throw new IllegalArgumentException(
+          "Unable to encode element '" + value + "' with coder '" + this + "'.", exn);
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/f8a10ffd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
index 3c61bf6..fcb906c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
@@ -114,7 +114,7 @@ public class KvCoder<K, V> extends StandardCoder<KV<K, V>> {
   }
 
   @Override
-  public Object structuralValue(KV<K, V> kv) throws Exception {
+  public Object structuralValue(KV<K, V> kv) {
     if (consistentWithEquals()) {
       return kv;
     } else {

http://git-wip-us.apache.org/repos/asf/beam/blob/f8a10ffd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java
index d1e1370..c92470a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java
@@ -124,7 +124,7 @@ public class NullableCoder<T> extends StandardCoder<T> {
   }
 
   @Override
-  public Object structuralValue(@Nullable T value) throws Exception {
+  public Object structuralValue(@Nullable T value) {
     if (value == null) {
       return Optional.absent();
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/f8a10ffd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StandardCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StandardCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StandardCoder.java
index d41694f..c67fe97 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StandardCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StandardCoder.java
@@ -241,7 +241,7 @@ public abstract class StandardCoder<T> implements Coder<T> {
   }
 
   @Override
-  public Object structuralValue(T value) throws Exception {
+  public Object structuralValue(T value) {
     if (value != null && consistentWithEquals()) {
       return value;
     } else {

http://git-wip-us.apache.org/repos/asf/beam/blob/f8a10ffd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java
index ad7e28c..d4b4ae8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java
@@ -118,7 +118,7 @@ public final class StringDelegateCoder<T> extends CustomCoder<T> {
   }
 
   @Override
-  public Object structuralValue(T value) throws Exception {
+  public Object structuralValue(T value) {
     return delegateCoder.structuralValue(value);
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/f8a10ffd/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
index 2043a4c..25ef7df 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
@@ -102,7 +102,7 @@ public class KafkaRecordCoder<K, V> extends StandardCoder<KafkaRecord<K, V>> {
 
   @SuppressWarnings("unchecked")
   @Override
-  public Object structuralValue(KafkaRecord<K, V> value) throws Exception {
+  public Object structuralValue(KafkaRecord<K, V> value) {
     if (consistentWithEquals()) {
       return value;
     } else {