You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/03/27 19:55:41 UTC

[2/2] beam git commit: Make WindowMappingFn#maximumLookback Configurable but Final

Make WindowMappingFn#maximumLookback Configurable but Final

This enforces that maximumLookback() returns a constant value: the lookback is
now supplied once at construction time and the accessor is final.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/61bb6b4e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/61bb6b4e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/61bb6b4e

Branch: refs/heads/master
Commit: 61bb6b4e301a3675f20728730a2d691a79941156
Parents: 88ffc97
Author: Thomas Groh <tg...@google.com>
Authored: Thu Mar 23 15:20:06 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Mar 27 12:55:30 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/testing/StaticWindows.java  |  8 +------
 .../sdk/transforms/windowing/GlobalWindows.java |  6 -----
 .../windowing/PartitioningWindowFn.java         |  6 -----
 .../transforms/windowing/SlidingWindows.java    |  5 ----
 .../transforms/windowing/WindowMappingFn.java   | 24 +++++++++++++++++---
 .../sdk/util/IdentitySideInputWindowFn.java     |  6 -----
 6 files changed, 22 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/61bb6b4e/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StaticWindows.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StaticWindows.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StaticWindows.java
index 4be88c8..fde1669 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StaticWindows.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StaticWindows.java
@@ -103,7 +103,7 @@ final class StaticWindows extends NonMergingWindowFn<Object, BoundedWindow> {
 
   @Override
   public WindowMappingFn<BoundedWindow> getDefaultWindowMappingFn() {
-    return new WindowMappingFn<BoundedWindow>() {
+    return new WindowMappingFn<BoundedWindow>(Duration.millis(Long.MAX_VALUE)) {
       @Override
       public BoundedWindow getSideInputWindow(BoundedWindow mainWindow) {
         checkArgument(
@@ -112,12 +112,6 @@ final class StaticWindows extends NonMergingWindowFn<Object, BoundedWindow> {
             StaticWindows.class.getSimpleName());
         return mainWindow;
       }
-
-      @Override
-      public Duration maximumLookback() {
-        // TODO: This may be unsafe.
-        return Duration.millis(Long.MAX_VALUE);
-      }
     };
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/61bb6b4e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java
index e91fad1..400be1f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java
@@ -20,7 +20,6 @@ package org.apache.beam.sdk.transforms.windowing;
 import java.util.Collection;
 import java.util.Collections;
 import org.apache.beam.sdk.coders.Coder;
-import org.joda.time.Duration;
 import org.joda.time.Instant;
 
 /**
@@ -56,11 +55,6 @@ public class GlobalWindows extends NonMergingWindowFn<Object, GlobalWindow> {
     public GlobalWindow getSideInputWindow(BoundedWindow mainWindow) {
       return GlobalWindow.INSTANCE;
     }
-
-    @Override
-    public Duration maximumLookback() {
-      return Duration.ZERO;
-    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/61bb6b4e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PartitioningWindowFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PartitioningWindowFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PartitioningWindowFn.java
index 40cff8a..40ee68a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PartitioningWindowFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PartitioningWindowFn.java
@@ -19,7 +19,6 @@ package org.apache.beam.sdk.transforms.windowing;
 
 import java.util.Arrays;
 import java.util.Collection;
-import org.joda.time.Duration;
 import org.joda.time.Instant;
 
 /**
@@ -52,11 +51,6 @@ public abstract class PartitioningWindowFn<T, W extends BoundedWindow>
         }
         return assignWindow(mainWindow.maxTimestamp());
       }
-
-      @Override
-      public Duration maximumLookback() {
-        return Duration.ZERO;
-      }
     };
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/61bb6b4e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java
index b27f4e6..650dc37 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java
@@ -139,11 +139,6 @@ public class SlidingWindows extends NonMergingWindowFn<Object, IntervalWindow> {
         long lastStart = lastStartFor(mainWindow.maxTimestamp().minus(size));
         return new IntervalWindow(new Instant(lastStart + period.getMillis()), size);
       }
-
-      @Override
-      public Duration maximumLookback() {
-        return Duration.ZERO;
-      }
     };
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/61bb6b4e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowMappingFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowMappingFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowMappingFn.java
index 62bf544..910ed98 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowMappingFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowMappingFn.java
@@ -29,14 +29,30 @@ import org.joda.time.Duration;
  * {@link BoundMulti#withSideInputs(PCollectionView[]) side input}.
  */
 public abstract class WindowMappingFn<TargetWindowT extends BoundedWindow> implements Serializable {
+  private final Duration maximumLookback;
+
+  /**
+   * Create a new {@link WindowMappingFn} with {@link Duration#ZERO zero} maximum lookback.
+   */
+  protected WindowMappingFn() {
+    this(Duration.ZERO);
+  }
+
+  /**
+   * Create a new {@link WindowMappingFn} with the specified maximum lookback.
+   */
+  protected WindowMappingFn(Duration maximumLookback) {
+    this.maximumLookback = maximumLookback;
+  }
+
   /**
    * Returns the window of the side input corresponding to the given window of the main input.
    */
   public abstract TargetWindowT getSideInputWindow(BoundedWindow mainWindow);
 
   /**
-   * The maximum distance between the end of any main input window {@code mainWindow}
-   * and the end of the side input window returned by {@link #getSideInputWindow(BoundedWindow)}
+   * The maximum distance between the end of any main input window {@code mainWindow} and the end of
+   * the side input window returned by {@link #getSideInputWindow(BoundedWindow)}
    *
    * <p>A side input window {@code w} becomes unreachable when the input watermarks for all
    * consumers surpasses the timestamp:
@@ -45,5 +61,7 @@ public abstract class WindowMappingFn<TargetWindowT extends BoundedWindow> imple
    *
    * <p>At this point, every main input window that could map to {@code w} is expired.
    */
-  public abstract Duration maximumLookback();
+  public final Duration maximumLookback() {
+    return maximumLookback;
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/61bb6b4e/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java
index 60d9afe..2171466 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java
@@ -25,7 +25,6 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
-import org.joda.time.Duration;
 
 /**
  * A {@link WindowFn} for use during tests that returns the input window for calls to
@@ -56,11 +55,6 @@ public class IdentitySideInputWindowFn extends NonMergingWindowFn<Integer, Bound
       public BoundedWindow getSideInputWindow(BoundedWindow window) {
         return window;
       }
-
-      @Override
-      public Duration maximumLookback() {
-        return Duration.ZERO;
-      }
     };
   }
 }