Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/06/14 07:18:27 UTC

[GitHub] [beam] youngoli opened a new pull request #14996: [BEAM-12380] Add KafkaIO Transforms and Kafka Taxi example.

youngoli opened a new pull request #14996:
URL: https://github.com/apache/beam/pull/14996


   This change is not entirely complete as of this commit. Remaining work:
   * Using non-default types causes a coder-related failure. Either fix it or document the failure before merging.
   * The Kafka Taxi example needs to be documented.
   * There are some extra examples used for manual testing. Remove them before merging, or later once tests are added.
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [x] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [x] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [x] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   `ValidatesRunner` compliance status (on master branch)
   --------------------------------------------------------
   
   <table>
     <thead>
       <tr>
         <th>Lang</th>
         <th>ULR</th>
         <th>Dataflow</th>
         <th>Flink</th>
         <th>Samza</th>
         <th>Spark</th>
         <th>Twister2</th>
       </tr>
     </thead>
     <tbody>
       <tr>
         <td>Go</td>
         <td>---</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon">
            </a>
          </td>
          <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>---</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>---</td>
       </tr>
       <tr>
         <td>Java</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_ULR/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_ULR/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon?subject=V1">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming/lastCompletedBuild/badge/icon?subject=V1+Streaming">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon?subject=V1+Java+11">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_VR_Dataflow_V2/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_VR_Dataflow_V2/lastCompletedBuild/badge/icon?subject=V2">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_VR_Dataflow_V2_Streaming/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_VR_Dataflow_V2_Streaming/lastCompletedBuild/badge/icon?subject=V2+Streaming">
           </a><br>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon?subject=Java+8">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/icon?subject=Java+11">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon?subject=Portable">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon?subject=Portable+Streaming">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Samza/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Samza/lastCompletedBuild/badge/icon?subject=Portable">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon?subject=Portable">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon?subject=Structured+Streaming">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/badge/icon">
           </a>
         </td>
       </tr>
       <tr>
         <td>Python</td>
         <td>---</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon?subject=V1">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/badge/icon?subject=V2">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon?subject=ValCont">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Python_PVR_Flink_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Python_PVR_Flink_Cron/lastCompletedBuild/badge/icon?subject=Portable">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Flink/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Flink/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>---</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>---</td>
       </tr>
       <tr>
         <td>XLang</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Direct/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Direct/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Dataflow/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Dataflow/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>---</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>---</td>
       </tr>
     </tbody>
   </table>
   
   Examples testing status on various runners
   --------------------------------------------------------
   
   <table>
     <thead>
       <tr>
         <th>Lang</th>
         <th>ULR</th>
         <th>Dataflow</th>
         <th>Flink</th>
         <th>Samza</th>
         <th>Spark</th>
         <th>Twister2</th>
       </tr>
     </thead>
     <tbody>
       <tr>
         <td>Go</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
       </tr>
       <tr>
         <td>Java</td>
         <td>---</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Cron/lastCompletedBuild/badge/icon?subject=V1">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Java11_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Java11_Cron/lastCompletedBuild/badge/icon?subject=V1+Java11">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_Examples_Dataflow_V2/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_Examples_Dataflow_V2/lastCompletedBuild/badge/icon?subject=V2">
           </a><br>
         </td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
       </tr>
       <tr>
         <td>Python</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
       </tr>
       <tr>
         <td>XLang</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
       </tr>
     </tbody>
   </table>
   
   Post-Commit SDK/Transform Integration Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   <table>
     <thead>
       <tr>
         <th>Go</th>
         <th>Java</th>
         <th>Python</th>
       </tr>
     </thead>
     <tbody>
       <tr>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/badge/icon?subject=3.6">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/badge/icon?subject=3.7">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/badge/icon?subject=3.8">
           </a>
         </td>
       </tr>
     </tbody>
   </table>
   
   Pre-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   <table>
     <thead>
       <tr>
         <th>---</th>
         <th>Java</th>
         <th>Python</th>
         <th>Go</th>
         <th>Website</th>
         <th>Whitespace</th>
         <th>Typescript</th>
       </tr>
     </thead>
     <tbody>
       <tr>
         <td>Non-portable</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/badge/icon">
           </a><br>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/badge/icon?subject=Tests">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/badge/icon?subject=Lint">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_PythonDocker_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_PythonDocker_Cron/badge/icon?subject=Docker">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_PythonDocs_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_PythonDocs_Cron/badge/icon?subject=Docs">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Whitespace_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Whitespace_Cron/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Typescript_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Typescript_Cron/lastCompletedBuild/badge/icon">
           </a>
         </td>
       </tr>
       <tr>
         <td>Portable</td>
         <td>---</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_GoPortable_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_GoPortable_Cron/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
       </tr>
     </tbody>
   </table>
   
    See [.test-infra/jenkins/README](https://github.com/apache/beam/blob/master/.test-infra/jenkins/README.md) for the trigger phrases, statuses, and links of all Jenkins jobs.
   
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] youngoli commented on pull request #14996: [BEAM-12380] Add KafkaIO Transforms and Kafka Taxi example.

Posted by GitBox <gi...@apache.org>.
youngoli commented on pull request #14996:
URL: https://github.com/apache/beam/pull/14996#issuecomment-861196287


   Made the requested changes. Note that because I had to rebase, I force-pushed the already-reviewed commit, but there were no changes to it, so you can safely review just the second commit. Changes:
   
   1. Various fixups.
   2. Removed unnecessary examples.
   3. Added instructions to the Kafka Taxi example, mostly based on the Python equivalent.
   4. Removed the Kafka serializer and deserializer API because it's a bit unstable, and left a TODO to find a better API.





[GitHub] [beam] youngoli commented on a change in pull request #14996: [BEAM-12380] Add KafkaIO Transforms and Kafka Taxi example.

Posted by GitBox <gi...@apache.org>.
youngoli commented on a change in pull request #14996:
URL: https://github.com/apache/beam/pull/14996#discussion_r651466862



##########
File path: sdks/go/pkg/beam/core/runtime/graphx/translate.go
##########
@@ -40,7 +40,7 @@ const (
 	URNGBK           = "beam:transform:group_by_key:v1"
 	URNReshuffle     = "beam:transform:reshuffle:v1"
 	URNCombinePerKey = "beam:transform:combine_per_key:v1"
-	URNWindow        = "beam:transform:window:v1"
+	URNWindow        = "beam:transform:window_into:v1"

Review comment:
       Done.







[GitHub] [beam] youngoli merged pull request #14996: [BEAM-12380] Add KafkaIO Transforms and Kafka Taxi example.

Posted by GitBox <gi...@apache.org>.
youngoli merged pull request #14996:
URL: https://github.com/apache/beam/pull/14996


   





[GitHub] [beam] lostluck commented on a change in pull request #14996: [BEAM-12380] Add KafkaIO Transforms and Kafka Taxi example.

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #14996:
URL: https://github.com/apache/beam/pull/14996#discussion_r650121735



##########
File path: sdks/go/pkg/beam/core/runtime/graphx/translate.go
##########
@@ -40,7 +40,7 @@ const (
 	URNGBK           = "beam:transform:group_by_key:v1"
 	URNReshuffle     = "beam:transform:reshuffle:v1"
 	URNCombinePerKey = "beam:transform:combine_per_key:v1"
-	URNWindow        = "beam:transform:window:v1"
+	URNWindow        = "beam:transform:window_into:v1"

Review comment:
       This was also fixed in my windowing PR (https://github.com/apache/beam/pull/14966), so you'll need to rebase.

##########
File path: sdks/go/examples/kafka/types/types.go
##########
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+	"context"
+	"flag"
+	"reflect"
+	"time"
+
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/window"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+	"github.com/apache/beam/sdks/go/pkg/beam/io/kafkaio"
+	log "github.com/apache/beam/sdks/go/pkg/beam/log"
+	"github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
+)
+
+var (
+	expansionAddr    = flag.String("expansion_addr", "", "Address of Expansion Service")
+	bootstrapServers = flag.String("bootstrap_servers", "",
+		"URL of the bootstrap servers for the Kafka cluster. Should be accessible by the runner.")
+	topic = flag.String("topic", "kafka_integers_test", "Kafka topic to write to and read from.")
+)
+
+func init() {
+	beam.RegisterType(reflect.TypeOf((*LogFn)(nil)).Elem())
+}
+
+// LogFn is a DoFn to log rides.
+type LogFn struct{}
+
+// ProcessElement logs each element it receives.
+func (fn *LogFn) ProcessElement(ctx context.Context, elm []byte) {
+	log.Infof(ctx, "Ride info: %v", string(elm))
+}
+
+// FinishBundle waits a bit so the job server finishes receiving logs.
+func (fn *LogFn) FinishBundle() {
+	time.Sleep(2 * time.Second)
+}
+
+const intSerializer = "org.apache.kafka.common.serialization.IntegerSerializer"
+const intDeserializer = "org.apache.kafka.common.serialization.IntegerDeserializer"
+
+func main() {
+	flag.Parse()
+	beam.Init()
+
+	ctx := context.Background()
+	if *expansionAddr == "" {
+		log.Fatal(ctx, "No expansion address provided")
+	}
+
+	p := beam.NewPipeline()
+	s := p.Root()
+
+	elms := make([]interface{}, 100, 100)
+	for i := 1; i <= 100; i++ {
+		elms[i-1] = i
+	}
+	data := beam.Create(s, elms...)
+	kvData := beam.ParDo(s, func(elm int) (int, int) { return 200, elm }, data)

Review comment:
       I don't know what the error you mentioned is, but I'm reasonably certain you want LongSerializer/LongDeserializer and int64 types here.
   
    The Go SDK tightly binds the Beam VarInt coder to int64 and uses custom coders for other integer types. So if that becomes an issue, we'd need some other solution to handle int32, float32, and float64.
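    A minimal standalone sketch of that suggestion, assuming the standard Kafka LongSerializer/LongDeserializer class names; the `toKV` helper is hypothetical, standing in for the example's ParDo function:

    ```go
    package main

    import "fmt"

    // Kafka's Java serializer classes for 64-bit integers, which match Go's
    // int64 (assumption: these are the classes the comment refers to).
    const (
    	longSerializer   = "org.apache.kafka.common.serialization.LongSerializer"
    	longDeserializer = "org.apache.kafka.common.serialization.LongDeserializer"
    )

    // toKV is a hypothetical stand-in for the example's ParDo function, using
    // int64 instead of int so Beam's VarInt coder applies on both sides.
    func toKV(elm int64) (int64, int64) { return 200, elm }

    func main() {
    	k, v := toKV(42)
    	fmt.Println(k, v) // 200 42
    }
    ```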

##########
File path: sdks/go/pkg/beam/io/kafkaio/kafka.go
##########
@@ -0,0 +1,330 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more

Review comment:
       Question: In the eventual "a native Go SDK Kafka IO could be written" future, do you think we should have a separate package, or also include it in this one?
   
    If it's included in this one, we should name the Read and Write transforms ReadXlang and WriteXlang (or similar) to leave space for the native transforms.
    If they're going to be separate packages anyway, we should move this package under an io/xlang/ directory instead.

##########
File path: sdks/go/pkg/beam/core/runtime/graphx/xlang.go
##########
@@ -29,6 +29,9 @@ func mergeExpandedWithPipeline(edges []*graph.MultiEdge, p *pipepb.Pipeline) {
 
 	for _, e := range edges {
 		if e.Op == graph.External {
+			if e.External == nil {

Review comment:
       Consider changing this pair of if statements (this one and the one immediately below) to match the updated if clause in purgeOutputInput. Consistency is king.
   
    Style nit for another time: the overarching Op check should have continued instead of indenting the entire loop contents one level further (same for purgeOutputInput, frankly).

##########
File path: sdks/go/pkg/beam/io/kafkaio/kafka.go
##########
@@ -0,0 +1,330 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package kafkaio contains cross-language functionality for using Apache Kafka
+// (http://kafka.apache.org/). These transforms only work on runners that
+// support cross-language transforms.
+//
+// Setup
+//
+// Transforms specified here are cross-language transforms implemented in a
+// different SDK (listed below). During pipeline construction, the Go SDK will
+// need to connect to an expansion service containing information on these
+// transforms in their native SDK.
+//
+// To use an expansion service, it must be run as a separate process accessible
+// during pipeline construction. The address of that process must be passed to
+// the transforms in this package.
+//
+// The version of the expansion service should match the version of the Beam SDK
+// being used. For numbered releases of Beam, these expansion services are
+// released to the Maven repository as modules. For development versions of
+// Beam, it is recommended to build and run it from source using Gradle.
+//
+// Current supported SDKs, including expansion service modules and reference
+// documentation:
+// * Java
+//    - Vendored Module: beam-sdks-java-io-expansion-service
+//    - Run via Gradle: ./gradlew :sdks:java:io:expansion-service:runExpansionService
+//    - Reference Class: org.apache.beam.sdk.io.kafka.KafkaIO
+package kafkaio
+
+import (
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+	"reflect"
+)
+
+func init() {
+	beam.RegisterType(reflect.TypeOf((*readPayload)(nil)).Elem())
+	beam.RegisterType(reflect.TypeOf((*writePayload)(nil)).Elem())
+}
+
+type policy string
+
+const (
+	ByteArrayDeserializer = "org.apache.kafka.common.serialization.ByteArrayDeserializer"
+	ByteArraySerializer   = "org.apache.kafka.common.serialization.ByteArraySerializer"
+
+	// ProcessingTime is a timestamp policy that assigns processing time to
+	// each record. Specifically, this is the timestamp when the record becomes
+	// "current" in the reader. Further documentation can be found in Java's
+	// KafkaIO documentation.
+	ProcessingTime policy = "ProcessingTime"
+
+	// CreateTime is a timestamp policy based on the CREATE_TIME timestamps of
+	// kafka records. Requires the records to have a type set to
+	// org.apache.kafka.common.record.TimestampTypeCREATE_TIME. Further
+	// documentation can be found in Java's KafkaIO documentation.
+	CreateTime policy = "CreateTime"
+
+	// LogAppendTime is a timestamp policy that assigns Kafka's log append time
+	// (server side ingestion time) to each record. Further documentation can
+	// be found in Java's KafkaIO documentation.
+	LogAppendTime policy = "LogAppendTime"
+
+	readURN  = "beam:external:java:kafka:read:v1"
+	writeURN = "beam:external:java:kafka:write:v1"
+)
+
+// Read is a cross-language PTransform which reads from Kafka and returns a
+// KV pair for each item in the specified Kafka topics. By default, this runs
+// as an unbounded transform and outputs keys and values as raw byte arrays.
+// These properties can be changed through optional parameters.
+//
+// Read requires the address for an expansion service for Kafka Read transforms,
+// a comma-separated list of bootstrap server addresses (see the Kafka property
+// "bootstrap.servers" for details), and at least one topic to read from.
+//
+// Read also accepts optional parameters as readOptions. All optional parameters
+// are predefined in this package as functions that return a readOption. To set
+// an optional parameter, pass the result of the corresponding function as an
+// argument to Read.
+//
+// Example of Read with required and optional parameters:
+//
+//   expansionAddr := "localhost:1234"
+//   bootstrapServer := "bootstrap-server:1234"
+//   topic := "topic_name"
+//   pcol := kafkaio.Read(s, expansionAddr, bootstrapServer, []string{topic},
+//       kafkaio.MaxNumRecords(100), kafkaio.CommitOffsetInFinalize(true))
+func Read(s beam.Scope, addr string, servers string, topics []string, opts ...readOption) beam.PCollection {
+	s = s.Scope("kafkaio.Read")
+
+	if len(topics) == 0 {
+		panic("kafkaio.Read requires at least one topic to read from.")
+	}
+
+	rpl := readPayload{
+		ConsumerConfig:    map[string]string{"bootstrap.servers": servers},
+		Topics:            topics,
+		KeyDeserializer:   ByteArrayDeserializer,
+		ValueDeserializer: ByteArrayDeserializer,
+		TimestampPolicy:   string(ProcessingTime),
+	}
+	rcfg := readConfig{
+		pl:  &rpl,
+		key: reflectx.ByteSlice,
+		val: reflectx.ByteSlice,
+	}
+	for _, opt := range opts {
+		opt(&rcfg)
+	}
+
+	pl := beam.CrossLanguagePayload(rpl)
+	outT := beam.UnnamedOutput(typex.NewKV(typex.New(rcfg.key), typex.New(rcfg.val)))
+	out := beam.CrossLanguage(s, readURN, pl, addr, nil, outT)
+	return out[graph.UnnamedOutputTag]
+}
+
+type readOption func(*readConfig)
+type readConfig struct {
+	pl  *readPayload
+	key reflect.Type
+	val reflect.Type
+}
+
+// ConsumerConfigs is a Read option that adds consumer properties to the
+// Consumer configuration of the transform. Each usage of this adds the given
+// elements to the existing map without removing existing elements.
+//
+// Note that the "bootstrap.servers" property is automatically set by
+// kafkaio.Read and does not need to be specified via this option.
+func ConsumerConfigs(cfgs map[string]string) readOption {
+	return func(cfg *readConfig) {
+		for k, v := range cfgs {
+			cfg.pl.ConsumerConfig[k] = v
+		}
+	}
+}
+
+// KeyDeserializer is a Read option that specifies a fully-qualified Java class
+// name of a Kafka Deserializer for the topic's key, along with the
+// corresponding Go type to deserialize keys to.
+//
+// Defaults to []byte, with classname
+// "org.apache.kafka.common.serialization.ByteArrayDeserializer".
+func KeyDeserializer(classname string, keyType reflect.Type) readOption {
+	return func(cfg *readConfig) {
+		cfg.pl.KeyDeserializer = classname
+		cfg.key = keyType
+	}
+}
+
+// ValueDeserializer is a Read option that specifies a fully-qualified Java
+// class name of a Kafka Deserializer for the topic's value, along with the
+// corresponding Go type to deserialize values to.
+//
+// Defaults to []byte, with classname
+// "org.apache.kafka.common.serialization.ByteArrayDeserializer".
+func ValueDeserializer(classname string, valType reflect.Type) readOption {
+	return func(cfg *readConfig) {
+		cfg.pl.ValueDeserializer = classname
+		cfg.val = valType
+	}
+}

Review comment:
       Regarding Deserializers and Serializers, it doesn't seem like we could really use anything other than ByteArray or Integer. I assume that when Integer is used, we receive varint-encoded data. At that point, we would always require int64s as the element type.
   
   Similarly, anything else would likely be ByteArray-serialized anyway, if only to be transported between the SDKs for decoding on the Go side. At that point, the SDK would use whatever coder we have for the specified type to interpret the bytes, whether that's schema coding, proto coding, Avro, etc. It depends on what's registered for the type.
   
   So my suggestion would be to drop the class name parameter here so the more common usage can be shorter, as most uses will just re-specify kafkaio.ByteArraySerializer every time. Or have some other short convenience options where the byte array serializer isn't re-specified.
   
   Similarly, for IntegerSerializer, consider having parameterless fixed options like ReadValueInt64() and ReadKeyInt64() instead. On the write side we could do the same, or similarly infer the serializer when int64s are used as the input PCollection type, rather than manually overriding.
   
   How many of these serializers exist anyway? (https://kafka.apache.org/27/javadoc/org/apache/kafka/common/serialization/package-frame.html) 
   
   I think for most of these we'd want to have specialized handling anyway, since the coding/mapping to types isn't necessarily clean between Java and Go. (e.g. LongSerializer correctly maps to int64, likely with varint as the SDK interchange, and Integer would be int32, but I'm not sure if varint is the SDK interchange...)
   
   Either way, we should consider making the right inference and handling simpler for users where possible.
   
   For now, I think punting to later and having "[]byte -> my desired type (and vice versa)" rely on whatever Go SDK coder is registered is probably the right move to start; we can add conveniences for the other serializers later.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] youngoli commented on a change in pull request #14996: [BEAM-12380] Add KafkaIO Transforms and Kafka Taxi example.

Posted by GitBox <gi...@apache.org>.
youngoli commented on a change in pull request #14996:
URL: https://github.com/apache/beam/pull/14996#discussion_r651467016



##########
File path: sdks/go/pkg/beam/core/runtime/graphx/xlang.go
##########
@@ -29,6 +29,9 @@ func mergeExpandedWithPipeline(edges []*graph.MultiEdge, p *pipepb.Pipeline) {
 
 	for _, e := range edges {
 		if e.Op == graph.External {
+			if e.External == nil {

Review comment:
       Done, and I fixed up the nit too. I figured it was unlikely I'd remember to get to it otherwise.







[GitHub] [beam] youngoli commented on pull request #14996: [BEAM-12380] Add KafkaIO Transforms and Kafka Taxi example.

Posted by GitBox <gi...@apache.org>.
youngoli commented on pull request #14996:
URL: https://github.com/apache/beam/pull/14996#issuecomment-859280192


   R: @lostluck 
   
   As the title indicates, this change is not _quite_ done, but it's close enough that I think it's ready for review and I don't want to delay it any longer since it's already spent a bit too long brewing.
   
   I documented all the changes you can expect up above, but to reiterate: You can skip all the Kafka examples I wrote except the top-level one (examples/kafka/taxi.go) that I'll be keeping. All other files will probably only have minor changes and should be ready enough for review.





[GitHub] [beam] youngoli commented on pull request #14996: [BEAM-12380] Add KafkaIO Transforms and Kafka Taxi example.

Posted by GitBox <gi...@apache.org>.
youngoli commented on pull request #14996:
URL: https://github.com/apache/beam/pull/14996#issuecomment-861923839


   Run GoPortable PreCommit





[GitHub] [beam] youngoli commented on a change in pull request #14996: [BEAM-12380] Add KafkaIO Transforms and Kafka Taxi example.

Posted by GitBox <gi...@apache.org>.
youngoli commented on a change in pull request #14996:
URL: https://github.com/apache/beam/pull/14996#discussion_r651469744



##########
File path: sdks/go/examples/kafka/types/types.go
##########
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+	"context"
+	"flag"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/window"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+	"github.com/apache/beam/sdks/go/pkg/beam/io/kafkaio"
+	log "github.com/apache/beam/sdks/go/pkg/beam/log"
+	"github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
+	"reflect"
+	"time"
+
+	"github.com/apache/beam/sdks/go/pkg/beam"
+)
+
+var (
+	expansionAddr    = flag.String("expansion_addr", "", "Address of Expansion Service")
+	bootstrapServers = flag.String("bootstrap_servers", "",
+		"URL of the bootstrap servers for the Kafka cluster. Should be accessible by the runner.")
+	topic = flag.String("topic", "kafka_integers_test", "Kafka topic to write to and read from.")
+)
+
+func init() {
+	beam.RegisterType(reflect.TypeOf((*LogFn)(nil)).Elem())
+}
+
+// LogFn is a DoFn to log rides.
+type LogFn struct{}
+
+// ProcessElement logs each element it receives.
+func (fn *LogFn) ProcessElement(ctx context.Context, elm []byte) {
+	log.Infof(ctx, "Ride info: %v", string(elm))
+}
+
+// FinishBundle waits a bit so the job server finishes receiving logs.
+func (fn *LogFn) FinishBundle() {
+	time.Sleep(2 * time.Second)
+}
+
+const intSerializer = "org.apache.kafka.common.serialization.IntegerSerializer"
+const intDeserializer = "org.apache.kafka.common.serialization.IntegerDeserializer"
+
+func main() {
+	flag.Parse()
+	beam.Init()
+
+	ctx := context.Background()
+	if *expansionAddr == "" {
+		log.Fatal(ctx, "No expansion address provided")
+	}
+
+	p := beam.NewPipeline()
+	s := p.Root()
+
+	elms := make([]interface{}, 100, 100)
+	for i := 1; i <= 100; i++ {
+		elms[i-1] = i
+	}
+	data := beam.Create(s, elms...)
+	kvData := beam.ParDo(s, func(elm int) (int, int) { return 200, elm }, data)

Review comment:
       Yeah, switching it to int64 seemed to work, although I couldn't get the pipeline to ultimately produce output. Regardless, I ended up punting on this.
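The int64 variant discussed here would change only the element construction and the KV function's signature relative to the quoted example. A minimal standalone sketch of that change (without the Beam dependency, so the KV function is called directly rather than via beam.ParDo):

```go
package main

import "fmt"

func main() {
	// Build the input elements as int64 instead of int, as described in
	// the comment above; beam.Create would receive this slice.
	elms := make([]interface{}, 100)
	for i := 1; i <= 100; i++ {
		elms[i-1] = int64(i)
	}
	// The KV DoFn would likewise take and return int64.
	kvFn := func(elm int64) (int64, int64) { return 200, elm }
	k, v := kvFn(elms[0].(int64))
	fmt.Println(k, v) // 200 1
}
```

int64 matters here because the Go SDK's varint coding lines up with Java's LongDeserializer, whereas plain int has a platform-dependent size.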







[GitHub] [beam] lostluck commented on a change in pull request #14996: [BEAM-12380] Add KafkaIO Transforms and Kafka Taxi example.

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #14996:
URL: https://github.com/apache/beam/pull/14996#discussion_r652223396



##########
File path: sdks/go/pkg/beam/io/xlang/kafkaio/kafka.go
##########
@@ -0,0 +1,288 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package kafkaio contains cross-language functionality for using Apache Kafka
+// (http://kafka.apache.org/). These transforms only work on runners that
+// support cross-language transforms.
+//
+// Setup
+//
+// Transforms specified here are cross-language transforms implemented in a
+// different SDK (listed below). During pipeline construction, the Go SDK will
+// need to connect to an expansion service containing information on these
+// transforms in their native SDK.
+//
+// To use an expansion service, it must be run as a separate process accessible
+// during pipeline construction. The address of that process must be passed to
+// the transforms in this package.
+//
+// The version of the expansion service should match the version of the Beam SDK
+// being used. For numbered releases of Beam, these expansion services are
+// released to the Maven repository as modules. For development versions of
+// Beam, it is recommended to build and run it from source using Gradle.
+//
+// Current supported SDKs, including expansion service modules and reference
+// documentation:
+// * Java
+//    - Vendored Module: beam-sdks-java-io-expansion-service
+//    - Run via Gradle: ./gradlew :sdks:java:io:expansion-service:runExpansionService
+//    - Reference Class: org.apache.beam.sdk.io.kafka.KafkaIO
+package kafkaio
+
+// TODO(BEAM-12492): Implement an API for specifying Kafka type serializers and
+// deserializers.
+
+import (
+	"reflect"
+
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+)
+
+func init() {
+	beam.RegisterType(reflect.TypeOf((*readPayload)(nil)).Elem())
+	beam.RegisterType(reflect.TypeOf((*writePayload)(nil)).Elem())
+}
+
+type policy string
+
+const (
+	ByteArrayDeserializer = "org.apache.kafka.common.serialization.ByteArrayDeserializer"
+	ByteArraySerializer   = "org.apache.kafka.common.serialization.ByteArraySerializer"
+
+	// ProcessingTime is a timestamp policy that assigns processing time to
+	// each record. Specifically, this is the timestamp when the record becomes
+	// "current" in the reader. Further documentation can be found in Java's
+	// KafkaIO documentation.
+	ProcessingTime policy = "ProcessingTime"
+
+	// CreateTime is a timestamp policy based on the CREATE_TIME timestamps of
+	// kafka records. Requires the records to have a type set to
+	// org.apache.kafka.common.record.TimestampTypeCREATE_TIME. Further
+	// documentation can be found in Java's KafkaIO documentation.
+	CreateTime policy = "CreateTime"
+
+	// LogAppendTime is a timestamp policy that assigns Kafka's log append time
+	// (server side ingestion time) to each record. Further documentation can
+	// be found in Java's KafkaIO documentation.
+	LogAppendTime policy = "LogAppendTime"
+
+	readURN  = "beam:external:java:kafka:read:v1"
+	writeURN = "beam:external:java:kafka:write:v1"
+)
+
+// Read is a cross-language PTransform which reads from Kafka and returns a
+// KV pair for each item in the specified Kafka topics. By default, this runs
+// as an unbounded transform and outputs keys and values as raw byte arrays.
+// These properties can be changed through optional parameters.
+//
+// Read requires the address for an expansion service for Kafka Read transforms,
+// a comma-separated list of bootstrap server addresses (see the Kafka property
+// "bootstrap.servers" for details), and at least one topic to read from.
+//
+// Read also accepts optional parameters as readOptions. All optional parameters
+// are predefined in this package as functions that return readOption. To set
+// an optional parameter, call the function within Read's function signature.
+//
+// Example of Read with required and optional parameters:
+//
+//   expansionAddr := "localhost:1234"
+//   bootstrapServer := "bootstrap-server:1234"
+//   topic := "topic_name"
+//   pcol := kafkaio.Read(s, expansionAddr, bootstrapServer, []string{topic},
+//       kafkaio.MaxNumRecords(100), kafkaio.CommitOffsetInFinalize(true))
+func Read(s beam.Scope, addr string, servers string, topics []string, opts ...readOption) beam.PCollection {
+	s = s.Scope("kafkaio.Read")
+
+	if len(topics) == 0 {
+		panic("kafkaio.Read requires at least one topic to read from.")
+	}
+
+	rpl := readPayload{
+		ConsumerConfig:    map[string]string{"bootstrap.servers": servers},
+		Topics:            topics,
+		KeyDeserializer:   ByteArrayDeserializer,
+		ValueDeserializer: ByteArrayDeserializer,
+		TimestampPolicy:   string(ProcessingTime),
+	}
+	rcfg := readConfig{
+		pl:  &rpl,
+		key: reflectx.ByteSlice,
+		val: reflectx.ByteSlice,
+	}
+	for _, opt := range opts {
+		opt(&rcfg)
+	}
+
+	pl := beam.CrossLanguagePayload(rpl)
+	outT := beam.UnnamedOutput(typex.NewKV(typex.New(rcfg.key), typex.New(rcfg.val)))
+	out := beam.CrossLanguage(s, readURN, pl, addr, nil, outT)
+	return out[graph.UnnamedOutputTag]
+}
+
+type readOption func(*readConfig)
+type readConfig struct {
+	pl  *readPayload
+	key reflect.Type
+	val reflect.Type
+}
+
+// ConsumerConfigs is a Read option that adds consumer properties to the
+// Consumer configuration of the transform. Each usage of this adds the given
+// elements to the existing map without removing existing elements.
+//
+// Note that the "bootstrap.servers" property is automatically set by
+// kafkaio.Read and does not need to be specified via this option.
+func ConsumerConfigs(cfgs map[string]string) readOption {
+	return func(cfg *readConfig) {
+		for k, v := range cfgs {
+			cfg.pl.ConsumerConfig[k] = v
+		}
+	}
+}
+
+// StartReadTimestamp is a Read option that specifies a start timestamp in
+// milliseconds epoch, so only records after that timestamp will be read.
+//
+// This results in failures if one or more partitions don't contain messages
+// with a timestamp larger than or equal to the one specified, or if the
+// message format version in a partition is before 0.10.0, meaning messages do
+// not have timestamps.
+func StartReadTimestamp(ts int64) readOption {
+	return func(cfg *readConfig) {
+		cfg.pl.StartReadTime = &ts
+	}
+}
+
+// MaxNumRecords is a Read option that specifies the maximum amount of records
+// to be read. Setting this will cause the Read to execute as a bounded
+// transform. Useful for tests and demo applications.
+func MaxNumRecords(num int64) readOption {
+	return func(cfg *readConfig) {
+		cfg.pl.MaxNumRecords = &num
+	}
+}
+
+// MaxReadSecs is a Read option that specifies the maximum amount of time in
+// seconds the transform executes. Setting this will cause the Read to execute
+// as a bounded transform. Useful for tests and demo applications.
+func MaxReadSecs(secs int64) readOption {
+	return func(cfg *readConfig) {
+		cfg.pl.MaxReadTime = &secs
+	}
+}
+
+// CommitOffsetInFinalize is a Read option that specifies whether to commit
+// offsets when finalizing.
+//
+// Default: false
+func CommitOffsetInFinalize(enabled bool) readOption {
+	return func(cfg *readConfig) {
+		cfg.pl.CommitOffsetInFinalize = enabled
+	}
+}
+
+// TimestampPolicy is a Read option that specifies the timestamp policy to use
+// for extracting timestamps from the KafkaRecord. Must be one of the predefined
+// constant timestamp policies in this package.
+//
+// Default: kafkaio.ProcessingTime
+func TimestampPolicy(name policy) readOption {
+	return func(cfg *readConfig) {
+		cfg.pl.TimestampPolicy = string(name)
+	}
+}
+
+// readPayload should produce a schema matching the expected cross-language
+// payload for Kafka reads. An example of this on the receiving end can be
+// found in the Java SDK class
+// org.apache.beam.sdk.io.kafka.KafkaIO.Read.External.Configuration.
+type readPayload struct {
+	ConsumerConfig         map[string]string
+	Topics                 []string
+	KeyDeserializer        string
+	ValueDeserializer      string
+	StartReadTime          *int64
+	MaxNumRecords          *int64
+	MaxReadTime            *int64
+	CommitOffsetInFinalize bool
+	TimestampPolicy        string
+}
+
+// Write is a cross-language PTransform which writes KV data to a specified
+// Kafka topic. By default, this assumes keys and values to be received as raw
+// byte arrays. This can be changed through optional parameters.

Review comment:
       ```suggestion
   // Kafka topic. By default, this assumes keys and values to be received as 
   // byte slices. This can be changed through optional parameters.
   ```

##########
File path: sdks/go/pkg/beam/io/xlang/kafkaio/kafka.go
##########
@@ -0,0 +1,288 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package kafkaio contains cross-language functionality for using Apache Kafka
+// (http://kafka.apache.org/). These transforms only work on runners that
+// support cross-language transforms.
+//
+// Setup
+//
+// Transforms specified here are cross-language transforms implemented in a
+// different SDK (listed below). During pipeline construction, the Go SDK will
+// need to connect to an expansion service containing information on these
+// transforms in their native SDK.
+//
+// To use an expansion service, it must be run as a separate process accessible
+// during pipeline construction. The address of that process must be passed to
+// the transforms in this package.
+//
+// The version of the expansion service should match the version of the Beam SDK
+// being used. For numbered releases of Beam, these expansions services are
+// released to the Maven repository as modules. For development versions of
+// Beam, it is recommended to build and run it from source using Gradle.
+//
+// Current supported SDKs, including expansion service modules and reference
+// documentation:
+// * Java
+//    - Vendored Module: beam-sdks-java-io-expansion-service
+//    - Run via Gradle: ./gradlew :sdks:java:io:expansion-service:runExpansionService
+//    - Reference Class: org.apache.beam.sdk.io.kafka.KafkaIO
+package kafkaio
+
+// TODO(BEAM-12492): Implement an API for specifying Kafka type serializers and
+// deserializers.
+
+import (
+	"reflect"
+
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+)
+
+func init() {
+	beam.RegisterType(reflect.TypeOf((*readPayload)(nil)).Elem())
+	beam.RegisterType(reflect.TypeOf((*writePayload)(nil)).Elem())
+}
+
+type policy string
+
+const (
+	ByteArrayDeserializer = "org.apache.kafka.common.serialization.ByteArrayDeserializer"
+	ByteArraySerializer   = "org.apache.kafka.common.serialization.ByteArraySerializer"
+
+	// ProcessingTime is a timestamp policy that assigns processing time to
+	// each record. Specifically, this is the timestamp when the record becomes
+	// "current" in the reader. Further documentation can be found in Java's
+	// KafkaIO documentation.
+	ProcessingTime policy = "ProcessingTime"
+
+	// CreateTime is a timestamp policy based on the CREATE_TIME timestamps of
+	// Kafka records. Requires the records to have their timestamp type set to
+	// org.apache.kafka.common.record.TimestampType.CREATE_TIME. Further
+	// documentation can be found in Java's KafkaIO documentation.
+	CreateTime policy = "CreateTime"
+
+	// LogAppendTime is a timestamp policy that assigns Kafka's log append time
+	// (server side ingestion time) to each record. Further documentation can
+	// be found in Java's KafkaIO documentation.
+	LogAppendTime policy = "LogAppendTime"
+
+	readURN  = "beam:external:java:kafka:read:v1"
+	writeURN = "beam:external:java:kafka:write:v1"
+)
+
+// Read is a cross-language PTransform which reads from Kafka and returns a
+// KV pair for each item in the specified Kafka topics. By default, this runs
+// as an unbounded transform and outputs keys and values as raw byte arrays.

Review comment:
       ```suggestion
   // as an unbounded transform and outputs keys and values as byte slices.
   ```
   Byte arrays have a different meaning in Go (arrays are all fixed length), and the user would be receiving them as slices.
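The array-versus-slice distinction being made here can be shown in a couple of lines; the `payload` value is illustrative, not from the PR:

```go
package main

import "fmt"

func main() {
	// In Go, a byte array has a fixed length baked into its type ([4]byte
	// and [5]byte are distinct types), while a slice has dynamic length.
	var arr [4]byte
	// Kafka record keys and values arrive in a DoFn as []byte slices.
	payload := []byte("taxi ride")
	fmt.Println(len(arr), len(payload)) // 4 9
}
```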







[GitHub] [beam] youngoli commented on a change in pull request #14996: [BEAM-12380] Add KafkaIO Transforms and Kafka Taxi example.

Posted by GitBox <gi...@apache.org>.
youngoli commented on a change in pull request #14996:
URL: https://github.com/apache/beam/pull/14996#discussion_r651468480



##########
File path: sdks/go/pkg/beam/io/kafkaio/kafka.go
##########
@@ -0,0 +1,330 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more

Review comment:
       I moved it to io/xlang/kafka.go, which seemed like the best approach. Besides allowing for a native implementation later, it makes it self-documenting that the transform is cross-language.







[GitHub] [beam] youngoli commented on a change in pull request #14996: [BEAM-12380] Add KafkaIO Transforms and Kafka Taxi example.

Posted by GitBox <gi...@apache.org>.
youngoli commented on a change in pull request #14996:
URL: https://github.com/apache/beam/pull/14996#discussion_r651468815



##########
File path: sdks/go/pkg/beam/io/kafkaio/kafka.go
##########
@@ -0,0 +1,330 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package kafkaio contains cross-language functionality for using Apache Kafka
+// (http://kafka.apache.org/). These transforms only work on runners that
+// support cross-language transforms.
+//
+// Setup
+//
+// Transforms specified here are cross-language transforms implemented in a
+// different SDK (listed below). During pipeline construction, the Go SDK will
+// need to connect to an expansion service containing information on these
+// transforms in their native SDK.
+//
+// To use an expansion service, it must be run as a separate process accessible
+// during pipeline construction. The address of that process must be passed to
+// the transforms in this package.
+//
+// The version of the expansion service should match the version of the Beam SDK
+// being used. For numbered releases of Beam, these expansion services are
+// released to the Maven repository as modules. For development versions of
+// Beam, it is recommended to build and run it from source using Gradle.
+//
+// Current supported SDKs, including expansion service modules and reference
+// documentation:
+// * Java
+//    - Vendored Module: beam-sdks-java-io-expansion-service
+//    - Run via Gradle: ./gradlew :sdks:java:io:expansion-service:runExpansionService
+//    - Reference Class: org.apache.beam.sdk.io.kafka.KafkaIO
+package kafkaio
+
+import (
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+	"reflect"
+)
+
+func init() {
+	beam.RegisterType(reflect.TypeOf((*readPayload)(nil)).Elem())
+	beam.RegisterType(reflect.TypeOf((*writePayload)(nil)).Elem())
+}
+
+type policy string
+
+const (
+	ByteArrayDeserializer = "org.apache.kafka.common.serialization.ByteArrayDeserializer"
+	ByteArraySerializer   = "org.apache.kafka.common.serialization.ByteArraySerializer"
+
+	// ProcessingTime is a timestamp policy that assigns processing time to
+	// each record. Specifically, this is the timestamp when the record becomes
+	// "current" in the reader. Further documentation can be found in Java's
+	// KafkaIO documentation.
+	ProcessingTime policy = "ProcessingTime"
+
+	// CreateTime is a timestamp policy based on the CREATE_TIME timestamps of
+	// Kafka records. Requires the records to have their timestamp type set to
+	// org.apache.kafka.common.record.TimestampType.CREATE_TIME. Further
+	// documentation can be found in Java's KafkaIO documentation.
+	CreateTime policy = "CreateTime"
+
+	// LogAppendTime is a timestamp policy that assigns Kafka's log append time
+	// (server side ingestion time) to each record. Further documentation can
+	// be found in Java's KafkaIO documentation.
+	LogAppendTime policy = "LogAppendTime"
+
+	readURN  = "beam:external:java:kafka:read:v1"
+	writeURN = "beam:external:java:kafka:write:v1"
+)
+
+// Read is a cross-language PTransform which reads from Kafka and returns a
+// KV pair for each item in the specified Kafka topics. By default, this runs
+// as an unbounded transform and outputs keys and values as raw byte arrays.
+// These properties can be changed through optional parameters.
+//
+// Read requires the address for an expansion service for Kafka Read transforms,
+// a comma-separated list of bootstrap server addresses (see the Kafka property
+// "bootstrap.servers" for details), and at least one topic to read from.
+//
+// Read also accepts optional parameters as readOptions. All optional parameters
+// are predefined in this package as functions that return readOption. To set
+// an optional parameter, call the function within Read's function signature.
+//
+// Example of Read with required and optional parameters:
+//
+//   expansionAddr := "localhost:1234"
+//   bootstrapServer := "bootstrap-server:1234"
+//   topic := "topic_name"
+//   pcol := kafkaio.Read(s, expansionAddr, bootstrapServer, []string{topic},
+//       kafkaio.MaxNumRecords(100), kafkaio.CommitOffsetInFinalize(true))
+func Read(s beam.Scope, addr string, servers string, topics []string, opts ...readOption) beam.PCollection {
+	s = s.Scope("kafkaio.Read")
+
+	if len(topics) == 0 {
+		panic("kafkaio.Read requires at least one topic to read from.")
+	}
+
+	rpl := readPayload{
+		ConsumerConfig:    map[string]string{"bootstrap.servers": servers},
+		Topics:            topics,
+		KeyDeserializer:   ByteArrayDeserializer,
+		ValueDeserializer: ByteArrayDeserializer,
+		TimestampPolicy:   string(ProcessingTime),
+	}
+	rcfg := readConfig{
+		pl:  &rpl,
+		key: reflectx.ByteSlice,
+		val: reflectx.ByteSlice,
+	}
+	for _, opt := range opts {
+		opt(&rcfg)
+	}
+
+	pl := beam.CrossLanguagePayload(rpl)
+	outT := beam.UnnamedOutput(typex.NewKV(typex.New(rcfg.key), typex.New(rcfg.val)))
+	out := beam.CrossLanguage(s, readURN, pl, addr, nil, outT)
+	return out[graph.UnnamedOutputTag]
+}
+
+type readOption func(*readConfig)
+type readConfig struct {
+	pl  *readPayload
+	key reflect.Type
+	val reflect.Type
+}
+
+// ConsumerConfigs is a Read option that adds consumer properties to the
+// transform's consumer configuration. Each call merges the given properties
+// into the existing map, without removing previously set entries.
+//
+// Note that the "bootstrap.servers" property is automatically set by
+// kafkaio.Read and does not need to be specified via this option.
+func ConsumerConfigs(cfgs map[string]string) readOption {
+	return func(cfg *readConfig) {
+		for k, v := range cfgs {
+			cfg.pl.ConsumerConfig[k] = v
+		}
+	}
+}
+
+// KeyDeserializer is a Read option that specifies a fully-qualified Java class
+// name of a Kafka Deserializer for the topic's key, along with the
+// corresponding Go type to deserialize keys to.
+//
+// Defaults to []byte, with classname
+// "org.apache.kafka.common.serialization.ByteArrayDeserializer".
+func KeyDeserializer(classname string, keyType reflect.Type) readOption {
+	return func(cfg *readConfig) {
+		cfg.pl.KeyDeserializer = classname
+		cfg.key = keyType
+	}
+}
+
+// ValueDeserializer is a Read option that specifies a fully-qualified Java
+// class name of a Kafka Deserializer for the topic's value, along with the
+// corresponding Go type to deserialize values to.
+//
+// Defaults to []byte, with classname
+// "org.apache.kafka.common.serialization.ByteArrayDeserializer".
+func ValueDeserializer(classname string, valType reflect.Type) readOption {
+	return func(cfg *readConfig) {
+		cfg.pl.ValueDeserializer = classname
+		cfg.val = valType
+	}
+}
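The `readOption`/`readConfig` plumbing above is Go's functional-options pattern: each exported option function closes over its arguments and mutates a config struct that `Read` builds with defaults first. A minimal self-contained sketch of the same idea follows — the names (`config`, `withProps`, `withKeyType`, `newConfig`) are illustrative stand-ins, not part of the SDK:

```go
package main

import "fmt"

// config mirrors the shape of readConfig: a property map plus
// Go-side type info, filled with defaults before options run.
type config struct {
	props   map[string]string
	keyType string
}

// option mutates a config, like readOption mutates readConfig.
type option func(*config)

// withProps merges properties into the map, as ConsumerConfigs does.
func withProps(p map[string]string) option {
	return func(c *config) {
		for k, v := range p {
			c.props[k] = v
		}
	}
}

// withKeyType overrides the default key type, as KeyDeserializer does.
func withKeyType(t string) option {
	return func(c *config) { c.keyType = t }
}

// newConfig sets defaults, then applies each option in order,
// mirroring the defaults-then-options loop in Read.
func newConfig(server string, opts ...option) config {
	c := config{
		props:   map[string]string{"bootstrap.servers": server},
		keyType: "[]byte", // default, as in kafkaio.Read
	}
	for _, o := range opts {
		o(&c)
	}
	return c
}

func main() {
	c := newConfig("broker:9092",
		withProps(map[string]string{"group.id": "taxi"}),
		withKeyType("string"))
	fmt.Println(c.props["bootstrap.servers"], c.props["group.id"], c.keyType)
	// prints: broker:9092 taxi string
}
```

Because options run after the defaults are set, later options win, and callers that pass no options get a fully usable config — the same property that lets `kafkaio.Read` work with only the required arguments.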

Review comment:
       Yeah, I was brainstorming some of this too. I went with punting this to later and just leaving it as only []byte.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org