You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/11/15 22:58:00 UTC

[GitHub] [beam] lostluck commented on a change in pull request #15981: [BEAM-3293] Finish E2E implementation of MultiMap side inputs, add integration test

lostluck commented on a change in pull request #15981:
URL: https://github.com/apache/beam/pull/15981#discussion_r749736267



##########
File path: sdks/go/test/integration/primitives/pardo.go
##########
@@ -81,3 +87,71 @@ func ParDoKVSideInput() *beam.Pipeline {
 
 	return p
 }
+
+type stringPair struct {
+	K, V string
+}
+
+func splitStringPair(e stringPair) (string, string) {
+	return e.K, e.V
+}
+
+var emailSlice = []stringPair{
+	{"amy", "amy@example.com"},
+	{"james", "james@email.com"},
+	{"carl", "carl@example.com"},
+	{"julia", "julia@example.com"},
+	{"carl", "carl@email.com"},
+	{"james", "james@example.com"},
+}
+
+var phoneSlice = []stringPair{
+	{"amy", "111-222-3333"},
+	{"james", "222-333-4444"},
+}
+
+// CreateAndSplit makes a KV PCollection from a list of stringPair types
+func CreateAndSplit(s beam.Scope, input []stringPair) beam.PCollection {
+	initial := beam.CreateList(s, input)
+	return beam.ParDo(s, splitStringPair, initial)
+}
+
+// ParDoMultiMapSideInput checkls that the multimap side input access pattern

Review comment:
       ```suggestion
   // ParDoMultiMapSideInput checks that the multimap side input access pattern
   ```

##########
File path: sdks/go/test/integration/primitives/pardo.go
##########
@@ -81,3 +87,71 @@ func ParDoKVSideInput() *beam.Pipeline {
 
 	return p
 }
+
+type stringPair struct {
+	K, V string
+}
+
+func splitStringPair(e stringPair) (string, string) {
+	return e.K, e.V
+}
+
+var emailSlice = []stringPair{
+	{"amy", "amy@example.com"},
+	{"james", "james@email.com"},
+	{"carl", "carl@example.com"},
+	{"julia", "julia@example.com"},
+	{"carl", "carl@email.com"},
+	{"james", "james@example.com"},
+}
+
+var phoneSlice = []stringPair{
+	{"amy", "111-222-3333"},
+	{"james", "222-333-4444"},
+}
+
+// CreateAndSplit makes a KV PCollection from a list of stringPair types
+func CreateAndSplit(s beam.Scope, input []stringPair) beam.PCollection {
+	initial := beam.CreateList(s, input)
+	return beam.ParDo(s, splitStringPair, initial)
+}
+
+// ParDoMultiMapSideInput checkls that the multimap side input access pattern
+// works correctly, properly producing the correct output. for the asymmetric joing

Review comment:
       ```suggestion
   // works correctly, properly producing the correct output with an asymmetric join.
   ```

