You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/07/06 17:20:42 UTC
[36/50] [abbrv] incubator-beam git commit: Make WindowedValue like an interface, allow external implementations
Make WindowedValue like an interface, allow external implementations
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9f79e2bd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9f79e2bd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9f79e2bd
Branch: refs/heads/runners-spark2
Commit: 9f79e2bd0384f2894b0fc6e45c2ab44136541fa3
Parents: a313e33
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Jun 22 07:40:38 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Jul 6 10:18:52 2016 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/util/WindowedValue.java | 181 ++++++++++++-------
1 file changed, 112 insertions(+), 69 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9f79e2bd/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
index d78da41..d21b6c8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
@@ -59,9 +59,6 @@ import java.util.Set;
*/
public abstract class WindowedValue<T> {
- protected final T value;
- protected final PaneInfo pane;
-
/**
* Returns a {@code WindowedValue} with the given value, timestamp,
* and windows.
@@ -146,11 +143,6 @@ public abstract class WindowedValue<T> {
return new ValueInEmptyWindows<T>(value, pane);
}
- private WindowedValue(T value, PaneInfo pane) {
- this.value = value;
- this.pane = checkNotNull(pane);
- }
-
/**
* Returns a new {@code WindowedValue} that is a copy of this one, but with a different value,
* which may have a new type {@code NewT}.
@@ -160,9 +152,7 @@ public abstract class WindowedValue<T> {
/**
* Returns the value of this {@code WindowedValue}.
*/
- public T getValue() {
- return value;
- }
+ public abstract T getValue();
/**
* Returns the timestamp of this {@code WindowedValue}.
@@ -175,6 +165,11 @@ public abstract class WindowedValue<T> {
public abstract Collection<? extends BoundedWindow> getWindows();
/**
+ * Returns the pane of this {@code WindowedValue} in its window.
+ */
+ public abstract PaneInfo getPane();
+
+ /**
* Returns a collection of {@link WindowedValue WindowedValues} identical to this one, except each
* is in exactly one of the windows that this {@link WindowedValue} is in.
*/
@@ -186,18 +181,28 @@ public abstract class WindowedValue<T> {
return windowedValues.build();
}
- /**
- * Returns the pane of this {@code WindowedValue} in its window.
- */
- public PaneInfo getPane() {
- return pane;
- }
-
@Override
- public abstract boolean equals(Object o);
+ public boolean equals(Object other) {
+ if (!(other instanceof WindowedValue)) {
+ return false;
+ } else {
+ WindowedValue<?> that = (WindowedValue<?>) other;
+
+ // Compare timestamps first as they are most likely to differ.
+ // Also compare timestamps according to millis-since-epoch because otherwise expensive
+ // comparisons are made on their Chronology objects.
+ return this.getTimestamp().isEqual(that.getTimestamp())
+ && Objects.equals(this.getValue(), that.getValue())
+ && Objects.equals(this.getWindows(), that.getWindows())
+ && Objects.equals(this.getPane(), that.getPane());
+ }
+ }
@Override
- public abstract int hashCode();
+ public int hashCode() {
+ // Hash only the millis of the timestamp to be consistent with equals
+ return Objects.hash(getValue(), getTimestamp().getMillis(), getWindows(), getPane());
+ }
@Override
public abstract String toString();
@@ -206,11 +211,34 @@ public abstract class WindowedValue<T> {
Collections.singletonList(GlobalWindow.INSTANCE);
/**
+ * An abstract superclass for implementations of {@link WindowedValue} that stores the value
+ * and pane info.
+ */
+ private abstract static class SimpleWindowedValue<T> extends WindowedValue<T> {
+ private final T value;
+ private final PaneInfo pane;
+
+ protected SimpleWindowedValue(T value, PaneInfo pane) {
+ this.value = value;
+ this.pane = checkNotNull(pane);
+ }
+
+ @Override
+ public PaneInfo getPane() {
+ return pane;
+ }
+ @Override
+ public T getValue() {
+ return value;
+ }
+ }
+
+ /**
* The abstract superclass of WindowedValue representations where
* timestamp == MIN.
*/
private abstract static class MinTimestampWindowedValue<T>
- extends WindowedValue<T> {
+ extends SimpleWindowedValue<T> {
public MinTimestampWindowedValue(T value, PaneInfo pane) {
super(value, pane);
}
@@ -232,8 +260,8 @@ public abstract class WindowedValue<T> {
}
@Override
- public <NewT> WindowedValue<NewT> withValue(NewT value) {
- return new ValueInGlobalWindow<>(value, pane);
+ public <NewT> WindowedValue<NewT> withValue(NewT newValue) {
+ return new ValueInGlobalWindow<>(newValue, getPane());
}
@Override
@@ -245,23 +273,23 @@ public abstract class WindowedValue<T> {
public boolean equals(Object o) {
if (o instanceof ValueInGlobalWindow) {
ValueInGlobalWindow<?> that = (ValueInGlobalWindow<?>) o;
- return Objects.equals(that.pane, this.pane)
- && Objects.equals(that.value, this.value);
+ return Objects.equals(that.getPane(), this.getPane())
+ && Objects.equals(that.getValue(), this.getValue());
} else {
- return false;
+ return super.equals(o);
}
}
@Override
public int hashCode() {
- return Objects.hash(value, pane);
+ return Objects.hash(getValue(), getPane());
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
- .add("value", value)
- .add("pane", pane)
+ .add("value", getValue())
+ .add("pane", getPane())
.toString();
}
}
@@ -277,8 +305,8 @@ public abstract class WindowedValue<T> {
}
@Override
- public <NewT> WindowedValue<NewT> withValue(NewT value) {
- return new ValueInEmptyWindows<>(value, pane);
+ public <NewT> WindowedValue<NewT> withValue(NewT newValue) {
+ return new ValueInEmptyWindows<>(newValue, getPane());
}
@Override
@@ -290,23 +318,23 @@ public abstract class WindowedValue<T> {
public boolean equals(Object o) {
if (o instanceof ValueInEmptyWindows) {
ValueInEmptyWindows<?> that = (ValueInEmptyWindows<?>) o;
- return Objects.equals(that.pane, this.pane)
- && Objects.equals(that.value, this.value);
+ return Objects.equals(that.getPane(), this.getPane())
+ && Objects.equals(that.getValue(), this.getValue());
} else {
- return false;
+ return super.equals(o);
}
}
@Override
public int hashCode() {
- return Objects.hash(value, pane);
+ return Objects.hash(getValue(), getPane());
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
- .add("value", value)
- .add("pane", pane)
+ .add("value", getValue())
+ .add("pane", getPane())
.toString();
}
}
@@ -316,8 +344,8 @@ public abstract class WindowedValue<T> {
* timestamp is arbitrary.
*/
private abstract static class TimestampedWindowedValue<T>
- extends WindowedValue<T> {
- protected final Instant timestamp;
+ extends SimpleWindowedValue<T> {
+ private final Instant timestamp;
public TimestampedWindowedValue(T value,
Instant timestamp,
@@ -345,8 +373,8 @@ public abstract class WindowedValue<T> {
}
@Override
- public <NewT> WindowedValue<NewT> withValue(NewT value) {
- return new TimestampedValueInGlobalWindow<>(value, timestamp, pane);
+ public <NewT> WindowedValue<NewT> withValue(NewT newValue) {
+ return new TimestampedValueInGlobalWindow<>(newValue, getTimestamp(), getPane());
}
@Override
@@ -359,25 +387,29 @@ public abstract class WindowedValue<T> {
if (o instanceof TimestampedValueInGlobalWindow) {
TimestampedValueInGlobalWindow<?> that =
(TimestampedValueInGlobalWindow<?>) o;
- return this.timestamp.isEqual(that.timestamp) // don't compare chronology objects
- && Objects.equals(that.pane, this.pane)
- && Objects.equals(that.value, this.value);
+ // Compare timestamps first as they are most likely to differ.
+ // Also compare timestamps according to millis-since-epoch because otherwise expensive
+ // comparisons are made on their Chronology objects.
+ return this.getTimestamp().isEqual(that.getTimestamp())
+ && Objects.equals(that.getPane(), this.getPane())
+ && Objects.equals(that.getValue(), this.getValue());
} else {
- return false;
+ return super.equals(o);
}
}
@Override
public int hashCode() {
- return Objects.hash(value, pane, timestamp.getMillis());
+ // Hash only the millis of the timestamp to be consistent with equals
+ return Objects.hash(getValue(), getPane(), getTimestamp().getMillis());
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
- .add("value", value)
- .add("timestamp", timestamp)
- .add("pane", pane)
+ .add("value", getValue())
+ .add("timestamp", getTimestamp())
+ .add("pane", getPane())
.toString();
}
}
@@ -399,8 +431,8 @@ public abstract class WindowedValue<T> {
}
@Override
- public <NewT> WindowedValue<NewT> withValue(NewT value) {
- return new TimestampedValueInSingleWindow<>(value, timestamp, window, pane);
+ public <NewT> WindowedValue<NewT> withValue(NewT newValue) {
+ return new TimestampedValueInSingleWindow<>(newValue, getTimestamp(), window, getPane());
}
@Override
@@ -413,27 +445,31 @@ public abstract class WindowedValue<T> {
if (o instanceof TimestampedValueInSingleWindow) {
TimestampedValueInSingleWindow<?> that =
(TimestampedValueInSingleWindow<?>) o;
- return Objects.equals(that.value, this.value)
- && this.timestamp.isEqual(that.timestamp) // don't compare chronology objects
- && Objects.equals(that.pane, this.pane)
+ // Compare timestamps first as they are most likely to differ.
+ // Also compare timestamps according to millis-since-epoch because otherwise expensive
+ // comparisons are made on their Chronology objects.
+ return this.getTimestamp().isEqual(that.getTimestamp())
+ && Objects.equals(that.getValue(), this.getValue())
+ && Objects.equals(that.getPane(), this.getPane())
&& Objects.equals(that.window, this.window);
} else {
- return false;
+ return super.equals(o);
}
}
@Override
public int hashCode() {
- return Objects.hash(value, timestamp.getMillis(), pane, window);
+ // Hash only the millis of the timestamp to be consistent with equals
+ return Objects.hash(getValue(), getTimestamp().getMillis(), getPane(), window);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
- .add("value", value)
- .add("timestamp", timestamp)
+ .add("value", getValue())
+ .add("timestamp", getTimestamp())
.add("window", window)
- .add("pane", pane)
+ .add("pane", getPane())
.toString();
}
}
@@ -456,8 +492,8 @@ public abstract class WindowedValue<T> {
}
@Override
- public <NewT> WindowedValue<NewT> withValue(NewT value) {
- return new TimestampedValueInMultipleWindows<>(value, timestamp, windows, pane);
+ public <NewT> WindowedValue<NewT> withValue(NewT newValue) {
+ return new TimestampedValueInMultipleWindows<>(newValue, getTimestamp(), windows, getPane());
}
@Override
@@ -470,30 +506,37 @@ public abstract class WindowedValue<T> {
if (o instanceof TimestampedValueInMultipleWindows) {
TimestampedValueInMultipleWindows<?> that =
(TimestampedValueInMultipleWindows<?>) o;
- if (this.timestamp.isEqual(that.timestamp) // don't compare chronology objects
- && Objects.equals(that.value, this.value)
- && Objects.equals(that.pane, this.pane)) {
+ // Compare timestamps first as they are most likely to differ.
+ // Also compare timestamps according to millis-since-epoch because otherwise expensive
+ // comparisons are made on their Chronology objects.
+ if (this.getTimestamp().isEqual(that.getTimestamp())
+ && Objects.equals(that.getValue(), this.getValue())
+ && Objects.equals(that.getPane(), this.getPane())) {
ensureWindowsAreASet();
that.ensureWindowsAreASet();
return that.windows.equals(this.windows);
+ } else {
+ return false;
}
+ } else {
+ return super.equals(o);
}
- return false;
}
@Override
public int hashCode() {
+ // Hash only the millis of the timestamp to be consistent with equals
ensureWindowsAreASet();
- return Objects.hash(value, timestamp.getMillis(), pane, windows);
+ return Objects.hash(getValue(), getTimestamp().getMillis(), getPane(), windows);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
- .add("value", value)
- .add("timestamp", timestamp)
+ .add("value", getValue())
+ .add("timestamp", getTimestamp())
.add("windows", windows)
- .add("pane", pane)
+ .add("pane", getPane())
.toString();
}