You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2021/08/05 06:16:47 UTC

[incubator-doris] branch master updated: [Broker] Fix bug that broker can not handle FSDataInputStream which does not implement ByteBufferReadable (#6308)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 12730e7  [Broker] Fix bug that broker can not handle FSDataInputStream which does not implement ByteBufferReadable (#6308)
12730e7 is described below

commit 12730e7a3b74799f1ca548e8f65b3e80d52911bd
Author: Zhanfeng Huo <hu...@gmail.com>
AuthorDate: Thu Aug 5 14:16:36 2021 +0800

    [Broker] Fix bug that broker can not handle FSDataInputStream which does not implement ByteBufferReadable (#6308)
    
    Currently, Doris supports loading OSS/S3A files by using parameters such as fs.s3a.access.key, but there is a bug when loading files of this type. The root cause is that the broker cannot handle an FSDataInputStream that does not implement ByteBufferReadable.
    
    See Issue #6307
    Related Hadoop issue: "S3A input stream to support ByteBufferReadable"
    https://issues.apache.org/jira/browse/HADOOP-14603
---
 .../doris/broker/hdfs/FileSystemManager.java       | 39 +++++++++-------------
 1 file changed, 16 insertions(+), 23 deletions(-)

diff --git a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
index de04a07..576f3da 100644
--- a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
+++ b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
@@ -561,22 +561,25 @@ public class FileSystemManager {
                             currentStreamOffset, offset);
                 }
             }
-            ByteBuffer buf;
+            // Avoid using the ByteBuffer based read for Hadoop because some FSDataInputStream
+            // implementations are not ByteBufferReadable,
+            // See https://issues.apache.org/jira/browse/HADOOP-14603
+            byte[] buf;
             if (length > readBufferSize) {
-                buf = ByteBuffer.allocate(readBufferSize);
+                buf = new byte[readBufferSize];
             } else {
-                buf = ByteBuffer.allocate((int) length);
+                buf = new byte[(int) length];
             }
             try {
-                int readLength = readByteBufferFully(fsDataInputStream, buf);
+                int readLength = readBytesFully(fsDataInputStream, buf);
                 if (readLength < 0) {
                     throw new BrokerException(TBrokerOperationStatusCode.END_OF_FILE,
                             "end of file reached");
                 }
                 if (logger.isDebugEnabled()) {
-                    logger.debug("read buffer from input stream, buffer size:" + buf.capacity() + ", read length:" + readLength);
+                    logger.debug("read buffer from input stream, buffer size:" + buf.length + ", read length:" + readLength);
                 }
-                return buf;
+                return ByteBuffer.wrap(buf, 0, readLength);
             } catch (IOException e) {
                 logger.error("errors while read data from stream", e);
                 throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR,
@@ -674,27 +677,17 @@ public class FileSystemManager {
         return new TBrokerFD(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
     }
 
-    private int readByteBuffer(FSDataInputStream is, ByteBuffer dest) throws IOException {
-        int pos = dest.position();
-        int result = is.read(dest);
-        if (result > 0) {
-            // Ensure this explicitly since versions before 2.7 read doesn't do it.
-            dest.position(pos + result);
-        }
-        return result;
-    }
-
-    private int readByteBufferFully(FSDataInputStream is, ByteBuffer dest) throws IOException {
-        int result = 0;
-        while (dest.remaining() > 0) {
-            int n = readByteBuffer(is, dest);
+    private int readBytesFully(FSDataInputStream is, byte[] dest) throws IOException {
+        int readLength = 0;
+        while (readLength < dest.length) {
+            int availableReadLength = dest.length - readLength;
+            int n = is.read(dest, readLength, availableReadLength);
             if (n <= 0) {
                 break;
             }
-            result += n;
+            readLength += n;
         }
-        dest.flip();
-        return result;
+        return readLength;
     }
     
     class FileSystemExpirationChecker implements Runnable {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org