Posted to github@beam.apache.org by "robertwb (via GitHub)" <gi...@apache.org> on 2023/04/27 17:06:01 UTC

[GitHub] [beam] robertwb opened a new pull request, #26451: Add flexible windowing syntax to yaml.

robertwb opened a new pull request, #26451:
URL: https://github.com/apache/beam/pull/26451

   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] github-actions[bot] commented on pull request #26451: Add flexible windowing syntax to yaml.

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #26451:
URL: https://github.com/apache/beam/pull/26451#issuecomment-1526060341

   Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control




[GitHub] [beam] twang126 commented on a diff in pull request #26451: Add flexible windowing syntax to yaml.

Posted by "twang126 (via GitHub)" <gi...@apache.org>.
twang126 commented on code in PR #26451:
URL: https://github.com/apache/beam/pull/26451#discussion_r1180636363


##########
sdks/python/apache_beam/yaml/README.md:
##########
@@ -215,6 +215,157 @@ pipeline:
       path: /path/to/big.csv
 ```
 
+## Windowing
+
+This API can be used to define both streaming and batch pipelines.
+In order to meaningfully aggregate elements in a streaming pipeline,
+some kind of windowing is typically required. Beam's
+[windowing](https://beam.apache.org/documentation/programming-guide/#windowing)
+and [triggering](https://beam.apache.org/documentation/programming-guide/#triggers)
+can be declared using the same WindowInto transform available in all other
+SDKs.
+
+```
+pipeline:
+  type: chain
+  transforms:
+    - type: ReadFromPubSub
+      topic: myPubSubTopic
+    - type: WindowInto
+      windowing:
+        type: fixed
+        size: 60
+    - type: SomeAggregation
+    - type: WriteToPubSub
+      topic: anotherPubSubTopic
+```
+
+Rather than using an explicit `WindowInto` operation, one may instead tag a
+transform itself with a specified windowing which will cause its inputs
+(and hence the transform itself) to be applied with that windowing.
+
+```
+pipeline:
+  type: chain
+  transforms:
+    - type: ReadFromPubSub
+      topic: myPubSubTopic
+    - type: SomeAggregation
+      windowing:
+        type: sliding
+        size: 60
+        period: 10
+    - type: WriteToPubSub
+      topic: anotherPubSubTopic
+```
+
+Note that the `Sql` operation itself often a from of aggregation, and applying

Review Comment:
   nit: maybe reword to "Note that the `Sql` operation itself **is** often a form of aggregation, and applying a windowing ~~which~~ will cause all grouping to be done per window."
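
For reference, the `sliding` windowing in the README hunk above (size 60, period 10) places each element in several overlapping windows. The following is a minimal sketch of that assignment in plain Python, assuming integer timestamps in seconds and a zero offset. `sliding_windows` is a hypothetical helper written for illustration; it is not part of Beam or of this PR.

```python
def sliding_windows(timestamp, size, period):
    """Return the (start, end) windows containing `timestamp`.

    Windows are half-open intervals [start, end) of length `size`
    that begin every `period` seconds. Illustrative only.
    """
    # Start of the latest window that still contains the timestamp.
    last_start = timestamp - timestamp % period
    # Walk backwards one period at a time while the window still covers it.
    return [(start, start + size)
            for start in range(last_start, last_start - size, -period)]


# An element at t=65 falls into size / period = 6 windows.
print(sliding_windows(65, 60, 10))
# [(60, 120), (50, 110), (40, 100), (30, 90), (20, 80), (10, 70)]
```

Because each element is counted once per window it falls into, an aggregation under this windowing emits overlapping results rather than disjoint ones.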






[GitHub] [beam] twang126 commented on a diff in pull request #26451: Add flexible windowing syntax to yaml.

Posted by "twang126 (via GitHub)" <gi...@apache.org>.
twang126 commented on code in PR #26451:
URL: https://github.com/apache/beam/pull/26451#discussion_r1180683497


##########
sdks/python/apache_beam/yaml/yaml_transform_test.py:
##########
@@ -166,6 +166,121 @@ def test_csv_to_json(self):
       pd.testing.assert_frame_equal(data, result)
 
 
+class CreateTimestamped(beam.PTransform):
+  def __init__(self, elements):
+    self._elements = elements
+
+  def expand(self, p):
+    return (
+        p
+        | beam.Create(self._elements)
+        | beam.Map(lambda x: beam.transforms.window.TimestampedValue(x, x)))
+
+
+class SumGlobally(beam.PTransform):
+  def expand(self, pcoll):
+    return pcoll | beam.CombineGlobally(sum).without_defaults()
+
+
+TEST_PROVIDERS = {
+    'CreateTimestamped': CreateTimestamped, 'SumGlobally': SumGlobally
+}
+
+
+class YamlWindowingTest(unittest.TestCase):
+  def test_explicit_window_into(self):
+    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
+        pickle_library='cloudpickle')) as p:
+      result = p | YamlTransform(
+          '''
+          type: chain
+          transforms:
+            - type: CreateTimestamped
+              elements: [0, 1, 2, 3, 4, 5]
+            - type: WindowInto
+              windowing:
+                type: fixed
+                size: 4
+            - type: SumGlobally
+          ''',
+          providers=TEST_PROVIDERS)
+      assert_that(result, equal_to([6, 9]))
+
+  def test_windowing_on_input(self):
+    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
+        pickle_library='cloudpickle')) as p:
+      result = p | YamlTransform(
+          '''
+          type: chain
+          transforms:
+            - type: CreateTimestamped
+              elements: [0, 1, 2, 3, 4, 5]
+            - type: SumGlobally
+              windowing:
+                type: fixed
+                size: 4
+          ''',
+          providers=TEST_PROVIDERS)
+      assert_that(result, equal_to([6, 9]))
+
+  def test_windowing_multiple_inputs(self):
+    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
+        pickle_library='cloudpickle')) as p:
+      result = p | YamlTransform(
+          '''
+          type: composite
+          transforms:
+            - type: CreateTimestamped
+              name: Create1
+              elements: [0, 2, 4]
+            - type: CreateTimestamped
+              name: Create2
+              elements: [1, 3, 5]
+            - type: SumGlobally
+              input: [Create1, Create2]
+              windowing:
+                type: fixed
+                size: 4
+          output: SumGlobally
+          ''',
+          providers=TEST_PROVIDERS)
+      assert_that(result, equal_to([6, 9]))
+
+  def test_windowing_on_output(self):
+    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
+        pickle_library='cloudpickle')) as p:
+      result = p | YamlTransform(
+          '''
+          type: chain
+          transforms:
+            - type: CreateTimestamped
+              elements: [0, 1, 2, 3, 4, 5]
+              windowing:
+                type: fixed
+                size: 4
+            - type: SumGlobally
+          ''',
+          providers=TEST_PROVIDERS)
+      assert_that(result, equal_to([6, 9]))
+
+  def test_windowing_on_outer(self):
+    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
+        pickle_library='cloudpickle')) as p:
+      result = p | YamlTransform(
+          '''
+          type: chain
+          transforms:
+            - type: CreateTimestamped
+              elements: [0, 1, 2, 3, 4, 5]
+            - type: SumGlobally
+          windowing:
+            type: fixed
+            size: 4
+          ''',
+          providers=TEST_PROVIDERS)
+      assert_that(result, equal_to([6, 9]))
+

Review Comment:
   oops missed that one, sounds good! 
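
Every windowing test in the hunk above expects `[6, 9]`. A minimal sketch of why, in plain Python rather than Beam: `CreateTimestamped` uses each value as its own timestamp, so fixed windows of size 4 split the elements into `[0, 4)` and `[4, 8)`. `fixed_window_sums` is a hypothetical helper for illustration only, and it assumes non-negative integer timestamps.

```python
from collections import defaultdict


def fixed_window_sums(elements, size):
    """Sum elements per fixed window, using each value as its timestamp."""
    sums = defaultdict(int)
    for x in elements:
        timestamp = x  # mirrors TimestampedValue(x, x) in the test helper
        window_start = timestamp - timestamp % size
        sums[window_start] += x
    # One sum per window, ordered by window start.
    return [sums[start] for start in sorted(sums)]


print(fixed_window_sums([0, 1, 2, 3, 4, 5], 4))     # [6, 9]
# Two inputs windowed identically and combined give the same result,
# as in test_windowing_multiple_inputs.
print(fixed_window_sums([0, 2, 4] + [1, 3, 5], 4))  # [6, 9]
```

The result is the same whether the windowing is declared on an explicit `WindowInto`, on the consuming transform, on the producing transform, or on the enclosing chain, which is what the five tests check.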





[GitHub] [beam] robertwb commented on pull request #26451: Add flexible windowing syntax to yaml.

Posted by "robertwb (via GitHub)" <gi...@apache.org>.
robertwb commented on PR #26451:
URL: https://github.com/apache/beam/pull/26451#issuecomment-1528094372

   The apache_beam.io.tfrecordio_test.TestEnd2EndWriteAndRead.test_end2end_auto_compression failure is unrelated.




[GitHub] [beam] robertwb commented on pull request #26451: Add flexible windowing syntax to yaml.

Posted by "robertwb (via GitHub)" <gi...@apache.org>.
robertwb commented on PR #26451:
URL: https://github.com/apache/beam/pull/26451#issuecomment-1526058971

   R: @twang126




[GitHub] [beam] twang126 commented on a diff in pull request #26451: Add flexible windowing syntax to yaml.

Posted by "twang126 (via GitHub)" <gi...@apache.org>.
twang126 commented on code in PR #26451:
URL: https://github.com/apache/beam/pull/26451#discussion_r1180643654


##########
sdks/python/apache_beam/yaml/README.md:
##########
@@ -215,6 +215,157 @@ pipeline:
       path: /path/to/big.csv
 ```
 
+## Windowing
+
+This API can be used to define both streaming and batch pipelines.
+In order to meaningfully aggregate elements in a streaming pipeline,
+some kind of windowing is typically required. Beam's
+[windowing](https://beam.apache.org/documentation/programming-guide/#windowing)
+and [triggering](https://beam.apache.org/documentation/programming-guide/#triggers)
+can be declared using the same WindowInto transform available in all other
+SDKs.
+
+```
+pipeline:
+  type: chain
+  transforms:
+    - type: ReadFromPubSub
+      topic: myPubSubTopic
+    - type: WindowInto
+      windowing:
+        type: fixed
+        size: 60
+    - type: SomeAggregation
+    - type: WriteToPubSub
+      topic: anotherPubSubTopic
+```
+
+Rather than using an explicit `WindowInto` operation, one may instead tag a
+transform itself with a specified windowing which will cause its inputs
+(and hence the transform itself) to be applied with that windowing.
+
+```
+pipeline:
+  type: chain
+  transforms:
+    - type: ReadFromPubSub
+      topic: myPubSubTopic
+    - type: SomeAggregation
+      windowing:
+        type: sliding
+        size: 60
+        period: 10
+    - type: WriteToPubSub
+      topic: anotherPubSubTopic
+```
+
+Note that the `Sql` operation itself often a from of aggregation, and applying
+a windowing which will cause all grouping to be done per window.
+
+```
+pipeline:
+  type: chain
+  transforms:
+    - type: ReadFromPubSub
+      topic: myPubSubTopic
+    - type: Sql
+      query: "select col1, count(*) as c from PCOLLECTION"
+      windowing:
+        type: sessions
+        gap: 60
+    - type: WriteToPubSub
+      topic: anotherPubSubTopic
+```
+
+The specified windowing is applied to all inputs, in this case resulting in
+a join per window.
+
+```
+pipeline:
+  - type: ReadFromPubSub
+    name: ReadLeft
+    topic: leftTopic
+
+  - type: ReadFromPubSub
+    name: ReadRight
+    topic: rightTopic
+
+  - type: Sql
+    query: select left.col1, right.col2 from left join right using (col3)
+    input:
+      left: ReadLeft
+      right: ReadRight
+    windowing:
+      type: fixed
+      size: 60
+```
+
+For a transform with no inputs, the specified windowing is instead applied to
+its output(s). As per the Beam model, the windowing is then inherited by all
+consuming operations. This is especially useful for root operations like Read.
+
+```
+pipeline:
+  type: chain
+  transforms:
+    - type: ReadFromPubSub
+      topic: myPubSubTopic
+      windowing:
+        type: fixed
+        size: 60

Review Comment:
   I'm curious how you are planning on supporting triggers? Will the trigger definitions just be listed here as properties of each window? 
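
As background for the `sessions` windowing with `gap: 60` in the hunk above, here is a rough sketch of how session windows grow. Each element opens a window of length `gap`, and windows that overlap are merged. This is plain Python with integer timestamps; `session_windows` is a hypothetical helper, and the sketch ignores per-key merging and triggers, which this PR leaves open.

```python
def session_windows(timestamps, gap):
    """Merge per-element windows [t, t + gap) that overlap. Illustrative only."""
    windows = []
    for t in sorted(timestamps):
        if windows and t < windows[-1][1]:
            # The element lands inside the current session, so extend it.
            start, end = windows[-1]
            windows[-1] = (start, max(end, t + gap))
        else:
            # No overlap with the previous session, so start a new one.
            windows.append((t, t + gap))
    return windows


print(session_windows([0, 30, 100, 200, 250], 60))
# [(0, 90), (100, 160), (200, 310)]
```

Unlike fixed or sliding windows, the boundaries here depend on the data, so a session can only be closed once no element arrives within `gap` of its end.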





[GitHub] [beam] codecov[bot] commented on pull request #26451: Add flexible windowing syntax to yaml.

Posted by "codecov[bot] (via GitHub)" <gi...@apache.org>.
codecov[bot] commented on PR #26451:
URL: https://github.com/apache/beam/pull/26451#issuecomment-1526080175

   ## [Codecov](https://codecov.io/gh/apache/beam/pull/26451?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#26451](https://codecov.io/gh/apache/beam/pull/26451?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (8121f2b) into [master](https://codecov.io/gh/apache/beam/commit/3440ae59a815e2ee90528b503d116a21ac5b1672?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (3440ae5) will **increase** coverage by `0.01%`.
   > The diff coverage is `90.75%`.
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #26451      +/-   ##
   ==========================================
   + Coverage   81.11%   81.12%   +0.01%     
   ==========================================
     Files         469      469              
     Lines       67294    67395     +101     
   ==========================================
   + Hits        54583    54676      +93     
   - Misses      12711    12719       +8     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | python | `81.12% <90.75%> (+0.01%)` | :arrow_up: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/26451?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [sdks/python/apache\_beam/yaml/yaml\_transform.py](https://codecov.io/gh/apache/beam/pull/26451?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0veWFtbC95YW1sX3RyYW5zZm9ybS5weQ==) | `85.79% <89.88%> (+1.12%)` | :arrow_up: |
   | [sdks/python/apache\_beam/yaml/yaml\_provider.py](https://codecov.io/gh/apache/beam/pull/26451?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0veWFtbC95YW1sX3Byb3ZpZGVyLnB5) | `66.53% <93.33%> (+3.73%)` | :arrow_up: |
   
   ... and [6 files with indirect coverage changes](https://codecov.io/gh/apache/beam/pull/26451/indirect-changes?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   




[GitHub] [beam] robertwb commented on a diff in pull request #26451: Add flexible windowing syntax to yaml.

Posted by "robertwb (via GitHub)" <gi...@apache.org>.
robertwb commented on code in PR #26451:
URL: https://github.com/apache/beam/pull/26451#discussion_r1180682464


##########
sdks/python/apache_beam/yaml/yaml_transform_test.py:
##########
@@ -166,6 +166,121 @@ def test_csv_to_json(self):
       pd.testing.assert_frame_equal(data, result)
 
 
+class CreateTimestamped(beam.PTransform):
+  def __init__(self, elements):
+    self._elements = elements
+
+  def expand(self, p):
+    return (
+        p
+        | beam.Create(self._elements)
+        | beam.Map(lambda x: beam.transforms.window.TimestampedValue(x, x)))
+
+
+class SumGlobally(beam.PTransform):
+  def expand(self, pcoll):
+    return pcoll | beam.CombineGlobally(sum).without_defaults()
+
+
+TEST_PROVIDERS = {
+    'CreateTimestamped': CreateTimestamped, 'SumGlobally': SumGlobally
+}
+
+
+class YamlWindowingTest(unittest.TestCase):
+  def test_explicit_window_into(self):
+    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
+        pickle_library='cloudpickle')) as p:
+      result = p | YamlTransform(
+          '''
+          type: chain
+          transforms:
+            - type: CreateTimestamped
+              elements: [0, 1, 2, 3, 4, 5]
+            - type: WindowInto
+              windowing:
+                type: fixed
+                size: 4
+            - type: SumGlobally
+          ''',
+          providers=TEST_PROVIDERS)
+      assert_that(result, equal_to([6, 9]))
+
+  def test_windowing_on_input(self):
+    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
+        pickle_library='cloudpickle')) as p:
+      result = p | YamlTransform(
+          '''
+          type: chain
+          transforms:
+            - type: CreateTimestamped
+              elements: [0, 1, 2, 3, 4, 5]
+            - type: SumGlobally
+              windowing:
+                type: fixed
+                size: 4
+          ''',
+          providers=TEST_PROVIDERS)
+      assert_that(result, equal_to([6, 9]))
+
+  def test_windowing_multiple_inputs(self):
+    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
+        pickle_library='cloudpickle')) as p:
+      result = p | YamlTransform(
+          '''
+          type: composite
+          transforms:
+            - type: CreateTimestamped
+              name: Create1
+              elements: [0, 2, 4]
+            - type: CreateTimestamped
+              name: Create2
+              elements: [1, 3, 5]
+            - type: SumGlobally
+              input: [Create1, Create2]
+              windowing:
+                type: fixed
+                size: 4
+          output: SumGlobally
+          ''',
+          providers=TEST_PROVIDERS)
+      assert_that(result, equal_to([6, 9]))
+
+  def test_windowing_on_output(self):
+    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
+        pickle_library='cloudpickle')) as p:
+      result = p | YamlTransform(
+          '''
+          type: chain
+          transforms:
+            - type: CreateTimestamped
+              elements: [0, 1, 2, 3, 4, 5]
+              windowing:
+                type: fixed
+                size: 4
+            - type: SumGlobally
+          ''',
+          providers=TEST_PROVIDERS)
+      assert_that(result, equal_to([6, 9]))
+
+  def test_windowing_on_outer(self):
+    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
+        pickle_library='cloudpickle')) as p:
+      result = p | YamlTransform(
+          '''
+          type: chain
+          transforms:
+            - type: CreateTimestamped
+              elements: [0, 1, 2, 3, 4, 5]
+            - type: SumGlobally
+          windowing:
+            type: fixed
+            size: 4
+          ''',
+          providers=TEST_PROVIDERS)
+      assert_that(result, equal_to([6, 9]))
+

Review Comment:
   Yes, the last test in the README has multiple windowings. 





[GitHub] [beam] robertwb merged pull request #26451: Add flexible windowing syntax to yaml.

Posted by "robertwb (via GitHub)" <gi...@apache.org>.
robertwb merged PR #26451:
URL: https://github.com/apache/beam/pull/26451




[GitHub] [beam] robertwb commented on a diff in pull request #26451: Add flexible windowing syntax to yaml.

Posted by "robertwb (via GitHub)" <gi...@apache.org>.
robertwb commented on code in PR #26451:
URL: https://github.com/apache/beam/pull/26451#discussion_r1179468781


##########
sdks/python/apache_beam/yaml/readme_test.py:
##########
@@ -85,13 +86,46 @@ def guess_name_and_type(expr):
     output_schema = [
         guess_name_and_type(expr) for expr in m.group(1).split(',')
     ]
-    return next(iter(inputs.values())).pipeline | beam.Create(
-        [beam.Row(**{name: typ()
-                     for name, typ in output_schema})])
+    output_element = beam.Row(**{name: typ() for name, typ in output_schema})
+    return next(iter(inputs.values())) | beam.Map(

Review Comment:
   Note: previously the output didn't derive from the input so no windowing was inherited. 





[GitHub] [beam] robertwb commented on a diff in pull request #26451: Add flexible windowing syntax to yaml.

Posted by "robertwb (via GitHub)" <gi...@apache.org>.
robertwb commented on code in PR #26451:
URL: https://github.com/apache/beam/pull/26451#discussion_r1180660999


##########
sdks/python/apache_beam/yaml/README.md:
##########
@@ -215,6 +215,157 @@ pipeline:
       path: /path/to/big.csv
 ```
 
+## Windowing
+
+This API can be used to define both streaming and batch pipelines.
+In order to meaningfully aggregate elements in a streaming pipeline,
+some kind of windowing is typically required. Beam's
+[windowing](https://beam.apache.org/documentation/programming-guide/#windowing)
+and [triggering](https://beam.apache.org/documentation/programming-guide/#triggers)
+can be declared using the same WindowInto transform available in all other
+SDKs.
+
+```
+pipeline:
+  type: chain
+  transforms:
+    - type: ReadFromPubSub
+      topic: myPubSubTopic
+    - type: WindowInto
+      windowing:
+        type: fixed
+        size: 60
+    - type: SomeAggregation
+    - type: WriteToPubSub
+      topic: anotherPubSubTopic
+```
+
+Rather than using an explicit `WindowInto` operation, one may instead tag a
+transform itself with a specified windowing which will cause its inputs
+(and hence the transform itself) to be applied with that windowing.
+
+```
+pipeline:
+  type: chain
+  transforms:
+    - type: ReadFromPubSub
+      topic: myPubSubTopic
+    - type: SomeAggregation
+      windowing:
+        type: sliding
+        size: 60
+        period: 10
+    - type: WriteToPubSub
+      topic: anotherPubSubTopic
+```
+
+Note that the `Sql` operation itself often a from of aggregation, and applying
+a windowing which will cause all grouping to be done per window.
+
+```
+pipeline:
+  type: chain
+  transforms:
+    - type: ReadFromPubSub
+      topic: myPubSubTopic
+    - type: Sql
+      query: "select col1, count(*) as c from PCOLLECTION"
+      windowing:
+        type: sessions
+        gap: 60
+    - type: WriteToPubSub
+      topic: anotherPubSubTopic
+```
+
+The specified windowing is applied to all inputs, in this case resulting in
+a join per window.
+
+```
+pipeline:
+  - type: ReadFromPubSub
+    name: ReadLeft
+    topic: leftTopic
+
+  - type: ReadFromPubSub
+    name: ReadRight
+    topic: rightTopic
+
+  - type: Sql
+    query: select left.col1, right.col2 from left join right using (col3)
+    input:
+      left: ReadLeft
+      right: ReadRight
+    windowing:
+      type: fixed
+      size: 60
+```
+
+For a transform with no inputs, the specified windowing is instead applied to
+its output(s). As per the Beam model, the windowing is then inherited by all
+consuming operations. This is especially useful for root operations like Read.
+
+```
+pipeline:
+  type: chain
+  transforms:
+    - type: ReadFromPubSub
+      topic: myPubSubTopic
+      windowing:
+        type: fixed
+        size: 60

Review Comment:
   TBD. They'll probably be properties here. (I thought about requiring an extra level of nesting by putting this under `windowfn`, but that felt needlessly verbose.)





[GitHub] [beam] twang126 commented on a diff in pull request #26451: Add flexible windowing syntax to yaml.

Posted by "twang126 (via GitHub)" <gi...@apache.org>.
twang126 commented on code in PR #26451:
URL: https://github.com/apache/beam/pull/26451#discussion_r1180636363


##########
sdks/python/apache_beam/yaml/README.md:
##########
@@ -215,6 +215,157 @@ pipeline:
       path: /path/to/big.csv
 ```
 
+## Windowing
+
+This API can be used to define both streaming and batch pipelines.
+In order to meaningfully aggregate elements in a streaming pipeline,
+some kind of windowing is typically required. Beam's
+[windowing](https://beam.apache.org/documentation/programming-guide/#windowing)
+and [triggering](https://beam.apache.org/documentation/programming-guide/#triggers)
+can be be declared using the same WindowInto transform available in all other
+SDKs.
+
+```
+pipeline:
+  type: chain
+  transforms:
+    - type: ReadFromPubSub
+      topic: myPubSubTopic
+    - type: WindowInto
+      windowing:
+        type: fixed
+        size: 60
+    - type: SomeAggregation
+    - type: WriteToPubSub
+      topic: anotherPubSubTopic
+```
+
+Rather than using an explicit `WindowInto` operation, one may instead tag a
+transform itself with a specified windowing which will cause its inputs
+(and hence the transform itself) to be applied with that windowing.
+
+```
+pipeline:
+  type: chain
+  transforms:
+    - type: ReadFromPubSub
+      topic: myPubSubTopic
+    - type: SomeAggregation
+      windowing:
+        type: sliding
+        size: 60
+        period: 10
+    - type: WriteToPubSub
+      topic: anotherPubSubTopic
+```
+
+Note that the `Sql` operation itself often a from of aggregation, and applying

Review Comment:
   nit: maybe reword to "Note that the `Sql` operation itself is often a form of aggregation, and applying a windowing will cause all grouping to be done per window."
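
To illustrate what the `sliding` windowing with `size: 60` and `period: 10` in the example above means for a single element, here is a minimal pure-Python sketch. It is not Beam code and not part of the PR: the helper name `sliding_window_starts` and the sample timestamp 25 are assumptions made for illustration, and the window offset is assumed to be 0.

```python
def sliding_window_starts(timestamp, size, period, offset=0):
    """Return the start of every sliding window [start, start + size)
    that contains `timestamp`, newest window first.

    Hypothetical helper for illustration only; assumes `size` is a
    multiple of `period`.
    """
    # Start of the most recent window that begins at or before the timestamp.
    last_start = timestamp - (timestamp - offset) % period
    # Step back one period at a time while the window still covers the timestamp.
    return list(range(last_start, last_start - size, -period))


starts = sliding_window_starts(25, size=60, period=10)
print(starts)  # [20, 10, 0, -10, -20, -30]
```

Each element lands in `size / period` windows (six here), so an aggregation tagged with this windowing would count every element once per overlapping window.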





[GitHub] [beam] twang126 commented on a diff in pull request #26451: Add flexible windowing syntax to yaml.

Posted by "twang126 (via GitHub)" <gi...@apache.org>.
twang126 commented on code in PR #26451:
URL: https://github.com/apache/beam/pull/26451#discussion_r1180666428


##########
sdks/python/apache_beam/yaml/yaml_transform_test.py:
##########
@@ -166,6 +166,121 @@ def test_csv_to_json(self):
       pd.testing.assert_frame_equal(data, result)
 
 
+class CreateTimestamped(beam.PTransform):
+  def __init__(self, elements):
+    self._elements = elements
+
+  def expand(self, p):
+    return (
+        p
+        | beam.Create(self._elements)
+        | beam.Map(lambda x: beam.transforms.window.TimestampedValue(x, x)))
+
+
+class SumGlobally(beam.PTransform):
+  def expand(self, pcoll):
+    return pcoll | beam.CombineGlobally(sum).without_defaults()
+
+
+TEST_PROVIDERS = {
+    'CreateTimestamped': CreateTimestamped, 'SumGlobally': SumGlobally
+}
+
+
+class YamlWindowingTest(unittest.TestCase):
+  def test_explicit_window_into(self):
+    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
+        pickle_library='cloudpickle')) as p:
+      result = p | YamlTransform(
+          '''
+          type: chain
+          transforms:
+            - type: CreateTimestamped
+              elements: [0, 1, 2, 3, 4, 5]
+            - type: WindowInto
+              windowing:
+                type: fixed
+                size: 4
+            - type: SumGlobally
+          ''',
+          providers=TEST_PROVIDERS)
+      assert_that(result, equal_to([6, 9]))
+
+  def test_windowing_on_input(self):
+    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
+        pickle_library='cloudpickle')) as p:
+      result = p | YamlTransform(
+          '''
+          type: chain
+          transforms:
+            - type: CreateTimestamped
+              elements: [0, 1, 2, 3, 4, 5]
+            - type: SumGlobally
+              windowing:
+                type: fixed
+                size: 4
+          ''',
+          providers=TEST_PROVIDERS)
+      assert_that(result, equal_to([6, 9]))
+
+  def test_windowing_multiple_inputs(self):
+    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
+        pickle_library='cloudpickle')) as p:
+      result = p | YamlTransform(
+          '''
+          type: composite
+          transforms:
+            - type: CreateTimestamped
+              name: Create1
+              elements: [0, 2, 4]
+            - type: CreateTimestamped
+              name: Create2
+              elements: [1, 3, 5]
+            - type: SumGlobally
+              input: [Create1, Create2]
+              windowing:
+                type: fixed
+                size: 4
+          output: SumGlobally
+          ''',
+          providers=TEST_PROVIDERS)
+      assert_that(result, equal_to([6, 9]))
+
+  def test_windowing_on_output(self):
+    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
+        pickle_library='cloudpickle')) as p:
+      result = p | YamlTransform(
+          '''
+          type: chain
+          transforms:
+            - type: CreateTimestamped
+              elements: [0, 1, 2, 3, 4, 5]
+              windowing:
+                type: fixed
+                size: 4
+            - type: SumGlobally
+          ''',
+          providers=TEST_PROVIDERS)
+      assert_that(result, equal_to([6, 9]))
+
+  def test_windowing_on_outer(self):
+    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
+        pickle_library='cloudpickle')) as p:
+      result = p | YamlTransform(
+          '''
+          type: chain
+          transforms:
+            - type: CreateTimestamped
+              elements: [0, 1, 2, 3, 4, 5]
+            - type: SumGlobally
+          windowing:
+            type: fixed
+            size: 4
+          ''',
+          providers=TEST_PROVIDERS)
+      assert_that(result, equal_to([6, 9]))
+

Review Comment:
   Maybe worth adding a test where there are multiple windows in a pipeline (if it's supported)? E.g. re-windowing one of the outputs of an aggregation.
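
For reference, the expected `[6, 9]` in the tests above follows from fixed-window arithmetic, which can be sketched without Beam. This is a minimal pure-Python illustration, not the PR's implementation: the helper names `fixed_window_start` and `sum_per_fixed_window` are hypothetical, and the sketch assumes each element is its own timestamp (as in `CreateTimestamped`) with a window offset of 0.

```python
from collections import defaultdict


def fixed_window_start(timestamp, size, offset=0):
    """Start of the fixed window [start, start + size) containing `timestamp`."""
    return timestamp - (timestamp - offset) % size


def sum_per_fixed_window(elements, size):
    """Sum elements per fixed window, treating each element as its own timestamp."""
    sums = defaultdict(int)
    for x in elements:
        sums[fixed_window_start(x, size)] += x
    return dict(sums)


result = sum_per_fixed_window([0, 1, 2, 3, 4, 5], size=4)
print(result)  # {0: 6, 4: 9}
```

Elements 0 through 3 fall in the window starting at 0 and sum to 6; elements 4 and 5 fall in the window starting at 4 and sum to 9. The same sums result when the elements arrive from two sources, as in `test_windowing_multiple_inputs`.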





[GitHub] [beam] robertwb commented on a diff in pull request #26451: Add flexible windowing syntax to yaml.

Posted by "robertwb (via GitHub)" <gi...@apache.org>.
robertwb commented on code in PR #26451:
URL: https://github.com/apache/beam/pull/26451#discussion_r1180659960


##########
sdks/python/apache_beam/yaml/README.md:
##########
@@ -215,6 +215,157 @@ pipeline:
       path: /path/to/big.csv
 ```
 
+## Windowing
+
+This API can be used to define both streaming and batch pipelines.
+In order to meaningfully aggregate elements in a streaming pipeline,
+some kind of windowing is typically required. Beam's
+[windowing](https://beam.apache.org/documentation/programming-guide/#windowing)
+and [triggering](https://beam.apache.org/documentation/programming-guide/#triggers)
+can be declared using the same WindowInto transform available in all other
+SDKs.
+
+```
+pipeline:
+  type: chain
+  transforms:
+    - type: ReadFromPubSub
+      topic: myPubSubTopic
+    - type: WindowInto
+      windowing:
+        type: fixed
+        size: 60
+    - type: SomeAggregation
+    - type: WriteToPubSub
+      topic: anotherPubSubTopic
+```
+
+Rather than using an explicit `WindowInto` operation, one may instead tag a
+transform itself with a specified windowing which will cause its inputs
+(and hence the transform itself) to be applied with that windowing.
+
+```
+pipeline:
+  type: chain
+  transforms:
+    - type: ReadFromPubSub
+      topic: myPubSubTopic
+    - type: SomeAggregation
+      windowing:
+        type: sliding
+        size: 60
+        period: 10
+    - type: WriteToPubSub
+      topic: anotherPubSubTopic
+```
+
+Note that the `Sql` operation itself often a from of aggregation, and applying

Review Comment:
   Thanks. Fixed.


