You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/04/25 17:30:23 UTC

[28/50] [abbrv] beam git commit: Remove getSideInputWindow

Remove getSideInputWindow

Callers should instead get the Default WindowMappingFn if no explicit
WindowMappingFn is available.

Migrate all existing callers within the SDK and runners.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/79b066da
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/79b066da
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/79b066da

Branch: refs/heads/jstorm-runner
Commit: 79b066da4ed26fae63035fb16c03508ea77bf6db
Parents: 075b621
Author: Thomas Groh <tg...@google.com>
Authored: Tue Apr 4 10:38:36 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Apr 17 13:09:39 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/runners/core/OldDoFn.java     |  3 ++-
 .../beam/runners/spark/util/SparkSideInputReader.java  |  3 +--
 .../apache/beam/sdk/transforms/windowing/WindowFn.java | 13 -------------
 .../org/apache/beam/sdk/testing/StaticWindowsTest.java | 10 +++++++---
 4 files changed, 10 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/79b066da/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java
index 507ee50..323edf9 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java
@@ -42,6 +42,7 @@ import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.joda.time.Duration;
@@ -241,7 +242,7 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl
      * window of the main input element.
      *
      * <p>See
-     * {@link org.apache.beam.sdk.transforms.windowing.WindowFn#getSideInputWindow}
+     * {@link WindowMappingFn#getSideInputWindow}
      * for how this corresponding window is determined.
      *
      * @throws IllegalArgumentException if this is not a side input

http://git-wip-us.apache.org/repos/asf/beam/blob/79b066da/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java
index c8e9850..d6e1a94 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java
@@ -54,9 +54,8 @@ public class SparkSideInputReader implements SideInputReader {
     checkNotNull(windowedBroadcastHelper, "SideInput for view " + view + " is not available.");
 
     //--- sideInput window
-    WindowingStrategy<?, ?> sideInputWindowStrategy = windowedBroadcastHelper.getKey();
     final BoundedWindow sideInputWindow =
-        sideInputWindowStrategy.getWindowFn().getSideInputWindow(window);
+        view.getWindowMappingFn().getSideInputWindow(window);
 
     //--- match the appropriate sideInput window.
     // a tag will point to all matching sideInputs, that is all windows.

http://git-wip-us.apache.org/repos/asf/beam/blob/79b066da/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java
index 2f9e6c1..5ebbb41 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java
@@ -124,19 +124,6 @@ public abstract class WindowFn<T, W extends BoundedWindow>
   public abstract Coder<W> windowCoder();
 
   /**
-   * Returns the window of the side input corresponding to the given window of
-   * the main input. If not overridden, will use the window returned by calling
-   * {@link WindowMappingFn#getSideInputWindow(BoundedWindow)} on the result of
-   * {@link #getDefaultWindowMappingFn()}.
-   *
-   * @deprecated see {@link #getDefaultWindowMappingFn()}
-   */
-  @Deprecated
-  public W getSideInputWindow(BoundedWindow window) {
-    return getDefaultWindowMappingFn().getSideInputWindow(window);
-  }
-
-  /**
    * Returns the default {@link WindowMappingFn} to use to map main input windows to side input
    * windows. This should accept arbitrary main input windows, and produce a {@link BoundedWindow}
    * that can be produced by this {@link WindowFn}.

http://git-wip-us.apache.org/repos/asf/beam/blob/79b066da/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/StaticWindowsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/StaticWindowsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/StaticWindowsTest.java
index e662619..7ee48c8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/StaticWindowsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/StaticWindowsTest.java
@@ -70,8 +70,12 @@ public class StaticWindowsTest {
     WindowFn<Object, BoundedWindow> fn =
         StaticWindows.of(IntervalWindow.getCoder(), ImmutableList.of(first, second));
 
-    assertThat(fn.getSideInputWindow(first), Matchers.<BoundedWindow>equalTo(first));
-    assertThat(fn.getSideInputWindow(second), Matchers.<BoundedWindow>equalTo(second));
+    assertThat(
+        fn.getDefaultWindowMappingFn().getSideInputWindow(first),
+        Matchers.<BoundedWindow>equalTo(first));
+    assertThat(
+        fn.getDefaultWindowMappingFn().getSideInputWindow(second),
+        Matchers.<BoundedWindow>equalTo(second));
   }
 
   @Test
@@ -80,7 +84,7 @@ public class StaticWindowsTest {
         StaticWindows.of(IntervalWindow.getCoder(), ImmutableList.of(second));
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage("contains");
-    fn.getSideInputWindow(first);
+    fn.getDefaultWindowMappingFn().getSideInputWindow(first);
   }
 
   @Test