##########
File path: sdks/go/test/integration/integration.go
##########
@@ -67,6 +67,8 @@ var directFilters = []string{
 	"TestTestStream.*",
 	// (BEAM-13075): The direct runner does not support windowed side inputs
 	"TestValidateWindowedSideInputs",
+	// The direct runner does not currently support multimap side inputs

Review comment:
       Please add the JIRA issue number for this filter, as is done for BEAM-13075 above. The work can likely be consolidated with the windowed side input support above, since it will require similar or identical changes.
   
   Scope creep, but very related scope creep.

##########
File path: sdks/go/test/integration/primitives/pardo.go
##########
@@ -81,3 +87,71 @@ func ParDoKVSideInput() *beam.Pipeline {
 
 	return p
 }
+
+type stringPair struct {
+	K, V string
+}
+
+func splitStringPair(e stringPair) (string, string) {
+	return e.K, e.V
+}
+
+var emailSlice = []stringPair{
+	{"amy", "amy@example.com"},
+	{"james", "james@email.com"},
+	{"carl", "carl@example.com"},
+	{"julia", "julia@example.com"},
+	{"carl", "carl@email.com"},
+	{"james", "james@example.com"},
+}
+
+var phoneSlice = []stringPair{
+	{"amy", "111-222-3333"},
+	{"james", "222-333-4444"},
+}
+
+// CreateAndSplit makes a KV PCollection from a list of stringPair types
+func CreateAndSplit(s beam.Scope, input []stringPair) beam.PCollection {
+	initial := beam.CreateList(s, input)
+	return beam.ParDo(s, splitStringPair, initial)
+}
+
+// ParDoMultiMapSideInput checkls that the multimap side input access pattern
+// works correctly, properly producing the correct output. for the asymmetric joing
+func ParDoMultiMapSideInput() *beam.Pipeline {
+	beam.Init()
+	p, s := beam.NewPipelineWithRoot()
+	emailsKV := CreateAndSplit(s.Scope("CreateEmails"), emailSlice)
+	phonesKV := CreateAndSplit(s.Scope("CreatePhones"), phoneSlice)
+	output := beam.ParDo(s, asymJoinFn, phonesKV, beam.SideInput{Input: emailsKV})
+	passert.Count(s, output, "post-join", 2)
+	amyOut, jamesOut := beam.ParDo2(s, splitByName, output)
+	passert.Equals(s, amyOut, "amy@example.com", "111-222-3333")
+	passert.Equals(s, jamesOut, "james@email.com", "james@example.com", "222-333-4444")
+	return p
+}
+
+func asymJoinFn(k, v string, mapSide func(string) func(*string) bool) (string, []string) {
+	var out string
+	var results []string
+	results = append(results, v)
+	iter := mapSide(k)
+	for iter(&out) {
+		results = append(results, out)
+	}
+	return k, results
+}
+
+func splitByName(key string, vals []string, a, j func(string)) {

Review comment:
       Consider adding a third, catch-all output that receives any element whose key matches neither name (the default case). That output can then be verified to be empty with `passert.Empty`.

##########
File path: sdks/go/test/integration/primitives/pardo.go
##########
@@ -81,3 +87,71 @@ func ParDoKVSideInput() *beam.Pipeline {
 
 	return p
 }
+
+type stringPair struct {
+	K, V string
+}
+
+func splitStringPair(e stringPair) (string, string) {
+	return e.K, e.V
+}
+
+var emailSlice = []stringPair{
+	{"amy", "amy@example.com"},
+	{"james", "james@email.com"},
+	{"carl", "carl@example.com"},
+	{"julia", "julia@example.com"},
+	{"carl", "carl@email.com"},
+	{"james", "james@example.com"},
+}
+
+var phoneSlice = []stringPair{
+	{"amy", "111-222-3333"},
+	{"james", "222-333-4444"},
+}
+
+// CreateAndSplit makes a KV PCollection from a list of stringPair types
+func CreateAndSplit(s beam.Scope, input []stringPair) beam.PCollection {
+	initial := beam.CreateList(s, input)
+	return beam.ParDo(s, splitStringPair, initial)
+}
+
+// ParDoMultiMapSideInput checkls that the multimap side input access pattern
+// works correctly, properly producing the correct output. for the asymmetric joing
+func ParDoMultiMapSideInput() *beam.Pipeline {
+	beam.Init()
+	p, s := beam.NewPipelineWithRoot()
+	emailsKV := CreateAndSplit(s.Scope("CreateEmails"), emailSlice)
+	phonesKV := CreateAndSplit(s.Scope("CreatePhones"), phoneSlice)
+	output := beam.ParDo(s, asymJoinFn, phonesKV, beam.SideInput{Input: emailsKV})
+	passert.Count(s, output, "post-join", 2)
+	amyOut, jamesOut := beam.ParDo2(s, splitByName, output)
+	passert.Equals(s, amyOut, "amy@example.com", "111-222-3333")
+	passert.Equals(s, jamesOut, "james@email.com", "james@example.com", "222-333-4444")
+	return p
+}
+
+func asymJoinFn(k, v string, mapSide func(string) func(*string) bool) (string, []string) {
+	var out string
+	var results []string
+	results = append(results, v)

Review comment:
       ```suggestion
   	results := []string{v}
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org