Posted to github@beam.apache.org by "liferoad (via GitHub)" <gi...@apache.org> on 2023/05/27 17:33:35 UTC

[GitHub] [beam] liferoad opened a new pull request, #26922: add tests

liferoad opened a new pull request, #26922:
URL: https://github.com/apache/beam/pull/26922

   Addresses #24683
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] Mention the appropriate issue in your description (for example: `addresses #123`), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment `fixes #<ISSUE NUMBER>` instead.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/get-started-contributing/#make-the-reviewers-job-easier).
   
   To check the build health, please visit [https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Go tests](https://github.com/apache/beam/workflows/Go%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Go+tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] github-actions[bot] commented on pull request #26922: Fix CombineGlobally with GlobalWindows

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #26922:
URL: https://github.com/apache/beam/pull/26922#issuecomment-1598659668

   Reminder, please take a look at this PR: @tvalentyn



[GitHub] [beam] codecov[bot] commented on pull request #26922: Fix inject_default in CombineGlobally

Posted by "codecov[bot] (via GitHub)" <gi...@apache.org>.
codecov[bot] commented on PR #26922:
URL: https://github.com/apache/beam/pull/26922#issuecomment-1577562314

   ## [Codecov](https://app.codecov.io/gh/apache/beam/pull/26922?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report
   > Merging [#26922](https://app.codecov.io/gh/apache/beam/pull/26922?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) (2bcf657) into [master](https://app.codecov.io/gh/apache/beam/commit/6ab4eaf7865ded75eae1752c9e6452cd3ecf7579?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) (6ab4eaf) will **decrease** coverage by `0.01%`.
   > The diff coverage is `28.57%`.
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #26922      +/-   ##
   ==========================================
   - Coverage   71.51%   71.50%   -0.01%     
   ==========================================
     Files         768      853      +85     
     Lines      103725   103990     +265     
   ==========================================
   + Hits        74174    74361     +187     
   - Misses      28002    28080      +78     
     Partials     1549     1549              
   ```
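The figures in the coverage diff are internally consistent: hits + misses + partials equals the line count in both columns. The sketch below assumes Codecov's default behavior (coverage = hits / (hits + misses + partials), two-decimal precision, rounded down). Under that assumption 74361 / 103990 ≈ 71.508% is reported as 71.50%, which accounts for the -0.01% delta. `codecov_percent` is a hypothetical helper, not a Codecov API.

```python
def codecov_percent(hits: int, misses: int, partials: int) -> str:
    """Return coverage as a percentage string, rounded down to two decimals.

    Assumes coverage = hits / (hits + misses + partials), i.e. partially
    covered lines count as not covered.
    """
    lines = hits + misses + partials
    # Integer arithmetic in basis points avoids float rounding surprises.
    basis_points = hits * 10000 // lines
    return f"{basis_points // 100}.{basis_points % 100:02d}"


# Figures copied from the Codecov table above.
master = codecov_percent(hits=74174, misses=28002, partials=1549)
pr = codecov_percent(hits=74361, misses=28080, partials=1549)
print(master, pr)  # prints: 71.51 71.50
```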
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | python | `81.06% <28.57%> (-0.05%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://app.codecov.io/gh/apache/beam/pull/26922?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
   |---|---|---|
   | [sdks/python/apache\_beam/transforms/core.py](https://app.codecov.io/gh/apache/beam/pull/26922?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9jb3JlLnB5) | `92.74% <28.57%> (+0.98%)` | :arrow_up: |
   
   ... and [136 files with indirect coverage changes](https://app.codecov.io/gh/apache/beam/pull/26922/indirect-changes?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
   
   



[GitHub] [beam] robertwb merged pull request #26922: Fix CombineGlobally with GlobalWindows

Posted by "robertwb (via GitHub)" <gi...@apache.org>.
robertwb merged PR #26922:
URL: https://github.com/apache/beam/pull/26922



[GitHub] [beam] tvalentyn commented on pull request #26922: Fix CombineGlobally with GlobalWindows

Posted by "tvalentyn (via GitHub)" <gi...@apache.org>.
tvalentyn commented on PR #26922:
URL: https://github.com/apache/beam/pull/26922#issuecomment-1615252938

   @liferoad lmk if you'd like to merge or wait to give @robertwb another chance to TAL.



[GitHub] [beam] liferoad commented on a diff in pull request #26922: Fix CombineGlobally with GlobalWindows

Posted by "liferoad (via GitHub)" <gi...@apache.org>.
liferoad commented on code in PR #26922:
URL: https://github.com/apache/beam/pull/26922#discussion_r1240839035


##########
sdks/python/apache_beam/transforms/combine_globally_test.py:
##########
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for our libraries of combine PTransforms."""
+# pytype: skip-file
+
+import time
+import unittest
+
+import apache_beam as beam
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.transforms import trigger
+from apache_beam.transforms import window
+from apache_beam.transforms.periodicsequence import PeriodicImpulse
+
+
+class CombineGloballyTest(unittest.TestCase):
+  def test_with_periodic_impulse(self):
+    # this error is expected since the below combination is ill-defined.
+    with self.assertRaises(ValueError):
+      with TestPipeline() as p:
+        _ = (
+            p
+            | PeriodicImpulse(
+                start_timestamp=time.time(),
+                stop_timestamp=time.time() + 4,
+                fire_interval=1,
+                apply_windowing=False,
+            )
+            | beam.Map(lambda x: ('c', 1))
+            | beam.WindowInto(
+                window.GlobalWindows(),
+                trigger=trigger.Repeatedly(trigger.AfterCount(2)),
+                accumulation_mode=trigger.AccumulationMode.DISCARDING,
+            )
+            | beam.combiners.Count.Globally()

Review Comment:
   `as_singleton_view` is not supported by `beam.combiners.Count.Globally()`, and the check at https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/core.py#L2645 means this code path never reaches the changed code. I'm not sure the test is relevant.
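One way to see why the quoted test calls this combination ill-defined: with `GlobalWindows` and `Repeatedly(AfterCount(2))`, a single window can emit several panes, so there is no single per-window combined result to fall back on. The function below is a simplified, hypothetical model (`simulate_repeated_after_count` is not a Beam API). It ignores watermarks, lateness and any end-of-window firing, and only shows what `Count.Globally()` would report per pane under DISCARDING vs. ACCUMULATING mode.

```python
from typing import List, Tuple


def simulate_repeated_after_count(
        num_elements: int, count: int, accumulating: bool) -> Tuple[List[int], int]:
    """Simplified model of Repeatedly(AfterCount(count)) within one window.

    Returns the Count.Globally() value carried by each pane, plus the number
    of buffered elements that never reached the trigger threshold. This
    sketch makes no claim about when, or whether, a runner emits those.
    """
    if count <= 0:
        raise ValueError("count must be positive")
    panes = []
    fired_total = 0
    buffered = 0
    for _ in range(num_elements):
        buffered += 1
        if buffered == count:
            fired_total += buffered
            # DISCARDING emits only the new elements; ACCUMULATING emits
            # the running total seen so far in the window.
            panes.append(fired_total if accumulating else buffered)
            buffered = 0
    return panes, buffered
```

For example, four impulse elements yield panes `[2, 2]` in DISCARDING mode and `[2, 4]` in ACCUMULATING mode.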




[GitHub] [beam] liferoad commented on a diff in pull request #26922: Fix CombineGlobally with GlobalWindows

Posted by "liferoad (via GitHub)" <gi...@apache.org>.
liferoad commented on code in PR #26922:
URL: https://github.com/apache/beam/pull/26922#discussion_r1240842602


##########
sdks/python/apache_beam/transforms/combine_globally_test.py:
##########
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for our libraries of combine PTransforms."""
+# pytype: skip-file
+
+import time
+import unittest
+
+import apache_beam as beam
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.transforms import trigger
+from apache_beam.transforms import window
+from apache_beam.transforms.periodicsequence import PeriodicImpulse
+
+
+class CombineGloballyTest(unittest.TestCase):
+  def test_with_periodic_impulse(self):
+    # this error is expected since the below combination is ill-defined.
+    with self.assertRaises(ValueError):
+      with TestPipeline() as p:
+        _ = (
+            p
+            | PeriodicImpulse(
+                start_timestamp=time.time(),
+                stop_timestamp=time.time() + 4,
+                fire_interval=1,
+                apply_windowing=False,
+            )
+            | beam.Map(lambda x: ('c', 1))
+            | beam.WindowInto(
+                window.GlobalWindows(),
+                trigger=trigger.Repeatedly(trigger.AfterCount(2)),
+                accumulation_mode=trigger.AccumulationMode.DISCARDING,
+            )
+            | beam.combiners.Count.Globally()
+            | "Print Windows" >> beam.Map(print))

Review Comment:
   Removed.




[GitHub] [beam] liferoad commented on a diff in pull request #26922: Fix CombineGlobally with GlobalWindows

Posted by "liferoad (via GitHub)" <gi...@apache.org>.
liferoad commented on code in PR #26922:
URL: https://github.com/apache/beam/pull/26922#discussion_r1224887322


##########
sdks/python/apache_beam/transforms/core.py:
##########
@@ -2598,8 +2598,9 @@ def typed(transform):
 
       def inject_default(_, combined):
         if combined:
-          assert len(combined) == 1
-          return combined[0]
+          if len(combined) > 1:
+            _LOGGER.warning('Apply Combined Fn with this list: %s', combined)
+            return combined[-1]

Review Comment:
   This is just my test. :)
   Restored the code but added some error messages to track this.
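As quoted, the experimental hunk only returns inside the `len(combined) > 1` branch, so a one-element list no longer reaches a `return combined[0]`. Below is a sketch of the restored behavior with a more descriptive error instead of a bare `assert`. It assumes the side input arrives as a plain Python list of combined values; `inject_default_sketch` is a hypothetical name, not Beam's implementation.

```python
from typing import List, TypeVar

T = TypeVar("T")


def inject_default_sketch(combined: List[T], default: T) -> T:
    """Return the single combined value, or `default` if nothing was combined.

    Raises ValueError (rather than asserting) when more than one combined
    value is present, so the offending values show up in the error.
    """
    if combined:
        if len(combined) != 1:
            raise ValueError(
                "Expected exactly one combined value, got %d: %r"
                % (len(combined), combined))
        return combined[0]
    return default
```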




[GitHub] [beam] liferoad commented on a diff in pull request #26922: Fix CombineGlobally with GlobalWindows

Posted by "liferoad (via GitHub)" <gi...@apache.org>.
liferoad commented on code in PR #26922:
URL: https://github.com/apache/beam/pull/26922#discussion_r1224894152


##########
sdks/python/apache_beam/transforms/combine_globally_test.py:
##########
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for our libraries of combine PTransforms."""
+# pytype: skip-file
+
+import time
+import unittest
+
+import apache_beam as beam
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.transforms import trigger
+from apache_beam.transforms import window
+from apache_beam.transforms.periodicsequence import PeriodicImpulse
+
+
+class CombineGloballyTest(unittest.TestCase):

Review Comment:
   Note that this test now fails if we add the DefaultTrigger condition.




[GitHub] [beam] liferoad commented on a diff in pull request #26922: Fix CombineGlobally with GlobalWindows

Posted by "liferoad (via GitHub)" <gi...@apache.org>.
liferoad commented on code in PR #26922:
URL: https://github.com/apache/beam/pull/26922#discussion_r1224894152


##########
sdks/python/apache_beam/transforms/combine_globally_test.py:
##########
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for our libraries of combine PTransforms."""
+# pytype: skip-file
+
+import time
+import unittest
+
+import apache_beam as beam
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.transforms import trigger
+from apache_beam.transforms import window
+from apache_beam.transforms.periodicsequence import PeriodicImpulse
+
+
+class CombineGloballyTest(unittest.TestCase):

Review Comment:
   Note that this test now fails if we use the `is_default` condition.




[GitHub] [beam] tvalentyn commented on a diff in pull request #26922: Fix CombineGlobally with GlobalWindows

Posted by "tvalentyn (via GitHub)" <gi...@apache.org>.
tvalentyn commented on code in PR #26922:
URL: https://github.com/apache/beam/pull/26922#discussion_r1237066077


##########
sdks/python/apache_beam/transforms/combine_globally_test.py:
##########
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for our libraries of combine PTransforms."""
+# pytype: skip-file
+
+import time
+import unittest
+
+import apache_beam as beam
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.transforms import trigger
+from apache_beam.transforms import window
+from apache_beam.transforms.periodicsequence import PeriodicImpulse
+
+
+class CombineGloballyTest(unittest.TestCase):
+  def test_with_periodic_impulse(self):

Review Comment:
   Can we describe the tested behavior and expected outcome in the test name? For example: 
   
   `test_combining_unbounded_pcoll_with_custom_windowing_raises_error_when_default_unspecified`  
   `test_unbounded_pcoll_with_custom_windowing_require_specifying_defaults_for_empty_windows`
   
   (feel free to adjust the name)



##########
sdks/python/apache_beam/transforms/core.py:
##########
@@ -2587,6 +2587,18 @@ def add_input_types(transform):
             "or CombineGlobally().as_singleton_view() to get the default "
             "output of the CombineFn if the input PCollection is empty.")
 
+      # return the error for this ill-defined streaming case
+      if not pcoll.is_bounded and not pcoll.windowing.is_default():
+        raise ValueError(
+            "For unbounded data sources, "
+            "default values are not yet supported in CombineGlobally() if the "
+            "output PCollection is not windowed by GlobalWindows"
+            " with DefaultTrigger. "

Review Comment:
   nit: let's move the leading space to the end of the previous line for consistency.
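Python joins adjacent string literals at compile time, so this nit is about consistency rather than output: putting each separator at the end of its fragment makes a missing space easier to spot in review. A small illustration using fragments of the message above:

```python
# Adjacent string literals are concatenated at compile time, so moving the
# separating space between fragments does not change the final message.
leading_space = (
    "output PCollection is not windowed by GlobalWindows"
    " with DefaultTrigger. ")
trailing_space = (
    "output PCollection is not windowed by GlobalWindows "
    "with DefaultTrigger. ")
# Dropping the space entirely is the bug a consistent convention helps catch.
missing_space = (
    "output PCollection is not windowed by GlobalWindows"
    "with DefaultTrigger. ")
```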



##########
sdks/python/apache_beam/transforms/combine_globally_test.py:
##########
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more

Review Comment:
   Should we place this in combiners_test.py?



##########
sdks/python/apache_beam/transforms/core.py:
##########
@@ -2587,6 +2587,18 @@ def add_input_types(transform):
             "or CombineGlobally().as_singleton_view() to get the default "
             "output of the CombineFn if the input PCollection is empty.")
 
+      # return the error for this ill-defined streaming case
+      if not pcoll.is_bounded and not pcoll.windowing.is_default():
+        raise ValueError(

Review Comment:
   1. Is this behavior applicable only to CombineGlobally, or also to CombinePerKey / CombineValues? Perhaps we should generalize the wording here if other tests also fail in this scenario.
   
   2. For my understanding, how is the default value defined if we do use the default trigger?
   
   3. Would this wording be more appropriate?
   
   ```
   raise ValueError(
       "When combining elements in unbounded collections with a non-default "
       "windowing strategy, you must explicitly specify how to define the "
       "combined result of an empty window. "
       "Please use CombineGlobally().without_defaults() to output "
       "an empty PCollection if the input PCollection is empty, "
       "or CombineGlobally().as_singleton_view() to get the default "
       "output of the CombineFn if the input PCollection is empty.")
   ```
   4. If users specify `as_singleton_view`, do we need to check that the underlying CombineFn `.has_defaults()`? Is a meaningful error thrown if it doesn't?
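For discussion, here is a simplified model of the new check in the diff (`not pcoll.is_bounded and not pcoll.windowing.is_default()`), using the proposed wording. It assumes the check only runs when defaults were requested, i.e. when `CombineGlobally().without_defaults()` was not used, and it does not model the pre-existing non-`GlobalWindows` check. `PCollectionProps` and `check_combine_globally_defaults` are hypothetical names, not Beam APIs.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PCollectionProps:
    """Hypothetical stand-in for the PCollection properties the check reads."""
    is_bounded: bool
    windowing_is_default: bool  # plays the role of pcoll.windowing.is_default()


def check_combine_globally_defaults(
        pcoll: PCollectionProps, has_defaults: bool) -> None:
    """Raise ValueError for the case the PR treats as ill-defined."""
    if not has_defaults:
        return  # without_defaults(): empty windows simply produce no output
    if not pcoll.is_bounded and not pcoll.windowing_is_default:
        raise ValueError(
            "When combining elements in unbounded collections with a "
            "non-default windowing strategy, you must explicitly specify how "
            "to define the combined result of an empty window. "
            "Please use CombineGlobally().without_defaults() to output "
            "an empty PCollection if the input PCollection is empty, "
            "or CombineGlobally().as_singleton_view() to get the default "
            "output of the CombineFn if the input PCollection is empty.")
```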



##########
sdks/python/apache_beam/transforms/combine_globally_test.py:
##########
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for our libraries of combine PTransforms."""
+# pytype: skip-file
+
+import time
+import unittest
+
+import apache_beam as beam
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.transforms import trigger
+from apache_beam.transforms import window
+from apache_beam.transforms.periodicsequence import PeriodicImpulse
+
+
+class CombineGloballyTest(unittest.TestCase):
+  def test_with_periodic_impulse(self):
+    # this error is expected since the below combination is ill-defined.
+    with self.assertRaises(ValueError):
+      with TestPipeline() as p:
+        _ = (
+            p
+            | PeriodicImpulse(
+                start_timestamp=time.time(),
+                stop_timestamp=time.time() + 4,
+                fire_interval=1,
+                apply_windowing=False,
+            )
+            | beam.Map(lambda x: ('c', 1))
+            | beam.WindowInto(
+                window.GlobalWindows(),
+                trigger=trigger.Repeatedly(trigger.AfterCount(2)),
+                accumulation_mode=trigger.AccumulationMode.DISCARDING,
+            )
+            | beam.combiners.Count.Globally()

Review Comment:
   Does this test pass if we use `beam.combiners.Count.Globally().as_singleton_view()`? Should we also add such a scenario?



##########
sdks/python/apache_beam/transforms/core.py:
##########
@@ -2587,6 +2587,18 @@ def add_input_types(transform):
             "or CombineGlobally().as_singleton_view() to get the default "
             "output of the CombineFn if the input PCollection is empty.")
 
+      # return the error for this ill-defined streaming case
+      if not pcoll.is_bounded and not pcoll.windowing.is_default():

Review Comment:
   Could you please add a docstring for `Windowing.is_default()` while at it? I wonder if you actually wanted to refer to `CombineGlobally.has_defaults` here...
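A possible shape for such a docstring, shown on a hypothetical, simplified `WindowingSketch` class rather than Beam's `Windowing`. It assumes `is_default()` means "GlobalWindows, DefaultTrigger and DISCARDING accumulation" and omits other fields Beam may also compare. Under that reading, the quoted test (GlobalWindows with `Repeatedly(AfterCount(2))`) is not default windowing even though its window function is `GlobalWindows`.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class WindowingSketch:
    """Hypothetical, simplified model of a windowing strategy."""
    windowfn: str = "GlobalWindows"
    trigger: str = "DefaultTrigger"
    accumulation_mode: str = "DISCARDING"

    def is_default(self) -> bool:
        """Return True if this is the strategy a PCollection has when no
        WindowInto was applied: GlobalWindows, DefaultTrigger and DISCARDING
        accumulation. (Other fields, such as the timestamp combiner or
        allowed lateness, are omitted from this sketch.)

        This describes the windowing only; whether a CombineGlobally emits a
        default value for empty input is controlled separately, by
        CombineGlobally.has_defaults.
        """
        return (self.windowfn == "GlobalWindows"
                and self.trigger == "DefaultTrigger"
                and self.accumulation_mode == "DISCARDING")
```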



##########
sdks/python/apache_beam/transforms/combine_globally_test.py:
##########
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for our libraries of combine PTransforms."""
+# pytype: skip-file
+
+import time
+import unittest
+
+import apache_beam as beam
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.transforms import trigger
+from apache_beam.transforms import window
+from apache_beam.transforms.periodicsequence import PeriodicImpulse
+
+
+class CombineGloballyTest(unittest.TestCase):
+  def test_with_periodic_impulse(self):
+    # this error is expected since the below combination is ill-defined.
+    with self.assertRaises(ValueError):
+      with TestPipeline() as p:
+        _ = (
+            p
+            | PeriodicImpulse(
+                start_timestamp=time.time(),
+                stop_timestamp=time.time() + 4,
+                fire_interval=1,
+                apply_windowing=False,
+            )
+            | beam.Map(lambda x: ('c', 1))
+            | beam.WindowInto(
+                window.GlobalWindows(),
+                trigger=trigger.Repeatedly(trigger.AfterCount(2)),
+                accumulation_mode=trigger.AccumulationMode.DISCARDING,
+            )
+            | beam.combiners.Count.Globally()
+            | "Print Windows" >> beam.Map(print))

Review Comment:
   nit: unnecessary line?




[GitHub] [beam] liferoad commented on a diff in pull request #26922: Fix CombineGlobally with GlobalWindows

Posted by "liferoad (via GitHub)" <gi...@apache.org>.
liferoad commented on code in PR #26922:
URL: https://github.com/apache/beam/pull/26922#discussion_r1224998527


##########
sdks/python/apache_beam/transforms/combine_globally_test.py:
##########
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for our libraries of combine PTransforms."""
+# pytype: skip-file
+
+import time
+import unittest
+
+import apache_beam as beam
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.transforms import trigger
+from apache_beam.transforms import window
+from apache_beam.transforms.periodicsequence import PeriodicImpulse
+
+
+class CombineGloballyTest(unittest.TestCase):

Review Comment:
   ok. Updated the test. Thanks.




[GitHub] [beam] liferoad commented on a diff in pull request #26922: Fix CombineGlobally with GlobalWindows

Posted by "liferoad (via GitHub)" <gi...@apache.org>.
liferoad commented on code in PR #26922:
URL: https://github.com/apache/beam/pull/26922#discussion_r1224891505


##########
sdks/python/apache_beam/transforms/core.py:
##########
@@ -2581,7 +2581,7 @@ def add_input_types(transform):
       if pcoll.windowing.windowfn != GlobalWindows():

Review Comment:
   I think I understand what you mean now. I changed the condition as you suggested. However, the code below raises this error (at `pipeline_result = pipeline.run()`): `GroupByKey cannot be applied to an unbounded PCollection with global windowing and a default trigger`
   ```
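   import apache_beam as beam
   from apache_beam.runners.dataflow.dataflow_runner import DataflowRunner
   from apache_beam.transforms import trigger
   from apache_beam.transforms import window

   # `options` (PipelineOptions) and `topic` are assumed to be defined elsewhere.
   pipeline = beam.Pipeline(DataflowRunner(), options=options)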
   _ = (
       pipeline
       | beam.io.ReadFromPubSub(topic=topic)
       | beam.WindowInto(
           window.GlobalWindows(),
           trigger=trigger.Repeatedly(trigger.AfterCount(4)),
           accumulation_mode=trigger.AccumulationMode.DISCARDING,
           #accumulation_mode=trigger.AccumulationMode.ACCUMULATING,
       )
       | "Print 1" >> beam.transforms.util.LogElements()
       # | beam.combiners.Count.Globally().without_defaults()
       | beam.combiners.Count.Globally()
       | "Print 2" >> beam.transforms.util.LogElements()
   )
   ```
   
   Is it possible to use this combination (Unbounded source + GlobalWindows with DefaultTrigger)?
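The error text hints at the rationale: DefaultTrigger fires when the watermark passes the end of the window, and the global window does not end while an unbounded source keeps producing data, so such a GroupByKey would never emit. A hypothetical model of that validation rule (`check_group_by_key` is not a Beam API):

```python
def check_group_by_key(is_bounded: bool, global_windowing: bool,
                       default_trigger: bool) -> None:
    """Hypothetical model of the GroupByKey validation quoted above.

    Rejects the one combination under which the grouping would never fire:
    unbounded input, the global window, and the default trigger.
    """
    if not is_bounded and global_windowing and default_trigger:
        raise ValueError(
            "GroupByKey cannot be applied to an unbounded PCollection with "
            "global windowing and a default trigger")
```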
   




[GitHub] [beam] tvalentyn commented on a diff in pull request #26922: Fix CombineGlobally with GlobalWindows

Posted by "tvalentyn (via GitHub)" <gi...@apache.org>.
tvalentyn commented on code in PR #26922:
URL: https://github.com/apache/beam/pull/26922#discussion_r1248324121


##########
sdks/python/apache_beam/transforms/combine_globally_test.py:
##########
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more

Review Comment:
   Fair, although CombineGlobally test scenarios would now be spread across multiple files, since combiners_test also includes CombineGlobally tests. No strong opinion on this; fine with me either way.




[GitHub] [beam] robertwb commented on a diff in pull request #26922: Fix CombineGlobally with GlobalWindows

Posted by "robertwb (via GitHub)" <gi...@apache.org>.
robertwb commented on code in PR #26922:
URL: https://github.com/apache/beam/pull/26922#discussion_r1224909100


##########
sdks/python/apache_beam/transforms/combine_globally_test.py:
##########
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for our libraries of combine PTransforms."""
+# pytype: skip-file
+
+import time
+import unittest
+
+import apache_beam as beam
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.transforms import trigger
+from apache_beam.transforms import window
+from apache_beam.transforms.periodicsequence import PeriodicImpulse
+
+
+class CombineGloballyTest(unittest.TestCase):

Review Comment:
   As it should (because it does not have well-defined behavior). 




[GitHub] [beam] liferoad commented on pull request #26922: Fix CombineGlobally with GlobalWindows

Posted by "liferoad (via GitHub)" <gi...@apache.org>.
liferoad commented on PR #26922:
URL: https://github.com/apache/beam/pull/26922#issuecomment-1615282381

   > @liferoad lmk if you'd like to merge or wait to give @robertwb  another chance to TAL. 
   
   Let us wait for him.



[GitHub] [beam] robertwb commented on a diff in pull request #26922: Fix CombineGlobally with GlobalWindows

Posted by "robertwb (via GitHub)" <gi...@apache.org>.
robertwb commented on code in PR #26922:
URL: https://github.com/apache/beam/pull/26922#discussion_r1255040999


##########
sdks/python/apache_beam/transforms/core.py:
##########
@@ -2676,6 +2685,8 @@ def typed(transform):
 
       def inject_default(_, combined):
         if combined:
+          if len(combined) > 1:
+            _LOGGER.error('Cannot inject default values with: %s', combined)

Review Comment:
   This error message isn't very actionable (or explanatory). What's going on here is that if the PCollection was non-empty, the combined value is passed through, and otherwise (the else clause below) a default value is provided. 
   
   Maybe "Multiple combined values unexpectedly provided for a global combine" would be clearer?
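The default-injection step described above can be modeled in plain Python. This is a hypothetical sketch, not the code in `core.py`: `combine` stands in for the CombineFn, and the default is assumed to be whatever it returns for empty input. As a design choice, the sketch raises with the suggested message rather than logging and continuing.

```python
from typing import Callable, List, Sequence, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def inject_default(combine: Callable[[List[T]], R],
                   combined: Sequence[R]) -> R:
    """Hypothetical model of CombineGlobally's default injection.

    `combined` holds the combined value(s) for the global window: empty
    if the input PCollection was empty, otherwise (normally) one value.
    """
    if combined:
        if len(combined) > 1:
            raise ValueError(
                "Multiple combined values unexpectedly provided for a "
                "global combine: %r" % (list(combined),))
        return combined[0]  # non-empty input: pass the combined value through
    return combine([])  # empty input: fall back to the default value


print(inject_default(sum, [42]))  # 42
print(inject_default(sum, []))    # 0
```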



##########
sdks/python/apache_beam/transforms/combine_globally_test.py:
##########
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more

Review Comment:
   I have a slight preference for keeping things in the same file and being consistent over having to look in multiple files, but won't object if you feel strongly. 




[GitHub] [beam] robertwb commented on pull request #26922: Fix CombineGlobally with GlobalWindows

Posted by "robertwb (via GitHub)" <gi...@apache.org>.
robertwb commented on PR #26922:
URL: https://github.com/apache/beam/pull/26922#issuecomment-1625688121

   LGTM



[GitHub] [beam] liferoad commented on pull request #26922: add tests

Posted by "liferoad (via GitHub)" <gi...@apache.org>.
liferoad commented on PR #26922:
URL: https://github.com/apache/beam/pull/26922#issuecomment-1565610191

   @tvalentyn From my two tests, I do not see any problem with CombineGlobally processing multiple windows, as long as each window produces exactly one value, as CombineGlobally requires. I might be misunderstanding the issue.
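A plain-Python sketch of the behavior described above, assuming fixed (non-overlapping) windows and `sum` as the combine function: each window is combined independently, so there is one output per window rather than one for the whole input. The function name, window size, and event data are illustrative, not taken from the PR's tests.

```python
from collections import defaultdict
from typing import Callable, Dict, Iterable, List, Tuple


def combine_per_fixed_window(
        timestamped: Iterable[Tuple[int, int]],
        size: int,
        combine: Callable[[List[int]], int]) -> Dict[int, int]:
    """Hypothetical model: assign (timestamp, value) pairs to fixed windows of
    `size` time units and combine each window's values into one result.

    Returns a dict keyed by window start time, one entry per non-empty window.
    """
    windows: Dict[int, List[int]] = defaultdict(list)
    for ts, value in timestamped:
        start = ts - (ts % size)  # start of the fixed window containing ts
        windows[start].append(value)
    # Exactly one combined value per window, in window order.
    return {start: combine(vals) for start, vals in sorted(windows.items())}


events = [(1, 1), (3, 2), (11, 5), (19, 1), (25, 7)]
print(combine_per_fixed_window(events, 10, sum))  # {0: 3, 10: 6, 20: 7}
```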



[GitHub] [beam] liferoad commented on a diff in pull request #26922: Fix CombineGlobally with GlobalWindows

Posted by "liferoad (via GitHub)" <gi...@apache.org>.
liferoad commented on code in PR #26922:
URL: https://github.com/apache/beam/pull/26922#discussion_r1240807948


##########
sdks/python/apache_beam/transforms/combine_globally_test.py:
##########
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more

Review Comment:
   We could, but `combiners_test.py` already has ~1000 lines. I would rather create smaller Python files where possible.




[GitHub] [beam] robertwb commented on a diff in pull request #26922: Fix inject_default in CombineGlobally

Posted by "robertwb (via GitHub)" <gi...@apache.org>.
robertwb commented on code in PR #26922:
URL: https://github.com/apache/beam/pull/26922#discussion_r1224799988


##########
sdks/python/apache_beam/transforms/core.py:
##########
@@ -2598,8 +2598,9 @@ def typed(transform):
 
       def inject_default(_, combined):
         if combined:
-          assert len(combined) == 1
-          return combined[0]
+          if len(combined) > 1:
+            _LOGGER.warning('Apply Combined Fn with this list: %s', combined)
+            return combined[-1]

Review Comment:
   We cannot safely just "drop" extra values, giving back an arbitrary, uncombined choice.
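A small hypothetical illustration of the concern: if a triggered global window produced two partial counts, the proposed `return combined[-1]` would silently report only the last one. The pane values below are made up for illustration.

```python
# Hypothetical per-pane counts for one global window under a
# Repeatedly(AfterCount(2)) trigger with DISCARDING accumulation.
combined = [2, 2]

# The proposed fallback keeps only the last value...
dropped = combined[-1]

# ...whereas the number of elements that actually arrived is the sum.
actual = sum(combined)

print(dropped, actual)  # 2 4
```

Summing happens to work for counts, but a general CombineFn merges accumulators, not outputs (two means, for example, cannot be averaged without their counts), so there is no generic way to reconcile extra values at this point.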




[GitHub] [beam] github-actions[bot] commented on pull request #26922: Fix CombineGlobally with GlobalWindows

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #26922:
URL: https://github.com/apache/beam/pull/26922#issuecomment-1585447919

   Assigning reviewers. If you would like to opt out of this review, comment `assign to next reviewer`:
   
   R: @tvalentyn for label python.
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)
   
   The PR bot will only process comments in the main thread (not review comments).



[GitHub] [beam] robertwb commented on a diff in pull request #26922: Fix inject_default in CombineGlobally

Posted by "robertwb (via GitHub)" <gi...@apache.org>.
robertwb commented on code in PR #26922:
URL: https://github.com/apache/beam/pull/26922#discussion_r1224808670


##########
sdks/python/apache_beam/transforms/core.py:
##########
@@ -2581,7 +2581,7 @@ def add_input_types(transform):
       if pcoll.windowing.windowfn != GlobalWindows():

Review Comment:
   We should probably make this stronger, i.e. prohibit anything but the default windowing (global windows *and* no non-trivial triggering). IIRC, `pcoll.windowing.is_default()` would solve this. I don't think the solution below would give the expected output (emitting, exactly once and towards the beginning of the pipeline, the few values that were combined up to that point).
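To make the stronger condition concrete, here is a plain-Python model. The `Windowing` class below is a hypothetical, much-simplified stand-in for Beam's, mirroring only the idea that windowing is "default" when it uses global windows *and* the default trigger. The guard function `check_combine_globally` and its error text are also illustrative.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Windowing:
    """Hypothetical, simplified stand-in for a PCollection's windowing."""
    windowfn: str = "GlobalWindows"
    trigger: str = "DefaultTrigger"

    def is_default(self) -> bool:
        # Default means global windows AND no non-trivial triggering.
        return (self.windowfn == "GlobalWindows"
                and self.trigger == "DefaultTrigger")


def check_combine_globally(windowing: Windowing, has_defaults: bool) -> None:
    """Hypothetical guard: reject default injection for non-default windowing."""
    if has_defaults and not windowing.is_default():
        raise ValueError(
            "Default values require default windowing; use "
            "without_defaults() or as_singleton_view() instead.")


check_combine_globally(Windowing(), has_defaults=True)  # OK
check_combine_globally(
    Windowing(trigger="Repeatedly(AfterCount(2))"), has_defaults=False)  # OK
```

By contrast, the original condition (`windowfn != GlobalWindows()`) compares only the window function, so it would let `Windowing(trigger="Repeatedly(AfterCount(2))")` through.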




[GitHub] [beam] liferoad commented on a diff in pull request #26922: Fix CombineGlobally with GlobalWindows

Posted by "liferoad (via GitHub)" <gi...@apache.org>.
liferoad commented on code in PR #26922:
URL: https://github.com/apache/beam/pull/26922#discussion_r1227099913


##########
sdks/python/apache_beam/transforms/core.py:
##########
@@ -2587,6 +2587,18 @@ def add_input_types(transform):
             "or CombineGlobally().as_singleton_view() to get the default "
             "output of the CombineFn if the input PCollection is empty.")
 
+      # return the error for this ill-defined streaming case

Review Comment:
   `test_non_liftable_combine` would fail if we did not check the unbounded case.




[GitHub] [beam] robertwb commented on a diff in pull request #26922: Fix CombineGlobally with GlobalWindows

Posted by "robertwb (via GitHub)" <gi...@apache.org>.
robertwb commented on code in PR #26922:
URL: https://github.com/apache/beam/pull/26922#discussion_r1256132258


##########
sdks/python/apache_beam/transforms/combine_globally_test.py:
##########
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for our libraries of combine PTransforms."""
+# pytype: skip-file
+
+import time
+import unittest
+
+import apache_beam as beam
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.transforms import trigger
+from apache_beam.transforms import window
+from apache_beam.transforms.periodicsequence import PeriodicImpulse
+
+
+class CombineGloballyTest(unittest.TestCase):
+  def test_with_periodic_impulse(self):
+    # this error is expected since the below combination is ill-defined.
+    with self.assertRaises(ValueError):
+      with TestPipeline() as p:
+        _ = (
+            p
+            | PeriodicImpulse(
+                start_timestamp=time.time(),
+                stop_timestamp=time.time() + 4,
+                fire_interval=1,
+                apply_windowing=False,
+            )
+            | beam.Map(lambda x: ('c', 1))
+            | beam.WindowInto(
+                window.GlobalWindows(),
+                trigger=trigger.Repeatedly(trigger.AfterCount(2)),
+                accumulation_mode=trigger.AccumulationMode.DISCARDING,
+            )
+            | beam.combiners.Count.Globally()

Review Comment:
   `as_singleton_view` should work, as it's pull-based rather than push-based, but that seems orthogonal to the code here. 


