You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/07/29 07:28:20 UTC

[GitHub] [beam] simonepri opened a new pull request #12393: Support creation of empty PCollection in beam CreateList

simonepri opened a new pull request #12393:
URL: https://github.com/apache/beam/pull/12393


   This PR allow to pass an empty list to the beam.CreateList function.
   Please merge #12373 before this.
   ------------------------
   
   Post-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   Lang | SDK | Dataflow | Flink | Samza | Spark | Twister2
   --- | --- | --- | --- | --- | --- | ---
   Go | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/) | ---
   Java | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/i
 con)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)<br>[![Build Status](htt
 ps://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/)
   Python | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_
 Py_VR_Dataflow/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_P
 ostCommit_Python_VR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/) | ---
   XLang | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Direct/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Direct/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/) | ---
   
   Pre-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   --- |Java | Python | Go | Website
   --- | --- | --- | --- | ---
   Non-portable | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_PythonDocker_Cron/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_PythonDocker_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/b
 eam_PreCommit_Website_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/)
   Portable | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/) | --- | ---
   
   See [.test-infra/jenkins/README](https://github.com/apache/beam/blob/master/.test-infra/jenkins/README.md) for trigger phrase, status and link of all Jenkins jobs.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] simonepri commented on pull request #12393: Support creation of empty PCollection in beam CreateList

Posted by GitBox <gi...@apache.org>.
simonepri commented on pull request #12393:
URL: https://github.com/apache/beam/pull/12393#issuecomment-665526797


   Shall we document this change in the Changelog?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] simonepri removed a comment on pull request #12393: Support creation of empty PCollection in beam CreateList

Posted by GitBox <gi...@apache.org>.
simonepri removed a comment on pull request #12393:
URL: https://github.com/apache/beam/pull/12393#issuecomment-665526797


   Shall we document this change in the Changelog?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] simonepri commented on a change in pull request #12393: Support creation of empty PCollection in beam CreateList

Posted by GitBox <gi...@apache.org>.
simonepri commented on a change in pull request #12393:
URL: https://github.com/apache/beam/pull/12393#discussion_r462068120



##########
File path: sdks/go/pkg/beam/create.go
##########
@@ -34,31 +34,44 @@ func Create(s Scope, values ...interface{}) PCollection {
 }
 
 // CreateList inserts a fixed set of values into the pipeline from a slice or
