You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/03/27 19:55:41 UTC
[2/2] beam git commit: Make WindowMappingFn#maximumLookback Configurable but Final
Make WindowMappingFn#maximumLookback Configurable but Final
This enforces that WindowMappingFn#maximumLookback returns a constant value.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/61bb6b4e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/61bb6b4e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/61bb6b4e
Branch: refs/heads/master
Commit: 61bb6b4e301a3675f20728730a2d691a79941156
Parents: 88ffc97
Author: Thomas Groh <tg...@google.com>
Authored: Thu Mar 23 15:20:06 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Mar 27 12:55:30 2017 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/testing/StaticWindows.java | 8 +------
.../sdk/transforms/windowing/GlobalWindows.java | 6 -----
.../windowing/PartitioningWindowFn.java | 6 -----
.../transforms/windowing/SlidingWindows.java | 5 ----
.../transforms/windowing/WindowMappingFn.java | 24 +++++++++++++++++---
.../sdk/util/IdentitySideInputWindowFn.java | 6 -----
6 files changed, 22 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/61bb6b4e/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StaticWindows.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StaticWindows.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StaticWindows.java
index 4be88c8..fde1669 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StaticWindows.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StaticWindows.java
@@ -103,7 +103,7 @@ final class StaticWindows extends NonMergingWindowFn<Object, BoundedWindow> {
@Override
public WindowMappingFn<BoundedWindow> getDefaultWindowMappingFn() {
- return new WindowMappingFn<BoundedWindow>() {
+ return new WindowMappingFn<BoundedWindow>(Duration.millis(Long.MAX_VALUE)) {
@Override
public BoundedWindow getSideInputWindow(BoundedWindow mainWindow) {
checkArgument(
@@ -112,12 +112,6 @@ final class StaticWindows extends NonMergingWindowFn<Object, BoundedWindow> {
StaticWindows.class.getSimpleName());
return mainWindow;
}
-
- @Override
- public Duration maximumLookback() {
- // TODO: This may be unsafe.
- return Duration.millis(Long.MAX_VALUE);
- }
};
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/61bb6b4e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java
index e91fad1..400be1f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java
@@ -20,7 +20,6 @@ package org.apache.beam.sdk.transforms.windowing;
import java.util.Collection;
import java.util.Collections;
import org.apache.beam.sdk.coders.Coder;
-import org.joda.time.Duration;
import org.joda.time.Instant;
/**
@@ -56,11 +55,6 @@ public class GlobalWindows extends NonMergingWindowFn<Object, GlobalWindow> {
public GlobalWindow getSideInputWindow(BoundedWindow mainWindow) {
return GlobalWindow.INSTANCE;
}
-
- @Override
- public Duration maximumLookback() {
- return Duration.ZERO;
- }
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/61bb6b4e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PartitioningWindowFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PartitioningWindowFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PartitioningWindowFn.java
index 40cff8a..40ee68a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PartitioningWindowFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PartitioningWindowFn.java
@@ -19,7 +19,6 @@ package org.apache.beam.sdk.transforms.windowing;
import java.util.Arrays;
import java.util.Collection;
-import org.joda.time.Duration;
import org.joda.time.Instant;
/**
@@ -52,11 +51,6 @@ public abstract class PartitioningWindowFn<T, W extends BoundedWindow>
}
return assignWindow(mainWindow.maxTimestamp());
}
-
- @Override
- public Duration maximumLookback() {
- return Duration.ZERO;
- }
};
}
http://git-wip-us.apache.org/repos/asf/beam/blob/61bb6b4e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java
index b27f4e6..650dc37 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java
@@ -139,11 +139,6 @@ public class SlidingWindows extends NonMergingWindowFn<Object, IntervalWindow> {
long lastStart = lastStartFor(mainWindow.maxTimestamp().minus(size));
return new IntervalWindow(new Instant(lastStart + period.getMillis()), size);
}
-
- @Override
- public Duration maximumLookback() {
- return Duration.ZERO;
- }
};
}
http://git-wip-us.apache.org/repos/asf/beam/blob/61bb6b4e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowMappingFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowMappingFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowMappingFn.java
index 62bf544..910ed98 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowMappingFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowMappingFn.java
@@ -29,14 +29,30 @@ import org.joda.time.Duration;
* {@link BoundMulti#withSideInputs(PCollectionView[]) side input}.
*/
public abstract class WindowMappingFn<TargetWindowT extends BoundedWindow> implements Serializable {
+ private final Duration maximumLookback;
+
+ /**
+ * Create a new {@link WindowMappingFn} with {@link Duration#ZERO zero} maximum lookback.
+ */
+ protected WindowMappingFn() {
+ this(Duration.ZERO);
+ }
+
+ /**
+ * Create a new {@link WindowMappingFn} with the specified maximum lookback.
+ */
+ protected WindowMappingFn(Duration maximumLookback) {
+ this.maximumLookback = maximumLookback;
+ }
+
/**
* Returns the window of the side input corresponding to the given window of the main input.
*/
public abstract TargetWindowT getSideInputWindow(BoundedWindow mainWindow);
/**
- * The maximum distance between the end of any main input window {@code mainWindow}
- * and the end of the side input window returned by {@link #getSideInputWindow(BoundedWindow)}
+ * The maximum distance between the end of any main input window {@code mainWindow} and the end of
+ * the side input window returned by {@link #getSideInputWindow(BoundedWindow)}
*
* <p>A side input window {@code w} becomes unreachable when the input watermarks for all
* consumers surpasses the timestamp:
@@ -45,5 +61,7 @@ public abstract class WindowMappingFn<TargetWindowT extends BoundedWindow> imple
*
* <p>At this point, every main input window that could map to {@code w} is expired.
*/
- public abstract Duration maximumLookback();
+ public final Duration maximumLookback() {
+ return maximumLookback;
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/61bb6b4e/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java
index 60d9afe..2171466 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java
@@ -25,7 +25,6 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
-import org.joda.time.Duration;
/**
* A {@link WindowFn} for use during tests that returns the input window for calls to
@@ -56,11 +55,6 @@ public class IdentitySideInputWindowFn extends NonMergingWindowFn<Integer, Bound
public BoundedWindow getSideInputWindow(BoundedWindow window) {
return window;
}
-
- @Override
- public Duration maximumLookback() {
- return Duration.ZERO;
- }
};
}
}