Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/11/01 17:35:13 UTC

[GitHub] [beam] n-oden opened a new pull request #15858: [BEAM-13159] Add Redis Stream (XADD) Write Support

n-oden opened a new pull request #15858:
URL: https://github.com/apache/beam/pull/15858


   This change adds support for writing PCollections of `KV<String, Map<String, String>>` to [Redis Streams](https://redis.io/topics/streams-intro).
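
For readers unfamiliar with the command named in the PR title: Redis `XADD` takes a stream key, an entry ID (`*` asks the server to generate one), and a flat list of field/value pairs. The following is a minimal JDK-only sketch of how one `KV<String, Map<String, String>>` element could be flattened into that argument list. The class and method names are hypothetical, and this is an illustration under those assumptions, not the code added by the PR.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of RedisIO: flattens one stream entry into
// the argument list of a Redis XADD command.
class XaddArgsSketch {

  // Returns [XADD, <key>, *, field1, value1, field2, value2, ...].
  // "*" asks the Redis server to generate the entry ID.
  static List<String> toXaddArgs(String streamKey, Map<String, String> fields) {
    if (fields.isEmpty()) {
      throw new IllegalArgumentException("XADD needs at least one field/value pair");
    }
    List<String> args = new ArrayList<>();
    args.add("XADD");
    args.add(streamKey);
    args.add("*");
    for (Map.Entry<String, String> field : fields.entrySet()) {
      args.add(field.getKey());
      args.add(field.getValue());
    }
    return args;
  }

  public static void main(String[] unused) {
    // LinkedHashMap keeps insertion order, so the printed command is stable.
    Map<String, String> reading = new LinkedHashMap<>();
    reading.put("sensor-id", "1234");
    reading.put("temperature", "19.8");
    // Prints: XADD mystream * sensor-id 1234 temperature 19.8
    System.out.println(String.join(" ", toXaddArgs("mystream", reading)));
  }
}
```

In this sketch the `KV` key plays the role of the stream key and the map supplies the entry's fields; a real sink would hand the equivalent arguments to a Redis client rather than build strings.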
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [X] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [X] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [X] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] aromanenko-dev commented on pull request #15858: [BEAM-13159] Add Redis Stream (XADD) Write Support

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on pull request #15858:
URL: https://github.com/apache/beam/pull/15858#issuecomment-986632262


   @n-oden Sorry, I was away for the last 3 weeks. I'll try to take a look ASAP.





[GitHub] [beam] n-oden commented on pull request #15858: [BEAM-13159] Add Redis Stream (XADD) Write Support

Posted by GitBox <gi...@apache.org>.
n-oden commented on pull request #15858:
URL: https://github.com/apache/beam/pull/15858#issuecomment-991311919


   @mosche I think I've addressed most/all of your comments.





[GitHub] [beam] n-oden commented on pull request #15858: [BEAM-13159] Add Redis Stream (XADD) Write Support

Posted by GitBox <gi...@apache.org>.
n-oden commented on pull request #15858:
URL: https://github.com/apache/beam/pull/15858#issuecomment-991751346


   Run Java PreCommit





[GitHub] [beam] n-oden commented on a change in pull request #15858: [BEAM-13159] Add Redis Stream (XADD) Write Support

Posted by GitBox <gi...@apache.org>.
n-oden commented on a change in pull request #15858:
URL: https://github.com/apache/beam/pull/15858#discussion_r767964402



##########
File path: sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
##########
@@ -105,6 +107,57 @@
  *   .apply(RedisIO.write().withEndpoint("::1", 6379))
  *
  * }</pre>
+ *
+ * <h3>Writing Redis Streams</h3>
+ *
+ * <p>{@link #writeStreams()} provides a sink to write key/value pairs represented as {@link KV}

Review comment:
       👍 







[GitHub] [beam] n-oden removed a comment on pull request #15858: [BEAM-13159] Add Redis Stream (XADD) Write Support

Posted by GitBox <gi...@apache.org>.
n-oden removed a comment on pull request #15858:
URL: https://github.com/apache/beam/pull/15858#issuecomment-959433035


   Run Java PreCommit





[GitHub] [beam] lukecwik commented on pull request #15858: [BEAM-13159] Add Redis Stream (XADD) Write Support

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #15858:
URL: https://github.com/apache/beam/pull/15858#issuecomment-961633082









[GitHub] [beam] aromanenko-dev merged pull request #15858: [BEAM-13159] Add Redis Stream (XADD) Write Support

Posted by GitBox <gi...@apache.org>.
aromanenko-dev merged pull request #15858:
URL: https://github.com/apache/beam/pull/15858


   





[GitHub] [beam] n-oden commented on a change in pull request #15858: [BEAM-13159] Add Redis Stream (XADD) Write Support

Posted by GitBox <gi...@apache.org>.
n-oden commented on a change in pull request #15858:
URL: https://github.com/apache/beam/pull/15858#discussion_r767981731



##########
File path: sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
##########
@@ -250,6 +260,102 @@ public void testWriteUsingDECRBY() {
     assertEquals(-1, count);
   }
 
+  @Test
+  public void testWriteStreams() {
+    ArrayList<String> streams = new ArrayList<String>();
+    for (int i = 0; i <= 10; i++) {
+      UUID uuid = UUID.randomUUID();
+      /* stream keys are uuids to ensure that test runs are idempotent */
+      streams.add(uuid.toString());
+    }
+    Map<String, String> fooValues = ImmutableMap.of("sensor-id", "1234", "temperature", "19.8");
+    Map<String, String> barValues = ImmutableMap.of("sensor-id", "9999", "temperature", "18.2");
+
+    List<KV<String, Map<String, String>>> fooData =
+        streams.stream().map(key -> KV.of(key, fooValues)).collect(Collectors.toList());
+
+    List<KV<String, Map<String, String>>> barData =
+        streams.stream().map(key -> KV.of(key, barValues)).collect(Collectors.toList());
+
+    List<KV<String, Map<String, String>>> allData =
+        Stream.of(fooData, barData).flatMap(Collection::stream).collect(Collectors.toList());
+
+    PCollection<KV<String, Map<String, String>>> write =
+        p.apply(
+            Create.of(allData)
+                .withCoder(
+                    KvCoder.of(
+                        StringUtf8Coder.of(),
+                        MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))));
+    write.apply(RedisIO.writeStreams().withEndpoint(REDIS_HOST, port));
+    p.run();
+
+    for (String stream : streams) {

Review comment:
       Woof, yes, that's much more concise, thank you.
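
The list-building pattern in the hunk above (pair every stream key with a field map, then merge the lists with `Stream.of(...).flatMap(Collection::stream)`) can be tried without Beam or Redis. In this sketch `Map.Entry` stands in for Beam's `KV`, and the class and method names are hypothetical; it illustrates the idiom only and is not the test from the PR.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical stand-in for the test's data setup; Map.Entry replaces Beam's KV.
class StreamTestDataSketch {

  // Plays the role of KV.of(key, value).
  static Map.Entry<String, Map<String, String>> kv(String key, Map<String, String> value) {
    return new SimpleImmutableEntry<>(key, value);
  }

  // Pairs every stream key with the same field map, like fooData and barData.
  static List<Map.Entry<String, Map<String, String>>> pairWith(
      List<String> keys, Map<String, String> values) {
    return keys.stream().map(key -> kv(key, values)).collect(Collectors.toList());
  }

  // Merges two lists into one, preserving order, like allData.
  static List<Map.Entry<String, Map<String, String>>> concat(
      List<Map.Entry<String, Map<String, String>>> first,
      List<Map.Entry<String, Map<String, String>>> second) {
    return Stream.of(first, second).flatMap(Collection::stream).collect(Collectors.toList());
  }

  public static void main(String[] unused) {
    List<String> keys = Arrays.asList("a", "b");
    Map<String, String> foo = Collections.singletonMap("temperature", "19.8");
    Map<String, String> bar = Collections.singletonMap("temperature", "18.2");
    // Two keys times two field maps gives four entries.
    System.out.println(concat(pairWith(keys, foo), pairWith(keys, bar)).size());
  }
}
```

Each key ends up with one entry per field map, so every stream is expected to receive two entries when the merged list is written.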







[GitHub] [beam] n-oden commented on a change in pull request #15858: [BEAM-13159] Add Redis Stream (XADD) Write Support

Posted by GitBox <gi...@apache.org>.
n-oden commented on a change in pull request #15858:
URL: https://github.com/apache/beam/pull/15858#discussion_r766806161



##########
File path: sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
##########
@@ -142,6 +144,15 @@ public static Write write() {
         .build();
   }
 
+  /** Write data to a Redis server. */
+  public static WriteStreams writeStreams() {

Review comment:
       👍 







[GitHub] [beam] n-oden removed a comment on pull request #15858: [BEAM-13159] Add Redis Stream (XADD) Write Support

Posted by GitBox <gi...@apache.org>.
n-oden removed a comment on pull request #15858:
URL: https://github.com/apache/beam/pull/15858#issuecomment-956463142









[GitHub] [beam] lukecwik commented on pull request #15858: [BEAM-13159] Add Redis Stream (XADD) Write Support

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #15858:
URL: https://github.com/apache/beam/pull/15858#issuecomment-961633350


   > Run Java_Examples_Dataflow PreCommit
   > Run Java_Examples_Dataflow_Java11 PreCommit
   > Run Whitespace PreCommit
   
   I believe you need to make a separate comment for each one you want to run.





[GitHub] [beam] n-oden commented on pull request #15858: [BEAM-13159] Add Redis Stream (XADD) Write Support

Posted by GitBox <gi...@apache.org>.
n-oden commented on pull request #15858:
URL: https://github.com/apache/beam/pull/15858#issuecomment-961328386


   Run Java_Examples_Dataflow PreCommit
   Run Java_Examples_Dataflow_Java11 PreCommit
   Run Whitespace PreCommit





[GitHub] [beam] n-oden commented on pull request #15858: [BEAM-13159] Add Redis Stream (XADD) Write Support

Posted by GitBox <gi...@apache.org>.
n-oden commented on pull request #15858:
URL: https://github.com/apache/beam/pull/15858#issuecomment-959433035









[GitHub] [beam] n-oden removed a comment on pull request #15858: [BEAM-13159] Add Redis Stream (XADD) Write Support

Posted by GitBox <gi...@apache.org>.
n-oden removed a comment on pull request #15858:
URL: https://github.com/apache/beam/pull/15858#issuecomment-959433035









[GitHub] [beam] n-oden commented on a change in pull request #15858: [BEAM-13159] Add Redis Stream (XADD) Write Support

Posted by GitBox <gi...@apache.org>.
n-oden commented on a change in pull request #15858:
URL: https://github.com/apache/beam/pull/15858#discussion_r766887718



##########
File path: sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
##########
@@ -205,6 +212,62 @@ public void testWriteUsingDECRBY() {
     assertEquals(-1, count);
   }
 
+  @Test
+  public void testWriteStreams() {
+    List<String> keys = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j");
+    List<KV<String, Map<String, String>>> data = new ArrayList<>();
+    for (String key : keys) {
+      Map<String, String> values =

Review comment:
       Oh nice, that's a much more idiomatic pattern, thank you.  (Java is to put it mildly not my primary language.)







[GitHub] [beam] n-oden commented on a change in pull request #15858: [BEAM-13159] Add Redis Stream (XADD) Write Support

Posted by GitBox <gi...@apache.org>.
n-oden commented on a change in pull request #15858:
URL: https://github.com/apache/beam/pull/15858#discussion_r766855152



##########
File path: sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
##########
@@ -709,4 +720,146 @@ public void teardown() {
       }
     }
   }
+
+  /**
+   * A {@link PTransform} to write stream key pairs (https://redis.io/topics/streams-intro) to a
+   * Redis server.
+   */
+  @AutoValue
+  public abstract static class WriteStreams
+      extends PTransform<PCollection<KV<String, Map<String, String>>>, PDone> {
+
+    abstract @Nullable RedisConnectionConfiguration connectionConfiguration();
+
+    abstract @Nullable Long maxLen();
+
+    abstract boolean approximateTrim();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+
+      abstract Builder setConnectionConfiguration(
+          RedisConnectionConfiguration connectionConfiguration);
+
+      abstract Builder setMaxLen(Long maxLen);
+
+      abstract Builder setApproximateTrim(boolean approximateTrim);
+
+      abstract WriteStreams build();
+    }
+
+    public WriteStreams withEndpoint(String host, int port) {
+      checkArgument(host != null, "host can not be null");
+      checkArgument(port > 0, "port can not be negative or 0");
+      return toBuilder()
+          .setConnectionConfiguration(connectionConfiguration().withHost(host).withPort(port))
+          .build();
+    }
+
+    public WriteStreams withAuth(String auth) {
+      checkArgument(auth != null, "auth can not be null");
+      return toBuilder()
+          .setConnectionConfiguration(connectionConfiguration().withAuth(auth))
+          .build();
+    }
+
+    public WriteStreams withTimeout(int timeout) {
+      checkArgument(timeout >= 0, "timeout can not be negative");
+      return toBuilder()
+          .setConnectionConfiguration(connectionConfiguration().withTimeout(timeout))
+          .build();
+    }
+
+    public WriteStreams withConnectionConfiguration(RedisConnectionConfiguration connection) {
+      checkArgument(connection != null, "connection can not be null");
+      return toBuilder().setConnectionConfiguration(connection).build();
+    }
+
+    public WriteStreams withMaxLen(Long maxLen) {
+      checkArgument(maxLen >= 0L, "maxLen must be positive if set");
+      return toBuilder().setMaxLen(maxLen).build();
+    }
+
+    public WriteStreams withApproximateTrim(boolean approximateTrim) {
+      return toBuilder().setApproximateTrim(approximateTrim).build();
+    }
+
+    @Override
+    public PDone expand(PCollection<KV<String, Map<String, String>>> input) {
+      checkArgument(connectionConfiguration() != null, "withConnectionConfiguration() is required");
+
+      input.apply(ParDo.of(new WriteStreamFn(this)));
+      return PDone.in(input.getPipeline());
+    }
+
+    private static class WriteStreamFn extends DoFn<KV<String, Map<String, String>>, Void> {
+
+      private static final int DEFAULT_BATCH_SIZE = 1000;
+
+      private final WriteStreams spec;
+
+      private transient Jedis jedis;
+      private transient Pipeline pipeline;
+
+      private int batchCount;
+
+      public WriteStreamFn(WriteStreams spec) {
+        this.spec = spec;
+      }
+
+      @Setup
+      public void setup() {
+        jedis = spec.connectionConfiguration().connect();
+      }
+
+      @StartBundle
+      public void startBundle() {
+        pipeline = jedis.pipelined();
+        pipeline.multi();
+        batchCount = 0;
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext c) {
+        KV<String, Map<String, String>> record = c.element();
+
+        writeRecord(record);
+
+        batchCount++;
+
+        if (batchCount >= DEFAULT_BATCH_SIZE) {
+          pipeline.exec();
+          pipeline.sync();
+          pipeline.multi();
+          batchCount = 0;
+        }
+      }
+
+      private void writeRecord(KV<String, Map<String, String>> record) {
+        String key = record.getKey();
+        Map<String, String> value = record.getValue();
+        if (spec.maxLen() > 0L) {
+          pipeline.xadd(key, StreamEntryID.NEW_ENTRY, value, spec.maxLen(), spec.approximateTrim());
+        } else {
+          pipeline.xadd(key, StreamEntryID.NEW_ENTRY, value);
+        }
+      }
+
+      @FinishBundle
+      public void finishBundle() {
+        if (pipeline.isInMulti()) {
+          pipeline.exec();

Review comment:
       I feel like "make RedisIO's error handling better" is a worthwhile goal but possibly for another PR.







[GitHub] [beam] n-oden commented on a change in pull request #15858: [BEAM-13159] Add Redis Stream (XADD) Write Support

Posted by GitBox <gi...@apache.org>.
n-oden commented on a change in pull request #15858:
URL: https://github.com/apache/beam/pull/15858#discussion_r766953848



##########
File path: sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
##########
@@ -709,4 +720,146 @@ public void teardown() {
       }
     }
   }
+
+  /**
+   * A {@link PTransform} to write stream key pairs (https://redis.io/topics/streams-intro) to a
+   * Redis server.
+   */
+  @AutoValue
+  public abstract static class WriteStreams
+      extends PTransform<PCollection<KV<String, Map<String, String>>>, PDone> {
+
+    abstract @Nullable RedisConnectionConfiguration connectionConfiguration();
+
+    abstract @Nullable Long maxLen();
+
+    abstract boolean approximateTrim();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+
+      abstract Builder setConnectionConfiguration(
+          RedisConnectionConfiguration connectionConfiguration);
+
+      abstract Builder setMaxLen(Long maxLen);
+
+      abstract Builder setApproximateTrim(boolean approximateTrim);
+
+      abstract WriteStreams build();
+    }
+
+    public WriteStreams withEndpoint(String host, int port) {
+      checkArgument(host != null, "host can not be null");
+      checkArgument(port > 0, "port can not be negative or 0");
+      return toBuilder()
+          .setConnectionConfiguration(connectionConfiguration().withHost(host).withPort(port))
+          .build();
+    }
+
+    public WriteStreams withAuth(String auth) {
+      checkArgument(auth != null, "auth can not be null");
+      return toBuilder()
+          .setConnectionConfiguration(connectionConfiguration().withAuth(auth))
+          .build();
+    }
+
+    public WriteStreams withTimeout(int timeout) {
+      checkArgument(timeout >= 0, "timeout can not be negative");
+      return toBuilder()
+          .setConnectionConfiguration(connectionConfiguration().withTimeout(timeout))
+          .build();
+    }
+
+    public WriteStreams withConnectionConfiguration(RedisConnectionConfiguration connection) {
+      checkArgument(connection != null, "connection can not be null");
+      return toBuilder().setConnectionConfiguration(connection).build();
+    }
+
+    public WriteStreams withMaxLen(Long maxLen) {
+      checkArgument(maxLen >= 0L, "maxLen must be positive if set");
+      return toBuilder().setMaxLen(maxLen).build();
+    }
+
+    public WriteStreams withApproximateTrim(boolean approximateTrim) {
+      return toBuilder().setApproximateTrim(approximateTrim).build();
+    }
+
+    @Override
+    public PDone expand(PCollection<KV<String, Map<String, String>>> input) {
+      checkArgument(connectionConfiguration() != null, "withConnectionConfiguration() is required");
+
+      input.apply(ParDo.of(new WriteStreamFn(this)));
+      return PDone.in(input.getPipeline());
+    }
+
+    private static class WriteStreamFn extends DoFn<KV<String, Map<String, String>>, Void> {
+
+      private static final int DEFAULT_BATCH_SIZE = 1000;
+
+      private final WriteStreams spec;
+
+      private transient Jedis jedis;
+      private transient Pipeline pipeline;
+
+      private int batchCount;
+
+      public WriteStreamFn(WriteStreams spec) {
+        this.spec = spec;
+      }
+
+      @Setup
+      public void setup() {
+        jedis = spec.connectionConfiguration().connect();
+      }
+
+      @StartBundle
+      public void startBundle() {
+        pipeline = jedis.pipelined();
+        pipeline.multi();
+        batchCount = 0;
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext c) {
+        KV<String, Map<String, String>> record = c.element();
+
+        writeRecord(record);
+
+        batchCount++;
+
+        if (batchCount >= DEFAULT_BATCH_SIZE) {
+          pipeline.exec();
+          pipeline.sync();
+          pipeline.multi();
+          batchCount = 0;
+        }
+      }
+
+      private void writeRecord(KV<String, Map<String, String>> record) {
+        String key = record.getKey();
+        Map<String, String> value = record.getValue();
+        if (spec.maxLen() > 0L) {
+          pipeline.xadd(key, StreamEntryID.NEW_ENTRY, value, spec.maxLen(), spec.approximateTrim());
+        } else {
+          pipeline.xadd(key, StreamEntryID.NEW_ENTRY, value);
+        }
+      }
+
+      @FinishBundle
+      public void finishBundle() {
+        if (pipeline.isInMulti()) {
+          pipeline.exec();

Review comment:
       Small followup: if we're inside a MULTI transaction, we definitely don't need to check each nested response.  Per the Redis docs, "Either all of the commands or none are processed, so a Redis transaction is also atomic." If EXEC does not throw an error, the transaction has applied.

##########
File path: sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
##########
@@ -205,6 +212,62 @@ public void testWriteUsingDECRBY() {
     assertEquals(-1, count);
   }
 
+  @Test
+  public void testWriteStreams() {
+    List<String> keys = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j");
+    List<KV<String, Map<String, String>>> data = new ArrayList<>();
+    for (String key : keys) {
+      Map<String, String> values =
+          Stream.of(
+                  new AbstractMap.SimpleEntry<String, String>("foo", "bar"),
+                  new AbstractMap.SimpleEntry<String, String>("baz", "qux"))
+              .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+      data.add(KV.of(key, values));
+    }
+    PCollection<KV<String, Map<String, String>>> write =
+        p.apply(
+            Create.of(data)
+                .withCoder(
+                    KvCoder.of(
+                        StringUtf8Coder.of(),
+                        MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))));
+    write.apply(RedisIO.writeStreams().withEndpoint(REDIS_HOST, port));
+    p.run();
+
+    for (String key : keys) {
+      long count = client.xlen(key);
+      assertEquals(2, count);

Review comment:
       👍 

##########
File path: sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
##########
@@ -205,6 +212,62 @@ public void testWriteUsingDECRBY() {
     assertEquals(-1, count);
   }
 
+  @Test
+  public void testWriteStreams() {
+    List<String> keys = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j");
+    List<KV<String, Map<String, String>>> data = new ArrayList<>();
+    for (String key : keys) {
+      Map<String, String> values =
+          Stream.of(
+                  new AbstractMap.SimpleEntry<String, String>("foo", "bar"),
+                  new AbstractMap.SimpleEntry<String, String>("baz", "qux"))
+              .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+      data.add(KV.of(key, values));
+    }
+    PCollection<KV<String, Map<String, String>>> write =
+        p.apply(
+            Create.of(data)
+                .withCoder(
+                    KvCoder.of(
+                        StringUtf8Coder.of(),
+                        MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))));
+    write.apply(RedisIO.writeStreams().withEndpoint(REDIS_HOST, port));
+    p.run();
+
+    for (String key : keys) {
+      long count = client.xlen(key);
+      assertEquals(2, count);
+    }
+  }
+
+  @Test
+  public void testWriteStreamsWithTruncation() {
+    List<String> keys = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j");
+    List<KV<String, Map<String, String>>> data = new ArrayList<>();
+    for (String key : keys) {
+      Map<String, String> values =
+          Stream.of(
+                  new AbstractMap.SimpleEntry<String, String>("foo", "bar"),
+                  new AbstractMap.SimpleEntry<String, String>("baz", "qux"))
+              .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+      data.add(KV.of(key, values));
+    }
+    PCollection<KV<String, Map<String, String>>> write =
+        p.apply(
+            Create.of(data)
+                .withCoder(
+                    KvCoder.of(
+                        StringUtf8Coder.of(),
+                        MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))));
+    write.apply(RedisIO.writeStreams().withEndpoint(REDIS_HOST, port).withMaxLen(1L));

Review comment:
       Yeah, I think I confused myself with the overly complicated test data creation pattern.  This should actually test what it claims to now.

##########
File path: sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
##########
@@ -205,6 +212,62 @@ public void testWriteUsingDECRBY() {
     assertEquals(-1, count);
   }
 
+  @Test
+  public void testWriteStreams() {
+    List<String> keys = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j");
+    List<KV<String, Map<String, String>>> data = new ArrayList<>();
+    for (String key : keys) {
+      Map<String, String> values =

Review comment:
       Oh nice, that's a much more idiomatic pattern, thank you.  (Java is to put it mildly not my primary language.)







[GitHub] [beam] n-oden commented on pull request #15858: [BEAM-13159] Add Redis Stream (XADD) Write Support

Posted by GitBox <gi...@apache.org>.
n-oden commented on pull request #15858:
URL: https://github.com/apache/beam/pull/15858#issuecomment-970324631


   @aromanenko-dev thoughts? :)





[GitHub] [beam] n-oden commented on pull request #15858: [BEAM-13159] Add Redis Stream (XADD) Write Support

Posted by GitBox <gi...@apache.org>.
n-oden commented on pull request #15858:
URL: https://github.com/apache/beam/pull/15858#issuecomment-956438087


   R:@jbonofre this is my first attempt and Java is definitely not my lingua franca, so any and all feedback would be appreciated. :)





[GitHub] [beam] n-oden commented on pull request #15858: [BEAM-13159] Add Redis Stream (XADD) Write Support

Posted by GitBox <gi...@apache.org>.
n-oden commented on pull request #15858:
URL: https://github.com/apache/beam/pull/15858#issuecomment-956442603


   fry_squint.jpg at the java precommit test failure -- I don't think that's related to my change?





[GitHub] [beam] lukecwik commented on pull request #15858: [BEAM-13159] Add Redis Stream (XADD) Write Support

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #15858:
URL: https://github.com/apache/beam/pull/15858#issuecomment-961633193


   Run Whitespace PreCommit





[GitHub] [beam] n-oden commented on pull request #15858: [BEAM-13159] Add Redis Stream (XADD) Write Support

Posted by GitBox <gi...@apache.org>.
n-oden commented on pull request #15858:
URL: https://github.com/apache/beam/pull/15858#issuecomment-962077386


   R: @jbonofre





[GitHub] [beam] lukecwik commented on pull request #15858: [BEAM-13159] Add Redis Stream (XADD) Write Support

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #15858:
URL: https://github.com/apache/beam/pull/15858#issuecomment-961633082


   > @lukecwik I'm not sure what's going on here: jenkins reports that all of the builds have succeeded (e.g. [the whitespace precommit](https://ci-beam.apache.org/job/beam_PreCommit_Whitespace_Commit/4400/)) but github still thinks a bunch of them are pending. In any case I believe I am passing all builds here.
   
   That looks like a glitch between Jenkins and GitHub. The 3 pending runs say that they are passing and this should be good to review.





[GitHub] [beam] lukecwik commented on pull request #15858: [BEAM-13159] Add Redis Stream (XADD) Write Support

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #15858:
URL: https://github.com/apache/beam/pull/15858#issuecomment-961633422


   Run Java_Examples_Dataflow_Java11 PreCommit





[GitHub] [beam] n-oden removed a comment on pull request #15858: [BEAM-13159] Add Redis Stream (XADD) Write Support

Posted by GitBox <gi...@apache.org>.
n-oden removed a comment on pull request #15858:
URL: https://github.com/apache/beam/pull/15858#issuecomment-956971051









[GitHub] [beam] mosche commented on a change in pull request #15858: [BEAM-13159] Add Redis Stream (XADD) Write Support

Posted by GitBox <gi...@apache.org>.
mosche commented on a change in pull request #15858:
URL: https://github.com/apache/beam/pull/15858#discussion_r768050109



##########
File path: sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
##########
@@ -250,6 +263,72 @@ public void testWriteUsingDECRBY() {
     assertEquals(-1, count);
   }
 
+  @Test
+  public void testWriteStreams() {
+
+    /* test data is 10 keys (stream IDs), each with two entries, each entry having one k/v pair of data */
+    List<String> redisKeys =
+        IntStream.range(0, 9).boxed().map(idx -> UUID.randomUUID().toString()).collect(toList());

Review comment:
       Just FYI, start is inclusive here but end is exclusive 🤷‍♂️ (not a very consistent API), so `IntStream.range(0, 9)` yields 9 keys rather than the 10 mentioned in the test comment.
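
For illustration, a minimal sketch of the bound behavior this comment points out. It uses only the JDK; the class name `RangeBounds` and the helper `randomKeys` are made up for this example and are not part of the PR.

```java
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical helper class, for illustration only.
final class RangeBounds {

  // IntStream.range excludes the end bound, so range(0, n) yields 0..n-1,
  // which is exactly n elements.
  static List<String> randomKeys(int n) {
    return IntStream.range(0, n)
        .mapToObj(idx -> UUID.randomUUID().toString())
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    System.out.println(IntStream.range(0, 9).count());       // 9 elements: 0..8
    System.out.println(IntStream.rangeClosed(0, 9).count()); // 10 elements: 0..9
    System.out.println(randomKeys(10).size());               // 10 keys
  }
}
```

If the test is meant to create 10 keys, either `IntStream.range(0, 10)` or `IntStream.rangeClosed(0, 9)` would do that.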







[GitHub] [beam] aromanenko-dev edited a comment on pull request #15858: [BEAM-13159] Add Redis Stream (XADD) Write Support

Posted by GitBox <gi...@apache.org>.
aromanenko-dev edited a comment on pull request #15858:
URL: https://github.com/apache/beam/pull/15858#issuecomment-989665018


   @mosche Could you take a look at this PR, please?





[GitHub] [beam] n-oden commented on pull request #15858: [BEAM-13159] Add Redis Stream (XADD) Write Support

Posted by GitBox <gi...@apache.org>.
n-oden commented on pull request #15858:
URL: https://github.com/apache/beam/pull/15858#issuecomment-991311919









[GitHub] [beam] mosche commented on a change in pull request #15858: [BEAM-13159] Add Redis Stream (XADD) Write Support

Posted by GitBox <gi...@apache.org>.
mosche commented on a change in pull request #15858:
URL: https://github.com/apache/beam/pull/15858#discussion_r767593044



##########
File path: sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
##########
@@ -709,4 +720,146 @@ public void teardown() {
       }
     }
   }
+
+  /**
+   * A {@link PTransform} to write stream key pairs (https://redis.io/topics/streams-intro) to a
+   * Redis server.
+   */
+  @AutoValue
+  public abstract static class WriteStreams
+      extends PTransform<PCollection<KV<String, Map<String, String>>>, PDone> {
+
+    abstract @Nullable RedisConnectionConfiguration connectionConfiguration();
+
+    abstract @Nullable Long maxLen();
+
+    abstract boolean approximateTrim();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+
+      abstract Builder setConnectionConfiguration(
+          RedisConnectionConfiguration connectionConfiguration);
+
+      abstract Builder setMaxLen(Long maxLen);
+
+      abstract Builder setApproximateTrim(boolean approximateTrim);
+
+      abstract WriteStreams build();
+    }
+
+    public WriteStreams withEndpoint(String host, int port) {
+      checkArgument(host != null, "host can not be null");
+      checkArgument(port > 0, "port can not be negative or 0");
+      return toBuilder()
+          .setConnectionConfiguration(connectionConfiguration().withHost(host).withPort(port))
+          .build();
+    }
+
+    public WriteStreams withAuth(String auth) {
+      checkArgument(auth != null, "auth can not be null");
+      return toBuilder()
+          .setConnectionConfiguration(connectionConfiguration().withAuth(auth))
+          .build();
+    }
+
+    public WriteStreams withTimeout(int timeout) {
+      checkArgument(timeout >= 0, "timeout can not be negative");
+      return toBuilder()
+          .setConnectionConfiguration(connectionConfiguration().withTimeout(timeout))
+          .build();
+    }
+
+    public WriteStreams withConnectionConfiguration(RedisConnectionConfiguration connection) {
+      checkArgument(connection != null, "connection can not be null");
+      return toBuilder().setConnectionConfiguration(connection).build();
+    }
+
+    public WriteStreams withMaxLen(Long maxLen) {
+      checkArgument(maxLen >= 0L, "maxLen must be positive if set");
+      return toBuilder().setMaxLen(maxLen).build();
+    }
+
+    public WriteStreams withApproximateTrim(boolean approximateTrim) {
+      return toBuilder().setApproximateTrim(approximateTrim).build();
+    }
+
+    @Override
+    public PDone expand(PCollection<KV<String, Map<String, String>>> input) {
+      checkArgument(connectionConfiguration() != null, "withConnectionConfiguration() is required");
+
+      input.apply(ParDo.of(new WriteStreamFn(this)));
+      return PDone.in(input.getPipeline());
+    }
+
+    private static class WriteStreamFn extends DoFn<KV<String, Map<String, String>>, Void> {
+
+      private static final int DEFAULT_BATCH_SIZE = 1000;
+
+      private final WriteStreams spec;
+
+      private transient Jedis jedis;
+      private transient Pipeline pipeline;
+
+      private int batchCount;
+
+      public WriteStreamFn(WriteStreams spec) {
+        this.spec = spec;
+      }
+
+      @Setup
+      public void setup() {
+        jedis = spec.connectionConfiguration().connect();
+      }
+
+      @StartBundle
+      public void startBundle() {
+        pipeline = jedis.pipelined();
+        pipeline.multi();
+        batchCount = 0;
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext c) {
+        KV<String, Map<String, String>> record = c.element();
+
+        writeRecord(record);
+
+        batchCount++;
+
+        if (batchCount >= DEFAULT_BATCH_SIZE) {
+          pipeline.exec();
+          pipeline.sync();
+          pipeline.multi();
+          batchCount = 0;
+        }
+      }
+
+      private void writeRecord(KV<String, Map<String, String>> record) {
+        String key = record.getKey();
+        Map<String, String> value = record.getValue();
+        if (spec.maxLen() > 0L) {
+          pipeline.xadd(key, StreamEntryID.NEW_ENTRY, value, spec.maxLen(), spec.approximateTrim());
+        } else {
+          pipeline.xadd(key, StreamEntryID.NEW_ENTRY, value);
+        }
+      }
+
+      @FinishBundle
+      public void finishBundle() {
+        if (pipeline.isInMulti()) {
+          pipeline.exec();

Review comment:
       👍 







[GitHub] [beam] n-oden commented on a change in pull request #15858: [BEAM-13159] Add Redis Stream (XADD) Write Support

Posted by GitBox <gi...@apache.org>.
n-oden commented on a change in pull request #15858:
URL: https://github.com/apache/beam/pull/15858#discussion_r766807296



##########
File path: sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
##########
@@ -709,4 +720,146 @@ public void teardown() {
       }
     }
   }
+
+  /**
+   * A {@link PTransform} to write stream key pairs (https://redis.io/topics/streams-intro) to a
+   * Redis server.
+   */
+  @AutoValue
+  public abstract static class WriteStreams
+      extends PTransform<PCollection<KV<String, Map<String, String>>>, PDone> {
+
+    abstract @Nullable RedisConnectionConfiguration connectionConfiguration();

Review comment:
       👍 

##########
File path: sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
##########
@@ -709,4 +720,146 @@ public void teardown() {
       }
     }
   }
+
+  /**
+   * A {@link PTransform} to write stream key pairs (https://redis.io/topics/streams-intro) to a
+   * Redis server.
+   */
+  @AutoValue
+  public abstract static class WriteStreams
+      extends PTransform<PCollection<KV<String, Map<String, String>>>, PDone> {
+
+    abstract @Nullable RedisConnectionConfiguration connectionConfiguration();
+
+    abstract @Nullable Long maxLen();

Review comment:
       👍 to both







[GitHub] [beam] n-oden commented on a change in pull request #15858: [BEAM-13159] Add Redis Stream (XADD) Write Support

Posted by GitBox <gi...@apache.org>.
n-oden commented on a change in pull request #15858:
URL: https://github.com/apache/beam/pull/15858#discussion_r767973423



##########
File path: sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
##########
@@ -250,6 +260,102 @@ public void testWriteUsingDECRBY() {
     assertEquals(-1, count);
   }
 
+  @Test
+  public void testWriteStreams() {
+    ArrayList<String> streams = new ArrayList<String>();
+    for (int i = 0; i <= 10; i++) {
+      UUID uuid = UUID.randomUUID();
+      /* stream keys are uuids to ensure that test runs are idempotent */
+      streams.add(uuid.toString());
+    }
+    Map<String, String> fooValues = ImmutableMap.of("sensor-id", "1234", "temperature", "19.8");
+    Map<String, String> barValues = ImmutableMap.of("sensor-id", "9999", "temperature", "18.2");
+
+    List<KV<String, Map<String, String>>> fooData =
+        streams.stream().map(key -> KV.of(key, fooValues)).collect(Collectors.toList());
+
+    List<KV<String, Map<String, String>>> barData =
+        streams.stream().map(key -> KV.of(key, barValues)).collect(Collectors.toList());
+
+    List<KV<String, Map<String, String>>> allData =
+        Stream.of(fooData, barData).flatMap(Collection::stream).collect(Collectors.toList());

Review comment:
       Aha, thank you, I knew there had to be a simpler way to do this. I've made one slight adjustment, keeping the randomized key names (otherwise test runs are not necessarily idempotent), but this is much more readable.
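
The pattern described in this comment (random stream keys, each paired with more than one field map) can be sketched with plain JDK types. This is an illustration only, not the test code from the PR: `StreamTestData`, `randomKeys`, and `records` are hypothetical names, and `Map.Entry` stands in for Beam's `KV`.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class StreamTestData {

  /** Generates n random stream keys so that repeated runs do not reuse old streams. */
  public static List<String> randomKeys(int n) {
    return Stream.generate(() -> UUID.randomUUID().toString())
        .limit(n)
        .collect(Collectors.toList());
  }

  /** Builds one record per (field map, key) combination. */
  public static List<Map.Entry<String, Map<String, String>>> records(
      List<String> keys, List<Map<String, String>> fieldMaps) {
    List<Map.Entry<String, Map<String, String>>> out = new ArrayList<>();
    for (Map<String, String> fields : fieldMaps) {
      for (String key : keys) {
        // Map.Entry is a stand-in for Beam's KV in this sketch.
        out.add(new SimpleImmutableEntry<>(key, fields));
      }
    }
    return out;
  }

  public static void main(String[] args) {
    Map<String, String> foo = new HashMap<>();
    foo.put("sensor-id", "1234");
    foo.put("temperature", "19.8");
    Map<String, String> bar = new HashMap<>();
    bar.put("sensor-id", "9999");
    bar.put("temperature", "18.2");

    List<String> keys = randomKeys(3);
    List<Map.Entry<String, Map<String, String>>> data = records(keys, Arrays.asList(foo, bar));
    System.out.println(data.size() + " records for " + keys.size() + " streams");
  }
}
```

With two field maps per key, a write test would expect each stream to hold two entries afterwards.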







[GitHub] [beam] n-oden commented on pull request #15858: [BEAM-13159] Add Redis Stream (XADD) Write Support

Posted by GitBox <gi...@apache.org>.
n-oden commented on pull request #15858:
URL: https://github.com/apache/beam/pull/15858#issuecomment-956971051


   Run Java PreCommit





[GitHub] [beam] n-oden removed a comment on pull request #15858: [BEAM-13159] Add Redis Stream (XADD) Write Support

Posted by GitBox <gi...@apache.org>.
n-oden removed a comment on pull request #15858:
URL: https://github.com/apache/beam/pull/15858#issuecomment-956971051


   Run Java PreCommit





[GitHub] [beam] lukecwik commented on pull request #15858: [BEAM-13159] Add Redis Stream (XADD) Write Support

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #15858:
URL: https://github.com/apache/beam/pull/15858#issuecomment-956444430









[GitHub] [beam] n-oden commented on pull request #15858: [BEAM-13159] Add Redis Stream (XADD) Write Support

Posted by GitBox <gi...@apache.org>.
n-oden commented on pull request #15858:
URL: https://github.com/apache/beam/pull/15858#issuecomment-961331425


   @lukecwik I'm not sure what's going on here: Jenkins reports that all of the builds have succeeded (e.g. [the whitespace precommit](https://ci-beam.apache.org/job/beam_PreCommit_Whitespace_Commit/4400/)), but GitHub still shows a number of them as pending. In any case, I believe all builds are passing here.





[GitHub] [beam] n-oden removed a comment on pull request #15858: [BEAM-13159] Add Redis Stream (XADD) Write Support

Posted by GitBox <gi...@apache.org>.
n-oden removed a comment on pull request #15858:
URL: https://github.com/apache/beam/pull/15858#issuecomment-956463142









[GitHub] [beam] n-oden removed a comment on pull request #15858: [BEAM-13159] Add Redis Stream (XADD) Write Support

Posted by GitBox <gi...@apache.org>.
n-oden removed a comment on pull request #15858:
URL: https://github.com/apache/beam/pull/15858#issuecomment-959433035


   Run Java PreCommit





[GitHub] [beam] n-oden commented on pull request #15858: [BEAM-13159] Add Redis Stream (XADD) Write Support

Posted by GitBox <gi...@apache.org>.
n-oden commented on pull request #15858:
URL: https://github.com/apache/beam/pull/15858#issuecomment-956463142


   Run Java PreCommit





[GitHub] [beam] n-oden commented on a change in pull request #15858: [BEAM-13159] Add Redis Stream (XADD) Write Support

Posted by GitBox <gi...@apache.org>.
n-oden commented on a change in pull request #15858:
URL: https://github.com/apache/beam/pull/15858#discussion_r766972012



##########
File path: sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
##########
@@ -205,6 +212,62 @@ public void testWriteUsingDECRBY() {
     assertEquals(-1, count);
   }
 
+  @Test
+  public void testWriteStreams() {
+    List<String> keys = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j");
+    List<KV<String, Map<String, String>>> data = new ArrayList<>();
+    for (String key : keys) {
+      Map<String, String> values =
+          Stream.of(
+                  new AbstractMap.SimpleEntry<String, String>("foo", "bar"),
+                  new AbstractMap.SimpleEntry<String, String>("baz", "qux"))
+              .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+      data.add(KV.of(key, values));
+    }
+    PCollection<KV<String, Map<String, String>>> write =
+        p.apply(
+            Create.of(data)
+                .withCoder(
+                    KvCoder.of(
+                        StringUtf8Coder.of(),
+                        MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))));
+    write.apply(RedisIO.writeStreams().withEndpoint(REDIS_HOST, port));
+    p.run();
+
+    for (String key : keys) {
+      long count = client.xlen(key);
+      assertEquals(2, count);

Review comment:
       👍 







[GitHub] [beam] n-oden commented on a change in pull request #15858: [BEAM-13159] Add Redis Stream (XADD) Write Support

Posted by GitBox <gi...@apache.org>.
n-oden commented on a change in pull request #15858:
URL: https://github.com/apache/beam/pull/15858#discussion_r766988562



##########
File path: sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
##########
@@ -205,6 +212,62 @@ public void testWriteUsingDECRBY() {
     assertEquals(-1, count);
   }
 
+  @Test
+  public void testWriteStreams() {
+    List<String> keys = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j");
+    List<KV<String, Map<String, String>>> data = new ArrayList<>();
+    for (String key : keys) {
+      Map<String, String> values =
+          Stream.of(
+                  new AbstractMap.SimpleEntry<String, String>("foo", "bar"),
+                  new AbstractMap.SimpleEntry<String, String>("baz", "qux"))
+              .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+      data.add(KV.of(key, values));
+    }
+    PCollection<KV<String, Map<String, String>>> write =
+        p.apply(
+            Create.of(data)
+                .withCoder(
+                    KvCoder.of(
+                        StringUtf8Coder.of(),
+                        MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))));
+    write.apply(RedisIO.writeStreams().withEndpoint(REDIS_HOST, port));
+    p.run();
+
+    for (String key : keys) {
+      long count = client.xlen(key);
+      assertEquals(2, count);
+    }
+  }
+
+  @Test
+  public void testWriteStreamsWithTruncation() {
+    List<String> keys = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j");
+    List<KV<String, Map<String, String>>> data = new ArrayList<>();
+    for (String key : keys) {
+      Map<String, String> values =
+          Stream.of(
+                  new AbstractMap.SimpleEntry<String, String>("foo", "bar"),
+                  new AbstractMap.SimpleEntry<String, String>("baz", "qux"))
+              .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+      data.add(KV.of(key, values));
+    }
+    PCollection<KV<String, Map<String, String>>> write =
+        p.apply(
+            Create.of(data)
+                .withCoder(
+                    KvCoder.of(
+                        StringUtf8Coder.of(),
+                        MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))));
+    write.apply(RedisIO.writeStreams().withEndpoint(REDIS_HOST, port).withMaxLen(1L));

Review comment:
       Yeah, I think I confused myself with the overly complicated test data creation pattern.  This should actually test what it claims to now.
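
For readers unfamiliar with what the truncation test is meant to check, here is a minimal in-memory model of exact `MAXLEN` trimming. It is a sketch under stated assumptions, not Redis or Jedis code: `CappedStreamModel` and its methods are hypothetical, and approximate trimming (which in Redis may leave more than `maxLen` entries in the stream) is deliberately not modeled.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

public class CappedStreamModel {

  // One deque of entries per stream key; oldest entries are at the head.
  private final Map<String, Deque<Map<String, String>>> streams = new HashMap<>();

  /** Appends an entry; when maxLen > 0, evicts the oldest entries until at most maxLen remain. */
  public void xadd(String key, Map<String, String> fields, long maxLen) {
    Deque<Map<String, String>> stream = streams.computeIfAbsent(key, k -> new ArrayDeque<>());
    stream.addLast(fields);
    if (maxLen > 0) {
      while (stream.size() > maxLen) {
        stream.removeFirst();
      }
    }
  }

  /** Returns the number of entries in the stream, or 0 if the stream does not exist. */
  public long xlen(String key) {
    Deque<Map<String, String>> stream = streams.get(key);
    return stream == null ? 0 : stream.size();
  }

  public static void main(String[] args) {
    CappedStreamModel model = new CappedStreamModel();
    Map<String, String> first = new HashMap<>();
    first.put("foo", "bar");
    Map<String, String> second = new HashMap<>();
    second.put("baz", "qux");

    // Two writes to the same key with maxLen = 1 leave only the newest entry.
    model.xadd("a", first, 1L);
    model.xadd("a", second, 1L);
    System.out.println("xlen(a) = " + model.xlen("a"));
  }
}
```

Under this model, a truncation test only exercises trimming if each stream receives more entries than `maxLen`.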







[GitHub] [beam] n-oden commented on pull request #15858: [BEAM-13159] Add Redis Stream (XADD) Write Support

Posted by GitBox <gi...@apache.org>.
n-oden commented on pull request #15858:
URL: https://github.com/apache/beam/pull/15858#issuecomment-990112276


   @mosche thanks for the thorough review!  I'll try to get all comments addressed in the next few days.





[GitHub] [beam] mosche commented on pull request #15858: [BEAM-13159] Add Redis Stream (XADD) Write Support

Posted by GitBox <gi...@apache.org>.
mosche commented on pull request #15858:
URL: https://github.com/apache/beam/pull/15858#issuecomment-993553932


   LGTM





[GitHub] [beam] mosche commented on a change in pull request #15858: [BEAM-13159] Add Redis Stream (XADD) Write Support

Posted by GitBox <gi...@apache.org>.
mosche commented on a change in pull request #15858:
URL: https://github.com/apache/beam/pull/15858#discussion_r768686614



##########
File path: sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
##########
@@ -709,4 +720,146 @@ public void teardown() {
       }
     }
   }
+
+  /**
+   * A {@link PTransform} to write stream key pairs (https://redis.io/topics/streams-intro) to a
+   * Redis server.
+   */
+  @AutoValue
+  public abstract static class WriteStreams
+      extends PTransform<PCollection<KV<String, Map<String, String>>>, PDone> {
+
+    abstract @Nullable RedisConnectionConfiguration connectionConfiguration();
+
+    abstract @Nullable Long maxLen();
+
+    abstract boolean approximateTrim();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+
+      abstract Builder setConnectionConfiguration(
+          RedisConnectionConfiguration connectionConfiguration);
+
+      abstract Builder setMaxLen(Long maxLen);
+
+      abstract Builder setApproximateTrim(boolean approximateTrim);
+
+      abstract WriteStreams build();
+    }
+
+    public WriteStreams withEndpoint(String host, int port) {
+      checkArgument(host != null, "host can not be null");
+      checkArgument(port > 0, "port can not be negative or 0");
+      return toBuilder()
+          .setConnectionConfiguration(connectionConfiguration().withHost(host).withPort(port))
+          .build();
+    }
+
+    public WriteStreams withAuth(String auth) {
+      checkArgument(auth != null, "auth can not be null");
+      return toBuilder()
+          .setConnectionConfiguration(connectionConfiguration().withAuth(auth))
+          .build();
+    }
+
+    public WriteStreams withTimeout(int timeout) {
+      checkArgument(timeout >= 0, "timeout can not be negative");
+      return toBuilder()
+          .setConnectionConfiguration(connectionConfiguration().withTimeout(timeout))
+          .build();
+    }
+
+    public WriteStreams withConnectionConfiguration(RedisConnectionConfiguration connection) {
+      checkArgument(connection != null, "connection can not be null");
+      return toBuilder().setConnectionConfiguration(connection).build();
+    }
+
+    public WriteStreams withMaxLen(Long maxLen) {
+      checkArgument(maxLen >= 0L, "maxLen must be positive if set");
+      return toBuilder().setMaxLen(maxLen).build();
+    }
+
+    public WriteStreams withApproximateTrim(boolean approximateTrim) {
+      return toBuilder().setApproximateTrim(approximateTrim).build();
+    }
+
+    @Override
+    public PDone expand(PCollection<KV<String, Map<String, String>>> input) {
+      checkArgument(connectionConfiguration() != null, "withConnectionConfiguration() is required");
+
+      input.apply(ParDo.of(new WriteStreamFn(this)));
+      return PDone.in(input.getPipeline());
+    }
+
+    private static class WriteStreamFn extends DoFn<KV<String, Map<String, String>>, Void> {
+
+      private static final int DEFAULT_BATCH_SIZE = 1000;
+
+      private final WriteStreams spec;
+
+      private transient Jedis jedis;
+      private transient Pipeline pipeline;
+
+      private int batchCount;
+
+      public WriteStreamFn(WriteStreams spec) {
+        this.spec = spec;
+      }
+
+      @Setup
+      public void setup() {
+        jedis = spec.connectionConfiguration().connect();
+      }
+
+      @StartBundle
+      public void startBundle() {
+        pipeline = jedis.pipelined();
+        pipeline.multi();
+        batchCount = 0;
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext c) {
+        KV<String, Map<String, String>> record = c.element();
+
+        writeRecord(record);
+
+        batchCount++;
+
+        if (batchCount >= DEFAULT_BATCH_SIZE) {
+          pipeline.exec();
+          pipeline.sync();
+          pipeline.multi();
+          batchCount = 0;
+        }
+      }
+
+      private void writeRecord(KV<String, Map<String, String>> record) {
+        String key = record.getKey();
+        Map<String, String> value = record.getValue();
+        if (spec.maxLen() > 0L) {
+          pipeline.xadd(key, StreamEntryID.NEW_ENTRY, value, spec.maxLen(), spec.approximateTrim());
+        } else {
+          pipeline.xadd(key, StreamEntryID.NEW_ENTRY, value);
+        }
+      }
+
+      @FinishBundle
+      public void finishBundle() {
+        if (pipeline.isInMulti()) {
+          pipeline.exec();

Review comment:
       I created a ticket to follow up on the above. @n-oden, if you are interested in contributing more, you are more than welcome :)
   https://issues.apache.org/jira/browse/BEAM-13458







[GitHub] [beam] aromanenko-dev edited a comment on pull request #15858: [BEAM-13159] Add Redis Stream (XADD) Write Support

Posted by GitBox <gi...@apache.org>.
aromanenko-dev edited a comment on pull request #15858:
URL: https://github.com/apache/beam/pull/15858#issuecomment-964229955


   Thanks! Yes, I'll take a look





[GitHub] [beam] n-oden commented on pull request #15858: [BEAM-13159] Add Redis Stream (XADD) Write Support

Posted by GitBox <gi...@apache.org>.
n-oden commented on pull request #15858:
URL: https://github.com/apache/beam/pull/15858#issuecomment-991751186


   Sigh, it looks like the precommit failed because Google throttled an unrelated test:
   
   ```
   {
     "code" : 403,
     "errors" : [ {
       "domain" : "global",
       "reason" : "rateLimitExceeded"
     } ],
     "message" : "rateLimitExceeded"
   }
   ```





[GitHub] [beam] aromanenko-dev commented on a change in pull request #15858: [BEAM-13159] Add Redis Stream (XADD) Write Support

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on a change in pull request #15858:
URL: https://github.com/apache/beam/pull/15858#discussion_r768673840



##########
File path: sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
##########
@@ -709,4 +720,146 @@ public void teardown() {
       }
     }
   }
+
+  /**
+   * A {@link PTransform} to write stream key pairs (https://redis.io/topics/streams-intro) to a
+   * Redis server.
+   */
+  @AutoValue
+  public abstract static class WriteStreams
+      extends PTransform<PCollection<KV<String, Map<String, String>>>, PDone> {
+
+    abstract @Nullable RedisConnectionConfiguration connectionConfiguration();
+
+    abstract @Nullable Long maxLen();
+
+    abstract boolean approximateTrim();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+
+      abstract Builder setConnectionConfiguration(
+          RedisConnectionConfiguration connectionConfiguration);
+
+      abstract Builder setMaxLen(Long maxLen);
+
+      abstract Builder setApproximateTrim(boolean approximateTrim);
+
+      abstract WriteStreams build();
+    }
+
+    public WriteStreams withEndpoint(String host, int port) {
+      checkArgument(host != null, "host can not be null");
+      checkArgument(port > 0, "port can not be negative or 0");
+      return toBuilder()
+          .setConnectionConfiguration(connectionConfiguration().withHost(host).withPort(port))
+          .build();
+    }
+
+    public WriteStreams withAuth(String auth) {
+      checkArgument(auth != null, "auth can not be null");
+      return toBuilder()
+          .setConnectionConfiguration(connectionConfiguration().withAuth(auth))
+          .build();
+    }
+
+    public WriteStreams withTimeout(int timeout) {
+      checkArgument(timeout >= 0, "timeout can not be negative");
+      return toBuilder()
+          .setConnectionConfiguration(connectionConfiguration().withTimeout(timeout))
+          .build();
+    }
+
+    public WriteStreams withConnectionConfiguration(RedisConnectionConfiguration connection) {
+      checkArgument(connection != null, "connection can not be null");
+      return toBuilder().setConnectionConfiguration(connection).build();
+    }
+
+    public WriteStreams withMaxLen(Long maxLen) {
+      checkArgument(maxLen >= 0L, "maxLen must be positive if set");
+      return toBuilder().setMaxLen(maxLen).build();
+    }
+
+    public WriteStreams withApproximateTrim(boolean approximateTrim) {
+      return toBuilder().setApproximateTrim(approximateTrim).build();
+    }
+
+    @Override
+    public PDone expand(PCollection<KV<String, Map<String, String>>> input) {
+      checkArgument(connectionConfiguration() != null, "withConnectionConfiguration() is required");
+
+      input.apply(ParDo.of(new WriteStreamFn(this)));
+      return PDone.in(input.getPipeline());
+    }
+
+    private static class WriteStreamFn extends DoFn<KV<String, Map<String, String>>, Void> {
+
+      private static final int DEFAULT_BATCH_SIZE = 1000;
+
+      private final WriteStreams spec;
+
+      private transient Jedis jedis;
+      private transient Pipeline pipeline;
+
+      private int batchCount;
+
+      public WriteStreamFn(WriteStreams spec) {
+        this.spec = spec;
+      }
+
+      @Setup
+      public void setup() {
+        jedis = spec.connectionConfiguration().connect();
+      }
+
+      @StartBundle
+      public void startBundle() {
+        pipeline = jedis.pipelined();
+        pipeline.multi();
+        batchCount = 0;
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext c) {
+        KV<String, Map<String, String>> record = c.element();
+
+        writeRecord(record);
+
+        batchCount++;
+
+        if (batchCount >= DEFAULT_BATCH_SIZE) {
+          pipeline.exec();
+          pipeline.sync();
+          pipeline.multi();
+          batchCount = 0;
+        }
+      }
+
+      private void writeRecord(KV<String, Map<String, String>> record) {
+        String key = record.getKey();
+        Map<String, String> value = record.getValue();
+        if (spec.maxLen() > 0L) {
+          pipeline.xadd(key, StreamEntryID.NEW_ENTRY, value, spec.maxLen(), spec.approximateTrim());
+        } else {
+          pipeline.xadd(key, StreamEntryID.NEW_ENTRY, value);
+        }
+      }
+
+      @FinishBundle
+      public void finishBundle() {
+        if (pipeline.isInMulti()) {
+          pipeline.exec();

Review comment:
       I'd propose creating a separate Jira issue for the error-handling refactoring.







[GitHub] [beam] lukecwik commented on pull request #15858: [BEAM-13159] Add Redis Stream (XADD) Write Support

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #15858:
URL: https://github.com/apache/beam/pull/15858#issuecomment-961633477


   Run Java_Examples_Dataflow PreCommit





[GitHub] [beam] n-oden commented on pull request #15858: [BEAM-13159] Add Redis Stream (XADD) Write Support

Posted by GitBox <gi...@apache.org>.
n-oden commented on pull request #15858:
URL: https://github.com/apache/beam/pull/15858#issuecomment-956438087









[GitHub] [beam] n-oden commented on pull request #15858: [BEAM-13159] Add Redis Stream (XADD) Write Support

Posted by GitBox <gi...@apache.org>.
n-oden commented on pull request #15858:
URL: https://github.com/apache/beam/pull/15858#issuecomment-961328386









[GitHub] [beam] n-oden removed a comment on pull request #15858: [BEAM-13159] Add Redis Stream (XADD) Write Support

Posted by GitBox <gi...@apache.org>.
n-oden removed a comment on pull request #15858:
URL: https://github.com/apache/beam/pull/15858#issuecomment-959560378


   Run Java PreCommit





[GitHub] [beam] n-oden commented on a change in pull request #15858: [BEAM-13159] Add Redis Stream (XADD) Write Support

Posted by GitBox <gi...@apache.org>.
n-oden commented on a change in pull request #15858:
URL: https://github.com/apache/beam/pull/15858#discussion_r768078124



##########
File path: sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
##########
@@ -250,6 +263,72 @@ public void testWriteUsingDECRBY() {
     assertEquals(-1, count);
   }
 
+  @Test
+  public void testWriteStreams() {
+
+    /* test data is 10 keys (stream IDs), each with two entries, each entry having one k/v pair of data */
+    List<String> redisKeys =
+        IntStream.range(0, 9).boxed().map(idx -> UUID.randomUUID().toString()).collect(toList());

Review comment:
       lordy.  Fixing. :)
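
For context, the mismatch in the quoted hunk appears to be that `IntStream.range` excludes its upper bound, so `range(0, 9)` yields nine keys rather than the ten the code comment describes. A minimal stdlib-only sketch of that behavior follows; the class name `RangeDemo` and its helper are hypothetical and not part of the PR.

```java
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical demo class; not part of the PR.
class RangeDemo {
  // Generates one random UUID string per index in [0, endExclusive).
  static List<String> randomKeys(int endExclusive) {
    return IntStream.range(0, endExclusive)
        .mapToObj(idx -> UUID.randomUUID().toString())
        .collect(Collectors.toList());
  }
}
```

Calling `RangeDemo.randomKeys(9)` returns nine keys; ten keys need `range(0, 10)` or `rangeClosed(0, 9)`.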







[GitHub] [beam] mosche commented on a change in pull request #15858: [BEAM-13159] Add Redis Stream (XADD) Write Support

Posted by GitBox <gi...@apache.org>.
mosche commented on a change in pull request #15858:
URL: https://github.com/apache/beam/pull/15858#discussion_r767724831



##########
File path: sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
##########
@@ -105,6 +107,57 @@
  *   .apply(RedisIO.write().withEndpoint("::1", 6379))
  *
  * }</pre>
+ *
+ * <h3>Writing Redis Streams</h3>
+ *
+ * <p>{@link #writeStreams()} provides a sink to write key/value pairs represented as {@link KV}

Review comment:
       I find the Redis docs quite intuitive there:
   
   > Appends the specified stream entry to the stream at the specified key.
   
   How about changing this to the following?
   
   ```
   {@link #writeStreams()} appends the entries of a {@link PCollection} of key/value pairs represented as {@link KV}
   to the Redis stream at the specified key using the <a href='https://redis.io/commands/XADD'>XADD</a> API.
   ```

##########
File path: sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
##########
@@ -250,6 +260,102 @@ public void testWriteUsingDECRBY() {
     assertEquals(-1, count);
   }
 
+  @Test
+  public void testWriteStreams() {
+    ArrayList<String> streams = new ArrayList<String>();
+    for (int i = 0; i <= 10; i++) {
+      UUID uuid = UUID.randomUUID();
+      /* stream keys are uuids to ensure that test runs are idempotent */
+      streams.add(uuid.toString());
+    }
+    Map<String, String> fooValues = ImmutableMap.of("sensor-id", "1234", "temperature", "19.8");
+    Map<String, String> barValues = ImmutableMap.of("sensor-id", "9999", "temperature", "18.2");
+
+    List<KV<String, Map<String, String>>> fooData =
+        streams.stream().map(key -> KV.of(key, fooValues)).collect(Collectors.toList());
+
+    List<KV<String, Map<String, String>>> barData =
+        streams.stream().map(key -> KV.of(key, barValues)).collect(Collectors.toList());
+
+    List<KV<String, Map<String, String>>> allData =
+        Stream.of(fooData, barData).flatMap(Collection::stream).collect(Collectors.toList());
+
+    PCollection<KV<String, Map<String, String>>> write =
+        p.apply(
+            Create.of(allData)
+                .withCoder(
+                    KvCoder.of(
+                        StringUtf8Coder.of(),
+                        MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))));
+    write.apply(RedisIO.writeStreams().withEndpoint(REDIS_HOST, port));
+    p.run();
+
+    for (String stream : streams) {

Review comment:
       Please use matchers to express this in a more intuitive way, e.g.
   
   ```
       Set<String> streams = ImmutableSet.copyOf(transform(allData, KV::getKey)); // if using above
       for (String stream : streams) {
         List<StreamEntry> results = client.xrange(stream, null, null, Integer.MAX_VALUE);
         assertEquals(2, results.size());
         assertThat(transform(results, StreamEntry::getFields), hasItems(fooValues, barValues));
       }
   ```
   
   Note, this uses a few static imports:
   ```
   import static java.util.stream.Collectors.toList;
   import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists.transform;
   import static org.hamcrest.CoreMatchers.hasItems;
   import static org.hamcrest.MatcherAssert.assertThat;
   ```

##########
File path: sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
##########
@@ -250,6 +260,102 @@ public void testWriteUsingDECRBY() {
     assertEquals(-1, count);
   }
 
+  @Test
+  public void testWriteStreams() {
+    ArrayList<String> streams = new ArrayList<String>();
+    for (int i = 0; i <= 10; i++) {
+      UUID uuid = UUID.randomUUID();
+      /* stream keys are uuids to ensure that test runs are idempotent */
+      streams.add(uuid.toString());
+    }
+    Map<String, String> fooValues = ImmutableMap.of("sensor-id", "1234", "temperature", "19.8");
+    Map<String, String> barValues = ImmutableMap.of("sensor-id", "9999", "temperature", "18.2");
+
+    List<KV<String, Map<String, String>>> fooData =
+        streams.stream().map(key -> KV.of(key, fooValues)).collect(Collectors.toList());
+
+    List<KV<String, Map<String, String>>> barData =
+        streams.stream().map(key -> KV.of(key, barValues)).collect(Collectors.toList());
+
+    List<KV<String, Map<String, String>>> allData =
+        Stream.of(fooData, barData).flatMap(Collection::stream).collect(Collectors.toList());

Review comment:
       Just pointing out a more idiomatic alternative here, again using streams to generate the input data (not blocking, though):
   ```java
       Map<String, String> fooValues = ImmutableMap.of("sensor-id", "1234", "temperature", "19.8");
       Map<String, String> barValues = ImmutableMap.of("sensor-id", "9999", "temperature", "18.2");
   
       List<KV<String, Map<String, String>>> allData = IntStream.range(0, 10)
           .boxed()
           .flatMap(id -> Stream.of(
               KV.of("testWriteStreams"+id, fooValues),
               KV.of("testWriteStreams"+id, barValues)))
           .collect(toList());
   ```
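
A stdlib-only variant of the same shape, for readers without Beam on the classpath. `Map.Entry` stands in for Beam's `KV` and plain strings stand in for the field maps; both substitutions, and the class name `FlatMapDemo`, are assumptions made purely for illustration.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

// Hypothetical demo class; Map.Entry is a stand-in for Beam's KV.
class FlatMapDemo {
  // Emits two entries per key, so keyCount keys produce 2 * keyCount elements.
  static List<Map.Entry<String, String>> pairs(int keyCount) {
    return IntStream.range(0, keyCount)
        .boxed()
        .flatMap(id -> Stream.<Map.Entry<String, String>>of(
            new SimpleEntry<>("testWriteStreams" + id, "foo"),
            new SimpleEntry<>("testWriteStreams" + id, "bar")))
        .collect(Collectors.toList());
  }
}
```

With `keyCount = 10` this yields 20 elements over 10 distinct keys, which is the "two entries per stream" layout the test asserts on.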

##########
File path: sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
##########
@@ -142,6 +144,15 @@ public static Write write() {
         .build();
   }
 
+  /** Write data to a Redis server. */
+  public static WriteStreams writeStreams() {

Review comment:
       Thanks for adding the docs 👍 







[GitHub] [beam] mosche commented on a change in pull request #15858: [BEAM-13159] Add Redis Stream (XADD) Write Support

Posted by GitBox <gi...@apache.org>.
mosche commented on a change in pull request #15858:
URL: https://github.com/apache/beam/pull/15858#discussion_r765643092



##########
File path: sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
##########
@@ -205,6 +212,62 @@ public void testWriteUsingDECRBY() {
     assertEquals(-1, count);
   }
 
+  @Test
+  public void testWriteStreams() {

Review comment:
       The two test cases share the same streams/keys and the same Redis instance (one per test class), so the results currently depend on execution order.
   
   One option to isolate tests might be to simply use different streams/keys. Alternatively you could add an additional configuration to allow the selection of a different Redis db (index) for isolation.
   
   Please note that test isolation is a critical requirement so that tests can be run in parallel / forked if needed. Isolated tests also provide good documentation and aid understanding.
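
The first option (distinct streams/keys per test) could look roughly like the sketch below. The helper `TestKeys.uniqueKeys` and the `testName:token:index` naming scheme are hypothetical and not taken from the PR.

```java
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical helper; not part of the PR.
class TestKeys {
  // Returns `count` stream keys namespaced by test name plus a per-call random token,
  // so two tests (or two runs of the same test) never touch the same Redis key.
  static List<String> uniqueKeys(String testName, int count) {
    String runToken = UUID.randomUUID().toString();
    return IntStream.range(0, count)
        .mapToObj(i -> testName + ":" + runToken + ":" + i)
        .collect(Collectors.toList());
  }
}
```

Each test would then call something like `TestKeys.uniqueKeys("testWriteStreams", 10)` instead of reusing the literal keys `"a"` through `"j"`.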
   
   
   

##########
File path: sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
##########
@@ -142,6 +144,15 @@ public static Write write() {
         .build();
   }
 
+  /** Write data to a Redis server. */
+  public static WriteStreams writeStreams() {

Review comment:
       @n-oden Please document the usage of `writeStreams` in the Javadocs of RedisIO.

##########
File path: sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
##########
@@ -205,6 +212,62 @@ public void testWriteUsingDECRBY() {
     assertEquals(-1, count);
   }
 
+  @Test
+  public void testWriteStreams() {
+    List<String> keys = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j");
+    List<KV<String, Map<String, String>>> data = new ArrayList<>();
+    for (String key : keys) {
+      Map<String, String> values =

Review comment:
       Please use (vendored) guava ImmutableMap to generate your maps more easily, e.g.
   ```java
   Map<String, String> values = ImmutableMap.of(
     "foo", "bar",
     "baz", "qux"
   );
   ```
   You could also use a stream to generate your test data:
   ```java
       Map<String, String> values =
           ImmutableMap.of(
               "foo", "bar",
               "baz", "qux");
       List<KV<String, Map<String, String>>> data =
           Stream.of("a", "b", "c", "d", "e", "f", "g", "h", "i", "j")
               .map(key -> KV.of(key, values))
               .collect(Collectors.toList());
   ```

##########
File path: sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
##########
@@ -709,4 +720,146 @@ public void teardown() {
       }
     }
   }
+
+  /**
+   * A {@link PTransform} to write stream key pairs (https://redis.io/topics/streams-intro) to a
+   * Redis server.
+   */
+  @AutoValue
+  public abstract static class WriteStreams
+      extends PTransform<PCollection<KV<String, Map<String, String>>>, PDone> {
+
+    abstract @Nullable RedisConnectionConfiguration connectionConfiguration();

Review comment:
       You're setting a default in `writeStreams()` already, so this doesn't have to be @Nullable anymore.

##########
File path: sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
##########
@@ -709,4 +720,146 @@ public void teardown() {
       }
     }
   }
+
+  /**
+   * A {@link PTransform} to write stream key pairs (https://redis.io/topics/streams-intro) to a
+   * Redis server.
+   */
+  @AutoValue
+  public abstract static class WriteStreams
+      extends PTransform<PCollection<KV<String, Map<String, String>>>, PDone> {
+
+    abstract @Nullable RedisConnectionConfiguration connectionConfiguration();
+
+    abstract @Nullable Long maxLen();

Review comment:
       Same as above, `maxLen` doesn't have to be nullable and could actually be a primitive `long`
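
One practical reason a primitive helps: the `spec.maxLen() > 0L` check in `writeRecord` from the same diff auto-unboxes the value, which would throw a `NullPointerException` if the boxed `Long` were ever null. A stdlib-only sketch of the difference follows; `MaxLenDemo` is a hypothetical stand-in rather than the AutoValue-generated code, and treating 0 as the "no trimming" default is an assumption.

```java
// Hypothetical stand-ins for the two ways of declaring maxLen; not the PR's code.
class MaxLenDemo {
  // Boxed variant: null is representable, so the comparison can fail at runtime.
  static boolean shouldTrimBoxed(Long maxLen) {
    return maxLen > 0L; // auto-unboxing; throws NullPointerException when maxLen is null
  }

  // Primitive variant: null is impossible; 0 is assumed to mean "no trimming".
  static boolean shouldTrimPrimitive(long maxLen) {
    return maxLen > 0L;
  }
}
```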

##########
File path: sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
##########
@@ -709,4 +720,146 @@ public void teardown() {
       }
     }
   }
+
+  /**
+   * A {@link PTransform} to write stream key pairs (https://redis.io/topics/streams-intro) to a
+   * Redis server.
+   */
+  @AutoValue
+  public abstract static class WriteStreams
+      extends PTransform<PCollection<KV<String, Map<String, String>>>, PDone> {
+
+    abstract @Nullable RedisConnectionConfiguration connectionConfiguration();
+
+    abstract @Nullable Long maxLen();
+
+    abstract boolean approximateTrim();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+
+      abstract Builder setConnectionConfiguration(
+          RedisConnectionConfiguration connectionConfiguration);
+
+      abstract Builder setMaxLen(Long maxLen);
+
+      abstract Builder setApproximateTrim(boolean approximateTrim);
+
+      abstract WriteStreams build();
+    }
+
+    public WriteStreams withEndpoint(String host, int port) {
+      checkArgument(host != null, "host can not be null");
+      checkArgument(port > 0, "port can not be negative or 0");
+      return toBuilder()
+          .setConnectionConfiguration(connectionConfiguration().withHost(host).withPort(port))
+          .build();
+    }
+
+    public WriteStreams withAuth(String auth) {
+      checkArgument(auth != null, "auth can not be null");
+      return toBuilder()
+          .setConnectionConfiguration(connectionConfiguration().withAuth(auth))
+          .build();
+    }
+
+    public WriteStreams withTimeout(int timeout) {
+      checkArgument(timeout >= 0, "timeout can not be negative");
+      return toBuilder()
+          .setConnectionConfiguration(connectionConfiguration().withTimeout(timeout))
+          .build();
+    }
+
+    public WriteStreams withConnectionConfiguration(RedisConnectionConfiguration connection) {
+      checkArgument(connection != null, "connection can not be null");
+      return toBuilder().setConnectionConfiguration(connection).build();
+    }
+
+    public WriteStreams withMaxLen(Long maxLen) {
+      checkArgument(maxLen >= 0L, "maxLen must be positive if set");
+      return toBuilder().setMaxLen(maxLen).build();
+    }
+
+    public WriteStreams withApproximateTrim(boolean approximateTrim) {
+      return toBuilder().setApproximateTrim(approximateTrim).build();
+    }
+
+    @Override
+    public PDone expand(PCollection<KV<String, Map<String, String>>> input) {
+      checkArgument(connectionConfiguration() != null, "withConnectionConfiguration() is required");
+
+      input.apply(ParDo.of(new WriteStreamFn(this)));
+      return PDone.in(input.getPipeline());
+    }
+
+    private static class WriteStreamFn extends DoFn<KV<String, Map<String, String>>, Void> {
+
+      private static final int DEFAULT_BATCH_SIZE = 1000;
+
+      private final WriteStreams spec;
+
+      private transient Jedis jedis;
+      private transient Pipeline pipeline;
+
+      private int batchCount;
+
+      public WriteStreamFn(WriteStreams spec) {
+        this.spec = spec;
+      }
+
+      @Setup
+      public void setup() {
+        jedis = spec.connectionConfiguration().connect();
+      }
+
+      @StartBundle
+      public void startBundle() {
+        pipeline = jedis.pipelined();
+        pipeline.multi();
+        batchCount = 0;
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext c) {
+        KV<String, Map<String, String>> record = c.element();
+
+        writeRecord(record);
+
+        batchCount++;
+
+        if (batchCount >= DEFAULT_BATCH_SIZE) {
+          pipeline.exec();
+          pipeline.sync();
+          pipeline.multi();
+          batchCount = 0;
+        }
+      }
+
+      private void writeRecord(KV<String, Map<String, String>> record) {
+        String key = record.getKey();
+        Map<String, String> value = record.getValue();
+        if (spec.maxLen() > 0L) {
+          pipeline.xadd(key, StreamEntryID.NEW_ENTRY, value, spec.maxLen(), spec.approximateTrim());
+        } else {
+          pipeline.xadd(key, StreamEntryID.NEW_ENTRY, value);
+        }
+      }
+
+      @FinishBundle
+      public void finishBundle() {
+        if (pipeline.isInMulti()) {
+          pipeline.exec();

Review comment:
       I'm a bit concerned about error handling here, and likewise in `processElement`. I know the code here happens to be the same as in the existing `write`, but this is probably a good time to discuss it.
   
   It's been a while since I last used `jedis`, but as far as I remember it's important to check each response to be sure errors are not silently ignored. What I'm honestly not sure about is whether all the nested responses have to be checked as well.
   
   ```java
             Response<List<Object>> resp = pipeline.exec();
             pipeline.close(); // does sync internally
             resp.get(); // this may throw
   ```
   
   

##########
File path: sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
##########
@@ -709,4 +720,146 @@ public void teardown() {
       }
     }
   }
+
+  /**
+   * A {@link PTransform} to write stream key pairs (https://redis.io/topics/streams-intro) to a
+   * Redis server.
+   */
+  @AutoValue
+  public abstract static class WriteStreams
+      extends PTransform<PCollection<KV<String, Map<String, String>>>, PDone> {
+
+    abstract @Nullable RedisConnectionConfiguration connectionConfiguration();
+
+    abstract @Nullable Long maxLen();
+
+    abstract boolean approximateTrim();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+
+      abstract Builder setConnectionConfiguration(
+          RedisConnectionConfiguration connectionConfiguration);
+
+      abstract Builder setMaxLen(Long maxLen);
+
+      abstract Builder setApproximateTrim(boolean approximateTrim);
+
+      abstract WriteStreams build();
+    }
+
+    public WriteStreams withEndpoint(String host, int port) {

Review comment:
       Would be great to add Javadocs to all the builder methods starting from here

##########
File path: sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
##########
@@ -205,6 +212,62 @@ public void testWriteUsingDECRBY() {
     assertEquals(-1, count);
   }
 
+  @Test
+  public void testWriteStreams() {
+    List<String> keys = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j");
+    List<KV<String, Map<String, String>>> data = new ArrayList<>();
+    for (String key : keys) {
+      Map<String, String> values =
+          Stream.of(
+                  new AbstractMap.SimpleEntry<String, String>("foo", "bar"),
+                  new AbstractMap.SimpleEntry<String, String>("baz", "qux"))
+              .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+      data.add(KV.of(key, values));
+    }
+    PCollection<KV<String, Map<String, String>>> write =
+        p.apply(
+            Create.of(data)
+                .withCoder(
+                    KvCoder.of(
+                        StringUtf8Coder.of(),
+                        MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))));
+    write.apply(RedisIO.writeStreams().withEndpoint(REDIS_HOST, port));
+    p.run();
+
+    for (String key : keys) {
+      long count = client.xlen(key);
+      assertEquals(2, count);
+    }
+  }
+
+  @Test
+  public void testWriteStreamsWithTruncation() {
+    List<String> keys = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j");
+    List<KV<String, Map<String, String>>> data = new ArrayList<>();
+    for (String key : keys) {
+      Map<String, String> values =
+          Stream.of(
+                  new AbstractMap.SimpleEntry<String, String>("foo", "bar"),
+                  new AbstractMap.SimpleEntry<String, String>("baz", "qux"))
+              .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+      data.add(KV.of(key, values));
+    }
+    PCollection<KV<String, Map<String, String>>> write =
+        p.apply(
+            Create.of(data)
+                .withCoder(
+                    KvCoder.of(
+                        StringUtf8Coder.of(),
+                        MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))));
+    write.apply(RedisIO.writeStreams().withEndpoint(REDIS_HOST, port).withMaxLen(1L));

Review comment:
       Please reconsider what / how you are testing here.
   As you are only adding one item per stream / key, there's really not much to be truncated. Also, I'd recommend disabling approximate trim for testing. You likely won't get the result you're expecting otherwise.
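
To illustrate why one entry per key cannot exercise truncation: under exact trimming, a stream capped at `maxLen` keeps only the newest `maxLen` entries, so the cap is observable only once more than `maxLen` entries have been appended. The following is a stdlib-only toy model of that behavior (an `ArrayDeque`, not Redis or Jedis; the class name is hypothetical). Approximate trimming is deliberately not modeled, since Redis may then leave more than `maxLen` entries in place.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of exact MAXLEN trimming; hypothetical, not Redis or Jedis code.
class ExactTrimModel {
  private final Deque<String> entries = new ArrayDeque<>();
  private final long maxLen;

  ExactTrimModel(long maxLen) {
    this.maxLen = maxLen;
  }

  // Appends an entry, then evicts the oldest entries until at most maxLen remain.
  void add(String entry) {
    entries.addLast(entry);
    while (entries.size() > maxLen) {
      entries.removeFirst();
    }
  }

  int length() {
    return entries.size();
  }

  String newest() {
    return entries.peekLast();
  }
}
```

After a single `add`, the length is 1 with or without the cap. Only a second `add` to the same stream distinguishes a trimmed stream (length 1) from an untrimmed one (length 2).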

##########
File path: sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
##########
@@ -205,6 +212,62 @@ public void testWriteUsingDECRBY() {
     assertEquals(-1, count);
   }
 
+  @Test
+  public void testWriteStreams() {
+    List<String> keys = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j");
+    List<KV<String, Map<String, String>>> data = new ArrayList<>();
+    for (String key : keys) {
+      Map<String, String> values =
+          Stream.of(
+                  new AbstractMap.SimpleEntry<String, String>("foo", "bar"),
+                  new AbstractMap.SimpleEntry<String, String>("baz", "qux"))
+              .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+      data.add(KV.of(key, values));
+    }
+    PCollection<KV<String, Map<String, String>>> write =
+        p.apply(
+            Create.of(data)
+                .withCoder(
+                    KvCoder.of(
+                        StringUtf8Coder.of(),
+                        MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))));
+    write.apply(RedisIO.writeStreams().withEndpoint(REDIS_HOST, port));
+    p.run();
+
+    for (String key : keys) {
+      long count = client.xlen(key);
+      assertEquals(2, count);

Review comment:
       Please make sure there's at least one test case to verify the data is also correctly written to the stream.







[GitHub] [beam] aromanenko-dev commented on pull request #15858: [BEAM-13159] Add Redis Stream (XADD) Write Support

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on pull request #15858:
URL: https://github.com/apache/beam/pull/15858#issuecomment-989665018


   @mosche Could you take a look at this, please?





[GitHub] [beam] n-oden commented on a change in pull request #15858: [BEAM-13159] Add Redis Stream (XADD) Write Support

Posted by GitBox <gi...@apache.org>.
n-oden commented on a change in pull request #15858:
URL: https://github.com/apache/beam/pull/15858#discussion_r766847300



##########
File path: sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
##########
@@ -709,4 +720,146 @@ public void teardown() {
       }
     }
   }
+
+  /**
+   * A {@link PTransform} to write stream key pairs (https://redis.io/topics/streams-intro) to a
+   * Redis server.
+   */
+  @AutoValue
+  public abstract static class WriteStreams
+      extends PTransform<PCollection<KV<String, Map<String, String>>>, PDone> {
+
+    abstract @Nullable RedisConnectionConfiguration connectionConfiguration();
+
+    abstract @Nullable Long maxLen();
+
+    abstract boolean approximateTrim();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+
+      abstract Builder setConnectionConfiguration(
+          RedisConnectionConfiguration connectionConfiguration);
+
+      abstract Builder setMaxLen(Long maxLen);
+
+      abstract Builder setApproximateTrim(boolean approximateTrim);
+
+      abstract WriteStreams build();
+    }
+
+    public WriteStreams withEndpoint(String host, int port) {

Review comment:
       👍 







[GitHub] [beam] n-oden commented on a change in pull request #15858: [BEAM-13159] Add Redis Stream (XADD) Write Support

Posted by GitBox <gi...@apache.org>.
n-oden commented on a change in pull request #15858:
URL: https://github.com/apache/beam/pull/15858#discussion_r766858129



##########
File path: sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
##########
@@ -205,6 +212,62 @@ public void testWriteUsingDECRBY() {
     assertEquals(-1, count);
   }
 
+  @Test
+  public void testWriteStreams() {

Review comment:
       good point.  For now I'm making the keys non-overlapping.







[GitHub] [beam] n-oden commented on a change in pull request #15858: [BEAM-13159] Add Redis Stream (XADD) Write Support

Posted by GitBox <gi...@apache.org>.
n-oden commented on a change in pull request #15858:
URL: https://github.com/apache/beam/pull/15858#discussion_r766887718



##########
File path: sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
##########
@@ -205,6 +212,62 @@ public void testWriteUsingDECRBY() {
     assertEquals(-1, count);
   }
 
+  @Test
+  public void testWriteStreams() {
+    List<String> keys = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j");
+    List<KV<String, Map<String, String>>> data = new ArrayList<>();
+    for (String key : keys) {
+      Map<String, String> values =

Review comment:
       Oh nice, that's a much more idiomatic pattern, thank you.  (Java is, to put it mildly, not my primary language.)







[GitHub] [beam] n-oden commented on pull request #15858: [BEAM-13159] Add Redis Stream (XADD) Write Support

Posted by GitBox <gi...@apache.org>.
n-oden commented on pull request #15858:
URL: https://github.com/apache/beam/pull/15858#issuecomment-956497864


   Run Java PreCommit





[GitHub] [beam] n-oden commented on pull request #15858: [BEAM-13159] Add Redis Stream (XADD) Write Support

Posted by GitBox <gi...@apache.org>.
n-oden commented on pull request #15858:
URL: https://github.com/apache/beam/pull/15858#issuecomment-981156834


   @kennknowles @lukecwik @aromanenko-dev gentle ping? 





[GitHub] [beam] n-oden commented on pull request #15858: [BEAM-13159] Add Redis Stream (XADD) Write Support

Posted by GitBox <gi...@apache.org>.
n-oden commented on pull request #15858:
URL: https://github.com/apache/beam/pull/15858#issuecomment-959433035


   Run Java PreCommit





[GitHub] [beam] n-oden commented on pull request #15858: [BEAM-13159] Add Redis Stream (XADD) Write Support

Posted by GitBox <gi...@apache.org>.
n-oden commented on pull request #15858:
URL: https://github.com/apache/beam/pull/15858#issuecomment-959560378


   Run Java PreCommit





[GitHub] [beam] aromanenko-dev commented on pull request #15858: [BEAM-13159] Add Redis Stream (XADD) Write Support

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on pull request #15858:
URL: https://github.com/apache/beam/pull/15858#issuecomment-964229955


   Thanks! I'll take a look





[GitHub] [beam] kennknowles commented on pull request #15858: [BEAM-13159] Add Redis Stream (XADD) Write Support

Posted by GitBox <gi...@apache.org>.
kennknowles commented on pull request #15858:
URL: https://github.com/apache/beam/pull/15858#issuecomment-963520414


   I think @aromanenko-dev has been reviewing some similar changes. We could also start to ask the people who wrote them to do the review of this?

