You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/05/24 20:43:13 UTC
[1/3] incubator-beam git commit: Remove inheritance from
Create.TimestampedValues
Repository: incubator-beam
Updated Branches:
refs/heads/master 65db44ce6 -> 5535fc3fd
Remove inheritance from Create.TimestampedValues
Previously, Create.TimestampedValues extends Create.Values. This
actually resulted in confusing behavior in one runner because
Create.Values was overridden using `instanceof` checks, which
accidentally pulled in Create.TimestampedValues.
Now Create.TimeStampedValues is a simple composite transform.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f0e12587
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f0e12587
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f0e12587
Branch: refs/heads/master
Commit: f0e125871d9ca6da9c8597a2216c3b44b9e85345
Parents: dc98211
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu May 19 20:28:36 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu May 19 20:28:36 2016 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/transforms/Create.java | 144 +++++++++++--------
1 file changed, 81 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f0e12587/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
index 89e9985..0752113 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
@@ -263,53 +263,9 @@ public class Create<T> {
public Coder<T> getDefaultOutputCoder(PInput input) throws CannotProvideCoderException {
if (coder.isPresent()) {
return coder.get();
+ } else {
+ return getDefaultCreateCoder(input.getPipeline().getCoderRegistry(), elems);
}
- // First try to deduce a coder using the types of the elements.
- Class<?> elementClazz = Void.class;
- for (T elem : elems) {
- if (elem == null) {
- continue;
- }
- Class<?> clazz = elem.getClass();
- if (elementClazz.equals(Void.class)) {
- elementClazz = clazz;
- } else if (!elementClazz.equals(clazz)) {
- // Elements are not the same type, require a user-specified coder.
- throw new CannotProvideCoderException(
- "Cannot provide coder for Create: The elements are not all of the same class.");
- }
- }
-
- if (elementClazz.getTypeParameters().length == 0) {
- try {
- @SuppressWarnings("unchecked") // elementClazz is a wildcard type
- Coder<T> coder = (Coder<T>) input.getPipeline().getCoderRegistry()
- .getDefaultCoder(TypeDescriptor.of(elementClazz));
- return coder;
- } catch (CannotProvideCoderException exc) {
- // let the next stage try
- }
- }
-
- // If that fails, try to deduce a coder using the elements themselves
- Optional<Coder<T>> coder = Optional.absent();
- for (T elem : elems) {
- Coder<T> c = input.getPipeline().getCoderRegistry().getDefaultCoder(elem);
- if (!coder.isPresent()) {
- coder = Optional.of(c);
- } else if (!Objects.equals(c, coder.get())) {
- throw new CannotProvideCoderException(
- "Cannot provide coder for elements of " + Create.class.getSimpleName() + ":"
- + " For their common class, no coder could be provided."
- + " Based on their values, they do not all default to the same Coder.");
- }
- }
-
- if (!coder.isPresent()) {
- throw new CannotProvideCoderException("Unable to infer a coder. Please register "
- + "a coder for ");
- }
- return coder.get();
}
/////////////////////////////////////////////////////////////////////////////
@@ -468,7 +424,7 @@ public class Create<T> {
* A {@code PTransform} that creates a {@code PCollection} whose elements have
* associated timestamps.
*/
- public static class TimestampedValues<T> extends Values<T> {
+ public static class TimestampedValues<T> extends PTransform<PInput, PCollection<T>>{
/**
* Returns a {@link Create.TimestampedValues} PTransform like this one that uses the given
* {@code Coder<T>} to decode each of the objects into a
@@ -482,17 +438,30 @@ public class Create<T> {
* <p>Note that for {@link Create.TimestampedValues with no elements}, the {@link VoidCoder}
* is used.
*/
- @Override
public TimestampedValues<T> withCoder(Coder<T> coder) {
- return new TimestampedValues<>(elems, Optional.<Coder<T>>of(coder));
+ return new TimestampedValues<>(timestampedElements, Optional.<Coder<T>>of(coder));
}
@Override
public PCollection<T> apply(PInput input) {
try {
- Coder<T> coder = getDefaultOutputCoder(input);
+ Iterable<T> rawElements =
+ Iterables.transform(
+ timestampedElements,
+ new Function<TimestampedValue<T>, T>() {
+ @Override
+ public T apply(TimestampedValue<T> input) {
+ return input.getValue();
+ }
+ });
+ Coder<T> coder;
+ if (elementCoder.isPresent()) {
+ coder = elementCoder.get();
+ } else {
+ coder = getDefaultCreateCoder(input.getPipeline().getCoderRegistry(), rawElements);
+ }
PCollection<TimestampedValue<T>> intermediate = Pipeline.applyTransform(input,
- Create.of(elems).withCoder(TimestampedValueCoder.of(coder)));
+ Create.of(timestampedElements).withCoder(TimestampedValueCoder.of(coder)));
PCollection<T> output = intermediate.apply(ParDo.of(new ConvertTimestamps<T>()));
output.setCoder(coder);
@@ -506,18 +475,14 @@ public class Create<T> {
/////////////////////////////////////////////////////////////////////////////
/** The timestamped elements of the resulting PCollection. */
- private final transient Iterable<TimestampedValue<T>> elems;
-
- private TimestampedValues(Iterable<TimestampedValue<T>> elems,
- Optional<Coder<T>> coder) {
- super(
- Iterables.transform(elems, new Function<TimestampedValue<T>, T>() {
- @Override
- public T apply(TimestampedValue<T> input) {
- return input.getValue();
- }
- }), coder);
- this.elems = elems;
+ private final transient Iterable<TimestampedValue<T>> timestampedElements;
+
+ private final transient Optional<Coder<T>> elementCoder;
+
+ private TimestampedValues(
+ Iterable<TimestampedValue<T>> timestampedElements, Optional<Coder<T>> elementCoder) {
+ this.timestampedElements = timestampedElements;
+ this.elementCoder = elementCoder;
}
private static class ConvertTimestamps<T> extends DoFn<TimestampedValue<T>, T> {
@@ -527,4 +492,57 @@ public class Create<T> {
}
}
}
+
+ private static <T> Coder<T> getDefaultCreateCoder(CoderRegistry registry, Iterable<T> elems)
+ throws CannotProvideCoderException {
+ // First try to deduce a coder using the types of the elements.
+ Class<?> elementClazz = Void.class;
+ for (T elem : elems) {
+ if (elem == null) {
+ continue;
+ }
+ Class<?> clazz = elem.getClass();
+ if (elementClazz.equals(Void.class)) {
+ elementClazz = clazz;
+ } else if (!elementClazz.equals(clazz)) {
+ // Elements are not the same type, require a user-specified coder.
+ throw new CannotProvideCoderException(
+ String.format(
+ "Cannot provide coder for %s: The elements are not all of the same class.",
+ Create.class.getSimpleName()));
+ }
+ }
+
+ if (elementClazz.getTypeParameters().length == 0) {
+ try {
+ @SuppressWarnings("unchecked") // elementClazz is a wildcard type
+ Coder<T> coder = (Coder<T>) registry.getDefaultCoder(TypeDescriptor.of(elementClazz));
+ return coder;
+ } catch (CannotProvideCoderException exc) {
+ // let the next stage try
+ }
+ }
+
+ // If that fails, try to deduce a coder using the elements themselves
+ Optional<Coder<T>> coder = Optional.absent();
+ for (T elem : elems) {
+ Coder<T> c = registry.getDefaultCoder(elem);
+ if (!coder.isPresent()) {
+ coder = Optional.of(c);
+ } else if (!Objects.equals(c, coder.get())) {
+ throw new CannotProvideCoderException(
+ "Cannot provide coder for elements of "
+ + Create.class.getSimpleName()
+ + ":"
+ + " For their common class, no coder could be provided."
+ + " Based on their values, they do not all default to the same Coder.");
+ }
+ }
+
+ if (!coder.isPresent()) {
+ throw new CannotProvideCoderException(
+ "Unable to infer a coder. Please register " + "a coder for ");
+ }
+ return coder.get();
+ }
}
[2/3] incubator-beam git commit: Clarify comment in Create coder
inference
Posted by ke...@apache.org.
Clarify comment in Create coder inference
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ae80b5da
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ae80b5da
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ae80b5da
Branch: refs/heads/master
Commit: ae80b5da3b4ea15ceadf38ecc30789b5d3b332e0
Parents: f0e1258
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue May 24 13:41:52 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue May 24 13:41:52 2016 -0700
----------------------------------------------------------------------
.../core/src/main/java/org/apache/beam/sdk/transforms/Create.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ae80b5da/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
index 0752113..538966d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
@@ -519,7 +519,7 @@ public class Create<T> {
Coder<T> coder = (Coder<T>) registry.getDefaultCoder(TypeDescriptor.of(elementClazz));
return coder;
} catch (CannotProvideCoderException exc) {
- // let the next stage try
+ // Can't get a coder from the class of the elements, try with the elements next
}
}
[3/3] incubator-beam git commit: This closes #362
Posted by ke...@apache.org.
This closes #362
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5535fc3f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5535fc3f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5535fc3f
Branch: refs/heads/master
Commit: 5535fc3fdfa9e11fab2fedd09402fdeecad05496
Parents: 65db44c ae80b5d
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue May 24 13:42:50 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue May 24 13:42:50 2016 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/transforms/Create.java | 144 +++++++++++--------
1 file changed, 81 insertions(+), 63 deletions(-)
----------------------------------------------------------------------