You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/04/27 03:32:01 UTC
[1/2] beam git commit: Revert "Make WindowedValueCoder an Interface"
Repository: beam
Updated Branches:
refs/heads/master bb12a56e6 -> 3bcbba121
Revert "Make WindowedValueCoder an Interface"
This reverts commit 691269565b537370c133598c2c32bd3f87eb8c29.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d8b61969
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d8b61969
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d8b61969
Branch: refs/heads/master
Commit: d8b61969f6adc276741db95567af0cb98f2ff6f9
Parents: bb12a56
Author: Thomas Groh <tg...@google.com>
Authored: Wed Apr 26 20:12:04 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Apr 26 20:12:04 2017 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/util/WindowedValue.java | 74 +++++++++++---------
1 file changed, 42 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/d8b61969/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
index 5d692f2..6b75951 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
@@ -38,7 +38,6 @@ import java.util.Set;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CollectionCoder;
-import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.StandardCoder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -573,24 +572,35 @@ public abstract class WindowedValue<T> {
return ValueOnlyWindowedValueCoder.of(valueCoder);
}
- /** Abstract class for {@code WindowedValue} coder. */
- public interface WindowedValueCoder<T> extends Coder<WindowedValue<T>> {
+ /**
+ * Abstract class for {@code WindowedValue} coder.
+ */
+ public abstract static class WindowedValueCoder<T>
+ extends StandardCoder<WindowedValue<T>> {
+ final Coder<T> valueCoder;
+
+ WindowedValueCoder(Coder<T> valueCoder) {
+ this.valueCoder = checkNotNull(valueCoder);
+ }
+
/**
- * Returns the coder used to encode the values within this {@link WindowedValueCoder}.
+ * Returns the value coder.
*/
- Coder<T> getValueCoder();
+ public Coder<T> getValueCoder() {
+ return valueCoder;
+ }
/**
- * Returns a new {@code WindowedValueCoder} that is a copy of this one, but with a different
- * value coder.
+ * Returns a new {@code WindowedValueCoder} that is a copy of this one,
+ * but with a different value coder.
*/
- <NewT> WindowedValueCoder<NewT> withValueCoder(Coder<NewT> valueCoder);
+ public abstract <NewT> WindowedValueCoder<NewT> withValueCoder(Coder<NewT> valueCoder);
}
- /** Coder for {@code WindowedValue}. */
- public static class FullWindowedValueCoder<T> extends StandardCoder<WindowedValue<T>>
- implements WindowedValueCoder<T> {
- private final Coder<T> valueCoder;
+ /**
+ * Coder for {@code WindowedValue}.
+ */
+ public static class FullWindowedValueCoder<T> extends WindowedValueCoder<T> {
private final Coder<? extends BoundedWindow> windowCoder;
// Precompute and cache the coder for a list of windows.
private final Coder<Collection<? extends BoundedWindow>> windowsCoder;
@@ -614,7 +624,7 @@ public abstract class WindowedValue<T> {
FullWindowedValueCoder(Coder<T> valueCoder,
Coder<? extends BoundedWindow> windowCoder) {
- this.valueCoder = checkNotNull(valueCoder);
+ super(valueCoder);
this.windowCoder = checkNotNull(windowCoder);
// It's not possible to statically type-check correct use of the
// windowCoder (we have to ensure externally that we only get
@@ -635,14 +645,6 @@ public abstract class WindowedValue<T> {
return windowsCoder;
}
- /**
- * Returns the value coder.
- */
- @Override
- public Coder<T> getValueCoder() {
- return valueCoder;
- }
-
@Override
public <NewT> WindowedValueCoder<NewT> withValueCoder(Coder<NewT> valueCoder) {
return new FullWindowedValueCoder<>(valueCoder, windowCoder);
@@ -715,23 +717,24 @@ public abstract class WindowedValue<T> {
* Coder for {@code WindowedValue}.
*
* <p>A {@code ValueOnlyWindowedValueCoder} only encodes and decodes the value. It drops
- * timestamps and windows when encoding, and uses a default timestamp and window when decoding.
+ * timestamp and windows for encoding, and uses defaults timestamp, and windows for decoding.
*/
- public static class ValueOnlyWindowedValueCoder<T> extends CustomCoder<WindowedValue<T>>
- implements WindowedValueCoder<T> {
- public static <T> ValueOnlyWindowedValueCoder<T> of(Coder<T> valueCoder) {
+ public static class ValueOnlyWindowedValueCoder<T> extends WindowedValueCoder<T> {
+ public static <T> ValueOnlyWindowedValueCoder<T> of(
+ Coder<T> valueCoder) {
return new ValueOnlyWindowedValueCoder<>(valueCoder);
}
- private final Coder<T> valueCoder;
-
- ValueOnlyWindowedValueCoder(Coder<T> valueCoder) {
- this.valueCoder = valueCoder;
+ @JsonCreator
+ public static ValueOnlyWindowedValueCoder<?> of(
+ @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
+ List<Coder<?>> components) {
+ checkArgument(components.size() == 1, "Expecting 1 component, got " + components.size());
+ return of(components.get(0));
}
- @Override
- public Coder<T> getValueCoder() {
- return valueCoder;
+ ValueOnlyWindowedValueCoder(Coder<T> valueCoder) {
+ super(valueCoder);
}
@Override
@@ -767,6 +770,13 @@ public abstract class WindowedValue<T> {
}
@Override
+ public CloudObject initializeCloudObject() {
+ CloudObject result = CloudObject.forClass(getClass());
+ addBoolean(result, PropertyNames.IS_WRAPPER, true);
+ return result;
+ }
+
+ @Override
public List<? extends Coder<?>> getCoderArguments() {
return Arrays.<Coder<?>>asList(valueCoder);
}
[2/2] beam git commit: This closes #2726
Posted by tg...@apache.org.
This closes #2726
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3bcbba12
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3bcbba12
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3bcbba12
Branch: refs/heads/master
Commit: 3bcbba121c896556c90fc3cff860e97e70313511
Parents: bb12a56 d8b6196
Author: Thomas Groh <tg...@google.com>
Authored: Wed Apr 26 20:25:20 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Apr 26 20:25:20 2017 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/util/WindowedValue.java | 74 +++++++++++---------
1 file changed, 42 insertions(+), 32 deletions(-)
----------------------------------------------------------------------