Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/10/08 23:22:51 UTC

[GitHub] [beam] lostluck opened a new pull request #15698: [BEAM-13013] Extend cross language expansion.

lostluck opened a new pull request #15698:
URL: https://github.com/apache/beam/pull/15698


   Extends cross language expansion to be able to substitute the default handler.
   
   In a future PR, this will be used to replace and remove the existing implementations of beam.External and beam.ExternalTagged, and to simplify the use of expansion services by having the Go code download jars and spin up, query, and shut down a Beam Java ExpansionService inside the CrossLanguage call.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] htyleo commented on pull request #15698: [BEAM-13013] Extend cross language expansion.

Posted by GitBox <gi...@apache.org>.
htyleo commented on pull request #15698:
URL: https://github.com/apache/beam/pull/15698#issuecomment-941383479


   LGTM. Thanks!





[GitHub] [beam] htyleo edited a comment on pull request #15698: [BEAM-13013] Extend cross language expansion.

Posted by GitBox <gi...@apache.org>.
htyleo edited a comment on pull request #15698:
URL: https://github.com/apache/beam/pull/15698#issuecomment-940370691


   Thanks for working on this extension!
   
   It does seem cleaner to consolidate the methods on HandlerParams. For the SQL implementation, what we actually need is:
   1) input names + types (ExternalTransform.InputsMap + MultiEdge.Input.Type)
   2) output type (ExternalTransform.OutputsMap + MultiEdge.Output.Type)
   3) payload (ExternalTransform.Payload)
   
   Other HandlerParams info (windowing strategy, coder, etc.) is used to properly construct the resulting Components and Transform in the ExpansionResponse. If we can come up with some helper functions/methods, we may not need to expose it and can make users' lives easier.
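
For illustration only, the narrower surface described in this comment could look roughly like the sketch below. It is not code from the PR: `externalTransform` is a simplified, hypothetical stand-in for `graph.ExternalTransform` that models only `InputsMap`, `OutputsMap` and `Payload`, and `handlerView` and its methods are invented names. Input and output types (which would come from the MultiEdge) are left out.

```go
package main

import (
	"fmt"
	"sort"
)

// externalTransform is a simplified, hypothetical stand-in for
// graph.ExternalTransform; only the fields named in the comment are modelled.
type externalTransform struct {
	InputsMap  map[string]int
	OutputsMap map[string]int
	Payload    []byte
}

// sortedKeys returns the local names in m in a stable (sorted) order.
func sortedKeys(m map[string]int) []string {
	out := make([]string, 0, len(m))
	for k := range m {
		out = append(out, k)
	}
	sort.Strings(out)
	return out
}

// handlerView (hypothetical) exposes read-only accessors so that a handler
// can inspect the transform without being able to mutate the graph.
type handlerView struct{ ext *externalTransform }

// InputNames returns the local input names, sorted.
func (v handlerView) InputNames() []string { return sortedKeys(v.ext.InputsMap) }

// OutputNames returns the local output names, sorted.
func (v handlerView) OutputNames() []string { return sortedKeys(v.ext.OutputsMap) }

// Payload returns a copy of the payload so callers cannot modify the original.
func (v handlerView) Payload() []byte {
	return append([]byte(nil), v.ext.Payload...)
}

func main() {
	v := handlerView{ext: &externalTransform{
		InputsMap:  map[string]int{"right": 1, "left": 0},
		OutputsMap: map[string]int{"out": 0},
		Payload:    []byte("SELECT 1"),
	}}
	fmt.Println(v.InputNames(), v.OutputNames(), string(v.Payload()))
}
```

Returning copies and sorted slices is one possible way to keep the unexported graph data safe from mutation while giving handlers a deterministic view; whether the real helpers should behave this way is an open design question.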





[GitHub] [beam] htyleo commented on a change in pull request #15698: [BEAM-13013] Extend cross language expansion.

Posted by GitBox <gi...@apache.org>.
htyleo commented on a change in pull request #15698:
URL: https://github.com/apache/beam/pull/15698#discussion_r727442013



##########
File path: sdks/go/pkg/beam/core/runtime/xlangx/registry.go
##########
@@ -0,0 +1,317 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package xlangx
+
+import (
+	"context"
+	"fmt"
+	"net/url"
+	"strings"
+	"sync"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/pipelinex"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+	jobpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1"
+	pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
+)
+
+var defaultReg = newRegistry()
+
+// RegisterHandler associates a namespace with a HandlerFunc which can be used to
+// replace calls to a Beam ExpansionService.
+//
+// Then, expansion addresses of the forms
+//   "<namespace>" or
+//   "<namespace>:<configuration>"
+// can be used with beam.CrossLanguage. Any configuration after the separator is
+// provided to the HandlerFunc on call for the handler func to use at its leisure.
+func RegisterHandler(namespace string, handler HandlerFunc) {
+	if err := defaultReg.RegisterHandler(namespace, handler); err != nil {
+		panic(err)
+	}
+}
+
+// RegisterOverrideForUrn overrides which expansion address is used to
+// expand a specific transform URN. The expansion address must be a URL
+// or be a namespaced handler registered with RegisterHandler.
+//
+// When the expansion address is for a handler, it may take the forms
+//  "<namespace>" or
+//  "<namespace>:<configuration>"
+func RegisterOverrideForUrn(urn, expansionAddr string) {
+	if err := defaultReg.RegisterOverrideForUrn(urn, expansionAddr); err != nil {
+		panic(err)
+	}
+}
+
+// HandlerParams is the parameter to an expansion service handler.
+type HandlerParams struct {
+	// Additional parameterization string, if any.
+	Config string
+
+	Req *jobpb.ExpansionRequest
+
+	// Additional pipeline graph information for custom handling
+	// Not exported to avoid mutation.
+	edge *graph.MultiEdge
+	ext  *graph.ExternalTransform

Review comment:
       Ah, I see.
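
As a rough illustration of the address forms described in the RegisterHandler comment quoted above ("<namespace>" or "<namespace>:<configuration>"), a split at the first separator might look like the sketch below. `splitHandlerAddr` is a hypothetical helper, not a function from the PR, and it assumes the separator is the first ":" in the address.

```go
package main

import (
	"fmt"
	"strings"
)

// splitHandlerAddr is a hypothetical helper, not part of the PR. It splits an
// expansion address of the form "<namespace>" or "<namespace>:<configuration>"
// at the first ":"; everything after that separator is treated as configuration.
func splitHandlerAddr(addr string) (namespace, config string) {
	if i := strings.Index(addr, ":"); i >= 0 {
		return addr[:i], addr[i+1:]
	}
	return addr, ""
}

func main() {
	for _, addr := range []string{"sql", "sql:zetasql"} {
		ns, cfg := splitHandlerAddr(addr)
		fmt.Printf("%q -> namespace=%q config=%q\n", addr, ns, cfg)
	}
}
```

Because the expansion address may also be a URL, a real registry has to decide whether an address such as "localhost:8097" names a handler or a service endpoint; this sketch does not attempt that distinction.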







[GitHub] [beam] htyleo commented on a change in pull request #15698: [BEAM-13013] Extend cross language expansion.

Posted by GitBox <gi...@apache.org>.
htyleo commented on a change in pull request #15698:
URL: https://github.com/apache/beam/pull/15698#discussion_r726504957



##########
File path: sdks/go/pkg/beam/core/runtime/xlangx/registry.go
##########
@@ -0,0 +1,317 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package xlangx
+
+import (
+	"context"
+	"fmt"
+	"net/url"
+	"strings"
+	"sync"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/pipelinex"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+	jobpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1"
+	pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
+)
+
+var defaultReg = newRegistry()
+
+// RegisterHandler associates a namespace with a HandlerFunc which can be used to
+// replace calls to a Beam ExpansionService.
+//
+// Then, expansion addresses of the forms
+//   "<namespace>" or
+//   "<namespace>:<configuration>"
+// can be used with beam.CrossLanguage. Any configuration after the separator is
+// provided to the HandlerFunc on call for the handler func to use at its leisure.
+func RegisterHandler(namespace string, handler HandlerFunc) {
+	if err := defaultReg.RegisterHandler(namespace, handler); err != nil {
+		panic(err)
+	}
+}
+
+// RegisterOverrideForUrn overrides which expansion address is used to
+// expand a specific transform URN. The expansion address must be a URL
+// or be a namespaced handler registered with RegisterHandler.
+//
+// When the expansion address is for a handler, it may take the forms
+//  "<namespace>" or
+//  "<namespace>:<configuration>"
+func RegisterOverrideForUrn(urn, expansionAddr string) {
+	if err := defaultReg.RegisterOverrideForUrn(urn, expansionAddr); err != nil {
+		panic(err)
+	}
+}
+
+// HandlerParams is the parameter to an expansion service handler.
+type HandlerParams struct {
+	// Additional parameterization string, if any.
+	Config string
+
+	Req *jobpb.ExpansionRequest
+
+	// Additional pipeline graph information for custom handling
+	// Not exported to avoid mutation.
+	edge *graph.MultiEdge
+	ext  *graph.ExternalTransform

Review comment:
       ExternalTransform.Payload also needs to be exposed?

##########
File path: sdks/go/pkg/beam/core/runtime/xlangx/registry.go
##########
@@ -0,0 +1,317 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package xlangx
+
+import (
+	"context"
+	"fmt"
+	"net/url"
+	"strings"
+	"sync"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/pipelinex"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+	jobpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1"
+	pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
+)
+
+var defaultReg = newRegistry()
+
+// RegisterHandler associates a namespace with a HandlerFunc which can be used to
+// replace calls to a Beam ExpansionService.
+//
+// Then, expansion addresses of the forms
+//   "<namespace>" or
+//   "<namespace>:<configuration>"
+// can be used with beam.CrossLanguage. Any configuration after the separator is
+// provided to the HandlerFunc on call for the handler func to use at its leisure.
+func RegisterHandler(namespace string, handler HandlerFunc) {
+	if err := defaultReg.RegisterHandler(namespace, handler); err != nil {
+		panic(err)
+	}
+}
+
+// RegisterOverrideForUrn overrides which expansion address is used to
+// expand a specific transform URN. The expansion address must be a URL
+// or be a namespaced handler registered with RegisterHandler.
+//
+// When the expansion address is for a handler, it may take the forms
+//  "<namespace>" or
+//  "<namespace>:<configuration>"
+func RegisterOverrideForUrn(urn, expansionAddr string) {
+	if err := defaultReg.RegisterOverrideForUrn(urn, expansionAddr); err != nil {
+		panic(err)
+	}
+}
+
+// HandlerParams is the parameter to an expansion service handler.
+type HandlerParams struct {
+	// Additional parameterization string, if any.
+	Config string
+
+	Req *jobpb.ExpansionRequest
+
+	// Additional pipeline graph information for custom handling
+	// Not exported to avoid mutation.
+	edge *graph.MultiEdge
+	ext  *graph.ExternalTransform
+}
+
+// CoderMarshaller returns a coder marshaller initialized with the request's namespace.
+func (p *HandlerParams) CoderMarshaller() *graphx.CoderMarshaller {
+	cm := graphx.NewCoderMarshaller()
+	cm.Namespace = p.Req.Namespace
+	return cm
+}
+
+// OutputPCollections returns the local identifiers for expected outputs
+// for this expansion service request.
+//
+// If no collections are returned, none are currently expected.
+func (p *HandlerParams) OutputPCollections() []string {
+	var out []string
+	for local := range p.ext.OutputsMap {
+		out = append(out, local)
+	}
+	return out
+}
+
+// InputPCollections returns the local identifiers for expected inputs
+// for this expansion service request.
+//
+// If no collections are returned, none are currently expected.
+func (p *HandlerParams) InputPCollections() []string {
+	var out []string
+	for local := range p.ext.InputsMap {
+		out = append(out, local)
+	}
+	return out
+}
+
+func (p *HandlerParams) panicIfMissing(m map[string]int, local string) int {
+	i, ok := m[local]
+	if !ok {
+		panic(fmt.Errorf("unknown local output identifier provided: %v", local))
+	}
+	return i
+}
+
+// OutputCoder returns the coder for the associated output PCollection.
+// Panics if local is not returned by OutputPCollections.
+func (p *HandlerParams) OutputCoder(local string) *coder.Coder {
+	i := p.panicIfMissing(p.ext.OutputsMap, local)
+	return p.edge.Output[i].To.Coder
+}
+
+// OutputType returns the full type for the associated output PCollection.
+// Panics if local is not returned by OutputPCollections.
+func (p *HandlerParams) OutputType(local string) typex.FullType {
+	i := p.panicIfMissing(p.ext.OutputsMap, local)
+	return p.edge.Output[i].Type
+}
+
+// OutputBounded returns whether the associated output PCollection is bounded.
+// Panics if local is not returned by OutputPCollections.
+func (p *HandlerParams) OutputBounded(local string) pipepb.IsBounded_Enum {
+	i := p.panicIfMissing(p.ext.OutputsMap, local)
+	return pipelinex.BoolToBounded(p.edge.Output[i].To.Bounded())
+}
+
+// OutputWindowingStrategy returns the windowing strategy for the associated output PCollection.
+// Panics if local is not returned by OutputPCollections.
+func (p *HandlerParams) OutputWindowingStrategy(local string, cm *graphx.CoderMarshaller) *pipepb.WindowingStrategy {
+	i := p.panicIfMissing(p.ext.OutputsMap, local)
+	wspb, err := graphx.MarshalWindowingStrategy(cm, p.edge.Output[i].To.WindowingStrategy())
+	if err != nil {
+		panic(fmt.Errorf("unable to marshal windowing strategy for output %v: %w", local, err))
+	}
+	return wspb
+}
+
+// InputCoder returns the coder for the associated input PCollection.
+// Panics if local is not returned by InputPCollections.
+func (p *HandlerParams) InputCoder(local string) *coder.Coder {
+	i := p.panicIfMissing(p.ext.InputsMap, local)
+	return p.edge.Input[i].From.Coder
+}
+
+// InputType returns the full type for the associated input PCollection.
+// Panics if local is not returned by InputPCollections.
+func (p *HandlerParams) InputType(local string) typex.FullType {
+	i := p.panicIfMissing(p.ext.OutputsMap, local)
+	return p.edge.Output[i].Type

Review comment:
       Output[i] -> Input[i]
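
A minimal sketch of what this suggestion presumably amounts to. It assumes the index lookup should also move from OutputsMap to InputsMap, which the comment does not say explicitly. The types below (fullType, inbound, outbound, multiEdge, externalTransform, handlerParams) are hypothetical stand-ins for the Beam graph types, so that the snippet compiles on its own; indexOf and exampleParams are illustrative helpers as well.

```go
package main

import "fmt"

// Stand-in types for illustration only. In the Beam Go SDK the real ones
// are graph.MultiEdge, graph.ExternalTransform and typex.FullType.
type fullType string

type inbound struct{ Type fullType }
type outbound struct{ Type fullType }

type multiEdge struct {
	Input  []*inbound
	Output []*outbound
}

type externalTransform struct {
	InputsMap  map[string]int
	OutputsMap map[string]int
}

type handlerParams struct {
	edge *multiEdge
	ext  *externalTransform
}

// indexOf returns the position registered for local and panics if the
// identifier is unknown, mirroring panicIfMissing in the hunk above.
func indexOf(m map[string]int, local string) int {
	i, ok := m[local]
	if !ok {
		panic(fmt.Errorf("unknown local identifier provided: %v", local))
	}
	return i
}

// InputType resolves the index through InputsMap and reads edge.Input.
func (p *handlerParams) InputType(local string) fullType {
	i := indexOf(p.ext.InputsMap, local)
	return p.edge.Input[i].Type
}

// exampleParams builds a transform with one input and one output whose
// types differ, so that mixing up the two sides is observable.
func exampleParams() *handlerParams {
	return &handlerParams{
		edge: &multiEdge{
			Input:  []*inbound{{Type: "string"}},
			Output: []*outbound{{Type: "int"}},
		},
		ext: &externalTransform{
			InputsMap:  map[string]int{"in0": 0},
			OutputsMap: map[string]int{"out0": 0},
		},
	}
}

func main() {
	fmt.Println(exampleParams().InputType("in0")) // prints "string"
}
```

With the lookup as written in the hunk, the same call would panic in this example, because "in0" is not a key of OutputsMap.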




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] htyleo commented on a change in pull request #15698: [BEAM-13013] Extend cross language expansion.

Posted by GitBox <gi...@apache.org>.
htyleo commented on a change in pull request #15698:
URL: https://github.com/apache/beam/pull/15698#discussion_r726502148



##########
File path: sdks/go/pkg/beam/core/runtime/xlangx/registry.go
##########
@@ -0,0 +1,317 @@
+// InputType returns the full type for the associated input PCollection.
+// Panics if local is not returned by InputPCollections.
+func (p *HandlerParams) InputType(local string) typex.FullType {
+	i := p.panicIfMissing(p.ext.OutputsMap, local)
+	return p.edge.Output[i].Type

Review comment:
       Output[i] -> Input[i]







[GitHub] [beam] lostluck commented on pull request #15698: [BEAM-13013] Extend cross language expansion.

Posted by GitBox <gi...@apache.org>.
lostluck commented on pull request #15698:
URL: https://github.com/apache/beam/pull/15698#issuecomment-940647659


   PTAL. Switching to a set of objects dropped 40 lines, which is nice.





[GitHub] [beam] lostluck commented on pull request #15698: [BEAM-13013] Extend cross language expansion.

Posted by GitBox <gi...@apache.org>.
lostluck commented on pull request #15698:
URL: https://github.com/apache/beam/pull/15698#issuecomment-939176631


   R: @youngoli @htyleo 
   
   Separating out the External replacement to test it a bit more first. However, this feels good to go.
   
   Open question: I am thinking of replacing the handful of Input and Output methods on HandlerParams with two methods that each return a "PCollection" object of some kind, consolidating all this information, since those methods are a bit messy. What do you two think?
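
One possible shape for that consolidation, as a rough sketch rather than the API that was eventually merged. The names pcol, node, collect, Inputs and Outputs are hypothetical, and plain strings stand in for typex.FullType and *coder.Coder so the snippet runs without the Beam packages.

```go
package main

import "fmt"

// node is a stand-in for the per-PCollection graph data (type, coder,
// boundedness) that the real handler reads from graph.MultiEdge.
type node struct {
	Type    string
	Coder   string
	Bounded bool
}

// pcol is the hypothetical consolidated "PCollection" object.
type pcol struct {
	Local string // local identifier used in the expansion request
	Index int    // position of the collection on the edge
	node
}

type handlerParams struct {
	inputsMap, outputsMap map[string]int
	inputs, outputs       []node
}

// collect joins a local-name map with the positional node slice.
// The result order follows Go map iteration and is therefore unspecified.
func collect(m map[string]int, nodes []node) []pcol {
	out := make([]pcol, 0, len(m))
	for local, i := range m {
		out = append(out, pcol{Local: local, Index: i, node: nodes[i]})
	}
	return out
}

// Inputs and Outputs replace the per-attribute accessor methods.
func (p *handlerParams) Inputs() []pcol  { return collect(p.inputsMap, p.inputs) }
func (p *handlerParams) Outputs() []pcol { return collect(p.outputsMap, p.outputs) }

// exampleParams builds a transform with one input and one output.
func exampleParams() *handlerParams {
	return &handlerParams{
		inputsMap:  map[string]int{"in0": 0},
		outputsMap: map[string]int{"out0": 0},
		inputs:     []node{{Type: "string", Coder: "string_utf8", Bounded: true}},
		outputs:    []node{{Type: "int", Coder: "varint", Bounded: true}},
	}
}

func main() {
	p := exampleParams()
	for _, in := range p.Inputs() {
		fmt.Printf("input %q at %d: type=%s coder=%s bounded=%v\n",
			in.Local, in.Index, in.Type, in.Coder, in.Bounded)
	}
	for _, out := range p.Outputs() {
		fmt.Printf("output %q at %d: type=%s coder=%s bounded=%v\n",
			out.Local, out.Index, out.Type, out.Coder, out.Bounded)
	}
}
```

A single lookup helper shared by both directions would also remove the duplicated Input and Output accessors, which is where the copy-and-paste slips in the current methods come from.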





[GitHub] [beam] lostluck commented on a change in pull request #15698: [BEAM-13013] Extend cross language expansion.

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #15698:
URL: https://github.com/apache/beam/pull/15698#discussion_r726750973



##########
File path: sdks/go/pkg/beam/core/runtime/xlangx/registry.go
##########
@@ -0,0 +1,317 @@
+// HandlerParams is the parameter to an expansion service handler.
+type HandlerParams struct {
+	// Additional parameterization string, if any.
+	Config string
+
+	Req *jobpb.ExpansionRequest
+
+	// Additional pipeline graph information for custom handling
+	// Not exported to avoid mutation.
+	edge *graph.MultiEdge
+	ext  *graph.ExternalTransform

Review comment:
       Payload is already exposed via handlerParams.Req.GetTransform().GetSpec().GetPayload(), just like it would be for the RPC expansion service.
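
A small sketch of that access path. The types here (functionSpec, ptransform, expansionRequest, handlerParams) are hypothetical stand-ins for the generated protobuf types, written with the same nil-safe getter pattern that protobuf-generated Go code uses, so the chain can be called without intermediate nil checks. payloadOf is an illustrative helper, not part of the PR.

```go
package main

import "fmt"

// functionSpec stands in for the generated FunctionSpec message.
type functionSpec struct {
	Urn     string
	Payload []byte
}

// GetPayload is nil-safe, like getters in protobuf-generated Go code.
func (s *functionSpec) GetPayload() []byte {
	if s == nil {
		return nil
	}
	return s.Payload
}

// ptransform stands in for the generated PTransform message.
type ptransform struct{ Spec *functionSpec }

func (t *ptransform) GetSpec() *functionSpec {
	if t == nil {
		return nil
	}
	return t.Spec
}

// expansionRequest stands in for jobpb.ExpansionRequest.
type expansionRequest struct{ Transform *ptransform }

func (r *expansionRequest) GetTransform() *ptransform {
	if r == nil {
		return nil
	}
	return r.Transform
}

// handlerParams mirrors the exported fields of HandlerParams in the diff.
type handlerParams struct {
	Config string
	Req    *expansionRequest
}

// payloadOf reads the transform payload from the request, which is the
// same place an RPC expansion service would find it.
func payloadOf(p *handlerParams) []byte {
	return p.Req.GetTransform().GetSpec().GetPayload()
}

func main() {
	p := &handlerParams{Req: &expansionRequest{
		Transform: &ptransform{Spec: &functionSpec{
			Urn:     "example:urn",
			Payload: []byte("SELECT 1"),
		}},
	}}
	fmt.Printf("%s\n", payloadOf(p)) // prints "SELECT 1"
}
```

Because every getter tolerates a nil receiver, a request without a transform or spec yields a nil payload instead of a panic.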







[GitHub] [beam] htyleo commented on a change in pull request #15698: [BEAM-13013] Extend cross language expansion.

Posted by GitBox <gi...@apache.org>.
htyleo commented on a change in pull request #15698:
URL: https://github.com/apache/beam/pull/15698#discussion_r727442013



##########
File path: sdks/go/pkg/beam/core/runtime/xlangx/registry.go
##########
@@ -0,0 +1,317 @@
+// HandlerParams is the parameter to an expansion service handler.
+type HandlerParams struct {
+	// Additional parameterization string, if any.
+	Config string
+
+	Req *jobpb.ExpansionRequest
+
+	// Additional pipeline graph information for custom handling
+	// Not exported to avoid mutation.
+	edge *graph.MultiEdge
+	ext  *graph.ExternalTransform

Review comment:
       Ah, I see.







[GitHub] [beam] htyleo commented on a change in pull request #15698: [BEAM-13013] Extend cross language expansion.

Posted by GitBox <gi...@apache.org>.
htyleo commented on a change in pull request #15698:
URL: https://github.com/apache/beam/pull/15698#discussion_r726504072



##########
File path: sdks/go/pkg/beam/core/runtime/xlangx/registry.go
##########
@@ -0,0 +1,317 @@
+// HandlerParams is the parameter to an expansion service handler.
+type HandlerParams struct {
+	// Additional parameterization string, if any.
+	Config string
+
+	Req *jobpb.ExpansionRequest
+
+	// Additional pipeline graph information for custom handling
+	// Not exported to avoid mutation.
+	edge *graph.MultiEdge
+	ext  *graph.ExternalTransform

Review comment:
       Does ExternalTransform.Payload also need to be exposed?







[GitHub] [beam] lostluck commented on pull request #15698: [BEAM-13013] Extend cross language expansion.

Posted by GitBox <gi...@apache.org>.
lostluck commented on pull request #15698:
URL: https://github.com/apache/beam/pull/15698#issuecomment-940645092


   OK we should be good then.
   1 & 2. Everything is keyed on the local names right now, which hides the ordered position. I think that also needs to be exposed, as the Go SDK is positionally sensitive.
   3. Payload is accessible via `handlerParams.Req.GetTransform().GetSpec().GetPayload()`. This is partly why I want to narrow things down a bit: much of the information is duplicated, and some of it can't be, so I'm forcing the request to be canonical.
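
To illustrate the positional point in 1 & 2: a map from local name to index, like InputsMap and OutputsMap in the diff, iterates in unspecified order in Go, so the position has to be recovered explicitly. A minimal sketch, where orderedLocals is a hypothetical helper and the map contents are made up:

```go
package main

import (
	"fmt"
	"sort"
)

// orderedLocals returns the local names sorted by their positional index.
// Ranging over the map directly would yield them in unspecified order.
func orderedLocals(m map[string]int) []string {
	locals := make([]string, 0, len(m))
	for local := range m {
		locals = append(locals, local)
	}
	sort.Slice(locals, func(a, b int) bool {
		return m[locals[a]] < m[locals[b]]
	})
	return locals
}

func main() {
	// Hypothetical local names mapped to their position on the edge.
	inputs := map[string]int{"orders": 1, "users": 0, "items": 2}
	fmt.Println(orderedLocals(inputs)) // prints "[users orders items]"
}
```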
   
   





[GitHub] [beam] lostluck merged pull request #15698: [BEAM-13013] Extend cross language expansion.

Posted by GitBox <gi...@apache.org>.
lostluck merged pull request #15698:
URL: https://github.com/apache/beam/pull/15698


   





[GitHub] [beam] htyleo commented on pull request #15698: [BEAM-13013] Extend cross language expansion.

Posted by GitBox <gi...@apache.org>.
htyleo commented on pull request #15698:
URL: https://github.com/apache/beam/pull/15698#issuecomment-940370691


   Thanks for working on this extension!
   
   It does seem cleaner to consolidate the methods on HandlerParams. For the SQL implementation, what we actually need is:
   1) input names + types (ExternalTransform.InputsMap + MultiEdge.Input.Type)
   2) output type (ExternalTransform.OutputsMap + MultiEdge.Output.Type)
   3) payload (ExternalTransform.Payload)
   
   Other HandlerParams info (windowing strategy, coder, etc.) is used to properly construct the resulting Components and Transform in the ExpansionResponse. If we can come up with some helper functions or methods, we may not need to expose it and can make users' lives easier.




