Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/11/11 23:27:43 UTC

[GitHub] [beam] riteshghorse opened a new pull request #15952: [BEAM-3304] Refactor trigger API

riteshghorse opened a new pull request #15952:
URL: https://github.com/apache/beam/pull/15952


   Refactors the existing trigger API in the Go SDK by converting `trigger.Trigger` from a struct to an interface.
   Thanks @lostluck for all the suggestions!
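
To illustrate the struct-to-interface change described above, here is a minimal sketch, not the PR's actual code. It assumes a sealed `Trigger` interface with an unexported marker method, as the diffs in this thread suggest. `AfterCountTrigger`, its `elementCount` field, the pointer receivers, and the `describe` helper are hypothetical names chosen for the example.

```go
package main

import "fmt"

// Trigger is a sealed interface: because the marker method is unexported,
// only types declared in this package can satisfy it.
type Trigger interface {
	trigger()
}

// AfterCountTrigger is a hypothetical concrete trigger used for illustration.
type AfterCountTrigger struct {
	elementCount int32
}

func (t *AfterCountTrigger) trigger() {}

// RepeatTrigger wraps another trigger, mirroring the shape shown in the diff.
type RepeatTrigger struct {
	subTrigger Trigger
}

func (t *RepeatTrigger) trigger() {}

// describe dispatches on the concrete type rather than on a Kind string field,
// which is what a struct-based design would have to inspect at run time.
func describe(t Trigger) string {
	switch t := t.(type) {
	case *AfterCountTrigger:
		return fmt.Sprintf("AfterCount(%d)", t.elementCount)
	case *RepeatTrigger:
		return fmt.Sprintf("Repeat(%s)", describe(t.subTrigger))
	default:
		return "unknown"
	}
}

func main() {
	tr := &RepeatTrigger{subTrigger: &AfterCountTrigger{elementCount: 1}}
	fmt.Println(describe(tr)) // prints "Repeat(AfterCount(1))"
}
```

Each trigger kind then carries only the fields it needs, instead of one struct holding every possible option.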
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] riteshghorse commented on pull request #15952: [BEAM-3304] Refactor trigger API

Posted by GitBox <gi...@apache.org>.
riteshghorse commented on pull request #15952:
URL: https://github.com/apache/beam/pull/15952#issuecomment-967229035


   Run Go PostCommit





[GitHub] [beam] riteshghorse edited a comment on pull request #15952: [BEAM-3304] Refactor trigger API

Posted by GitBox <gi...@apache.org>.
riteshghorse edited a comment on pull request #15952:
URL: https://github.com/apache/beam/pull/15952#issuecomment-966698896


   It looks like the commits from my previous work carried over into this PR, but I did merge master at the end.
   R: @lostluck 





[GitHub] [beam] riteshghorse commented on a change in pull request #15952: [BEAM-3304] Refactor trigger API

Posted by GitBox <gi...@apache.org>.
riteshghorse commented on a change in pull request #15952:
URL: https://github.com/apache/beam/pull/15952#discussion_r748369532



##########
File path: sdks/go/pkg/beam/core/graph/window/trigger/trigger.go
##########
@@ -138,49 +143,114 @@ func (tr Trigger) AlignedTo(period time.Duration, offset time.Time) Trigger {
 		// TODO: Change to call UnixMilli() once we move to only supporting a go version > 1.17.
 		offsetMillis = offset.Unix()*1e3 + int64(offset.Nanosecond())/1e6
 	}
-	tr.TimestampTransforms = append(tr.TimestampTransforms, AlignToTransform{
+	tr.timestampTransforms = append(tr.timestampTransforms, AlignToTransform{
 		Period: int64(period / time.Millisecond),
 		Offset: offsetMillis,
 	})
 	return tr
 }
 
+// RepeatTrigger fires a sub-trigger repeatedly.
+type RepeatTrigger struct {
+	subTrigger Trigger
+}
+
+func (t RepeatTrigger) trigger() {}
+
+// SubTrigger returns the trigger to be repeated.
+func (t *RepeatTrigger) SubTrigger() Trigger {
+	return t.subTrigger
+}
+
 // Repeat constructs a trigger that fires a trigger repeatedly
 // once the condition has been met.
 //
 // Ex: trigger.Repeat(trigger.AfterCount(1)) is same as trigger.Always().
 func Repeat(tr Trigger) Trigger {
-	return Trigger{Kind: RepeatTrigger, SubTriggers: []Trigger{tr}}
+	return &RepeatTrigger{subTrigger: tr}
+}
+
+// AfterEndOfWindowTrigger provides option to set triggers for early and late firing.
+type AfterEndOfWindowTrigger struct {
+	earlyFiring Trigger
+	lateFiring  Trigger
+}
+
+func (t AfterEndOfWindowTrigger) trigger() {}
+
+// EarlyTrigger returns the Early Firing trigger for AfterEndOfWindowTrigger.
+func (t *AfterEndOfWindowTrigger) EarlyTrigger() Trigger {

Review comment:
       Makes sense.

##########
File path: sdks/go/pkg/beam/core/graph/window/trigger/trigger.go
##########
@@ -65,59 +119,13 @@ type AlignToTransform struct {
 
 func (AlignToTransform) timestampTransform() {}
 
-const (
-	DefaultTrigger                         string = "Trigger_Default_"
-	AlwaysTrigger                          string = "Trigger_Always_"
-	AfterAnyTrigger                        string = "Trigger_AfterAny_"
-	AfterAllTrigger                        string = "Trigger_AfterAll_"
-	AfterProcessingTimeTrigger             string = "Trigger_AfterProcessing_Time_"
-	ElementCountTrigger                    string = "Trigger_ElementCount_"
-	AfterEndOfWindowTrigger                string = "Trigger_AfterEndOfWindow_"
-	RepeatTrigger                          string = "Trigger_Repeat_"
-	OrFinallyTrigger                       string = "Trigger_OrFinally_"
-	NeverTrigger                           string = "Trigger_Never_"
-	AfterSynchronizedProcessingTimeTrigger string = "Trigger_AfterSynchronizedProcessingTime_"
-)
-
-// Default constructs a default trigger that fires once after the end of window.
-// Late Data is discarded.
-func Default() Trigger {
-	return Trigger{Kind: DefaultTrigger}
-}
-
-// Always constructs a trigger that fires immediately
-// whenever an element is received.
-//
-// Equivalent to trigger.Repeat(trigger.AfterCount(1))
-func Always() Trigger {
-	return Trigger{Kind: AlwaysTrigger}
-}
-
-// AfterCount constructs a trigger that fires after
-// at least `count` number of elements are processed.
-func AfterCount(count int32) Trigger {
-	return Trigger{Kind: ElementCountTrigger, ElementCount: count}
-}
-
-// AfterProcessingTime constructs a trigger that fires relative to
-// when input first arrives.
-//
-// Must be configured with calls to PlusDelay, or AlignedTo. May be
-// configured with additional delay.
-func AfterProcessingTime() Trigger {
-	return Trigger{Kind: AfterProcessingTimeTrigger}
-}
-
 // PlusDelay configures an AfterProcessingTime trigger to fire after a specified delay,
 // no smaller than a millisecond.
-func (tr Trigger) PlusDelay(delay time.Duration) Trigger {
-	if tr.Kind != AfterProcessingTimeTrigger {
-		panic(fmt.Errorf("can't apply processing delay to %s, want: AfterProcessingTimeTrigger", tr.Kind))
-	}
+func (tr *AfterProcessingTimeTrigger) PlusDelay(delay time.Duration) Trigger {

Review comment:
       Got it.







[GitHub] [beam] lostluck commented on a change in pull request #15952: [BEAM-3304] Refactor trigger API

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #15952:
URL: https://github.com/apache/beam/pull/15952#discussion_r748453793



##########
File path: sdks/go/pkg/beam/core/graph/window/trigger/trigger.go
##########
@@ -138,49 +143,116 @@ func (tr Trigger) AlignedTo(period time.Duration, offset time.Time) Trigger {
 		// TODO: Change to call UnixMilli() once we move to only supporting a go version > 1.17.
 		offsetMillis = offset.Unix()*1e3 + int64(offset.Nanosecond())/1e6
 	}
-	tr.TimestampTransforms = append(tr.TimestampTransforms, AlignToTransform{
+	tr.timestampTransforms = append(tr.timestampTransforms, AlignToTransform{
 		Period: int64(period / time.Millisecond),
 		Offset: offsetMillis,
 	})
 	return tr
 }
 
+// RepeatTrigger fires a sub-trigger repeatedly.
+type RepeatTrigger struct {
+	subTrigger Trigger
+}
+
+func (t RepeatTrigger) trigger() {}
+
+// SubTrigger returns the trigger to be repeated.
+func (t *RepeatTrigger) SubTrigger() Trigger {
+	return t.subTrigger
+}
+
 // Repeat constructs a trigger that fires a trigger repeatedly
 // once the condition has been met.
 //
 // Ex: trigger.Repeat(trigger.AfterCount(1)) is same as trigger.Always().
-func Repeat(tr Trigger) Trigger {
-	return Trigger{Kind: RepeatTrigger, SubTriggers: []Trigger{tr}}
+func Repeat(tr Trigger) *RepeatTrigger {
+	return &RepeatTrigger{subTrigger: tr}
+}
+
+// AfterEndOfWindowTrigger provides option to set triggers for early and late firing.
+type AfterEndOfWindowTrigger struct {
+	earlyFiring Trigger
+	lateFiring  Trigger
+}
+
+func (t AfterEndOfWindowTrigger) trigger() {}
+
+// EarlyTrigger returns the Early Firing trigger for AfterEndOfWindowTrigger.
+func (t *AfterEndOfWindowTrigger) EarlyTrigger() Trigger {
+	return t.earlyFiring
+}
+
+// LateTrigger returns the Late Firing trigger for AfterEndOfWindowTrigger.
+func (t *AfterEndOfWindowTrigger) LateTrigger() Trigger {
+	return t.lateFiring
 }
 
 // AfterEndOfWindow constructs a trigger that is configurable for early firing
 // (before the end of window) and late firing (after the end of window).
 //
-// Default Options are: Default Trigger for EarlyFiring and No LateFiring.
-// Override it with EarlyFiring and LateFiring methods on this trigger.
-func AfterEndOfWindow() Trigger {
+// Must call EarlyFiring or LateFiring method on this trigger at the time of setting.
+func AfterEndOfWindow() *AfterEndOfWindowTrigger {
 	defaultEarly := Default()
-	return Trigger{Kind: AfterEndOfWindowTrigger, EarlyTrigger: &defaultEarly, LateTrigger: nil}
+	return &AfterEndOfWindowTrigger{earlyFiring: defaultEarly, lateFiring: nil}
 }
 
 // EarlyFiring configures an AfterEndOfWindow trigger with an implicitly
 // repeated trigger that applies before the end of the window.
-func (tr Trigger) EarlyFiring(early Trigger) Trigger {
-	if tr.Kind != AfterEndOfWindowTrigger {
-		panic(fmt.Errorf("can't apply early firing to %s, want: AfterEndOfWindowTrigger", tr.Kind))
-	}
-	tr.EarlyTrigger = &early
+func (tr *AfterEndOfWindowTrigger) EarlyFiring(early Trigger) *AfterEndOfWindowTrigger {
+	tr.earlyFiring = early
 	return tr
 }
 
 // LateFiring configures an AfterEndOfWindow trigger with an implicitly
 // repeated trigger that applies after the end of the window.
 //
 // Not setting a late firing trigger means elements are discarded.
-func (tr Trigger) LateFiring(late Trigger) Trigger {
-	if tr.Kind != AfterEndOfWindowTrigger {
-		panic(fmt.Errorf("can't apply late firing to %s, want: AfterEndOfWindowTrigger", tr.Kind))
-	}
-	tr.LateTrigger = &late
+func (tr *AfterEndOfWindowTrigger) LateFiring(late Trigger) *AfterEndOfWindowTrigger {
+	tr.lateFiring = late
 	return tr
 }
+
+// NYI(BEAM-3304). Intended for framework use only.
+// AfterAnyTrigger fires after any of sub-trigger fires.

Review comment:
       Go doc comments on exported types and functions *must* start with the identifier, so these lines are reversed.
   
   ```suggestion
   // AfterAnyTrigger fires after any of sub-trigger fires.
   // NYI(BEAM-3304). Intended for framework use only.
   ```
   
   Here and below.
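
As a small illustration of this convention (which linters check, not the compiler), the sketch below parses a snippet with the standard `go/parser` and `go/ast` packages and reports whether each type's doc comment starts with its identifier. The helper `docStartsWithName` and the `AfterAllTrigger` comment text are made up for the example.

```go
package main

import (
	"fmt"
	"go/ast"
	"go/parser"
	"go/token"
	"strings"
)

// src holds one declaration with the reversed comment order from the review
// and one with the suggested order. The AfterAllTrigger text is illustrative.
const src = `package trigger

// NYI(BEAM-3304). Intended for framework use only.
// AfterAnyTrigger fires after any of sub-trigger fires.
type AfterAnyTrigger struct{}

// AfterAllTrigger fires after all of its sub-triggers fire.
// NYI(BEAM-3304). Intended for framework use only.
type AfterAllTrigger struct{}
`

// docStartsWithName reports, for each top-level type declaration in source,
// whether its doc comment begins with the declared identifier.
func docStartsWithName(source string) (map[string]bool, error) {
	fset := token.NewFileSet()
	f, err := parser.ParseFile(fset, "snippet.go", source, parser.ParseComments)
	if err != nil {
		return nil, err
	}
	result := map[string]bool{}
	for _, decl := range f.Decls {
		gd, ok := decl.(*ast.GenDecl)
		if !ok || gd.Tok != token.TYPE {
			continue
		}
		for _, spec := range gd.Specs {
			ts := spec.(*ast.TypeSpec)
			// For an ungrouped declaration the comment hangs off the GenDecl.
			doc := gd.Doc
			if ts.Doc != nil {
				doc = ts.Doc
			}
			result[ts.Name.Name] = doc != nil &&
				strings.HasPrefix(doc.Text(), ts.Name.Name+" ")
		}
	}
	return result, nil
}

func main() {
	res, err := docStartsWithName(src)
	if err != nil {
		panic(err)
	}
	fmt.Println("AfterAnyTrigger:", res["AfterAnyTrigger"]) // prints "AfterAnyTrigger: false"
	fmt.Println("AfterAllTrigger:", res["AfterAllTrigger"]) // prints "AfterAllTrigger: true"
}
```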

##########
File path: sdks/go/pkg/beam/core/graph/window/trigger/trigger.go
##########
@@ -138,49 +143,116 @@ func (tr Trigger) AlignedTo(period time.Duration, offset time.Time) Trigger {
 		// TODO: Change to call UnixMilli() once we move to only supporting a go version > 1.17.
 		offsetMillis = offset.Unix()*1e3 + int64(offset.Nanosecond())/1e6
 	}
-	tr.TimestampTransforms = append(tr.TimestampTransforms, AlignToTransform{
+	tr.timestampTransforms = append(tr.timestampTransforms, AlignToTransform{
 		Period: int64(period / time.Millisecond),
 		Offset: offsetMillis,
 	})
 	return tr
 }
 
+// RepeatTrigger fires a sub-trigger repeatedly.
+type RepeatTrigger struct {
+	subTrigger Trigger
+}
+
+func (t RepeatTrigger) trigger() {}
+
+// SubTrigger returns the trigger to be repeated.
+func (t *RepeatTrigger) SubTrigger() Trigger {
+	return t.subTrigger
+}
+
 // Repeat constructs a trigger that fires a trigger repeatedly
 // once the condition has been met.
 //
 // Ex: trigger.Repeat(trigger.AfterCount(1)) is same as trigger.Always().
-func Repeat(tr Trigger) Trigger {
-	return Trigger{Kind: RepeatTrigger, SubTriggers: []Trigger{tr}}
+func Repeat(tr Trigger) *RepeatTrigger {
+	return &RepeatTrigger{subTrigger: tr}
+}
+
+// AfterEndOfWindowTrigger provides option to set triggers for early and late firing.
+type AfterEndOfWindowTrigger struct {
+	earlyFiring Trigger
+	lateFiring  Trigger
+}
+
+func (t AfterEndOfWindowTrigger) trigger() {}
+
+// EarlyTrigger returns the Early Firing trigger for AfterEndOfWindowTrigger.
+func (t *AfterEndOfWindowTrigger) EarlyTrigger() Trigger {
+	return t.earlyFiring
+}
+
+// LateTrigger returns the Late Firing trigger for AfterEndOfWindowTrigger.
+func (t *AfterEndOfWindowTrigger) LateTrigger() Trigger {
+	return t.lateFiring
 }
 
 // AfterEndOfWindow constructs a trigger that is configurable for early firing
 // (before the end of window) and late firing (after the end of window).
 //
-// Default Options are: Default Trigger for EarlyFiring and No LateFiring.
-// Override it with EarlyFiring and LateFiring methods on this trigger.
-func AfterEndOfWindow() Trigger {
+// Must call EarlyFiring or LateFiring method on this trigger at the time of setting.
+func AfterEndOfWindow() *AfterEndOfWindowTrigger {
 	defaultEarly := Default()
-	return Trigger{Kind: AfterEndOfWindowTrigger, EarlyTrigger: &defaultEarly, LateTrigger: nil}
+	return &AfterEndOfWindowTrigger{earlyFiring: defaultEarly, lateFiring: nil}
 }
 
 // EarlyFiring configures an AfterEndOfWindow trigger with an implicitly
 // repeated trigger that applies before the end of the window.
-func (tr Trigger) EarlyFiring(early Trigger) Trigger {
-	if tr.Kind != AfterEndOfWindowTrigger {
-		panic(fmt.Errorf("can't apply early firing to %s, want: AfterEndOfWindowTrigger", tr.Kind))
-	}
-	tr.EarlyTrigger = &early
+func (tr *AfterEndOfWindowTrigger) EarlyFiring(early Trigger) *AfterEndOfWindowTrigger {
+	tr.earlyFiring = early
 	return tr
 }
 
 // LateFiring configures an AfterEndOfWindow trigger with an implicitly
 // repeated trigger that applies after the end of the window.
 //
 // Not setting a late firing trigger means elements are discarded.
-func (tr Trigger) LateFiring(late Trigger) Trigger {
-	if tr.Kind != AfterEndOfWindowTrigger {
-		panic(fmt.Errorf("can't apply late firing to %s, want: AfterEndOfWindowTrigger", tr.Kind))
-	}
-	tr.LateTrigger = &late
+func (tr *AfterEndOfWindowTrigger) LateFiring(late Trigger) *AfterEndOfWindowTrigger {
+	tr.lateFiring = late
 	return tr
 }
+
+// NYI(BEAM-3304). Intended for framework use only.
+// AfterAnyTrigger fires after any of sub-trigger fires.
+type AfterAnyTrigger struct {
+	subTriggers []Trigger
+}
+
+func (t AfterAnyTrigger) trigger() {}
+
+// SubTriggers returns the subTriggers.

Review comment:
       Please make your use/spelling of "subtriggers" consistent. The method name is fine, but we should change the comment to something like
   ```suggestion
   // SubTriggers returns the component triggers.
   ```
   
   This avoids having to choose between "subtriggers", "sub-triggers", and "sub triggers", none of which is a term used by the programming guide: https://beam.apache.org/documentation/programming-guide/#composite-triggers

##########
File path: sdks/go/pkg/beam/core/graph/window/trigger/trigger.go
##########
@@ -138,49 +143,116 @@ func (tr Trigger) AlignedTo(period time.Duration, offset time.Time) Trigger {
 		// TODO: Change to call UnixMilli() once we move to only supporting a go version > 1.17.
 		offsetMillis = offset.Unix()*1e3 + int64(offset.Nanosecond())/1e6
 	}
-	tr.TimestampTransforms = append(tr.TimestampTransforms, AlignToTransform{
+	tr.timestampTransforms = append(tr.timestampTransforms, AlignToTransform{
 		Period: int64(period / time.Millisecond),
 		Offset: offsetMillis,
 	})
 	return tr
 }
 
+// RepeatTrigger fires a sub-trigger repeatedly.
+type RepeatTrigger struct {
+	subTrigger Trigger
+}
+
+func (t RepeatTrigger) trigger() {}
+
+// SubTrigger returns the trigger to be repeated.
+func (t *RepeatTrigger) SubTrigger() Trigger {
+	return t.subTrigger
+}
+
 // Repeat constructs a trigger that fires a trigger repeatedly
 // once the condition has been met.
 //
 // Ex: trigger.Repeat(trigger.AfterCount(1)) is same as trigger.Always().
-func Repeat(tr Trigger) Trigger {
-	return Trigger{Kind: RepeatTrigger, SubTriggers: []Trigger{tr}}
+func Repeat(tr Trigger) *RepeatTrigger {
+	return &RepeatTrigger{subTrigger: tr}
+}
+
+// AfterEndOfWindowTrigger provides option to set triggers for early and late firing.
+type AfterEndOfWindowTrigger struct {
+	earlyFiring Trigger
+	lateFiring  Trigger
+}
+
+func (t AfterEndOfWindowTrigger) trigger() {}
+
+// EarlyTrigger returns the Early Firing trigger for AfterEndOfWindowTrigger.
+func (t *AfterEndOfWindowTrigger) EarlyTrigger() Trigger {
+	return t.earlyFiring
+}
+
+// LateTrigger returns the Late Firing trigger for AfterEndOfWindowTrigger.
+func (t *AfterEndOfWindowTrigger) LateTrigger() Trigger {

Review comment:
       WRT method names, we can drop the `Trigger` suffix here. `Early()` and `Late()` are fine.

##########
File path: sdks/go/pkg/beam/core/graph/window/trigger/trigger.go
##########
@@ -138,49 +143,116 @@ func (tr Trigger) AlignedTo(period time.Duration, offset time.Time) Trigger {
 		// TODO: Change to call UnixMilli() once we move to only supporting a go version > 1.17.
 		offsetMillis = offset.Unix()*1e3 + int64(offset.Nanosecond())/1e6
 	}
-	tr.TimestampTransforms = append(tr.TimestampTransforms, AlignToTransform{
+	tr.timestampTransforms = append(tr.timestampTransforms, AlignToTransform{
 		Period: int64(period / time.Millisecond),
 		Offset: offsetMillis,
 	})
 	return tr
 }
 
+// RepeatTrigger fires a sub-trigger repeatedly.
+type RepeatTrigger struct {
+	subTrigger Trigger
+}
+
+func (t RepeatTrigger) trigger() {}
+
+// SubTrigger returns the trigger to be repeated.
+func (t *RepeatTrigger) SubTrigger() Trigger {
+	return t.subTrigger
+}
+
 // Repeat constructs a trigger that fires a trigger repeatedly
 // once the condition has been met.
 //
 // Ex: trigger.Repeat(trigger.AfterCount(1)) is same as trigger.Always().
-func Repeat(tr Trigger) Trigger {
-	return Trigger{Kind: RepeatTrigger, SubTriggers: []Trigger{tr}}
+func Repeat(tr Trigger) *RepeatTrigger {
+	return &RepeatTrigger{subTrigger: tr}
+}
+
+// AfterEndOfWindowTrigger provides option to set triggers for early and late firing.
+type AfterEndOfWindowTrigger struct {
+	earlyFiring Trigger
+	lateFiring  Trigger
+}
+
+func (t AfterEndOfWindowTrigger) trigger() {}
+
+// EarlyTrigger returns the Early Firing trigger for AfterEndOfWindowTrigger.
+func (t *AfterEndOfWindowTrigger) EarlyTrigger() Trigger {
+	return t.earlyFiring
+}
+
+// LateTrigger returns the Late Firing trigger for AfterEndOfWindowTrigger.
+func (t *AfterEndOfWindowTrigger) LateTrigger() Trigger {
+	return t.lateFiring
 }
 
 // AfterEndOfWindow constructs a trigger that is configurable for early firing
 // (before the end of window) and late firing (after the end of window).
 //
-// Default Options are: Default Trigger for EarlyFiring and No LateFiring.
-// Override it with EarlyFiring and LateFiring methods on this trigger.
-func AfterEndOfWindow() Trigger {
+// Must call EarlyFiring or LateFiring method on this trigger at the time of setting.
+func AfterEndOfWindow() *AfterEndOfWindowTrigger {
 	defaultEarly := Default()
-	return Trigger{Kind: AfterEndOfWindowTrigger, EarlyTrigger: &defaultEarly, LateTrigger: nil}
+	return &AfterEndOfWindowTrigger{earlyFiring: defaultEarly, lateFiring: nil}
 }
 
 // EarlyFiring configures an AfterEndOfWindow trigger with an implicitly
 // repeated trigger that applies before the end of the window.
-func (tr Trigger) EarlyFiring(early Trigger) Trigger {
-	if tr.Kind != AfterEndOfWindowTrigger {
-		panic(fmt.Errorf("can't apply early firing to %s, want: AfterEndOfWindowTrigger", tr.Kind))
-	}
-	tr.EarlyTrigger = &early
+func (tr *AfterEndOfWindowTrigger) EarlyFiring(early Trigger) *AfterEndOfWindowTrigger {
+	tr.earlyFiring = early
 	return tr
 }
 
 // LateFiring configures an AfterEndOfWindow trigger with an implicitly
 // repeated trigger that applies after the end of the window.
 //
 // Not setting a late firing trigger means elements are discarded.
-func (tr Trigger) LateFiring(late Trigger) Trigger {
-	if tr.Kind != AfterEndOfWindowTrigger {
-		panic(fmt.Errorf("can't apply late firing to %s, want: AfterEndOfWindowTrigger", tr.Kind))
-	}
-	tr.LateTrigger = &late
+func (tr *AfterEndOfWindowTrigger) LateFiring(late Trigger) *AfterEndOfWindowTrigger {
+	tr.lateFiring = late
 	return tr
 }
+
+// NYI(BEAM-3304). Intended for framework use only.
+// AfterAnyTrigger fires after any of sub-trigger fires.
+type AfterAnyTrigger struct {
+	subTriggers []Trigger

Review comment:
       Don't camel-case the T here. Just use `subtriggers`, which is what the proto uses anyway. Similarly for the other subtrigger fields.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] riteshghorse commented on a change in pull request #15952: [BEAM-3304] Refactor trigger API

Posted by GitBox <gi...@apache.org>.
riteshghorse commented on a change in pull request #15952:
URL: https://github.com/apache/beam/pull/15952#discussion_r748368847



##########
File path: sdks/go/pkg/beam/core/graph/window/trigger/trigger.go
##########
@@ -138,49 +143,114 @@ func (tr Trigger) AlignedTo(period time.Duration, offset time.Time) Trigger {
 		// TODO: Change to call UnixMilli() once we move to only supporting a go version > 1.17.
 		offsetMillis = offset.Unix()*1e3 + int64(offset.Nanosecond())/1e6
 	}
-	tr.TimestampTransforms = append(tr.TimestampTransforms, AlignToTransform{
+	tr.timestampTransforms = append(tr.timestampTransforms, AlignToTransform{
 		Period: int64(period / time.Millisecond),
 		Offset: offsetMillis,
 	})
 	return tr
 }
 
+// RepeatTrigger fires a sub-trigger repeatedly.
+type RepeatTrigger struct {
+	subTrigger Trigger
+}
+
+func (t RepeatTrigger) trigger() {}
+
+// SubTrigger returns the trigger to be repeated.
+func (t *RepeatTrigger) SubTrigger() Trigger {
+	return t.subTrigger
+}
+
 // Repeat constructs a trigger that fires a trigger repeatedly
 // once the condition has been met.
 //
 // Ex: trigger.Repeat(trigger.AfterCount(1)) is same as trigger.Always().
 func Repeat(tr Trigger) Trigger {
-	return Trigger{Kind: RepeatTrigger, SubTriggers: []Trigger{tr}}
+	return &RepeatTrigger{subTrigger: tr}
+}
+
+// AfterEndOfWindowTrigger provides option to set triggers for early and late firing.
+type AfterEndOfWindowTrigger struct {
+	earlyFiring Trigger
+	lateFiring  Trigger
+}
+
+func (t AfterEndOfWindowTrigger) trigger() {}
+
+// EarlyTrigger returns the Early Firing trigger for AfterEndOfWindowTrigger.
+func (t *AfterEndOfWindowTrigger) EarlyTrigger() Trigger {
+	return t.earlyFiring
+}
+
+// LateTrigger returns the Late Firing trigger for AfterEndOfWindowTrigger.
+func (t *AfterEndOfWindowTrigger) LateTrigger() Trigger {
+	return t.lateFiring
 }
 
 // AfterEndOfWindow constructs a trigger that is configurable for early firing
 // (before the end of window) and late firing (after the end of window).
 //
-// Default Options are: Default Trigger for EarlyFiring and No LateFiring.
-// Override it with EarlyFiring and LateFiring methods on this trigger.
-func AfterEndOfWindow() Trigger {
-	defaultEarly := Default()
-	return Trigger{Kind: AfterEndOfWindowTrigger, EarlyTrigger: &defaultEarly, LateTrigger: nil}
+// Must call EarlyFiring or LateFiring method on this trigger at the time of setting.
+func AfterEndOfWindow() *AfterEndOfWindowTrigger {
+	return &AfterEndOfWindowTrigger{earlyFiring: nil, lateFiring: nil}

Review comment:
       > Alternatively (as a separate PR), we should probably have a validation function somewhere for triggers when they're set to make sure they aren't going to lose data or similar:
   > 
   > https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/trigger.py
   
   Noted.
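
   A rough sketch of what such a check could look like on the Go side. Everything here is hypothetical: `validate` does not exist in the SDK, the types are trimmed-down stand-ins, and the rules (no nil component triggers, positive element counts) are assumptions rather than a port of the Python logic:

   ```go
   package main

   import (
   	"errors"
   	"fmt"
   )

   // Stand-in types; only the fields needed for the sketch are included.
   type Trigger interface{ trigger() }

   type AfterCountTrigger struct{ elementCount int32 }

   func (*AfterCountTrigger) trigger() {}

   type RepeatTrigger struct{ subtrigger Trigger }

   func (*RepeatTrigger) trigger() {}

   type AfterEndOfWindowTrigger struct {
   	earlyFiring Trigger
   	lateFiring  Trigger
   }

   func (*AfterEndOfWindowTrigger) trigger() {}

   // validate is a hypothetical helper that rejects structurally
   // incomplete triggers before they are applied.
   func validate(t Trigger) error {
   	switch t := t.(type) {
   	case nil:
   		return errors.New("trigger is nil")
   	case *AfterCountTrigger:
   		if t.elementCount < 1 {
   			return fmt.Errorf("after-count: want a positive count, got %d", t.elementCount)
   		}
   	case *RepeatTrigger:
   		if err := validate(t.subtrigger); err != nil {
   			return fmt.Errorf("repeat: %w", err)
   		}
   	case *AfterEndOfWindowTrigger:
   		// Either firing may be left unset; check only what was configured.
   		for _, sub := range []Trigger{t.earlyFiring, t.lateFiring} {
   			if sub == nil {
   				continue
   			}
   			if err := validate(sub); err != nil {
   				return fmt.Errorf("after-end-of-window: %w", err)
   			}
   		}
   	}
   	return nil
   }

   func main() {
   	fmt.Println(validate(&RepeatTrigger{}))
   	fmt.Println(validate(&AfterEndOfWindowTrigger{earlyFiring: &AfterCountTrigger{elementCount: 1}}))
   }
   ```

   A real check would also need to decide what counts as losing data, which this sketch does not attempt.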







[GitHub] [beam] lostluck merged pull request #15952: [BEAM-3304] Refactor trigger API

Posted by GitBox <gi...@apache.org>.
lostluck merged pull request #15952:
URL: https://github.com/apache/beam/pull/15952


   





[GitHub] [beam] riteshghorse commented on a change in pull request #15952: [BEAM-3304] Refactor trigger API

Posted by GitBox <gi...@apache.org>.
riteshghorse commented on a change in pull request #15952:
URL: https://github.com/apache/beam/pull/15952#discussion_r748368647



##########
File path: sdks/go/pkg/beam/core/graph/window/trigger/trigger.go
##########
@@ -138,49 +143,114 @@ func (tr Trigger) AlignedTo(period time.Duration, offset time.Time) Trigger {
 		// TODO: Change to call UnixMilli() once we move to only supporting a go version > 1.17.
 		offsetMillis = offset.Unix()*1e3 + int64(offset.Nanosecond())/1e6
 	}
-	tr.TimestampTransforms = append(tr.TimestampTransforms, AlignToTransform{
+	tr.timestampTransforms = append(tr.timestampTransforms, AlignToTransform{
 		Period: int64(period / time.Millisecond),
 		Offset: offsetMillis,
 	})
 	return tr
 }
 
+// RepeatTrigger fires a sub-trigger repeatedly.
+type RepeatTrigger struct {
+	subTrigger Trigger
+}
+
+func (t RepeatTrigger) trigger() {}
+
+// SubTrigger returns the trigger to be repeated.
+func (t *RepeatTrigger) SubTrigger() Trigger {
+	return t.subTrigger
+}
+
 // Repeat constructs a trigger that fires a trigger repeatedly
 // once the condition has been met.
 //
 // Ex: trigger.Repeat(trigger.AfterCount(1)) is same as trigger.Always().
 func Repeat(tr Trigger) Trigger {
-	return Trigger{Kind: RepeatTrigger, SubTriggers: []Trigger{tr}}
+	return &RepeatTrigger{subTrigger: tr}
+}
+
+// AfterEndOfWindowTrigger provides option to set triggers for early and late firing.
+type AfterEndOfWindowTrigger struct {
+	earlyFiring Trigger
+	lateFiring  Trigger
+}
+
+func (t AfterEndOfWindowTrigger) trigger() {}
+
+// EarlyTrigger returns the Early Firing trigger for AfterEndOfWindowTrigger.
+func (t *AfterEndOfWindowTrigger) EarlyTrigger() Trigger {
+	return t.earlyFiring
+}
+
+// LateTrigger returns the Late Firing trigger for AfterEndOfWindowTrigger.
+func (t *AfterEndOfWindowTrigger) LateTrigger() Trigger {
+	return t.lateFiring
 }
 
 // AfterEndOfWindow constructs a trigger that is configurable for early firing
 // (before the end of window) and late firing (after the end of window).
 //
-// Default Options are: Default Trigger for EarlyFiring and No LateFiring.
-// Override it with EarlyFiring and LateFiring methods on this trigger.
-func AfterEndOfWindow() Trigger {
-	defaultEarly := Default()
-	return Trigger{Kind: AfterEndOfWindowTrigger, EarlyTrigger: &defaultEarly, LateTrigger: nil}
+// Must call EarlyFiring or LateFiring method on this trigger at the time of setting.
+func AfterEndOfWindow() *AfterEndOfWindowTrigger {
+	return &AfterEndOfWindowTrigger{earlyFiring: nil, lateFiring: nil}

Review comment:
       But in addressing the other comments, I believe we need the default value here, and we should return concrete types from the other trigger constructor functions.







[GitHub] [beam] riteshghorse commented on pull request #15952: [BEAM-3304] Refactor trigger API

Posted by GitBox <gi...@apache.org>.
riteshghorse commented on pull request #15952:
URL: https://github.com/apache/beam/pull/15952#issuecomment-967321786


   Run Go PostCommit





[GitHub] [beam] riteshghorse commented on a change in pull request #15952: [BEAM-3304] Refactor trigger API

Posted by GitBox <gi...@apache.org>.
riteshghorse commented on a change in pull request #15952:
URL: https://github.com/apache/beam/pull/15952#discussion_r748373269



##########
File path: sdks/go/pkg/beam/core/graph/window/trigger/trigger.go
##########
@@ -138,49 +143,114 @@ func (tr Trigger) AlignedTo(period time.Duration, offset time.Time) Trigger {
 		// TODO: Change to call UnixMilli() once we move to only supporting a go version > 1.17.
 		offsetMillis = offset.Unix()*1e3 + int64(offset.Nanosecond())/1e6
 	}
-	tr.TimestampTransforms = append(tr.TimestampTransforms, AlignToTransform{
+	tr.timestampTransforms = append(tr.timestampTransforms, AlignToTransform{
 		Period: int64(period / time.Millisecond),
 		Offset: offsetMillis,
 	})
 	return tr
 }
 
+// RepeatTrigger fires a sub-trigger repeatedly.
+type RepeatTrigger struct {
+	subTrigger Trigger
+}
+
+func (t RepeatTrigger) trigger() {}
+
+// SubTrigger returns the trigger to be repeated.
+func (t *RepeatTrigger) SubTrigger() Trigger {
+	return t.subTrigger
+}
+
 // Repeat constructs a trigger that fires a trigger repeatedly
 // once the condition has been met.
 //
 // Ex: trigger.Repeat(trigger.AfterCount(1)) is same as trigger.Always().
 func Repeat(tr Trigger) Trigger {

Review comment:
       We don't know the type of trigger set by the user in this case. 







[GitHub] [beam] riteshghorse commented on pull request #15952: [BEAM-3304] Refactor trigger API

Posted by GitBox <gi...@apache.org>.
riteshghorse commented on pull request #15952:
URL: https://github.com/apache/beam/pull/15952#issuecomment-966698896


   Looks like the commits from previous work followed along, but I did merge master at the end.
   R: @lostluck 





[GitHub] [beam] riteshghorse commented on pull request #15952: [BEAM-3304] Refactor trigger API

Posted by GitBox <gi...@apache.org>.
riteshghorse commented on pull request #15952:
URL: https://github.com/apache/beam/pull/15952#issuecomment-966698555


   Run Go PostCommit





[GitHub] [beam] lostluck commented on a change in pull request #15952: [BEAM-3304] Refactor trigger API

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #15952:
URL: https://github.com/apache/beam/pull/15952#discussion_r747922760



##########
File path: sdks/go/pkg/beam/core/graph/window/trigger/trigger.go
##########
@@ -65,59 +119,13 @@ type AlignToTransform struct {
 
 func (AlignToTransform) timestampTransform() {}
 
-const (
-	DefaultTrigger                         string = "Trigger_Default_"
-	AlwaysTrigger                          string = "Trigger_Always_"
-	AfterAnyTrigger                        string = "Trigger_AfterAny_"
-	AfterAllTrigger                        string = "Trigger_AfterAll_"
-	AfterProcessingTimeTrigger             string = "Trigger_AfterProcessing_Time_"
-	ElementCountTrigger                    string = "Trigger_ElementCount_"
-	AfterEndOfWindowTrigger                string = "Trigger_AfterEndOfWindow_"
-	RepeatTrigger                          string = "Trigger_Repeat_"
-	OrFinallyTrigger                       string = "Trigger_OrFinally_"
-	NeverTrigger                           string = "Trigger_Never_"
-	AfterSynchronizedProcessingTimeTrigger string = "Trigger_AfterSynchronizedProcessingTime_"
-)
-
-// Default constructs a default trigger that fires once after the end of window.
-// Late Data is discarded.
-func Default() Trigger {
-	return Trigger{Kind: DefaultTrigger}
-}
-
-// Always constructs a trigger that fires immediately
-// whenever an element is received.
-//
-// Equivalent to trigger.Repeat(trigger.AfterCount(1))
-func Always() Trigger {
-	return Trigger{Kind: AlwaysTrigger}
-}
-
-// AfterCount constructs a trigger that fires after
-// at least `count` number of elements are processed.
-func AfterCount(count int32) Trigger {
-	return Trigger{Kind: ElementCountTrigger, ElementCount: count}
-}
-
-// AfterProcessingTime constructs a trigger that fires relative to
-// when input first arrives.
-//
-// Must be configured with calls to PlusDelay, or AlignedTo. May be
-// configured with additional delay.
-func AfterProcessingTime() Trigger {
-	return Trigger{Kind: AfterProcessingTimeTrigger}
-}
-
 // PlusDelay configures an AfterProcessingTime trigger to fire after a specified delay,
 // no smaller than a millisecond.
-func (tr Trigger) PlusDelay(delay time.Duration) Trigger {
-	if tr.Kind != AfterProcessingTimeTrigger {
-		panic(fmt.Errorf("can't apply processing delay to %s, want: AfterProcessingTimeTrigger", tr.Kind))
-	}
+func (tr *AfterProcessingTimeTrigger) PlusDelay(delay time.Duration) Trigger {

Review comment:
       A Go idiom that applies here is "Accept interfaces, return concrete types". In this case, don't return the `Trigger` interface; return the `*AfterProcessingTimeTrigger` type. If it needs to be used as the `Trigger` interface, it will be, but returning the interface otherwise prevents the chaining.
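
   To illustrate, a minimal sketch of why the concrete return type matters for chaining. The types are simplified stand-ins (the `delays` field and `describe` helper are made up for the example), not the SDK's definitions:

   ```go
   package main

   import (
   	"fmt"
   	"time"
   )

   // Trigger is a stand-in for the package's sealed trigger interface.
   type Trigger interface{ trigger() }

   // AfterProcessingTimeTrigger is a simplified stand-in that only records delays.
   type AfterProcessingTimeTrigger struct {
   	delays []time.Duration
   }

   func (*AfterProcessingTimeTrigger) trigger() {}

   // AfterProcessingTime returns the concrete type, not Trigger.
   func AfterProcessingTime() *AfterProcessingTimeTrigger {
   	return &AfterProcessingTimeTrigger{}
   }

   // PlusDelay returns the concrete type so further configuration
   // methods remain callable on the result.
   func (tr *AfterProcessingTimeTrigger) PlusDelay(d time.Duration) *AfterProcessingTimeTrigger {
   	tr.delays = append(tr.delays, d)
   	return tr
   }

   // describe accepts the interface; the concrete pointer converts implicitly.
   func describe(t Trigger) string {
   	return fmt.Sprintf("%T", t)
   }

   func main() {
   	tr := AfterProcessingTime().PlusDelay(time.Second).PlusDelay(2 * time.Second)
   	fmt.Println(len(tr.delays), describe(tr))
   }
   ```

   If `PlusDelay` returned `Trigger` instead, the second `.PlusDelay(...)` call would not compile, because the interface has no such method.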
   

##########
File path: sdks/go/pkg/beam/core/graph/window/trigger/trigger.go
##########
@@ -138,49 +143,114 @@ func (tr Trigger) AlignedTo(period time.Duration, offset time.Time) Trigger {
 		// TODO: Change to call UnixMilli() once we move to only supporting a go version > 1.17.
 		offsetMillis = offset.Unix()*1e3 + int64(offset.Nanosecond())/1e6
 	}
-	tr.TimestampTransforms = append(tr.TimestampTransforms, AlignToTransform{
+	tr.timestampTransforms = append(tr.timestampTransforms, AlignToTransform{
 		Period: int64(period / time.Millisecond),
 		Offset: offsetMillis,
 	})
 	return tr
 }
 
+// RepeatTrigger fires a sub-trigger repeatedly.
+type RepeatTrigger struct {
+	subTrigger Trigger
+}
+
+func (t RepeatTrigger) trigger() {}
+
+// SubTrigger returns the trigger to be repeated.
+func (t *RepeatTrigger) SubTrigger() Trigger {
+	return t.subTrigger
+}
+
 // Repeat constructs a trigger that fires a trigger repeatedly
 // once the condition has been met.
 //
 // Ex: trigger.Repeat(trigger.AfterCount(1)) is same as trigger.Always().
 func Repeat(tr Trigger) Trigger {
-	return Trigger{Kind: RepeatTrigger, SubTriggers: []Trigger{tr}}
+	return &RepeatTrigger{subTrigger: tr}
+}
+
+// AfterEndOfWindowTrigger provides option to set triggers for early and late firing.
+type AfterEndOfWindowTrigger struct {
+	earlyFiring Trigger
+	lateFiring  Trigger
+}
+
+func (t AfterEndOfWindowTrigger) trigger() {}
+
+// EarlyTrigger returns the Early Firing trigger for AfterEndOfWindowTrigger.
+func (t *AfterEndOfWindowTrigger) EarlyTrigger() Trigger {
+	return t.earlyFiring
+}
+
+// LateTrigger returns the Late Firing trigger for AfterEndOfWindowTrigger.
+func (t *AfterEndOfWindowTrigger) LateTrigger() Trigger {
+	return t.lateFiring
 }
 
 // AfterEndOfWindow constructs a trigger that is configurable for early firing
 // (before the end of window) and late firing (after the end of window).
 //
-// Default Options are: Default Trigger for EarlyFiring and No LateFiring.
-// Override it with EarlyFiring and LateFiring methods on this trigger.
-func AfterEndOfWindow() Trigger {
-	defaultEarly := Default()
-	return Trigger{Kind: AfterEndOfWindowTrigger, EarlyTrigger: &defaultEarly, LateTrigger: nil}
+// Must call EarlyFiring or LateFiring method on this trigger at the time of setting.
+func AfterEndOfWindow() *AfterEndOfWindowTrigger {
+	return &AfterEndOfWindowTrigger{earlyFiring: nil, lateFiring: nil}
 }
 
 // EarlyFiring configures an AfterEndOfWindow trigger with an implicitly
 // repeated trigger that applies before the end of the window.
-func (tr Trigger) EarlyFiring(early Trigger) Trigger {
-	if tr.Kind != AfterEndOfWindowTrigger {
-		panic(fmt.Errorf("can't apply early firing to %s, want: AfterEndOfWindowTrigger", tr.Kind))
-	}
-	tr.EarlyTrigger = &early
+func (tr *AfterEndOfWindowTrigger) EarlyFiring(early Trigger) Trigger {
+	tr.earlyFiring = early
 	return tr
 }
 
 // LateFiring configures an AfterEndOfWindow trigger with an implicitly
 // repeated trigger that applies after the end of the window.
 //
 // Not setting a late firing trigger means elements are discarded.
-func (tr Trigger) LateFiring(late Trigger) Trigger {
-	if tr.Kind != AfterEndOfWindowTrigger {
-		panic(fmt.Errorf("can't apply late firing to %s, want: AfterEndOfWindowTrigger", tr.Kind))
-	}
-	tr.LateTrigger = &late
+func (tr *AfterEndOfWindowTrigger) LateFiring(late Trigger) Trigger {

Review comment:
       Similarly, return `*AfterEndOfWindowTrigger`

##########
File path: sdks/go/pkg/beam/core/graph/window/trigger/trigger.go
##########
@@ -138,49 +143,114 @@ func (tr Trigger) AlignedTo(period time.Duration, offset time.Time) Trigger {
 		// TODO: Change to call UnixMilli() once we move to only supporting a go version > 1.17.
 		offsetMillis = offset.Unix()*1e3 + int64(offset.Nanosecond())/1e6
 	}
-	tr.TimestampTransforms = append(tr.TimestampTransforms, AlignToTransform{
+	tr.timestampTransforms = append(tr.timestampTransforms, AlignToTransform{
 		Period: int64(period / time.Millisecond),
 		Offset: offsetMillis,
 	})
 	return tr
 }
 
+// RepeatTrigger fires a sub-trigger repeatedly.
+type RepeatTrigger struct {
+	subTrigger Trigger
+}
+
+func (t RepeatTrigger) trigger() {}
+
+// SubTrigger returns the trigger to be repeated.
+func (t *RepeatTrigger) SubTrigger() Trigger {
+	return t.subTrigger
+}
+
 // Repeat constructs a trigger that fires a trigger repeatedly
 // once the condition has been met.
 //
 // Ex: trigger.Repeat(trigger.AfterCount(1)) is same as trigger.Always().
 func Repeat(tr Trigger) Trigger {
-	return Trigger{Kind: RepeatTrigger, SubTriggers: []Trigger{tr}}
+	return &RepeatTrigger{subTrigger: tr}
+}
+
+// AfterEndOfWindowTrigger provides option to set triggers for early and late firing.
+type AfterEndOfWindowTrigger struct {
+	earlyFiring Trigger
+	lateFiring  Trigger
+}
+
+func (t AfterEndOfWindowTrigger) trigger() {}
+
+// EarlyTrigger returns the Early Firing trigger for AfterEndOfWindowTrigger.
+func (t *AfterEndOfWindowTrigger) EarlyTrigger() Trigger {

Review comment:
       Note that for `EarlyTrigger` and `LateTrigger`, we *must* return the `Trigger` interface: the user could have set anything, so we don't know anything more precise. It's OK in this case.
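
   As a sketch of how a caller (for example, the translation code) could recover the concrete type from such a getter, assuming simplified stand-in types and a made-up `kind` helper:

   ```go
   package main

   import "fmt"

   // Trigger is a stand-in for the package's sealed trigger interface.
   type Trigger interface{ trigger() }

   type AlwaysTrigger struct{}

   func (*AlwaysTrigger) trigger() {}

   type AfterCountTrigger struct{ elementCount int32 }

   func (*AfterCountTrigger) trigger() {}

   type AfterEndOfWindowTrigger struct {
   	earlyFiring Trigger
   	lateFiring  Trigger
   }

   func (*AfterEndOfWindowTrigger) trigger() {}

   // Early returns whatever the user configured, hence the interface type.
   func (t *AfterEndOfWindowTrigger) Early() Trigger { return t.earlyFiring }

   // Late returns whatever the user configured, or nil if unset.
   func (t *AfterEndOfWindowTrigger) Late() Trigger { return t.lateFiring }

   // kind is a hypothetical helper that inspects the concrete type
   // behind the interface with a type switch.
   func kind(t Trigger) string {
   	switch t := t.(type) {
   	case nil:
   		return "unset"
   	case *AlwaysTrigger:
   		return "always"
   	case *AfterCountTrigger:
   		return fmt.Sprintf("after-count(%d)", t.elementCount)
   	case *AfterEndOfWindowTrigger:
   		return "after-end-of-window"
   	default:
   		return "unknown"
   	}
   }

   func main() {
   	w := &AfterEndOfWindowTrigger{earlyFiring: &AfterCountTrigger{elementCount: 3}}
   	fmt.Println(kind(w.Early()), kind(w.Late()))
   }
   ```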

##########
File path: sdks/go/pkg/beam/core/graph/window/trigger/trigger.go
##########
@@ -126,10 +134,7 @@ func (tr Trigger) PlusDelay(delay time.Duration) Trigger {
 //
 // * Period may not be smaller than a millisecond.
 // * Offset may be a zero time (time.Time{}).
-func (tr Trigger) AlignedTo(period time.Duration, offset time.Time) Trigger {
-	if tr.Kind != AfterProcessingTimeTrigger {
-		panic(fmt.Errorf("can't apply processing delay to %s, want: AfterProcessingTimeTrigger", tr.Kind))
-	}
+func (tr *AfterProcessingTimeTrigger) AlignedTo(period time.Duration, offset time.Time) Trigger {

Review comment:
       Similarly, return `*AfterProcessingTimeTrigger`

##########
File path: sdks/go/pkg/beam/core/graph/window/trigger/trigger.go
##########
@@ -138,49 +143,114 @@ func (tr Trigger) AlignedTo(period time.Duration, offset time.Time) Trigger {
 		// TODO: Change to call UnixMilli() once we move to only supporting a go version > 1.17.
 		offsetMillis = offset.Unix()*1e3 + int64(offset.Nanosecond())/1e6
 	}
-	tr.TimestampTransforms = append(tr.TimestampTransforms, AlignToTransform{
+	tr.timestampTransforms = append(tr.timestampTransforms, AlignToTransform{
 		Period: int64(period / time.Millisecond),
 		Offset: offsetMillis,
 	})
 	return tr
 }
 
+// RepeatTrigger fires a sub-trigger repeatedly.
+type RepeatTrigger struct {
+	subTrigger Trigger
+}
+
+func (t RepeatTrigger) trigger() {}
+
+// SubTrigger returns the trigger to be repeated.
+func (t *RepeatTrigger) SubTrigger() Trigger {
+	return t.subTrigger
+}
+
 // Repeat constructs a trigger that fires a trigger repeatedly
 // once the condition has been met.
 //
 // Ex: trigger.Repeat(trigger.AfterCount(1)) is same as trigger.Always().
 func Repeat(tr Trigger) Trigger {
-	return Trigger{Kind: RepeatTrigger, SubTriggers: []Trigger{tr}}
+	return &RepeatTrigger{subTrigger: tr}
+}
+
+// AfterEndOfWindowTrigger provides option to set triggers for early and late firing.
+type AfterEndOfWindowTrigger struct {
+	earlyFiring Trigger
+	lateFiring  Trigger
+}
+
+func (t AfterEndOfWindowTrigger) trigger() {}
+
+// EarlyTrigger returns the Early Firing trigger for AfterEndOfWindowTrigger.
+func (t *AfterEndOfWindowTrigger) EarlyTrigger() Trigger {
+	return t.earlyFiring
+}
+
+// LateTrigger returns the Late Firing trigger for AfterEndOfWindowTrigger.
+func (t *AfterEndOfWindowTrigger) LateTrigger() Trigger {
+	return t.lateFiring
 }
 
 // AfterEndOfWindow constructs a trigger that is configurable for early firing
 // (before the end of window) and late firing (after the end of window).
 //
-// Default Options are: Default Trigger for EarlyFiring and No LateFiring.
-// Override it with EarlyFiring and LateFiring methods on this trigger.
-func AfterEndOfWindow() Trigger {
-	defaultEarly := Default()
-	return Trigger{Kind: AfterEndOfWindowTrigger, EarlyTrigger: &defaultEarly, LateTrigger: nil}
+// Must call EarlyFiring or LateFiring method on this trigger at the time of setting.
+func AfterEndOfWindow() *AfterEndOfWindowTrigger {
+	return &AfterEndOfWindowTrigger{earlyFiring: nil, lateFiring: nil}
 }
 
 // EarlyFiring configures an AfterEndOfWindow trigger with an implicitly
 // repeated trigger that applies before the end of the window.
-func (tr Trigger) EarlyFiring(early Trigger) Trigger {
-	if tr.Kind != AfterEndOfWindowTrigger {
-		panic(fmt.Errorf("can't apply early firing to %s, want: AfterEndOfWindowTrigger", tr.Kind))
-	}
-	tr.EarlyTrigger = &early
+func (tr *AfterEndOfWindowTrigger) EarlyFiring(early Trigger) Trigger {

Review comment:
       Similarly, return `*AfterEndOfWindowTrigger`

##########
File path: sdks/go/pkg/beam/core/graph/window/trigger/trigger.go
##########
@@ -138,49 +143,114 @@ func (tr Trigger) AlignedTo(period time.Duration, offset time.Time) Trigger {
 		// TODO: Change to call UnixMilli() once we move to only supporting a go version > 1.17.
 		offsetMillis = offset.Unix()*1e3 + int64(offset.Nanosecond())/1e6
 	}
-	tr.TimestampTransforms = append(tr.TimestampTransforms, AlignToTransform{
+	tr.timestampTransforms = append(tr.timestampTransforms, AlignToTransform{
 		Period: int64(period / time.Millisecond),
 		Offset: offsetMillis,
 	})
 	return tr
 }
 
+// RepeatTrigger fires a sub-trigger repeatedly.
+type RepeatTrigger struct {
+	subTrigger Trigger
+}
+
+func (t RepeatTrigger) trigger() {}
+
+// SubTrigger returns the trigger to be repeated.
+func (t *RepeatTrigger) SubTrigger() Trigger {
+	return t.subTrigger
+}
+
 // Repeat constructs a trigger that fires a trigger repeatedly
 // once the condition has been met.
 //
 // Ex: trigger.Repeat(trigger.AfterCount(1)) is same as trigger.Always().
 func Repeat(tr Trigger) Trigger {
-	return Trigger{Kind: RepeatTrigger, SubTriggers: []Trigger{tr}}
+	return &RepeatTrigger{subTrigger: tr}
+}
+
+// AfterEndOfWindowTrigger provides option to set triggers for early and late firing.
+type AfterEndOfWindowTrigger struct {
+	earlyFiring Trigger
+	lateFiring  Trigger
+}
+
+func (t AfterEndOfWindowTrigger) trigger() {}
+
+// EarlyTrigger returns the Early Firing trigger for AfterEndOfWindowTrigger.
+func (t *AfterEndOfWindowTrigger) EarlyTrigger() Trigger {
+	return t.earlyFiring
+}
+
+// LateTrigger returns the Late Firing trigger for AfterEndOfWindowTrigger.
+func (t *AfterEndOfWindowTrigger) LateTrigger() Trigger {
+	return t.lateFiring
 }
 
 // AfterEndOfWindow constructs a trigger that is configurable for early firing
 // (before the end of window) and late firing (after the end of window).
 //
-// Default Options are: Default Trigger for EarlyFiring and No LateFiring.
-// Override it with EarlyFiring and LateFiring methods on this trigger.
-func AfterEndOfWindow() Trigger {
-	defaultEarly := Default()
-	return Trigger{Kind: AfterEndOfWindowTrigger, EarlyTrigger: &defaultEarly, LateTrigger: nil}
+// Must call EarlyFiring or LateFiring method on this trigger at the time of setting.
+func AfterEndOfWindow() *AfterEndOfWindowTrigger {
+	return &AfterEndOfWindowTrigger{earlyFiring: nil, lateFiring: nil}

Review comment:
       Any reason why the default was removed here?
   
   Alternatively (as a separate PR), we should probably add a validation function that runs when a trigger is set, to make sure the trigger isn't going to lose data or similar:
   
   https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/trigger.py
   
   See the `may_lose_data` methods.
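
   As a rough sketch of what such a validation function might look like on the Go side: everything below is hypothetical. `validate`, `DefaultTrigger` and the simplified structs are stand-ins invented for the example, and the one rule it enforces (an `AfterEndOfWindow` with neither firing set is rejected) is an assumption for illustration, not the semantics of the Python `may_lose_data` methods.

```go
package main

import (
	"errors"
	"fmt"
)

// Trigger is a stand-in for the sealed interface introduced in this PR.
type Trigger interface{ trigger() }

// DefaultTrigger is a hypothetical leaf trigger with nothing to validate.
type DefaultTrigger struct{}

func (DefaultTrigger) trigger() {}

type AfterEndOfWindowTrigger struct {
	earlyFiring, lateFiring Trigger
}

func (*AfterEndOfWindowTrigger) trigger() {}

type RepeatTrigger struct{ subTrigger Trigger }

func (*RepeatTrigger) trigger() {}

// validate is a hypothetical helper that walks a trigger tree and reports
// the first configuration problem it finds.
func validate(t Trigger) error {
	switch tr := t.(type) {
	case nil:
		return errors.New("trigger is nil")
	case *AfterEndOfWindowTrigger:
		// Assumed rule: at least one of the two firings must be configured.
		if tr.earlyFiring == nil && tr.lateFiring == nil {
			return errors.New("AfterEndOfWindow: neither EarlyFiring nor LateFiring is set")
		}
		for _, sub := range []Trigger{tr.earlyFiring, tr.lateFiring} {
			if sub == nil {
				continue // an unset firing is allowed as long as the other is set
			}
			if err := validate(sub); err != nil {
				return fmt.Errorf("AfterEndOfWindow: %w", err)
			}
		}
		return nil
	case *RepeatTrigger:
		if err := validate(tr.subTrigger); err != nil {
			return fmt.Errorf("Repeat: %w", err)
		}
		return nil
	default:
		// Leaf triggers have nothing to check in this sketch.
		return nil
	}
}

func main() {
	unset := &AfterEndOfWindowTrigger{}
	fmt.Println(validate(unset))

	set := &AfterEndOfWindowTrigger{earlyFiring: DefaultTrigger{}}
	fmt.Println(validate(&RepeatTrigger{subTrigger: set}))
}
```

   A data-loss check in the spirit of `may_lose_data` would add further cases to the same type switch.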

##########
File path: sdks/go/pkg/beam/core/graph/window/trigger/trigger.go
##########
@@ -138,49 +143,114 @@ func (tr Trigger) AlignedTo(period time.Duration, offset time.Time) Trigger {
 		// TODO: Change to call UnixMilli() once we move to only supporting a go version > 1.17.
 		offsetMillis = offset.Unix()*1e3 + int64(offset.Nanosecond())/1e6
 	}
-	tr.TimestampTransforms = append(tr.TimestampTransforms, AlignToTransform{
+	tr.timestampTransforms = append(tr.timestampTransforms, AlignToTransform{
 		Period: int64(period / time.Millisecond),
 		Offset: offsetMillis,
 	})
 	return tr
 }
 
+// RepeatTrigger fires a sub-trigger repeatedly.
+type RepeatTrigger struct {
+	subTrigger Trigger
+}
+
+func (t RepeatTrigger) trigger() {}
+
+// SubTrigger returns the trigger to be repeated.
+func (t *RepeatTrigger) SubTrigger() Trigger {
+	return t.subTrigger
+}
+
 // Repeat constructs a trigger that fires a trigger repeatedly
 // once the condition has been met.
 //
 // Ex: trigger.Repeat(trigger.AfterCount(1)) is same as trigger.Always().
 func Repeat(tr Trigger) Trigger {
-	return Trigger{Kind: RepeatTrigger, SubTriggers: []Trigger{tr}}
+	return &RepeatTrigger{subTrigger: tr}
+}
+
+// AfterEndOfWindowTrigger provides option to set triggers for early and late firing.
+type AfterEndOfWindowTrigger struct {
+	earlyFiring Trigger
+	lateFiring  Trigger
+}
+
+func (t AfterEndOfWindowTrigger) trigger() {}
+
+// EarlyTrigger returns the Early Firing trigger for AfterEndOfWindowTrigger.
+func (t *AfterEndOfWindowTrigger) EarlyTrigger() Trigger {
+	return t.earlyFiring
+}
+
+// LateTrigger returns the Late Firing trigger for AfterEndOfWindowTrigger.
+func (t *AfterEndOfWindowTrigger) LateTrigger() Trigger {
+	return t.lateFiring
 }
 
 // AfterEndOfWindow constructs a trigger that is configurable for early firing
 // (before the end of window) and late firing (after the end of window).
 //
-// Default Options are: Default Trigger for EarlyFiring and No LateFiring.
-// Override it with EarlyFiring and LateFiring methods on this trigger.
-func AfterEndOfWindow() Trigger {
-	defaultEarly := Default()
-	return Trigger{Kind: AfterEndOfWindowTrigger, EarlyTrigger: &defaultEarly, LateTrigger: nil}
+// Must call EarlyFiring or LateFiring method on this trigger at the time of setting.
+func AfterEndOfWindow() *AfterEndOfWindowTrigger {
+	return &AfterEndOfWindowTrigger{earlyFiring: nil, lateFiring: nil}
 }
 
 // EarlyFiring configures an AfterEndOfWindow trigger with an implicitly
 // repeated trigger that applies before the end of the window.
-func (tr Trigger) EarlyFiring(early Trigger) Trigger {
-	if tr.Kind != AfterEndOfWindowTrigger {
-		panic(fmt.Errorf("can't apply early firing to %s, want: AfterEndOfWindowTrigger", tr.Kind))
-	}
-	tr.EarlyTrigger = &early
+func (tr *AfterEndOfWindowTrigger) EarlyFiring(early Trigger) Trigger {
+	tr.earlyFiring = early
 	return tr
 }
 
 // LateFiring configures an AfterEndOfWindow trigger with an implicitly
 // repeated trigger that applies after the end of the window.
 //
 // Not setting a late firing trigger means elements are discarded.
-func (tr Trigger) LateFiring(late Trigger) Trigger {
-	if tr.Kind != AfterEndOfWindowTrigger {
-		panic(fmt.Errorf("can't apply late firing to %s, want: AfterEndOfWindowTrigger", tr.Kind))
-	}
-	tr.LateTrigger = &late
+func (tr *AfterEndOfWindowTrigger) LateFiring(late Trigger) Trigger {
+	tr.lateFiring = late
 	return tr
 }
+
+// TODO(BEAM-3304) Add support for composite triggers.
+// Below defined triggers do not work as of now.
+// Intended for framework use only.

Review comment:
       It's irritating, but since the types below are exported, you'll need to add "TODO(BEAM-3304)" or "NYI(BEAM-3304)" to each of their doc strings.
   
   As written, only people who actually read the source will discover this comment block, which is not how users will find these types (autocomplete may suggest them, etc.).
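
   For illustration, a per-type doc string carrying that note might look like the sketch below. `ExampleCompositeTrigger` is a hypothetical placeholder name, not one of the types in this PR, and the exact wording of the note is only a suggestion.

```go
package main

import "fmt"

// Trigger is a stand-in for the sealed interface introduced in this PR.
type Trigger interface{ trigger() }

// (Hypothetical type, used only for this illustration.)

// ExampleCompositeTrigger fires based on a combination of its sub-triggers.
//
// NYI(BEAM-3304): composite triggers are not supported yet.
// Intended for framework use only.
type ExampleCompositeTrigger struct {
	subTriggers []Trigger
}

func (*ExampleCompositeTrigger) trigger() {}

// SubTriggers returns the triggers being combined.
//
// NYI(BEAM-3304): composite triggers are not supported yet.
func (t *ExampleCompositeTrigger) SubTriggers() []Trigger {
	return t.subTriggers
}

func main() {
	var tr Trigger = &ExampleCompositeTrigger{}
	fmt.Printf("%T\n", tr)
}
```

   Because the note sits in the doc comment attached to each declaration, it shows up in generated documentation and editor hovers, not just in the source file.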

##########
File path: sdks/go/pkg/beam/core/graph/window/trigger/trigger.go
##########
@@ -138,49 +143,114 @@ func (tr Trigger) AlignedTo(period time.Duration, offset time.Time) Trigger {
 		// TODO: Change to call UnixMilli() once we move to only supporting a go version > 1.17.
 		offsetMillis = offset.Unix()*1e3 + int64(offset.Nanosecond())/1e6
 	}
-	tr.TimestampTransforms = append(tr.TimestampTransforms, AlignToTransform{
+	tr.timestampTransforms = append(tr.timestampTransforms, AlignToTransform{
 		Period: int64(period / time.Millisecond),
 		Offset: offsetMillis,
 	})
 	return tr
 }
 
+// RepeatTrigger fires a sub-trigger repeatedly.
+type RepeatTrigger struct {
+	subTrigger Trigger
+}
+
+func (t RepeatTrigger) trigger() {}
+
+// SubTrigger returns the trigger to be repeated.
+func (t *RepeatTrigger) SubTrigger() Trigger {
+	return t.subTrigger
+}
+
 // Repeat constructs a trigger that fires a trigger repeatedly
 // once the condition has been met.
 //
 // Ex: trigger.Repeat(trigger.AfterCount(1)) is same as trigger.Always().
 func Repeat(tr Trigger) Trigger {

Review comment:
       Similarly, this should return `*RepeatTrigger` rather than `Trigger`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] riteshghorse commented on a change in pull request #15952: [BEAM-3304] Refactor trigger API

Posted by GitBox <gi...@apache.org>.
riteshghorse commented on a change in pull request #15952:
URL: https://github.com/apache/beam/pull/15952#discussion_r748365829



##########
File path: sdks/go/pkg/beam/core/graph/window/trigger/trigger.go
##########
@@ -138,49 +143,114 @@ func (tr Trigger) AlignedTo(period time.Duration, offset time.Time) Trigger {
 		// TODO: Change to call UnixMilli() once we move to only supporting a go version > 1.17.
 		offsetMillis = offset.Unix()*1e3 + int64(offset.Nanosecond())/1e6
 	}
-	tr.TimestampTransforms = append(tr.TimestampTransforms, AlignToTransform{
+	tr.timestampTransforms = append(tr.timestampTransforms, AlignToTransform{
 		Period: int64(period / time.Millisecond),
 		Offset: offsetMillis,
 	})
 	return tr
 }
 
+// RepeatTrigger fires a sub-trigger repeatedly.
+type RepeatTrigger struct {
+	subTrigger Trigger
+}
+
+func (t RepeatTrigger) trigger() {}
+
+// SubTrigger returns the trigger to be repeated.
+func (t *RepeatTrigger) SubTrigger() Trigger {
+	return t.subTrigger
+}
+
 // Repeat constructs a trigger that fires a trigger repeatedly
 // once the condition has been met.
 //
 // Ex: trigger.Repeat(trigger.AfterCount(1)) is same as trigger.Always().
 func Repeat(tr Trigger) Trigger {
-	return Trigger{Kind: RepeatTrigger, SubTriggers: []Trigger{tr}}
+	return &RepeatTrigger{subTrigger: tr}
+}
+
+// AfterEndOfWindowTrigger provides option to set triggers for early and late firing.
+type AfterEndOfWindowTrigger struct {
+	earlyFiring Trigger
+	lateFiring  Trigger
+}
+
+func (t AfterEndOfWindowTrigger) trigger() {}
+
+// EarlyTrigger returns the Early Firing trigger for AfterEndOfWindowTrigger.
+func (t *AfterEndOfWindowTrigger) EarlyTrigger() Trigger {
+	return t.earlyFiring
+}
+
+// LateTrigger returns the Late Firing trigger for AfterEndOfWindowTrigger.
+func (t *AfterEndOfWindowTrigger) LateTrigger() Trigger {
+	return t.lateFiring
 }
 
 // AfterEndOfWindow constructs a trigger that is configurable for early firing
 // (before the end of window) and late firing (after the end of window).
 //
-// Default Options are: Default Trigger for EarlyFiring and No LateFiring.
-// Override it with EarlyFiring and LateFiring methods on this trigger.
-func AfterEndOfWindow() Trigger {
-	defaultEarly := Default()
-	return Trigger{Kind: AfterEndOfWindowTrigger, EarlyTrigger: &defaultEarly, LateTrigger: nil}
+// Must call EarlyFiring or LateFiring method on this trigger at the time of setting.
+func AfterEndOfWindow() *AfterEndOfWindowTrigger {
+	return &AfterEndOfWindowTrigger{earlyFiring: nil, lateFiring: nil}

Review comment:
       The idea was to return the `Trigger` interface from all triggers. In doing so, though, I wanted to make sure that `EarlyFiring` or `LateFiring` is always called on `AfterEndOfWindow`. Users therefore always have to call `EarlyFiring` or `LateFiring` on `AfterEndOfWindow`, and it is those methods that return the `Trigger` interface.
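
   A minimal sketch of this pattern, using simplified stand-ins for the types in the diff (`DefaultTrigger` and the helper `configured` are hypothetical, added only for the example). It also shows one caveat that follows from Go's method-set rules: since `trigger()` is declared on the value receiver, `*AfterEndOfWindowTrigger` already satisfies `Trigger`, so the compiler alone does not force the `EarlyFiring` or `LateFiring` call and a runtime check may still be needed.

```go
package main

import "fmt"

// Trigger is a stand-in for the sealed interface introduced in this PR.
type Trigger interface{ trigger() }

// DefaultTrigger is a hypothetical leaf trigger used only in this example.
type DefaultTrigger struct{}

func (DefaultTrigger) trigger() {}

type AfterEndOfWindowTrigger struct {
	earlyFiring, lateFiring Trigger
}

// Value receiver, as in the diff: the method set of the pointer type
// includes trigger() too, so both forms implement Trigger.
func (t AfterEndOfWindowTrigger) trigger() {}

// AfterEndOfWindow returns the concrete type so the setters are reachable.
func AfterEndOfWindow() *AfterEndOfWindowTrigger {
	return &AfterEndOfWindowTrigger{}
}

// EarlyFiring returns the interface, as in the current revision of the PR.
func (tr *AfterEndOfWindowTrigger) EarlyFiring(early Trigger) Trigger {
	tr.earlyFiring = early
	return tr
}

// configured is a hypothetical runtime check for the "must call EarlyFiring
// or LateFiring" requirement.
func configured(t Trigger) bool {
	aeow, ok := t.(*AfterEndOfWindowTrigger)
	if !ok {
		return true // other trigger kinds have no such requirement here
	}
	return aeow.earlyFiring != nil || aeow.lateFiring != nil
}

func main() {
	// Intended use: the setter yields the Trigger.
	var intended Trigger = AfterEndOfWindow().EarlyFiring(DefaultTrigger{})

	// This also compiles, even though neither setter was called.
	var skipped Trigger = AfterEndOfWindow()

	fmt.Println(configured(intended), configured(skipped))
}
```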



