You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by am...@apache.org on 2020/04/07 17:42:20 UTC

[beam] branch release-2.20.0 updated (ef4b21b -> cd67dbf)

This is an automated email from the ASF dual-hosted git repository.

amaliujia pushed a change to branch release-2.20.0
in repository https://gitbox.apache.org/repos/asf/beam.git.


    from ef4b21b  [BEAM-9557] Fix timer window boundary checking (#11252)
     new 6bba79a  Install typing package only for Python < 3.5.3 (#10821)
     new cd67dbf  Merge pull request #11226: [BEAM-9557] Fix timer window boundary checking

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/beam/runners/core/SimpleDoFnRunner.java |  4 +-
 .../org/apache/beam/sdk/transforms/ParDoTest.java  | 91 +++++++++++++++++++++-
 sdks/python/setup.py                               |  4 +-
 3 files changed, 94 insertions(+), 5 deletions(-)


[beam] 01/02: Install typing package only for Python < 3.5.3 (#10821)

Posted by am...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

amaliujia pushed a commit to branch release-2.20.0
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 6bba79abbca019bfcbdae714b02806817ebc06a3
Author: Curtis "Fjord" Hawthorne <cg...@gmail.com>
AuthorDate: Tue Mar 10 16:06:26 2020 -0700

    Install typing package only for Python < 3.5.3 (#10821)
---
 sdks/python/setup.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index bd6450e..80f4cb0 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -168,8 +168,8 @@ REQUIRED_PACKAGES = [
     # [BEAM-5628] Beam VCF IO is not supported in Python 3.
     'pyvcf>=0.6.8,<0.7.0; python_version < "3.0"',
     # fixes and additions have been made since typing 3.5
-    'typing>=3.7.0,<3.8.0; python_version < "3.8.0"',
-    'typing-extensions>=3.7.0,<3.8.0; python_version < "3.8.0"',
+    'typing>=3.7.0,<3.8.0; python_version < "3.5.3"',
+    'typing-extensions>=3.7.0,<3.8.0',
     ]
 
 # [BEAM-8181] pyarrow cannot be installed on 32-bit Windows platforms.


[beam] 02/02: Merge pull request #11226: [BEAM-9557] Fix timer window boundary checking

Posted by am...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

amaliujia pushed a commit to branch release-2.20.0
in repository https://gitbox.apache.org/repos/asf/beam.git

commit cd67dbf1c0209824a06f764f3a1d4c591441c5c8
Author: reuvenlax <re...@google.com>
AuthorDate: Tue Apr 7 09:50:14 2020 -0700

    Merge pull request #11226: [BEAM-9557] Fix timer window boundary checking
---
 .../apache/beam/runners/core/SimpleDoFnRunner.java |  4 +-
 .../org/apache/beam/sdk/transforms/ParDoTest.java  | 91 +++++++++++++++++++++-
 2 files changed, 92 insertions(+), 3 deletions(-)

diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index fa5c695..9cc1b8d 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -1190,13 +1190,13 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
       if (TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) {
         checkArgument(
             !outputTimestamp.isAfter(target),
-            "Attempted to set an event time timer with an output timestamp of %s that is"
+            "Attempted to set an event-time timer with an output timestamp of %s that is"
                 + " after the timer firing timestamp %s",
             outputTimestamp,
             target);
         checkArgument(
             !target.isAfter(windowExpiry),
-            "Attempted to set an event time timer with a firing timestamp of %s that is"
+            "Attempted to set an event-time timer with a firing timestamp of %s that is"
                 + " after the expiration of window %s",
             target,
             windowExpiry);
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index f3a3b03..7a2978b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -3465,10 +3465,99 @@ public class ParDoTest implements Serializable {
                 ProcessContext context, BoundedWindow window, @TimerId(timerId) Timer timer) {
               try {
                 timer.set(window.maxTimestamp().plus(1L));
+                fail("Should have failed due to out-of-bounds timer.");
+              } catch (RuntimeException e) {
+                String message = e.getMessage();
+                List<String> expectedSubstrings = Arrays.asList("event-time timer", "expiration");
+                expectedSubstrings.forEach(
+                    str ->
+                        Preconditions.checkState(
+                            message.contains(str),
+                            "Pipeline didn't fail with the expected strings: %s",
+                            expectedSubstrings));
+              }
+            }
+
+            @OnTimer(timerId)
+            public void onTimer() {}
+          };
+
+      pipeline.apply(Create.of(KV.of("hello", 37))).apply(ParDo.of(fn));
+      pipeline.run();
+    }
+
+    @Test
+    @Category({
+      ValidatesRunner.class,
+      UsesTimersInParDo.class,
+      DataflowPortabilityApiUnsupported.class
+    })
+    public void testOutOfBoundsEventTimeTimerHold() throws Exception {
+      final String timerId = "foo";
+
+      DoFn<KV<String, Integer>, Integer> fn =
+          new DoFn<KV<String, Integer>, Integer>() {
+
+            @TimerId(timerId)
+            private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+            @ProcessElement
+            public void processElement(
+                ProcessContext context, BoundedWindow window, @TimerId(timerId) Timer timer) {
+              try {
+                timer
+                    .withOutputTimestamp(window.maxTimestamp().plus(1L))
+                    .set(window.maxTimestamp());
+                fail("Should have failed due to out-of-bounds timer.");
+              } catch (RuntimeException e) {
+                String message = e.getMessage();
+                List<String> expectedSubstrings =
+                    Arrays.asList("event-time timer", "output timestamp");
+                expectedSubstrings.forEach(
+                    str ->
+                        Preconditions.checkState(
+                            message.contains(str),
+                            "Pipeline didn't fail with the expected strings: %s",
+                            expectedSubstrings));
+              }
+            }
+
+            @OnTimer(timerId)
+            public void onTimer() {}
+          };
+
+      pipeline.apply(Create.of(KV.of("hello", 37))).apply(ParDo.of(fn));
+      pipeline.run();
+    }
+
+    @Test
+    @Category({
+      ValidatesRunner.class,
+      UsesTimersInParDo.class,
+      DataflowPortabilityApiUnsupported.class
+    })
+    public void testOutOfBoundsProcessingTimeTimerHold() throws Exception {
+      final String timerId = "foo";
+
+      DoFn<KV<String, Integer>, Integer> fn =
+          new DoFn<KV<String, Integer>, Integer>() {
+
+            @TimerId(timerId)
+            private final TimerSpec spec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+            @ProcessElement
+            public void processElement(
+                ProcessContext context, BoundedWindow window, @TimerId(timerId) Timer timer) {
+              try {
+                timer
+                    .withOutputTimestamp(window.maxTimestamp().plus(1L))
+                    .offset(Duration.standardSeconds(1))
+                    .setRelative();
                 fail("Should have failed due to processing time with absolute timer.");
               } catch (RuntimeException e) {
                 String message = e.getMessage();
-                List<String> expectedSubstrings = Arrays.asList("event time timer", "expiration");
+                List<String> expectedSubstrings =
+                    Arrays.asList("processing-time timer", "output timestamp");
                 expectedSubstrings.forEach(
                     str ->
                         Preconditions.checkState(