You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/06/22 23:03:59 UTC
[7/9] beam git commit: Tidy LateDataDroppingDoFnRunner
Tidy LateDataDroppingDoFnRunner
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d4e5db51
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d4e5db51
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d4e5db51
Branch: refs/heads/master
Commit: d4e5db51a025a831ddf4e3bc0e003caebabf647b
Parents: 497cfab
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jun 22 11:56:53 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jun 22 13:58:08 2017 -0700
----------------------------------------------------------------------
.../core/LateDataDroppingDoFnRunner.java | 33 ++++++++++----------
1 file changed, 17 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/d4e5db51/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
index 1cf1509..28938c1 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
@@ -134,26 +134,27 @@ public class LateDataDroppingDoFnRunner<K, InputT, OutputT, W extends BoundedWin
// The element is too late for this window.
droppedDueToLateness.inc();
WindowTracing.debug(
- "ReduceFnRunner.processElement: Dropping element at {} for key:{}; window:{} "
- + "since too far behind inputWatermark:{}; outputWatermark:{}",
- input.getTimestamp(), key, window, timerInternals.currentInputWatermarkTime(),
+ "{}: Dropping element at {} for key:{}; window:{} "
+ + "since too far behind inputWatermark:{}; outputWatermark:{}",
+ LateDataFilter.class.getSimpleName(),
+ input.getTimestamp(),
+ key,
+ window,
+ timerInternals.currentInputWatermarkTime(),
timerInternals.currentOutputWatermarkTime());
}
}
- Iterable<WindowedValue<InputT>> nonLateElements = Iterables.filter(
- concatElements,
- new Predicate<WindowedValue<InputT>>() {
- @Override
- public boolean apply(WindowedValue<InputT> input) {
- BoundedWindow window = Iterables.getOnlyElement(input.getWindows());
- if (canDropDueToExpiredWindow(window)) {
- return false;
- } else {
- return true;
- }
- }
- });
+ Iterable<WindowedValue<InputT>> nonLateElements =
+ Iterables.filter(
+ concatElements,
+ new Predicate<WindowedValue<InputT>>() {
+ @Override
+ public boolean apply(WindowedValue<InputT> input) {
+ BoundedWindow window = Iterables.getOnlyElement(input.getWindows());
+ return !canDropDueToExpiredWindow(window);
+ }
+ });
return nonLateElements;
}