Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/27 09:23:53 UTC

[GitHub] [flink] luoyuxia opened a new pull request, #20377: [FLINK-27338][hive] Improve splitting file for Hive source

luoyuxia opened a new pull request, #20377:
URL: https://github.com/apache/flink/pull/20377

   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
     - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact*
     - *Deployments RPC transmits only the blob storage reference*
     - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follows the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
     - *Added integration tests for end-to-end deployment with large payloads (100MB)*
     - *Extended integration test for recovery after master (JobManager) failure*
     - *Added test that validates that TaskInfo is transferred only once across recoveries*
     - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no)
     - The serializers: (yes / no / don't know)
     - The runtime per-record code paths (performance sensitive): (yes / no / don't know)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
     - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes / no)
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] luoyuxia commented on a diff in pull request #20377: [FLINK-27338][hive] Improve splitting file for Hive table with orc format

Posted by GitBox <gi...@apache.org>.
luoyuxia commented on code in PR #20377:
URL: https://github.com/apache/flink/pull/20377#discussion_r941121136


##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/PartitionMonitorTest.java:
##########
@@ -99,6 +99,10 @@ private void commitPartitionWithGivenCreateTime(
     private void preparePartitionMonitor() {
         List<List<String>> seenPartitionsSinceOffset = new ArrayList<>();
         JobConf jobConf = new JobConf();

Review Comment:
   Also, while adding the test, I found that the current implementation only works for the ORC format. For other formats, even though we set `mapreduce.input.fileinputformat.split.maxsize`, they don't take this configuration into account when `InputFormat#getSplits` is called to get the splits.
   
   So I added a note to the docs saying that these configuration options only work for the ORC format, and I changed the code so that we only try to set `mapreduce.input.fileinputformat.split.maxsize` when the format is ORC.
   As a result, the PR title has also changed.
   





[GitHub] [flink] luoyuxia commented on a diff in pull request #20377: [FLINK-27338][hive] Improve splitting file for Hive table with orc format

Posted by GitBox <gi...@apache.org>.
luoyuxia commented on code in PR #20377:
URL: https://github.com/apache/flink/pull/20377#discussion_r941114418


##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/PartitionMonitorTest.java:
##########
@@ -99,6 +99,10 @@ private void commitPartitionWithGivenCreateTime(
     private void preparePartitionMonitor() {
         List<List<String>> seenPartitionsSinceOffset = new ArrayList<>();
         JobConf jobConf = new JobConf();

Review Comment:
   Now, I have added a test in [HiveSourceFileEnumeratorTest.java](https://github.com/apache/flink/pull/20377/files#diff-059eba29b7890b2ab4886612e37be7ca35f255ce8519a5afb43d1a3a9f34b401)  for it.





[GitHub] [flink] luoyuxia commented on a diff in pull request #20377: [FLINK-27338][hive] Improve splitting file for Hive table with orc format

Posted by GitBox <gi...@apache.org>.
luoyuxia commented on code in PR #20377:
URL: https://github.com/apache/flink/pull/20377#discussion_r1037725058


##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceFileEnumerator.java:
##########
@@ -76,10 +98,68 @@ public static List<HiveSourceSplit> createInputSplits(
                 }
             }
         }
-
         return hiveSplits;
     }
 
+    private static boolean supportSetSplitMaxSize(List<HiveTablePartition> partitions) {
+        // now, the configuration 'HiveConf.ConfVars.MAPREDMAXSPLITSIZE' we set only
+        // works for orc format
+        for (HiveTablePartition partition : partitions) {

Review Comment:
   IIRC, the configuration `HiveConf.ConfVars.MAPREDMAXSPLITSIZE` only makes a difference in the method `format.getSplits(jobConf, minNumSplits)`, which we delegate to in order to get the file splits.
   We could implement our own logic for computing file splits that takes `maxsplitsize` into account, which would work for all formats, but currently we only delegate to Hive's implementation.





[GitHub] [flink] godfreyhe commented on a diff in pull request #20377: [FLINK-27338][hive] Improve splitting file for Hive source

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on code in PR #20377:
URL: https://github.com/apache/flink/pull/20377#discussion_r939780282


##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/PartitionMonitorTest.java:
##########
@@ -99,6 +99,10 @@ private void commitPartitionWithGivenCreateTime(
     private void preparePartitionMonitor() {
         List<List<String>> seenPartitionsSinceOffset = new ArrayList<>();
         JobConf jobConf = new JobConf();

Review Comment:
   Do we have any tests to verify the changes, e.g. that the number of splits changes when `table.exec.hive.file-open-cost` or `table.exec.hive.split-max-size` changes?





[GitHub] [flink] hackeryard commented on a diff in pull request #20377: [FLINK-27338][hive] Improve splitting file for Hive table with orc format

Posted by GitBox <gi...@apache.org>.
hackeryard commented on code in PR #20377:
URL: https://github.com/apache/flink/pull/20377#discussion_r1036756306


##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceFileEnumerator.java:
##########
@@ -76,10 +98,68 @@ public static List<HiveSourceSplit> createInputSplits(
                 }
             }
         }
-
         return hiveSplits;
     }
 
+    private static boolean supportSetSplitMaxSize(List<HiveTablePartition> partitions) {
+        // now, the configuration 'HiveConf.ConfVars.MAPREDMAXSPLITSIZE' we set only
+        // works for orc format
+        for (HiveTablePartition partition : partitions) {

Review Comment:
   @luoyuxia Why does this only work for the ORC format, instead of for all HDFS files?





[GitHub] [flink] godfreyhe commented on a diff in pull request #20377: [FLINK-27338][hive] Improve splitting file for Hive source

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on code in PR #20377:
URL: https://github.com/apache/flink/pull/20377#discussion_r939777764


##########
docs/content/docs/connectors/table/hive/hive_read_write.md:
##########
@@ -166,6 +166,39 @@ following parameters in `TableConfig` (note that these parameters affect all sou
   </tbody>
 </table>
 
+### Tuning Split Size While Reading Hive Table
+While reading Hive table, the data files will be enumerated into splits, one of which is a portion of data consumed by the source.
+Splits are granularity by which the source distributes the work and parallelize the data reading.
+Users can to do some performance tuning by tuning the split's size with the follow configurations.

Review Comment:
   can do



##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/PartitionMonitorTest.java:
##########
@@ -99,6 +99,10 @@ private void commitPartitionWithGivenCreateTime(
     private void preparePartitionMonitor() {
         List<List<String>> seenPartitionsSinceOffset = new ArrayList<>();
         JobConf jobConf = new JobConf();

Review Comment:
   Do we have any more tests to verify the changes, e.g. that the number of splits changes when `table.exec.hive.file-open-cost` or `table.exec.hive.split-max-size` changes?



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java:
##########
@@ -77,6 +78,22 @@ public class HiveOptions {
                     .withDescription(
                             "The thread number to split hive's partitions to splits. It should be bigger than 0.");
 
+    public static final ConfigOption<MemorySize> TABLE_EXEC_HIVE_SPLIT_MAX_BYTES =
+            key("table.exec.hive.split-max-size")
+                    .memoryType()
+                    .defaultValue(MemorySize.parse("128mb"))
+                    .withDescription(
+                            "The maximum number of bytes (default is 128MB) to pack into a split while reading Hive table. A split will be assigned to a reader.");
+
+    public static final ConfigOption<MemorySize> TABLE_EXEC_HIVE_FILE_OPEN_COST =
+            key("table.exec.hive.file-open-cost")
+                    .memoryType()
+                    .defaultValue(MemorySize.parse("4mb"))
+                    .withDescription(
+                            "The estimated cost (default is 4MB) to open a file. Used to split Hive's files to splits."
+                                    + " When the value is over estimated, Flink wll tend to pack Hive's data into less splits, which will help when Hive's table contains many some files."

Review Comment:
   The comment should be updated: `which will be helpful when Hive's table contains many small files`?



##########
docs/content/docs/connectors/table/hive/hive_read_write.md:
##########
@@ -166,6 +166,39 @@ following parameters in `TableConfig` (note that these parameters affect all sou
   </tbody>
 </table>
 
+### Tuning Split Size While Reading Hive Table
+While reading Hive table, the data files will be enumerated into splits, one of which is a portion of data consumed by the source.
+Splits are granularity by which the source distributes the work and parallelize the data reading.
+Users can to do some performance tuning by tuning the split's size with the follow configurations.
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+        <th class="text-left" style="width: 20%">Key</th>
+        <th class="text-left" style="width: 15%">Default</th>
+        <th class="text-left" style="width: 10%">Type</th>
+        <th class="text-left" style="width: 55%">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td><h5>table.exec.hive.split-max-size</h5></td>
+        <td style="word-wrap: break-word;">128mb</td>
+        <td>MemorySize</td>
+        <td>The maximum number of bytes (default is 128MB) to pack into a split while reading Hive table.</td>
+    </tr>
+    <tr>
+        <td><h5>table.exec.hive.file-open-cost</h5></td>
+        <td style="word-wrap: break-word;">4mb</td>
+        <td>MemorySize</td>
+        <td>The estimated cost (default is 4MB) to open a file. Used to enumerate Hive's files to splits.
+            If the value is over estimated, Flink wll tend to pack Hive's data into less splits, which will help when Hive's table contains many some files.

Review Comment:
   overestimated
   wll -> will
   which will help -> which will be helpful
   some -> small





[GitHub] [flink] godfreyhe closed pull request #20377: [FLINK-27338][hive] Improve splitting file for Hive table with orc format

Posted by GitBox <gi...@apache.org>.
godfreyhe closed pull request #20377: [FLINK-27338][hive] Improve splitting file for Hive table with orc format
URL: https://github.com/apache/flink/pull/20377




[GitHub] [flink] hackeryard commented on a diff in pull request #20377: [FLINK-27338][hive] Improve splitting file for Hive table with orc format

Posted by GitBox <gi...@apache.org>.
hackeryard commented on code in PR #20377:
URL: https://github.com/apache/flink/pull/20377#discussion_r1036803986


##########
docs/content.zh/docs/connectors/table/hive/hive_read_write.md:
##########
@@ -150,6 +150,41 @@ Flink 允许你灵活的配置并发推断策略。你可以在 `TableConfig` 
   </tbody>
 </table>
 
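+### Tuning the Data Split Size When Reading Hive Tables
+When reading a Hive table, the data files are divided into several splits, each of which is a portion of the data to be read.
+Splits are the basic granularity by which Flink assigns tasks and reads data in parallel.
+Users can tune read performance by adjusting the size of each split with the following parameters.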
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+        <th class="text-left" style="width: 20%">Key</th>
+        <th class="text-left" style="width: 15%">Default</th>
+        <th class="text-left" style="width: 10%">Type</th>
+        <th class="text-left" style="width: 55%">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td><h5>table.exec.hive.split-max-size</h5></td>
+        <td style="word-wrap: break-word;">128mb</td>
+        <td>MemorySize</td>
+        <td>The maximum number of bytes each split can contain when reading a Hive table (default is 128MB) 
+    </tr>
+    <tr>
+        <td><h5>table.exec.hive.file-open-cost</h5></td>
+        <td style="word-wrap: break-word;">4mb</td>
+        <td>MemorySize</td>
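+        <td> The estimated cost of opening a file, in bytes; the default is 4MB.
+             If this value is large, Flink will tend to divide the Hive table into fewer splits, which is useful when the Hive table contains a large number of small files.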

Review Comment:
   @luoyuxia When there are a lot of small files, such as 1 MB per file, I think the number of splits is the same as the number of files. So I don't understand why the open cost is a good way to solve this problem.





[GitHub] [flink] luoyuxia commented on a diff in pull request #20377: [FLINK-27338][hive] Improve splitting file for Hive table with orc format

Posted by GitBox <gi...@apache.org>.
luoyuxia commented on code in PR #20377:
URL: https://github.com/apache/flink/pull/20377#discussion_r1037727995


##########
docs/content.zh/docs/connectors/table/hive/hive_read_write.md:
##########
@@ -150,6 +150,41 @@ Flink 允许你灵活的配置并发推断策略。你可以在 `TableConfig` 
   </tbody>
 </table>
 
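+### Tuning the Data Split Size When Reading Hive Tables
+When reading a Hive table, the data files are divided into several splits, each of which is a portion of the data to be read.
+Splits are the basic granularity by which Flink assigns tasks and reads data in parallel.
+Users can tune read performance by adjusting the size of each split with the following parameters.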
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+        <th class="text-left" style="width: 20%">Key</th>
+        <th class="text-left" style="width: 15%">Default</th>
+        <th class="text-left" style="width: 10%">Type</th>
+        <th class="text-left" style="width: 55%">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td><h5>table.exec.hive.split-max-size</h5></td>
+        <td style="word-wrap: break-word;">128mb</td>
+        <td>MemorySize</td>
+        <td>The maximum number of bytes each split can contain when reading a Hive table (default is 128MB) 
+    </tr>
+    <tr>
+        <td><h5>table.exec.hive.file-open-cost</h5></td>
+        <td style="word-wrap: break-word;">4mb</td>
+        <td>MemorySize</td>
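+        <td> The estimated cost of opening a file, in bytes; the default is 4MB.
+             If this value is large, Flink will tend to divide the Hive table into fewer splits, which is useful when the Hive table contains a large number of small files.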

Review Comment:
   We use the `opencost` to calculate `maxsplitsize`; put simply, the bigger the `opencost`, the bigger the `maxsplitsize`.
   Take a 1 MB ORC file as an example: if it contains only one stripe, the whole file becomes a single split. If it contains more stripes, it can be divided into more splits, but with a bigger `maxsplitsize`, fewer splits are generated.
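
The relationship between the open cost and the maximum split size can be illustrated with a small, self-contained Java sketch. The formula used here is an assumption for illustration only (each file is charged its length plus the open cost, the total is spread over the desired number of splits, and the result is clamped between the open cost and the configured maximum); the thread does not spell out the exact computation. The names `SplitSizeSketch` and `maxSplitBytes` are hypothetical.

```java
import java.util.Collections;
import java.util.List;

/** Illustrative sketch only: an assumed formula, not the code from this PR. */
public class SplitSizeSketch {

    /**
     * Derives a max split size from the open cost (assumed formula).
     *
     * @param fileLengths  lengths of the data files in bytes
     * @param openCost     estimated cost of opening a file, in bytes
     * @param splitMaxSize configured upper bound for a split, in bytes
     * @param minNumSplits desired minimum number of splits (e.g. the parallelism)
     */
    public static long maxSplitBytes(
            List<Long> fileLengths, long openCost, long splitMaxSize, int minNumSplits) {
        long totalWithOpenCost = 0L;
        for (long length : fileLengths) {
            // Every file is charged its length plus the open cost.
            totalWithOpenCost += length + openCost;
        }
        long bytesPerSplit = totalWithOpenCost / minNumSplits;
        // Clamp: never below the open cost, never above the configured maximum.
        return Math.min(splitMaxSize, Math.max(openCost, bytesPerSplit));
    }

    public static void main(String[] args) {
        final long mb = 1024L * 1024L;
        // 100 files of 1 MB each, with a target of at least 10 splits.
        List<Long> files = Collections.nCopies(100, mb);
        // Open cost 4 MB: (1 + 4) * 100 / 10 = 50 MB per split.
        System.out.println(maxSplitBytes(files, 4 * mb, 128 * mb, 10) / mb); // prints 50
        // Open cost 16 MB: (1 + 16) * 100 / 10 = 170 MB, capped at 128 MB.
        System.out.println(maxSplitBytes(files, 16 * mb, 128 * mb, 10) / mb); // prints 128
    }
}
```

Under this assumed formula, raising the open cost raises the computed split size until it reaches the configured maximum, which matches the "bigger `opencost`, bigger `maxsplitsize`" behavior described above.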





[GitHub] [flink] luoyuxia commented on pull request #20377: [FLINK-27338][hive] Improve splitting file for Hive table with orc format

Posted by GitBox <gi...@apache.org>.
luoyuxia commented on PR #20377:
URL: https://github.com/apache/flink/pull/20377#issuecomment-1209247982

   @godfreyhe Thanks for reviewing. I have addressed your comments. Could you please help review again?




[GitHub] [flink] flinkbot commented on pull request #20377: [FLINK-27338][hive] Improve splitting file for Hive source

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #20377:
URL: https://github.com/apache/flink/pull/20377#issuecomment-1196489950

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "10c644b4d1d3f2d119e5c3c4da31a868ceb9f8fa",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "10c644b4d1d3f2d119e5c3c4da31a868ceb9f8fa",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 10c644b4d1d3f2d119e5c3c4da31a868ceb9f8fa UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] luoyuxia commented on a diff in pull request #20377: [FLINK-27338][hive] Improve splitting file for Hive table with orc format

Posted by GitBox <gi...@apache.org>.
luoyuxia commented on code in PR #20377:
URL: https://github.com/apache/flink/pull/20377#discussion_r1037725058


##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceFileEnumerator.java:
##########
@@ -76,10 +98,68 @@ public static List<HiveSourceSplit> createInputSplits(
                 }
             }
         }
-
         return hiveSplits;
     }
 
+    private static boolean supportSetSplitMaxSize(List<HiveTablePartition> partitions) {
+        // now, the configuration 'HiveConf.ConfVars.MAPREDMAXSPLITSIZE' we set only
+        // works for orc format
+        for (HiveTablePartition partition : partitions) {

Review Comment:
   IIRC, the configuration `HiveConf.ConfVars.MAPREDMAXSPLITSIZE` only makes a difference for the ORC format in the method `format.getSplits(jobConf, minNumSplits)`, which we delegate to in order to get the file splits.
   We could implement our own logic for computing file splits that takes `maxsplitsize` into account, which would work for all formats, but currently we only delegate to Hive's implementation.
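
The ORC-only guard discussed here can be sketched in plain Java without Hive on the classpath. This is a simplified illustration, not the code from the PR: partitions are reduced to their input format class names, a `Map` stands in for `JobConf`, and the names `OrcOnlySplitSizeSketch` and `maybeSetSplitMaxSize` are hypothetical. Only the configuration key and the ORC input format class name come from Hadoop and Hive.

```java
import java.util.List;
import java.util.Map;

/** Illustrative sketch only: a simplified stand-in for the check in the PR. */
public class OrcOnlySplitSizeSketch {

    /** Fully qualified class name of Hive's ORC input format. */
    public static final String ORC_INPUT_FORMAT =
            "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat";

    /** Hadoop key for the maximum split size. */
    public static final String SPLIT_MAX_SIZE_KEY =
            "mapreduce.input.fileinputformat.split.maxsize";

    /** Returns true only if every partition uses the ORC input format. */
    public static boolean supportSetSplitMaxSize(List<String> inputFormatClassNames) {
        for (String className : inputFormatClassNames) {
            if (!ORC_INPUT_FORMAT.equals(className)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Sets the max split size in the (stand-in) configuration only when all
     * partitions are ORC. Returns whether the value was set.
     */
    public static boolean maybeSetSplitMaxSize(
            Map<String, String> conf, List<String> inputFormatClassNames, long maxSplitBytes) {
        if (!supportSetSplitMaxSize(inputFormatClassNames)) {
            // Other formats ignore this key in getSplits, so leave the conf untouched.
            return false;
        }
        conf.put(SPLIT_MAX_SIZE_KEY, Long.toString(maxSplitBytes));
        return true;
    }
}
```

With this shape, a table that mixes ORC partitions with partitions in another format leaves the configuration untouched, so the splits are whatever Hive's `getSplits` produces by default.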


