You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/08/05 03:37:33 UTC
[1/2] beam git commit: [BEAM-2734] Unbreaks some Dataflow ValidatesRunner tests
Repository: beam
Updated Branches:
refs/heads/master df36bd9d7 -> aadbe36ff
[BEAM-2734] Unbreaks some Dataflow ValidatesRunner tests
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/97b12d53
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/97b12d53
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/97b12d53
Branch: refs/heads/master
Commit: 97b12d53f956dfffd7594ae5b9433ac9df9c793a
Parents: df36bd9
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Aug 4 16:46:42 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Aug 4 17:49:41 2017 -0700
----------------------------------------------------------------------
.../construction/UnboundedReadFromBoundedSource.java | 11 ++++++++---
.../src/main/java/org/apache/beam/sdk/io/Source.java | 14 +++++++++++++-
2 files changed, 21 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/97b12d53/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
index 55f9519..24eb384 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
@@ -91,6 +91,11 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle
}
@Override
+ protected Coder<T> getDefaultOutputCoder() {
+ return source.getDefaultOutputCoder();
+ }
+
+ @Override
public String getKindString() {
return String.format("Read(%s)", NameUtils.approximateSimpleName(source));
}
@@ -161,14 +166,14 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle
}
@Override
- public Coder<T> getOutputCoder() {
- return boundedSource.getOutputCoder();
+ public Coder<T> getDefaultOutputCoder() {
+ return boundedSource.getDefaultOutputCoder();
}
@SuppressWarnings({"rawtypes", "unchecked"})
@Override
public Coder<Checkpoint<T>> getCheckpointMarkCoder() {
- return new CheckpointCoder<>(boundedSource.getOutputCoder());
+ return new CheckpointCoder<>(boundedSource.getDefaultOutputCoder());
}
@VisibleForTesting
http://git-wip-us.apache.org/repos/asf/beam/blob/97b12d53/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Source.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Source.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Source.java
index 32a7270..872c135 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Source.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Source.java
@@ -64,11 +64,23 @@ public abstract class Source<T> implements Serializable, HasDisplayData {
/** @deprecated Override {@link #getOutputCoder()} instead. */
@Deprecated
public Coder<T> getDefaultOutputCoder() {
- throw new UnsupportedOperationException("Source needs to override getOutputCoder()");
+ // If the subclass doesn't override getDefaultOutputCoder(), hopefully it overrides the proper
+ // version - getOutputCoder(). Check that it does, before calling the method (if subclass
+ // doesn't override it, we'll call the default implementation and get infinite recursion).
+ try {
+ if (getClass().getMethod("getOutputCoder").getDeclaringClass().equals(Source.class)) {
+ throw new UnsupportedOperationException(
+ getClass() + " needs to override getOutputCoder().");
+ }
+ } catch (NoSuchMethodException e) {
+ throw new RuntimeException(e);
+ }
+ return getOutputCoder();
}
/** Returns the {@code Coder} to use for the data read from this source. */
public Coder<T> getOutputCoder() {
+ // Call the old method for compatibility.
return getDefaultOutputCoder();
}
[2/2] beam git commit: This closes #3693: [BEAM-2734] Unbreaks some Dataflow ValidatesRunner tests
Posted by jk...@apache.org.
This closes #3693: [BEAM-2734] Unbreaks some Dataflow ValidatesRunner tests
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/aadbe36f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/aadbe36f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/aadbe36f
Branch: refs/heads/master
Commit: aadbe36ff66b69e7eb55fb4977cb59b93b3269e2
Parents: df36bd9 97b12d5
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Aug 4 20:37:14 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Aug 4 20:37:14 2017 -0700
----------------------------------------------------------------------
.../construction/UnboundedReadFromBoundedSource.java | 11 ++++++++---
.../src/main/java/org/apache/beam/sdk/io/Source.java | 14 +++++++++++++-
2 files changed, 21 insertions(+), 4 deletions(-)
----------------------------------------------------------------------