Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/04/25 10:53:51 UTC

[GitHub] [flink] AHeise commented on a diff in pull request #18299: [FLINK-25311][core] Fix deal with delimited compressed file not correctly

AHeise commented on code in PR #18299:
URL: https://github.com/apache/flink/pull/18299#discussion_r857500589


##########
flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java:
##########
@@ -899,6 +899,8 @@ protected FSDataInputStream decorateInputStream(
         InflaterInputStreamFactory<?> inflaterInputStreamFactory =
                 getInflaterInputStreamFactory(fileSplit.getPath());
         if (inflaterInputStreamFactory != null) {
+            // compressed format should use splitLength specially
+            this.splitLength = -1;

Review Comment:
   This fix looks odd to me. First, we are modifying a parameter, which is always a sign that the fix belongs at the call site. Second, I cannot see anything at this point that guarantees the same file is not covered by two splits, in which case simply changing `splitLength` here would read duplicate data. Third, this should probably use `READ_WHOLE_SPLIT_FLAG` instead of a literal `-1`.
   All in all, the proper place to fix this is `createInputSplits`.
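   A toy model (not Flink code) of the duplicate-data concern: a "file" is a list of records, a split is a start index plus a length, and -1 stands in for `READ_WHOLE_SPLIT_FLAG` (that value is an assumption here). The hypothetical `read` helper honors the split start but reads to the end when the length is the flag, so two splits over the same file start to overlap once their length is forced to the flag.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Toy model, not Flink code: records stand in for bytes, splits for FileInputSplits.
   class DuplicateReadDemo {
       // Assumed to mirror FileInputFormat.READ_WHOLE_SPLIT_FLAG.
       static final long READ_WHOLE_SPLIT_FLAG = -1L;

       // Hypothetical reader: starts at the split start, and reads to the end of
       // the file when the length is the "whole split" flag.
       static List<String> read(List<String> file, int start, long length) {
           int end = length == READ_WHOLE_SPLIT_FLAG
                   ? file.size()
                   : (int) Math.min(file.size(), start + length);
           return new ArrayList<>(file.subList(start, end));
       }

       public static void main(String[] args) {
           List<String> file = List.of("r0", "r1", "r2", "r3");

           // Two splits over the same file, each reading only its own range.
           List<String> correct = new ArrayList<>();
           correct.addAll(read(file, 0, 2));
           correct.addAll(read(file, 2, 2));

           // The same two splits with the length overridden to the flag by the reader.
           List<String> overridden = new ArrayList<>();
           overridden.addAll(read(file, 0, READ_WHOLE_SPLIT_FLAG));
           overridden.addAll(read(file, 2, READ_WHOLE_SPLIT_FLAG));

           System.out.println(correct);    // [r0, r1, r2, r3]
           System.out.println(overridden); // [r0, r1, r2, r3, r2, r3]
       }
   }
   ```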
   
   The actual bug is in this block of `createInputSplits`:
   ```
           if (unsplittable) { // should be testForUnsplittable(file)
               int splitNum = 0;
               for (final FileStatus file : files) {
                   final FileSystem fs = file.getPath().getFileSystem();
                   final BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, file.getLen());
                   Set<String> hosts = new HashSet<String>();
                   for (BlockLocation block : blocks) {
                       hosts.addAll(Arrays.asList(block.getHosts()));
                   }
                   long len = file.getLen();
                   if (testForUnsplittable(file)) { // this doesn't make any sense at this point
                       len = READ_WHOLE_SPLIT_FLAG;
                   }
                   FileInputSplit fis =
                           new FileInputSplit(
                                   splitNum++,
                                   file.getPath(),
                                   0,
                                   len,
                                   hosts.toArray(new String[hosts.size()]));
                   inputSplits.add(fis);
               }
               return inputSplits.toArray(new FileInputSplit[inputSplits.size()]);
           }
   ```
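   One possible reading of the suggestion above, sketched as a self-contained toy (not Flink code): splittability is decided per file, a compressed file gets exactly one split carrying the whole-split flag, and other files are still cut into byte ranges. `FileDesc`, `Split`, `isCompressed` and its extension list are hypothetical stand-ins for `FileStatus`, `FileInputSplit` and `testForUnsplittable`, and the value -1 for `READ_WHOLE_SPLIT_FLAG` is an assumption.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Sketch, not Flink code: FileDesc/Split/isCompressed are hypothetical stand-ins
   // for FileStatus, FileInputSplit and testForUnsplittable.
   class PerFileSplits {
       // Assumed to mirror FileInputFormat.READ_WHOLE_SPLIT_FLAG.
       static final long READ_WHOLE_SPLIT_FLAG = -1L;

       record FileDesc(String path, long len) {}

       record Split(int num, String path, long start, long length) {}

       // Hypothetical check: treat a few compression extensions as unsplittable.
       static boolean isCompressed(FileDesc file) {
           return file.path().endsWith(".gz") || file.path().endsWith(".deflate");
       }

       static List<Split> createSplits(List<FileDesc> files, long maxSplitSize) {
           List<Split> splits = new ArrayList<>();
           int splitNum = 0;
           for (FileDesc file : files) {
               if (isCompressed(file)) {
                   // Decided per file: one split, read to the end of the stream.
                   splits.add(new Split(splitNum++, file.path(), 0, READ_WHOLE_SPLIT_FLAG));
                   continue;
               }
               // Other files are still cut into byte ranges
               // (an empty file produces no split in this sketch).
               for (long start = 0; start < file.len(); start += maxSplitSize) {
                   long length = Math.min(maxSplitSize, file.len() - start);
                   splits.add(new Split(splitNum++, file.path(), start, length));
               }
           }
           return splits;
       }

       public static void main(String[] args) {
           List<FileDesc> files = List.of(new FileDesc("a.txt", 250), new FileDesc("b.gz", 250));
           createSplits(files, 100).forEach(System.out::println);
       }
   }
   ```

   In this sketch, deciding per file means one compressed input does not turn every other file into a single split, and a compressed file is never handed to more than one reader.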



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org