Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/11/15 22:12:48 UTC

[GitHub] [beam] jrmccluskey opened a new pull request #15981: [BEAM-3293] Finish E2E implementation of MultiMap side inputs, add integration test

jrmccluskey opened a new pull request #15981:
URL: https://github.com/apache/beam/pull/15981


   Corrects key handling to the coder in NewKeyedIterable, adds param masking for the FnMultiMap type, adds unit tests for invoke() calls that use the multimap access pattern, and adds an integration test to validate that multimap side inputs work as expected. The integration test is disabled on the Go direct runner, which does not yet properly support the multimap side input form.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] jrmccluskey commented on a change in pull request #15981: [BEAM-3293] Finish E2E implementation of MultiMap side inputs, add integration test

Posted by GitBox <gi...@apache.org>.
jrmccluskey commented on a change in pull request #15981:
URL: https://github.com/apache/beam/pull/15981#discussion_r750309234



##########
File path: sdks/go/test/integration/primitives/pardo.go
##########
@@ -81,3 +87,71 @@ func ParDoKVSideInput() *beam.Pipeline {
 
 	return p
 }
+
+type stringPair struct {
+	K, V string
+}
+
+func splitStringPair(e stringPair) (string, string) {
+	return e.K, e.V
+}
+
+var emailSlice = []stringPair{
+	{"amy", "amy@example.com"},
+	{"james", "james@email.com"},
+	{"carl", "carl@example.com"},
+	{"julia", "julia@example.com"},
+	{"carl", "carl@email.com"},
+	{"james", "james@example.com"},
+}
+
+var phoneSlice = []stringPair{
+	{"amy", "111-222-3333"},
+	{"james", "222-333-4444"},
+}
+
+// CreateAndSplit makes a KV PCollection from a list of stringPair types
+func CreateAndSplit(s beam.Scope, input []stringPair) beam.PCollection {
+	initial := beam.CreateList(s, input)
+	return beam.ParDo(s, splitStringPair, initial)
+}
+
+// ParDoMultiMapSideInput checkls that the multimap side input access pattern
+// works correctly, properly producing the correct output. for the asymmetric joing
+func ParDoMultiMapSideInput() *beam.Pipeline {
+	beam.Init()
+	p, s := beam.NewPipelineWithRoot()
+	emailsKV := CreateAndSplit(s.Scope("CreateEmails"), emailSlice)
+	phonesKV := CreateAndSplit(s.Scope("CreatePhones"), phoneSlice)
+	output := beam.ParDo(s, asymJoinFn, phonesKV, beam.SideInput{Input: emailsKV})
+	passert.Count(s, output, "post-join", 2)
+	amyOut, jamesOut := beam.ParDo2(s, splitByName, output)
+	passert.Equals(s, amyOut, "amy@example.com", "111-222-3333")
+	passert.Equals(s, jamesOut, "james@email.com", "james@example.com", "222-333-4444")
+	return p
+}
+
+func asymJoinFn(k, v string, mapSide func(string) func(*string) bool) (string, []string) {
+	var out string
+	var results []string
+	results = append(results, v)
+	iter := mapSide(k)
+	for iter(&out) {
+		results = append(results, out)
+	}
+	return k, results
+}
+
+func splitByName(key string, vals []string, a, j func(string)) {

Review comment:
       Done







[GitHub] [beam] lostluck commented on a change in pull request #15981: [BEAM-3293] Finish E2E implementation of MultiMap side inputs, add integration test

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #15981:
URL: https://github.com/apache/beam/pull/15981#discussion_r749736267



##########
File path: sdks/go/test/integration/primitives/pardo.go
##########
@@ -81,3 +87,71 @@ func ParDoKVSideInput() *beam.Pipeline {
 
 	return p
 }
+
+type stringPair struct {
+	K, V string
+}
+
+func splitStringPair(e stringPair) (string, string) {
+	return e.K, e.V
+}
+
+var emailSlice = []stringPair{
+	{"amy", "amy@example.com"},
+	{"james", "james@email.com"},
+	{"carl", "carl@example.com"},
+	{"julia", "julia@example.com"},
+	{"carl", "carl@email.com"},
+	{"james", "james@example.com"},
+}
+
+var phoneSlice = []stringPair{
+	{"amy", "111-222-3333"},
+	{"james", "222-333-4444"},
+}
+
+// CreateAndSplit makes a KV PCollection from a list of stringPair types
+func CreateAndSplit(s beam.Scope, input []stringPair) beam.PCollection {
+	initial := beam.CreateList(s, input)
+	return beam.ParDo(s, splitStringPair, initial)
+}
+
+// ParDoMultiMapSideInput checkls that the multimap side input access pattern

Review comment:
       ```suggestion
   // ParDoMultiMapSideInput checks that the multimap side input access pattern
   ```

##########
File path: sdks/go/test/integration/primitives/pardo.go
##########
@@ -81,3 +87,71 @@ func ParDoKVSideInput() *beam.Pipeline {
 
 	return p
 }
+
+type stringPair struct {
+	K, V string
+}
+
+func splitStringPair(e stringPair) (string, string) {
+	return e.K, e.V
+}
+
+var emailSlice = []stringPair{
+	{"amy", "amy@example.com"},
+	{"james", "james@email.com"},
+	{"carl", "carl@example.com"},
+	{"julia", "julia@example.com"},
+	{"carl", "carl@email.com"},
+	{"james", "james@example.com"},
+}
+
+var phoneSlice = []stringPair{
+	{"amy", "111-222-3333"},
+	{"james", "222-333-4444"},
+}
+
+// CreateAndSplit makes a KV PCollection from a list of stringPair types
+func CreateAndSplit(s beam.Scope, input []stringPair) beam.PCollection {
+	initial := beam.CreateList(s, input)
+	return beam.ParDo(s, splitStringPair, initial)
+}
+
+// ParDoMultiMapSideInput checkls that the multimap side input access pattern
+// works correctly, properly producing the correct output. for the asymmetric joing

Review comment:
       ```suggestion
   // works correctly, properly producing the correct output with an asymmetric join.
   ```

##########
File path: sdks/go/test/integration/integration.go
##########
@@ -67,6 +67,8 @@ var directFilters = []string{
 	"TestTestStream.*",
 	// (BEAM-13075): The direct runner does not support windowed side inputs
 	"TestValidateWindowedSideInputs",
+	// The direct runner does not currently support multimap side inputs

Review comment:
       Please add the JIRA for this issue. Likely the work can be consolidated w/ the windowed side inputs above, as it'll require similar/identical work.
   
   Scope creep, but very related scope creep.
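
For readers unfamiliar with the filter list in the hunk above, the following is a minimal sketch of how a list of regular-expression filters could be used to decide whether a test is skipped on a given runner. It assumes each entry is matched, unanchored, against the test name. The helper `shouldSkip` and the entry `TestParDoMultiMapSideInput` are hypothetical names chosen for illustration; the thread does not show the real test name or the matching code.

```go
package main

import (
	"fmt"
	"regexp"
)

// directFilters mirrors the shape of the list in the diff: each entry is a
// regular expression that is matched against a test name.
var directFilters = []string{
	"TestTestStream.*",
	"TestValidateWindowedSideInputs",
	// Hypothetical entry for the new multimap test (name assumed, not from the thread).
	"TestParDoMultiMapSideInput",
}

// shouldSkip reports whether name matches any of the filter patterns.
// It is an illustrative helper, not the SDK's own function.
func shouldSkip(name string, filters []string) (bool, error) {
	for _, f := range filters {
		ok, err := regexp.MatchString(f, name)
		if err != nil {
			return false, fmt.Errorf("bad filter %q: %w", f, err)
		}
		if ok {
			return true, nil
		}
	}
	return false, nil
}

func main() {
	for _, name := range []string{"TestParDoMultiMapSideInput", "TestParDoKVSideInput"} {
		skip, err := shouldSkip(name, directFilters)
		if err != nil {
			panic(err)
		}
		fmt.Printf("%s skip=%v\n", name, skip)
	}
}
```

Keeping the JIRA key in the comment above each entry, as requested in the review, makes it easy to find and delete the filter once the runner gains support.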

##########
File path: sdks/go/test/integration/primitives/pardo.go
##########
@@ -81,3 +87,71 @@ func ParDoKVSideInput() *beam.Pipeline {
 
 	return p
 }
+
+type stringPair struct {
+	K, V string
+}
+
+func splitStringPair(e stringPair) (string, string) {
+	return e.K, e.V
+}
+
+var emailSlice = []stringPair{
+	{"amy", "amy@example.com"},
+	{"james", "james@email.com"},
+	{"carl", "carl@example.com"},
+	{"julia", "julia@example.com"},
+	{"carl", "carl@email.com"},
+	{"james", "james@example.com"},
+}
+
+var phoneSlice = []stringPair{
+	{"amy", "111-222-3333"},
+	{"james", "222-333-4444"},
+}
+
+// CreateAndSplit makes a KV PCollection from a list of stringPair types
+func CreateAndSplit(s beam.Scope, input []stringPair) beam.PCollection {
+	initial := beam.CreateList(s, input)
+	return beam.ParDo(s, splitStringPair, initial)
+}
+
+// ParDoMultiMapSideInput checkls that the multimap side input access pattern
+// works correctly, properly producing the correct output. for the asymmetric joing
+func ParDoMultiMapSideInput() *beam.Pipeline {
+	beam.Init()
+	p, s := beam.NewPipelineWithRoot()
+	emailsKV := CreateAndSplit(s.Scope("CreateEmails"), emailSlice)
+	phonesKV := CreateAndSplit(s.Scope("CreatePhones"), phoneSlice)
+	output := beam.ParDo(s, asymJoinFn, phonesKV, beam.SideInput{Input: emailsKV})
+	passert.Count(s, output, "post-join", 2)
+	amyOut, jamesOut := beam.ParDo2(s, splitByName, output)
+	passert.Equals(s, amyOut, "amy@example.com", "111-222-3333")
+	passert.Equals(s, jamesOut, "james@email.com", "james@example.com", "222-333-4444")
+	return p
+}
+
+func asymJoinFn(k, v string, mapSide func(string) func(*string) bool) (string, []string) {
+	var out string
+	var results []string
+	results = append(results, v)
+	iter := mapSide(k)
+	for iter(&out) {
+		results = append(results, out)
+	}
+	return k, results
+}
+
+func splitByName(key string, vals []string, a, j func(string)) {

Review comment:
       Consider adding a 3rd "catch all" output that is used if the keys don't match as a default case. Then it can be verified to be empty with `passert.Empty`.
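
A rough sketch of the catch-all idea, written in plain Go so it runs without the Beam SDK. The body of `splitByName` is not shown in this thread, so the routing below is an assumption, and the third emitter `other` and the `collector` helper are hypothetical names. In the real pipeline the third output would come from a three-output ParDo and be checked with `passert.Empty`.

```go
package main

import "fmt"

// splitByName routes each value to the emitter for its key. Keys that are
// neither "amy" nor "james" go to other, so a test can assert that other
// received nothing. The routing logic is assumed, not taken from the PR.
func splitByName(key string, vals []string, amy, james, other func(string)) {
	var emit func(string)
	switch key {
	case "amy":
		emit = amy
	case "james":
		emit = james
	default:
		emit = other
	}
	for _, v := range vals {
		emit(v)
	}
}

// collector returns an emitter that appends every value to dst.
func collector(dst *[]string) func(string) {
	return func(s string) { *dst = append(*dst, s) }
}

func main() {
	var amyOut, jamesOut, otherOut []string
	a, j, o := collector(&amyOut), collector(&jamesOut), collector(&otherOut)

	splitByName("amy", []string{"111-222-3333", "amy@example.com"}, a, j, o)
	splitByName("james", []string{"222-333-4444", "james@email.com", "james@example.com"}, a, j, o)

	// With only the expected keys present, the catch-all output stays empty.
	fmt.Println(len(amyOut), len(jamesOut), len(otherOut))
}
```

The default case turns a silently dropped record into something a test can observe, which is the point of the suggestion.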

##########
File path: sdks/go/test/integration/primitives/pardo.go
##########
@@ -81,3 +87,71 @@ func ParDoKVSideInput() *beam.Pipeline {
 
 	return p
 }
+
+type stringPair struct {
+	K, V string
+}
+
+func splitStringPair(e stringPair) (string, string) {
+	return e.K, e.V
+}
+
+var emailSlice = []stringPair{
+	{"amy", "amy@example.com"},
+	{"james", "james@email.com"},
+	{"carl", "carl@example.com"},
+	{"julia", "julia@example.com"},
+	{"carl", "carl@email.com"},
+	{"james", "james@example.com"},
+}
+
+var phoneSlice = []stringPair{
+	{"amy", "111-222-3333"},
+	{"james", "222-333-4444"},
+}
+
+// CreateAndSplit makes a KV PCollection from a list of stringPair types
+func CreateAndSplit(s beam.Scope, input []stringPair) beam.PCollection {
+	initial := beam.CreateList(s, input)
+	return beam.ParDo(s, splitStringPair, initial)
+}
+
+// ParDoMultiMapSideInput checkls that the multimap side input access pattern
+// works correctly, properly producing the correct output. for the asymmetric joing
+func ParDoMultiMapSideInput() *beam.Pipeline {
+	beam.Init()
+	p, s := beam.NewPipelineWithRoot()
+	emailsKV := CreateAndSplit(s.Scope("CreateEmails"), emailSlice)
+	phonesKV := CreateAndSplit(s.Scope("CreatePhones"), phoneSlice)
+	output := beam.ParDo(s, asymJoinFn, phonesKV, beam.SideInput{Input: emailsKV})
+	passert.Count(s, output, "post-join", 2)
+	amyOut, jamesOut := beam.ParDo2(s, splitByName, output)
+	passert.Equals(s, amyOut, "amy@example.com", "111-222-3333")
+	passert.Equals(s, jamesOut, "james@email.com", "james@example.com", "222-333-4444")
+	return p
+}
+
+func asymJoinFn(k, v string, mapSide func(string) func(*string) bool) (string, []string) {
+	var out string
+	var results []string
+	results = append(results, v)

Review comment:
       ```suggestion
   	results := []string{v}
   ```
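
To show the suggestion in context, here is a self-contained sketch of `asymJoinFn` with the slice literal applied. It runs in plain Go by replacing the runner-provided multimap side input with a map-backed stand-in; `lookupFrom` is a hypothetical helper written for this example and is not part of the Beam SDK.

```go
package main

import "fmt"

// lookupFrom builds a stand-in for the multimap side input: given a key, it
// returns an iterator that yields each value stored under that key.
func lookupFrom(m map[string][]string) func(string) func(*string) bool {
	return func(k string) func(*string) bool {
		vals := m[k]
		i := 0
		return func(out *string) bool {
			if i >= len(vals) {
				return false
			}
			*out = vals[i]
			i++
			return true
		}
	}
}

// asymJoinFn is the function from the diff with the suggested literal applied:
// the main-input value comes first, followed by every side-input value for k.
func asymJoinFn(k, v string, mapSide func(string) func(*string) bool) (string, []string) {
	var out string
	results := []string{v}
	iter := mapSide(k)
	for iter(&out) {
		results = append(results, out)
	}
	return k, results
}

func main() {
	emails := lookupFrom(map[string][]string{
		"amy":   {"amy@example.com"},
		"james": {"james@email.com", "james@example.com"},
		"carl":  {"carl@example.com", "carl@email.com"},
	})
	k, joined := asymJoinFn("james", "222-333-4444", emails)
	fmt.Println(k, joined)
}
```

The join is asymmetric because only keys present in the main input (phones) produce output; side-input keys such as "carl" are never looked up.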







[GitHub] [beam] jrmccluskey commented on pull request #15981: [BEAM-3293] Finish E2E implementation of MultiMap side inputs, add integration test

Posted by GitBox <gi...@apache.org>.
jrmccluskey commented on pull request #15981:
URL: https://github.com/apache/beam/pull/15981#issuecomment-969376265


   R: @lostluck 
   CC: @youngoli @riteshghorse 





[GitHub] [beam] lostluck commented on pull request #15981: [BEAM-3293] Finish E2E implementation of MultiMap side inputs, add integration test

Posted by GitBox <gi...@apache.org>.
lostluck commented on pull request #15981:
URL: https://github.com/apache/beam/pull/15981#issuecomment-969403471


   Consider adding an entry to CHANGES.md for this as well.





[GitHub] [beam] jrmccluskey commented on a change in pull request #15981: [BEAM-3293] Finish E2E implementation of MultiMap side inputs, add integration test

Posted by GitBox <gi...@apache.org>.
jrmccluskey commented on a change in pull request #15981:
URL: https://github.com/apache/beam/pull/15981#discussion_r750309720



##########
File path: sdks/go/test/integration/integration.go
##########
@@ -67,6 +67,8 @@ var directFilters = []string{
 	"TestTestStream.*",
 	// (BEAM-13075): The direct runner does not support windowed side inputs
 	"TestValidateWindowedSideInputs",
+	// The direct runner does not currently support multimap side inputs

Review comment:
       Done. I also updated the JIRA to cover both side input changes in the direct runner.







[GitHub] [beam] lostluck merged pull request #15981: [BEAM-3293] Finish E2E implementation of MultiMap side inputs, add integration test

Posted by GitBox <gi...@apache.org>.
lostluck merged pull request #15981:
URL: https://github.com/apache/beam/pull/15981


   

