You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/06/23 03:05:05 UTC

[39/50] beam git commit: Tidy LateDataDroppingDoFnRunner

Tidy LateDataDroppingDoFnRunner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d4e5db51
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d4e5db51
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d4e5db51

Branch: refs/heads/gearpump-runner
Commit: d4e5db51a025a831ddf4e3bc0e003caebabf647b
Parents: 497cfab
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jun 22 11:56:53 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jun 22 13:58:08 2017 -0700

----------------------------------------------------------------------
 .../core/LateDataDroppingDoFnRunner.java        | 33 ++++++++++----------
 1 file changed, 17 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d4e5db51/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
index 1cf1509..28938c1 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
@@ -134,26 +134,27 @@ public class LateDataDroppingDoFnRunner<K, InputT, OutputT, W extends BoundedWin
           // The element is too late for this window.
           droppedDueToLateness.inc();
           WindowTracing.debug(
-              "ReduceFnRunner.processElement: Dropping element at {} for key:{}; window:{} "
-              + "since too far behind inputWatermark:{}; outputWatermark:{}",
-              input.getTimestamp(), key, window, timerInternals.currentInputWatermarkTime(),
+              "{}: Dropping element at {} for key:{}; window:{} "
+                  + "since too far behind inputWatermark:{}; outputWatermark:{}",
+              LateDataFilter.class.getSimpleName(),
+              input.getTimestamp(),
+              key,
+              window,
+              timerInternals.currentInputWatermarkTime(),
               timerInternals.currentOutputWatermarkTime());
         }
       }
 
-      Iterable<WindowedValue<InputT>> nonLateElements = Iterables.filter(
-          concatElements,
-          new Predicate<WindowedValue<InputT>>() {
-            @Override
-            public boolean apply(WindowedValue<InputT> input) {
-              BoundedWindow window = Iterables.getOnlyElement(input.getWindows());
-              if (canDropDueToExpiredWindow(window)) {
-                return false;
-              } else {
-                return true;
-              }
-            }
-          });
+      Iterable<WindowedValue<InputT>> nonLateElements =
+          Iterables.filter(
+              concatElements,
+              new Predicate<WindowedValue<InputT>>() {
+                @Override
+                public boolean apply(WindowedValue<InputT> input) {
+                  BoundedWindow window = Iterables.getOnlyElement(input.getWindows());
+                  return !canDropDueToExpiredWindow(window);
+                }
+              });
       return nonLateElements;
     }