You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mx...@apache.org on 2020/05/04 08:51:00 UTC

[beam] branch release-2.21.0 updated (9364bba -> fe15aec)

This is an automated email from the ASF dual-hosted git repository.

mxm pushed a change to branch release-2.21.0
in repository https://gitbox.apache.org/repos/asf/beam.git.


    from 9364bba  Merge pull request #11537: [BEAM-6661] Get rid of a few logging annoyances for execution and shutdown
     new 6f1c2f4  [BEAM-9801] Pass in fire timestamp and pane info to timer callback
     new 13f1e7b  [BEAM-9733] Repeatedly fire in batch mode until there are no more timers
     new f2bfed6  Merge pull request #11595: [BEAM-9801] Fire timers set within timers in Spark
     new fe15aec  [BEAM-9874] Support clearing timers in portable batch mode (Spark/Flink)

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../beam/runners/core/InMemoryTimerInternals.java  | 22 ++---
 .../functions/FlinkExecutableStageFunction.java    | 32 +++-----
 .../translation/PipelineTranslatorUtils.java       | 27 ++++--
 .../translation/SparkExecutableStageFunction.java  | 95 ++++++++++------------
 sdks/python/apache_beam/runners/common.py          | 16 ++--
 .../runners/direct/transform_evaluator.py          |  4 +-
 .../runners/portability/fn_api_runner/execution.py |  6 ++
 .../runners/portability/fn_api_runner/fn_runner.py |  7 +-
 .../portability/fn_api_runner/fn_runner_test.py    | 12 ++-
 .../apache_beam/runners/worker/operations.py       |  3 +-
 10 files changed, 121 insertions(+), 103 deletions(-)


[beam] 02/04: [BEAM-9733] Repeatedly fire in batch mode until there are no more timers

Posted by mx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch release-2.21.0
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 13f1e7b15de08054b32d57ce0fbca4911c91f8d2
Author: Maximilian Michels <mx...@apache.org>
AuthorDate: Sat Apr 25 14:09:26 2020 +0200

    [BEAM-9733] Repeatedly fire in batch mode until there are no more timers
---
 .../beam/runners/core/InMemoryTimerInternals.java  |  7 ++++
 .../functions/FlinkExecutableStageFunction.java    | 40 ++++++++++++----------
 2 files changed, 28 insertions(+), 19 deletions(-)

diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
index 7fbfaf0..21420a7 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
@@ -66,6 +66,13 @@ public class InMemoryTimerInternals implements TimerInternals {
     return outputWatermarkTime;
   }
 
