You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/07/06 17:20:42 UTC

[36/50] [abbrv] incubator-beam git commit: Make WindowedValue like an interface, allow external implementations

Make WindowedValue like an interface, allow external implementations


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9f79e2bd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9f79e2bd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9f79e2bd

Branch: refs/heads/runners-spark2
Commit: 9f79e2bd0384f2894b0fc6e45c2ab44136541fa3
Parents: a313e33
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Jun 22 07:40:38 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Jul 6 10:18:52 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/util/WindowedValue.java | 181 ++++++++++++-------
 1 file changed, 112 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9f79e2bd/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
index d78da41..d21b6c8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
@@ -59,9 +59,6 @@ import java.util.Set;
  */
 public abstract class WindowedValue<T> {
 
-  protected final T value;
-  protected final PaneInfo pane;
-
   /**
    * Returns a {@code WindowedValue} with the given value, timestamp,
    * and windows.
@@ -146,11 +143,6 @@ public abstract class WindowedValue<T> {
     return new ValueInEmptyWindows<T>(value, pane);
   }
 
-  private WindowedValue(T value, PaneInfo pane) {
-    this.value = value;
-    this.pane = checkNotNull(pane);
-  }
-
   /**
    * Returns a new {@code WindowedValue} that is a copy of this one, but with a different value,
    * which may have a new type {@code NewT}.
@@ -160,9 +152,7 @@ public abstract class WindowedValue<T> {
   /**
    * Returns the value of this {@code WindowedValue}.
    */
-  public T getValue() {
-    return value;
-  }
+  public abstract T getValue();
 
   /**
    * Returns the timestamp of this {@code WindowedValue}.
@@ -175,6 +165,11 @@ public abstract class WindowedValue<T> {
   public abstract Collection<? extends BoundedWindow> getWindows();
 
   /**
+   * Returns the pane of this {@code WindowedValue} in its window.
+   */
+  public abstract PaneInfo getPane();
+
+  /**
    * Returns a collection of {@link WindowedValue WindowedValues} identical to this one, except each
    * is in exactly one of the windows that this {@link WindowedValue} is in.
    */
@@ -186,18 +181,28 @@ public abstract class WindowedValue<T> {
     return windowedValues.build();
   }
 
-  /**
-   * Returns the pane of this {@code WindowedValue} in its window.
-   */
-  public PaneInfo getPane() {
-    return pane;
-  }
-
   @Override
-  public abstract boolean equals(Object o);
+  public boolean equals(Object other) {
+    if (!(other instanceof WindowedValue)) {
+      return false;
+    } else {
+      WindowedValue<?> that = (WindowedValue<?>) other;
+
+      // Compare timestamps first as they are most likely to differ.
+      // Also compare timestamps according to millis-since-epoch because otherwise expensive
+      // comparisons are made on their Chronology objects.
+      return this.getTimestamp().isEqual(that.getTimestamp())
+          && Objects.equals(this.getValue(), that.getValue())
+          && Objects.equals(this.getWindows(), that.getWindows())
+          && Objects.equals(this.getPane(), that.getPane());
+    }
+  }
 
   @Override
-  public abstract int hashCode();
+  public int hashCode() {
+    // Hash only the millis of the timestamp to be consistent with equals
+    return Objects.hash(getValue(), getTimestamp().getMillis(), getWindows(), getPane());
+  }
 
   @Override
   public abstract String toString();
@@ -206,11 +211,34 @@ public abstract class WindowedValue<T> {
       Collections.singletonList(GlobalWindow.INSTANCE);
 
   /**
+   * An abstract superclass for implementations of {@link WindowedValue} that stores the value
+   * and pane info.
+   */
+  private abstract static class SimpleWindowedValue<T> extends WindowedValue<T> {
+    private final T value;
+    private final PaneInfo pane;
+
+    protected SimpleWindowedValue(T value, PaneInfo pane) {
+      this.value = value;
+      this.pane = checkNotNull(pane);
+    }
+
+    @Override
+    public PaneInfo getPane() {
+      return pane;
+    }
+    @Override
+    public T getValue() {
+      return value;
+    }
+  }
+
+  /**
    * The abstract superclass of WindowedValue representations where
    * timestamp == MIN.
    */
   private abstract static class MinTimestampWindowedValue<T>
-      extends WindowedValue<T> {
+      extends SimpleWindowedValue<T> {
     public MinTimestampWindowedValue(T value, PaneInfo pane) {
       super(value, pane);
     }
@@ -232,8 +260,8 @@ public abstract class WindowedValue<T> {
     }
 
     @Override
-    public <NewT> WindowedValue<NewT> withValue(NewT value) {
-      return new ValueInGlobalWindow<>(value, pane);
+    public <NewT> WindowedValue<NewT> withValue(NewT newValue) {
+      return new ValueInGlobalWindow<>(newValue, getPane());
     }
 
     @Override
@@ -245,23 +273,23 @@ public abstract class WindowedValue<T> {
     public boolean equals(Object o) {
       if (o instanceof ValueInGlobalWindow) {
         ValueInGlobalWindow<?> that = (ValueInGlobalWindow<?>) o;
-        return Objects.equals(that.pane, this.pane)
-            && Objects.equals(that.value, this.value);
+        return Objects.equals(that.getPane(), this.getPane())
+            && Objects.equals(that.getValue(), this.getValue());
       } else {
-        return false;
+        return super.equals(o);
       }
     }
 
     @Override
     public int hashCode() {
-      return Objects.hash(value, pane);
+      return Objects.hash(getValue(), getPane());
     }
 
     @Override
     public String toString() {
       return MoreObjects.toStringHelper(getClass())
-          .add("value", value)
-          .add("pane", pane)
+          .add("value", getValue())
+          .add("pane", getPane())
           .toString();
     }
   }
@@ -277,8 +305,8 @@ public abstract class WindowedValue<T> {
     }
 
     @Override
-    public <NewT> WindowedValue<NewT> withValue(NewT value) {
-      return new ValueInEmptyWindows<>(value, pane);
+    public <NewT> WindowedValue<NewT> withValue(NewT newValue) {
+      return new ValueInEmptyWindows<>(newValue, getPane());
     }
 
     @Override
@@ -290,23 +318,23 @@ public abstract class WindowedValue<T> {
     public boolean equals(Object o) {
       if (o instanceof ValueInEmptyWindows) {
         ValueInEmptyWindows<?> that = (ValueInEmptyWindows<?>) o;
-        return Objects.equals(that.pane, this.pane)
-            && Objects.equals(that.value, this.value);
+        return Objects.equals(that.getPane(), this.getPane())
+            && Objects.equals(that.getValue(), this.getValue());
       } else {
-        return false;
+        return super.equals(o);
       }
     }
 
     @Override
     public int hashCode() {
-      return Objects.hash(value, pane);
+      return Objects.hash(getValue(), getPane());
     }
 
     @Override
     public String toString() {
       return MoreObjects.toStringHelper(getClass())
-          .add("value", value)
-          .add("pane", pane)
+          .add("value", getValue())
+          .add("pane", getPane())
           .toString();
     }
   }
@@ -316,8 +344,8 @@ public abstract class WindowedValue<T> {
    * timestamp is arbitrary.
    */
   private abstract static class TimestampedWindowedValue<T>
-      extends WindowedValue<T> {
-    protected final Instant timestamp;
+      extends SimpleWindowedValue<T> {
+    private final Instant timestamp;
 
     public TimestampedWindowedValue(T value,
                                     Instant timestamp,
@@ -345,8 +373,8 @@ public abstract class WindowedValue<T> {
     }
 
     @Override
-    public <NewT> WindowedValue<NewT> withValue(NewT value) {
-      return new TimestampedValueInGlobalWindow<>(value, timestamp, pane);
+    public <NewT> WindowedValue<NewT> withValue(NewT newValue) {
+      return new TimestampedValueInGlobalWindow<>(newValue, getTimestamp(), getPane());
     }
 
     @Override
@@ -359,25 +387,29 @@ public abstract class WindowedValue<T> {
       if (o instanceof TimestampedValueInGlobalWindow) {
         TimestampedValueInGlobalWindow<?> that =
             (TimestampedValueInGlobalWindow<?>) o;
-        return this.timestamp.isEqual(that.timestamp) // don't compare chronology objects
-            && Objects.equals(that.pane, this.pane)
-            && Objects.equals(that.value, this.value);
+        // Compare timestamps first as they are most likely to differ.
+        // Also compare timestamps according to millis-since-epoch because otherwise expensive
+        // comparisons are made on their Chronology objects.
+        return this.getTimestamp().isEqual(that.getTimestamp())
+            && Objects.equals(that.getPane(), this.getPane())
+            && Objects.equals(that.getValue(), this.getValue());
       } else {
-        return false;
+        return super.equals(o);
       }
     }
 
     @Override
     public int hashCode() {
-      return Objects.hash(value, pane, timestamp.getMillis());
+      // Hash only the millis of the timestamp to be consistent with equals
+      return Objects.hash(getValue(), getPane(), getTimestamp().getMillis());
     }
 
     @Override
     public String toString() {
       return MoreObjects.toStringHelper(getClass())
-          .add("value", value)
-          .add("timestamp", timestamp)
-          .add("pane", pane)
+          .add("value", getValue())
+          .add("timestamp", getTimestamp())
+          .add("pane", getPane())
           .toString();
     }
   }
@@ -399,8 +431,8 @@ public abstract class WindowedValue<T> {
     }
 
     @Override
-    public <NewT> WindowedValue<NewT> withValue(NewT value) {
-      return new TimestampedValueInSingleWindow<>(value, timestamp, window, pane);
+    public <NewT> WindowedValue<NewT> withValue(NewT newValue) {
+      return new TimestampedValueInSingleWindow<>(newValue, getTimestamp(), window, getPane());
     }
 
     @Override
@@ -413,27 +445,31 @@ public abstract class WindowedValue<T> {
       if (o instanceof TimestampedValueInSingleWindow) {
         TimestampedValueInSingleWindow<?> that =
             (TimestampedValueInSingleWindow<?>) o;
-        return Objects.equals(that.value, this.value)
-            && this.timestamp.isEqual(that.timestamp) // don't compare chronology objects
-            && Objects.equals(that.pane, this.pane)
+        // Compare timestamps first as they are most likely to differ.
+        // Also compare timestamps according to millis-since-epoch because otherwise expensive
+        // comparisons are made on their Chronology objects.
+        return this.getTimestamp().isEqual(that.getTimestamp())
+            && Objects.equals(that.getValue(), this.getValue())
+            && Objects.equals(that.getPane(), this.getPane())
             && Objects.equals(that.window, this.window);
       } else {
-        return false;
+        return super.equals(o);
       }
     }
 
     @Override
     public int hashCode() {
-      return Objects.hash(value, timestamp.getMillis(), pane, window);
+      // Hash only the millis of the timestamp to be consistent with equals
+      return Objects.hash(getValue(), getTimestamp().getMillis(), getPane(), window);
     }
 
     @Override
     public String toString() {
       return MoreObjects.toStringHelper(getClass())
-          .add("value", value)
-          .add("timestamp", timestamp)
+          .add("value", getValue())
+          .add("timestamp", getTimestamp())
           .add("window", window)
-          .add("pane", pane)
+          .add("pane", getPane())
           .toString();
     }
   }
@@ -456,8 +492,8 @@ public abstract class WindowedValue<T> {
     }
 
     @Override
-    public <NewT> WindowedValue<NewT> withValue(NewT value) {
-      return new TimestampedValueInMultipleWindows<>(value, timestamp, windows, pane);
+    public <NewT> WindowedValue<NewT> withValue(NewT newValue) {
+      return new TimestampedValueInMultipleWindows<>(newValue, getTimestamp(), windows, getPane());
     }
 
     @Override
@@ -470,30 +506,37 @@ public abstract class WindowedValue<T> {
       if (o instanceof TimestampedValueInMultipleWindows) {
         TimestampedValueInMultipleWindows<?> that =
             (TimestampedValueInMultipleWindows<?>) o;
-        if (this.timestamp.isEqual(that.timestamp) // don't compare chronology objects
-            && Objects.equals(that.value, this.value)
-            && Objects.equals(that.pane, this.pane)) {
+        // Compare timestamps first as they are most likely to differ.
+        // Also compare timestamps according to millis-since-epoch because otherwise expensive
+        // comparisons are made on their Chronology objects.
+        if (this.getTimestamp().isEqual(that.getTimestamp())
+            && Objects.equals(that.getValue(), this.getValue())
+            && Objects.equals(that.getPane(), this.getPane())) {
           ensureWindowsAreASet();
           that.ensureWindowsAreASet();
           return that.windows.equals(this.windows);
+        } else {
+          return false;
         }
+      } else {
+        return super.equals(o);
       }
-      return false;
     }
 
     @Override
     public int hashCode() {
+      // Hash only the millis of the timestamp to be consistent with equals
       ensureWindowsAreASet();
-      return Objects.hash(value, timestamp.getMillis(), pane, windows);
+      return Objects.hash(getValue(), getTimestamp().getMillis(), getPane(), windows);
     }
 
     @Override
     public String toString() {
       return MoreObjects.toStringHelper(getClass())
-          .add("value", value)
-          .add("timestamp", timestamp)
+          .add("value", getValue())
+          .add("timestamp", getTimestamp())
           .add("windows", windows)
-          .add("pane", pane)
+          .add("pane", getPane())
           .toString();
     }