Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/07/04 22:17:06 UTC
[GitHub] [hudi] yihua commented on a diff in pull request #6031: [HUDI-4282] Repair IOException in some other dfs, except hdfs, when check block corrupted in HoodieLogFileReader
yihua commented on code in PR #6031:
URL: https://github.com/apache/hudi/pull/6031#discussion_r913281573
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java:
##########
@@ -285,6 +288,22 @@ private boolean isBlockCorrupted(int blocksize) throws IOException {
      // release-3.1.0-RC1/BufferedFSInputStream.java#L73
      inputStream.seek(currentPos);
      return true;
+    } catch (IOException e) {
+      if (logFile.getFileSize() < 0) {
+        long logFileSize = FSUtils.getFileSize(fs, logFile.getPath());
+        logFile.setFileLen(logFileSize);
+      }
+      if (endOfBlockPos > logFile.getFileSize() || endOfBlockPos < 0) {
+        LOG.info("Found corrupted block in file " + logFile + " with block size (" + blocksize + ") running past EOF");
+        // this is corrupt
+        // This seek is required because contract of seek() is different for naked DFSInputStream vs BufferedFSInputStream
+        // release-3.1.0-RC1/DFSInputStream.java#L1455
+        // release-3.1.0-RC1/BufferedFSInputStream.java#L73
+        inputStream.seek(currentPos);
+        return true;
+      } else {
+        throw e;
+      }
Review Comment:
Instead of changing the core reader logic here, could you add the scheme-specific logic to a new implementation of `FSDataInputStream` like `SchemeAwareFSDataInputStream` and integrate that through `getFSDataInputStream()`?
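The wrapper the reviewer suggests can be sketched roughly as below. This is a hypothetical, simplified illustration in plain Java with no Hadoop dependency: `SeekableInput` stands in for Hadoop's `Seekable` contract, and `SchemeAwareSeekableInput` stands in for the proposed `SchemeAwareFSDataInputStream` (the real class would extend `org.apache.hadoop.fs.FSDataInputStream`). The point is the design: translate a scheme-specific seek failure into the `EOFException` the reader already handles, so `HoodieLogFileReader` needs no per-scheme branches.

```java
import java.io.EOFException;
import java.io.IOException;

// Minimal stand-in for the seek contract of Hadoop's FSDataInputStream.
interface SeekableInput {
  void seek(long pos) throws IOException;
}

// Hypothetical sketch of the suggested wrapper: it normalizes
// scheme-specific seek failures to EOFException so the reader's existing
// corrupt-block detection works unchanged. Names and behavior here are
// illustrative, not Hudi's actual implementation.
class SchemeAwareSeekableInput implements SeekableInput {
  private final SeekableInput delegate;
  // true for schemes (e.g. gs://) whose over-seek errors differ from HDFS
  private final boolean translateFailuresToEof;

  SchemeAwareSeekableInput(SeekableInput delegate, boolean translateFailuresToEof) {
    this.delegate = delegate;
    this.translateFailuresToEof = translateFailuresToEof;
  }

  @Override
  public void seek(long pos) throws IOException {
    try {
      delegate.seek(pos);
    } catch (IOException e) {
      if (translateFailuresToEof) {
        // Surface the same exception type HDFS-backed streams produce,
        // keeping the caller free of scheme-specific handling.
        throw new EOFException("Seek past end of file at position " + pos);
      }
      throw e;
    }
  }
}
```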
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java:
##########
@@ -285,6 +288,22 @@ private boolean isBlockCorrupted(int blocksize) throws IOException {
      // release-3.1.0-RC1/BufferedFSInputStream.java#L73
      inputStream.seek(currentPos);
      return true;
+    } catch (IOException e) {
+      if (logFile.getFileSize() < 0) {
+        long logFileSize = FSUtils.getFileSize(fs, logFile.getPath());
+        logFile.setFileLen(logFileSize);
+      }
+      if (endOfBlockPos > logFile.getFileSize() || endOfBlockPos < 0) {
+        LOG.info("Found corrupted block in file " + logFile + " with block size (" + blocksize + ") running past EOF");
+        // this is corrupt
+        // This seek is required because contract of seek() is different for naked DFSInputStream vs BufferedFSInputStream
+        // release-3.1.0-RC1/DFSInputStream.java#L1455
+        // release-3.1.0-RC1/BufferedFSInputStream.java#L73
+        inputStream.seek(currentPos);
+        return true;
+      } else {
+        throw e;
+      }
Review Comment:
See here:
```
  private static FSDataInputStream getFSDataInputStream(FileSystem fs,
                                                        HoodieLogFile logFile,
                                                        int bufferSize) throws IOException {
    FSDataInputStream fsDataInputStream = fs.open(logFile.getPath(), bufferSize);

    if (FSUtils.isGCSFileSystem(fs)) {
      // in GCS FS, we might need to intercept seek offsets as we might get an EOF exception
      return new SchemeAwareFSDataInputStream(getFSDataInputStreamForGCS(fsDataInputStream, logFile, bufferSize), true);
    }

    if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
      return new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
          new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
    }

    // fsDataInputStream.getWrappedStream() may be a BufferedFSInputStream;
    // do we need to wrap it in another BufferedFSInputStream to make bufferSize work?
    return fsDataInputStream;
  }
```
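The dispatch in that factory can be reduced to a small illustrative sketch: pick the stream wrapper from the path's URI scheme. This is not Hudi code; the wrapper names mirror the snippet above, and the `hdfs` branch here only approximates the real `instanceof FSInputStream` check for illustration.

```java
import java.net.URI;

// Illustrative sketch (not Hudi's implementation): resolve which stream
// wrapper the factory would choose, based only on the path's URI scheme.
class StreamWrapperChooser {
  static String wrapperFor(URI path) {
    String scheme = path.getScheme() == null ? "file" : path.getScheme();
    if ("gs".equals(scheme)) {
      // GCS: wrap so over-seek EOF errors are intercepted
      return "SchemeAwareFSDataInputStream";
    }
    if ("hdfs".equals(scheme)) {
      // FSInputStream-backed schemes get buffered, timed reads
      return "TimedFSDataInputStream";
    }
    // Anything else: hand back the opened stream as-is
    return "FSDataInputStream";
  }
}
```

Centralizing the choice in one factory keeps scheme quirks out of the reader's block-corruption logic, which is the core of the review suggestion.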
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org