You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/04/27 03:32:01 UTC

[1/2] beam git commit: Revert "Make WindowedValueCoder an Interface"

Repository: beam
Updated Branches:
  refs/heads/master bb12a56e6 -> 3bcbba121


Revert "Make WindowedValueCoder an Interface"

This reverts commit 691269565b537370c133598c2c32bd3f87eb8c29.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d8b61969
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d8b61969
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d8b61969

Branch: refs/heads/master
Commit: d8b61969f6adc276741db95567af0cb98f2ff6f9
Parents: bb12a56
Author: Thomas Groh <tg...@google.com>
Authored: Wed Apr 26 20:12:04 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Apr 26 20:12:04 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/util/WindowedValue.java | 74 +++++++++++---------
 1 file changed, 42 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d8b61969/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
index 5d692f2..6b75951 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
@@ -38,7 +38,6 @@ import java.util.Set;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CollectionCoder;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.InstantCoder;
 import org.apache.beam.sdk.coders.StandardCoder;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -573,24 +572,35 @@ public abstract class WindowedValue<T> {
     return ValueOnlyWindowedValueCoder.of(valueCoder);
   }
 
-  /** Abstract class for {@code WindowedValue} coder. */
-  public interface WindowedValueCoder<T> extends Coder<WindowedValue<T>> {
+  /**
+   * Abstract class for {@code WindowedValue} coder.
+   */
+  public abstract static class WindowedValueCoder<T>
+      extends StandardCoder<WindowedValue<T>> {
+    final Coder<T> valueCoder;
+
+    WindowedValueCoder(Coder<T> valueCoder) {
+      this.valueCoder = checkNotNull(valueCoder);
+    }
+
     /**
-     * Returns the coder used to encode the values within this {@link WindowedValueCoder}.
+     * Returns the value coder.
      */
-    Coder<T> getValueCoder();
+    public Coder<T> getValueCoder() {
+      return valueCoder;
+    }
 
     /**
-     * Returns a new {@code WindowedValueCoder} that is a copy of this one, but with a different
-     * value coder.
+     * Returns a new {@code WindowedValueCoder} that is a copy of this one,
+     * but with a different value coder.
      */
-    <NewT> WindowedValueCoder<NewT> withValueCoder(Coder<NewT> valueCoder);
+    public abstract <NewT> WindowedValueCoder<NewT> withValueCoder(Coder<NewT> valueCoder);
   }
 
-  /** Coder for {@code WindowedValue}. */
-  public static class FullWindowedValueCoder<T> extends StandardCoder<WindowedValue<T>>
-      implements WindowedValueCoder<T> {
-    private final Coder<T> valueCoder;
+  /**
+   * Coder for {@code WindowedValue}.
+   */
+  public static class FullWindowedValueCoder<T> extends WindowedValueCoder<T> {
     private final Coder<? extends BoundedWindow> windowCoder;
     // Precompute and cache the coder for a list of windows.
     private final Coder<Collection<? extends BoundedWindow>> windowsCoder;
@@ -614,7 +624,7 @@ public abstract class WindowedValue<T> {
 
     FullWindowedValueCoder(Coder<T> valueCoder,
                            Coder<? extends BoundedWindow> windowCoder) {
-      this.valueCoder = checkNotNull(valueCoder);
+      super(valueCoder);
       this.windowCoder = checkNotNull(windowCoder);
       // It's not possible to statically type-check correct use of the
       // windowCoder (we have to ensure externally that we only get
@@ -635,14 +645,6 @@ public abstract class WindowedValue<T> {
       return windowsCoder;
     }
 
-    /**
-     * Returns the value coder.
-     */
-    @Override
-    public Coder<T> getValueCoder() {
-      return valueCoder;
-    }
-
     @Override
     public <NewT> WindowedValueCoder<NewT> withValueCoder(Coder<NewT> valueCoder) {
       return new FullWindowedValueCoder<>(valueCoder, windowCoder);
@@ -715,23 +717,24 @@ public abstract class WindowedValue<T> {
    * Coder for {@code WindowedValue}.
    *
    * <p>A {@code ValueOnlyWindowedValueCoder} only encodes and decodes the value. It drops
-   * timestamps and windows when encoding, and uses a default timestamp and window when decoding.
+   * timestamp and windows for encoding, and uses defaults timestamp, and windows for decoding.
    */
-  public static class ValueOnlyWindowedValueCoder<T> extends CustomCoder<WindowedValue<T>>
-      implements WindowedValueCoder<T> {
-    public static <T> ValueOnlyWindowedValueCoder<T> of(Coder<T> valueCoder) {
+  public static class ValueOnlyWindowedValueCoder<T> extends WindowedValueCoder<T> {
+    public static <T> ValueOnlyWindowedValueCoder<T> of(
+        Coder<T> valueCoder) {
       return new ValueOnlyWindowedValueCoder<>(valueCoder);
     }
 
-    private final Coder<T> valueCoder;
-
-    ValueOnlyWindowedValueCoder(Coder<T> valueCoder) {
-      this.valueCoder = valueCoder;
+    @JsonCreator
+    public static ValueOnlyWindowedValueCoder<?> of(
+        @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
+        List<Coder<?>> components) {
+      checkArgument(components.size() == 1, "Expecting 1 component, got " + components.size());
+      return of(components.get(0));
     }
 
-    @Override
-    public Coder<T> getValueCoder() {
-      return valueCoder;
+    ValueOnlyWindowedValueCoder(Coder<T> valueCoder) {
+      super(valueCoder);
     }
 
     @Override
@@ -767,6 +770,13 @@ public abstract class WindowedValue<T> {
     }
 
     @Override
+    public CloudObject initializeCloudObject() {
+      CloudObject result = CloudObject.forClass(getClass());
+      addBoolean(result, PropertyNames.IS_WRAPPER, true);
+      return result;
+    }
+
+    @Override
     public List<? extends Coder<?>> getCoderArguments() {
       return Arrays.<Coder<?>>asList(valueCoder);
     }


[2/2] beam git commit: This closes #2726

Posted by tg...@apache.org.
This closes #2726


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3bcbba12
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3bcbba12
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3bcbba12

Branch: refs/heads/master
Commit: 3bcbba121c896556c90fc3cff860e97e70313511
Parents: bb12a56 d8b6196
Author: Thomas Groh <tg...@google.com>
Authored: Wed Apr 26 20:25:20 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Apr 26 20:25:20 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/util/WindowedValue.java | 74 +++++++++++---------
 1 file changed, 42 insertions(+), 32 deletions(-)
----------------------------------------------------------------------