+  /** Returns true when there are still timers to be fired. */
+  public boolean hasPendingTimers() {
+    return !(watermarkTimers.isEmpty()
+        && processingTimers.isEmpty()
+        && synchronizedProcessingTimers.isEmpty());
+  }
+
   /**
    * Returns when the next timer in the given time domain will fire, or {@code null} if there are no
    * timers scheduled in that time domain.
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
index ea1c583..f8c9c24 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
@@ -247,25 +247,27 @@ public class FlinkExecutableStageFunction<InputT> extends AbstractRichFunction
     timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
 
     // Now we fire the timers and process elements generated by timers (which may be timers itself)
-    try (RemoteBundle bundle =
-        stageBundleFactory.getBundle(
-            receiverFactory, timerReceiverFactory, stateRequestHandler, progressHandler)) {
-
-      PipelineTranslatorUtils.fireEligibleTimers(
-          timerInternals,
-          (KV<String, String> transformAndTimerId, Timer<?> timerValue) -> {
-            FnDataReceiver<Timer> fnTimerReceiver =
-                bundle.getTimerReceivers().get(transformAndTimerId);
-            Preconditions.checkNotNull(
-                fnTimerReceiver, "No FnDataReceiver found for %s", transformAndTimerId);
-            try {
-              fnTimerReceiver.accept(timerValue);
-            } catch (Exception e) {
-              throw new RuntimeException(
-                  String.format(Locale.ENGLISH, "Failed to process timer: %s", timerValue));
-            }
-          },
-          currentTimerKey);
+    while (timerInternals.hasPendingTimers()) {
+      try (RemoteBundle bundle =
+          stageBundleFactory.getBundle(
+              receiverFactory, timerReceiverFactory, stateRequestHandler, progressHandler)) {
+
+        PipelineTranslatorUtils.fireEligibleTimers(
+            timerInternals,
+            (KV<String, String> transformAndTimerId, Timer<?> timerValue) -> {
+              FnDataReceiver<Timer> fnTimerReceiver =
+                  bundle.getTimerReceivers().get(transformAndTimerId);
+              Preconditions.checkNotNull(
+                  fnTimerReceiver, "No FnDataReceiver found for %s", transformAndTimerId);
+              try {
+                fnTimerReceiver.accept(timerValue);
+              } catch (Exception e) {
+                throw new RuntimeException(
+                    String.format(Locale.ENGLISH, "Failed to process timer: %s", timerValue));
+              }
+            },
+            currentTimerKey);
+      }
     }
   }
 


[beam] 01/04: [BEAM-9801] Pass in fire timestamp and pane info to timer callback

Posted by mx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch release-2.21.0
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 6f1c2f4e81930e0f1f0b482dedc1dbdcca2caac0
Author: Maximilian Michels <mx...@apache.org>
AuthorDate: Wed Apr 22 17:08:59 2020 +0200

    [BEAM-9801] Pass in fire timestamp and pane info to timer callback
    
    Pass in the fire timestamp and pane info to the timer callback to avoid the following error:
    
    ```
    INFO:apache_beam.utils.subprocess_server:Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction 4: Traceback (most recent call last):
    INFO:apache_beam.utils.subprocess_server: File "apache_beam/runners/worker/sdk_worker.py", line 245, in _execute
    INFO:apache_beam.utils.subprocess_server: response = task()
    INFO:apache_beam.utils.subprocess_server: File "apache_beam/runners/worker/sdk_worker.py", line 302, in <lambda>
    INFO:apache_beam.utils.subprocess_server: lambda: self.create_worker().do_instruction(request), request)
    INFO:apache_beam.utils.subprocess_server: File "apache_beam/runners/worker/sdk_worker.py", line 471, in do_instruction
    INFO:apache_beam.utils.subprocess_server: getattr(request, request_type), request.instruction_id)
    INFO:apache_beam.utils.subprocess_server: File "apache_beam/runners/worker/sdk_worker.py", line 506, in process_bundle
    INFO:apache_beam.utils.subprocess_server: bundle_processor.process_bundle(instruction_id))
    INFO:apache_beam.utils.subprocess_server: File "apache_beam/runners/worker/bundle_processor.py", line 910, in process_bundle
    INFO:apache_beam.utils.subprocess_server: element.timer_family_id, timer_data)
    INFO:apache_beam.utils.subprocess_server: File "apache_beam/runners/worker/operations.py", line 688, in process_timer
    INFO:apache_beam.utils.subprocess_server: timer_data.fire_timestamp)
    INFO:apache_beam.utils.subprocess_server: File "apache_beam/runners/common.py", line 990, in process_user_timer
    INFO:apache_beam.utils.subprocess_server: self._reraise_augmented(exn)
    INFO:apache_beam.utils.subprocess_server: File "apache_beam/runners/common.py", line 1043, in _reraise_augmented
    INFO:apache_beam.utils.subprocess_server: raise_with_traceback(new_exn)
    INFO:apache_beam.utils.subprocess_server: File "apache_beam/runners/common.py", line 988, in process_user_timer
    INFO:apache_beam.utils.subprocess_server: self.do_fn_invoker.invoke_user_timer(timer_spec, key, window, timestamp)
    INFO:apache_beam.utils.subprocess_server: File "apache_beam/runners/common.py", line 517, in invoke_user_timer
    INFO:apache_beam.utils.subprocess_server: self.user_state_context, key, window, timestamp))
    INFO:apache_beam.utils.subprocess_server: File "apache_beam/runners/common.py", line 1093, in process_outputs
    INFO:apache_beam.utils.subprocess_server: for result in results:
    INFO:apache_beam.utils.subprocess_server: File "/Users/max/Dev/beam/sdks/python/apache_beam/testing/load_tests/pardo_test.py", line 185, in process_timer
    INFO:apache_beam.utils.subprocess_server: timer.set(0)
    INFO:apache_beam.utils.subprocess_server: File "apache_beam/runners/worker/bundle_processor.py", line 589, in set
    INFO:apache_beam.utils.subprocess_server: self._timer_coder_impl.encode_to_stream(timer, self._output_stream, True)
    INFO:apache_beam.utils.subprocess_server: File "apache_beam/coders/coder_impl.py", line 651, in encode_to_stream
    INFO:apache_beam.utils.subprocess_server: value.hold_timestamp, out, True)
    INFO:apache_beam.utils.subprocess_server: File "apache_beam/coders/coder_impl.py", line 608, in encode_to_stream
    INFO:apache_beam.utils.subprocess_server: millis = value.micros // 1000
    INFO:apache_beam.utils.subprocess_server:AttributeError: 'NoneType' object has no attribute 'micros' [while running 'GenerateLoad']
    ```
---
 sdks/python/apache_beam/runners/common.py                | 16 +++++++++-------
 .../apache_beam/runners/direct/transform_evaluator.py    |  4 +++-
 .../runners/portability/fn_api_runner/execution.py       |  6 ++++++
 .../runners/portability/fn_api_runner/fn_runner.py       |  7 +++++--
 .../runners/portability/fn_api_runner/fn_runner_test.py  | 12 ++++++++++--
 sdks/python/apache_beam/runners/worker/operations.py     |  3 ++-
 6 files changed, 35 insertions(+), 13 deletions(-)

diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 156340f..cafa4a1 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -221,7 +221,8 @@ class MethodWrapper(object):
     if self.watermark_estimator_provider is None:
       self.watermark_estimator_provider = NoOpWatermarkEstimatorProvider()
 
-  def invoke_timer_callback(self, user_state_context, key, window, timestamp):
+  def invoke_timer_callback(
+      self, user_state_context, key, window, timestamp, pane_info):
     # TODO(ccy): support side inputs.
     kwargs = {}
     if self.has_userstate_arguments:
@@ -229,10 +230,10 @@ class MethodWrapper(object):
         kwargs[kw] = user_state_context.get_state(state_spec, key, window)
       for kw, timer_spec in self.timer_args_to_replace.items():
         kwargs[kw] = user_state_context.get_timer(
-            timer_spec, key, window, None, None)
+            timer_spec, key, window, timestamp, pane_info)
 
     if self.timestamp_arg_name:
-      kwargs[self.timestamp_arg_name] = Timestamp(seconds=timestamp)
+      kwargs[self.timestamp_arg_name] = Timestamp.of(timestamp)
     if self.window_arg_name:
       kwargs[self.window_arg_name] = window
     if self.key_arg_name:
@@ -509,12 +510,12 @@ class DoFnInvoker(object):
     """
     self.signature.teardown_lifecycle_method.method_value()
 
-  def invoke_user_timer(self, timer_spec, key, window, timestamp):
+  def invoke_user_timer(self, timer_spec, key, window, timestamp, pane_info):
     # self.output_processor is Optional, but in practice it won't be None here
     self.output_processor.process_outputs(
         WindowedValue(None, timestamp, (window, )),
         self.signature.timer_methods[timer_spec].invoke_timer_callback(
-            self.user_state_context, key, window, timestamp))
+            self.user_state_context, key, window, timestamp, pane_info))
 
   def invoke_create_watermark_estimator(self, estimator_state):
     return self.signature.create_watermark_estimator_method.method_value(
@@ -983,9 +984,10 @@ class DoFnRunner:
     assert isinstance(self.do_fn_invoker, PerWindowInvoker)
     return self.do_fn_invoker.current_element_progress()
 
-  def process_user_timer(self, timer_spec, key, window, timestamp):
+  def process_user_timer(self, timer_spec, key, window, timestamp, pane_info):
     try:
-      self.do_fn_invoker.invoke_user_timer(timer_spec, key, window, timestamp)
+      self.do_fn_invoker.invoke_user_timer(
+          timer_spec, key, window, timestamp, pane_info)
     except BaseException as exn:
       self._reraise_augmented(exn)
 
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index 474ad0d..e2037d6 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -867,7 +867,9 @@ class _ParDoEvaluator(_TransformEvaluator):
         timer_spec,
         self.key_coder.decode(timer_firing.encoded_key),
         timer_firing.window,
-        timer_firing.timestamp)
+        timer_firing.timestamp,
+        # TODO Add paneinfo to timer_firing in DirectRunner
+        None)
 
   def process_element(self, element):
     self.runner.process(element)
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
index e62d8a8..b552016 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
@@ -115,6 +115,12 @@ class ListBuffer(object):
     self._inputs = []
     self._grouped_output = None
 
+  def reset(self):
+    """Resets a cleared buffer for reuse."""
+    if not self.cleared:
+      raise RuntimeError('Trying to reset a non-cleared ListBuffer.')
+    self.cleared = False
+
 
 class GroupingBuffer(object):
   """Used to accumulate groupded (shuffled) results."""
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
index 34b605f..d3e378e 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
@@ -966,10 +966,13 @@ class BundleManager(object):
           (result_future.is_done() and result_future.get().error)):
         if isinstance(output, beam_fn_api_pb2.Elements.Timer):
           with BundleManager._lock:
-            self._get_buffer(
+            timer_buffer = self._get_buffer(
                 expected_output_timers[(
                     output.transform_id, output.timer_family_id)],
-                output.transform_id).append(output.timers)
+                output.transform_id)
+            if timer_buffer.cleared:
+              timer_buffer.reset()
+            timer_buffer.append(output.timers)
         if isinstance(output, beam_fn_api_pb2.Elements.Data):
           with BundleManager._lock:
             self._get_buffer(
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
index 3bc977b..1d2400a 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
@@ -327,6 +327,7 @@ class FnApiRunnerTest(unittest.TestCase):
 
   def test_pardo_timers(self):
     timer_spec = userstate.TimerSpec('timer', userstate.TimeDomain.WATERMARK)
+    state_spec = userstate.CombiningValueStateSpec('num_called', sum)
 
     class TimerDoFn(beam.DoFn):
       def process(self, element, timer=beam.DoFn.TimerParam(timer_spec)):
@@ -335,7 +336,14 @@ class FnApiRunnerTest(unittest.TestCase):
         timer.set(2 * ts)
 
       @userstate.on_timer(timer_spec)
-      def process_timer(self):
+      def process_timer(
+          self,
+          ts=beam.DoFn.TimestampParam,
+          timer=beam.DoFn.TimerParam(timer_spec),
+          state=beam.DoFn.StateParam(state_spec)):
+        if state.read() == 0:
+          state.add(1)
+          timer.set(timestamp.Timestamp(micros=2 * ts.micros))
         yield 'fired'
 
     with self.create_pipeline() as p:
@@ -345,7 +353,7 @@ class FnApiRunnerTest(unittest.TestCase):
           | beam.ParDo(TimerDoFn())
           | beam.Map(lambda x, ts=beam.DoFn.TimestampParam: (x, ts)))
 
-      expected = [('fired', ts) for ts in (20, 200)]
+      expected = [('fired', ts) for ts in (20, 200, 40, 400)]
       assert_that(actual, equal_to(expected))
 
   def test_pardo_timers_clear(self):
diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py
index c028c8f..66f12f2 100644
--- a/sdks/python/apache_beam/runners/worker/operations.py
+++ b/sdks/python/apache_beam/runners/worker/operations.py
@@ -685,7 +685,8 @@ class DoOperation(Operation):
         timer_spec,
         timer_data.user_key,
         timer_data.windows[0],
-        timer_data.fire_timestamp)
+        timer_data.fire_timestamp,
+        timer_data.paneinfo)
 
   def finish(self):
     # type: () -> None


[beam] 03/04: Merge pull request #11595: [BEAM-9801] Fire timers set within timers in Spark

Posted by mx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch release-2.21.0
in repository https://gitbox.apache.org/repos/asf/beam.git

commit f2bfed6cd9795cc61846e95f3bffaa8d52b012ad
Author: Maximilian Michels <mx...@apache.org>
AuthorDate: Sun May 3 12:46:54 2020 +0200

    Merge pull request #11595: [BEAM-9801] Fire timers set within timers in Spark
---
 .../functions/FlinkExecutableStageFunction.java    | 16 +---
 .../translation/PipelineTranslatorUtils.java       | 27 +++++--
 .../translation/SparkExecutableStageFunction.java  | 91 ++++++++++------------
 3 files changed, 59 insertions(+), 75 deletions(-)

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
index f8c9c24..bf3355d 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
@@ -53,7 +53,6 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
 import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
@@ -253,20 +252,7 @@ public class FlinkExecutableStageFunction<InputT> extends AbstractRichFunction
               receiverFactory, timerReceiverFactory, stateRequestHandler, progressHandler)) {
 
         PipelineTranslatorUtils.fireEligibleTimers(
-            timerInternals,
-            (KV<String, String> transformAndTimerId, Timer<?> timerValue) -> {
-              FnDataReceiver<Timer> fnTimerReceiver =
-                  bundle.getTimerReceivers().get(transformAndTimerId);
-              Preconditions.checkNotNull(
-                  fnTimerReceiver, "No FnDataReceiver found for %s", transformAndTimerId);
-              try {
-                fnTimerReceiver.accept(timerValue);
-              } catch (Exception e) {
-                throw new RuntimeException(
-                    String.format(Locale.ENGLISH, "Failed to process timer: %s", timerValue));
-              }
-            },
-            currentTimerKey);
+            timerInternals, bundle.getTimerReceivers(), currentTimerKey);
       }
     }
   }
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java
index c624e5c..1fa4ef0 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java
@@ -22,7 +22,8 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Prec
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.function.BiConsumer;
+import java.util.Locale;
+import java.util.Map;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
 import org.apache.beam.runners.core.InMemoryTimerInternals;
@@ -36,6 +37,7 @@ import org.apache.beam.runners.core.construction.graph.PipelineNode;
 import org.apache.beam.runners.fnexecution.control.TimerReceiverFactory;
 import org.apache.beam.runners.fnexecution.wire.WireCoders;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -109,7 +111,7 @@ public final class PipelineTranslatorUtils {
    */
   public static void fireEligibleTimers(
       InMemoryTimerInternals timerInternals,
-      BiConsumer<KV<String, String>, Timer<?>> timerConsumer,
+      Map<KV<String, String>, FnDataReceiver<Timer>> timerReceivers,
       Object currentTimerKey) {
 
     boolean hasFired;
@@ -119,22 +121,22 @@ public final class PipelineTranslatorUtils {
 
       while ((timer = timerInternals.removeNextEventTimer()) != null) {
         hasFired = true;
-        fireTimer(timer, timerConsumer, currentTimerKey);
+        fireTimer(timer, timerReceivers, currentTimerKey);
       }
       while ((timer = timerInternals.removeNextProcessingTimer()) != null) {
         hasFired = true;
-        fireTimer(timer, timerConsumer, currentTimerKey);
+        fireTimer(timer, timerReceivers, currentTimerKey);
       }
       while ((timer = timerInternals.removeNextSynchronizedProcessingTimer()) != null) {
         hasFired = true;
-        fireTimer(timer, timerConsumer, currentTimerKey);
+        fireTimer(timer, timerReceivers, currentTimerKey);
       }
     } while (hasFired);
   }
 
   private static void fireTimer(
       TimerInternals.TimerData timer,
-      BiConsumer<KV<String, String>, Timer<?>> timerConsumer,
+      Map<KV<String, String>, FnDataReceiver<Timer>> timerReceivers,
       Object currentTimerKey) {
     StateNamespace namespace = timer.getNamespace();
     Preconditions.checkArgument(namespace instanceof StateNamespaces.WindowNamespace);
@@ -149,7 +151,16 @@ public final class PipelineTranslatorUtils {
             timestamp,
             outputTimestamp,
             PaneInfo.NO_FIRING);
-    timerConsumer.accept(
-        TimerReceiverFactory.decodeTimerDataTimerId(timer.getTimerId()), timerValue);
+    KV<String, String> transformAndTimerId =
+        TimerReceiverFactory.decodeTimerDataTimerId(timer.getTimerId());
+    FnDataReceiver<Timer> fnTimerReceiver = timerReceivers.get(transformAndTimerId);
+    Preconditions.checkNotNull(
+        fnTimerReceiver, "No FnDataReceiver found for %s", transformAndTimerId);
+    try {
+      fnTimerReceiver.accept(timerValue);
+    } catch (Exception e) {
+      throw new RuntimeException(
+          String.format(Locale.ENGLISH, "Failed to process timer: %s", timerValue));
+    }
   }
 }
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java
index 954ccc5..a9a0dec 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java
@@ -59,8 +59,6 @@ import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
 import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.broadcast.Broadcast;
