You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/04/06 01:08:14 UTC
[2/2] beam git commit: Use WindowMappingFn where possible
Use WindowMappingFn where possible
Migrates callers away from the use of WindowingStrategyInternal,
permitting future changes to have a configurable WindowMappingFn.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9e8cf0c5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9e8cf0c5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9e8cf0c5
Branch: refs/heads/master
Commit: 9e8cf0c5ea7f47a9d7ec05272d56508962c86918
Parents: cc5f78d
Author: Thomas Groh <tg...@google.com>
Authored: Wed Apr 5 11:53:03 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Apr 5 18:08:03 2017 -0700
----------------------------------------------------------------------
...tputAndTimeBoundedSplittableProcessElementInvoker.java | 3 +--
.../beam/runners/core/PushbackSideInputDoFnRunner.java | 2 +-
.../apache/beam/runners/core/ReduceFnContextFactory.java | 3 +--
.../org/apache/beam/runners/core/SimpleDoFnRunner.java | 2 +-
.../org/apache/beam/runners/core/SimpleOldDoFnRunner.java | 2 +-
.../org/apache/beam/runners/core/ReduceFnRunnerTest.java | 10 ++++------
.../java/org/apache/beam/sdk/transforms/DoFnTester.java | 3 +--
.../org/apache/beam/sdk/util/CombineContextFactory.java | 2 +-
8 files changed, 11 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/9e8cf0c5/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
index 5aa7605..357094c 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
@@ -226,8 +226,7 @@ public class OutputAndTimeBoundedSplittableProcessElementInvoker<
public <T> T sideInput(PCollectionView<T> view) {
return sideInputReader.get(
view,
- view.getWindowingStrategyInternal()
- .getWindowFn()
+ view.getWindowMappingFn()
.getSideInputWindow(Iterables.getOnlyElement(element.getWindows())));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/9e8cf0c5/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
index 2962832..4ad20b5 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
@@ -98,7 +98,7 @@ public class PushbackSideInputDoFnRunner<InputT, OutputT> implements DoFnRunner<
}
for (PCollectionView<?> view : views) {
BoundedWindow sideInputWindow =
- view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(mainInputWindow);
+ view.getWindowMappingFn().getSideInputWindow(mainInputWindow);
if (!sideInputReader.isReady(view, sideInputWindow)) {
return false;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/9e8cf0c5/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
index 66a6ef8..8493474 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
@@ -514,8 +514,7 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> {
public <T> T sideInput(PCollectionView<T> view) {
return sideInputReader.get(
view,
- view.getWindowingStrategyInternal()
- .getWindowFn()
+ view.getWindowMappingFn()
.getSideInputWindow(mainInputWindow));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/9e8cf0c5/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index dfa9645..77286b2 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -533,7 +533,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
}
}
return context.sideInput(
- view, view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(window));
+ view, view.getWindowMappingFn().getSideInputWindow(window));
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/9e8cf0c5/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
index c21ed77..c88f1c9 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
@@ -389,7 +389,7 @@ class SimpleOldDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT
}
}
return context.sideInput(
- view, view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(window));
+ view, view.getWindowMappingFn().getSideInputWindow(window));
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/9e8cf0c5/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
index 1bd717f..0d4d992 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
@@ -65,6 +65,7 @@ import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.WindowedValue;
@@ -360,8 +361,9 @@ public class ReduceFnRunnerTest {
WindowingStrategy.of(FixedWindows.of(Duration.millis(2)))
.withMode(AccumulationMode.ACCUMULATING_FIRED_PANES);
- WindowingStrategy<?, IntervalWindow> sideInputWindowingStrategy =
- WindowingStrategy.of(FixedWindows.of(Duration.millis(4)));
+ WindowMappingFn<?> sideInputWindowMappingFn =
+ FixedWindows.of(Duration.millis(4)).getDefaultWindowMappingFn();
+ when(mockView.getWindowMappingFn()).thenReturn((WindowMappingFn) sideInputWindowMappingFn);
TestOptions options = PipelineOptionsFactory.as(TestOptions.class);
options.setValue(expectedValue);
@@ -384,10 +386,6 @@ public class ReduceFnRunnerTest {
}
});
- @SuppressWarnings({"rawtypes", "unchecked", "unused"})
- Object suppressWarningsVar = when(mockView.getWindowingStrategyInternal())
- .thenReturn((WindowingStrategy) sideInputWindowingStrategy);
-
SumAndVerifyContextFn combineFn = new SumAndVerifyContextFn(mockView, expectedValue);
ReduceFnTester<Integer, Integer, IntervalWindow> tester = ReduceFnTester.combining(
mainInputWindowingStrategy, mockTriggerStateMachine, combineFn.<String>asKeyedFn(),
http://git-wip-us.apache.org/repos/asf/beam/blob/9e8cf0c5/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index 01c639a..01f0291 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -650,8 +650,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
Map<BoundedWindow, ?> viewValues = sideInputs.get(view);
if (viewValues != null) {
BoundedWindow sideInputWindow =
- view.getWindowingStrategyInternal()
- .getWindowFn()
+ view.getWindowMappingFn()
.getSideInputWindow(element.getWindow());
@SuppressWarnings("unchecked")
T windowValue = (T) viewValues.get(sideInputWindow);
http://git-wip-us.apache.org/repos/asf/beam/blob/9e8cf0c5/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java
index a983057..31d1f64 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java
@@ -83,7 +83,7 @@ public class CombineContextFactory {
}
BoundedWindow sideInputWindow =
- view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(mainInputWindow);
+ view.getWindowMappingFn().getSideInputWindow(mainInputWindow);
return sideInputReader.get(view, sideInputWindow);
}
};