Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/06/11 01:10:58 UTC

[GitHub] [beam] youngoli opened a new pull request #11986: [BEAM-10235] Adding a CountElms transform to the Go SDK.

youngoli opened a new pull request #11986:
URL: https://github.com/apache/beam/pull/11986


   This transform counts the number of elements in a PCollection.
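The difference between the existing `stats.Count` and the new `stats.CountElms` can be sketched as a minimal plain-Go model over an in-memory slice instead of a PCollection. It does not use the Beam API; `countPerElement` and `countElements` are hypothetical helpers (assuming Go 1.18+ generics) that mirror what the two transforms compute according to this PR's doc comments.

```go
package main

import "fmt"

// countPerElement models stats.Count: each distinct element maps to the
// number of times it appears (PCollection<T> -> PCollection<KV<T,int>>).
// Hypothetical helper, not part of the Beam SDK.
func countPerElement[T comparable](elems []T) map[T]int {
	counts := make(map[T]int)
	for _, e := range elems {
		counts[e]++
	}
	return counts
}

// countElements models stats.CountElms: every element contributes 1 to a
// single total (PCollection<T> -> PCollection<int> with one element).
// Hypothetical helper, not part of the Beam SDK.
func countElements[T any](elems []T) int {
	total := 0
	for range elems {
		total++
	}
	return total
}

func main() {
	words := []string{"a", "b", "a", "c", "a"}
	fmt.Println(countPerElement(words)) // map[a:3 b:1 c:1]
	fmt.Println(countElements(words))   // 5
}
```

In the model, as in the PR, Count produces one output per distinct element while CountElms collapses the whole input to a single number.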
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [x] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [x] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [x] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] youngoli commented on pull request #11986: [BEAM-10235] Adding a CountElms transform to the Go SDK.

youngoli commented on pull request #11986:
URL: https://github.com/apache/beam/pull/11986#issuecomment-642346309


   R: @lostluck 





[GitHub] [beam] lostluck commented on a change in pull request #11986: [BEAM-10235] Adding a CountElms transform to the Go SDK.

lostluck commented on a change in pull request #11986:
URL: https://github.com/apache/beam/pull/11986#discussion_r438884627



##########
File path: sdks/go/pkg/beam/transforms/stats/count.go
##########
@@ -18,18 +18,36 @@ package stats
 
 import (
 	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
 )
 
-// Count counts the number of elements in a collection. It expects a
-// PCollection<T> as input and returns a PCollection<KV<T,int>>. T's encoding
-// must be a well-defined injection.
+// Count counts the number of appearances of each element in a collection. It
+// expects a PCollection<T> as input and returns a PCollection<KV<T,int>>. T's
+// encoding must be a well-defined injection.
 func Count(s beam.Scope, col beam.PCollection) beam.PCollection {
 	s = s.Scope("stats.Count")
 
-	pre := beam.ParDo(s, mapFn, col)
+	pre := beam.ParDo(s, keyedMapFn, col)
 	return SumPerKey(s, pre)
 }
 
-func mapFn(elm beam.T) (beam.T, int) {
+func keyedMapFn(elm beam.T) (beam.T, int) {
 	return elm, 1
 }
+
+// CountElms counts the number of elements in a collection. It expects a
+// PCollection<T> as input and returns a PCollection<int> of one element
+// containing the count. T's encoding must be a well-defined injection.
+func CountElms(s beam.Scope, col beam.PCollection) beam.PCollection {
+	s = s.Scope("stats.CountElms")
+
+	if typex.IsKV(col.Type()) {
+		col = beam.DropKey(s, col)
+	}
+	pre := beam.ParDo(s, mapFn, col)
+	return Sum(s, pre)
+}
+
+func mapFn(_ beam.T) int {

Review comment:
       Consider countFn?







[GitHub] [beam] lostluck commented on a change in pull request #11986: [BEAM-10235] Adding a CountElms transform to the Go SDK.

lostluck commented on a change in pull request #11986:
URL: https://github.com/apache/beam/pull/11986#discussion_r438868945



##########
File path: sdks/go/pkg/beam/transforms/stats/count.go
##########
@@ -18,18 +18,36 @@ package stats
 
 import (
 	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
 )
 
-// Count counts the number of elements in a collection. It expects a
-// PCollection<T> as input and returns a PCollection<KV<T,int>>. T's encoding
-// must be a well-defined injection.
+// Count counts the number of appearances of each element in a collection. It
+// expects a PCollection<T> as input and returns a PCollection<KV<T,int>>. T's
+// encoding must be a well-defined injection.
 func Count(s beam.Scope, col beam.PCollection) beam.PCollection {
 	s = s.Scope("stats.Count")
 
-	pre := beam.ParDo(s, mapFn, col)
+	pre := beam.ParDo(s, keyedMapFn, col)
 	return SumPerKey(s, pre)
 }
 
-func mapFn(elm beam.T) (beam.T, int) {
+func keyedMapFn(elm beam.T) (beam.T, int) {
 	return elm, 1
 }
+
+// CountElms counts the number of elements in a collection. It expects a
+// PCollection<T> as input and returns a PCollection<int> of one element
+// containing the count. T's encoding must be a well-defined injection.

Review comment:
       Consider removing that last sentence about injections (here and above). Or we can change it!
   I think it's supposed to require that the encoding be deterministic (which is required since Count uses elements as keys). Saying that directly would be clearer than the formal wording that currently exists.
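Why key encodings must be deterministic can be sketched in plain Go. Beam runners group keyed data by the encoded key bytes, so two equal elements that encode differently end up in separate groups. The sketch below is an analogy, not Beam code: `encodeSorted`, `encodeUnordered`, and `countByEncodedKey` are hypothetical helpers, and Go's randomized map iteration order stands in for a non-deterministic coder.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// encodeSorted is a deterministic encoding of a map: keys are written in
// sorted order, so equal maps always produce identical strings.
func encodeSorted(m map[string]int) string {
	keys := make([]string, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	var b strings.Builder
	for _, k := range keys {
		fmt.Fprintf(&b, "%s=%d;", k, m[k])
	}
	return b.String()
}

// encodeUnordered follows Go's randomized map iteration order, so two
// equal maps may encode to different strings.
func encodeUnordered(m map[string]int) string {
	var b strings.Builder
	for k, v := range m {
		fmt.Fprintf(&b, "%s=%d;", k, v)
	}
	return b.String()
}

// countByEncodedKey groups elements by their encoded form, roughly as a
// runner groups by encoded key bytes, and counts each group.
func countByEncodedKey(elems []map[string]int, encode func(map[string]int) string) map[string]int {
	counts := make(map[string]int)
	for _, e := range elems {
		counts[encode(e)]++
	}
	return counts
}

func main() {
	elems := make([]map[string]int, 0, 100)
	for i := 0; i < 100; i++ {
		elems = append(elems, map[string]int{"x": 1, "y": 2, "z": 3})
	}
	// All 100 equal maps land in one group with the deterministic encoding.
	fmt.Println(len(countByEncodedKey(elems, encodeSorted)))
	// With the order-dependent encoding, the same maps can split into
	// several groups, so per-key counts would be wrong.
	fmt.Println(len(countByEncodedKey(elems, encodeUnordered)))
}
```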







[GitHub] [beam] youngoli commented on a change in pull request #11986: [BEAM-10235] Adding a CountElms transform to the Go SDK.

youngoli commented on a change in pull request #11986:
URL: https://github.com/apache/beam/pull/11986#discussion_r439719611



##########
File path: sdks/go/pkg/beam/transforms/stats/count.go
##########
@@ -18,18 +18,36 @@ package stats
 
 import (
 	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
 )
 
-// Count counts the number of elements in a collection. It expects a
-// PCollection<T> as input and returns a PCollection<KV<T,int>>. T's encoding
-// must be a well-defined injection.
+// Count counts the number of appearances of each element in a collection. It
+// expects a PCollection<T> as input and returns a PCollection<KV<T,int>>. T's
+// encoding must be a well-defined injection.
 func Count(s beam.Scope, col beam.PCollection) beam.PCollection {
 	s = s.Scope("stats.Count")
 
-	pre := beam.ParDo(s, mapFn, col)
+	pre := beam.ParDo(s, keyedMapFn, col)
 	return SumPerKey(s, pre)
 }
 
-func mapFn(elm beam.T) (beam.T, int) {
+func keyedMapFn(elm beam.T) (beam.T, int) {
 	return elm, 1
 }
+
+// CountElms counts the number of elements in a collection. It expects a
+// PCollection<T> as input and returns a PCollection<int> of one element
+// containing the count. T's encoding must be a well-defined injection.

Review comment:
       Done for Count. And for CountElms I removed the line because T's encoding doesn't need to be deterministic since it's not used as a key.
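The reasoning for CountElms can be illustrated with a plain-Go model of the two steps in the diff: drop keys from KV input, then map every element to 1 and sum. Since no element is ever used as a key, the element type never has to be compared; in this analogy, Go's `comparable` constraint stands in for Beam's deterministic-encoding requirement. `KV`, `dropKey`, and `countElms` are hypothetical stand-ins, and `countFn` is only modeled on the helper name adopted later in this review; none of this is the Beam SDK API.

```go
package main

import "fmt"

// KV is a hypothetical stand-in for a Beam key/value element.
type KV[K, V any] struct {
	Key   K
	Value V
}

// dropKey models the beam.DropKey step that CountElms applies to KV input:
// only the values are kept.
func dropKey[K, V any](in []KV[K, V]) []V {
	out := make([]V, 0, len(in))
	for _, kv := range in {
		out = append(out, kv.Value)
	}
	return out
}

// countFn emits 1 for every element and never inspects the element itself.
func countFn[T any](_ T) int {
	return 1
}

// countElms sums the 1s from countFn. No element is used as a key, so T
// need not be comparable (the analogue of a deterministic encoding).
func countElms[T any](elems []T) int {
	sum := 0
	for _, e := range elems {
		sum += countFn(e)
	}
	return sum
}

func main() {
	pairs := []KV[string, int]{{"a", 1}, {"a", 2}, {"b", 3}}
	fmt.Println(countElms(dropKey(pairs))) // 3

	// Go maps are not comparable and cannot be map keys, but they can
	// still be counted this way.
	maps := []map[string]int{{"x": 1}, {"x": 1}}
	fmt.Println(countElms(maps)) // 2
}
```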







[GitHub] [beam] lostluck commented on a change in pull request #11986: [BEAM-10235] Adding a CountElms transform to the Go SDK.

lostluck commented on a change in pull request #11986:
URL: https://github.com/apache/beam/pull/11986#discussion_r438868945



##########
File path: sdks/go/pkg/beam/transforms/stats/count.go
##########
@@ -18,18 +18,36 @@ package stats
 
 import (
 	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
 )
 
-// Count counts the number of elements in a collection. It expects a
-// PCollection<T> as input and returns a PCollection<KV<T,int>>. T's encoding
-// must be a well-defined injection.
+// Count counts the number of appearances of each element in a collection. It
+// expects a PCollection<T> as input and returns a PCollection<KV<T,int>>. T's
+// encoding must be a well-defined injection.
 func Count(s beam.Scope, col beam.PCollection) beam.PCollection {
 	s = s.Scope("stats.Count")
 
-	pre := beam.ParDo(s, mapFn, col)
+	pre := beam.ParDo(s, keyedMapFn, col)
 	return SumPerKey(s, pre)
 }
 
-func mapFn(elm beam.T) (beam.T, int) {
+func keyedMapFn(elm beam.T) (beam.T, int) {
 	return elm, 1
 }
+
+// CountElms counts the number of elements in a collection. It expects a
+// PCollection<T> as input and returns a PCollection<int> of one element
+// containing the count. T's encoding must be a well-defined injection.

Review comment:
       Consider removing that last sentence about injections (here and above). I don't even know what that's supposed to mean, since we need all encodings to be well-defined anyway.







[GitHub] [beam] youngoli commented on a change in pull request #11986: [BEAM-10235] Adding a CountElms transform to the Go SDK.

youngoli commented on a change in pull request #11986:
URL: https://github.com/apache/beam/pull/11986#discussion_r439719908



##########
File path: sdks/go/pkg/beam/transforms/stats/count.go
##########
@@ -18,18 +18,36 @@ package stats
 
 import (
 	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
 )
 
-// Count counts the number of elements in a collection. It expects a
-// PCollection<T> as input and returns a PCollection<KV<T,int>>. T's encoding
-// must be a well-defined injection.
+// Count counts the number of appearances of each element in a collection. It
+// expects a PCollection<T> as input and returns a PCollection<KV<T,int>>. T's
+// encoding must be a well-defined injection.
 func Count(s beam.Scope, col beam.PCollection) beam.PCollection {
 	s = s.Scope("stats.Count")
 
-	pre := beam.ParDo(s, mapFn, col)
+	pre := beam.ParDo(s, keyedMapFn, col)
 	return SumPerKey(s, pre)
 }
 
-func mapFn(elm beam.T) (beam.T, int) {
+func keyedMapFn(elm beam.T) (beam.T, int) {
 	return elm, 1
 }
+
+// CountElms counts the number of elements in a collection. It expects a
+// PCollection<T> as input and returns a PCollection<int> of one element
+// containing the count. T's encoding must be a well-defined injection.
+func CountElms(s beam.Scope, col beam.PCollection) beam.PCollection {
+	s = s.Scope("stats.CountElms")
+
+	if typex.IsKV(col.Type()) {
+		col = beam.DropKey(s, col)
+	}
+	pre := beam.ParDo(s, mapFn, col)
+	return Sum(s, pre)
+}
+
+func mapFn(_ beam.T) int {

Review comment:
       Sounds good to me. Changed this to countFn and the other to keyedCountFn.







[GitHub] [beam] youngoli merged pull request #11986: [BEAM-10235] Adding a CountElms transform to the Go SDK.

youngoli merged pull request #11986:
URL: https://github.com/apache/beam/pull/11986


   

