Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/07/28 18:09:33 UTC

[GitHub] [beam] riteshghorse opened a new pull request #15239: [BEAM-3304] added struct field for trigger

riteshghorse opened a new pull request #15239:
URL: https://github.com/apache/beam/pull/15239


   Trigger support for Go SDK
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] riteshghorse commented on a change in pull request #15239: [BEAM-3304] Go triggering support

Posted by GitBox <gi...@apache.org>.
riteshghorse commented on a change in pull request #15239:
URL: https://github.com/apache/beam/pull/15239#discussion_r683717984



##########
File path: sdks/go/test/integration/primitives/windowinto.go
##########
@@ -91,3 +91,32 @@ func WindowSums_GBK(s beam.Scope) {
 func WindowSums_Lifted(s beam.Scope) {
 	WindowSums(s.Scope("Lifted"), stats.SumPerKey)
 }
+
+// TriggerWindowSums, much like WindowSums described above has an addition of configuring
+// a trigger here. SetDefault works fine. Other triggers such as SetAlways throws
+// pane decoding error.
+func TriggerWindowSums(s beam.Scope, sumPerKey func(beam.Scope, beam.PCollection) beam.PCollection) {
+	timestampedData := beam.ParDo(s, &createTimestampedData{Data: []int{4, 9, 2, 3, 5, 7, 8, 1, 6}}, beam.Impulse(s))
+
+	windowSize := 3 * time.Second
+
+	validate := func(s beam.Scope, wfn *window.Fn, in beam.PCollection, expected ...interface{}) {
+		// Window the data.
+		windowed := beam.WindowInto(s, wfn, in, beam.WindowTrigger{Name: window.Always})
+		// To get the pane decoding error, change above statement to
+		// windowed := beam.WindowInto(s, wfn, in, beam.WindowTrigger{Name: window.Always})
+		// Perform the appropriate sum operation.
+		sums := sumPerKey(s, windowed)
+		// Drop back to Global windows, and drop the key otherwise passert.Equals doesn't work.
+		sums = beam.WindowInto(s, window.NewGlobalWindows(), sums)
+		sums = beam.DropKey(s, sums)
+		passert.Equals(s, sums, expected...)
+	}
+
+	// Use fixed windows to divide the data into 3 chunks.
+	validate(s.Scope("Fixed"), window.NewFixedWindows(windowSize), timestampedData, 15, 15, 15)

Review comment:
       Yes. I think there is something missing in the trigger implementation. Will be working on it next.







[GitHub] [beam] lostluck commented on a change in pull request #15239: [BEAM-3304] Go triggering support

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #15239:
URL: https://github.com/apache/beam/pull/15239#discussion_r690004841



##########
File path: sdks/go/pkg/beam/core/runtime/exec/datasource.go
##########
@@ -144,8 +144,8 @@ func (n *DataSource) Process(ctx context.Context) error {
 		if n.incrementIndexAndCheckSplit() {
 			return nil
 		}
-		// TODO(lostluck) 2020/02/22: Should we include window headers or just count the element sizes?
-		ws, t, err := DecodeWindowedValueHeader(wc, r)
+    // TODO(lostluck) 2020/02/22: Should we include window headers or just count the element sizes?

Review comment:
       Probably need to run gofmt again.

##########
File path: sdks/go/pkg/beam/windowing.go
##########
@@ -21,21 +21,48 @@ import (
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
+type WindowIntoOption interface {
+	windowIntoOption()
+}
+
+type WindowTrigger struct {
+	Name window.Trigger
+}
+
+func (t WindowTrigger) windowIntoOption() {}
+
+type AccumulationMode struct {
+	Mode window.AccumulationMode
+}
+
+func (m AccumulationMode) windowIntoOption() {}
+
 // WindowInto applies the windowing strategy to each element.
-func WindowInto(s Scope, ws *window.Fn, col PCollection) PCollection {
-	return Must(TryWindowInto(s, ws, col))
+func WindowInto(s Scope, ws *window.Fn, col PCollection, opts ...WindowIntoOption) PCollection {
+	return Must(TryWindowInto(s, ws, col, opts...))
 }
 
 // TryWindowInto attempts to insert a WindowInto transform.
-func TryWindowInto(s Scope, ws *window.Fn, col PCollection) (PCollection, error) {
+func TryWindowInto(s Scope, wfn *window.Fn, col PCollection, opts ...WindowIntoOption) (PCollection, error) {
 	if !s.IsValid() {
 		return PCollection{}, errors.New("invalid scope")
 	}
 	if !col.IsValid() {
 		return PCollection{}, errors.New("invalid input pcollection")
 	}
+	ws := window.WindowingStrategy{Fn: wfn, Trigger: window.Trigger{}}
+	for _, opt := range opts {
+		switch opt := opt.(type) {
+		case WindowTrigger:
+			ws.Trigger = opt.Name
+		case AccumulationMode:
+			ws.AccumulationMode = opt.Mode
+		default:
+			panic("Invalid option for Windowing Strategy")

Review comment:
       Since WindowIntoOptions implement an interface, we can improve this by communicating what went wrong, with something like `panic(fmt.Sprintf("Unknown WindowingInto option type: %T: %v", opt, opt))`. That reports the option's type and, if it's printable, its value, so it's easier to find what needs working on.
   
   Since an error like this is likely an SDK dev error, a panic is fine, as there's nothing a user can do about it programmatically.
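
As a rough illustration of the suggestion above, here is a minimal, self-contained sketch. Every type in it (`windowTrigger`, `allowedLateness`, `strategy`) and the helpers `apply` and `panicMessage` are simplified, hypothetical stand-ins, not the SDK's real definitions. It only shows how `%T` and `%v` surface the offending option in the panic message.

```go
package main

import "fmt"

// windowIntoOption mirrors the marker-interface pattern from the diff.
// All types below are hypothetical stand-ins for the SDK's real ones.
type windowIntoOption interface{ windowIntoOption() }

type windowTrigger struct{ Name string }

func (windowTrigger) windowIntoOption() {}

// allowedLateness is an invented option that the switch below does not handle.
type allowedLateness struct{ Seconds int }

func (allowedLateness) windowIntoOption() {}

type strategy struct{ Trigger string }

// apply folds the options into a strategy and panics with the offending
// type and value when it meets an option it does not know.
func apply(opts ...windowIntoOption) strategy {
	var ws strategy
	for _, opt := range opts {
		switch opt := opt.(type) {
		case windowTrigger:
			ws.Trigger = opt.Name
		default:
			panic(fmt.Sprintf("Unknown WindowingInto option type: %T: %v", opt, opt))
		}
	}
	return ws
}

// panicMessage runs f and returns the recovered panic text, or "" if f did not panic.
func panicMessage(f func()) (msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()
	f()
	return ""
}

func main() {
	fmt.Println(apply(windowTrigger{Name: "always"}).Trigger)
	fmt.Println(panicMessage(func() { apply(allowedLateness{Seconds: 5}) }))
}
```

With a message like this, an SDK developer who adds a new option type but forgets the matching `case` sees the type name directly in the panic rather than a generic string.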

##########
File path: sdks/go/pkg/beam/core/runtime/graphx/translate.go
##########
@@ -981,22 +981,135 @@ func marshalWindowingStrategy(c *CoderMarshaller, w *window.WindowingStrategy) (
 	} else {
 		mergeStat = pipepb.MergeStatus_NON_MERGING
 	}
+
 	ws := &pipepb.WindowingStrategy{
 		WindowFn:         windowFn,
 		MergeStatus:      mergeStat,
-		AccumulationMode: pipepb.AccumulationMode_DISCARDING,
 		WindowCoderId:    windowCoderId,
-		Trigger: &pipepb.Trigger{
+		Trigger:          makeTrigger(w.Trigger),
+		AccumulationMode: makeAccumulationMode(w.AccumulationMode),
+		OutputTime:       pipepb.OutputTime_END_OF_WINDOW,
+		ClosingBehavior:  pipepb.ClosingBehavior_EMIT_IF_NONEMPTY,
+		AllowedLateness:  0,
+		OnTimeBehavior:   pipepb.OnTimeBehavior_FIRE_IF_NONEMPTY,
+	}
+	return ws, nil
+}
+
+func makeAccumulationMode(m window.AccumulationMode) pipepb.AccumulationMode_Enum {
+	switch m {
+	case window.Accumulating:
+		return pipepb.AccumulationMode_ACCUMULATING
+	case window.Discarding:
+		return pipepb.AccumulationMode_DISCARDING
+	case window.Unspecified:
+		return pipepb.AccumulationMode_UNSPECIFIED
+	case window.Retracting:
+		return pipepb.AccumulationMode_RETRACTING
+	default:
+		return pipepb.AccumulationMode_DISCARDING
+	}
+}
+
+func makeTrigger(t window.Trigger) *pipepb.Trigger {
+	switch t.Kind {
+	case window.DefaultTrigger:
+		return &pipepb.Trigger{
 			Trigger: &pipepb.Trigger_Default_{
 				Default: &pipepb.Trigger_Default{},
 			},
-		},
-		OutputTime:      pipepb.OutputTime_END_OF_WINDOW,
-		ClosingBehavior: pipepb.ClosingBehavior_EMIT_IF_NONEMPTY,
-		AllowedLateness: 0,
-		OnTimeBehavior:  pipepb.OnTimeBehavior_FIRE_ALWAYS,
+		}
+	case window.AlwaysTrigger:
+		return &pipepb.Trigger{
+			Trigger: &pipepb.Trigger_Always_{
+				Always: &pipepb.Trigger_Always{},
+			},
+		}
+	case window.AfterAnyTrigger:
+		return &pipepb.Trigger{
+			Trigger: &pipepb.Trigger_AfterAny_{
+				AfterAny: &pipepb.Trigger_AfterAny{
+					Subtriggers: extractSubtriggers(t.SubTriggers),
+				},
+			},
+		}
+	case window.AfterAllTrigger:
+		return &pipepb.Trigger{
+			Trigger: &pipepb.Trigger_AfterAll_{
+				AfterAll: &pipepb.Trigger_AfterAll{
+					Subtriggers: extractSubtriggers(t.SubTriggers),
+				},
+			},
+		}
+	case window.AfterProcessingTimeTrigger:
+		// TODO: Right now would work only for single delay value.

Review comment:
       Prefer not to leave unreferenced TODOs. Add the JIRA (BEAM-3304) to these, or file a new one for these specific TODOs and list it as a subtask of BEAM-3304.







[GitHub] [beam] riteshghorse commented on pull request #15239: [BEAM-3304] Go triggering support

Posted by GitBox <gi...@apache.org>.
riteshghorse commented on pull request #15239:
URL: https://github.com/apache/beam/pull/15239#issuecomment-889438127


   R: @lostluck 





[GitHub] [beam] riteshghorse commented on pull request #15239: [BEAM-3304] Go triggering support

Posted by GitBox <gi...@apache.org>.
riteshghorse commented on pull request #15239:
URL: https://github.com/apache/beam/pull/15239#issuecomment-889509698


   I changed parts of `marshalWindowingStrategy` in translate.go to try out triggers, but undid all of it before committing. I'll write a separate test for this.





[GitHub] [beam] lostluck commented on a change in pull request #15239: [BEAM-3304] Go triggering support

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #15239:
URL: https://github.com/apache/beam/pull/15239#discussion_r682988766



##########
File path: sdks/go/pkg/beam/core/graph/coder/panes.go
##########
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package coder
+
+import (
+	"io"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/ioutilx"
+)
+
+const (
+	FIRST     int = 0
+	ONE_INDEX int = 1
+	TWO_INDEX int = 2
+)
+
+func chooseEncoding(v typex.PaneInfo) int {
+	if (v.Index == 0 || v.NonSpeculativeIndex == 0) || v.Timing == typex.UNKNOWN {
+		return FIRST
+	} else if v.Index == v.NonSpeculativeIndex || v.Timing == typex.EARLY {
+		return ONE_INDEX
+	} else {
+		return TWO_INDEX
+	}

Review comment:
       Prefer using a switch statement for things like this instead of relying on if-else ladders.
   ```suggestion
   	switch {
   	case v.Index == 0 || v.NonSpeculativeIndex == 0 || v.Timing == typex.UNKNOWN:
   		return FIRST
   	case v.Index == v.NonSpeculativeIndex || v.Timing == typex.EARLY:
   		return ONE_INDEX
   	default:
   		return TWO_INDEX
   	}
   ```

##########
File path: sdks/go/pkg/beam/core/graph/coder/panes.go
##########
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package coder
+
+import (
+	"io"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/ioutilx"
+)
+
+const (
+	FIRST     int = 0
+	ONE_INDEX int = 1
+	TWO_INDEX int = 2

Review comment:
       In Go, constants should use MixedCaps if they're exported or mixedCaps if they're unexported, never ALL_CAPS.

##########
File path: sdks/go/pkg/beam/core/typex/special.go
##########
@@ -64,6 +65,23 @@ type Window interface {
 	Equals(o Window) bool
 }
 
+type Timing string

Review comment:
       Instead of using a string, consider using an `int` (or even a uint8) as the base type. It's more compact in memory. It also lets one avoid doing string comparisons to know values for encodings, and avoids some awkward conversions that this PR is currently making.
   
   See https://yourbasic.org/golang/iota/ for a good rundown of how to do "enums" in Go, as well as Effective Go: https://golang.org/doc/effective_go#constants
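   
   For illustration, a minimal sketch of an integer-based enum with `iota`. The `uint8` base type, the `Pane`-prefixed names, and the `String` method are assumptions made for the example, not the PR's final code; the numeric order follows the values returned by the `timing` helper in panes.go.

```go
package main

import "fmt"

// Timing describes when a pane fired.
// Hypothetical sketch: the base type and constant names are assumptions.
type Timing uint8

const (
	PaneEarly   Timing = iota // 0
	PaneOnTime                // 1
	PaneLate                  // 2
	PaneUnknown               // 3
)

// String keeps the values readable in logs and test failures.
func (t Timing) String() string {
	switch t {
	case PaneEarly:
		return "EARLY"
	case PaneOnTime:
		return "ON_TIME"
	case PaneLate:
		return "LATE"
	default:
		return "UNKNOWN"
	}
}

func main() {
	// The constant can be used directly as a number, no lookup function needed.
	fmt.Println(PaneLate, uint8(PaneLate))
}
```

   With this shape the value can be shifted straight into the pane byte, so a separate string-to-int helper is no longer needed.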

##########
File path: sdks/go/pkg/beam/core/typex/special.go
##########
@@ -64,6 +65,23 @@ type Window interface {
 	Equals(o Window) bool
 }
 
+type Timing string
+
+const (
+	EARLY   Timing = "EARLY"
+	ON_TIME Timing = "ON_TIME"
+	LATE    Timing = "LATE"
+	UNKNOWN Timing = "UNKNOWN"
+)
+
+type PaneInfo struct {
+	Timing              Timing
+	IsFirst             bool
+	IsLast              bool
+	Index               int64
+	NonSpeculativeIndex int64

Review comment:
       Note, there's nothing wrong with the current approach, I'm pointing out a thing about Go:
   
   In this case, you can write these fields more compactly as: 
   ```suggestion
   	IsFirst, IsLast				bool
   	Index, NonSpeculativeIndex		int64
   ```

##########
File path: sdks/go/pkg/beam/core/typex/special.go
##########
@@ -64,6 +65,23 @@ type Window interface {
 	Equals(o Window) bool
 }
 
+type Timing string
+
+const (
+	EARLY   Timing = "EARLY"
+	ON_TIME Timing = "ON_TIME"
+	LATE    Timing = "LATE"
+	UNKNOWN Timing = "UNKNOWN"

Review comment:
       Despite these being constants, it's not idiomatic Go to use all caps for constants.
   
   I also recommend prefixing them with what they relate to, e.g. PaneEarly, PaneOnTime, PaneLate, PaneUnknown.
   
   Prefixing is mostly so they're adjacent in Go Doc, versus suffixing, which is a bit more readable.
   
   We can't get away without prefixing because we don't have a "pane" package, which would declare what they're for earlier. In this case we shouldn't have a pane package; it would be too small for practical use.
   
   The typex package has a pretty grab-bag set of values, so the enumerations need to be made clearer for their use elsewhere.
   
   

##########
File path: sdks/go/pkg/beam/core/graph/window/trigger.go
##########
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package window
+
+// TODO [BEAM-3304](riteshghorse): add configurable parameters to trigger
+type TriggerType string
+
+const (
+	Default  TriggerType = "Trigger_Default_"
+	Always   TriggerType = "Trigger_Always_"
+	AfterAny TriggerType = "Trigger_AfterAny_"
+	AfterAll TriggerType = "Trigger_AfterAll_"
+)
+
+func (ws *WindowingStrategy) SetAfterAll() {
+	ws.Trigger = AfterAll
+}
+
+func (ws *WindowingStrategy) SetAfterAny() {
+	ws.Trigger = AfterAny
+}
+
+func (ws *WindowingStrategy) SetAlways() {
+	ws.Trigger = Always
+}
+
+func (ws *WindowingStrategy) SetDefault() {
+	ws.Trigger = Default
+}

Review comment:
       We can probably remove these helper methods since they aren't being called, in favour of directly setting the fields ourselves.

##########
File path: sdks/go/pkg/beam/core/runtime/exec/coder.go
##########
@@ -1088,25 +1121,22 @@ func EncodeWindowedValueHeader(enc WindowEncoder, ws []typex.Window, t typex.Eve
 	if err := enc.Encode(ws, w); err != nil {
 		return err
 	}
-	_, err := w.Write(paneNoFiring)
+	err := coder.EncodePane(p, w)
 	return err
 }
 
 // DecodeWindowedValueHeader deserializes a windowed value header.
-func DecodeWindowedValueHeader(dec WindowDecoder, r io.Reader) ([]typex.Window, typex.EventTime, error) {
+func DecodeWindowedValueHeader(dec WindowDecoder, r io.Reader) ([]typex.Window, typex.EventTime, typex.PaneInfo, error) {
 	// Encoding: Timestamp, Window, Pane (header) + Element
-
+	pn := typex.PaneInfo{}

Review comment:
       It would be more idiomatic to declare the zero value in place for the returns, which makes it explicit that they're the zero value.
   The reason for this is to limit scope and make it easier on readers: we keep scope as small as possible. With a pre-declaration like this, a reader has to keep an eye out for when it's used and what might have affected it before it's returned. By writing our intent (return a zero value on error) directly where it's happening, we avoid this.
   
   That said, I understand the repetition can be tiresome. You can instead define a helper function in this function's scope:
   
   `onError := func(err error) ([]typex.Window, typex.EventTime, typex.PaneInfo, error) { return nil, mtime.ZeroTimestamp, typex.PaneInfo{}, err }`
   
   You can then call it as `return onError(err)` and it will compile down to the same thing.
   
   Non-pointer zeros being inconvenient is a hot topic of discussion for Go and eventually someone will propose a satisfying solution for it.
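   
   A self-contained sketch of that helper pattern, for reference. `paneInfo`, `decodeHeader`, and the two-byte layout are hypothetical stand-ins chosen to keep the example small; only the `onError` closure mirrors the suggestion above.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
)

// paneInfo is a hypothetical stand-in for typex.PaneInfo.
type paneInfo struct {
	IsFirst, IsLast bool
}

// decodeHeader reads a made-up two-byte header: a timestamp byte and a flags byte.
func decodeHeader(r io.Reader) (int64, paneInfo, error) {
	// onError spells out the zero values once, next to where they are returned.
	onError := func(err error) (int64, paneInfo, error) {
		return 0, paneInfo{}, err
	}

	var buf [2]byte
	if _, err := io.ReadFull(r, buf[:]); err != nil {
		return onError(err)
	}
	if buf[1] > 0x03 {
		return onError(errors.New("unexpected pane flags"))
	}
	pn := paneInfo{IsFirst: buf[1]&0x01 != 0, IsLast: buf[1]&0x02 != 0}
	return int64(buf[0]), pn, nil
}

// decodeBytes is a convenience wrapper for trying the decoder on a byte slice.
func decodeBytes(b []byte) (int64, paneInfo, error) {
	return decodeHeader(bytes.NewReader(b))
}

func main() {
	fmt.Println(decodeBytes([]byte{7, 3}))
	fmt.Println(decodeBytes(nil)) // too short, so the zero values and an error come back
}
```

   No result variable is declared ahead of the error paths, so nothing can modify the returned values before they are returned.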

##########
File path: sdks/go/pkg/beam/windowing.go
##########
@@ -21,21 +21,40 @@ import (
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
+type WindowIntoOption interface {
+	windowIntoOption()
+}
+
+type WindowTrigger struct {
+	Name window.TriggerType
+}
+
+func (t WindowTrigger) windowIntoOption() {}
+
 // WindowInto applies the windowing strategy to each element.
-func WindowInto(s Scope, ws *window.Fn, col PCollection) PCollection {
-	return Must(TryWindowInto(s, ws, col))
+func WindowInto(s Scope, ws *window.Fn, col PCollection, opts ...WindowIntoOption) PCollection {
+	return Must(TryWindowInto(s, ws, col, opts...))
 }
 
 // TryWindowInto attempts to insert a WindowInto transform.
-func TryWindowInto(s Scope, ws *window.Fn, col PCollection) (PCollection, error) {
+func TryWindowInto(s Scope, wfn *window.Fn, col PCollection, opts ...WindowIntoOption) (PCollection, error) {
 	if !s.IsValid() {
 		return PCollection{}, errors.New("invalid scope")
 	}
 	if !col.IsValid() {
 		return PCollection{}, errors.New("invalid input pcollection")
 	}
+	ws := window.WindowingStrategy{Fn: wfn, Trigger: window.Default}
+	for _, opt := range opts {
+		switch opt.(type) {
+		case WindowTrigger:
+			ws.Trigger = opt.(WindowTrigger).Name
+		default:
+			ws.Trigger = window.Default

Review comment:
       Since this switch is only run if we have options, it's valid to return an error or even `panic` if we don't know what type of option it is. That way users won't accidentally override earlier set triggers because an implementation was half done somehow.
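   
   As a rough sketch of what that could look like: the lowercase types below are hypothetical stand-ins for `WindowIntoOption`, `WindowTrigger`, and `window.TriggerType`, and `resolveTrigger` is an invented helper name. Binding the value in the type switch (`o := opt.(type)`) also avoids the second type assertion.

```go
package main

import "fmt"

// Hypothetical stand-ins for the PR's types; only the option handling matters here.
type triggerType string

const (
	defaultTrigger triggerType = "Trigger_Default_"
	alwaysTrigger  triggerType = "Trigger_Always_"
)

type windowIntoOption interface{ windowIntoOption() }

type windowTrigger struct{ Name triggerType }

func (windowTrigger) windowIntoOption() {}

// unknownOption satisfies the interface but is not handled by resolveTrigger.
type unknownOption struct{}

func (unknownOption) windowIntoOption() {}

// resolveTrigger applies options in order and rejects any option it does not know,
// instead of silently resetting the trigger to the default.
func resolveTrigger(opts ...windowIntoOption) (triggerType, error) {
	trigger := defaultTrigger
	for _, opt := range opts {
		switch o := opt.(type) {
		case windowTrigger:
			trigger = o.Name
		default:
			return "", fmt.Errorf("unknown WindowInto option type: %T", o)
		}
	}
	return trigger, nil
}

func main() {
	fmt.Println(resolveTrigger(windowTrigger{Name: alwaysTrigger}))
	fmt.Println(resolveTrigger(windowTrigger{Name: alwaysTrigger}, unknownOption{}))
}
```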

##########
File path: sdks/go/pkg/beam/core/graph/coder/panes.go
##########
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package coder
+
+import (
+	"io"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/ioutilx"
+)
+
+const (
+	FIRST     int = 0
+	ONE_INDEX int = 1
+	TWO_INDEX int = 2
+)
+
+func chooseEncoding(v typex.PaneInfo) int {
+	if (v.Index == 0 || v.NonSpeculativeIndex == 0) || v.Timing == typex.UNKNOWN {
+		return FIRST
+	} else if v.Index == v.NonSpeculativeIndex || v.Timing == typex.EARLY {
+		return ONE_INDEX
+	} else {
+		return TWO_INDEX
+	}
+}
+
+func timing(v typex.Timing) int {
+	if v == typex.EARLY {
+		return 0
+	} else if v == typex.ON_TIME {
+		return 1
+	} else if v == typex.LATE {
+		return 2
+	} else {
+		return 3
+	}
+}
+
+// EncodePane encodes a single byte.

Review comment:
       ```suggestion
   // EncodePane encodes a typex.PaneInfo.
   ```

##########
File path: sdks/go/pkg/beam/core/graph/window/trigger.go
##########
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package window
+
+// TODO [BEAM-3304](riteshghorse): add configurable parameters to trigger
+type TriggerType string
+
+const (
+	Default  TriggerType = "Trigger_Default_"
+	Always   TriggerType = "Trigger_Always_"
+	AfterAny TriggerType = "Trigger_AfterAny_"
+	AfterAll TriggerType = "Trigger_AfterAll_"

Review comment:
       WRT the const names here, we probably want to call these DefaultTrigger, AlwaysTrigger, etc., since users will see these as "window.Default" and "window.Always" and it won't necessarily be clear what they mean.

##########
File path: sdks/go/pkg/beam/core/runtime/exec/datasource.go
##########
@@ -134,7 +134,7 @@ func (n *DataSource) Process(ctx context.Context) error {
 		}
 		pe.Timestamp = t
 		pe.Windows = ws
-
+		pe.Pane = pn

Review comment:
       Please keep the blank line before the next block. 
   Blank lines used carefully help readability.

##########
File path: sdks/go/test/integration/primitives/windowinto.go
##########
@@ -91,3 +91,32 @@ func WindowSums_GBK(s beam.Scope) {
 func WindowSums_Lifted(s beam.Scope) {
 	WindowSums(s.Scope("Lifted"), stats.SumPerKey)
 }
+
+// TriggerWindowSums, much like WindowSums described above has an addition of configuring
+// a trigger here. SetDefault works fine. Other triggers such as SetAlways throws
+// pane decoding error.
+func TriggerWindowSums(s beam.Scope, sumPerKey func(beam.Scope, beam.PCollection) beam.PCollection) {
+	timestampedData := beam.ParDo(s, &createTimestampedData{Data: []int{4, 9, 2, 3, 5, 7, 8, 1, 6}}, beam.Impulse(s))
+
+	windowSize := 3 * time.Second
+
+	validate := func(s beam.Scope, wfn *window.Fn, in beam.PCollection, expected ...interface{}) {
+		// Window the data.
+		windowed := beam.WindowInto(s, wfn, in, beam.WindowTrigger{Name: window.Always})
+		// To get the pane decoding error, change above statement to
+		// windowed := beam.WindowInto(s, wfn, in, beam.WindowTrigger{Name: window.Always})
+		// Perform the appropriate sum operation.
+		sums := sumPerKey(s, windowed)
+		// Drop back to Global windows, and drop the key otherwise passert.Equals doesn't work.
+		sums = beam.WindowInto(s, window.NewGlobalWindows(), sums)
+		sums = beam.DropKey(s, sums)
+		passert.Equals(s, sums, expected...)
+	}
+
+	// Use fixed windows to divide the data into 3 chunks.
+	validate(s.Scope("Fixed"), window.NewFixedWindows(windowSize), timestampedData, 15, 15, 15)

Review comment:
       Doesn't the "Always" trigger happen on every element? Should we be actually expecting the sums, or the individual elements?  (I don't know what's correct here, outside of the default trigger.)

##########
File path: sdks/go/pkg/beam/core/graph/coder/panes.go
##########
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package coder
+
+import (
+	"io"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/ioutilx"
+)
+
+const (
+	FIRST     int = 0
+	ONE_INDEX int = 1
+	TWO_INDEX int = 2
+)
+
+func chooseEncoding(v typex.PaneInfo) int {
+	if (v.Index == 0 || v.NonSpeculativeIndex == 0) || v.Timing == typex.UNKNOWN {
+		return FIRST
+	} else if v.Index == v.NonSpeculativeIndex || v.Timing == typex.EARLY {
+		return ONE_INDEX
+	} else {
+		return TWO_INDEX
+	}
+}
+
+func timing(v typex.Timing) int {
+	if v == typex.EARLY {
+		return 0
+	} else if v == typex.ON_TIME {
+		return 1
+	} else if v == typex.LATE {
+		return 2
+	} else {
+		return 3
+	}
+}
+
+// EncodePane encodes a single byte.
+func EncodePane(v typex.PaneInfo, w io.Writer) error {
+	// Encoding: typex.PaneInfo
+
+	pane := 0
+	if v.IsFirst {
+		pane |= 1
+	}
+	if v.IsLast {
+		pane |= 2
+	}
+	pane |= timing(v.Timing) << 2
+
+	switch chooseEncoding(v) {
+	case FIRST:
+		paneByte := []byte{byte(pane)}
+		w.Write(paneByte)
+	case ONE_INDEX:
+		paneByte := []byte{byte(pane | (ONE_INDEX)<<4)}
+		w.Write(paneByte)
+		EncodeVarInt(v.Index, w)
+	case TWO_INDEX:
+		paneByte := []byte{byte(pane | (TWO_INDEX)<<4)}
+		w.Write(paneByte)
+		EncodeVarInt(v.Index, w)
+		EncodeVarInt(v.NonSpeculativeIndex, w)
+	}
+	return nil
+}
+
+func encodingType(b byte) int64 {
+	return int64(b >> 4)
+}
+
+func NewPane(b byte) typex.PaneInfo {
+	pn := typex.PaneInfo{}
+	if b&0x01 == 1 {
+		pn.IsFirst = true
+	}
+	if b&0x02 == 2 {
+		pn.IsLast = true
+	}
+	switch int64((b >> 2) & 0x03) {
+	case 0:
+		pn.Timing = typex.EARLY
+	case 1:
+		pn.Timing = typex.ON_TIME
+	case 2:
+		pn.Timing = typex.LATE
+	case 3:
+		pn.Timing = typex.UNKNOWN
+	}
+
+	return pn
+}
+
+// DecodePane decodes a single byte.
+func DecodePane(r io.Reader) (typex.PaneInfo, error) {
+	// Decoding: typex.PaneInfo
+	var data [1]byte
+	if err := ioutilx.ReadNBufUnsafe(r, data[:]); err != nil { // NO_FIRING pane
+		return typex.PaneInfo{}, err
+	}
+	pn := NewPane(data[0] & 0x0f)
+	switch encodingType(data[0]) {

Review comment:
       Cleaning up this switch statement I'll leave as an exercise based on my other comments.

##########
File path: sdks/go/pkg/beam/runners/dataflow/dataflowlib/translate.go
##########
@@ -117,7 +118,7 @@ func (x *translator) translateTransform(trunk string, id string) ([]*df.Step, er
 		// URL Query-escaped windowed _unnested_ value. It is read back in
 		// a nested context at runtime.
 		var buf bytes.Buffer
-		if err := exec.EncodeWindowedValueHeader(exec.MakeWindowEncoder(coder.NewGlobalWindow()), window.SingleGlobalWindow, mtime.ZeroTimestamp, &buf); err != nil {
+		if err := exec.EncodeWindowedValueHeader(exec.MakeWindowEncoder(coder.NewGlobalWindow()), window.SingleGlobalWindow, mtime.ZeroTimestamp, typex.PaneInfo{}, &buf); err != nil {

Review comment:
       Question: Is the zero value for your pane info type identical to the "No firing" pane we've been writing? If so, very nice!
   
   Optionally, also add a function to the typex package, typex.NoFiringPane(), to document the intended semantic meaning of the type at call sites like this.

##########
File path: sdks/go/pkg/beam/core/graph/coder/panes.go
##########
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package coder
+
+import (
+	"io"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/ioutilx"
+)
+
+const (
+	FIRST     int = 0
+	ONE_INDEX int = 1
+	TWO_INDEX int = 2
+)
+
+func chooseEncoding(v typex.PaneInfo) int {
+	if (v.Index == 0 || v.NonSpeculativeIndex == 0) || v.Timing == typex.UNKNOWN {
+		return FIRST
+	} else if v.Index == v.NonSpeculativeIndex || v.Timing == typex.EARLY {
+		return ONE_INDEX
+	} else {
+		return TWO_INDEX
+	}
+}
+
+func timing(v typex.Timing) int {
+	if v == typex.EARLY {
+		return 0
+	} else if v == typex.ON_TIME {
+		return 1
+	} else if v == typex.LATE {
+		return 2
+	} else {
+		return 3
+	}
+}
+
+// EncodePane encodes a single byte.
+func EncodePane(v typex.PaneInfo, w io.Writer) error {
+	// Encoding: typex.PaneInfo
+
+	pane := 0
+	if v.IsFirst {
+		pane |= 1
+	}
+	if v.IsLast {
+		pane |= 2
+	}
+	pane |= timing(v.Timing) << 2
+
+	switch chooseEncoding(v) {
+	case FIRST:
+		paneByte := []byte{byte(pane)}
+		w.Write(paneByte)
+	case ONE_INDEX:
+		paneByte := []byte{byte(pane | (ONE_INDEX)<<4)}
+		w.Write(paneByte)
+		EncodeVarInt(v.Index, w)
+	case TWO_INDEX:
+		paneByte := []byte{byte(pane | (TWO_INDEX)<<4)}
+		w.Write(paneByte)
+		EncodeVarInt(v.Index, w)
+		EncodeVarInt(v.NonSpeculativeIndex, w)
+	}
+	return nil
+}
+
+func encodingType(b byte) int64 {
+	return int64(b >> 4)
+}
+
+func NewPane(b byte) typex.PaneInfo {
+	pn := typex.PaneInfo{}
+	if b&0x01 == 1 {
+		pn.IsFirst = true
+	}
+	if b&0x02 == 2 {
+		pn.IsLast = true
+	}
+	switch int64((b >> 2) & 0x03) {
+	case 0:
+		pn.Timing = typex.EARLY
+	case 1:
+		pn.Timing = typex.ON_TIME
+	case 2:
+		pn.Timing = typex.LATE
+	case 3:
+		pn.Timing = typex.UNKNOWN
+	}

Review comment:
       If you define those arrival conditions (early, late, etc.) as integers or uints with iota, you can replace this whole switch with: `pn.Timing = typex.Timing((b >> 2) & 0x03)`
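   
   A small sketch of the resulting decoder, assuming an integer-based `Timing`. The type and constant names are hypothetical; the bit layout (bit 0 first, bit 1 last, bits 2-3 timing) is taken from the `NewPane` code quoted above.

```go
package main

import "fmt"

// Timing is a hypothetical integer-based replacement for the string type.
type Timing uint8

const (
	PaneEarly Timing = iota
	PaneOnTime
	PaneLate
	PaneUnknown
)

type paneInfo struct {
	Timing          Timing
	IsFirst, IsLast bool
}

// newPane unpacks the low nibble of the pane header byte:
// bit 0 is IsFirst, bit 1 is IsLast, bits 2-3 hold the timing.
func newPane(b byte) paneInfo {
	return paneInfo{
		Timing:  Timing((b >> 2) & 0x03), // direct conversion, no switch
		IsFirst: b&0x01 != 0,
		IsLast:  b&0x02 != 0,
	}
}

func main() {
	// 0x0f is the byte held by the existing paneNoFiring variable.
	fmt.Printf("%+v\n", newPane(0x0f))
}
```

   Under this layout the `0x0f` byte unpacks to first, last, and the timing value 3.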

##########
File path: sdks/go/pkg/beam/core/runtime/exec/coder.go
##########
@@ -1079,7 +1112,7 @@ func (*intervalWindowDecoder) DecodeSingle(r io.Reader) (typex.Window, error) {
 var paneNoFiring = []byte{0xf}

Review comment:
       If we aren't using this variable anywhere anymore we can remove it.

##########
File path: sdks/go/pkg/beam/core/graph/coder/panes.go
##########
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package coder
+
+import (
+	"io"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/ioutilx"
+)
+
+const (
+	FIRST     int = 0
+	ONE_INDEX int = 1
+	TWO_INDEX int = 2
+)
+
+func chooseEncoding(v typex.PaneInfo) int {
+	if (v.Index == 0 || v.NonSpeculativeIndex == 0) || v.Timing == typex.UNKNOWN {
+		return FIRST
+	} else if v.Index == v.NonSpeculativeIndex || v.Timing == typex.EARLY {
+		return ONE_INDEX
+	} else {
+		return TWO_INDEX
+	}

Review comment:
       As a result, we can probably just move these conditions into the encoder, rather than relating them through constants, and have a comment that explains their differences, which is what you're currently using the constant names for.
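   
   One possible shape for that, as a sketch only. The conditions are copied as-is from the quoted `chooseEncoding`; `paneInfo`, `writeVarInt`, and `encodeToBytes` are hypothetical stand-ins, and `writeVarInt` assumes the coder package's `EncodeVarInt` writes a standard base-128 varint. Unlike the quoted code, this version also propagates write errors.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// paneInfo is a hypothetical stand-in for typex.PaneInfo with an integer timing.
type paneInfo struct {
	Timing                     uint8 // 0 early, 1 on time, 2 late, 3 unknown
	IsFirst, IsLast            bool
	Index, NonSpeculativeIndex int64
}

// writeVarInt is an assumed stand-in for the coder package's EncodeVarInt.
func writeVarInt(v int64, w io.Writer) error {
	var buf [binary.MaxVarintLen64]byte
	n := binary.PutUvarint(buf[:], uint64(v))
	_, err := w.Write(buf[:n])
	return err
}

func encodePane(p paneInfo, w io.Writer) error {
	var flags byte
	if p.IsFirst {
		flags |= 0x01
	}
	if p.IsLast {
		flags |= 0x02
	}
	flags |= (p.Timing & 0x03) << 2

	switch {
	case p.Index == 0 || p.NonSpeculativeIndex == 0 || p.Timing == 3:
		// Flags only: no index follows the header byte.
		_, err := w.Write([]byte{flags})
		return err
	case p.Index == p.NonSpeculativeIndex || p.Timing == 0:
		// One index: high nibble 1 tells the decoder to read Index.
		if _, err := w.Write([]byte{flags | 1<<4}); err != nil {
			return err
		}
		return writeVarInt(p.Index, w)
	default:
		// Two indices: high nibble 2 tells the decoder to read both.
		if _, err := w.Write([]byte{flags | 2<<4}); err != nil {
			return err
		}
		if err := writeVarInt(p.Index, w); err != nil {
			return err
		}
		return writeVarInt(p.NonSpeculativeIndex, w)
	}
}

// encodeToBytes is a convenience wrapper for inspecting the encoded bytes.
func encodeToBytes(p paneInfo) ([]byte, error) {
	var buf bytes.Buffer
	err := encodePane(p, &buf)
	return buf.Bytes(), err
}

func main() {
	b, err := encodeToBytes(paneInfo{Timing: 2, Index: 5, NonSpeculativeIndex: 3})
	fmt.Printf("% x %v\n", b, err)
}
```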




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] riteshghorse commented on pull request #15239: [BEAM-3304] Go triggering support

Posted by GitBox <gi...@apache.org>.
riteshghorse commented on pull request #15239:
URL: https://github.com/apache/beam/pull/15239#issuecomment-892993803


   PTAL.
   
   Added pane helpers and changed all affecting function signatures/receivers. The error of `source decode failed  cause by ____` seems to have gone. I tested this the old way (TestTriggerWindowSums_GBK). Next step would be writing isolated trigger tests.  





[GitHub] [beam] lostluck commented on a change in pull request #15239: [BEAM-3304] Go triggering support

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #15239:
URL: https://github.com/apache/beam/pull/15239#discussion_r682988766



##########
File path: sdks/go/pkg/beam/core/graph/coder/panes.go
##########
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package coder
+
+import (
+	"io"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/ioutilx"
+)
+
+const (
+	FIRST     int = 0
+	ONE_INDEX int = 1
+	TWO_INDEX int = 2
+)
+
+func chooseEncoding(v typex.PaneInfo) int {
+	if (v.Index == 0 || v.NonSpeculativeIndex == 0) || v.Timing == typex.UNKNOWN {
+		return FIRST
+	} else if v.Index == v.NonSpeculativeIndex || v.Timing == typex.EARLY {
+		return ONE_INDEX
+	} else {
+		return TWO_INDEX
+	}

Review comment:
       Prefer using a switch statement for things like this instead of relying on if-else ladders.
   ```suggestion
   	switch {
   	case v.Index == 0 || v.NonSpeculativeIndex == 0 || v.Timing == typex.UNKNOWN:
   		return FIRST
   	case v.Index == v.NonSpeculativeIndex || v.Timing == typex.EARLY:
   		return ONE_INDEX
   	default:
   		return TWO_INDEX
   	}
   ```

##########
File path: sdks/go/pkg/beam/core/graph/coder/panes.go
##########
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package coder
+
+import (
+	"io"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/ioutilx"
+)
+
+const (
+	FIRST     int = 0
+	ONE_INDEX int = 1
+	TWO_INDEX int = 2

Review comment:
       In Go, constants should use MixedCaps if they're exported or mixedCaps if they're unexported, never ALL_CAPS.

##########
File path: sdks/go/pkg/beam/core/typex/special.go
##########
@@ -64,6 +65,23 @@ type Window interface {
 	Equals(o Window) bool
 }
 
+type Timing string

Review comment:
       Instead of using a string, consider using an `int` (or even a uint8) as the base type. It's more compact in memory. It also lets one avoid doing string comparisons to know values for encodings, and avoids some awkward conversions that this PR is currently making.
   
   See https://yourbasic.org/golang/iota/ for a good rundown of how to do "enums" in Go, as well as Effective Go: https://golang.org/doc/effective_go#constants

##########
File path: sdks/go/pkg/beam/core/typex/special.go
##########
@@ -64,6 +65,23 @@ type Window interface {
 	Equals(o Window) bool
 }
 
+type Timing string
+
+const (
+	EARLY   Timing = "EARLY"
+	ON_TIME Timing = "ON_TIME"
+	LATE    Timing = "LATE"
+	UNKNOWN Timing = "UNKNOWN"
+)
+
+type PaneInfo struct {
+	Timing              Timing
+	IsFirst             bool
+	IsLast              bool
+	Index               int64
+	NonSpeculativeIndex int64

Review comment:
       Note, there's nothing wrong with the current approach, I'm pointing out a thing about Go:
   
   In this case, you can write these fields more compactly as: 
   ```suggestion
   	IsFirst, IsLast				bool
   	Index, NonSpeculativeIndex		int64
   ```

##########
File path: sdks/go/pkg/beam/core/typex/special.go
##########
@@ -64,6 +65,23 @@ type Window interface {
 	Equals(o Window) bool
 }
 
+type Timing string
+
+const (
+	EARLY   Timing = "EARLY"
+	ON_TIME Timing = "ON_TIME"
+	LATE    Timing = "LATE"
+	UNKNOWN Timing = "UNKNOWN"

Review comment:
       Despite these being constants, it's not idiomatic Go to use all caps for constants.
   
   I also recommend prefixing them with what they relate to, e.g. PaneEarly, PaneOnTime, PaneLate, PaneUnknown.
   
   Prefixing is mostly so they're adjacent in Go Doc, versus suffixing, which is a bit more readable.
   
   We can't get away without prefixing because we don't have a "pane" package, which would declare what they're for earlier. In this case we shouldn't have a pane package; it would be too small for practical use.
   
   The typex package has a pretty grab-bag set of values, so the enumerations need to be made clearer for their use elsewhere.
   
   

##########
File path: sdks/go/pkg/beam/core/graph/window/trigger.go
##########
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package window
+
+// TODO [BEAM-3304](riteshghorse): add configurable parameters to trigger
+type TriggerType string
+
+const (
+	Default  TriggerType = "Trigger_Default_"
+	Always   TriggerType = "Trigger_Always_"
+	AfterAny TriggerType = "Trigger_AfterAny_"
+	AfterAll TriggerType = "Trigger_AfterAll_"
+)
+
+func (ws *WindowingStrategy) SetAfterAll() {
+	ws.Trigger = AfterAll
+}
+
+func (ws *WindowingStrategy) SetAfterAny() {
+	ws.Trigger = AfterAny
+}
+
+func (ws *WindowingStrategy) SetAlways() {
+	ws.Trigger = Always
+}
+
+func (ws *WindowingStrategy) SetDefault() {
+	ws.Trigger = Default
+}

Review comment:
       We can probably remove these helper methods since they aren't being called, in favour of directly setting the fields ourselves.

##########
File path: sdks/go/pkg/beam/core/runtime/exec/coder.go
##########
@@ -1088,25 +1121,22 @@ func EncodeWindowedValueHeader(enc WindowEncoder, ws []typex.Window, t typex.Eve
 	if err := enc.Encode(ws, w); err != nil {
 		return err
 	}
-	_, err := w.Write(paneNoFiring)
+	err := coder.EncodePane(p, w)
 	return err
 }
 
 // DecodeWindowedValueHeader deserializes a windowed value header.
-func DecodeWindowedValueHeader(dec WindowDecoder, r io.Reader) ([]typex.Window, typex.EventTime, error) {
+func DecodeWindowedValueHeader(dec WindowDecoder, r io.Reader) ([]typex.Window, typex.EventTime, typex.PaneInfo, error) {
 	// Encoding: Timestamp, Window, Pane (header) + Element
-
+	pn := typex.PaneInfo{}

Review comment:
       It would be more idiomatic to declare the zero value in place for the returns, which makes it explicit that they're the zero value.
   The reason for this is to limit scope and make it easier on readers: we keep scope as small as possible. With a pre-declaration like this, a reader has to keep an eye out for when it's used and what might have affected it before it's returned. By writing our intent (return a zero value on error) directly where it's happening, we avoid this.
   
   That said, I understand the repetition can be tiresome. You can instead define a helper function in this function's scope:
   
   `onError := func(err error) ([]typex.Window, typex.EventTime, typex.PaneInfo, error) { return nil, mtime.ZeroTimestamp, typex.PaneInfo{}, err }`
   
   You can then call it as `return onError(err)` and it will compile down to the same thing.
   
   Non-pointer zeros being inconvenient is a hot topic of discussion for Go and eventually someone will propose a satisfying solution for it.
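
A minimal, self-contained sketch of the `onError` pattern described above. The types `paneInfo` and `eventTime` and the two-byte header layout are hypothetical stand-ins so the example compiles on its own; they are not Beam's types or wire format.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// paneInfo and eventTime are hypothetical stand-ins for typex.PaneInfo and
// typex.EventTime.
type paneInfo struct{ IsFirst, IsLast bool }

type eventTime int64

// decodeHeader reads a one-byte timestamp followed by a one-byte pane.
// The layout is invented for illustration; only the error-return shape matters.
func decodeHeader(r io.Reader) (eventTime, paneInfo, error) {
	// onError states the intent once: every failure returns zero values.
	onError := func(err error) (eventTime, paneInfo, error) {
		return 0, paneInfo{}, err
	}

	var buf [2]byte
	if _, err := io.ReadFull(r, buf[:1]); err != nil {
		return onError(err)
	}
	if _, err := io.ReadFull(r, buf[1:]); err != nil {
		return onError(err)
	}
	pn := paneInfo{IsFirst: buf[1]&0x01 != 0, IsLast: buf[1]&0x02 != 0}
	return eventTime(buf[0]), pn, nil
}

// decodeBytes is a convenience wrapper for trying the sketch on a byte slice.
func decodeBytes(b []byte) (eventTime, paneInfo, error) {
	return decodeHeader(bytes.NewReader(b))
}

func main() {
	ts, pn, err := decodeBytes([]byte{0x05, 0x03})
	fmt.Println(ts, pn, err)

	// A truncated header fails and yields only zero values.
	ts, pn, err = decodeBytes([]byte{0x05})
	fmt.Println(ts, pn, err)
}
```

Note that the closure needs an explicit result list; a function literal without one cannot return values.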

##########
File path: sdks/go/pkg/beam/windowing.go
##########
@@ -21,21 +21,40 @@ import (
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
+type WindowIntoOption interface {
+	windowIntoOption()
+}
+
+type WindowTrigger struct {
+	Name window.TriggerType
+}
+
+func (t WindowTrigger) windowIntoOption() {}
+
 // WindowInto applies the windowing strategy to each element.
-func WindowInto(s Scope, ws *window.Fn, col PCollection) PCollection {
-	return Must(TryWindowInto(s, ws, col))
+func WindowInto(s Scope, ws *window.Fn, col PCollection, opts ...WindowIntoOption) PCollection {
+	return Must(TryWindowInto(s, ws, col, opts...))
 }
 
 // TryWindowInto attempts to insert a WindowInto transform.
-func TryWindowInto(s Scope, ws *window.Fn, col PCollection) (PCollection, error) {
+func TryWindowInto(s Scope, wfn *window.Fn, col PCollection, opts ...WindowIntoOption) (PCollection, error) {
 	if !s.IsValid() {
 		return PCollection{}, errors.New("invalid scope")
 	}
 	if !col.IsValid() {
 		return PCollection{}, errors.New("invalid input pcollection")
 	}
+	ws := window.WindowingStrategy{Fn: wfn, Trigger: window.Default}
+	for _, opt := range opts {
+		switch opt.(type) {
+		case WindowTrigger:
+			ws.Trigger = opt.(WindowTrigger).Name
+		default:
+			ws.Trigger = window.Default

Review comment:
       Since this switch only runs if we have options, it's valid to return an error or even `panic` if we don't know what type of option it is. That way users won't accidentally override previously set triggers because an implementation was somehow left half done.
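
One way this could look, as a sketch rather than the final implementation: the option loop returns an error for any option type it does not recognise, instead of silently resetting the trigger. All type and function names below (`intoOption`, `windowTrigger`, `halfDoneOption`, `resolveTrigger`) are hypothetical stand-ins, not the Beam API.

```go
package main

import "fmt"

type triggerType string

const (
	defaultTrigger triggerType = "Trigger_Default_"
	alwaysTrigger  triggerType = "Trigger_Always_"
)

// intoOption mirrors the marker-interface shape used in the diff.
type intoOption interface{ windowIntoOption() }

type windowTrigger struct{ Name triggerType }

func (windowTrigger) windowIntoOption() {}

// halfDoneOption satisfies the interface but has no handling yet.
type halfDoneOption struct{}

func (halfDoneOption) windowIntoOption() {}

// resolveTrigger applies options in order and rejects unknown ones, so a
// later unhandled option cannot silently reset an earlier trigger.
func resolveTrigger(opts ...intoOption) (triggerType, error) {
	tr := defaultTrigger
	for _, opt := range opts {
		switch o := opt.(type) {
		case windowTrigger:
			tr = o.Name
		default:
			return "", fmt.Errorf("unknown WindowInto option type: %T", o)
		}
	}
	return tr, nil
}

func main() {
	tr, err := resolveTrigger(windowTrigger{Name: alwaysTrigger})
	fmt.Println(tr, err)

	_, err = resolveTrigger(windowTrigger{Name: alwaysTrigger}, halfDoneOption{})
	fmt.Println(err)
}
```

Binding the asserted value with `switch o := opt.(type)` also avoids the second type assertion inside the case body.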

##########
File path: sdks/go/pkg/beam/core/graph/coder/panes.go
##########
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package coder
+
+import (
+	"io"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/ioutilx"
+)
+
+const (
+	FIRST     int = 0
+	ONE_INDEX int = 1
+	TWO_INDEX int = 2
+)
+
+func chooseEncoding(v typex.PaneInfo) int {
+	if (v.Index == 0 || v.NonSpeculativeIndex == 0) || v.Timing == typex.UNKNOWN {
+		return FIRST
+	} else if v.Index == v.NonSpeculativeIndex || v.Timing == typex.EARLY {
+		return ONE_INDEX
+	} else {
+		return TWO_INDEX
+	}
+}
+
+func timing(v typex.Timing) int {
+	if v == typex.EARLY {
+		return 0
+	} else if v == typex.ON_TIME {
+		return 1
+	} else if v == typex.LATE {
+		return 2
+	} else {
+		return 3
+	}
+}
+
+// EncodePane encodes a single byte.

Review comment:
       ```suggestion
   // EncodePane encodes a typex.PaneInfo.
   ```

##########
File path: sdks/go/pkg/beam/core/graph/window/trigger.go
##########
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package window
+
+// TODO [BEAM-3304](riteshghorse): add configurable parameters to trigger
+type TriggerType string
+
+const (
+	Default  TriggerType = "Trigger_Default_"
+	Always   TriggerType = "Trigger_Always_"
+	AfterAny TriggerType = "Trigger_AfterAny_"
+	AfterAll TriggerType = "Trigger_AfterAll_"

Review comment:
       Regarding the const names here, we probably want to call these DefaultTrigger, AlwaysTrigger, etc., since users will see them as "window.Default" and "window.Always" and it won't necessarily be clear what they mean.

##########
File path: sdks/go/pkg/beam/core/runtime/exec/datasource.go
##########
@@ -134,7 +134,7 @@ func (n *DataSource) Process(ctx context.Context) error {
 		}
 		pe.Timestamp = t
 		pe.Windows = ws
-
+		pe.Pane = pn

Review comment:
       Please keep the blank line before the next block. 
   Blank lines used carefully help readability.

##########
File path: sdks/go/test/integration/primitives/windowinto.go
##########
@@ -91,3 +91,32 @@ func WindowSums_GBK(s beam.Scope) {
 func WindowSums_Lifted(s beam.Scope) {
 	WindowSums(s.Scope("Lifted"), stats.SumPerKey)
 }
+
+// TriggerWindowSums, much like WindowSums described above has an addition of configuring
+// a trigger here. SetDefault works fine. Other triggers such as SetAlways throws
+// pane decoding error.
+func TriggerWindowSums(s beam.Scope, sumPerKey func(beam.Scope, beam.PCollection) beam.PCollection) {
+	timestampedData := beam.ParDo(s, &createTimestampedData{Data: []int{4, 9, 2, 3, 5, 7, 8, 1, 6}}, beam.Impulse(s))
+
+	windowSize := 3 * time.Second
+
+	validate := func(s beam.Scope, wfn *window.Fn, in beam.PCollection, expected ...interface{}) {
+		// Window the data.
+		windowed := beam.WindowInto(s, wfn, in, beam.WindowTrigger{Name: window.Always})
+		// To get the pane decoding error, change above statement to
+		// windowed := beam.WindowInto(s, wfn, in, beam.WindowTrigger{Name: window.Always})
+		// Perform the appropriate sum operation.
+		sums := sumPerKey(s, windowed)
+		// Drop back to Global windows, and drop the key otherwise passert.Equals doesn't work.
+		sums = beam.WindowInto(s, window.NewGlobalWindows(), sums)
+		sums = beam.DropKey(s, sums)
+		passert.Equals(s, sums, expected...)
+	}
+
+	// Use fixed windows to divide the data into 3 chunks.
+	validate(s.Scope("Fixed"), window.NewFixedWindows(windowSize), timestampedData, 15, 15, 15)

Review comment:
       Doesn't the "Always" trigger happen on every element? Should we be actually expecting the sums, or the individual elements?  (I don't know what's correct here, outside of the default trigger.)

##########
File path: sdks/go/pkg/beam/core/graph/coder/panes.go
##########
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package coder
+
+import (
+	"io"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/ioutilx"
+)
+
+const (
+	FIRST     int = 0
+	ONE_INDEX int = 1
+	TWO_INDEX int = 2
+)
+
+func chooseEncoding(v typex.PaneInfo) int {
+	if (v.Index == 0 || v.NonSpeculativeIndex == 0) || v.Timing == typex.UNKNOWN {
+		return FIRST
+	} else if v.Index == v.NonSpeculativeIndex || v.Timing == typex.EARLY {
+		return ONE_INDEX
+	} else {
+		return TWO_INDEX
+	}
+}
+
+func timing(v typex.Timing) int {
+	if v == typex.EARLY {
+		return 0
+	} else if v == typex.ON_TIME {
+		return 1
+	} else if v == typex.LATE {
+		return 2
+	} else {
+		return 3
+	}
+}
+
+// EncodePane encodes a single byte.
+func EncodePane(v typex.PaneInfo, w io.Writer) error {
+	// Encoding: typex.PaneInfo
+
+	pane := 0
+	if v.IsFirst {
+		pane |= 1
+	}
+	if v.IsLast {
+		pane |= 2
+	}
+	pane |= timing(v.Timing) << 2
+
+	switch chooseEncoding(v) {
+	case FIRST:
+		paneByte := []byte{byte(pane)}
+		w.Write(paneByte)
+	case ONE_INDEX:
+		paneByte := []byte{byte(pane | (ONE_INDEX)<<4)}
+		w.Write(paneByte)
+		EncodeVarInt(v.Index, w)
+	case TWO_INDEX:
+		paneByte := []byte{byte(pane | (TWO_INDEX)<<4)}
+		w.Write(paneByte)
+		EncodeVarInt(v.Index, w)
+		EncodeVarInt(v.NonSpeculativeIndex, w)
+	}
+	return nil
+}
+
+func encodingType(b byte) int64 {
+	return int64(b >> 4)
+}
+
+func NewPane(b byte) typex.PaneInfo {
+	pn := typex.PaneInfo{}
+	if b&0x01 == 1 {
+		pn.IsFirst = true
+	}
+	if b&0x02 == 2 {
+		pn.IsLast = true
+	}
+	switch int64((b >> 2) & 0x03) {
+	case 0:
+		pn.Timing = typex.EARLY
+	case 1:
+		pn.Timing = typex.ON_TIME
+	case 2:
+		pn.Timing = typex.LATE
+	case 3:
+		pn.Timing = typex.UNKNOWN
+	}
+
+	return pn
+}
+
+// DecodePane decodes a single byte.
+func DecodePane(r io.Reader) (typex.PaneInfo, error) {
+	// Decoding: typex.PaneInfo
+	var data [1]byte
+	if err := ioutilx.ReadNBufUnsafe(r, data[:]); err != nil { // NO_FIRING pane
+		return typex.PaneInfo{}, err
+	}
+	pn := NewPane(data[0] & 0x0f)
+	switch encodingType(data[0]) {

Review comment:
       I'll leave cleaning up this switch statement as an exercise, based on my other comments.

##########
File path: sdks/go/pkg/beam/runners/dataflow/dataflowlib/translate.go
##########
@@ -117,7 +118,7 @@ func (x *translator) translateTransform(trunk string, id string) ([]*df.Step, er
 		// URL Query-escaped windowed _unnested_ value. It is read back in
 		// a nested context at runtime.
 		var buf bytes.Buffer
-		if err := exec.EncodeWindowedValueHeader(exec.MakeWindowEncoder(coder.NewGlobalWindow()), window.SingleGlobalWindow, mtime.ZeroTimestamp, &buf); err != nil {
+		if err := exec.EncodeWindowedValueHeader(exec.MakeWindowEncoder(coder.NewGlobalWindow()), window.SingleGlobalWindow, mtime.ZeroTimestamp, typex.PaneInfo{}, &buf); err != nil {

Review comment:
       Question: Is the zero value for your pane info type identical to the "No firing" pane we've been writing? If so, very nice!
   
   Optionally, also add a function to the typex package, `typex.NoFiringPane()`, to document the intended semantic meaning of the type at call sites like this.
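
A rough sketch of what such a helper could look like, and of how to check the question above. It assumes the bit layout used by this PR's `EncodePane` for the single-byte case (bit 0 first, bit 1 last, bits 2-3 timing) and a hypothetical integer-backed `timing` type; `paneInfo`, `noFiringPane`, and `headerByte` are illustrative names only.

```go
package main

import "fmt"

// timing is a hypothetical integer-backed timing type.
type timing uint8

const (
	early timing = iota
	onTime
	late
	unknown
)

// paneInfo is a stand-in for typex.PaneInfo with only the flag fields.
type paneInfo struct {
	Timing          timing
	IsFirst, IsLast bool
}

// noFiringPane names the intent at call sites: first and last pane, with
// unknown timing.
func noFiringPane() paneInfo {
	return paneInfo{IsFirst: true, IsLast: true, Timing: unknown}
}

// headerByte packs the flags for the single-byte encoding:
// bit 0 first, bit 1 last, bits 2-3 timing.
func headerByte(p paneInfo) byte {
	var b byte
	if p.IsFirst {
		b |= 0x01
	}
	if p.IsLast {
		b |= 0x02
	}
	return b | byte(p.Timing)<<2
}

func main() {
	fmt.Printf("no firing: %#02x, zero value: %#02x\n",
		headerByte(noFiringPane()), headerByte(paneInfo{}))
}
```

Under this layout the zero value leaves both the first and last bits clear, so it cannot produce the `0xf` byte held in `paneNoFiring`; an explicit helper makes that difference visible.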

##########
File path: sdks/go/pkg/beam/core/graph/coder/panes.go
##########
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package coder
+
+import (
+	"io"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/ioutilx"
+)
+
+const (
+	FIRST     int = 0
+	ONE_INDEX int = 1
+	TWO_INDEX int = 2
+)
+
+func chooseEncoding(v typex.PaneInfo) int {
+	if (v.Index == 0 || v.NonSpeculativeIndex == 0) || v.Timing == typex.UNKNOWN {
+		return FIRST
+	} else if v.Index == v.NonSpeculativeIndex || v.Timing == typex.EARLY {
+		return ONE_INDEX
+	} else {
+		return TWO_INDEX
+	}
+}
+
+func timing(v typex.Timing) int {
+	if v == typex.EARLY {
+		return 0
+	} else if v == typex.ON_TIME {
+		return 1
+	} else if v == typex.LATE {
+		return 2
+	} else {
+		return 3
+	}
+}
+
+// EncodePane encodes a single byte.
+func EncodePane(v typex.PaneInfo, w io.Writer) error {
+	// Encoding: typex.PaneInfo
+
+	pane := 0
+	if v.IsFirst {
+		pane |= 1
+	}
+	if v.IsLast {
+		pane |= 2
+	}
+	pane |= timing(v.Timing) << 2
+
+	switch chooseEncoding(v) {
+	case FIRST:
+		paneByte := []byte{byte(pane)}
+		w.Write(paneByte)
+	case ONE_INDEX:
+		paneByte := []byte{byte(pane | (ONE_INDEX)<<4)}
+		w.Write(paneByte)
+		EncodeVarInt(v.Index, w)
+	case TWO_INDEX:
+		paneByte := []byte{byte(pane | (TWO_INDEX)<<4)}
+		w.Write(paneByte)
+		EncodeVarInt(v.Index, w)
+		EncodeVarInt(v.NonSpeculativeIndex, w)
+	}
+	return nil
+}
+
+func encodingType(b byte) int64 {
+	return int64(b >> 4)
+}
+
+func NewPane(b byte) typex.PaneInfo {
+	pn := typex.PaneInfo{}
+	if b&0x01 == 1 {
+		pn.IsFirst = true
+	}
+	if b&0x02 == 2 {
+		pn.IsLast = true
+	}
+	switch int64((b >> 2) & 0x03) {
+	case 0:
+		pn.Timing = typex.EARLY
+	case 1:
+		pn.Timing = typex.ON_TIME
+	case 2:
+		pn.Timing = typex.LATE
+	case 3:
+		pn.Timing = typex.UNKNOWN
+	}

Review comment:
       If you define those arrival conditions (early, late, etc.) as integers or uints with iota, you can replace this whole switch with: `pn.Timing = typex.Timing((b >> 2) & 0x03)`
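
The suggestion above could be sketched as follows. The `timing` type and its constant names are hypothetical; the constant order is chosen to match the 2-bit values that the PR's `timing()` helper writes (0 early, 1 on time, 2 late, 3 unknown).

```go
package main

import "fmt"

// timing is a hypothetical integer-backed replacement for the string-based
// Timing type in the diff.
type timing uint8

const (
	early timing = iota
	onTime
	late
	unknown
)

// String keeps log output readable now that the type is numeric.
func (t timing) String() string {
	switch t {
	case early:
		return "EARLY"
	case onTime:
		return "ON_TIME"
	case late:
		return "LATE"
	default:
		return "UNKNOWN"
	}
}

// timingFromByte extracts bits 2-3 of the pane byte; the conversion replaces
// the four-way switch in NewPane.
func timingFromByte(b byte) timing {
	return timing((b >> 2) & 0x03)
}

func main() {
	for _, b := range []byte{0x00, 0x04, 0x08, 0x0f} {
		fmt.Printf("%#02x -> %v\n", b, timingFromByte(b))
	}
}
```

The same conversion works in the other direction (`byte(t) << 2`), which would make the `timing()` helper in the encoder unnecessary as well.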

##########
File path: sdks/go/pkg/beam/core/runtime/exec/coder.go
##########
@@ -1079,7 +1112,7 @@ func (*intervalWindowDecoder) DecodeSingle(r io.Reader) (typex.Window, error) {
 var paneNoFiring = []byte{0xf}

Review comment:
       If we aren't using this variable anywhere anymore we can remove it.

##########
File path: sdks/go/pkg/beam/core/graph/coder/panes.go
##########
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package coder
+
+import (
+	"io"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/ioutilx"
+)
+
+const (
+	FIRST     int = 0
+	ONE_INDEX int = 1
+	TWO_INDEX int = 2
+)
+
+func chooseEncoding(v typex.PaneInfo) int {
+	if (v.Index == 0 || v.NonSpeculativeIndex == 0) || v.Timing == typex.UNKNOWN {
+		return FIRST
+	} else if v.Index == v.NonSpeculativeIndex || v.Timing == typex.EARLY {
+		return ONE_INDEX
+	} else {
+		return TWO_INDEX
+	}

Review comment:
       As a result, we can probably just move these conditions into the encoder rather than relating them through constants, and have a comment that explains their differences, which is what you're using the constant names for.
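
As an illustration only, the encoder with the conditions inlined might look like the sketch below. The conditions are copied unchanged from `chooseEncoding` in the diff; `paneInfo` is a stand-in type, and `writeVarint` uses `encoding/binary` as a placeholder for `EncodeVarInt`, so the index bytes are not claimed to match Beam's varint coder.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// paneInfo is a stand-in for typex.PaneInfo.
type paneInfo struct {
	IsFirst, IsLast            bool
	Timing                     uint8 // 0 early, 1 on time, 2 late, 3 unknown
	Index, NonSpeculativeIndex int64
}

// writeVarint is a placeholder for coder.EncodeVarInt.
func writeVarint(v int64, w io.Writer) error {
	var buf [binary.MaxVarintLen64]byte
	n := binary.PutUvarint(buf[:], uint64(v))
	_, err := w.Write(buf[:n])
	return err
}

func encodePane(v paneInfo, w io.Writer) error {
	var flags byte
	if v.IsFirst {
		flags |= 0x01
	}
	if v.IsLast {
		flags |= 0x02
	}
	flags |= (v.Timing & 0x03) << 2

	switch {
	case v.Index == 0 || v.NonSpeculativeIndex == 0 || v.Timing == 3:
		// Header byte only; no indices follow.
		_, err := w.Write([]byte{flags})
		return err
	case v.Index == v.NonSpeculativeIndex || v.Timing == 0:
		// Header byte, then Index.
		if _, err := w.Write([]byte{flags | 1<<4}); err != nil {
			return err
		}
		return writeVarint(v.Index, w)
	default:
		// Header byte, then Index, then NonSpeculativeIndex.
		if _, err := w.Write([]byte{flags | 2<<4}); err != nil {
			return err
		}
		if err := writeVarint(v.Index, w); err != nil {
			return err
		}
		return writeVarint(v.NonSpeculativeIndex, w)
	}
}

// encodeToBytes is a convenience wrapper for trying the sketch.
func encodeToBytes(v paneInfo) ([]byte, error) {
	var buf bytes.Buffer
	err := encodePane(v, &buf)
	return buf.Bytes(), err
}

func main() {
	b, err := encodeToBytes(paneInfo{Timing: 2, Index: 3, NonSpeculativeIndex: 1})
	fmt.Printf("% x %v\n", b, err)
}
```

The sketch also returns write errors instead of discarding them, which the version in the diff does not do.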




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] riteshghorse commented on a change in pull request #15239: [BEAM-3304] Go triggering support

Posted by GitBox <gi...@apache.org>.
riteshghorse commented on a change in pull request #15239:
URL: https://github.com/apache/beam/pull/15239#discussion_r683489530



##########
File path: sdks/go/pkg/beam/core/typex/special.go
##########
@@ -64,6 +65,23 @@ type Window interface {
 	Equals(o Window) bool
 }
 
+type Timing string
+
+const (
+	EARLY   Timing = "EARLY"
+	ON_TIME Timing = "ON_TIME"
+	LATE    Timing = "LATE"
+	UNKNOWN Timing = "UNKNOWN"

Review comment:
       Got it.







[GitHub] [beam] riteshghorse commented on a change in pull request #15239: [BEAM-3304] Go triggering support

Posted by GitBox <gi...@apache.org>.
riteshghorse commented on a change in pull request #15239:
URL: https://github.com/apache/beam/pull/15239#discussion_r687900874



##########
File path: sdks/go/test/integration/primitives/windowinto.go
##########
@@ -91,3 +92,60 @@ func WindowSums_GBK(s beam.Scope) {
 func WindowSums_Lifted(s beam.Scope) {
 	WindowSums(s.Scope("Lifted"), stats.SumPerKey)
 }
+
+func validate(s beam.Scope, wfn *window.Fn, in beam.PCollection, tr window.Trigger, m window.AccumulationMode, expected ...interface{}) {
+	windowed := beam.WindowInto(s, wfn, in, beam.WindowTrigger{Name: tr}, beam.AccumulationMode{Mode: m})
+	sums := stats.Sum(s, windowed)
+	sums = beam.WindowInto(s, window.NewGlobalWindows(), sums)
+	passert.Equals(s, sums, expected...)
+}
+
+// TriggerDefault tests the default trigger which fires the pane after the end of the window
+func TriggerDefault(s beam.Scope) {
+	// create a teststream pipeline and get the pcollection
+	con := teststream.NewConfig()
+	con.AddElements(1000, 1.0, 2.0, 3.0)
+	con.AdvanceWatermark(11000)
+	con.AddElements(12000, 4.0, 5.0)
+	con.AdvanceWatermark(13000)
+
+	col := teststream.Create(s, con)
+	windowSize := 10 * time.Second
+	validate(s.Scope("Fixed"), window.NewFixedWindows(windowSize), col, window.Trigger{Kind: window.DefaultTrigger}, window.Accumulating, 6.0, 9.0)
+
+}
+
+// TriggerAlways tests the Always trigger, it is expected to receive every input value as the output.
+// It also return an extra empty pane. Not sure why it is so. It is only in the case of this trigger

Review comment:
       Got it fixed. It was because OnTimeBehavior was set to fire always. I've now changed it to fire if non-empty. Will push those changes soon.







[GitHub] [beam] lostluck commented on a change in pull request #15239: [BEAM-3304] Go triggering support

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #15239:
URL: https://github.com/apache/beam/pull/15239#discussion_r681872693



##########
File path: sdks/go/test/integration/primitives/windowinto.go
##########
@@ -91,3 +90,32 @@ func WindowSums_GBK(s beam.Scope) {
 func WindowSums_Lifted(s beam.Scope) {
 	WindowSums(s.Scope("Lifted"), stats.SumPerKey)
 }
+
+// TriggerWindowSums, much like WindowSums described above has an addition of configuring
+// a trigger here. SetDefault works fine. Other triggers such as SetAlways throws
+// pane decoding error.
+func TriggerWindowSums(s beam.Scope, sumPerKey func(beam.Scope, beam.PCollection) beam.PCollection) {

Review comment:
       Consider re-writing this test to make it easier to validate *triggers* rather than windows. Certainly re-use what you can, but you are allowed to write new code.

##########
File path: sdks/go/pkg/beam/core/graph/window/trigger.go
##########
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package window
+
+type TriggerType string

Review comment:
       Consider how we'd implement the more complicated triggers that have sub triggers. For example, AfterAll and AfterAny are missing their sub trigger configurations.
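
One possible shape for this, offered as a sketch and not as the design the PR settled on: a trigger value that carries its kind plus a list of sub triggers. The `Kind` field follows the later revision of the test code in this thread; `SubTriggers` and `checkTrigger` are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

type triggerKind string

const (
	defaultTrigger  triggerKind = "Trigger_Default_"
	alwaysTrigger   triggerKind = "Trigger_Always_"
	afterAnyTrigger triggerKind = "Trigger_AfterAny_"
	afterAllTrigger triggerKind = "Trigger_AfterAll_"
)

// trigger is a hypothetical composite: leaf kinds carry no sub triggers,
// composite kinds carry one or more.
type trigger struct {
	Kind        triggerKind
	SubTriggers []trigger
}

// checkTrigger validates the tree recursively.
func checkTrigger(t trigger) error {
	switch t.Kind {
	case afterAnyTrigger, afterAllTrigger:
		if len(t.SubTriggers) == 0 {
			return fmt.Errorf("%s needs at least one sub trigger", t.Kind)
		}
		for _, sub := range t.SubTriggers {
			if err := checkTrigger(sub); err != nil {
				return err
			}
		}
		return nil
	case defaultTrigger, alwaysTrigger:
		if len(t.SubTriggers) != 0 {
			return fmt.Errorf("%s does not take sub triggers", t.Kind)
		}
		return nil
	default:
		return errors.New("unknown trigger kind")
	}
}

func main() {
	composite := trigger{
		Kind:        afterAllTrigger,
		SubTriggers: []trigger{{Kind: alwaysTrigger}, {Kind: defaultTrigger}},
	}
	fmt.Println(checkTrigger(composite))
	fmt.Println(checkTrigger(trigger{Kind: afterAnyTrigger}))
}
```

A plain string type cannot hold this nesting, which is the gap the comment above points at.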

##########
File path: sdks/go/pkg/beam/pcollection.go
##########
@@ -49,7 +50,11 @@ func (p PCollection) IsValid() bool {
 }
 
 // TODO(herohde) 5/30/2017: add name for PCollections? Java supports it.
-// TODO(herohde) 5/30/2017: add windowing strategy and documentation.
+
+// WindowingStrategy returns the WindowingStrategy of PCollection.
+func (p PCollection) WindowingStrategy() *window.WindowingStrategy {
+	return p.n.WindowingStrategy()
+}

Review comment:
       We can probably leave out this method for now. It's easier to add something like this later than to remove it later.
   I'd still remove the TODO. We don't know what herohde's goal was with the TODO, and we've gone in our own design direction.

##########
File path: sdks/go/pkg/beam/core/graph/window/trigger.go
##########
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package window
+
+type TriggerType string
+
+const (
+	Default  TriggerType = "Trigger_Default_"
+	Always   TriggerType = "Trigger_Always_"
+	AfterAny TriggerType = "Trigger_AfterAny_"
+	AfterAll TriggerType = "Trigger_AfterAny_"

Review comment:
       Typo: duplicated as AfterAny instead of AfterAll.







[GitHub] [beam] lostluck commented on pull request #15239: [BEAM-3304] Go triggering support

Posted by GitBox <gi...@apache.org>.
lostluck commented on pull request #15239:
URL: https://github.com/apache/beam/pull/15239#issuecomment-900457826


   Run Go PostCommit





[GitHub] [beam] lostluck commented on a change in pull request #15239: [BEAM-3304] Go triggering support

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #15239:
URL: https://github.com/apache/beam/pull/15239#discussion_r680249151



##########
File path: sdks/go/pkg/beam/pcollection.go
##########
@@ -49,7 +50,11 @@ func (p PCollection) IsValid() bool {
 }
 
 // TODO(herohde) 5/30/2017: add name for PCollections? Java supports it.
+
 // TODO(herohde) 5/30/2017: add windowing strategy and documentation.

Review comment:
       Since this becomes part of the public API for PCollection, it should have a real doc comment:
   
   `// WindowingStrategy returns the windowing strategy for the PCollection`
   
   

##########
File path: sdks/go/test/integration/primitives/windowinto.go
##########
@@ -91,3 +91,35 @@ func WindowSums_GBK(s beam.Scope) {
 func WindowSums_Lifted(s beam.Scope) {
 	WindowSums(s.Scope("Lifted"), stats.SumPerKey)
 }
+
+// TriggerWindowSums, much like WindowSums described above has an addition of configuring
+// a trigger here. SetDefault works fine. Other triggers such as SetAlways throws
+// pane decoding error.
+func TriggerWindowSums(s beam.Scope, sumPerKey func(beam.Scope, beam.PCollection) beam.PCollection) {
+	timestampedData := beam.ParDo(s, &createTimestampedData{Data: []int{4, 9, 2, 3, 5, 7, 8, 1, 6}}, beam.Impulse(s))
+
+	windowSize := 3 * time.Second
+
+	validate := func(s beam.Scope, wfn *window.Fn, in beam.PCollection, expected ...interface{}) {
+		// Window the data.
+		windowed := beam.WindowInto(s, wfn, in)
+
+		// change below statement to: windowed.WindowingStrategy().SetAlways()
+		// to get the decoding error.
+		windowed.WindowingStrategy().SetDefault()

Review comment:
       OK. It pains me to suggest this, but this user experience leaves a little to be desired and we have a short window before the Go SDK exits experimental where we can make it better. 
   
   I propose we make a `beam.WindowIntoOption` interface type, and change `beam.WindowInto` to take a variadic parameter of those to configure the trigger and other windowing strategy properties. Then we can type assert and apply the options to the windowing strategy without much concern.
   
   This would be similar to how `beam.ParDo` has an option type https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/option.go#L25
   https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/pardo.go#L356
   
   My recommendation would be to make that a separate change by itself, not defining any of the concrete `beam.WindowIntoOption` types or their handling, and just changing the signature to take `options ...beam.WindowIntoOption` as the final parameter (a variadic parameter must be the last one in a function signature).
   
   While this change would not be 100% backwards compatible, it would be source compatible for all reasonable uses of `beam.WindowInto`. The main place it would break is if a user were doing something like:
   
   `var myWindowInto func(s Scope, ws *window.Fn, col PCollection) PCollection = beam.WindowInto`
   
   which is unlikely and weird to begin with (it wouldn't serve anything beyond ensuring the signature never changes).
   
   Then we can rebase and get back to here and finish handling the implementation.
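
The compatibility argument above can be checked with a small sketch. Every type here (`scope`, `pcollection`, `windowFn`, `intoOption`) is a hypothetical stand-in for the corresponding Beam type; only the shape of the signatures matters.

```go
package main

import "fmt"

type scope struct{}

type pcollection struct{ label string }

type windowFn struct{ name string }

type intoOption interface{ windowIntoOption() }

// windowInto has the proposed signature: the old three parameters plus a
// trailing variadic options parameter.
func windowInto(s scope, wfn *windowFn, col pcollection, opts ...intoOption) pcollection {
	return pcollection{label: fmt.Sprintf("%s/%s/%d opts", col.label, wfn.name, len(opts))}
}

func main() {
	// An existing three-argument call still compiles unchanged.
	out := windowInto(scope{}, &windowFn{name: "fixed"}, pcollection{label: "in"})
	fmt.Println(out.label)

	// The one incompatible usage: assigning to the old function type no
	// longer type-checks, because the variadic parameter is part of the type.
	//
	//   var old func(scope, *windowFn, pcollection) pcollection = windowInto // compile error
	//
	// The assignment has to name the new type instead.
	var current func(scope, *windowFn, pcollection, ...intoOption) pcollection = windowInto
	fmt.Println(current(scope{}, &windowFn{name: "global"}, pcollection{label: "in"}).label)
}
```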

##########
File path: sdks/go/pkg/beam/core/runtime/graphx/translate.go
##########
@@ -981,24 +981,46 @@ func marshalWindowingStrategy(c *CoderMarshaller, w *window.WindowingStrategy) (
 	} else {
 		mergeStat = pipepb.MergeStatus_NON_MERGING
 	}
+	trigger, err := makeTrigger(w.Trigger)
+	if err != nil {
+		return nil, err
+	}
+
 	ws := &pipepb.WindowingStrategy{
 		WindowFn:         windowFn,
 		MergeStatus:      mergeStat,
 		AccumulationMode: pipepb.AccumulationMode_DISCARDING,
 		WindowCoderId:    windowCoderId,
-		Trigger: &pipepb.Trigger{
+		Trigger:          trigger,
+		OutputTime:       pipepb.OutputTime_END_OF_WINDOW,
+		ClosingBehavior:  pipepb.ClosingBehavior_EMIT_IF_NONEMPTY,
+		AllowedLateness:  0,
+		OnTimeBehavior:   pipepb.OnTimeBehavior_FIRE_ALWAYS,
+	}
+	return ws, nil
+}
+func makeTrigger(t window.TriggerType) (*pipepb.Trigger, error) {
+	switch t {
+	case window.Default:
+		return &pipepb.Trigger{
 			Trigger: &pipepb.Trigger_Default_{
 				Default: &pipepb.Trigger_Default{},
 			},
-		},
-		OutputTime:      pipepb.OutputTime_END_OF_WINDOW,
-		ClosingBehavior: pipepb.ClosingBehavior_EMIT_IF_NONEMPTY,
-		AllowedLateness: 0,
-		OnTimeBehavior:  pipepb.OnTimeBehavior_FIRE_ALWAYS,
+		}, nil
+	case window.Always:
+		return &pipepb.Trigger{
+			Trigger: &pipepb.Trigger_Always_{
+				Always: &pipepb.Trigger_Always{},
+			},
+		}, nil
+	default:
+		return &pipepb.Trigger{
+			Trigger: &pipepb.Trigger_Default_{
+				Default: &pipepb.Trigger_Default{},
+			},
+		}, nil
 	}
-	return ws, nil
 }
-

Review comment:
       Missing blank line.

##########
File path: sdks/go/pkg/beam/core/runtime/graphx/translate.go
##########
@@ -981,24 +981,46 @@ func marshalWindowingStrategy(c *CoderMarshaller, w *window.WindowingStrategy) (
 	} else {
 		mergeStat = pipepb.MergeStatus_NON_MERGING
 	}
+	trigger, err := makeTrigger(w.Trigger)
+	if err != nil {
+		return nil, err
+	}
+
 	ws := &pipepb.WindowingStrategy{
 		WindowFn:         windowFn,
 		MergeStatus:      mergeStat,
 		AccumulationMode: pipepb.AccumulationMode_DISCARDING,
 		WindowCoderId:    windowCoderId,
-		Trigger: &pipepb.Trigger{
+		Trigger:          trigger,
+		OutputTime:       pipepb.OutputTime_END_OF_WINDOW,
+		ClosingBehavior:  pipepb.ClosingBehavior_EMIT_IF_NONEMPTY,
+		AllowedLateness:  0,
+		OnTimeBehavior:   pipepb.OnTimeBehavior_FIRE_ALWAYS,
+	}
+	return ws, nil
+}
+func makeTrigger(t window.TriggerType) (*pipepb.Trigger, error) {

Review comment:
       ```suggestion
   }
   
   func makeTrigger(t window.TriggerType) (*pipepb.Trigger, error) {
   ```

##########
File path: sdks/go/pkg/beam/core/runtime/graphx/translate.go
##########
@@ -981,24 +981,46 @@ func marshalWindowingStrategy(c *CoderMarshaller, w *window.WindowingStrategy) (
 	} else {
 		mergeStat = pipepb.MergeStatus_NON_MERGING
 	}
+	trigger, err := makeTrigger(w.Trigger)
+	if err != nil {
+		return nil, err
+	}
+
 	ws := &pipepb.WindowingStrategy{
 		WindowFn:         windowFn,
 		MergeStatus:      mergeStat,
 		AccumulationMode: pipepb.AccumulationMode_DISCARDING,
 		WindowCoderId:    windowCoderId,
-		Trigger: &pipepb.Trigger{
+		Trigger:          trigger,
+		OutputTime:       pipepb.OutputTime_END_OF_WINDOW,
+		ClosingBehavior:  pipepb.ClosingBehavior_EMIT_IF_NONEMPTY,
+		AllowedLateness:  0,
+		OnTimeBehavior:   pipepb.OnTimeBehavior_FIRE_ALWAYS,
+	}
+	return ws, nil
+}
+func makeTrigger(t window.TriggerType) (*pipepb.Trigger, error) {

Review comment:
       Probably need to run `go fmt` over your changes, as for some reason newlines between function declarations seem to be going missing.







[GitHub] [beam] lostluck merged pull request #15239: [BEAM-3304] Go triggering support

Posted by GitBox <gi...@apache.org>.
lostluck merged pull request #15239:
URL: https://github.com/apache/beam/pull/15239


   





[GitHub] [beam] lostluck commented on pull request #15239: [BEAM-3304] Go triggering support

Posted by GitBox <gi...@apache.org>.
lostluck commented on pull request #15239:
URL: https://github.com/apache/beam/pull/15239#issuecomment-900457170


   Run Go Flink ValidatesRunner





[GitHub] [beam] lostluck commented on pull request #15239: [BEAM-3304] Go triggering support

Posted by GitBox <gi...@apache.org>.
lostluck commented on pull request #15239:
URL: https://github.com/apache/beam/pull/15239#issuecomment-900503568


   It's not clear what's causing the failure with Flink in this case. It's almost certainly the same TestStream issues. Let's disable the tests on Flink entirely for now (like they are for the other runners) and we'll investigate it properly from there.





[GitHub] [beam] riteshghorse commented on pull request #15239: [BEAM-3304] Go triggering support

Posted by GitBox <gi...@apache.org>.
riteshghorse commented on pull request #15239:
URL: https://github.com/apache/beam/pull/15239#issuecomment-900455108


   Run Go PostCommit
   





[GitHub] [beam] riteshghorse edited a comment on pull request #15239: [BEAM-3304] Go triggering support

Posted by GitBox <gi...@apache.org>.
riteshghorse edited a comment on pull request #15239:
URL: https://github.com/apache/beam/pull/15239#issuecomment-889509698


   I changed parts of marshalWindowingStrategy in translate.go to try out triggers, but undid all of it before committing. I'll write a separate test that does this.





[GitHub] [beam] riteshghorse commented on pull request #15239: [BEAM-3304] Go triggering support

Posted by GitBox <gi...@apache.org>.
riteshghorse commented on pull request #15239:
URL: https://github.com/apache/beam/pull/15239#issuecomment-901466341


   Run Go PostCommit





[GitHub] [beam] lostluck commented on a change in pull request #15239: [BEAM-3304] Go triggering support

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #15239:
URL: https://github.com/apache/beam/pull/15239#discussion_r681123176



##########
File path: sdks/go/pkg/beam/core/graph/edge.go
##########
@@ -527,8 +527,8 @@ func NewImpulse(g *Graph, s *Scope, value []byte) *MultiEdge {
 }
 
 // NewWindowInto inserts a new WindowInto edge into the graph.
-func NewWindowInto(g *Graph, s *Scope, wfn *window.Fn, in *Node) *MultiEdge {
-	n := g.NewNode(in.Type(), &window.WindowingStrategy{Fn: wfn}, in.Bounded())
+func NewWindowInto(g *Graph, s *Scope, wfn *window.Fn, tr window.TriggerType, in *Node) *MultiEdge {

Review comment:
       NewWindowInto should just change to take in a `*window.WindowingStrategy` instead of adding more and more parameters to it. There are other modes to handle in the future, and populating the windowing strategy in the Beam package before it gets here will save time and effort when adding features later.
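
       A minimal sketch of the suggestion above, assuming simplified stand-in types: `Fn`, `Trigger`, `WindowingStrategy`, `Node` and `newWindowInto` here are hypothetical and are not the SDK's real definitions. The point is only that a constructor taking the whole strategy keeps its signature when new fields are added.

```go
package main

import "fmt"

// Fn, Trigger and WindowingStrategy are simplified, hypothetical stand-ins
// for the SDK's window package types.
type Fn struct{ Kind string }

type Trigger struct{ Kind string }

type WindowingStrategy struct {
	Fn      *Fn
	Trigger Trigger
}

// Node stands in for a graph node that records its windowing strategy.
type Node struct{ Strategy *WindowingStrategy }

// newWindowInto takes the whole strategy, so adding a field to
// WindowingStrategy later does not change this signature.
func newWindowInto(ws *WindowingStrategy) *Node {
	return &Node{Strategy: ws}
}

func main() {
	// The caller populates the strategy before it reaches the graph layer.
	ws := &WindowingStrategy{Fn: &Fn{Kind: "FIX"}, Trigger: Trigger{Kind: "Default"}}
	n := newWindowInto(ws)
	fmt.Println(n.Strategy.Fn.Kind, n.Strategy.Trigger.Kind)
}
```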

##########
File path: sdks/go/pkg/beam/windowing.go
##########
@@ -21,21 +21,47 @@ import (
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
+type WindowIntoOption interface {
+	private()

Review comment:
       "private" isn't a good name for this blocking function, because beam.Option is already using it. You can test this out yourself by seeing that you can pass `WindowIntoOption`s as an Option into beam.ParDo.
   
   A better unexported method name would be `windowIntoOption()` to avoid this conflation.
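
   The conflation described above can be reproduced in isolation. This is a sketch with hypothetical stand-in types (`Option`, `WindowIntoOption`, `WindowTrigger`, `FixedWindowIntoOption`, `FixedTrigger`), not the actual beam package: two interfaces in the same package that share one unexported method name have the same method set, so a type written for one also satisfies the other.

```go
package main

import "fmt"

// Option and WindowIntoOption are hypothetical stand-ins for two option
// interfaces declared in the same package with the same blocking method.
type Option interface{ private() }

type WindowIntoOption interface{ private() }

// WindowTrigger is meant to be a WindowIntoOption only.
type WindowTrigger struct{}

func (WindowTrigger) private() {}

// FixedWindowIntoOption uses a distinct unexported method name instead.
type FixedWindowIntoOption interface{ windowIntoOption() }

// FixedTrigger implements only the distinctly named method.
type FixedTrigger struct{}

func (FixedTrigger) windowIntoOption() {}

// isOption reports whether v satisfies Option.
func isOption(v interface{}) bool {
	_, ok := v.(Option)
	return ok
}

func main() {
	fmt.Println(isOption(WindowTrigger{})) // true: the method sets are identical
	fmt.Println(isOption(FixedTrigger{}))  // false: the method names differ
}
```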







[GitHub] [beam] riteshghorse commented on a change in pull request #15239: [BEAM-3304] Go triggering support

Posted by GitBox <gi...@apache.org>.
riteshghorse commented on a change in pull request #15239:
URL: https://github.com/apache/beam/pull/15239#discussion_r681128710



##########
File path: sdks/go/pkg/beam/core/graph/edge.go
##########
@@ -527,8 +527,8 @@ func NewImpulse(g *Graph, s *Scope, value []byte) *MultiEdge {
 }
 
 // NewWindowInto inserts a new WindowInto edge into the graph.
-func NewWindowInto(g *Graph, s *Scope, wfn *window.Fn, in *Node) *MultiEdge {
-	n := g.NewNode(in.Type(), &window.WindowingStrategy{Fn: wfn}, in.Bounded())
+func NewWindowInto(g *Graph, s *Scope, wfn *window.Fn, tr window.TriggerType, in *Node) *MultiEdge {

Review comment:
       Makes sense.







[GitHub] [beam] riteshghorse commented on pull request #15239: [BEAM-3304] Go triggering support

Posted by GitBox <gi...@apache.org>.
riteshghorse commented on pull request #15239:
URL: https://github.com/apache/beam/pull/15239#issuecomment-892993803


   PTAL.
   
   Added pane helpers and changed all affected function signatures/receivers. The `source decode failed  cause by ____` error seems to be gone. I tested this the old way (TestTriggerWindowSums_GBK). The next step would be writing isolated trigger tests.





[GitHub] [beam] riteshghorse commented on a change in pull request #15239: [BEAM-3304] Go triggering support

Posted by GitBox <gi...@apache.org>.
riteshghorse commented on a change in pull request #15239:
URL: https://github.com/apache/beam/pull/15239#discussion_r683505310



##########
File path: sdks/go/pkg/beam/core/runtime/exec/coder.go
##########
@@ -1088,25 +1121,22 @@ func EncodeWindowedValueHeader(enc WindowEncoder, ws []typex.Window, t typex.Eve
 	if err := enc.Encode(ws, w); err != nil {
 		return err
 	}
-	_, err := w.Write(paneNoFiring)
+	err := coder.EncodePane(p, w)
 	return err
 }
 
 // DecodeWindowedValueHeader deserializes a windowed value header.
-func DecodeWindowedValueHeader(dec WindowDecoder, r io.Reader) ([]typex.Window, typex.EventTime, error) {
+func DecodeWindowedValueHeader(dec WindowDecoder, r io.Reader) ([]typex.Window, typex.EventTime, typex.PaneInfo, error) {
 	// Encoding: Timestamp, Window, Pane (header) + Element
-
+	pn := typex.PaneInfo{}

Review comment:
       This is great!







[GitHub] [beam] riteshghorse commented on pull request #15239: [BEAM-3304] Go triggering support

Posted by GitBox <gi...@apache.org>.
riteshghorse commented on pull request #15239:
URL: https://github.com/apache/beam/pull/15239#issuecomment-895514779


   A lot of cleanup remains in translate.go and possibly in strategy.go and trigger.go.
   Added windowing strategy params, trigger params, and tests for the working triggers.
   





[GitHub] [beam] riteshghorse commented on a change in pull request #15239: [BEAM-3304] Go triggering support

Posted by GitBox <gi...@apache.org>.
riteshghorse commented on a change in pull request #15239:
URL: https://github.com/apache/beam/pull/15239#discussion_r691641045



##########
File path: sdks/go/test/integration/primitives/windowinto.go
##########
@@ -19,12 +19,12 @@ import (
 	"reflect"
 	"time"
 
-	"github.com/apache/beam/sdks/go/pkg/beam"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/mtime"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/window"

Review comment:
       Done.







[GitHub] [beam] riteshghorse commented on pull request #15239: [BEAM-3304] Go triggering support

Posted by GitBox <gi...@apache.org>.
riteshghorse commented on pull request #15239:
URL: https://github.com/apache/beam/pull/15239#issuecomment-901466123


   Run Go Flink ValidatesRunner





[GitHub] [beam] riteshghorse edited a comment on pull request #15239: [BEAM-3304] Go triggering support

Posted by GitBox <gi...@apache.org>.
riteshghorse edited a comment on pull request #15239:
URL: https://github.com/apache/beam/pull/15239#issuecomment-895514779


   A lot of cleanup remains in translate.go and possibly in strategy.go, windowinto.go and trigger.go.
   Added windowing strategy params, trigger params, and tests for the working triggers.
   





[GitHub] [beam] lostluck commented on a change in pull request #15239: [BEAM-3304] Go triggering support

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #15239:
URL: https://github.com/apache/beam/pull/15239#discussion_r679496274



##########
File path: sdks/go/pkg/beam/core/graph/window/strategy.go
##########
@@ -16,12 +16,16 @@
 // Package window contains window representation, windowing strategies and utilities.
 package window
 
+import (
+	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+)
+
 // WindowingStrategy defines the types of windowing used in a pipeline and contains
 // the data to support executing a windowing strategy.
 type WindowingStrategy struct {
 	Fn *Fn
-
 	// TODO(BEAM-3304): trigger support
+	Trigger *pipepb.Trigger

Review comment:
       As a rule, we avoid having users have access to, or deal with, the proto directly, including via any of the built up structs. The protos are an implementation detail, not an interface for users to use.
   
   This largely means building up SDK specific mechanisms that are then translated to the proto in graphx/translate.go. Essentially, the graph/** directories shouldn't be depending on the protos.
   
   That said, this is fine during development as Pane plumbing and such is being developed.
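
       One way to picture the layering described above, as a sketch only: `TriggerKind`, `protoTrigger` and `translateTrigger` are hypothetical names, and `protoTrigger` is a stand-in struct, not the generated `pipepb.Trigger`. The SDK-facing type carries no proto dependency, and a single translation function (the role graphx/translate.go plays) maps it to the wire representation.

```go
package main

import (
	"errors"
	"fmt"
)

// TriggerKind is a hypothetical SDK-side description of a trigger.
// Nothing in this type refers to a proto message.
type TriggerKind string

const (
	DefaultTrigger TriggerKind = "Default"
	AlwaysTrigger  TriggerKind = "Always"
)

// protoTrigger is a stand-in for the generated proto message.
type protoTrigger struct{ Name string }

// translateTrigger is the only place that knows about the proto shape.
func translateTrigger(k TriggerKind) (*protoTrigger, error) {
	switch k {
	case DefaultTrigger:
		return &protoTrigger{Name: "default"}, nil
	case AlwaysTrigger:
		return &protoTrigger{Name: "always"}, nil
	default:
		return nil, errors.New("unsupported trigger kind: " + string(k))
	}
}

func main() {
	tr, err := translateTrigger(AlwaysTrigger)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(tr.Name)
}
```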







[GitHub] [beam] riteshghorse commented on a change in pull request #15239: [BEAM-3304] Go triggering support

Posted by GitBox <gi...@apache.org>.
riteshghorse commented on a change in pull request #15239:
URL: https://github.com/apache/beam/pull/15239#discussion_r683482219



##########
File path: sdks/go/pkg/beam/core/typex/special.go
##########
@@ -64,6 +65,23 @@ type Window interface {
 	Equals(o Window) bool
 }
 
+type Timing string

Review comment:
       Thanks for this! This is really helpful.







[GitHub] [beam] lostluck commented on a change in pull request #15239: [BEAM-3304] Go triggering support

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #15239:
URL: https://github.com/apache/beam/pull/15239#discussion_r691542639



##########
File path: sdks/go/pkg/beam/runners/dataflow/dataflowlib/translate.go
##########
@@ -22,18 +22,18 @@ import (
 	"net/url"
 	"path"
 
-	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/mtime"

Review comment:
       Looks like these have been missed.

##########
File path: sdks/go/test/integration/primitives/windowinto.go
##########
@@ -19,12 +19,12 @@ import (
 	"reflect"
 	"time"
 
-	"github.com/apache/beam/sdks/go/pkg/beam"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/mtime"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/window"

Review comment:
       Looks like these have been missed.







[GitHub] [beam] riteshghorse commented on pull request #15239: [BEAM-3304] Go triggering support

Posted by GitBox <gi...@apache.org>.
riteshghorse commented on pull request #15239:
URL: https://github.com/apache/beam/pull/15239#issuecomment-897196196


   PTAL. Basic triggers. Will add more tests in windowinto as they are similar to what we currently have.





[GitHub] [beam] riteshghorse commented on pull request #15239: [BEAM-3304] Go triggering support

Posted by GitBox <gi...@apache.org>.
riteshghorse commented on pull request #15239:
URL: https://github.com/apache/beam/pull/15239#issuecomment-899712534


   PTAL. AfterAll and AfterAny Triggers are throwing Illegal Argument Exception. Working on it.





[GitHub] [beam] riteshghorse commented on a change in pull request #15239: [BEAM-3304] Go triggering support

Posted by GitBox <gi...@apache.org>.
riteshghorse commented on a change in pull request #15239:
URL: https://github.com/apache/beam/pull/15239#discussion_r687919886



##########
File path: sdks/go/test/integration/primitives/windowinto.go
##########
@@ -91,3 +92,60 @@ func WindowSums_GBK(s beam.Scope) {
 func WindowSums_Lifted(s beam.Scope) {
 	WindowSums(s.Scope("Lifted"), stats.SumPerKey)
 }
+
+func validate(s beam.Scope, wfn *window.Fn, in beam.PCollection, tr window.Trigger, m window.AccumulationMode, expected ...interface{}) {
+	windowed := beam.WindowInto(s, wfn, in, beam.WindowTrigger{Name: tr}, beam.AccumulationMode{Mode: m})
+	sums := stats.Sum(s, windowed)
+	sums = beam.WindowInto(s, window.NewGlobalWindows(), sums)
+	passert.Equals(s, sums, expected...)
+}
+
+// TriggerDefault tests the default trigger which fires the pane after the end of the window
+func TriggerDefault(s beam.Scope) {
+	// create a teststream pipeline and get the pcollection
+	con := teststream.NewConfig()
+	con.AddElements(1000, 1.0, 2.0, 3.0)
+	con.AdvanceWatermark(11000)
+	con.AddElements(12000, 4.0, 5.0)
+	con.AdvanceWatermark(13000)
+
+	col := teststream.Create(s, con)
+	windowSize := 10 * time.Second
+	validate(s.Scope("Fixed"), window.NewFixedWindows(windowSize), col, window.Trigger{Kind: window.DefaultTrigger}, window.Accumulating, 6.0, 9.0)
+
+}
+
+// TriggerAlways tests the Always trigger, it is expected to receive every input value as the output.
+// It also return an extra empty pane. Not sure why it is so. It is only in the case of this trigger
+func TriggerAlways(s beam.Scope) {
+	con := teststream.NewConfig()
+	con.AddElements(1000, 1.0, 2.0, 3.0)
+	con.AdvanceWatermark(11000)
+	col := teststream.Create(s, con)
+	windowSize := 10 * time.Second
+
+	validate(s.Scope("Fixed"), window.NewFixedWindows(windowSize), col, window.Trigger{Kind: window.AlwaysTrigger}, window.Discarding, 1.0, 2.0, 3.0, 0.0)
+}
+
+// TriggerElementCount tests the ElementCount Trigger, it waits for atleast N elements to be ready
+// to fire an output pane
+func TriggerElementCount(s beam.Scope) {
+	// create a teststream pipeline and get the pcollection
+	con := teststream.NewConfig()
+	con.AddElements(1000, 1.0, 2.0, 3.0)
+	con.AdvanceWatermark(2000)
+	con.AddElements(6000, 4.0, 5.0)
+	con.AdvanceWatermark(10000)
+	con.AddElements(52000, 10.0)
+	con.AdvanceWatermark(53000)
+
+	col := teststream.Create(s, con)
+
+	// waits only for two elements to arrive and fires output after that and never fires that.
+	// For the trigger to fire every 2 elements, combine it with Repeat Trigger
+	tr := window.Trigger{Kind: window.ElementCountTrigger, ElementCount: 2}
+	windowed := beam.WindowInto(s, window.NewGlobalWindows(), col, beam.WindowTrigger{Name: tr}, beam.AccumulationMode{Mode: window.Discarding})
+	sums := stats.Sum(s, windowed)
+	sums = beam.WindowInto(s, window.NewGlobalWindows(), sums)
+	passert.Count(s, sums, "total collections", 1)

Review comment:
       yes, you're right










[GitHub] [beam] riteshghorse commented on a change in pull request #15239: [BEAM-3304] Go triggering support

Posted by GitBox <gi...@apache.org>.
riteshghorse commented on a change in pull request #15239:
URL: https://github.com/apache/beam/pull/15239#discussion_r681128710



##########
File path: sdks/go/pkg/beam/core/graph/edge.go
##########
@@ -527,8 +527,8 @@ func NewImpulse(g *Graph, s *Scope, value []byte) *MultiEdge {
 }
 
 // NewWindowInto inserts a new WindowInto edge into the graph.
-func NewWindowInto(g *Graph, s *Scope, wfn *window.Fn, in *Node) *MultiEdge {
-	n := g.NewNode(in.Type(), &window.WindowingStrategy{Fn: wfn}, in.Bounded())
+func NewWindowInto(g *Graph, s *Scope, wfn *window.Fn, tr window.TriggerType, in *Node) *MultiEdge {

Review comment:
       Makes sense.







[GitHub] [beam] lostluck commented on a change in pull request #15239: [BEAM-3304] Go triggering support

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #15239:
URL: https://github.com/apache/beam/pull/15239#discussion_r687272096



##########
File path: sdks/go/pkg/beam/core/graph/coder/coder.go
##########
@@ -188,6 +188,8 @@ const (
 	//
 	// TODO(BEAM-490): once this JIRA is done, this coder should become the new thing.
 	CoGBK Kind = "CoGBK"
+
+	PaneInfo Kind = "PaneInfo"

Review comment:
       Nit: I'd put this just below Timer in the above group, rather than isolated by itself down here.  Also feel free to have the short version be "PI" rather than the full "PaneInfo" to match the other "parts of encoded window infrastructure" convention we have.

##########
File path: sdks/go/pkg/beam/core/graph/coder/panes.go
##########
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package coder
+
+import (
+	"io"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/ioutilx"
+)
+
+// EncodePane encodes a typex.PaneInfo.
+func EncodePane(v typex.PaneInfo, w io.Writer) error {
+	// Encoding: typex.PaneInfo
+
+	pane := 0
+	if v.IsFirst {
+		pane |= 1
+	}
+	if v.IsLast {
+		pane |= 2
+	}
+	pane |= v.Timing << 2
+
+	switch {
+	case (v.Index == 0 || v.NonSpeculativeIndex == 0) || v.Timing == typex.PaneUnknown:

Review comment:
       Nit: I don't think it's necessary to pair the first two conditions with parentheses, since it's all a disjunction (||) anyway.

##########
File path: sdks/go/test/integration/primitives/windowinto.go
##########
@@ -91,3 +92,60 @@ func WindowSums_GBK(s beam.Scope) {
 func WindowSums_Lifted(s beam.Scope) {
 	WindowSums(s.Scope("Lifted"), stats.SumPerKey)
 }
+
+func validate(s beam.Scope, wfn *window.Fn, in beam.PCollection, tr window.Trigger, m window.AccumulationMode, expected ...interface{}) {
+	windowed := beam.WindowInto(s, wfn, in, beam.WindowTrigger{Name: tr}, beam.AccumulationMode{Mode: m})
+	sums := stats.Sum(s, windowed)
+	sums = beam.WindowInto(s, window.NewGlobalWindows(), sums)
+	passert.Equals(s, sums, expected...)
+}
+
+// TriggerDefault tests the default trigger which fires the pane after the end of the window
+func TriggerDefault(s beam.Scope) {
+	// create a teststream pipeline and get the pcollection
+	con := teststream.NewConfig()
+	con.AddElements(1000, 1.0, 2.0, 3.0)
+	con.AdvanceWatermark(11000)
+	con.AddElements(12000, 4.0, 5.0)
+	con.AdvanceWatermark(13000)
+
+	col := teststream.Create(s, con)
+	windowSize := 10 * time.Second
+	validate(s.Scope("Fixed"), window.NewFixedWindows(windowSize), col, window.Trigger{Kind: window.DefaultTrigger}, window.Accumulating, 6.0, 9.0)
+
+}
+
+// TriggerAlways tests the Always trigger, it is expected to receive every input value as the output.
+// It also return an extra empty pane. Not sure why it is so. It is only in the case of this trigger
+func TriggerAlways(s beam.Scope) {
+	con := teststream.NewConfig()
+	con.AddElements(1000, 1.0, 2.0, 3.0)
+	con.AdvanceWatermark(11000)
+	col := teststream.Create(s, con)
+	windowSize := 10 * time.Second
+
+	validate(s.Scope("Fixed"), window.NewFixedWindows(windowSize), col, window.Trigger{Kind: window.AlwaysTrigger}, window.Discarding, 1.0, 2.0, 3.0, 0.0)
+}
+
+// TriggerElementCount tests the ElementCount Trigger, it waits for atleast N elements to be ready
+// to fire an output pane
+func TriggerElementCount(s beam.Scope) {
+	// create a teststream pipeline and get the pcollection
+	con := teststream.NewConfig()
+	con.AddElements(1000, 1.0, 2.0, 3.0)
+	con.AdvanceWatermark(2000)
+	con.AddElements(6000, 4.0, 5.0)
+	con.AdvanceWatermark(10000)
+	con.AddElements(52000, 10.0)
+	con.AdvanceWatermark(53000)
+
+	col := teststream.Create(s, con)
+
+	// waits only for two elements to arrive and fires output after that and never fires that.
+	// For the trigger to fire every 2 elements, combine it with Repeat Trigger
+	tr := window.Trigger{Kind: window.ElementCountTrigger, ElementCount: 2}
+	windowed := beam.WindowInto(s, window.NewGlobalWindows(), col, beam.WindowTrigger{Name: tr}, beam.AccumulationMode{Mode: window.Discarding})
+	sums := stats.Sum(s, windowed)
+	sums = beam.WindowInto(s, window.NewGlobalWindows(), sums)
+	passert.Count(s, sums, "total collections", 1)

Review comment:
       I'm guessing we can't assert which two values we get, so that's why we only check the count?

##########
File path: sdks/go/test/integration/primitives/windowinto.go
##########
@@ -91,3 +92,60 @@ func WindowSums_GBK(s beam.Scope) {
 func WindowSums_Lifted(s beam.Scope) {
 	WindowSums(s.Scope("Lifted"), stats.SumPerKey)
 }
+
+func validate(s beam.Scope, wfn *window.Fn, in beam.PCollection, tr window.Trigger, m window.AccumulationMode, expected ...interface{}) {
+	windowed := beam.WindowInto(s, wfn, in, beam.WindowTrigger{Name: tr}, beam.AccumulationMode{Mode: m})
+	sums := stats.Sum(s, windowed)
+	sums = beam.WindowInto(s, window.NewGlobalWindows(), sums)
+	passert.Equals(s, sums, expected...)
+}
+
+// TriggerDefault tests the default trigger which fires the pane after the end of the window
+func TriggerDefault(s beam.Scope) {
+	// create a teststream pipeline and get the pcollection
+	con := teststream.NewConfig()
+	con.AddElements(1000, 1.0, 2.0, 3.0)
+	con.AdvanceWatermark(11000)
+	con.AddElements(12000, 4.0, 5.0)
+	con.AdvanceWatermark(13000)
+
+	col := teststream.Create(s, con)
+	windowSize := 10 * time.Second
+	validate(s.Scope("Fixed"), window.NewFixedWindows(windowSize), col, window.Trigger{Kind: window.DefaultTrigger}, window.Accumulating, 6.0, 9.0)
+
+}
+
+// TriggerAlways tests the Always trigger, it is expected to receive every input value as the output.
+// It also return an extra empty pane. Not sure why it is so. It is only in the case of this trigger

Review comment:
       Consider asking about this extra empty pane. Is it documented? Is it something wrong with how we're specifying things?

##########
File path: sdks/go/pkg/beam/core/graph/window/strategy.go
##########
@@ -16,12 +16,22 @@
 // Package window contains window representation, windowing strategies and utilities.
 package window
 
+type AccumulationMode string
+
+const (
+	Unspecified  AccumulationMode = "AccumulationMode_UNSPECIFIED"
+	Discarding   AccumulationMode = "AccumulationMode_DISCARDING"
+	Accumulating AccumulationMode = "AccumulationMode_ACCUMULATING"
+	Retracting   AccumulationMode = "AccumulationMode_RETRACTING"
+)
+
 // WindowingStrategy defines the types of windowing used in a pipeline and contains
 // the data to support executing a windowing strategy.
 type WindowingStrategy struct {
-	Fn *Fn
-
-	// TODO(BEAM-3304): trigger support
+	Fn               *Fn
+	Trigger          Trigger
+	AccumulationMode AccumulationMode
+	AllowedLateness  int

Review comment:
       Consider adding a comment here that it's expected to be in milliseconds, or use a time.Duration instead and convert to milliseconds later. Explicit types are friends!
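
       A small sketch of the time.Duration option, assuming a simplified `WindowingStrategy` with only this field and a hypothetical helper `allowedLatenessMillis`. The unit lives in the type, and the conversion to milliseconds happens once, at translation time.

```go
package main

import (
	"fmt"
	"time"
)

// WindowingStrategy is a simplified stand-in holding only the field under
// discussion. The unit is carried by the type rather than by a comment.
type WindowingStrategy struct {
	AllowedLateness time.Duration
}

// allowedLatenessMillis converts to milliseconds at the point where the
// value would be written into the wire representation.
func allowedLatenessMillis(ws WindowingStrategy) int64 {
	return ws.AllowedLateness.Milliseconds()
}

func main() {
	ws := WindowingStrategy{AllowedLateness: 10 * time.Second}
	fmt.Println(allowedLatenessMillis(ws)) // 10000
}
```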

##########
File path: sdks/go/pkg/beam/core/runtime/graphx/translate.go
##########
@@ -981,22 +981,114 @@ func marshalWindowingStrategy(c *CoderMarshaller, w *window.WindowingStrategy) (
 	} else {
 		mergeStat = pipepb.MergeStatus_NON_MERGING
 	}
+	trigger := makeTrigger(w.Trigger)
+	accMode := makeAccumulationMode(w.AccumulationMode)
 	ws := &pipepb.WindowingStrategy{
 		WindowFn:         windowFn,
 		MergeStatus:      mergeStat,
-		AccumulationMode: pipepb.AccumulationMode_DISCARDING,
 		WindowCoderId:    windowCoderId,
-		Trigger: &pipepb.Trigger{
+		Trigger:          trigger,
+		AccumulationMode: accMode,
+		OutputTime:       pipepb.OutputTime_EARLIEST_IN_PANE,

Review comment:
       This changed from OutputTime_END_OF_WINDOW to this. We probably don't want that right now.
   
   

##########
File path: sdks/go/test/integration/primitives/windowinto.go
##########
@@ -91,3 +92,60 @@ func WindowSums_GBK(s beam.Scope) {
 func WindowSums_Lifted(s beam.Scope) {
 	WindowSums(s.Scope("Lifted"), stats.SumPerKey)
 }
+
+func validate(s beam.Scope, wfn *window.Fn, in beam.PCollection, tr window.Trigger, m window.AccumulationMode, expected ...interface{}) {
+	windowed := beam.WindowInto(s, wfn, in, beam.WindowTrigger{Name: tr}, beam.AccumulationMode{Mode: m})
+	sums := stats.Sum(s, windowed)
+	sums = beam.WindowInto(s, window.NewGlobalWindows(), sums)
+	passert.Equals(s, sums, expected...)
+}
+
+// TriggerDefault tests the default trigger which fires the pane after the end of the window
+func TriggerDefault(s beam.Scope) {
+	// create a teststream pipeline and get the pcollection
+	con := teststream.NewConfig()
+	con.AddElements(1000, 1.0, 2.0, 3.0)
+	con.AdvanceWatermark(11000)
+	con.AddElements(12000, 4.0, 5.0)
+	con.AdvanceWatermark(13000)
+
+	col := teststream.Create(s, con)
+	windowSize := 10 * time.Second
+	validate(s.Scope("Fixed"), window.NewFixedWindows(windowSize), col, window.Trigger{Kind: window.DefaultTrigger}, window.Accumulating, 6.0, 9.0)
+

Review comment:
       rm spare blank line.

##########
File path: sdks/go/pkg/beam/core/runtime/graphx/translate.go
##########
@@ -981,22 +981,114 @@ func marshalWindowingStrategy(c *CoderMarshaller, w *window.WindowingStrategy) (
 	} else {
 		mergeStat = pipepb.MergeStatus_NON_MERGING
 	}
+	trigger := makeTrigger(w.Trigger)
+	accMode := makeAccumulationMode(w.AccumulationMode)
 	ws := &pipepb.WindowingStrategy{
 		WindowFn:         windowFn,
 		MergeStatus:      mergeStat,
-		AccumulationMode: pipepb.AccumulationMode_DISCARDING,
 		WindowCoderId:    windowCoderId,
-		Trigger: &pipepb.Trigger{
+		Trigger:          trigger,
+		AccumulationMode: accMode,
+		OutputTime:       pipepb.OutputTime_EARLIEST_IN_PANE,
+		ClosingBehavior:  pipepb.ClosingBehavior_EMIT_IF_NONEMPTY,
+		AllowedLateness:  10,

Review comment:
       This should default to 0, unless we're pulling it from the passed in strategy.

##########
File path: sdks/go/pkg/beam/core/typex/special.go
##########
@@ -64,6 +65,19 @@ type Window interface {
 	Equals(o Window) bool
 }
 
+const (
+	PaneEarly   int = 0 // EARLY
+	PaneOnTime  int = 1 // ON_TIME
+	PaneLate    int = 2 // LATE
+	PaneUnknown int = 3 // UNKNOWN
+)
+
+type PaneInfo struct {
+	Timing                     int

Review comment:
       Even if it seems like extra work, we don't want this to be typed as an int, since users might then try to manipulate it as an int.
   Instead, add a new type `type PaneTiming byte` (we won't need more than a byte presently or, likely, ever); then the above enum declaration can be
   
   ```
   const (
   	PaneEarly   PaneTiming = 0 
   	PaneOnTime  PaneTiming = 1 
   	PaneLate    PaneTiming = 2 
   	PaneUnknown PaneTiming = 3 
   )
   ```
   (Since the values have specific definitions, we can't lean on iota, even though it comes out the same. We want them to match their defined meaning in the coder definition.)
   
   The net result is that to use a value in arithmetic, a user would need to explicitly convert it to a numeric type. This prevents misusing the enum as a number when it only needs to be used symbolically.
   
   This is necessary since the PaneInfo will eventually be exposed to user DoFns, and it needs to be hard to misuse, which we enforce with the type system.
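
A minimal sketch of what the distinct type buys, assuming the `PaneTiming` declaration suggested in this comment. The `String` method and the `main` function are illustrative additions and are not part of the PR.

```go
package main

import "fmt"

// PaneTiming follows the reviewer's suggestion: a distinct type, so values
// are used symbolically rather than as plain integers.
type PaneTiming byte

const (
	PaneEarly   PaneTiming = 0
	PaneOnTime  PaneTiming = 1
	PaneLate    PaneTiming = 2
	PaneUnknown PaneTiming = 3
)

// String is an illustrative addition (an assumption, not from the PR).
func (t PaneTiming) String() string {
	switch t {
	case PaneEarly:
		return "EARLY"
	case PaneOnTime:
		return "ON_TIME"
	case PaneLate:
		return "LATE"
	default:
		return "UNKNOWN"
	}
}

func main() {
	timing := PaneLate
	n := 10

	// Writing n + timing here would not compile, because int and
	// PaneTiming are mismatched types. The conversion must be spelled out.
	sum := n + int(timing)
	fmt.Println(timing, sum) // prints "LATE 12"
}
```

Untyped constants can still be mixed with a `PaneTiming` value (for example `timing + 1` compiles), so the type discourages misuse rather than making it impossible.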
   
   

##########
File path: sdks/go/pkg/beam/core/graph/coder/panes.go
##########
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package coder
+
+import (
+	"io"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/ioutilx"
+)
+
+// EncodePane encodes a typex.PaneInfo.
+func EncodePane(v typex.PaneInfo, w io.Writer) error {
+	// Encoding: typex.PaneInfo
+
+	pane := 0

Review comment:
       It is probably worth converting this to a byte(0) or uint8(0) early, to catch errors sooner. It also telegraphs the upcoming bitwise operations.

##########
File path: sdks/go/pkg/beam/core/graph/coder/panes.go
##########
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package coder
+
+import (
+	"io"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/ioutilx"
+)
+
+// EncodePane encodes a typex.PaneInfo.
+func EncodePane(v typex.PaneInfo, w io.Writer) error {
+	// Encoding: typex.PaneInfo
+
+	pane := 0
+	if v.IsFirst {
+		pane |= 1
+	}
+	if v.IsLast {
+		pane |= 2

Review comment:
       Minor nit: Please use hex notation for the bitwise operations to make it much clearer to readers that the bits are significant. In this case, 0x01 and 0x02 make it unambiguous what we're doing with the byte layout.
   
   Yes, more verbose, but also unambiguous.
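
A rough sketch of the flag packing with a `byte` accumulator and hex masks, covering only the two booleans shown in the quoted diff. The names `firstBit`, `lastBit`, and `paneFlags` are hypothetical, and the rest of the pane encoding is deliberately left out.

```go
package main

import "fmt"

const (
	firstBit byte = 0x01 // set when IsFirst is true
	lastBit  byte = 0x02 // set when IsLast is true
)

// paneFlags is a hypothetical helper that packs the two booleans into a
// single byte, starting from a byte rather than an untyped int.
func paneFlags(isFirst, isLast bool) byte {
	var pane byte
	if isFirst {
		pane |= firstBit
	}
	if isLast {
		pane |= lastBit
	}
	return pane
}

func main() {
	fmt.Printf("0x%02x\n", paneFlags(true, true)) // prints "0x03"
}
```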

##########
File path: sdks/go/pkg/beam/core/runtime/graphx/translate.go
##########
@@ -981,22 +981,114 @@ func marshalWindowingStrategy(c *CoderMarshaller, w *window.WindowingStrategy) (
 	} else {
 		mergeStat = pipepb.MergeStatus_NON_MERGING
 	}
+	trigger := makeTrigger(w.Trigger)
+	accMode := makeAccumulationMode(w.AccumulationMode)
 	ws := &pipepb.WindowingStrategy{
 		WindowFn:         windowFn,
 		MergeStatus:      mergeStat,
-		AccumulationMode: pipepb.AccumulationMode_DISCARDING,
 		WindowCoderId:    windowCoderId,
-		Trigger: &pipepb.Trigger{
+		Trigger:          trigger,
+		AccumulationMode: accMode,
+		OutputTime:       pipepb.OutputTime_EARLIEST_IN_PANE,
+		ClosingBehavior:  pipepb.ClosingBehavior_EMIT_IF_NONEMPTY,
+		AllowedLateness:  10,
+		OnTimeBehavior:   pipepb.OnTimeBehavior_FIRE_ALWAYS,
+		EnvironmentId:    "",

Review comment:
       We can also keep this unset.

##########
File path: sdks/go/test/integration/primitives/windowinto_test.go
##########
@@ -36,3 +36,24 @@ func TestWindowSums_GBK(t *testing.T) {
 	WindowSums_GBK(s)
 	ptest.RunAndValidate(t, p)
 }
+
+func TestTriggerDefault(t *testing.T) {
+	integration.CheckFilters(t)

Review comment:
       This has been mentioned elsewhere: since we're using TestStream here, we need to filter these tests out from everything but the Flink runner. Fortunately, we can use a regex and just cover the prefix "TestTrigger".
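
As an illustration of prefix filtering with a regex, here is a small self-contained sketch. The `filters` slice and the `shouldSkip` helper are hypothetical and are not the integration package's actual implementation; anchoring the pattern with `^` is an assumption made for this example.

```go
package main

import (
	"fmt"
	"regexp"
)

// filters is an illustrative list. "TestTrigger.*" covers every test whose
// name starts with that prefix.
var filters = []string{
	"TestTestStream.*",
	"TestTrigger.*",
}

// shouldSkip is a hypothetical helper that reports whether a test name
// matches any of the given patterns.
func shouldSkip(name string, patterns []string) (bool, error) {
	for _, p := range patterns {
		// Anchor the pattern so it must match from the start of the name.
		re, err := regexp.Compile("^" + p)
		if err != nil {
			return false, err
		}
		if re.MatchString(name) {
			return true, nil
		}
	}
	return false, nil
}

func main() {
	for _, name := range []string{"TestTriggerDefault", "TestWindowSums_GBK"} {
		skip, err := shouldSkip(name, filters)
		fmt.Println(name, skip, err)
	}
}
```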

##########
File path: sdks/go/pkg/beam/core/graph/window/trigger.go
##########
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package window
+
+type Trigger struct {
+	Kind         string
+	SubTriggers  []Trigger
+	Delay        int64

Review comment:
       Same comment here: document what unit the int64 represents, or use a time.Duration.

##########
File path: sdks/go/pkg/beam/core/runtime/graphx/translate.go
##########
@@ -981,22 +981,114 @@ func marshalWindowingStrategy(c *CoderMarshaller, w *window.WindowingStrategy) (
 	} else {
 		mergeStat = pipepb.MergeStatus_NON_MERGING
 	}
+	trigger := makeTrigger(w.Trigger)
+	accMode := makeAccumulationMode(w.AccumulationMode)

Review comment:
       We can inline these calls to the proto field assignments instead of assigning to variables first.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] riteshghorse commented on pull request #15239: [BEAM-3304] Go triggering support

Posted by GitBox <gi...@apache.org>.
riteshghorse commented on pull request #15239:
URL: https://github.com/apache/beam/pull/15239#issuecomment-897835537


   Resolved the comments and added a test for Repeat Trigger.
   PTAL.





[GitHub] [beam] lostluck commented on a change in pull request #15239: [BEAM-3304] Go triggering support

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #15239:
URL: https://github.com/apache/beam/pull/15239#discussion_r681196854



##########
File path: sdks/go/pkg/beam/windowing.go
##########
@@ -21,21 +21,47 @@ import (
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
+type WindowIntoOption interface {
+	windowIntoOption()
+}
+
+type WindowTrigger struct {
+	Name window.TriggerType
+}
+
+func (t WindowTrigger) windowIntoOption() {}
+
+func (t WindowTrigger) GetName() window.TriggerType {
+	return t.Name
+}
+
 // WindowInto applies the windowing strategy to each element.
-func WindowInto(s Scope, ws *window.Fn, col PCollection) PCollection {
-	return Must(TryWindowInto(s, ws, col))
+func WindowInto(s Scope, ws *window.Fn, col PCollection, opts ...WindowIntoOption) PCollection {
+	return Must(TryWindowInto(s, ws, col, opts...))
 }
 
 // TryWindowInto attempts to insert a WindowInto transform.
-func TryWindowInto(s Scope, ws *window.Fn, col PCollection) (PCollection, error) {
+func TryWindowInto(s Scope, ws *window.Fn, col PCollection, opts ...WindowIntoOption) (PCollection, error) {
 	if !s.IsValid() {
 		return PCollection{}, errors.New("invalid scope")
 	}
 	if !col.IsValid() {
 		return PCollection{}, errors.New("invalid input pcollection")
 	}
+	var edge *graph.MultiEdge
+	for _, opt := range opts {

Review comment:
       Consider creating a windowing strategy with all the reasonable defaults set before iterating over the opts, and then updating that strategy with whatever is in the options. Then we don't end up with a combinatorial explosion when other options are added, or with the awkward "if there are no options, do this" length check written below.
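
The defaults-then-override pattern described here could look roughly like the following. All types are simplified stand-ins for the SDK's, the string values are placeholders, and the defaults chosen (`"Default"` trigger, `"Discarding"` mode) are assumptions for the sake of the sketch.

```go
package main

import "fmt"

// strategy is a simplified stand-in for window.WindowingStrategy.
type strategy struct {
	Trigger string
	AccMode string
}

// WindowIntoOption mirrors the marker interface from the diff.
type WindowIntoOption interface {
	windowIntoOption()
}

type WindowTrigger struct{ Name string }

func (WindowTrigger) windowIntoOption() {}

type AccumulationMode struct{ Mode string }

func (AccumulationMode) windowIntoOption() {}

// buildStrategy is a hypothetical helper: it starts from defaults and lets
// each option overwrite exactly one field.
func buildStrategy(opts ...WindowIntoOption) strategy {
	ws := strategy{Trigger: "Default", AccMode: "Discarding"} // assumed defaults
	for _, opt := range opts {
		switch o := opt.(type) {
		case WindowTrigger:
			ws.Trigger = o.Name
		case AccumulationMode:
			ws.AccMode = o.Mode
		}
	}
	return ws
}

func main() {
	fmt.Printf("%+v\n", buildStrategy())
	fmt.Printf("%+v\n", buildStrategy(WindowTrigger{Name: "Always"}))
}
```

Because the strategy always exists before the loop, the no-options case needs no special handling, and a new option type adds one `case` rather than a new combination of branches.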

##########
File path: sdks/go/pkg/beam/windowing.go
##########
@@ -21,21 +21,47 @@ import (
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
+type WindowIntoOption interface {
+	windowIntoOption()
+}
+
+type WindowTrigger struct {
+	Name window.TriggerType
+}
+
+func (t WindowTrigger) windowIntoOption() {}
+
+func (t WindowTrigger) GetName() window.TriggerType {

Review comment:
       There's no need for a GetName function. As a rule, don't add "Get" to a method that just returns a field.
   See https://golang.org/doc/effective_go#Getters
   

##########
File path: sdks/go/pkg/beam/windowing.go
##########
@@ -21,21 +21,47 @@ import (
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
+type WindowIntoOption interface {
+	windowIntoOption()
+}
+
+type WindowTrigger struct {
+	Name window.TriggerType
+}
+
+func (t WindowTrigger) windowIntoOption() {}
+
+func (t WindowTrigger) GetName() window.TriggerType {
+	return t.Name
+}
+
 // WindowInto applies the windowing strategy to each element.
-func WindowInto(s Scope, ws *window.Fn, col PCollection) PCollection {
-	return Must(TryWindowInto(s, ws, col))
+func WindowInto(s Scope, ws *window.Fn, col PCollection, opts ...WindowIntoOption) PCollection {
+	return Must(TryWindowInto(s, ws, col, opts...))
 }
 
 // TryWindowInto attempts to insert a WindowInto transform.
-func TryWindowInto(s Scope, ws *window.Fn, col PCollection) (PCollection, error) {
+func TryWindowInto(s Scope, ws *window.Fn, col PCollection, opts ...WindowIntoOption) (PCollection, error) {
 	if !s.IsValid() {
 		return PCollection{}, errors.New("invalid scope")
 	}
 	if !col.IsValid() {
 		return PCollection{}, errors.New("invalid input pcollection")
 	}
+	var edge *graph.MultiEdge
+	for _, opt := range opts {
+		switch opt.(type) {
+		case WindowTrigger:
+			edge = graph.NewWindowInto(s.real, s.scope, &window.WindowingStrategy{Fn: ws, Trigger: opt.(WindowTrigger).GetName()}, col.n)

Review comment:
       Consider accessing the trigger kind directly with `.Name` without a method instead, and remove the method.







[GitHub] [beam] riteshghorse commented on pull request #15239: [BEAM-3304] Go triggering support

Posted by GitBox <gi...@apache.org>.
riteshghorse commented on pull request #15239:
URL: https://github.com/apache/beam/pull/15239#issuecomment-900455108


   Run Go PostCommit
   





[GitHub] [beam] lostluck commented on a change in pull request #15239: [BEAM-3304] Go triggering support

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #15239:
URL: https://github.com/apache/beam/pull/15239#discussion_r687898221



##########
File path: sdks/go/test/integration/integration.go
##########
@@ -69,6 +69,8 @@ var directFilters = []string{
 var portableFilters = []string{
 	// The portable runner does not support the TestStream primitive
 	"TestTestStream.*",
+	// The trigger tests uses TestStream
+	"TestTrigger.*",

Review comment:
       You'll also need to add this to the samza, spark, direct, and dataflow filters. Basically anywhere TestStream is being filtered out right now.



