You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/08/05 03:37:33 UTC

[1/2] beam git commit: [BEAM-2734] Unbreaks some Dataflow ValidatesRunner tests

Repository: beam
Updated Branches:
  refs/heads/master df36bd9d7 -> aadbe36ff


[BEAM-2734] Unbreaks some Dataflow ValidatesRunner tests


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/97b12d53
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/97b12d53
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/97b12d53

Branch: refs/heads/master
Commit: 97b12d53f956dfffd7594ae5b9433ac9df9c793a
Parents: df36bd9
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Aug 4 16:46:42 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Aug 4 17:49:41 2017 -0700

----------------------------------------------------------------------
 .../construction/UnboundedReadFromBoundedSource.java  | 11 ++++++++---
 .../src/main/java/org/apache/beam/sdk/io/Source.java  | 14 +++++++++++++-
 2 files changed, 21 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/97b12d53/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
index 55f9519..24eb384 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
@@ -91,6 +91,11 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle
   }
 
   @Override
+  protected Coder<T> getDefaultOutputCoder() {
+    return source.getDefaultOutputCoder();
+  }
+
+  @Override
   public String getKindString() {
     return String.format("Read(%s)", NameUtils.approximateSimpleName(source));
   }
@@ -161,14 +166,14 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle
     }
 
     @Override
-    public Coder<T> getOutputCoder() {
-      return boundedSource.getOutputCoder();
+    public Coder<T> getDefaultOutputCoder() {
+      return boundedSource.getDefaultOutputCoder();
     }
 
     @SuppressWarnings({"rawtypes", "unchecked"})
     @Override
     public Coder<Checkpoint<T>> getCheckpointMarkCoder() {
-      return new CheckpointCoder<>(boundedSource.getOutputCoder());
+      return new CheckpointCoder<>(boundedSource.getDefaultOutputCoder());
     }
 
     @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/beam/blob/97b12d53/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Source.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Source.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Source.java
index 32a7270..872c135 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Source.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Source.java
@@ -64,11 +64,23 @@ public abstract class Source<T> implements Serializable, HasDisplayData {
   /** @deprecated Override {@link #getOutputCoder()} instead. */
   @Deprecated
   public Coder<T> getDefaultOutputCoder() {
-    throw new UnsupportedOperationException("Source needs to override getOutputCoder()");
+    // If the subclass doesn't override getDefaultOutputCoder(), hopefully it overrides the proper
+    // version - getOutputCoder(). Check that it does, before calling the method (if subclass
+    // doesn't override it, we'll call the default implementation and get infinite recursion).
+    try {
+      if (getClass().getMethod("getOutputCoder").getDeclaringClass().equals(Source.class)) {
+        throw new UnsupportedOperationException(
+            getClass() + " needs to override getOutputCoder().");
+      }
+    } catch (NoSuchMethodException e) {
+      throw new RuntimeException(e);
+    }
+    return getOutputCoder();
   }
 
   /** Returns the {@code Coder} to use for the data read from this source. */
   public Coder<T> getOutputCoder() {
+    // Call the old method for compatibility.
     return getDefaultOutputCoder();
   }
 


[2/2] beam git commit: This closes #3693: [BEAM-2734] Unbreaks some Dataflow ValidatesRunner tests

Posted by jk...@apache.org.
This closes #3693: [BEAM-2734] Unbreaks some Dataflow ValidatesRunner tests


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/aadbe36f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/aadbe36f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/aadbe36f

Branch: refs/heads/master
Commit: aadbe36ff66b69e7eb55fb4977cb59b93b3269e2
Parents: df36bd9 97b12d5
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Aug 4 20:37:14 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Aug 4 20:37:14 2017 -0700

----------------------------------------------------------------------
 .../construction/UnboundedReadFromBoundedSource.java  | 11 ++++++++---
 .../src/main/java/org/apache/beam/sdk/io/Source.java  | 14 +++++++++++++-
 2 files changed, 21 insertions(+), 4 deletions(-)
----------------------------------------------------------------------