Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/06/21 11:14:07 UTC

[GitHub] [beam] daria-malkova opened a new pull request #15049: [BEAM-12456] Parallel querying in JdbcIO

daria-malkova opened a new pull request #15049:
URL: https://github.com/apache/beam/pull/15049


   Design docs can be found here: https://docs.google.com/document/d/1wBzVhQEhTK23ALzTSZ_CVouEOXTm3w2-LjmO3ieUvFc/edit
   
   This PR extends JdbcIO to support parallel loading from JDBC sources through a query-splitting mechanism. The user specifies a column to split the query on, the range of values in that column, and the degree of parallelism (the number of partitions). JdbcIO then opens multiple connections to the database and runs one range query per partition.
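   
   The splitting idea above can be sketched in plain Java. This is a minimal sketch under stated assumptions, not the PR's `PartitioningFn`: the class `RangePartitioner` and its methods are hypothetical, the bounds are treated as the inclusive minimum and maximum of the partition column, and each partition becomes one half-open range `[from, to)` that would be bound to the two placeholders of a `select * from <table> where <column> >= ? and <column> < ?` query (the query shape used in `ReadWithPartitions.expand`). Spreading the remainder over the first ranges keeps them contiguous, so no value is skipped or read twice, and the last range ends at `upperBound + 1`.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   // Hypothetical helper for illustration only; not part of JdbcIO or this PR.
   final class RangePartitioner {
   
     /** A half-open range [from, to) of partition-column values. */
     static final class Range {
       final long from;
       final long to;
   
       Range(long from, long to) {
         this.from = from;
         this.to = to;
       }
   
       @Override
       public String toString() {
         return "[" + from + ", " + to + ")";
       }
     }
   
     /**
      * Splits the inclusive interval [lowerBound, upperBound] into at most numPartitions contiguous,
      * non-overlapping half-open ranges that together cover every value exactly once.
      */
     static List<Range> split(int lowerBound, int upperBound, int numPartitions) {
       if (numPartitions < 1) {
         throw new IllegalArgumentException("numPartitions must be at least 1");
       }
       if (lowerBound > upperBound) {
         throw new IllegalArgumentException("lowerBound must not exceed upperBound");
       }
       // Count of distinct values, inclusive of both bounds; long arithmetic avoids int overflow.
       long span = (long) upperBound - lowerBound + 1;
       // Never create more partitions than there are values.
       int n = (int) Math.min(numPartitions, span);
       long base = span / n; // minimum number of values per range
       long remainder = span % n; // the first `remainder` ranges take one extra value
       List<Range> ranges = new ArrayList<>(n);
       long from = lowerBound;
       for (int i = 0; i < n; i++) {
         long size = base + (i < remainder ? 1 : 0);
         ranges.add(new Range(from, from + size));
         from += size;
       }
       return ranges;
     }
   
     /** Query template with two placeholders, to be bound to a range's from and to values. */
     static String query(String table, String column) {
       return String.format("select * from %1$s where %2$s >= ? and %2$s < ?", table, column);
     }
   
     public static void main(String[] args) {
       System.out.println(split(0, 6, 3)); // [[0, 3), [3, 5), [5, 7)]
       System.out.println(query("my_table", "id"));
     }
   }
   ```
   
   For example, `RangePartitioner.split(5, 15, 3)` yields `[[5, 9), [9, 13), [13, 16)]`: three ranges whose boundaries meet exactly, starting at the lower bound rather than at zero.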
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] aromanenko-dev commented on a change in pull request #15049: [BEAM-12456] Parallel querying in JdbcIO

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on a change in pull request #15049:
URL: https://github.com/apache/beam/pull/15049#discussion_r659784617



##########
File path: sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java
##########
@@ -252,4 +262,67 @@ private static Calendar withTimestampAndTimezone(DateTime dateTime) {
 
     return calendar;
   }
+
+  /** Create partitions on a table. */
+  static class PartitioningFn extends DoFn<List<Integer>, KV<String, Integer>> {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      List<Integer> params = c.element();
+      Integer lowerBound = params.get(0);
+      Integer upperBound = params.get(1);
+      Integer numPartitions = params.get(2);
+      int stride = (upperBound - lowerBound) / numPartitions + 1;
+      for (int i = lowerBound; i < upperBound - stride; i += stride) {
+        String range = String.format("%s,%s", i, i + stride);
+        KV<String, Integer> kvRange = KV.of(range, 1);
+        c.output(kvRange);
+      }
+      if (upperBound - lowerBound > stride * (numPartitions - 1)) {
+        int indexFrom = (numPartitions - 1) * stride;
+        int indexTo = upperBound + 1;
+        String range = String.format("%s,%s", indexFrom, indexTo);
+        KV<String, Integer> kvRange = KV.of(range, 1);
+        c.output(kvRange);
+      }
+    }
+  }
+
+  /**
+   * Selects the minimum and maximum values of the partitioning column from a table.
+   *
+   * @return a two-element array holding the lower and upper bounds, in that order.
+   */
+  static Integer[] getBounds(
+      PBegin input,
+      String table,
+      SerializableFunction<Void, DataSource> providerFunctionFn,
+      String partitionColumn) {
+    final Integer[] bounds = {0, 0};
+    input
+        .apply(
+            String.format("Read min and max value by %s", partitionColumn),
+            JdbcIO.<String>read()
+                .withDataSourceProviderFn(providerFunctionFn)
+                .withQuery(
+                    String.format("select min(%1$s), max(%1$s) from %2$s", partitionColumn, table))
+                .withRowMapper(
+                    (JdbcIO.RowMapper<String>)
+                        resultSet ->
+                            String.join(
+                                ",", Arrays.asList(resultSet.getString(1), resultSet.getString(2))))
+                .withOutputParallelization(false)
+                .withCoder(StringUtf8Coder.of()))
+        .apply(
+            ParDo.of(
+                new DoFn<String, String>() {
+                  @ProcessElement
+                  public void processElement(ProcessContext context) {
+                    List<String> elements = Splitter.on(',').splitToList(context.element());
+                    bounds[0] = Integer.parseInt(Objects.requireNonNull(elements.get(0)));
+                    bounds[1] = Integer.parseInt(Objects.requireNonNull(elements.get(1)));
+                    context.output(context.element());
+                  }
+                }));
+    return bounds;

Review comment:
       Ping on this







[GitHub] [beam] daria-malkova commented on a change in pull request #15049: [BEAM-12456] Parallel querying in JdbcIO

Posted by GitBox <gi...@apache.org>.
daria-malkova commented on a change in pull request #15049:
URL: https://github.com/apache/beam/pull/15049#discussion_r659629294



##########
File path: sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java
##########
@@ -252,4 +262,67 @@ private static Calendar withTimestampAndTimezone(DateTime dateTime) {
 
     return calendar;
   }
+
+  /** Create partitions on a table. */
+  static class PartitioningFn extends DoFn<List<Integer>, KV<String, Integer>> {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      List<Integer> params = c.element();
+      Integer lowerBound = params.get(0);
+      Integer upperBound = params.get(1);
+      Integer numPartitions = params.get(2);
+      int stride = (upperBound - lowerBound) / numPartitions + 1;

Review comment:
       Added

##########
File path: sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -873,8 +944,177 @@ public void populateDisplayData(DisplayData.Builder builder) {
     }
   }
 
+  /** Implementation of {@link #readWithPartitions}. */
+  @AutoValue
+  public abstract static class ReadWithPartitions<T> extends PTransform<PBegin, PCollection<T>> {
+
+    abstract @Nullable SerializableFunction<Void, DataSource> getDataSourceProviderFn();
+
+    abstract @Nullable RowMapper<T> getRowMapper();
+
+    abstract @Nullable Coder<T> getCoder();
+
+    abstract int getNumPartitions();
+
+    abstract @Nullable String getPartitionColumn();
+
+    abstract int getLowerBound();
+
+    abstract int getUpperBound();
+
+    abstract @Nullable String getTable();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+
+      abstract Builder<T> setDataSourceProviderFn(
+          SerializableFunction<Void, DataSource> dataSourceProviderFn);
+
+      abstract Builder<T> setRowMapper(RowMapper<T> rowMapper);
+
+      abstract Builder<T> setCoder(Coder<T> coder);
+
+      abstract Builder<T> setNumPartitions(int numPartitions);
+
+      abstract Builder<T> setPartitionColumn(String partitionColumn);
+
+      abstract Builder<T> setLowerBound(int lowerBound);
+
+      abstract Builder<T> setUpperBound(int upperBound);
+
+      abstract Builder<T> setTable(String tableName);
+
+      abstract ReadWithPartitions<T> build();
+    }
+
+    public ReadWithPartitions<T> withDataSourceConfiguration(final DataSourceConfiguration config) {
+      return withDataSourceProviderFn(new DataSourceProviderFromDataSourceConfiguration(config));
+    }
+
+    public ReadWithPartitions<T> withDataSourceProviderFn(
+        SerializableFunction<Void, DataSource> dataSourceProviderFn) {
+      return toBuilder().setDataSourceProviderFn(dataSourceProviderFn).build();
+    }
+
+    public ReadWithPartitions<T> withRowMapper(RowMapper<T> rowMapper) {
+      checkNotNull(rowMapper, "rowMapper can not be null");
+      return toBuilder().setRowMapper(rowMapper).build();
+    }
+
+    public ReadWithPartitions<T> withCoder(Coder<T> coder) {
+      checkNotNull(coder, "coder can not be null");
+      return toBuilder().setCoder(coder).build();
+    }
+
+    /**
+     * The number of partitions. Together with withLowerBound and withUpperBound, this determines
+     * the partition strides for the generated WHERE clause expressions used to split the column
+     * set by withPartitionColumn evenly. Must be at least 1.
+     */
+    public ReadWithPartitions<T> withNumPartitions(int numPartitions) {
+      checkArgument(numPartitions > 0, "numPartitions can not be less than 1");
+      return toBuilder().setNumPartitions(numPartitions).build();
+    }
+
+    /** The name of a column of numeric type that will be used for partitioning. */
+    public ReadWithPartitions<T> withPartitionColumn(String partitionColumn) {
+      checkNotNull(partitionColumn, "partitionColumn can not be null");
+      return toBuilder().setPartitionColumn(partitionColumn).build();
+    }
+
+    public ReadWithPartitions<T> withLowerBound(int lowerBound) {
+      return toBuilder().setLowerBound(lowerBound).build();
+    }
+
+    public ReadWithPartitions<T> withUpperBound(int upperBound) {
+      return toBuilder().setUpperBound(upperBound).build();
+    }
+
+    /** Name of the table in the external database. Can be used to pass a user-defined subquery. */
+    public ReadWithPartitions<T> withTable(String tableName) {
+      checkNotNull(tableName, "table can not be null");
+      return toBuilder().setTable(tableName).build();
+    }
+
+    @Override
+    public PCollection<T> expand(PBegin input) {
+      checkNotNull(getRowMapper(), "withRowMapper() is required");
+      checkNotNull(getCoder(), "withCoder() is required");
+      checkNotNull(
+          getDataSourceProviderFn(),
+          "withDataSourceConfiguration() or withDataSourceProviderFn() is required");
+      checkNotNull(getPartitionColumn(), "withPartitionColumn() is required");
+      checkNotNull(getTable(), "withTable() is required");
+      checkArgument(
+          getLowerBound() < getUpperBound(),
+          "The lower bound of the partitioning column is greater than or equal to the upper bound");
+      checkArgument(
+          getUpperBound() - getLowerBound() >= getNumPartitions(),
+          "The specified number of partitions is more than the difference between upper bound and lower bound");
+
+      if (getUpperBound() == MAX_VALUE || getLowerBound() == 0) {
+        refineBounds(input);
+      }
+
+      int stride = (getUpperBound() - getLowerBound()) / getNumPartitions();
+      PCollection<List<Integer>> params =
+          input.apply(
+              Create.of(
+                  Collections.singletonList(
+                      Arrays.asList(getLowerBound(), getUpperBound(), getNumPartitions()))));
+      PCollection<KV<String, Iterable<Integer>>> ranges =
+          params
+              .apply("Partitioning", ParDo.of(new PartitioningFn()))
+              .apply("Group partitions", GroupByKey.create());
+
+      return ranges.apply(
+          "Read ranges",
+          JdbcIO.<KV<String, Iterable<Integer>>, T>readAll()
+              .withDataSourceProviderFn(getDataSourceProviderFn())
+              .withFetchSize(stride)
+              .withQuery(
+                  String.format(
+                      "select * from %1$s where %2$s >= ? and %2$s < ?",
+                      getTable(), getPartitionColumn()))
+              .withCoder(getCoder())
+              .withRowMapper(getRowMapper())
+              .withParameterSetter(
+                  (PreparedStatementSetter<KV<String, Iterable<Integer>>>)
+                      (element, preparedStatement) -> {
+                        String[] range = element.getKey().split(",", -1);
+                        preparedStatement.setInt(1, Integer.parseInt(range[0]));
+                        preparedStatement.setInt(2, Integer.parseInt(range[1]));
+                      })
+              .withOutputParallelization(false));
+    }
+
+    private void refineBounds(PBegin input) {
+      Integer[] bounds =
+          JdbcUtil.getBounds(input, getTable(), getDataSourceProviderFn(), getPartitionColumn());
+      if (getLowerBound() == 0) {
+        withLowerBound(bounds[0]);
+      }
+      if (getUpperBound() == MAX_VALUE) {
+        withUpperBound(bounds[1]);
+      }
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {

Review comment:
       Added







[GitHub] [beam] aromanenko-dev commented on pull request #15049: [BEAM-12456] Parallel querying in JdbcIO

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on pull request #15049:
URL: https://github.com/apache/beam/pull/15049#issuecomment-869686013


   Run Java PreCommit





[GitHub] [beam] aromanenko-dev commented on a change in pull request #15049: [BEAM-12456] Parallel querying in JdbcIO

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on a change in pull request #15049:
URL: https://github.com/apache/beam/pull/15049#discussion_r659784617



##########
File path: sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java
##########
@@ -252,4 +262,67 @@ private static Calendar withTimestampAndTimezone(DateTime dateTime) {
 
     return calendar;
   }
+
+  /** Create partitions on a table. */
+  static class PartitioningFn extends DoFn<List<Integer>, KV<String, Integer>> {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      List<Integer> params = c.element();
+      Integer lowerBound = params.get(0);
+      Integer upperBound = params.get(1);
+      Integer numPartitions = params.get(2);
+      int stride = (upperBound - lowerBound) / numPartitions + 1;
+      for (int i = lowerBound; i < upperBound - stride; i += stride) {
+        String range = String.format("%s,%s", i, i + stride);
+        KV<String, Integer> kvRange = KV.of(range, 1);
+        c.output(kvRange);
+      }
+      if (upperBound - lowerBound > stride * (numPartitions - 1)) {
+        int indexFrom = (numPartitions - 1) * stride;
+        int indexTo = upperBound + 1;
+        String range = String.format("%s,%s", indexFrom, indexTo);
+        KV<String, Integer> kvRange = KV.of(range, 1);
+        c.output(kvRange);
+      }
+    }
+  }
+
+  /**
+   * Select maximal and minimal value from a table by partitioning column.
+   *
+   * @return pair of integers corresponds to the upper and lower bounds.
+   */
+  static Integer[] getBounds(
+      PBegin input,
+      String table,
+      SerializableFunction<Void, DataSource> providerFunctionFn,
+      String partitionColumn) {
+    final Integer[] bounds = {0, 0};
+    input
+        .apply(
+            String.format("Read min and max value by %s", partitionColumn),
+            JdbcIO.<String>read()
+                .withDataSourceProviderFn(providerFunctionFn)
+                .withQuery(
+                    String.format("select min(%1$s), max(%1$s) from %2$s", partitionColumn, table))
+                .withRowMapper(
+                    (JdbcIO.RowMapper<String>)
+                        resultSet ->
+                            String.join(
+                                ",", Arrays.asList(resultSet.getString(1), resultSet.getString(2))))
+                .withOutputParallelization(false)
+                .withCoder(StringUtf8Coder.of()))
+        .apply(
+            ParDo.of(
+                new DoFn<String, String>() {
+                  @ProcessElement
+                  public void processElement(ProcessContext context) {
+                    List<String> elements = Splitter.on(',').splitToList(context.element());
+                    bounds[0] = Integer.parseInt(Objects.requireNonNull(elements.get(0)));
+                    bounds[1] = Integer.parseInt(Objects.requireNonNull(elements.get(1)));
+                    context.output(context.element());
+                  }
+                }));
+    return bounds;

Review comment:
       Ping on this






[GitHub] [beam] daria-malkova commented on pull request #15049: [BEAM-12456] Parallel querying in JdbcIO

Posted by GitBox <gi...@apache.org>.
daria-malkova commented on pull request #15049:
URL: https://github.com/apache/beam/pull/15049#issuecomment-872801481


   @aromanenko-dev CHANGES.md and commit message are updated




[GitHub] [beam] aromanenko-dev commented on a change in pull request #15049: [BEAM-12456] Parallel querying in JdbcIO

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on a change in pull request #15049:
URL: https://github.com/apache/beam/pull/15049#discussion_r658067920



##########
File path: sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java
##########
@@ -252,4 +262,67 @@ private static Calendar withTimestampAndTimezone(DateTime dateTime) {
 
     return calendar;
   }
+
+  /** Create partitions on a table. */

Review comment:
       It would be useful to add a javadoc with examples of different cases.
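To make the requested javadoc examples concrete, here is a dependency-free port of the range arithmetic in `PartitioningFn`. The class name `PartitionCases` and method `originalRanges` are hypothetical. Beam's `DoFn`/`KV` plumbing is replaced by a returned list of the same `"from,to"` keys. Tracing it by hand gives three cases worth documenting:

- With `lowerBound = 0`, the ranges tile the column correctly.
- With a non-zero lower bound, the last range starts at `(numPartitions - 1) * stride` instead of `lowerBound + (numPartitions - 1) * stride`, so it overlaps the previous range.
- When `stride * (numPartitions - 1)` is not smaller than the span, the final range is never emitted, so the top of the column (9 and 10 in the third example) is not read.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, dependency-free port of the range arithmetic in PartitioningFn, used only to
 * enumerate the "from,to" keys it would emit for a few inputs.
 */
class PartitionCases {

  /** Mirrors PartitioningFn.processElement, collecting keys instead of calling c.output. */
  static List<String> originalRanges(int lowerBound, int upperBound, int numPartitions) {
    List<String> ranges = new ArrayList<>();
    int stride = (upperBound - lowerBound) / numPartitions + 1;
    for (int i = lowerBound; i < upperBound - stride; i += stride) {
      ranges.add(String.format("%s,%s", i, i + stride));
    }
    if (upperBound - lowerBound > stride * (numPartitions - 1)) {
      // Same as the diff: the start of the last range is not offset by lowerBound.
      int indexFrom = (numPartitions - 1) * stride;
      int indexTo = upperBound + 1;
      ranges.add(String.format("%s,%s", indexFrom, indexTo));
    }
    return ranges;
  }

  public static void main(String[] args) {
    System.out.println(originalRanges(0, 10, 3)); // [0,4, 4,8, 8,11]
    System.out.println(originalRanges(5, 15, 3)); // [5,9, 9,13, 8,16]  (overlap)
    System.out.println(originalRanges(0, 10, 5)); // [0,3, 3,6, 6,9]    (9 and 10 missing)
  }
}
```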

##########
File path: sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java
##########
@@ -252,4 +262,67 @@ private static Calendar withTimestampAndTimezone(DateTime dateTime) {
 
     return calendar;
   }
+
+  /** Create partitions on a table. */
+  static class PartitioningFn extends DoFn<List<Integer>, KV<String, Integer>> {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      List<Integer> params = c.element();
+      Integer lowerBound = params.get(0);
+      Integer upperBound = params.get(1);
+      Integer numPartitions = params.get(2);
+      int stride = (upperBound - lowerBound) / numPartitions + 1;

Review comment:
       Should we assert here that `upperBound > lowerBound`?
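The following is a minimal sketch of a partitioner that performs that check, plus a few others. It is written in plain Java so it runs without Beam. It assumes that the upper bound is inclusive, as `indexTo = upperBound + 1` in the diff suggests, and that column values fit in an `int`. `RangePartitioner` and `partition` are hypothetical names, and plain `IllegalArgumentException`s stand in for Guava's `checkArgument`. Rather than using a fixed stride, it spreads the remainder over the first partitions. As a result, it always emits exactly `numPartitions` contiguous, non-overlapping half-open ranges `[from, to)` that start at `lowerBound`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: splits [lowerBound, upperBound] into contiguous half-open ranges. */
class RangePartitioner {

  /**
   * Returns exactly numPartitions "from,to" keys, each meaning from <= column < to.
   *
   * <p>Examples: (0, 10, 3) -> [0,4, 4,8, 8,11]; (5, 15, 3) -> [5,9, 9,13, 13,16].
   */
  static List<String> partition(int lowerBound, int upperBound, int numPartitions) {
    if (upperBound <= lowerBound) {
      throw new IllegalArgumentException("upperBound must be greater than lowerBound");
    }
    if (numPartitions < 1) {
      throw new IllegalArgumentException("numPartitions must be at least 1");
    }
    // Count of values in the inclusive range, computed in long to avoid int overflow.
    long span = (long) upperBound - lowerBound + 1;
    if (numPartitions > span) {
      throw new IllegalArgumentException("numPartitions exceeds the number of values in range");
    }
    long base = span / numPartitions;
    long remainder = span % numPartitions;
    List<String> ranges = new ArrayList<>();
    long from = lowerBound;
    for (int p = 0; p < numPartitions; p++) {
      // The first `remainder` partitions take one extra value, so sizes differ by at most one.
      long size = base + (p < remainder ? 1 : 0);
      ranges.add(from + "," + (from + size));
      from += size;
    }
    return ranges;
  }

  public static void main(String[] args) {
    System.out.println(partition(5, 15, 3)); // [5,9, 9,13, 13,16]
  }
}
```

Spreading the remainder keeps partition sizes within one value of each other. Doing the arithmetic in `long` keeps the size computation safe when the bounds sit near the ends of the `int` range.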

##########
File path: sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -873,8 +944,177 @@ public void populateDisplayData(DisplayData.Builder builder) {
     }
   }
 
+  /** Implementation of {@link #readWithPartitions}. */
+  @AutoValue
+  public abstract static class ReadWithPartitions<T> extends PTransform<PBegin, PCollection<T>> {
+
+    abstract @Nullable SerializableFunction<Void, DataSource> getDataSourceProviderFn();
+
+    abstract @Nullable RowMapper<T> getRowMapper();
+
+    abstract @Nullable Coder<T> getCoder();
+
+    abstract int getNumPartitions();
+
+    abstract @Nullable String getPartitionColumn();
+
+    abstract int getLowerBound();
+
+    abstract int getUpperBound();
+
+    abstract @Nullable String getTable();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+
+      abstract Builder<T> setDataSourceProviderFn(
+          SerializableFunction<Void, DataSource> dataSourceProviderFn);
+
+      abstract Builder<T> setRowMapper(RowMapper<T> rowMapper);
+
+      abstract Builder<T> setCoder(Coder<T> coder);
+
+      abstract Builder<T> setNumPartitions(int numPartitions);
+
+      abstract Builder<T> setPartitionColumn(String partitionColumn);
+
+      abstract Builder<T> setLowerBound(int lowerBound);
+
+      abstract Builder<T> setUpperBound(int upperBound);
+
+      abstract Builder<T> setTable(String tableName);
+
+      abstract ReadWithPartitions<T> build();
+    }
+
+    public ReadWithPartitions<T> withDataSourceConfiguration(final DataSourceConfiguration config) {
+      return withDataSourceProviderFn(new DataSourceProviderFromDataSourceConfiguration(config));
+    }
+
+    public ReadWithPartitions<T> withDataSourceProviderFn(
+        SerializableFunction<Void, DataSource> dataSourceProviderFn) {
+      return toBuilder().setDataSourceProviderFn(dataSourceProviderFn).build();
+    }
+
+    public ReadWithPartitions<T> withRowMapper(RowMapper<T> rowMapper) {
+      checkNotNull(rowMapper, "rowMapper can not be null");
+      return toBuilder().setRowMapper(rowMapper).build();
+    }
+
+    public ReadWithPartitions<T> withCoder(Coder<T> coder) {
+      checkNotNull(coder, "coder can not be null");
+      return toBuilder().setCoder(coder).build();
+    }
+
+    /**
+     * The number of partitions. This, along with withLowerBound and withUpperBound, form partitions
+     * strides for generated WHERE clause expressions used to split the column withPartitionColumn
+     * evenly. When the input is less than 1, the number is set to 1.
+     */
+    public ReadWithPartitions<T> withNumPartitions(int numPartitions) {
+      checkArgument(numPartitions > 0, "numPartitions can not be less than 1");
+      return toBuilder().setNumPartitions(numPartitions).build();
+    }
+
+    /** The name of a column of numeric type that will be used for partitioning */
+    public ReadWithPartitions<T> withPartitionColumn(String partitionColumn) {
+      checkNotNull(partitionColumn, "partitionColumn can not be null");
+      return toBuilder().setPartitionColumn(partitionColumn).build();
+    }
+
+    public ReadWithPartitions<T> withLowerBound(int lowerBound) {
+      return toBuilder().setLowerBound(lowerBound).build();
+    }
+
+    public ReadWithPartitions<T> withUpperBound(int upperBound) {
+      return toBuilder().setUpperBound(upperBound).build();
+    }
+
+    /** Name of the table in the external database. Can be used to pass a user-defined subquery. */
+    public ReadWithPartitions<T> withTable(String tableName) {
+      checkNotNull(tableName, "table can not be null");
+      return toBuilder().setTable(tableName).build();
+    }
+
+    @Override
+    public PCollection<T> expand(PBegin input) {
+      checkNotNull(getRowMapper(), "withRowMapper() is required");
+      checkNotNull(getCoder(), "withCoder() is required");
+      checkNotNull(
+          getDataSourceProviderFn(),
+          "withDataSourceConfiguration() or withDataSourceProviderFn() is required");
+      checkNotNull(getPartitionColumn(), "withPartitionColumn() is required");
+      checkNotNull(getTable(), "withTable() is required");
+      checkArgument(
+          getLowerBound() < getUpperBound(),
+          "The lower bound of partitioning column is larger or equal than the upper bound");
+      checkArgument(
+          getUpperBound() - getLowerBound() >= getNumPartitions(),
+          "The specified number of partitions is more than the difference between upper bound and lower bound");
+
+      if (getUpperBound() == MAX_VALUE || getLowerBound() == 0) {
+        refineBounds(input);
+      }
+
+      int stride = (getUpperBound() - getLowerBound()) / getNumPartitions();
+      PCollection<List<Integer>> params =
+          input.apply(
+              Create.of(
+                  Collections.singletonList(
+                      Arrays.asList(getLowerBound(), getUpperBound(), getNumPartitions()))));
+      PCollection<KV<String, Iterable<Integer>>> ranges =
+          params
+              .apply("Partitioning", ParDo.of(new PartitioningFn()))
+              .apply("Group partitions", GroupByKey.create());
+
+      return ranges.apply(
+          "Read ranges",
+          JdbcIO.<KV<String, Iterable<Integer>>, T>readAll()
+              .withDataSourceProviderFn(getDataSourceProviderFn())
+              .withFetchSize(stride)
+              .withQuery(
+                  String.format(
+                      "select * from %1$s where %2$s >= ? and %2$s < ?",
+                      getTable(), getPartitionColumn()))
+              .withCoder(getCoder())
+              .withRowMapper(getRowMapper())
+              .withParameterSetter(
+                  (PreparedStatementSetter<KV<String, Iterable<Integer>>>)
+                      (element, preparedStatement) -> {
+                        String[] range = element.getKey().split(",", -1);
+                        preparedStatement.setInt(1, Integer.parseInt(range[0]));
+                        preparedStatement.setInt(2, Integer.parseInt(range[1]));
+                      })
+              .withOutputParallelization(false));
+    }
+
+    private void refineBounds(PBegin input) {
+      Integer[] bounds =
+          JdbcUtil.getBounds(input, getTable(), getDataSourceProviderFn(), getPartitionColumn());
+      if (getLowerBound() == 0) {
+        withLowerBound(bounds[0]);
+      }
+      if (getUpperBound() == MAX_VALUE) {
+        withUpperBound(bounds[1]);
+      }
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {

Review comment:
       Maybe also add the bounds, table, partition id and number of partitions as well?
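This point concerns the bounds specifically. In `refineBounds` (in the diff above), the results of `withLowerBound(bounds[0])` and `withUpperBound(bounds[1])` are discarded. AutoValue classes are immutable, and each `with*` method returns a new instance via `toBuilder()...build()`. The transform's own bounds, and anything later shown in its display data, therefore keep whatever values they had before refinement. The plain-Java sketch below contrasts the discarding pattern with one that keeps the returned copy. It uses a hypothetical immutable `PartitionSettings` class in place of `ReadWithPartitions`. It mirrors the sentinel checks in `refineBounds`, assuming `MAX_VALUE` there is `Integer.MAX_VALUE`.

```java
/** Hypothetical immutable stand-in for the AutoValue-generated ReadWithPartitions. */
final class PartitionSettings {
  private final int lowerBound;
  private final int upperBound;

  PartitionSettings(int lowerBound, int upperBound) {
    this.lowerBound = lowerBound;
    this.upperBound = upperBound;
  }

  int getLowerBound() {
    return lowerBound;
  }

  int getUpperBound() {
    return upperBound;
  }

  // Like withLowerBound in the diff: returns a modified copy and leaves `this` unchanged.
  PartitionSettings withLowerBound(int newLowerBound) {
    return new PartitionSettings(newLowerBound, upperBound);
  }

  PartitionSettings withUpperBound(int newUpperBound) {
    return new PartitionSettings(lowerBound, newUpperBound);
  }

  /** Same pattern as refineBounds: the copies are dropped, so `this` comes back unchanged. */
  PartitionSettings refineDiscarding(int min, int max) {
    if (getLowerBound() == 0) {
      withLowerBound(min);
    }
    if (getUpperBound() == Integer.MAX_VALUE) {
      withUpperBound(max);
    }
    return this;
  }

  /** Keeps each returned copy, so the refined bounds survive. */
  PartitionSettings refineKeeping(int min, int max) {
    PartitionSettings result = this;
    if (result.getLowerBound() == 0) {
      result = result.withLowerBound(min);
    }
    if (result.getUpperBound() == Integer.MAX_VALUE) {
      result = result.withUpperBound(max);
    }
    return result;
  }

  public static void main(String[] args) {
    PartitionSettings defaults = new PartitionSettings(0, Integer.MAX_VALUE);
    PartitionSettings discarded = defaults.refineDiscarding(3, 42);
    PartitionSettings kept = defaults.refineKeeping(3, 42);
    System.out.println(discarded.getLowerBound() + ".." + discarded.getUpperBound()); // 0..2147483647
    System.out.println(kept.getLowerBound() + ".." + kept.getUpperBound()); // 3..42
  }
}
```

Inside `expand()` itself, a comparable fix would be to hold the refined values in local variables and use those when building the ranges, rather than calling `with*` on `this`.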

##########
File path: sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java
##########
@@ -252,4 +262,67 @@ private static Calendar withTimestampAndTimezone(DateTime dateTime) {
 
     return calendar;
   }
+
+  /** Create partitions on a table. */
+  static class PartitioningFn extends DoFn<List<Integer>, KV<String, Integer>> {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      List<Integer> params = c.element();
+      Integer lowerBound = params.get(0);
+      Integer upperBound = params.get(1);
+      Integer numPartitions = params.get(2);
+      int stride = (upperBound - lowerBound) / numPartitions + 1;
+      for (int i = lowerBound; i < upperBound - stride; i += stride) {
+        String range = String.format("%s,%s", i, i + stride);
+        KV<String, Integer> kvRange = KV.of(range, 1);
+        c.output(kvRange);
+      }
+      if (upperBound - lowerBound > stride * (numPartitions - 1)) {
+        int indexFrom = (numPartitions - 1) * stride;
+        int indexTo = upperBound + 1;
+        String range = String.format("%s,%s", indexFrom, indexTo);
+        KV<String, Integer> kvRange = KV.of(range, 1);
+        c.output(kvRange);
+      }
+    }
+  }
+
+  /**
+   * Select maximal and minimal value from a table by partitioning column.
+   *
+   * @return pair of integers corresponds to the upper and lower bounds.
+   */
+  static Integer[] getBounds(
+      PBegin input,
+      String table,
+      SerializableFunction<Void, DataSource> providerFunctionFn,
+      String partitionColumn) {
+    final Integer[] bounds = {0, 0};
+    input
+        .apply(
+            String.format("Read min and max value by %s", partitionColumn),
+            JdbcIO.<String>read()
+                .withDataSourceProviderFn(providerFunctionFn)
+                .withQuery(
+                    String.format("select min(%1$s), max(%1$s) from %2$s", partitionColumn, table))
+                .withRowMapper(
+                    (JdbcIO.RowMapper<String>)
+                        resultSet ->
+                            String.join(
+                                ",", Arrays.asList(resultSet.getString(1), resultSet.getString(2))))
+                .withOutputParallelization(false)
+                .withCoder(StringUtf8Coder.of()))
+        .apply(
+            ParDo.of(
+                new DoFn<String, String>() {
+                  @ProcessElement
+                  public void processElement(ProcessContext context) {
+                    List<String> elements = Splitter.on(',').splitToList(context.element());
+                    bounds[0] = Integer.parseInt(Objects.requireNonNull(elements.get(0)));
+                    bounds[1] = Integer.parseInt(Objects.requireNonNull(elements.get(1)));
+                    context.output(context.element());
+                  }
+                }));
+    return bounds;

Review comment:
       I think in a distributed environment it will always return `{0, 0}`, no?
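The concern can be illustrated without Beam. In Beam, `apply` only adds a step to the pipeline graph. The `DoFn` body runs later, when the pipeline is executed, and on a distributed runner it runs on deserialized copies in worker processes. `getBounds` therefore returns `bounds` before any element has been processed, and `expand()` uses that value immediately. The toy model below reproduces the construction-time `{0, 0}`. It uses a hypothetical `DeferredPipeline` class, not a Beam API, that records steps in `apply` and runs them only in `run()`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Toy model (not Beam): apply() only records a step; run() executes recorded steps later.
 * It mirrors the construction-time vs. execution-time split that getBounds relies on.
 */
class DeferredPipeline {
  private final List<Runnable> steps = new ArrayList<>();

  void apply(Runnable step) {
    steps.add(step); // nothing executes here, just like applying a PTransform
  }

  void run() {
    for (Runnable step : steps) {
      step.run();
    }
  }

  /** Same shape as getBounds: mutate a local array from a deferred step, then return it. */
  static Integer[] getBounds(DeferredPipeline pipeline, int min, int max) {
    final Integer[] bounds = {0, 0};
    pipeline.apply(() -> {
      bounds[0] = min;
      bounds[1] = max;
    });
    return bounds; // returned before any step has run
  }

  public static void main(String[] args) {
    DeferredPipeline pipeline = new DeferredPipeline();
    Integer[] bounds = getBounds(pipeline, 3, 42);
    System.out.println(Arrays.toString(bounds)); // [0, 0]: the value expand() would see
    pipeline.run();
    // In this single-JVM toy the array is updated now. On a distributed runner the step
    // runs on a deserialized copy in another process, so the caller's array is never touched.
    System.out.println(Arrays.toString(bounds)); // [3, 42]
  }
}
```

One way to keep the refined bounds inside the pipeline would be to turn the min/max result into a side input, for example with `View.asSingleton()`. The partitioning `DoFn` could then read it via `ProcessContext.sideInput` instead of relying on a Java array returned during pipeline construction.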






[GitHub] [beam] aromanenko-dev commented on pull request #15049: [BEAM-12456] Parallel querying in JdbcIO

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on pull request #15049:
URL: https://github.com/apache/beam/pull/15049#issuecomment-867752147


   Run Java PreCommit




[GitHub] [beam] aromanenko-dev merged pull request #15049: [BEAM-12456] Parallel querying in JdbcIO

Posted by GitBox <gi...@apache.org>.
aromanenko-dev merged pull request #15049:
URL: https://github.com/apache/beam/pull/15049


   




[GitHub] [beam] daria-malkova commented on a change in pull request #15049: [BEAM-12456] Parallel querying in JdbcIO

Posted by GitBox <gi...@apache.org>.
daria-malkova commented on a change in pull request #15049:
URL: https://github.com/apache/beam/pull/15049#discussion_r659623022



##########
File path: sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java
##########
@@ -252,4 +262,67 @@ private static Calendar withTimestampAndTimezone(DateTime dateTime) {
 
     return calendar;
   }
+
+  /** Create partitions on a table. */

Review comment:
       There are a few examples in the header of the JdbcIO.java file. Do I need to add more?






[GitHub] [beam] daria-malkova commented on a change in pull request #15049: [BEAM-12456] Parallel querying in JdbcIO

Posted by GitBox <gi...@apache.org>.
daria-malkova commented on a change in pull request #15049:
URL: https://github.com/apache/beam/pull/15049#discussion_r661405448



##########
File path: sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java
##########
@@ -252,4 +262,67 @@ private static Calendar withTimestampAndTimezone(DateTime dateTime) {
 
     return calendar;
   }
+
+  /** Create partitions on a table. */
+  static class PartitioningFn extends DoFn<List<Integer>, KV<String, Integer>> {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      List<Integer> params = c.element();
+      Integer lowerBound = params.get(0);
+      Integer upperBound = params.get(1);
+      Integer numPartitions = params.get(2);
+      int stride = (upperBound - lowerBound) / numPartitions + 1;
+      for (int i = lowerBound; i < upperBound - stride; i += stride) {
+        String range = String.format("%s,%s", i, i + stride);
+        KV<String, Integer> kvRange = KV.of(range, 1);
+        c.output(kvRange);
+      }
+      if (upperBound - lowerBound > stride * (numPartitions - 1)) {
+        int indexFrom = (numPartitions - 1) * stride;
+        int indexTo = upperBound + 1;
+        String range = String.format("%s,%s", indexFrom, indexTo);
+        KV<String, Integer> kvRange = KV.of(range, 1);
+        c.output(kvRange);
+      }
+    }
+  }
+
+  /**
+   * Select maximal and minimal value from a table by partitioning column.
+   *
+   * @return pair of integers corresponds to the upper and lower bounds.
+   */
+  static Integer[] getBounds(
+      PBegin input,
+      String table,
+      SerializableFunction<Void, DataSource> providerFunctionFn,
+      String partitionColumn) {
+    final Integer[] bounds = {0, 0};
+    input
+        .apply(
+            String.format("Read min and max value by %s", partitionColumn),
+            JdbcIO.<String>read()
+                .withDataSourceProviderFn(providerFunctionFn)
+                .withQuery(
+                    String.format("select min(%1$s), max(%1$s) from %2$s", partitionColumn, table))
+                .withRowMapper(
+                    (JdbcIO.RowMapper<String>)
+                        resultSet ->
+                            String.join(
+                                ",", Arrays.asList(resultSet.getString(1), resultSet.getString(2))))
+                .withOutputParallelization(false)
+                .withCoder(StringUtf8Coder.of()))
+        .apply(
+            ParDo.of(
+                new DoFn<String, String>() {
+                  @ProcessElement
+                  public void processElement(ProcessContext context) {
+                    List<String> elements = Splitter.on(',').splitToList(context.element());
+                    bounds[0] = Integer.parseInt(Objects.requireNonNull(elements.get(0)));
+                    bounds[1] = Integer.parseInt(Objects.requireNonNull(elements.get(1)));
+                    context.output(context.element());
+                  }
+                }));
+    return bounds;

Review comment:
       I've checked on Dataflow: everything works well, and the min and max values were determined.






[GitHub] [beam] aromanenko-dev commented on pull request #15049: [BEAM-12456] Parallel querying in JdbcIO

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on pull request #15049:
URL: https://github.com/apache/beam/pull/15049#issuecomment-868442372


   Also, the checkstyle task fails, PTAL:
   ```
   Execution failed for task ':sdks:java:io:jdbc:checkstyleMain'.
   ```




[GitHub] [beam] aromanenko-dev commented on pull request #15049: [BEAM-12456] Parallel querying in JdbcIO

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on pull request #15049:
URL: https://github.com/apache/beam/pull/15049#issuecomment-870687536


   @daria-malkova Could you squash all commits and rebase it against the master branch?




[GitHub] [beam] daria-malkova commented on pull request #15049: [BEAM-12456] Parallel querying in JdbcIO

Posted by GitBox <gi...@apache.org>.
daria-malkova commented on pull request #15049:
URL: https://github.com/apache/beam/pull/15049#issuecomment-869514763


   Run Java PreCommit




[GitHub] [beam] daria-malkova commented on pull request #15049: [BEAM-12456] Parallel querying in JdbcIO

Posted by GitBox <gi...@apache.org>.
daria-malkova commented on pull request #15049:
URL: https://github.com/apache/beam/pull/15049#issuecomment-866846566


   @aromanenko-dev Could you please review this PR?



