Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/11/02 04:52:10 UTC

[GitHub] [beam] pabloem opened a new pull request #15863: Autosharding for JdbcIO.write* transforms

pabloem opened a new pull request #15863:
URL: https://github.com/apache/beam/pull/15863




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] pabloem commented on pull request #15863: [BEAM-13184] Autosharding for JdbcIO.write* transforms

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #15863:
URL: https://github.com/apache/beam/pull/15863#issuecomment-964433242


   PTAL friends : )





[GitHub] [beam] calvin-orange commented on a change in pull request #15863: [BEAM-13184] Autosharding for JdbcIO.write* transforms

Posted by GitBox <gi...@apache.org>.
calvin-orange commented on a change in pull request #15863:
URL: https://github.com/apache/beam/pull/15863#discussion_r786248807



##########
File path: sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -2085,17 +2167,12 @@ private Connection getConnection() throws SQLException {
 
     @ProcessElement
     public void processElement(ProcessContext context) throws Exception {
-      T record = context.element();
-      records.add(record);
-      if (records.size() >= spec.getBatchSize()) {

Review comment:
       @pabloem Hi Pablo. I'm not sure this is the right place to ask, but does this removal take away the configurability of the batch size? In batchElements I see that it uses the constant MAX_BUNDLE_SIZE, so it looks like batchSize is no longer used, though I may be missing or misunderstanding something.
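
       For context, the three removed lines buffer each record and flush once a configured size is reached. A minimal sketch of that pattern in plain Java, without Beam or JDBC, is below. `SizeTriggeredBatcher`, its `sink`, and the `flush()` method are hypothetical names used only for illustration; they are not part of JdbcIO.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for size-triggered batching; not Beam code. */
class SizeTriggeredBatcher<T> {
  private final int batchSize;           // plays the role of spec.getBatchSize()
  private final Consumer<List<T>> sink;  // plays the role of executing one write batch
  private final List<T> records = new ArrayList<>();

  SizeTriggeredBatcher(int batchSize, Consumer<List<T>> sink) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    this.batchSize = batchSize;
    this.sink = sink;
  }

  /** Buffers one record and flushes as soon as the configured size is reached. */
  void add(T record) {
    records.add(record);
    if (records.size() >= batchSize) {
      flush();
    }
  }

  /** Emits whatever is still buffered, for example at the end of a bundle. */
  void flush() {
    if (!records.isEmpty()) {
      sink.accept(new ArrayList<>(records)); // hand over a copy, then reuse the buffer
      records.clear();
    }
  }
}
```

       With a batch size of 3 and seven records, this sketch flushes two full batches while records arrive and one batch of a single record on the final `flush()`. Whether the merged change still honors a user-supplied batch size is exactly what the question above asks; the sketch does not answer it.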







[GitHub] [beam] pabloem commented on pull request #15863: [BEAM-13184] Autosharding for JdbcIO.write* transforms

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #15863:
URL: https://github.com/apache/beam/pull/15863#issuecomment-998298446


   Run Java PostCommit





[GitHub] [beam] pabloem commented on pull request #15863: Autosharding for JdbcIO.write* transforms

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #15863:
URL: https://github.com/apache/beam/pull/15863#issuecomment-958125943


   Run Java PostCommit





[GitHub] [beam] pabloem commented on a change in pull request #15863: [BEAM-13184] Autosharding for JdbcIO.write* transforms

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #15863:
URL: https://github.com/apache/beam/pull/15863#discussion_r748533384



##########
File path: sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -1494,8 +1509,49 @@ void set(
       checkArgument(
           (getDataSourceProviderFn() != null),
           "withDataSourceConfiguration() or withDataSourceProviderFn() is required");
-
-      return input.apply(
+      checkArgument(
+          ((getAutoSharding() == null || !getAutoSharding())

Review comment:
       hehe thanks! appreciate it.
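
       The precondition this thread is attached to (quoted in full in the hunks that follow) allows every case except autosharding on a bounded input. A small sketch in plain Java makes that explicit. `AutoShardingPrecondition` and both method names are hypothetical, and a `boolean bounded` stands in for `input.isBounded() == IsBounded.BOUNDED`.

```java
/** Hypothetical model of the quoted checkArgument condition; not Beam code. */
class AutoShardingPrecondition {

  /** Mirrors the shape of the quoted condition; autoSharding may be null (unset). */
  static boolean isAllowed(Boolean autoSharding, boolean bounded) {
    return ((autoSharding == null || !autoSharding) && bounded) || !bounded;
  }

  /** Equivalent simplified form: reject only autosharding on a bounded input. */
  static boolean isAllowedSimplified(Boolean autoSharding, boolean bounded) {
    return !(bounded && Boolean.TRUE.equals(autoSharding));
  }
}
```

       Both methods agree on all six combinations of `null`/`true`/`false` and bounded/unbounded, so the simplified form is only a readability suggestion, not a behavior change.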

##########
File path: sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -1494,8 +1509,49 @@ void set(
       checkArgument(
           (getDataSourceProviderFn() != null),
           "withDataSourceConfiguration() or withDataSourceProviderFn() is required");
-
-      return input.apply(
+      checkArgument(
+          ((getAutoSharding() == null || !getAutoSharding())
+                  && input.isBounded() == IsBounded.BOUNDED)
+              || input.isBounded() == IsBounded.UNBOUNDED,
+          "Autosharding is only supported for streaming pipelines.");
+      ;
+
+      PCollection<Iterable<T>> iterables;
+      if (input.isBounded() == IsBounded.UNBOUNDED
+          && getAutoSharding() != null
+          && getAutoSharding()) {
+        iterables =
+            input
+                .apply(WithKeys.<String, T>of(""))
+                .apply(
+                    GroupIntoBatches.<String, T>ofSize(DEFAULT_BATCH_SIZE)
+                        .withMaxBufferingDuration(Duration.millis(200))
+                        .withShardedKey())
+                .apply(Values.create());
+      } else {
+        iterables =
+            input.apply(
+                ParDo.of(
+                    new DoFn<T, Iterable<T>>() {
+                      List<T> outputList;
+
+                      @ProcessElement
+                      public void process(ProcessContext c) {
+                        if (outputList == null) {
+                          outputList = new ArrayList<>();
+                        }
+                        outputList.add(c.element());
+                      }
+
+                      @FinishBundle
+                      public void finish(FinishBundleContext c) {
+                        System.out.println("List size is " + outputList.size());

Review comment:
       removed. Thanks!

##########
File path: sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -1494,8 +1509,49 @@ void set(
       checkArgument(
           (getDataSourceProviderFn() != null),
           "withDataSourceConfiguration() or withDataSourceProviderFn() is required");
-
-      return input.apply(
+      checkArgument(
+          ((getAutoSharding() == null || !getAutoSharding())
+                  && input.isBounded() == IsBounded.BOUNDED)
+              || input.isBounded() == IsBounded.UNBOUNDED,
+          "Autosharding is only supported for streaming pipelines.");
+      ;
+
+      PCollection<Iterable<T>> iterables;
+      if (input.isBounded() == IsBounded.UNBOUNDED
+          && getAutoSharding() != null
+          && getAutoSharding()) {
+        iterables =
+            input
+                .apply(WithKeys.<String, T>of(""))
+                .apply(
+                    GroupIntoBatches.<String, T>ofSize(DEFAULT_BATCH_SIZE)
+                        .withMaxBufferingDuration(Duration.millis(200))
+                        .withShardedKey())
+                .apply(Values.create());
+      } else {
+        iterables =

Review comment:
       That's correct. Since the WriteFn now takes Lists instead of individual elements, we need to perform the batching in the preceding operation.
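
       The two-stage shape described here, with batching first and a write step that only ever sees whole lists, can be sketched in plain Java as follows. This is an illustration under stated assumptions, not the PR's code: `TwoStageWriteSketch`, `batch`, and `writeBatches` are hypothetical names, and an in-memory list stands in for the database table.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical two-stage write: batch first, then write list by list. */
class TwoStageWriteSketch {

  /** Stage 1: split the elements into lists of at most maxBatchSize. */
  static <T> List<List<T>> batch(List<T> elements, int maxBatchSize) {
    if (maxBatchSize <= 0) {
      throw new IllegalArgumentException("maxBatchSize must be positive");
    }
    List<List<T>> batches = new ArrayList<>();
    for (int start = 0; start < elements.size(); start += maxBatchSize) {
      int end = Math.min(start + maxBatchSize, elements.size());
      batches.add(new ArrayList<>(elements.subList(start, end)));
    }
    return batches;
  }

  /**
   * Stage 2: stand-in for a write step that takes lists rather than single
   * elements. Returns how many batch writes were issued.
   */
  static <T> int writeBatches(List<List<T>> batches, List<T> table) {
    int writes = 0;
    for (List<T> oneBatch : batches) {
      table.addAll(oneBatch); // one "statement" per batch in this sketch
      writes++;
    }
    return writes;
  }
}
```

       Keeping the write step free of buffering logic means the batching strategy can be swapped (per bundle, or grouped by the runner) without touching the code that talks to the database.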

##########
File path: sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -1432,6 +1442,11 @@ void set(
       return toBuilder().setPreparedStatementSetter(setter).build();
     }
 
+    /** If true, enables using a dynamically determined number of shards to write. */
+    public WriteWithResults<T, V> withAutoSharding() {

Review comment:
       Hmmm, I think we should wait for feedback before adding that capability. If users find that they want more control over sharding, they'd probably file issues or ask questions on Stack Overflow. If we add this capability, we'll need to support it for the longer term... What do you think?







[GitHub] [beam] pabloem commented on pull request #15863: [BEAM-13184] Autosharding for JdbcIO.write* transforms

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #15863:
URL: https://github.com/apache/beam/pull/15863#issuecomment-1001785761


   anyone got some time? : )





[GitHub] [beam] pabloem commented on pull request #15863: [BEAM-13184] Autosharding for JdbcIO.write* transforms

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #15863:
URL: https://github.com/apache/beam/pull/15863#issuecomment-960122226









[GitHub] [beam] pabloem merged pull request #15863: [BEAM-13184] Autosharding for JdbcIO.write* transforms

Posted by GitBox <gi...@apache.org>.
pabloem merged pull request #15863:
URL: https://github.com/apache/beam/pull/15863


   





[GitHub] [beam] aromanenko-dev commented on pull request #15863: [BEAM-13184] Autosharding for JdbcIO.write* transforms

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on pull request #15863:
URL: https://github.com/apache/beam/pull/15863#issuecomment-965563065


   @pabloem Sorry for the delay, I was busy with some other things. I'll try to take a look in the next few days.





[GitHub] [beam] pabloem commented on pull request #15863: [BEAM-13184] Autosharding for JdbcIO.write* transforms

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #15863:
URL: https://github.com/apache/beam/pull/15863#issuecomment-970727554


   Hi @aromanenko-dev : ) - I'd love to get this in for the release cut on Wednesday (though don't feel pressured to accept bad-quality code; if fixes are needed, I'll happily make them).





[GitHub] [beam] aromanenko-dev commented on a change in pull request #15863: [BEAM-13184] Autosharding for JdbcIO.write* transforms

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on a change in pull request #15863:
URL: https://github.com/apache/beam/pull/15863#discussion_r748398715



##########
File path: sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -1494,8 +1509,49 @@ void set(
       checkArgument(
           (getDataSourceProviderFn() != null),
           "withDataSourceConfiguration() or withDataSourceProviderFn() is required");
-
-      return input.apply(
+      checkArgument(
+          ((getAutoSharding() == null || !getAutoSharding())
+                  && input.isBounded() == IsBounded.BOUNDED)
+              || input.isBounded() == IsBounded.UNBOUNDED,
+          "Autosharding is only supported for streaming pipelines.");
+      ;
+
+      PCollection<Iterable<T>> iterables;
+      if (input.isBounded() == IsBounded.UNBOUNDED
+          && getAutoSharding() != null
+          && getAutoSharding()) {
+        iterables =
+            input
+                .apply(WithKeys.<String, T>of(""))

Review comment:
       Why `of("")`?

##########
File path: sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -1494,8 +1509,49 @@ void set(
       checkArgument(
           (getDataSourceProviderFn() != null),
           "withDataSourceConfiguration() or withDataSourceProviderFn() is required");
-
-      return input.apply(
+      checkArgument(
+          ((getAutoSharding() == null || !getAutoSharding())
+                  && input.isBounded() == IsBounded.BOUNDED)
+              || input.isBounded() == IsBounded.UNBOUNDED,
+          "Autosharding is only supported for streaming pipelines.");
+      ;
+
+      PCollection<Iterable<T>> iterables;
+      if (input.isBounded() == IsBounded.UNBOUNDED
+          && getAutoSharding() != null
+          && getAutoSharding()) {
+        iterables =
+            input
+                .apply(WithKeys.<String, T>of(""))
+                .apply(
+                    GroupIntoBatches.<String, T>ofSize(DEFAULT_BATCH_SIZE)
+                        .withMaxBufferingDuration(Duration.millis(200))
+                        .withShardedKey())
+                .apply(Values.create());
+      } else {
+        iterables =

Review comment:
       Why is this branch needed? To batch per bundle in the bounded case?

##########
File path: sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -1494,8 +1509,49 @@ void set(
       checkArgument(
           (getDataSourceProviderFn() != null),
           "withDataSourceConfiguration() or withDataSourceProviderFn() is required");
-
-      return input.apply(
+      checkArgument(
+          ((getAutoSharding() == null || !getAutoSharding())

Review comment:
       A slightly simpler condition:
   ```
   checkArgument(getAutoSharding() == null || !getAutoSharding() || input.isBounded() == IsBounded.UNBOUNDED)
   ```
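
One way to sanity-check any simplification of this precondition is to enumerate every case. The sketch below is plain Java, not Beam code: the class and method names are hypothetical, and the `unbounded` flag stands in for `input.isBounded() == IsBounded.UNBOUNDED`. It assumes, as the null checks in the hunk suggest, that `getAutoSharding()` returns a nullable `Boolean`.

```java
class AutoShardingPrecondition {
    // The condition as written in the hunk above, with the bounded check modelled as !unbounded.
    static boolean original(Boolean autoSharding, boolean unbounded) {
        return ((autoSharding == null || !autoSharding) && !unbounded) || unbounded;
    }

    // A shorter form: valid unless autosharding is explicitly enabled on bounded input.
    static boolean simplified(Boolean autoSharding, boolean unbounded) {
        return autoSharding == null || !autoSharding || unbounded;
    }

    public static void main(String[] args) {
        Boolean[] flags = {null, Boolean.FALSE, Boolean.TRUE};
        boolean[] bounds = {false, true};
        // Print the truth table for all six combinations.
        for (Boolean flag : flags) {
            for (boolean unbounded : bounds) {
                System.out.printf(
                    "autoSharding=%s unbounded=%s original=%s simplified=%s%n",
                    flag, unbounded, original(flag, unbounded), simplified(flag, unbounded));
            }
        }
    }
}
```

The only combination that should fail the check is autosharding enabled on a bounded input, which matches the error message "Autosharding is only supported for streaming pipelines."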

##########
File path: sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -1494,8 +1509,49 @@ void set(
       checkArgument(
           (getDataSourceProviderFn() != null),
           "withDataSourceConfiguration() or withDataSourceProviderFn() is required");
-
-      return input.apply(
+      checkArgument(
+          ((getAutoSharding() == null || !getAutoSharding())
+                  && input.isBounded() == IsBounded.BOUNDED)
+              || input.isBounded() == IsBounded.UNBOUNDED,
+          "Autosharding is only supported for streaming pipelines.");
+      ;
+
+      PCollection<Iterable<T>> iterables;
+      if (input.isBounded() == IsBounded.UNBOUNDED
+          && getAutoSharding() != null
+          && getAutoSharding()) {
+        iterables =
+            input
+                .apply(WithKeys.<String, T>of(""))
+                .apply(
+                    GroupIntoBatches.<String, T>ofSize(DEFAULT_BATCH_SIZE)
+                        .withMaxBufferingDuration(Duration.millis(200))
+                        .withShardedKey())
+                .apply(Values.create());
+      } else {
+        iterables =
+            input.apply(
+                ParDo.of(
+                    new DoFn<T, Iterable<T>>() {
+                      List<T> outputList;
+
+                      @ProcessElement
+                      public void process(ProcessContext c) {
+                        if (outputList == null) {
+                          outputList = new ArrayList<>();
+                        }
+                        outputList.add(c.element());
+                      }
+
+                      @FinishBundle
+                      public void finish(FinishBundleContext c) {
+                        System.out.println("List size is " + outputList.size());

Review comment:
       Please remove it or write it to the logs instead.

##########
File path: sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -1432,6 +1442,11 @@ void set(
       return toBuilder().setPreparedStatementSetter(setter).build();
     }
 
+    /** If true, enables using a dynamically determined number of shards to write. */
+    public WriteWithResults<T, V> withAutoSharding() {

Review comment:
       Should we allow users to customise the sharding function?







[GitHub] [beam] pabloem commented on pull request #15863: Autosharding for JdbcIO.write* transforms

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #15863:
URL: https://github.com/apache/beam/pull/15863#issuecomment-957100316


   Run Java PostCommit





[GitHub] [beam] pabloem commented on a change in pull request #15863: [BEAM-13184] Autosharding for JdbcIO.write* transforms

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #15863:
URL: https://github.com/apache/beam/pull/15863#discussion_r788239608



##########
File path: sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -2085,17 +2167,12 @@ private Connection getConnection() throws SQLException {
 
     @ProcessElement
     public void processElement(ProcessContext context) throws Exception {
-      T record = context.element();
-      records.add(record);
-      if (records.size() >= spec.getBatchSize()) {

Review comment:
       Good catch. I'll fix this before the release (2.36.0), so it'll only be available in the next one.







[GitHub] [beam] aromanenko-dev commented on a change in pull request #15863: [BEAM-13184] Autosharding for JdbcIO.write* transforms

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on a change in pull request #15863:
URL: https://github.com/apache/beam/pull/15863#discussion_r764214549



##########
File path: sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -1708,7 +1777,41 @@ void set(
         checkArgument(
             spec.getPreparedStatementSetter() != null, "withPreparedStatementSetter() is required");
       }
-      return input
+      PCollection<Iterable<T>> iterables;

Review comment:
       +1







[GitHub] [beam] pabloem commented on pull request #15863: [BEAM-13184] Autosharding for JdbcIO.write* transforms

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #15863:
URL: https://github.com/apache/beam/pull/15863#issuecomment-995292515


   Run Java PostCommit





[GitHub] [beam] calvin-orange commented on a change in pull request #15863: [BEAM-13184] Autosharding for JdbcIO.write* transforms

Posted by GitBox <gi...@apache.org>.
calvin-orange commented on a change in pull request #15863:
URL: https://github.com/apache/beam/pull/15863#discussion_r791185003



##########
File path: sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -2085,17 +2167,12 @@ private Connection getConnection() throws SQLException {
 
     @ProcessElement
     public void processElement(ProcessContext context) throws Exception {
-      T record = context.element();
-      records.add(record);
-      if (records.size() >= spec.getBatchSize()) {

Review comment:
       Great. Thank you!







[GitHub] [beam] pabloem commented on pull request #15863: [BEAM-13184] Autosharding for JdbcIO.write* transforms

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #15863:
URL: https://github.com/apache/beam/pull/15863#issuecomment-967735753


   Run Java PreCommit





[GitHub] [beam] pabloem commented on pull request #15863: [BEAM-13184] Autosharding for JdbcIO.write* transforms

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #15863:
URL: https://github.com/apache/beam/pull/15863#issuecomment-960122226


   Run Java PostCommit





[GitHub] [beam] pabloem commented on pull request #15863: [BEAM-13184] Autosharding for JdbcIO.write* transforms

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #15863:
URL: https://github.com/apache/beam/pull/15863#issuecomment-995292449


   Run Java PreCommit





[GitHub] [beam] pabloem commented on pull request #15863: [BEAM-13184] Autosharding for JdbcIO.write* transforms

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #15863:
URL: https://github.com/apache/beam/pull/15863#issuecomment-995412714


   Run Java PostCommit
   
   





[GitHub] [beam] pabloem commented on pull request #15863: [BEAM-13184] Autosharding for JdbcIO.write* transforms

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #15863:
URL: https://github.com/apache/beam/pull/15863#issuecomment-998296077


   Run Java PostCommit
   
   





[GitHub] [beam] aromanenko-dev commented on a change in pull request #15863: [BEAM-13184] Autosharding for JdbcIO.write* transforms

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on a change in pull request #15863:
URL: https://github.com/apache/beam/pull/15863#discussion_r764209748



##########
File path: sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -1549,8 +1570,47 @@ void set(
       checkArgument(
           (getDataSourceProviderFn() != null),
           "withDataSourceConfiguration() or withDataSourceProviderFn() is required");
-
-      return input.apply(
+      checkArgument(
+          getAutoSharding() == null
+              || (getAutoSharding() && input.isBounded() != IsBounded.UNBOUNDED),
+          "Autosharding is only supported for streaming pipelines.");
+      ;
+
+      PCollection<Iterable<T>> iterables;
+      if (input.isBounded() == IsBounded.UNBOUNDED
+          && getAutoSharding() != null
+          && getAutoSharding()) {

Review comment:
       In theory, it may return `false`?







[GitHub] [beam] pabloem commented on a change in pull request #15863: [BEAM-13184] Autosharding for JdbcIO.write* transforms

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #15863:
URL: https://github.com/apache/beam/pull/15863#discussion_r767088333



##########
File path: sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -1708,7 +1777,41 @@ void set(
         checkArgument(
             spec.getPreparedStatementSetter() != null, "withPreparedStatementSetter() is required");
       }
-      return input
+      PCollection<Iterable<T>> iterables;

Review comment:
       done. Added a helper method.

##########
File path: sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
##########
@@ -258,6 +263,40 @@ private PipelineResult runRead() {
     return pipelineRead.run();
   }
 
+  @Test
+  public void testWriteWithAutosharding() throws Exception {
+    String firstTableName = DatabaseTestHelper.getTestTableName("UT_WRITE");
+    DatabaseTestHelper.createTable(dataSource, firstTableName);
+    try {
+      List<KV<Integer, String>> data = getTestDataToWrite(EXPECTED_ROW_COUNT);
+      TestStream.Builder<KV<Integer, String>> ts =
+          TestStream.create(KvCoder.of(VarIntCoder.of(), StringUtf8Coder.of()))
+              .advanceWatermarkTo(Instant.now());
+      for (KV<Integer, String> elm : data) {
+        ts.addElements(elm);
+      }
+
+      PCollection<KV<Integer, String>> dataCollection =
+          pipelineWrite.apply(ts.advanceWatermarkToInfinity());
+      dataCollection.apply(
+          JdbcIO.<KV<Integer, String>>write()
+              .withDataSourceProviderFn(voidInput -> dataSource)
+              .withStatement(String.format("insert into %s values(?, ?) returning *", tableName))
+              .withAutoSharding()

Review comment:
       Hm, not really, the way things are now. Perhaps we could analyze the graph and check that the GroupIntoBatches (GIB) transform is in it, but is that worth it?

##########
File path: sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -1549,8 +1570,47 @@ void set(
       checkArgument(
           (getDataSourceProviderFn() != null),
           "withDataSourceConfiguration() or withDataSourceProviderFn() is required");
-
-      return input.apply(
+      checkArgument(
+          getAutoSharding() == null
+              || (getAutoSharding() && input.isBounded() != IsBounded.UNBOUNDED),
+          "Autosharding is only supported for streaming pipelines.");
+      ;
+
+      PCollection<Iterable<T>> iterables;
+      if (input.isBounded() == IsBounded.UNBOUNDED
+          && getAutoSharding() != null
+          && getAutoSharding()) {
+        iterables =
+            input
+                .apply(WithKeys.<String, T>of(""))
+                .apply(
+                    GroupIntoBatches.<String, T>ofSize(DEFAULT_BATCH_SIZE)
+                        .withMaxBufferingDuration(Duration.millis(200))
+                        .withShardedKey())
+                .apply(Values.create());
+      } else {
+        iterables =
+            input.apply(
+                ParDo.of(
+                    new DoFn<T, Iterable<T>>() {
+                      List<T> outputList;
+
+                      @ProcessElement
+                      public void process(ProcessContext c) {
+                        if (outputList == null) {
+                          outputList = new ArrayList<>();
+                        }
+                        outputList.add(c.element());

Review comment:
       Fixed. Added a maximum row limit.
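
For readers following along, here is a minimal sketch of what per-bundle buffering with a maximum row limit can look like. It is plain Java rather than the actual `DoFn` from the PR; the class name `BundleBatcher`, the `maxRows` parameter and the `Consumer` output are all hypothetical stand-ins for the element buffer, the limit and the `output(...)` call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class BundleBatcher<T> {
    private final int maxRows;                  // hypothetical cap on buffered rows
    private final Consumer<List<T>> output;     // stands in for emitting an Iterable<T> downstream
    private List<T> buffer = new ArrayList<>();

    BundleBatcher(int maxRows, Consumer<List<T>> output) {
        if (maxRows <= 0) {
            throw new IllegalArgumentException("maxRows must be positive");
        }
        this.maxRows = maxRows;
        this.output = output;
    }

    // Called once per element, like a @ProcessElement method.
    void process(T element) {
        buffer.add(element);
        if (buffer.size() >= maxRows) {
            flush();                            // cap reached: emit early so memory stays bounded
        }
    }

    // Called at the end of a bundle, like a @FinishBundle method.
    void finishBundle() {
        if (!buffer.isEmpty()) {
            flush();                            // emit whatever is left; never emit an empty batch
        }
    }

    private void flush() {
        output.accept(buffer);
        buffer = new ArrayList<>();             // fresh list so emitted batches are never mutated
    }

    public static void main(String[] args) {
        BundleBatcher<Integer> batcher =
            new BundleBatcher<>(2, batch -> System.out.println("write batch " + batch));
        for (int i = 0; i < 5; i++) {
            batcher.process(i);
        }
        batcher.finishBundle();
    }
}
```

Without the cap, a very large bundle would accumulate every element in memory before anything is emitted.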







[GitHub] [beam] aromanenko-dev commented on pull request #15863: [BEAM-13184] Autosharding for JdbcIO.write* transforms

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on pull request #15863:
URL: https://github.com/apache/beam/pull/15863#issuecomment-989663717


   Hey @pabloem, kind ping on this PR





[GitHub] [beam] pabloem commented on pull request #15863: [BEAM-13184] Autosharding for JdbcIO.write* transforms

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #15863:
URL: https://github.com/apache/beam/pull/15863#issuecomment-961336483


   Run Java PreCommit





[GitHub] [beam] pabloem commented on pull request #15863: [BEAM-13184] Autosharding for JdbcIO.write* transforms

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #15863:
URL: https://github.com/apache/beam/pull/15863#issuecomment-967377167


   Adding a test makes sense. I'll work on that.





[GitHub] [beam] pabloem commented on pull request #15863: [BEAM-13184] Autosharding for JdbcIO.write* transforms

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #15863:
URL: https://github.com/apache/beam/pull/15863#issuecomment-967733143


   Run Java PostCommit





[GitHub] [beam] pabloem commented on pull request #15863: [BEAM-13184] Autosharding for JdbcIO.write* transforms

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #15863:
URL: https://github.com/apache/beam/pull/15863#issuecomment-967703267


   Run Java PostCommit





[GitHub] [beam] steveniemitz edited a comment on pull request #15863: [BEAM-13184] Autosharding for JdbcIO.write* transforms

Posted by GitBox <gi...@apache.org>.
steveniemitz edited a comment on pull request #15863:
URL: https://github.com/apache/beam/pull/15863#issuecomment-1017579519


   Some random musings from me, since we've tried to do something similar with our own SQL-ish IO.
   
   If you introduce an (implicit) reshuffle between the producer of the rows being written and the writer, you may break an implicit contract that users have been relying on: that mutations are applied to the JDBC destination in the order they were produced.
   
   For example, if a GBK is triggering every 10 seconds and the next transform is a JdbcIO write, by default that GBK trigger will fuse with the JdbcIO writer and apply "inline", so all firings are applied in order. If you apply batching (with or without autosharding), multiple mutations for the same row may land in different batches, which are then applied in a non-deterministic order. Depending on that order, an older firing can overwrite a newer one.
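
The hazard described above can be illustrated with a toy model. This is a sketch in plain Java, not Beam or JDBC code: the `Mutation` class, the version numbers and the in-memory map standing in for the destination table are all hypothetical, and the table is assumed to use last-write-wins semantics per row.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class OrderingHazard {
    // A write to one row; a higher version means a later trigger firing.
    static final class Mutation {
        final String rowKey;
        final int version;

        Mutation(String rowKey, int version) {
            this.rowKey = rowKey;
            this.version = version;
        }
    }

    // Applies batches in the order given; the last write to a row wins.
    static Map<String, Integer> applyInOrder(List<List<Mutation>> batches) {
        Map<String, Integer> table = new HashMap<>();
        for (List<Mutation> batch : batches) {
            for (Mutation m : batch) {
                table.put(m.rowKey, m.version);
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Mutation> older = Arrays.asList(new Mutation("row-1", 1));
        List<Mutation> newer = Arrays.asList(new Mutation("row-1", 2));

        // Fused, in-order application: the newer firing is what remains.
        System.out.println(applyInOrder(Arrays.asList(older, newer)));
        // Batches applied in the opposite order: the older firing overwrites the newer one.
        System.out.println(applyInOrder(Arrays.asList(newer, older)));
    }
}
```

Once two mutations for the same row end up in different batches, the final state depends only on which batch happens to be written last.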





[GitHub] [beam] aromanenko-dev commented on pull request #15863: [BEAM-13184] Autosharding for JdbcIO.write* transforms

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on pull request #15863:
URL: https://github.com/apache/beam/pull/15863#issuecomment-1003089476


   Thanks, it's ok for me but maybe @chamikaramj has some additional comments?





[GitHub] [beam] pabloem commented on pull request #15863: [BEAM-13184] Autosharding for JdbcIO.write* transforms

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #15863:
URL: https://github.com/apache/beam/pull/15863#issuecomment-995311878


   Run Java PreCommit





[GitHub] [beam] pabloem commented on a change in pull request #15863: [BEAM-13184] Autosharding for JdbcIO.write* transforms

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #15863:
URL: https://github.com/apache/beam/pull/15863#discussion_r748534021



##########
File path: sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -1494,8 +1509,49 @@ void set(
       checkArgument(
           (getDataSourceProviderFn() != null),
           "withDataSourceConfiguration() or withDataSourceProviderFn() is required");
-
-      return input.apply(
+      checkArgument(
+          ((getAutoSharding() == null || !getAutoSharding())
+                  && input.isBounded() == IsBounded.BOUNDED)
+              || input.isBounded() == IsBounded.UNBOUNDED,
+          "Autosharding is only supported for streaming pipelines.");
+      ;
+
+      PCollection<Iterable<T>> iterables;
+      if (input.isBounded() == IsBounded.UNBOUNDED
+          && getAutoSharding() != null
+          && getAutoSharding()) {
+        iterables =
+            input
+                .apply(WithKeys.<String, T>of(""))

Review comment:
       GroupIntoBatches needs to receive a PCollection<KV<..>> to do the per-batch grouping and autosharding, so I assign an arbitrary key to every element so that they can go into the GIB transform. What do you think?







[GitHub] [beam] aromanenko-dev commented on a change in pull request #15863: [BEAM-13184] Autosharding for JdbcIO.write* transforms

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on a change in pull request #15863:
URL: https://github.com/apache/beam/pull/15863#discussion_r764203432



##########
File path: sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -1432,6 +1442,11 @@ void set(
       return toBuilder().setPreparedStatementSetter(setter).build();
     }
 
+    /** If true, enables using a dynamically determined number of shards to write. */
+    public WriteWithResults<T, V> withAutoSharding() {

Review comment:
       OK, let's keep it automatic-only for now.







[GitHub] [beam] pabloem commented on pull request #15863: [BEAM-13184] Autosharding for JdbcIO.write* transforms

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #15863:
URL: https://github.com/apache/beam/pull/15863#issuecomment-960122226


   Run Java PostCommit





[GitHub] [beam] chamikaramj commented on pull request #15863: [BEAM-13184] Autosharding for JdbcIO.write* transforms

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on pull request #15863:
URL: https://github.com/apache/beam/pull/15863#issuecomment-1005158063


   LGTM. Thanks.





[GitHub] [beam] chamikaramj commented on a change in pull request #15863: [BEAM-13184] Autosharding for JdbcIO.write* transforms

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on a change in pull request #15863:
URL: https://github.com/apache/beam/pull/15863#discussion_r759755819



##########
File path: sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -1549,8 +1570,47 @@ void set(
       checkArgument(
           (getDataSourceProviderFn() != null),
           "withDataSourceConfiguration() or withDataSourceProviderFn() is required");
-
-      return input.apply(
+      checkArgument(
+          getAutoSharding() == null
+              || (getAutoSharding() && input.isBounded() != IsBounded.UNBOUNDED),
+          "Autosharding is only supported for streaming pipelines.");
+      ;
+
+      PCollection<Iterable<T>> iterables;
+      if (input.isBounded() == IsBounded.UNBOUNDED
+          && getAutoSharding() != null
+          && getAutoSharding()) {

Review comment:
       "&& getAutoSharding()" is redundant ?

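       To make the question concrete, here is a minimal sketch in plain Java (no Beam dependency) that enumerates the possible values of a nullable Boolean flag. The class and method names are hypothetical; the only assumption taken from the diff is that getAutoSharding() returns a nullable Boolean. The trailing clause only matters if the flag can ever be Boolean.FALSE. If the builder only ever stores null or true, which the no-argument withAutoSharding() signature suggests but the diff does not confirm, the two forms agree.

```java
import java.util.Arrays;
import java.util.List;

public class AutoShardingFlagCheck {
  // Hypothetical stand-in for the condition in the diff; the flag is a nullable Boolean.
  static boolean useAutoSharding(boolean unbounded, Boolean autoSharding) {
    return unbounded && autoSharding != null && autoSharding;
  }

  // The same condition with the trailing "&& autoSharding" clause dropped.
  static boolean withoutLastClause(boolean unbounded, Boolean autoSharding) {
    return unbounded && autoSharding != null;
  }

  public static void main(String[] args) {
    List<Boolean> flagValues = Arrays.asList(null, Boolean.FALSE, Boolean.TRUE);
    for (boolean unbounded : new boolean[] {false, true}) {
      for (Boolean flag : flagValues) {
        // Print both variants side by side so any disagreement is visible.
        System.out.printf(
            "unbounded=%b flag=%s full=%b shortened=%b%n",
            unbounded,
            flag,
            useAutoSharding(unbounded, flag),
            withoutLastClause(unbounded, flag));
      }
    }
  }
}
```

       The two variants differ only for an unbounded input with the flag explicitly set to false.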
##########
File path: sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -1549,8 +1570,47 @@ void set(
       checkArgument(
           (getDataSourceProviderFn() != null),
           "withDataSourceConfiguration() or withDataSourceProviderFn() is required");
-
-      return input.apply(
+      checkArgument(
+          getAutoSharding() == null
+              || (getAutoSharding() && input.isBounded() != IsBounded.UNBOUNDED),
+          "Autosharding is only supported for streaming pipelines.");
+      ;
+
+      PCollection<Iterable<T>> iterables;
+      if (input.isBounded() == IsBounded.UNBOUNDED
+          && getAutoSharding() != null
+          && getAutoSharding()) {
+        iterables =
+            input
+                .apply(WithKeys.<String, T>of(""))
+                .apply(
+                    GroupIntoBatches.<String, T>ofSize(DEFAULT_BATCH_SIZE)
+                        .withMaxBufferingDuration(Duration.millis(200))
+                        .withShardedKey())
+                .apply(Values.create());
+      } else {
+        iterables =
+            input.apply(
+                ParDo.of(
+                    new DoFn<T, Iterable<T>>() {
+                      List<T> outputList;
+
+                      @ProcessElement
+                      public void process(ProcessContext c) {
+                        if (outputList == null) {
+                          outputList = new ArrayList<>();
+                        }
+                        outputList.add(c.element());

Review comment:
       Won't we run into OOMs if this list grows too large?
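
       One common way to bound the memory used by such a buffer is to flush it whenever it reaches a fixed size, and once more at the end. The following is only a sketch in plain Java under that assumption, not the code in this PR: BoundedBatcher, maxBatchSize, and sink are hypothetical names. In a Beam DoFn, add() would roughly correspond to the @ProcessElement method and the final flush() to @FinishBundle.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class BoundedBatcher<T> {
  private final int maxBatchSize;
  private final Consumer<List<T>> sink;
  private List<T> buffer = new ArrayList<>();

  public BoundedBatcher(int maxBatchSize, Consumer<List<T>> sink) {
    if (maxBatchSize <= 0) {
      throw new IllegalArgumentException("maxBatchSize must be positive");
    }
    this.maxBatchSize = maxBatchSize;
    this.sink = sink;
  }

  // Buffers one element and emits a batch as soon as the size limit is reached,
  // so the buffer never holds more than maxBatchSize elements.
  public void add(T element) {
    buffer.add(element);
    if (buffer.size() >= maxBatchSize) {
      flush();
    }
  }

  // Emits whatever is buffered; call once after the last element.
  public void flush() {
    if (buffer.isEmpty()) {
      return;
    }
    List<T> batch = buffer;
    buffer = new ArrayList<>();
    sink.accept(batch);
  }

  public static void main(String[] args) {
    List<List<Integer>> emitted = new ArrayList<>();
    BoundedBatcher<Integer> batcher = new BoundedBatcher<>(3, emitted::add);
    for (int i = 1; i <= 7; i++) {
      batcher.add(i);
    }
    batcher.flush();
    System.out.println(emitted); // prints [[1, 2, 3], [4, 5, 6], [7]]
  }
}
```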

##########
File path: sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
##########
@@ -258,6 +263,40 @@ private PipelineResult runRead() {
     return pipelineRead.run();
   }
 
+  @Test
+  public void testWriteWithAutosharding() throws Exception {
+    String firstTableName = DatabaseTestHelper.getTestTableName("UT_WRITE");
+    DatabaseTestHelper.createTable(dataSource, firstTableName);
+    try {
+      List<KV<Integer, String>> data = getTestDataToWrite(EXPECTED_ROW_COUNT);
+      TestStream.Builder<KV<Integer, String>> ts =
+          TestStream.create(KvCoder.of(VarIntCoder.of(), StringUtf8Coder.of()))
+              .advanceWatermarkTo(Instant.now());
+      for (KV<Integer, String> elm : data) {
+        ts.addElements(elm);
+      }
+
+      PCollection<KV<Integer, String>> dataCollection =
+          pipelineWrite.apply(ts.advanceWatermarkToInfinity());
+      dataCollection.apply(
+          JdbcIO.<KV<Integer, String>>write()
+              .withDataSourceProviderFn(voidInput -> dataSource)
+              .withStatement(String.format("insert into %s values(?, ?) returning *", tableName))
+              .withAutoSharding()

Review comment:
       Is there a way to actually verify in this test that auto-sharding took effect?

##########
File path: sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -1494,8 +1509,49 @@ void set(
       checkArgument(
           (getDataSourceProviderFn() != null),
           "withDataSourceConfiguration() or withDataSourceProviderFn() is required");
-
-      return input.apply(
+      checkArgument(
+          ((getAutoSharding() == null || !getAutoSharding())
+                  && input.isBounded() == IsBounded.BOUNDED)
+              || input.isBounded() == IsBounded.UNBOUNDED,
+          "Autosharding is only supported for streaming pipelines.");
+      ;
+
+      PCollection<Iterable<T>> iterables;
+      if (input.isBounded() == IsBounded.UNBOUNDED
+          && getAutoSharding() != null
+          && getAutoSharding()) {
+        iterables =
+            input
+                .apply(WithKeys.<String, T>of(""))

Review comment:
       Should we consider moving this WithKeys.of() and the subsequent Values.create() into GroupIntoBatches to improve its user interface (in a separate PR)?

##########
File path: sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -1432,6 +1442,11 @@ void set(
       return toBuilder().setPreparedStatementSetter(setter).build();
     }
 
+    /** If true, enables using a dynamically determined number of shards to write. */
+    public WriteWithResults<T, V> withAutoSharding() {

Review comment:
       Yeah, that sounds like a separate feature. The purpose of this feature is for the runner to determine the appropriate amount of sharding automatically.

##########
File path: sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -1708,7 +1777,41 @@ void set(
         checkArgument(
             spec.getPreparedStatementSetter() != null, "withPreparedStatementSetter() is required");
       }
-      return input
+      PCollection<Iterable<T>> iterables;

Review comment:
       Can we share code between WriteWithResults and WriteVoid? This introduces a significant amount of code duplication.







[GitHub] [beam] pabloem commented on pull request #15863: [BEAM-13184] Autosharding for JdbcIO.write* transforms

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #15863:
URL: https://github.com/apache/beam/pull/15863#issuecomment-998199128









[GitHub] [beam] aromanenko-dev commented on a change in pull request #15863: [BEAM-13184] Autosharding for JdbcIO.write* transforms

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on a change in pull request #15863:
URL: https://github.com/apache/beam/pull/15863#discussion_r764213648



##########
File path: sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -1494,8 +1509,49 @@ void set(
       checkArgument(
           (getDataSourceProviderFn() != null),
           "withDataSourceConfiguration() or withDataSourceProviderFn() is required");
-
-      return input.apply(
+      checkArgument(
+          ((getAutoSharding() == null || !getAutoSharding())
+                  && input.isBounded() == IsBounded.BOUNDED)
+              || input.isBounded() == IsBounded.UNBOUNDED,
+          "Autosharding is only supported for streaming pipelines.");
+      ;
+
+      PCollection<Iterable<T>> iterables;
+      if (input.isBounded() == IsBounded.UNBOUNDED
+          && getAutoSharding() != null
+          && getAutoSharding()) {
+        iterables =
+            input
+                .apply(WithKeys.<String, T>of(""))

Review comment:
       @pabloem OK. I'd suggest adding a short code comment here to make this clear.
   @chamikaramj Makes sense to me.







[GitHub] [beam] pabloem commented on pull request #15863: Autosharding for JdbcIO.write* transforms

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #15863:
URL: https://github.com/apache/beam/pull/15863#issuecomment-957100316


   Run Java PostCommit





[GitHub] [beam] calvin-orange commented on a change in pull request #15863: [BEAM-13184] Autosharding for JdbcIO.write* transforms

Posted by GitBox <gi...@apache.org>.
calvin-orange commented on a change in pull request #15863:
URL: https://github.com/apache/beam/pull/15863#discussion_r791185003



##########
File path: sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -2085,17 +2167,12 @@ private Connection getConnection() throws SQLException {
 
     @ProcessElement
     public void processElement(ProcessContext context) throws Exception {
-      T record = context.element();
-      records.add(record);
-      if (records.size() >= spec.getBatchSize()) {

Review comment:
       Great. Thank you!







[GitHub] [beam] steveniemitz commented on pull request #15863: [BEAM-13184] Autosharding for JdbcIO.write* transforms

Posted by GitBox <gi...@apache.org>.
steveniemitz commented on pull request #15863:
URL: https://github.com/apache/beam/pull/15863#issuecomment-1017579519


   Some random musings from me, because we've tried to do something like this as well with our own SQL-ish IO.
   
   If you introduce an (implicit) reshuffle between the producer of the rows being written and the writer, you may break an implicit contract that users have been relying on: that the mutations produced are applied in order to the JDBC destination.
   
   For example, if a GBK is triggering every 10 seconds and the next transform is a JdbcIO write, by default that GBK trigger will fuse with the JdbcIO writer and be applied "inline", so all trigger firings are applied in order. If you apply batching (with or without autosharding), multiple mutations for the same row may end up in different batches, which will then be applied in a non-deterministic order.
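   
   The ordering concern can be illustrated without Beam or a database. The sketch below is a toy model under stated assumptions: a HashMap stands in for the destination table, each write is a last-writer-wins upsert, and BatchOrderDemo, Mutation, and applyBatches are hypothetical names. It shows only that the final row value depends on the order in which the batches are applied.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BatchOrderDemo {
  // One upsert against a single row, identified by key.
  static final class Mutation {
    final String key;
    final int value;

    Mutation(String key, int value) {
      this.key = key;
      this.value = value;
    }
  }

  // Applies the batches in the given order; later writes overwrite earlier ones.
  static Map<String, Integer> applyBatches(List<List<Mutation>> batches) {
    Map<String, Integer> table = new HashMap<>();
    for (List<Mutation> batch : batches) {
      for (Mutation m : batch) {
        table.put(m.key, m.value);
      }
    }
    return table;
  }

  public static void main(String[] args) {
    // Two trigger firings for the same row that landed in different batches.
    List<Mutation> earlierBatch = Collections.singletonList(new Mutation("row-1", 10));
    List<Mutation> laterBatch = Collections.singletonList(new Mutation("row-1", 20));

    Map<String, Integer> inOrder = applyBatches(Arrays.asList(earlierBatch, laterBatch));
    Map<String, Integer> reordered = applyBatches(Arrays.asList(laterBatch, earlierBatch));

    System.out.println(inOrder); // prints {row-1=20}
    System.out.println(reordered); // prints {row-1=10}
  }
}
```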