@@ -128,42 +126,53 @@ class SparkExecutableStageFunction<InputT, SideInputT>
         StateRequestHandler stateRequestHandler =
             getStateRequestHandler(
                 executableStage, stageBundleFactory.getProcessBundleDescriptor());
-        if (executableStage.getTimers().size() > 0) {
-          // Used with Batch, we know that all the data is available for this key. We can't use the
-          // timer manager from the context because it doesn't exist. So we create one and advance
-          // time to the end after processing all elements.
-          final InMemoryTimerInternals timerInternals = new InMemoryTimerInternals();
-          timerInternals.advanceProcessingTime(Instant.now());
-          timerInternals.advanceSynchronizedProcessingTime(Instant.now());
-
+        if (executableStage.getTimers().size() == 0) {
           ReceiverFactory receiverFactory = new ReceiverFactory(collector, outputMap);
-
-          TimerReceiverFactory timerReceiverFactory =
-              new TimerReceiverFactory(
-                  stageBundleFactory,
-                  (Timer<?> timer, TimerInternals.TimerData timerData) -> {
-                    currentTimerKey = timer.getUserKey();
-                    timerInternals.setTimer(timerData);
-                  },
-                  windowCoder);
-
-          // Process inputs.
           processElements(
               executableStage,
               stateRequestHandler,
               receiverFactory,
-              timerReceiverFactory,
+              null,
               stageBundleFactory,
               inputs);
+          return collector.iterator();
+        }
+        // Used with Batch, we know that all the data is available for this key. We can't use the
+        // timer manager from the context because it doesn't exist. So we create one and advance
+        // time to the end after processing all elements.
+        final InMemoryTimerInternals timerInternals = new InMemoryTimerInternals();
+        timerInternals.advanceProcessingTime(Instant.now());
+        timerInternals.advanceSynchronizedProcessingTime(Instant.now());
+
+        ReceiverFactory receiverFactory = new ReceiverFactory(collector, outputMap);
+
+        TimerReceiverFactory timerReceiverFactory =
+            new TimerReceiverFactory(
+                stageBundleFactory,
+                (Timer<?> timer, TimerInternals.TimerData timerData) -> {
+                  currentTimerKey = timer.getUserKey();
+                  timerInternals.setTimer(timerData);
+                },
+                windowCoder);
+
+        // Process inputs.
+        processElements(
+            executableStage,
+            stateRequestHandler,
+            receiverFactory,
+            timerReceiverFactory,
+            stageBundleFactory,
+            inputs);
 
-          // Finish any pending windows by advancing the input watermark to infinity.
-          timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
-          // Finally, advance the processing time to infinity to fire any timers.
-          timerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
-          timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
+        // Finish any pending windows by advancing the input watermark to infinity.
+        timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
+        // Finally, advance the processing time to infinity to fire any timers.
+        timerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
+        timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
 
-          // Now we fire the timers and process elements generated by timers (which may be timers
-          // itself)
+        // Now we fire the timers and process elements generated by timers (which may be timers
+        // itself)
+        while (timerInternals.hasPendingTimers()) {
           try (RemoteBundle bundle =
               stageBundleFactory.getBundle(
                   receiverFactory,
@@ -172,30 +181,8 @@ class SparkExecutableStageFunction<InputT, SideInputT>
                   getBundleProgressHandler())) {
 
             PipelineTranslatorUtils.fireEligibleTimers(
-                timerInternals,
-                (KV<String, String> transformAndTimerId, Timer<?> timerValue) -> {
-                  FnDataReceiver<Timer> fnTimerReceiver =
-                      bundle.getTimerReceivers().get(transformAndTimerId);
-                  Preconditions.checkNotNull(
-                      fnTimerReceiver, "No FnDataReceiver found for %s", transformAndTimerId);
-                  try {
-                    fnTimerReceiver.accept(timerValue);
-                  } catch (Exception e) {
-                    throw new RuntimeException(
-                        String.format(Locale.ENGLISH, "Failed to process timer: %s", timerValue));
-                  }
-                },
-                currentTimerKey);
+                timerInternals, bundle.getTimerReceivers(), currentTimerKey);
           }
-        } else {
-          ReceiverFactory receiverFactory = new ReceiverFactory(collector, outputMap);
-          processElements(
-              executableStage,
-              stateRequestHandler,
-              receiverFactory,
-              null,
-              stageBundleFactory,
-              inputs);
         }
         return collector.iterator();
       }


