You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/07/04 22:17:06 UTC

[GitHub] [hudi] yihua commented on a diff in pull request #6031: [HUDI-4282] Repair IOException in some other dfs, except hdfs,when check block corrupted in HoodieLogFileReader

yihua commented on code in PR #6031:
URL: https://github.com/apache/hudi/pull/6031#discussion_r913281573


##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java:
##########
@@ -285,6 +288,22 @@ private boolean isBlockCorrupted(int blocksize) throws IOException {
       // release-3.1.0-RC1/BufferedFSInputStream.java#L73
       inputStream.seek(currentPos);
       return true;
+    } catch (IOException e) {
+      if (logFile.getFileSize() < 0) {
+        long logFileSize = FSUtils.getFileSize(fs, logFile.getPath());
+        logFile.setFileLen(logFileSize);
+      }
+      if (endOfBlockPos > logFile.getFileSize() || endOfBlockPos < 0) {
+        LOG.info("Found corrupted block in file " + logFile + " with block size(" + blocksize + ") running past EOF");
+        // this is corrupt
+        // This seek is required because contract of seek() is different for naked DFSInputStream vs BufferedFSInputStream
+        // release-3.1.0-RC1/DFSInputStream.java#L1455
+        // release-3.1.0-RC1/BufferedFSInputStream.java#L73
+        inputStream.seek(currentPos);
+        return true;
+      } else {
+        throw e;
+      }

Review Comment:
   Instead of changing the core reader logic here, could you add the scheme-specific logic to a new implementation of `FSDataInputStream` like `SchemeAwareFSDataInputStream` and integrate that through `getFSDataInputStream()`?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java:
##########
@@ -285,6 +288,22 @@ private boolean isBlockCorrupted(int blocksize) throws IOException {
       // release-3.1.0-RC1/BufferedFSInputStream.java#L73
       inputStream.seek(currentPos);
       return true;
+    } catch (IOException e) {
+      if (logFile.getFileSize() < 0) {
+        long logFileSize = FSUtils.getFileSize(fs, logFile.getPath());
+        logFile.setFileLen(logFileSize);
+      }
+      if (endOfBlockPos > logFile.getFileSize() || endOfBlockPos < 0) {
+        LOG.info("Found corrupted block in file " + logFile + " with block size(" + blocksize + ") running past EOF");
+        // this is corrupt
+        // This seek is required because contract of seek() is different for naked DFSInputStream vs BufferedFSInputStream
+        // release-3.1.0-RC1/DFSInputStream.java#L1455
+        // release-3.1.0-RC1/BufferedFSInputStream.java#L73
+        inputStream.seek(currentPos);
+        return true;
+      } else {
+        throw e;
+      }

Review Comment:
   See here:
   ```
   private static FSDataInputStream getFSDataInputStream(FileSystem fs,
                                                           HoodieLogFile logFile,
                                                           int bufferSize) throws IOException {
       FSDataInputStream fsDataInputStream = fs.open(logFile.getPath(), bufferSize);
   
       if (FSUtils.isGCSFileSystem(fs)) {
         // in GCS FS, we might need to interceptor seek offsets as we might get EOF exception
         return new SchemeAwareFSDataInputStream(getFSDataInputStreamForGCS(fsDataInputStream, logFile, bufferSize), true);
       }
   
       if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
         return new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
             new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
       }
   
       // fsDataInputStream.getWrappedStream() maybe a BufferedFSInputStream
       // need to wrap in another BufferedFSInputStream the make bufferSize work?
       return fsDataInputStream;
     }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org