You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/04/25 17:30:23 UTC
[28/50] [abbrv] beam git commit: Remove getSideInputWindow
Remove getSideInputWindow
Callers should instead use the default WindowMappingFn (obtained from
WindowFn#getDefaultWindowMappingFn()) if no explicit WindowMappingFn is available.
Migrate all existing callers within the SDK and runners.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/79b066da
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/79b066da
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/79b066da
Branch: refs/heads/jstorm-runner
Commit: 79b066da4ed26fae63035fb16c03508ea77bf6db
Parents: 075b621
Author: Thomas Groh <tg...@google.com>
Authored: Tue Apr 4 10:38:36 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Apr 17 13:09:39 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/runners/core/OldDoFn.java | 3 ++-
.../beam/runners/spark/util/SparkSideInputReader.java | 3 +--
.../apache/beam/sdk/transforms/windowing/WindowFn.java | 13 -------------
.../org/apache/beam/sdk/testing/StaticWindowsTest.java | 10 +++++++---
4 files changed, 10 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/79b066da/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java
index 507ee50..323edf9 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java
@@ -42,6 +42,7 @@ import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.joda.time.Duration;
@@ -241,7 +242,7 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl
* window of the main input element.
*
* <p>See
- * {@link org.apache.beam.sdk.transforms.windowing.WindowFn#getSideInputWindow}
+ * {@link WindowMappingFn#getSideInputWindow}
* for how this corresponding window is determined.
*
* @throws IllegalArgumentException if this is not a side input
http://git-wip-us.apache.org/repos/asf/beam/blob/79b066da/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java
index c8e9850..d6e1a94 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java
@@ -54,9 +54,8 @@ public class SparkSideInputReader implements SideInputReader {
checkNotNull(windowedBroadcastHelper, "SideInput for view " + view + " is not available.");
//--- sideInput window
- WindowingStrategy<?, ?> sideInputWindowStrategy = windowedBroadcastHelper.getKey();
final BoundedWindow sideInputWindow =
- sideInputWindowStrategy.getWindowFn().getSideInputWindow(window);
+ view.getWindowMappingFn().getSideInputWindow(window);
//--- match the appropriate sideInput window.
// a tag will point to all matching sideInputs, that is all windows.
http://git-wip-us.apache.org/repos/asf/beam/blob/79b066da/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java
index 2f9e6c1..5ebbb41 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java
@@ -124,19 +124,6 @@ public abstract class WindowFn<T, W extends BoundedWindow>
public abstract Coder<W> windowCoder();
/**
- * Returns the window of the side input corresponding to the given window of
- * the main input. If not overridden, will use the window returned by calling
- * {@link WindowMappingFn#getSideInputWindow(BoundedWindow)} on the result of
- * {@link #getDefaultWindowMappingFn()}.
- *
- * @deprecated see {@link #getDefaultWindowMappingFn()}
- */
- @Deprecated
- public W getSideInputWindow(BoundedWindow window) {
- return getDefaultWindowMappingFn().getSideInputWindow(window);
- }
-
- /**
* Returns the default {@link WindowMappingFn} to use to map main input windows to side input
* windows. This should accept arbitrary main input windows, and produce a {@link BoundedWindow}
* that can be produced by this {@link WindowFn}.
http://git-wip-us.apache.org/repos/asf/beam/blob/79b066da/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/StaticWindowsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/StaticWindowsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/StaticWindowsTest.java
index e662619..7ee48c8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/StaticWindowsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/StaticWindowsTest.java
@@ -70,8 +70,12 @@ public class StaticWindowsTest {
WindowFn<Object, BoundedWindow> fn =
StaticWindows.of(IntervalWindow.getCoder(), ImmutableList.of(first, second));
- assertThat(fn.getSideInputWindow(first), Matchers.<BoundedWindow>equalTo(first));
- assertThat(fn.getSideInputWindow(second), Matchers.<BoundedWindow>equalTo(second));
+ assertThat(
+ fn.getDefaultWindowMappingFn().getSideInputWindow(first),
+ Matchers.<BoundedWindow>equalTo(first));
+ assertThat(
+ fn.getDefaultWindowMappingFn().getSideInputWindow(second),
+ Matchers.<BoundedWindow>equalTo(second));
}
@Test
@@ -80,7 +84,7 @@ public class StaticWindowsTest {
StaticWindows.of(IntervalWindow.getCoder(), ImmutableList.of(second));
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("contains");
- fn.getSideInputWindow(first);
+ fn.getDefaultWindowMappingFn().getSideInputWindow(first);
}
@Test