[beam] 04/04: [BEAM-9874] Support clearing timers in portable batch mode (Spark/Flink)

Posted by mx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch release-2.21.0
in repository https://gitbox.apache.org/repos/asf/beam.git

commit fe15aecd618e0a76960eca76ebc5d17b899b293c
Author: Maximilian Michels <mx...@apache.org>
AuthorDate: Sun May 3 12:38:35 2020 +0200

    [BEAM-9874] Support clearing timers in portable batch mode (Spark/Flink)
---
 .../beam/runners/core/InMemoryTimerInternals.java   | 21 ++++++++-------------
 .../functions/FlinkExecutableStageFunction.java     |  6 +++++-
 .../translation/SparkExecutableStageFunction.java   |  6 +++++-
 3 files changed, 18 insertions(+), 15 deletions(-)

diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
index 21420a7..241b49d 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
@@ -40,13 +40,13 @@ public class InMemoryTimerInternals implements TimerInternals {
   Table<StateNamespace, String, TimerData> existingTimers = HashBasedTable.create();
 
   /** Pending input watermark timers, in timestamp order. */
-  private NavigableSet<TimerData> watermarkTimers = new TreeSet<>();
+  private final NavigableSet<TimerData> watermarkTimers = new TreeSet<>();
 
   /** Pending processing time timers, in timestamp order. */
-  private NavigableSet<TimerData> processingTimers = new TreeSet<>();
+  private final NavigableSet<TimerData> processingTimers = new TreeSet<>();
 
   /** Pending synchronized processing time timers, in timestamp order. */
-  private NavigableSet<TimerData> synchronizedProcessingTimers = new TreeSet<>();
+  private final NavigableSet<TimerData> synchronizedProcessingTimers = new TreeSet<>();
 
   /** Current input watermark. */
   private Instant inputWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
@@ -68,9 +68,7 @@ public class InMemoryTimerInternals implements TimerInternals {
 
   /** Returns true when there are still timers to be fired. */
   public boolean hasPendingTimers() {
-    return !(watermarkTimers.isEmpty()
-        && processingTimers.isEmpty()
-        && synchronizedProcessingTimers.isEmpty());
+    return !existingTimers.isEmpty();
   }
 
   /**
@@ -167,9 +165,9 @@ public class InMemoryTimerInternals implements TimerInternals {
   @Deprecated
   @Override
   public void deleteTimer(StateNamespace namespace, String timerId, String timerFamilyId) {
-    TimerData existing = existingTimers.get(namespace, timerId + '+' + timerFamilyId);
-    if (existing != null) {
-      deleteTimer(existing);
+    TimerData removedTimer = existingTimers.remove(namespace, timerId + '+' + timerFamilyId);
+    if (removedTimer != null) {
+      timersForDomain(removedTimer.getDomain()).remove(removedTimer);
     }
   }
 
@@ -177,10 +175,7 @@ public class InMemoryTimerInternals implements TimerInternals {
   @Deprecated
   @Override
   public void deleteTimer(TimerData timer) {
-    WindowTracing.trace("{}.deleteTimer: {}", getClass().getSimpleName(), timer);
-    existingTimers.remove(
-        timer.getNamespace(), timer.getTimerId() + '+' + timer.getTimerFamilyId());
-    timersForDomain(timer.getDomain()).remove(timer);
+    deleteTimer(timer.getNamespace(), timer.getTimerId(), timer.getTimerFamilyId());
   }
 
   @Override
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
index bf3355d..4ee46ae 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
@@ -228,7 +228,11 @@ public class FlinkExecutableStageFunction<InputT> extends AbstractRichFunction
             stageBundleFactory,
             (Timer<?> timer, TimerInternals.TimerData timerData) -> {
               currentTimerKey = timer.getUserKey();
-              timerInternals.setTimer(timerData);
+              if (timer.getClearBit()) {
+                timerInternals.deleteTimer(timerData);
+              } else {
+                timerInternals.setTimer(timerData);
+              }
             },
             windowCoder);
 
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java
index a9a0dec..233e095 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java
@@ -151,7 +151,11 @@ class SparkExecutableStageFunction<InputT, SideInputT>
                 stageBundleFactory,
                 (Timer<?> timer, TimerInternals.TimerData timerData) -> {
                   currentTimerKey = timer.getUserKey();
-                  timerInternals.setTimer(timerData);
+                  if (timer.getClearBit()) {
+                    timerInternals.deleteTimer(timerData);
+                  } else {
+                    timerInternals.setTimer(timerData);
+                  }
                 },
                 windowCoder);