You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2021/08/05 06:16:47 UTC
[incubator-doris] branch master updated: [Broker] Fix bug that
broker can not handle FSDataInputStream which does not implement
ByteBufferReadable (#6308)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 12730e7 [Broker] Fix bug that broker can not handle FSDataInputStream which does not implement ByteBufferReadable (#6308)
12730e7 is described below
commit 12730e7a3b74799f1ca548e8f65b3e80d52911bd
Author: Zhanfeng Huo <hu...@gmail.com>
AuthorDate: Thu Aug 5 14:16:36 2021 +0800
[Broker] Fix bug that broker can not handle FSDataInputStream which does not implement ByteBufferReadable (#6308)
Currently, Doris supports loading OSS/S3A files by using parameters such as fs.s3a.access.key, but a bug occurs when loading files of this type. The root cause is that the broker cannot handle an FSDataInputStream that does not implement ByteBufferReadable.
See Issue #6307
Related Hadoop issue: "S3A input stream to support ByteBufferReadable"
https://issues.apache.org/jira/browse/HADOOP-14603
---
.../doris/broker/hdfs/FileSystemManager.java | 39 +++++++++-------------
1 file changed, 16 insertions(+), 23 deletions(-)
diff --git a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
index de04a07..576f3da 100644
--- a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
+++ b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
@@ -561,22 +561,25 @@ public class FileSystemManager {
currentStreamOffset, offset);
}
}
- ByteBuffer buf;
+ // Avoid using the ByteBuffer based read for Hadoop because some FSDataInputStream
+ // implementations are not ByteBufferReadable,
+ // See https://issues.apache.org/jira/browse/HADOOP-14603
+ byte[] buf;
if (length > readBufferSize) {
- buf = ByteBuffer.allocate(readBufferSize);
+ buf = new byte[readBufferSize];
} else {
- buf = ByteBuffer.allocate((int) length);
+ buf = new byte[(int) length];
}
try {
- int readLength = readByteBufferFully(fsDataInputStream, buf);
+ int readLength = readBytesFully(fsDataInputStream, buf);
if (readLength < 0) {
throw new BrokerException(TBrokerOperationStatusCode.END_OF_FILE,
"end of file reached");
}
if (logger.isDebugEnabled()) {
- logger.debug("read buffer from input stream, buffer size:" + buf.capacity() + ", read length:" + readLength);
+ logger.debug("read buffer from input stream, buffer size:" + buf.length + ", read length:" + readLength);
}
- return buf;
+ return ByteBuffer.wrap(buf, 0, readLength);
} catch (IOException e) {
logger.error("errors while read data from stream", e);
throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR,
@@ -674,27 +677,17 @@ public class FileSystemManager {
return new TBrokerFD(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
}
- private int readByteBuffer(FSDataInputStream is, ByteBuffer dest) throws IOException {
- int pos = dest.position();
- int result = is.read(dest);
- if (result > 0) {
- // Ensure this explicitly since versions before 2.7 read doesn't do it.
- dest.position(pos + result);
- }
- return result;
- }
-
- private int readByteBufferFully(FSDataInputStream is, ByteBuffer dest) throws IOException {
- int result = 0;
- while (dest.remaining() > 0) {
- int n = readByteBuffer(is, dest);
+ private int readBytesFully(FSDataInputStream is, byte[] dest) throws IOException {
+ int readLength = 0;
+ while (readLength < dest.length) {
+ int availableReadLength = dest.length - readLength;
+ int n = is.read(dest, readLength, availableReadLength);
if (n <= 0) {
break;
}
- result += n;
+ readLength += n;
}
- dest.flip();
- return result;
+ return readLength;
}
class FileSystemExpirationChecker implements Runnable {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org