-// array. It is a convenience wrapper over Create.
+// array. Differently from Create this supports the creation of an empty
+// PCollection.
 func CreateList(s Scope, list interface{}) PCollection {
-	var ret []interface{}
 	val := reflect.ValueOf(list)
+	var ret []interface{}
+	for i := 0; i < val.Len(); i++ {

Review comment:
       You are right, thanks for spotting this.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lostluck commented on pull request #12393: Support creation of empty PCollection in beam CreateList

Posted by GitBox <gi...@apache.org>.
lostluck commented on pull request #12393:
URL: https://github.com/apache/beam/pull/12393#issuecomment-665916062


   Thank you for the contribution!


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] simonepri commented on a change in pull request #12393: Support creation of empty PCollection in beam CreateList

Posted by GitBox <gi...@apache.org>.
simonepri commented on a change in pull request #12393:
URL: https://github.com/apache/beam/pull/12393#discussion_r462453993



##########
File path: sdks/go/pkg/beam/create.go
##########
@@ -34,31 +34,44 @@ func Create(s Scope, values ...interface{}) PCollection {
 }
 
 // CreateList inserts a fixed set of values into the pipeline from a slice or
-// array. It is a convenience wrapper over Create.
+// array. Unlike Create this supports the creation of an empty PCollection.
 func CreateList(s Scope, list interface{}) PCollection {
-	var ret []interface{}
 	val := reflect.ValueOf(list)
 	if val.Kind() != reflect.Slice && val.Kind() != reflect.Array {
 		panic(fmt.Sprintf("Input %v must be a slice or array", list))
 	}
+	var ret []interface{}
 	for i := 0; i < val.Len(); i++ {
 		ret = append(ret, val.Index(i).Interface())
 	}
-	return Must(TryCreate(s, ret...))
+	var t reflect.Type
+	if len(ret) == 0 {
+		t = reflect.TypeOf(list).Elem()
+	} else {
+		t = reflect.ValueOf(ret[0]).Type()
+	}
+	return Must(TryCreateList(s, ret, t))
 }
 
 func addCreateCtx(err error, s Scope) error {
 	return errors.WithContextf(err, "inserting Create in scope %s", s)
 }
 
-// TryCreate inserts a fixed set of values into the pipeline. The values must
-// be of the same type.
+// TryCreate inserts a fixed non-empty set of values into the pipeline. The
+// values must be of the same type.
 func TryCreate(s Scope, values ...interface{}) (PCollection, error) {
 	if len(values) == 0 {
 		return PCollection{}, addCreateCtx(errors.New("create has no values"), s)
 	}
 
 	t := reflect.ValueOf(values[0]).Type()
+	return TryCreateList(s, values, t)
+}
+
+// TryCreateList inserts a fixed set of values into the pipeline from a slice or
+// array. The values must be of the same type. Unlike TryCreate this supports
+// the creation of an empty PCollection.
+func TryCreateList(s Scope, values []interface{}, t reflect.Type) (PCollection, error) {

Review comment:
       Thanks for the comment!
   I did not know about this convention, but makes completely sense.
   Let me know if it is okay now.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] simonepri commented on pull request #12393: Support creation of empty PCollection in beam CreateList

Posted by GitBox <gi...@apache.org>.
simonepri commented on pull request #12393:
URL: https://github.com/apache/beam/pull/12393#issuecomment-665129462






----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lostluck commented on a change in pull request #12393: Support creation of empty PCollection in beam CreateList

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #12393:
URL: https://github.com/apache/beam/pull/12393#discussion_r462422357



##########
File path: sdks/go/pkg/beam/create.go
##########
@@ -34,31 +34,44 @@ func Create(s Scope, values ...interface{}) PCollection {
 }
 
 // CreateList inserts a fixed set of values into the pipeline from a slice or
-// array. It is a convenience wrapper over Create.
+// array. Unlike Create this supports the creation of an empty PCollection.
 func CreateList(s Scope, list interface{}) PCollection {
-	var ret []interface{}
 	val := reflect.ValueOf(list)
 	if val.Kind() != reflect.Slice && val.Kind() != reflect.Array {
 		panic(fmt.Sprintf("Input %v must be a slice or array", list))
 	}
+	var ret []interface{}
 	for i := 0; i < val.Len(); i++ {
 		ret = append(ret, val.Index(i).Interface())
 	}
-	return Must(TryCreate(s, ret...))
+	var t reflect.Type
+	if len(ret) == 0 {
+		t = reflect.TypeOf(list).Elem()
+	} else {
+		t = reflect.ValueOf(ret[0]).Type()
+	}
+	return Must(TryCreateList(s, ret, t))
 }
 
 func addCreateCtx(err error, s Scope) error {
 	return errors.WithContextf(err, "inserting Create in scope %s", s)
 }
 
-// TryCreate inserts a fixed set of values into the pipeline. The values must
-// be of the same type.
+// TryCreate inserts a fixed non-empty set of values into the pipeline. The
+// values must be of the same type.
 func TryCreate(s Scope, values ...interface{}) (PCollection, error) {
 	if len(values) == 0 {
 		return PCollection{}, addCreateCtx(errors.New("create has no values"), s)
 	}
 
 	t := reflect.ValueOf(values[0]).Type()
+	return TryCreateList(s, values, t)
+}
+
+// TryCreateList inserts a fixed set of values into the pipeline from a slice or
+// array. The values must be of the same type. Unlike TryCreate this supports
+// the creation of an empty PCollection.
+func TryCreateList(s Scope, values []interface{}, t reflect.Type) (PCollection, error) {

Review comment:
       TryCreateList and CreateList should only differ in returning an error, and not force different parameters.
   
   That said, for convenience it's totally reasonable for an unexported tryCreateList to have this signature for reuse as you've done, and have the exported TryCreateList infer from the raw empty list type.
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] youngoli commented on a change in pull request #12393: Support creation of empty PCollection in beam CreateList

Posted by GitBox <gi...@apache.org>.
youngoli commented on a change in pull request #12393:
URL: https://github.com/apache/beam/pull/12393#discussion_r461961550



##########
File path: sdks/go/pkg/beam/create.go
##########
@@ -34,31 +34,44 @@ func Create(s Scope, values ...interface{}) PCollection {
 }
 
 // CreateList inserts a fixed set of values into the pipeline from a slice or
-// array. It is a convenience wrapper over Create.
+// array. Differently from Create this supports the creation of an empty
+// PCollection.
 func CreateList(s Scope, list interface{}) PCollection {
-	var ret []interface{}
 	val := reflect.ValueOf(list)
+	var ret []interface{}
+	for i := 0; i < val.Len(); i++ {
+		ret = append(ret, val.Index(i).Interface())
+	}
 	if val.Kind() != reflect.Slice && val.Kind() != reflect.Array {
 		panic(fmt.Sprintf("Input %v must be a slice or array", list))
 	}
-	for i := 0; i < val.Len(); i++ {
-		ret = append(ret, val.Index(i).Interface())
+	if val.Len() == 0 {
+		t := reflect.TypeOf(list).Elem()
+		return Must(TryCreateList(s, t, ret))
 	}
-	return Must(TryCreate(s, ret...))
+	t := reflect.ValueOf(ret[0]).Type()
+	return Must(TryCreateList(s, t, ret))
 }
 
 func addCreateCtx(err error, s Scope) error {
 	return errors.WithContextf(err, "inserting Create in scope %s", s)
 }
 
-// TryCreate inserts a fixed set of values into the pipeline. The values must
-// be of the same type.
+// TryCreate inserts a fixed non-empty set of values into the pipeline. The
+// values must be of the same type.
 func TryCreate(s Scope, values ...interface{}) (PCollection, error) {
 	if len(values) == 0 {
 		return PCollection{}, addCreateCtx(errors.New("create has no values"), s)
 	}
 
 	t := reflect.ValueOf(values[0]).Type()
+	return TryCreateList(s, t, values)
+}
+
+// TryCreateList inserts a fixed set of values into the pipeline from a slice or
+// array. The values must be of the same type. Differently from TryCreate this

Review comment:
       Same suggestion as above:
   ```suggestion
   // array. The values must be of the same type. Unlike TryCreate this
   ```

##########
File path: sdks/go/pkg/beam/create.go
##########
@@ -34,31 +34,44 @@ func Create(s Scope, values ...interface{}) PCollection {
 }
 
 // CreateList inserts a fixed set of values into the pipeline from a slice or
-// array. It is a convenience wrapper over Create.
+// array. Differently from Create this supports the creation of an empty

Review comment:
       Optional, but how about this wording?:
   ```suggestion
   // array. Unlike Create this supports the creation of an empty
   ```

##########
File path: sdks/go/pkg/beam/create.go
##########
@@ -34,31 +34,44 @@ func Create(s Scope, values ...interface{}) PCollection {
 }
 
 // CreateList inserts a fixed set of values into the pipeline from a slice or
-// array. It is a convenience wrapper over Create.
+// array. Differently from Create this supports the creation of an empty
+// PCollection.
 func CreateList(s Scope, list interface{}) PCollection {
-	var ret []interface{}
 	val := reflect.ValueOf(list)
+	var ret []interface{}
+	for i := 0; i < val.Len(); i++ {

Review comment:
       It seems better to have the error check below happen before this loop (unless you have a specific reason for putting the loop first).




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lostluck merged pull request #12393: Support creation of empty PCollection in beam CreateList

Posted by GitBox <gi...@apache.org>.
lostluck merged pull request #12393:
URL: https://github.com/apache/beam/pull/12393


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lostluck commented on pull request #12393: Support creation of empty PCollection in beam CreateList

Posted by GitBox <gi...@apache.org>.
lostluck commented on pull request #12393:
URL: https://github.com/apache/beam/pull/12393#issuecomment-665788173


   Run Go PostCommit


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org