Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2020/06/16 02:19:29 UTC

[GitHub] [incubator-doris] xy720 opened a new pull request #3878: [optimize] Optimize spark load/broker load reading parquet format file

xy720 opened a new pull request #3878:
URL: https://github.com/apache/incubator-doris/pull/3878


   Please see the description in #3877.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] yiguolei commented on pull request #3878: [optimize] Optimize spark load/broker load reading parquet format file

Posted by GitBox <gi...@apache.org>.
yiguolei commented on pull request #3878:
URL: https://github.com/apache/incubator-doris/pull/3878#issuecomment-644500884


   Great job




[GitHub] [incubator-doris] xy720 commented on a change in pull request #3878: [optimize] Optimize spark load/broker load reading parquet format file

Posted by GitBox <gi...@apache.org>.
xy720 commented on a change in pull request #3878:
URL: https://github.com/apache/incubator-doris/pull/3878#discussion_r443290119



##########
File path: be/src/exec/buffered_reader.cpp
##########
@@ -0,0 +1,162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/buffered_reader.h"
+
+#include <sstream>
+#include <algorithm>
+
+#include "common/logging.h"
+
+namespace doris {
+
+// buffered reader
+
+BufferedReader::BufferedReader(FileReader* reader)
+        : _reader(reader),
+          _buffer_size(1024 * 1024),
+          _buffer_offset(0),
+          _buffer_limit(0),
+          _cur_offset(0) {
+    _buffer = new char[_buffer_size];
+}
+
+BufferedReader::BufferedReader(FileReader* reader, int64_t buffer_size)
+        : _reader(reader),
+          _buffer_size(buffer_size),
+          _buffer_offset(0),
+          _buffer_limit(0),
+          _cur_offset(0) {
+    _buffer = new char[_buffer_size];
+}
+
+BufferedReader::~BufferedReader() {
+    close();
+}
+
+Status BufferedReader::open() {
+    if (!_reader) {
+        std::stringstream ss;
+        ss << "Open buffered reader failed, reader is null";
+        return Status::InternalError(ss.str());
+    }
+    RETURN_IF_ERROR(_reader->open());
+    RETURN_IF_ERROR(_fill());
+    return Status::OK();
+}
+
+//not support
+Status BufferedReader::read_one_message(uint8_t** buf, size_t* length) {
+    return Status::NotSupported("Not support");
+}
+
+Status BufferedReader::read(uint8_t* buf, size_t* buf_len, bool* eof) {
+    DCHECK_NE(*buf_len, 0);
+    int64_t bytes_read;
+    RETURN_IF_ERROR(readat(_cur_offset, (int64_t)*buf_len, &bytes_read, buf));
+    if (bytes_read == 0) {
+        *eof = true;
+    } else {
+        *eof = false;
+    }
+    return Status::OK();
+}
+
+Status BufferedReader::readat(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) {
+    if (nbytes <= 0) {
+        *bytes_read = 0;
+        return Status::OK();
+    }
+    RETURN_IF_ERROR(_read_once(position, nbytes, bytes_read, out));
+    //EOF
+    if (*bytes_read <= 0) {
+        return Status::OK();
+    }
+    while (*bytes_read < nbytes) {
+        int64_t len;
+        RETURN_IF_ERROR(_read_once(position + *bytes_read, nbytes - *bytes_read, &len, reinterpret_cast<char*>(out) + *bytes_read));
+        // EOF
+        if (len <= 0) {
+            break;
+        }
+        *bytes_read += len;
+    }
+    return Status::OK();
+}
+
+Status BufferedReader::_read_once(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) {
+    // requested bytes missed the local buffer
+    if (position >= _buffer_limit || position < _buffer_offset) {
+        // if requested length is larger than the capacity of buffer, do not
+        // need to copy the character into local buffer.
+        if (nbytes > _buffer_limit - _buffer_offset) {

Review comment:
       You are right.






[GitHub] [incubator-doris] morningman merged pull request #3878: [optimize] Optimize spark load/broker load reading parquet format file

Posted by GitBox <gi...@apache.org>.
morningman merged pull request #3878:
URL: https://github.com/apache/incubator-doris/pull/3878


   




[GitHub] [incubator-doris] xy720 commented on a change in pull request #3878: [optimize] Optimize spark load/broker load reading parquet format file

Posted by GitBox <gi...@apache.org>.
xy720 commented on a change in pull request #3878:
URL: https://github.com/apache/incubator-doris/pull/3878#discussion_r442327705



##########
File path: be/src/exec/buffered_reader.h
##########
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stdint.h>
+
+#include "common/status.h"
+#include "olap/olap_define.h"
+#include "exec/file_reader.h"
+
+namespace doris {
+
+// Buffered Reader of broker
+class BufferedReader : public FileReader {
+public:
+    // If the reader need the file size, set it when construct BrokerReader.
+    // There is no other way to set the file size.
+    BufferedReader(FileReader* reader);
+    BufferedReader(FileReader* reader, int64_t buffer_size);
+    virtual ~BufferedReader();
+
+    virtual Status open() override;
+
+    // Read 
+    virtual Status read(uint8_t* buf, size_t* buf_len, bool* eof) override;
+    virtual Status readat(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) override;
+    virtual Status read_one_message(uint8_t** buf, size_t* length) override;
+    virtual int64_t size() override;
+    virtual Status seek(int64_t position) override;
+    virtual Status tell(int64_t* position) override;
+    virtual void close() override;
+    virtual bool closed() override;
+
+private:
+    Status fill();
+    Status read_once(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out);

Review comment:
       done

##########
File path: be/src/exec/buffered_reader.h
##########
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stdint.h>
+
+#include "common/status.h"
+#include "olap/olap_define.h"
+#include "exec/file_reader.h"
+
+namespace doris {
+
+// Buffered Reader of broker
+class BufferedReader : public FileReader {
+public:
+    // If the reader need the file size, set it when construct BrokerReader.
+    // There is no other way to set the file size.
+    BufferedReader(FileReader* reader);
+    BufferedReader(FileReader* reader, int64_t buffer_size);
+    virtual ~BufferedReader();
+
+    virtual Status open() override;
+
+    // Read 
+    virtual Status read(uint8_t* buf, size_t* buf_len, bool* eof) override;
+    virtual Status readat(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) override;
+    virtual Status read_one_message(uint8_t** buf, size_t* length) override;
+    virtual int64_t size() override;
+    virtual Status seek(int64_t position) override;
+    virtual Status tell(int64_t* position) override;
+    virtual void close() override;
+    virtual bool closed() override;
+
+private:
+    Status fill();

Review comment:
       done

##########
File path: be/src/exec/buffered_reader.h
##########
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stdint.h>
+
+#include "common/status.h"
+#include "olap/olap_define.h"
+#include "exec/file_reader.h"
+
+namespace doris {
+
+// Buffered Reader of broker
+class BufferedReader : public FileReader {
+public:
+    // If the reader need the file size, set it when construct BrokerReader.

Review comment:
       done






[GitHub] [incubator-doris] xy720 commented on a change in pull request #3878: [optimize] Optimize spark load/broker load reading parquet format file

Posted by GitBox <gi...@apache.org>.
xy720 commented on a change in pull request #3878:
URL: https://github.com/apache/incubator-doris/pull/3878#discussion_r442327238



##########
File path: be/src/exec/buffered_reader.cpp
##########
@@ -0,0 +1,162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/buffered_reader.h"
+
+#include <sstream>
+#include <algorithm>
+
+#include "common/logging.h"
+
+namespace doris {
+
+// buffered reader
+
+BufferedReader::BufferedReader(FileReader* reader)
+        : _reader(reader),
+          _buffer_size(1024 * 1024),
+          _buffer_offset(0),
+          _buffer_limit(0),
+          _cur_offset(0) {
+    _buffer = new char[_buffer_size];
+}
+
+BufferedReader::BufferedReader(FileReader* reader, int64_t buffer_size)
+        : _reader(reader),
+          _buffer_size(buffer_size),
+          _buffer_offset(0),
+          _buffer_limit(0),
+          _cur_offset(0) {
+    _buffer = new char[_buffer_size];
+}
+
+BufferedReader::~BufferedReader() {
+    close();
+}
+
+Status BufferedReader::open() {
+    if (!_reader) {
+        std::stringstream ss;
+        ss << "Open buffered reader failed, reader is null";
+        LOG(WARNING) << ss.str();
+        return Status::InternalError(ss.str());
+    }
+    RETURN_IF_ERROR(_reader->open());
+    RETURN_IF_ERROR(fill());

Review comment:
       Yes. If fill() is not called in open(), the actual length of the buffer will be 0 when the user calls read().
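
The point made in this reply can be illustrated with a minimal sketch. It is not the Doris implementation: `PrefillSketch`, its in-memory `std::string` source, and the `prefill` flag are hypothetical names invented for this example. It only shows that the number of buffered bytes stays at 0 until the first fill happens.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, simplified stand-in for a buffered reader; not the Doris class.
class PrefillSketch {
public:
    // buffer_size must be positive.
    PrefillSketch(std::string source, int64_t buffer_size)
            : _source(std::move(source)), _buffer(static_cast<size_t>(buffer_size)) {
        assert(buffer_size > 0);
    }

    // Mirrors the idea discussed above: optionally load the first chunk on open.
    void open(bool prefill) {
        if (prefill) {
            fill();
        }
    }

    // Number of valid bytes currently held in the buffer.
    int64_t buffered_length() const { return _buffer_limit - _buffer_offset; }

private:
    // Copies up to one buffer's worth of bytes from the source into the buffer.
    void fill() {
        const int64_t available = static_cast<int64_t>(_source.size()) - _buffer_offset;
        const int64_t n = std::min(static_cast<int64_t>(_buffer.size()), available);
        std::memcpy(_buffer.data(), _source.data() + _buffer_offset, static_cast<size_t>(n));
        _buffer_limit = _buffer_offset + n;
    }

    std::string _source;
    std::vector<char> _buffer;
    int64_t _buffer_offset = 0;
    int64_t _buffer_limit = 0;
};
```

With a 4-byte buffer over a 10-byte source, `buffered_length()` is 0 after `open(false)` and 4 after `open(true)`.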






[GitHub] [incubator-doris] wutiangan commented on a change in pull request #3878: [optimize] Optimize spark load/broker load reading parquet format file

Posted by GitBox <gi...@apache.org>.
wutiangan commented on a change in pull request #3878:
URL: https://github.com/apache/incubator-doris/pull/3878#discussion_r443313805



##########
File path: be/src/exec/broker_reader.cpp
##########
@@ -155,6 +155,8 @@ Status BrokerReader::readat(int64_t position, int64_t nbytes, int64_t* bytes_rea
             return status;
         }
 
+        LOG(DEBUG) << "send readat request to broker:" << broker_addr << " position:" << position << ", read bytes length:" << nbytes;

Review comment:
       What is "readat"?






[GitHub] [incubator-doris] wutiangan commented on a change in pull request #3878: [optimize] Optimize spark load/broker load reading parquet format file

Posted by GitBox <gi...@apache.org>.
wutiangan commented on a change in pull request #3878:
URL: https://github.com/apache/incubator-doris/pull/3878#discussion_r443315827



##########
File path: be/src/exec/buffered_reader.cpp
##########
@@ -0,0 +1,162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/buffered_reader.h"
+
+#include <sstream>
+#include <algorithm>
+
+#include "common/logging.h"
+
+namespace doris {
+
+// buffered reader
+
+BufferedReader::BufferedReader(FileReader* reader)
+        : _reader(reader),
+          _buffer_size(1024 * 1024),
+          _buffer_offset(0),
+          _buffer_limit(0),
+          _cur_offset(0) {
+    _buffer = new char[_buffer_size];
+}

Review comment:
       Maybe this constructor can be removed, since it is duplicated code.
   You can add a default parameter to the constructor below.
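
A minimal sketch of that suggestion follows. `BufferedReaderSketch` is a hypothetical class written for illustration, `FileReader` is only forward-declared, and `std::unique_ptr` is swapped in for the raw `new char[]` of the diff; none of this is taken from the merged code.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>

// Forward declaration only; the sketch never dereferences the pointer.
class FileReader;

// Hypothetical class showing one constructor with a default buffer size.
class BufferedReaderSketch {
public:
    // Callers may omit buffer_size and get 1 MB, so a second constructor is unnecessary.
    explicit BufferedReaderSketch(FileReader* reader, int64_t buffer_size = 1024 * 1024)
            : _reader(reader),
              _buffer_size(buffer_size),
              _buffer(new char[static_cast<size_t>(buffer_size)]) {
        assert(buffer_size > 0);
    }

    FileReader* reader() const { return _reader; }
    int64_t buffer_size() const { return _buffer_size; }

private:
    FileReader* _reader;
    int64_t _buffer_size;
    std::unique_ptr<char[]> _buffer;  // released automatically, no manual delete[]
};
```

Both `BufferedReaderSketch r(ptr);` and `BufferedReaderSketch r(ptr, 128 * 1024);` then go through the same initializer list.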






[GitHub] [incubator-doris] xy720 commented on a change in pull request #3878: [optimize] Optimize spark load/broker load reading parquet format file

Posted by GitBox <gi...@apache.org>.
xy720 commented on a change in pull request #3878:
URL: https://github.com/apache/incubator-doris/pull/3878#discussion_r442599424



##########
File path: be/src/exec/buffered_reader.h
##########
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stdint.h>
+
+#include "common/status.h"
+#include "olap/olap_define.h"
+#include "exec/file_reader.h"
+
+namespace doris {
+
+// Buffered Reader of broker

Review comment:
       ok






[GitHub] [incubator-doris] morningman commented on a change in pull request #3878: [optimize] Optimize spark load/broker load reading parquet format file

Posted by GitBox <gi...@apache.org>.
morningman commented on a change in pull request #3878:
URL: https://github.com/apache/incubator-doris/pull/3878#discussion_r443125887



##########
File path: be/test/exec/buffered_reader_test.cpp
##########
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include "exec/local_file_reader.h"
+#include "exec/buffered_reader.h"
+#include "util/stopwatch.hpp"
+
+namespace doris {
+class BufferedReaderTest : public testing::Test {
+public:
+    BufferedReaderTest() {}
+
+protected:
+    virtual void SetUp() {
+    }
+    virtual void TearDown() {
+    }
+};
+
+TEST_F(BufferedReaderTest, normal_use) {
+    LocalFileReader file_reader(
+            "./be/test/exec/test_data/buffered_reader/buffered_reader_test_file", 0);
+    BufferedReader reader(&file_reader);
+    auto st = reader.open();
+    ASSERT_TRUE(st.ok());
+    uint8_t buf[32 * 1024];
+    MonotonicStopWatch watch;
+    watch.start();
+    bool eof = false;
+    size_t read_length = 0;
+    while (!eof) {
+        size_t buf_len = 32 * 1024;
+        st = reader.read(buf, &buf_len, &eof);
+        ASSERT_TRUE(st.ok());
+        read_length += buf_len;
+    }

Review comment:
       You need to check the result; otherwise the unit test is meaningless.

##########
File path: be/src/exec/buffered_reader.cpp
##########
@@ -0,0 +1,162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/buffered_reader.h"
+
+#include <sstream>
+#include <algorithm>
+
+#include "common/logging.h"
+
+namespace doris {
+
+// buffered reader
+
+BufferedReader::BufferedReader(FileReader* reader)
+        : _reader(reader),
+          _buffer_size(1024 * 1024),
+          _buffer_offset(0),
+          _buffer_limit(0),
+          _cur_offset(0) {
+    _buffer = new char[_buffer_size];
+}
+
+BufferedReader::BufferedReader(FileReader* reader, int64_t buffer_size)
+        : _reader(reader),
+          _buffer_size(buffer_size),
+          _buffer_offset(0),
+          _buffer_limit(0),
+          _cur_offset(0) {
+    _buffer = new char[_buffer_size];
+}
+
+BufferedReader::~BufferedReader() {
+    close();
+}
+
+Status BufferedReader::open() {
+    if (!_reader) {
+        std::stringstream ss;
+        ss << "Open buffered reader failed, reader is null";
+        return Status::InternalError(ss.str());
+    }
+    RETURN_IF_ERROR(_reader->open());
+    RETURN_IF_ERROR(_fill());
+    return Status::OK();
+}
+
+//not support
+Status BufferedReader::read_one_message(uint8_t** buf, size_t* length) {
+    return Status::NotSupported("Not support");
+}
+
+Status BufferedReader::read(uint8_t* buf, size_t* buf_len, bool* eof) {
+    DCHECK_NE(*buf_len, 0);
+    int64_t bytes_read;
+    RETURN_IF_ERROR(readat(_cur_offset, (int64_t)*buf_len, &bytes_read, buf));
+    if (bytes_read == 0) {
+        *eof = true;
+    } else {
+        *eof = false;
+    }
+    return Status::OK();
+}
+
+Status BufferedReader::readat(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) {
+    if (nbytes <= 0) {
+        *bytes_read = 0;
+        return Status::OK();
+    }
+    RETURN_IF_ERROR(_read_once(position, nbytes, bytes_read, out));
+    //EOF
+    if (*bytes_read <= 0) {
+        return Status::OK();
+    }
+    while (*bytes_read < nbytes) {
+        int64_t len;
+        RETURN_IF_ERROR(_read_once(position + *bytes_read, nbytes - *bytes_read, &len, reinterpret_cast<char*>(out) + *bytes_read));
+        // EOF
+        if (len <= 0) {
+            break;
+        }
+        *bytes_read += len;
+    }
+    return Status::OK();
+}
+
+Status BufferedReader::_read_once(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) {
+    // requested bytes missed the local buffer
+    if (position >= _buffer_limit || position < _buffer_offset) {
+        // if requested length is larger than the capacity of buffer, do not
+        // need to copy the character into local buffer.
+        if (nbytes > _buffer_limit - _buffer_offset) {

Review comment:
       Should it be `nbytes > _buffer_size`?
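
One way to read this question: `_buffer_limit - _buffer_offset` is the number of bytes currently buffered, which can be smaller than the capacity (for example before the first fill or near end of file), while `_buffer_size` is the capacity itself. The sketch below compares against the capacity. It is an assumption-laden illustration, not the Doris code: `BypassSketch`, the in-memory source, and the `fills` / `direct_reads` counters are all hypothetical.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string>
#include <utility>
#include <vector>

// Hypothetical in-memory reader illustrating the bypass decision.
class BypassSketch {
public:
    BypassSketch(std::string source, int64_t buffer_size)
            : _source(std::move(source)), _buffer(static_cast<size_t>(buffer_size)) {
        assert(buffer_size > 0);
    }

    // Reads up to nbytes at position; returns the number of bytes copied to out.
    int64_t read_once(int64_t position, int64_t nbytes, char* out) {
        assert(position >= 0 && nbytes > 0);
        const int64_t capacity = static_cast<int64_t>(_buffer.size());
        const bool miss = position >= _buffer_limit || position < _buffer_offset;
        if (miss) {
            if (nbytes > capacity) {
                // The request can never fit in the buffer: read directly, skip the copy.
                ++direct_reads;
                return read_source(position, nbytes, out);
            }
            // Refill the buffer starting at the requested position.
            _buffer_offset = position;
            _buffer_limit = position + read_source(position, capacity, _buffer.data());
            ++fills;
            if (position >= _buffer_limit) {
                return 0;  // end of source
            }
        }
        const int64_t len = std::min(nbytes, _buffer_limit - position);
        std::memcpy(out, _buffer.data() + (position - _buffer_offset), static_cast<size_t>(len));
        return len;
    }

    int direct_reads = 0;  // requests served straight from the source
    int fills = 0;         // times the buffer was refilled

private:
    int64_t read_source(int64_t position, int64_t nbytes, char* out) const {
        const int64_t size = static_cast<int64_t>(_source.size());
        if (position >= size) {
            return 0;
        }
        const int64_t len = std::min(nbytes, size - position);
        std::memcpy(out, _source.data() + position, static_cast<size_t>(len));
        return len;
    }

    std::string _source;
    std::vector<char> _buffer;
    int64_t _buffer_offset = 0;
    int64_t _buffer_limit = 0;
};
```

With an 8-byte buffer, two 4-byte reads at positions 0 and 4 cost a single fill, while a 16-byte read at position 8 is served directly from the source.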

##########
File path: be/test/exec/buffered_reader_test.cpp
##########
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include "exec/local_file_reader.h"
+#include "exec/buffered_reader.h"
+#include "util/stopwatch.hpp"
+
+namespace doris {
+class BufferedReaderTest : public testing::Test {
+public:
+    BufferedReaderTest() {}
+
+protected:
+    virtual void SetUp() {
+    }
+    virtual void TearDown() {
+    }
+};
+
+TEST_F(BufferedReaderTest, normal_use) {
+    LocalFileReader file_reader(
+            "./be/test/exec/test_data/buffered_reader/buffered_reader_test_file", 0);
+    BufferedReader reader(&file_reader);
+    auto st = reader.open();
+    ASSERT_TRUE(st.ok());
+    uint8_t buf[32 * 1024];
+    MonotonicStopWatch watch;
+    watch.start();
+    bool eof = false;
+    size_t read_length = 0;
+    while (!eof) {
+        size_t buf_len = 32 * 1024;
+        st = reader.read(buf, &buf_len, &eof);
+        ASSERT_TRUE(st.ok());
+        read_length += buf_len;
+    }
+
+    LOG(INFO) << "read bytes " << read_length << " using time " << watch.elapsed_time();
+}
+
+TEST_F(BufferedReaderTest, test_validity) {
+    LocalFileReader file_reader(
+            "./be/test/exec/test_data/buffered_reader/buffered_reader_test_file.txt", 0);
+    BufferedReader reader(&file_reader, 128 * 1024);
+    auto st = reader.open();
+    ASSERT_TRUE(st.ok());
+    uint8_t buf[10];
+    bool eof = false;
+    size_t buf_len = 10;
+
+    st = reader.read(buf, &buf_len, &eof);
+    ASSERT_TRUE(st.ok());
+    ASSERT_STREQ("bdfhjlnprt", std::string((char*)buf, buf_len).c_str());

Review comment:
       check the `buf_len` after `read()`
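
A sketch of what checking both the length and the bytes could look like, written with plain C++ instead of gtest so that it stands alone. `StringReaderSketch` and `read_matches` are hypothetical helpers, and the sketch assumes `read()` reports the number of bytes actually read through `*buf_len`; the trailing "vx" in the usage note is made-up sample data, not the contents of the real test file.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string>
#include <utility>

// Hypothetical in-memory reader with a read() shaped like the one under test.
class StringReaderSketch {
public:
    explicit StringReaderSketch(std::string data) : _data(std::move(data)) {}

    // On return, *buf_len holds the number of bytes actually copied into buf.
    void read(uint8_t* buf, size_t* buf_len, bool* eof) {
        const size_t remaining = _data.size() - _pos;
        const size_t n = std::min(*buf_len, remaining);
        std::memcpy(buf, _data.data() + _pos, n);
        _pos += n;
        *buf_len = n;
        *eof = (n == 0);
    }

private:
    std::string _data;
    size_t _pos = 0;
};

// Returns true only if both the reported length and the bytes match the expectation.
bool read_matches(StringReaderSketch& reader, size_t want_len, const std::string& expected) {
    uint8_t buf[16];
    assert(want_len <= sizeof(buf));
    size_t buf_len = want_len;
    bool eof = false;
    reader.read(buf, &buf_len, &eof);
    return buf_len == expected.size() &&
           std::string(reinterpret_cast<char*>(buf), buf_len) == expected;
}
```

For a reader over "bdfhjlnprtvx", three 10-byte requests should match "bdfhjlnprt", then "vx" (a short read of length 2), then the empty string at end of data.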






[GitHub] [incubator-doris] kangkaisen commented on pull request #3878: [optimize] Optimize spark load/broker load reading parquet format file

Posted by GitBox <gi...@apache.org>.
kangkaisen commented on pull request #3878:
URL: https://github.com/apache/incubator-doris/pull/3878#issuecomment-644515644


   @xy720 Hi, please add a unit test (UT) for the new BufferedReader class. Thanks.




[GitHub] [incubator-doris] morningman commented on a change in pull request #3878: [optimize] Optimize spark load/broker load reading parquet format file

Posted by GitBox <gi...@apache.org>.
morningman commented on a change in pull request #3878:
URL: https://github.com/apache/incubator-doris/pull/3878#discussion_r443314176



##########
File path: be/src/exec/broker_reader.cpp
##########
@@ -155,6 +155,8 @@ Status BrokerReader::readat(int64_t position, int64_t nbytes, int64_t* bytes_rea
             return status;
         }
 
+        LOG(DEBUG) << "send readat request to broker:" << broker_addr << " position:" << position << ", read bytes length:" << nbytes;

Review comment:
       > what is "readat ?"
   
   Like `pread`
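
For readers unfamiliar with `pread`: it reads a number of bytes at an explicit offset and leaves the file descriptor's current offset unchanged. The sketch below demonstrates that with the POSIX call itself; `pread_demo` is a hypothetical helper, and the temporary file stands in for whatever a broker would actually serve.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdio>
#include <stdio.h>
#include <string>
#include <sys/types.h>
#include <unistd.h>

// Writes content to an anonymous temporary file, then reads up to nbytes at
// position with pread(). Returns the bytes read, or an empty string on failure.
std::string pread_demo(const std::string& content, off_t position, size_t nbytes) {
    std::FILE* fp = std::tmpfile();
    if (fp == nullptr) {
        return std::string();
    }
    std::fwrite(content.data(), 1, content.size(), fp);
    std::fflush(fp);

    const int fd = fileno(fp);
    const off_t before = lseek(fd, 0, SEEK_CUR);

    std::string out(nbytes, '\0');
    const ssize_t n = pread(fd, &out[0], nbytes, position);
    out.resize(n > 0 ? static_cast<size_t>(n) : 0);

    // The positional read did not move the descriptor's own offset.
    const off_t after = lseek(fd, 0, SEEK_CUR);
    assert(after == before);
    (void)before;
    (void)after;

    std::fclose(fp);
    return out;
}
```

Reading 4 bytes at offset 3 of "0123456789" yields "3456"; a read that starts past the end of the file yields nothing, which is how a positional reader can signal end of file.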






[GitHub] [incubator-doris] xy720 commented on a change in pull request #3878: [optimize] Optimize spark load/broker load reading parquet format file

Posted by GitBox <gi...@apache.org>.
xy720 commented on a change in pull request #3878:
URL: https://github.com/apache/incubator-doris/pull/3878#discussion_r442327599



##########
File path: be/src/exec/buffered_reader.cpp
##########
@@ -0,0 +1,162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/buffered_reader.h"
+
+#include <sstream>
+#include <algorithm>
+
+#include "common/logging.h"
+
+namespace doris {
+
+// buffered reader
+
+BufferedReader::BufferedReader(FileReader* reader)
+        : _reader(reader),
+          _buffer_size(1024 * 1024),
+          _buffer_offset(0),
+          _buffer_limit(0),
+          _cur_offset(0) {
+    _buffer = new char[_buffer_size];
+}
+
+BufferedReader::BufferedReader(FileReader* reader, int64_t buffer_size)
+        : _reader(reader),
+          _buffer_size(buffer_size),
+          _buffer_offset(0),
+          _buffer_limit(0),
+          _cur_offset(0) {
+    _buffer = new char[_buffer_size];
+}
+
+BufferedReader::~BufferedReader() {
+    close();
+}
+
+Status BufferedReader::open() {
+    if (!_reader) {
+        std::stringstream ss;
+        ss << "Open buffered reader failed, reader is null";
+        LOG(WARNING) << ss.str();
+        return Status::InternalError(ss.str());
+    }
+    RETURN_IF_ERROR(_reader->open());
+    RETURN_IF_ERROR(fill());
+    return Status::OK();
+}
+
+//not support
+Status BufferedReader::read_one_message(uint8_t** buf, size_t* length) {
+    return Status::NotSupported("Not support");
+}
+
+Status BufferedReader::read(uint8_t* buf, size_t* buf_len, bool* eof) {
+    DCHECK_NE(*buf_len, 0);
+    int64_t bytes_read;
+    RETURN_IF_ERROR(readat(_cur_offset, (int64_t)*buf_len, &bytes_read, buf));
+    if (bytes_read == 0) {
+        *eof = true;
+    } else {
+        *eof = false;
+    }
+    return Status::OK();
+}
+
+Status BufferedReader::readat(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) {
+    if (nbytes <= 0) {
+        *bytes_read = 0;
+        return Status::OK();
+    }
+    RETURN_IF_ERROR(read_once(position, nbytes, bytes_read, out));
+    //EOF
+    if (*bytes_read <= 0) {
+        return Status::OK();
+    }
+    while (*bytes_read < nbytes) {
+        int64_t len;
+        RETURN_IF_ERROR(read_once(position + *bytes_read, nbytes - *bytes_read, &len, reinterpret_cast<char*>(out) + *bytes_read));
+        // EOF
+        if (len <= 0) {
+            break;
+        }
+        *bytes_read += len;
+    }
+    return Status::OK();
+}
+
+Status BufferedReader::read_once(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) {
+    // requested bytes missed the local buffer
+    if (position >= _buffer_limit || position < _buffer_offset) {
+        // if requested length is larger than the capacity of buffer, do not
+        // need to copy the character into local buffer.
+        if (nbytes > _buffer_limit - _buffer_offset) {
+            return _reader->readat(position, nbytes, bytes_read, out);
+        }
+        _buffer_offset = position;
+        RETURN_IF_ERROR(fill());
+        if (position >= _buffer_limit) {
+            *bytes_read = 0;
+            return Status::OK();
+        }
+    } 
+    int64_t len = std::min(_buffer_limit - position, nbytes);
+    int64_t off = position - _buffer_offset;
+    memcpy(out, _buffer + off, len);
+    *bytes_read = len;
+    _cur_offset = position + *bytes_read;
+    return Status::OK();
+}
+
+Status BufferedReader::fill() {
+    if (_buffer_offset >= 0) {
+        int64_t bytes_read;
+        int retry_times = 1;

Review comment:
       ok






[GitHub] [incubator-doris] kangkaisen commented on a change in pull request #3878: [optimize] Optimize spark load/broker load reading parquet format file

Posted by GitBox <gi...@apache.org>.
kangkaisen commented on a change in pull request #3878:
URL: https://github.com/apache/incubator-doris/pull/3878#discussion_r440570998



##########
File path: be/src/exec/buffered_reader.h
##########
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stdint.h>
+
+#include "common/status.h"
+#include "olap/olap_define.h"
+#include "exec/file_reader.h"
+
+namespace doris {
+
+// Buffered Reader of broker

Review comment:
       Please add a comment describing why we need this class and when it should be used.
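
One plausible motivation, sketched below under stated assumptions, is that a read-through buffer turns many small positional reads into a few large ones, which matters when each read on the wrapped reader is a remote call. `CountingSource` and `TinyBufferedReader` are hypothetical names for this illustration; this is not the code from the PR, only a simplified model of the same idea.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a remote reader: each readat() is one round trip.
class CountingSource {
public:
    explicit CountingSource(std::string data) : _data(std::move(data)) {}

    int64_t readat(int64_t position, int64_t nbytes, char* out) {
        ++_calls;
        const int64_t size = static_cast<int64_t>(_data.size());
        if (position >= size) {
            return 0;  // EOF
        }
        const int64_t len = std::min(nbytes, size - position);
        std::memcpy(out, _data.data() + position, static_cast<size_t>(len));
        return len;
    }

    int calls() const { return _calls; }

private:
    std::string _data;
    int _calls = 0;
};

// Minimal read-through buffer: serves requests from a local window and
// refills the window from the source only on a miss.
class TinyBufferedReader {
public:
    TinyBufferedReader(CountingSource* src, int64_t buffer_size)
            : _src(src), _buffer(static_cast<size_t>(buffer_size)) {}

    int64_t readat(int64_t position, int64_t nbytes, char* out) {
        int64_t total = 0;
        while (total < nbytes) {
            const int64_t pos = position + total;
            if (pos < _offset || pos >= _limit) {
                // Miss: move the window to `pos` and refill it.
                _offset = pos;
                _limit = pos + _src->readat(pos, static_cast<int64_t>(_buffer.size()),
                                            _buffer.data());
                if (_limit == _offset) {
                    break;  // nothing left to read
                }
            }
            const int64_t len = std::min(_limit - pos, nbytes - total);
            std::memcpy(out + total, _buffer.data() + (pos - _offset),
                        static_cast<size_t>(len));
            total += len;
        }
        return total;
    }

private:
    CountingSource* _src;
    std::vector<char> _buffer;
    int64_t _offset = 0;  // file position of the first buffered byte
    int64_t _limit = 0;   // file position one past the last buffered byte
};
```

In this model, reading 1000 bytes in 10-byte requests costs 100 source calls unbuffered but only 4 with a 256-byte buffer. A class comment could state that trade-off, along with the case where buffering does not help (large sequential reads that exceed the buffer).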






[GitHub] [incubator-doris] xy720 commented on a change in pull request #3878: [optimize] Optimize spark load/broker load reading parquet format file

Posted by GitBox <gi...@apache.org>.
xy720 commented on a change in pull request #3878:
URL: https://github.com/apache/incubator-doris/pull/3878#discussion_r443314263



##########
File path: be/src/exec/broker_reader.cpp
##########
@@ -155,6 +155,8 @@ Status BrokerReader::readat(int64_t position, int64_t nbytes, int64_t* bytes_rea
             return status;
         }
 
+        LOG(DEBUG) << "send readat request to broker:" << broker_addr << " position:" << position << ", read bytes length:" << nbytes;

Review comment:
       Yes, it's confusing. It would be better to call it a pread request.






[GitHub] [incubator-doris] morningman commented on a change in pull request #3878: [optimize] Optimize spark load/broker load reading parquet format file

Posted by GitBox <gi...@apache.org>.
morningman commented on a change in pull request #3878:
URL: https://github.com/apache/incubator-doris/pull/3878#discussion_r440953854



##########
File path: be/src/exec/buffered_reader.h
##########
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stdint.h>
+
+#include "common/status.h"
+#include "olap/olap_define.h"
+#include "exec/file_reader.h"
+
+namespace doris {
+
+// Buffered Reader of broker
+class BufferedReader : public FileReader {
+public:
+    // If the reader need the file size, set it when construct BrokerReader.
+    // There is no other way to set the file size.
+    BufferedReader(FileReader* reader);
+    BufferedReader(FileReader* reader, int64_t buffer_size);
+    virtual ~BufferedReader();
+
+    virtual Status open() override;
+
+    // Read 
+    virtual Status read(uint8_t* buf, size_t* buf_len, bool* eof) override;
+    virtual Status readat(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) override;
+    virtual Status read_one_message(uint8_t** buf, size_t* length) override;
+    virtual int64_t size() override;
+    virtual Status seek(int64_t position) override;
+    virtual Status tell(int64_t* position) override;
+    virtual void close() override;
+    virtual bool closed() override;
+
+private:
+    Status fill();

Review comment:
       ```suggestion
       Status _fill();
   ```

##########
File path: be/src/exec/buffered_reader.h
##########
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stdint.h>
+
+#include "common/status.h"
+#include "olap/olap_define.h"
+#include "exec/file_reader.h"
+
+namespace doris {
+
+// Buffered Reader of broker
+class BufferedReader : public FileReader {
+public:
+    // If the reader need the file size, set it when construct BrokerReader.

Review comment:
       ```suggestion
       // If the reader needs the file size, set it when constructing the FileReader.
   ```

##########
File path: be/src/exec/buffered_reader.cpp
##########
@@ -0,0 +1,162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/buffered_reader.h"
+
+#include <sstream>
+#include <algorithm>
+
+#include "common/logging.h"
+
+namespace doris {
+
+// buffered reader
+
+BufferedReader::BufferedReader(FileReader* reader)
+        : _reader(reader),
+          _buffer_size(1024 * 1024),
+          _buffer_offset(0),
+          _buffer_limit(0),
+          _cur_offset(0) {
+    _buffer = new char[_buffer_size];
+}
+
+BufferedReader::BufferedReader(FileReader* reader, int64_t buffer_size)
+        : _reader(reader),
+          _buffer_size(buffer_size),
+          _buffer_offset(0),
+          _buffer_limit(0),
+          _cur_offset(0) {
+    _buffer = new char[_buffer_size];
+}
+
+BufferedReader::~BufferedReader() {
+    close();
+}
+
+Status BufferedReader::open() {
+    if (!_reader) {
+        std::stringstream ss;
+        ss << "Open buffered reader failed, reader is null";
+        LOG(WARNING) << ss.str();
+        return Status::InternalError(ss.str());
+    }
+    RETURN_IF_ERROR(_reader->open());
+    RETURN_IF_ERROR(fill());
+    return Status::OK();
+}
+
+//not support
+Status BufferedReader::read_one_message(uint8_t** buf, size_t* length) {
+    return Status::NotSupported("Not support");
+}
+
+Status BufferedReader::read(uint8_t* buf, size_t* buf_len, bool* eof) {
+    DCHECK_NE(*buf_len, 0);
+    int64_t bytes_read;
+    RETURN_IF_ERROR(readat(_cur_offset, (int64_t)*buf_len, &bytes_read, buf));
+    if (bytes_read == 0) {
+        *eof = true;
+    } else {
+        *eof = false;
+    }
+    return Status::OK();
+}
+
+Status BufferedReader::readat(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) {
+    if (nbytes <= 0) {
+        *bytes_read = 0;
+        return Status::OK();
+    }
+    RETURN_IF_ERROR(read_once(position, nbytes, bytes_read, out));
+    //EOF
+    if (*bytes_read <= 0) {
+        return Status::OK();
+    }
+    while (*bytes_read < nbytes) {
+        int64_t len;
+        RETURN_IF_ERROR(read_once(position + *bytes_read, nbytes - *bytes_read, &len, reinterpret_cast<char*>(out) + *bytes_read));
+        // EOF
+        if (len <= 0) {
+            break;
+        }
+        *bytes_read += len;
+    }
+    return Status::OK();
+}
+
+Status BufferedReader::read_once(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) {
+    // requested bytes missed the local buffer
+    if (position >= _buffer_limit || position < _buffer_offset) {
+        // if requested length is larger than the capacity of buffer, do not
+        // need to copy the character into local buffer.
+        if (nbytes > _buffer_limit - _buffer_offset) {
+            return _reader->readat(position, nbytes, bytes_read, out);
+        }
+        _buffer_offset = position;
+        RETURN_IF_ERROR(fill());
+        if (position >= _buffer_limit) {
+            *bytes_read = 0;
+            return Status::OK();
+        }
+    } 
+    int64_t len = std::min(_buffer_limit - position, nbytes);
+    int64_t off = position - _buffer_offset;
+    memcpy(out, _buffer + off, len);
+    *bytes_read = len;
+    _cur_offset = position + *bytes_read;
+    return Status::OK();
+}
+
+Status BufferedReader::fill() {
+    if (_buffer_offset >= 0) {
+        int64_t bytes_read;
+        int retry_times = 1;

Review comment:
       What is this retry for? Please add a comment explaining it.

##########
File path: be/src/exec/buffered_reader.cpp
##########
@@ -0,0 +1,162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/buffered_reader.h"
+
+#include <sstream>
+#include <algorithm>
+
+#include "common/logging.h"
+
+namespace doris {
+
+// buffered reader
+
+BufferedReader::BufferedReader(FileReader* reader)
+        : _reader(reader),
+          _buffer_size(1024 * 1024),
+          _buffer_offset(0),
+          _buffer_limit(0),
+          _cur_offset(0) {
+    _buffer = new char[_buffer_size];
+}
+
+BufferedReader::BufferedReader(FileReader* reader, int64_t buffer_size)
+        : _reader(reader),
+          _buffer_size(buffer_size),
+          _buffer_offset(0),
+          _buffer_limit(0),
+          _cur_offset(0) {
+    _buffer = new char[_buffer_size];
+}
+
+BufferedReader::~BufferedReader() {
+    close();
+}
+
+Status BufferedReader::open() {
+    if (!_reader) {
+        std::stringstream ss;
+        ss << "Open buffered reader failed, reader is null";
+        LOG(WARNING) << ss.str();

Review comment:
       Let the caller print the log, not here.
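
A minimal sketch of the pattern being asked for, assuming a simplified status type: the callee only returns the error, and the caller logs it once with its own context. `SimpleStatus`, `open_reader`, and `open_and_log` are hypothetical names for this illustration and do not mirror the project's `Status` API.

```cpp
#include <cassert>
#include <ostream>
#include <sstream>
#include <string>
#include <utility>

// Hypothetical minimal status type, standing in for the project's Status.
struct SimpleStatus {
    bool ok;
    std::string msg;

    static SimpleStatus OK() { return SimpleStatus{true, ""}; }
    static SimpleStatus Error(std::string m) { return SimpleStatus{false, std::move(m)}; }
};

// The callee reports the failure only through its return value.
SimpleStatus open_reader(const void* reader) {
    if (reader == nullptr) {
        return SimpleStatus::Error("Open buffered reader failed, reader is null");
    }
    return SimpleStatus::OK();
}

// The caller decides whether and how to log, adding its own context.
bool open_and_log(const void* reader, std::ostream& log) {
    SimpleStatus st = open_reader(reader);
    if (!st.ok) {
        log << "load failed: " << st.msg << '\n';
        return false;
    }
    return true;
}
```

Keeping the log statement in one place avoids the same failure being printed at every level of the call stack.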

##########
File path: be/src/exec/buffered_reader.h
##########
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stdint.h>
+
+#include "common/status.h"
+#include "olap/olap_define.h"
+#include "exec/file_reader.h"
+
+namespace doris {
+
+// Buffered Reader of broker
+class BufferedReader : public FileReader {
+public:
+    // If the reader need the file size, set it when construct BrokerReader.
+    // There is no other way to set the file size.
+    BufferedReader(FileReader* reader);
+    BufferedReader(FileReader* reader, int64_t buffer_size);
+    virtual ~BufferedReader();
+
+    virtual Status open() override;
+
+    // Read 
+    virtual Status read(uint8_t* buf, size_t* buf_len, bool* eof) override;
+    virtual Status readat(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) override;
+    virtual Status read_one_message(uint8_t** buf, size_t* length) override;
+    virtual int64_t size() override;
+    virtual Status seek(int64_t position) override;
+    virtual Status tell(int64_t* position) override;
+    virtual void close() override;
+    virtual bool closed() override;
+
+private:
+    Status fill();
+    Status read_once(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out);

Review comment:
       ```suggestion
       Status _read_once(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out);
   ```

##########
File path: be/src/exec/buffered_reader.cpp
##########
@@ -0,0 +1,162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/buffered_reader.h"
+
+#include <sstream>
+#include <algorithm>
+
+#include "common/logging.h"
+
+namespace doris {
+
+// buffered reader
+
+BufferedReader::BufferedReader(FileReader* reader)
+        : _reader(reader),
+          _buffer_size(1024 * 1024),
+          _buffer_offset(0),
+          _buffer_limit(0),
+          _cur_offset(0) {
+    _buffer = new char[_buffer_size];
+}
+
+BufferedReader::BufferedReader(FileReader* reader, int64_t buffer_size)
+        : _reader(reader),
+          _buffer_size(buffer_size),
+          _buffer_offset(0),
+          _buffer_limit(0),
+          _cur_offset(0) {
+    _buffer = new char[_buffer_size];
+}
+
+BufferedReader::~BufferedReader() {
+    close();
+}
+
+Status BufferedReader::open() {
+    if (!_reader) {
+        std::stringstream ss;
+        ss << "Open buffered reader failed, reader is null";
+        LOG(WARNING) << ss.str();
+        return Status::InternalError(ss.str());
+    }
+    RETURN_IF_ERROR(_reader->open());
+    RETURN_IF_ERROR(fill());

Review comment:
       Is it necessary to call fill() in open()?

##########
File path: be/src/exec/buffered_reader.cpp
##########
@@ -0,0 +1,162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/buffered_reader.h"
+
+#include <sstream>
+#include <algorithm>
+
+#include "common/logging.h"
+
+namespace doris {
+
+// buffered reader
+
+BufferedReader::BufferedReader(FileReader* reader)
+        : _reader(reader),
+          _buffer_size(1024 * 1024),
+          _buffer_offset(0),
+          _buffer_limit(0),
+          _cur_offset(0) {
+    _buffer = new char[_buffer_size];
+}
+
+BufferedReader::BufferedReader(FileReader* reader, int64_t buffer_size)
+        : _reader(reader),
+          _buffer_size(buffer_size),
+          _buffer_offset(0),
+          _buffer_limit(0),
+          _cur_offset(0) {
+    _buffer = new char[_buffer_size];
+}
+
+BufferedReader::~BufferedReader() {
+    close();
+}
+
+Status BufferedReader::open() {
+    if (!_reader) {
+        std::stringstream ss;
+        ss << "Open buffered reader failed, reader is null";
+        LOG(WARNING) << ss.str();
+        return Status::InternalError(ss.str());
+    }
+    RETURN_IF_ERROR(_reader->open());
+    RETURN_IF_ERROR(fill());
+    return Status::OK();
+}
+
+//not support
+Status BufferedReader::read_one_message(uint8_t** buf, size_t* length) {
+    return Status::NotSupported("Not support");
+}
+
+Status BufferedReader::read(uint8_t* buf, size_t* buf_len, bool* eof) {
+    DCHECK_NE(*buf_len, 0);
+    int64_t bytes_read;
+    RETURN_IF_ERROR(readat(_cur_offset, (int64_t)*buf_len, &bytes_read, buf));
+    if (bytes_read == 0) {
+        *eof = true;
+    } else {
+        *eof = false;
+    }
+    return Status::OK();
+}
+
+Status BufferedReader::readat(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) {
+    if (nbytes <= 0) {
+        *bytes_read = 0;
+        return Status::OK();
+    }
+    RETURN_IF_ERROR(read_once(position, nbytes, bytes_read, out));
+    //EOF
+    if (*bytes_read <= 0) {
+        return Status::OK();
+    }
+    while (*bytes_read < nbytes) {
+        int64_t len;
+        RETURN_IF_ERROR(read_once(position + *bytes_read, nbytes - *bytes_read, &len, reinterpret_cast<char*>(out) + *bytes_read));
+        // EOF
+        if (len <= 0) {
+            break;
+        }
+        *bytes_read += len;
+    }
+    return Status::OK();
+}
+
+Status BufferedReader::read_once(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) {
+    // requested bytes missed the local buffer
+    if (position >= _buffer_limit || position < _buffer_offset) {
+        // if requested length is larger than the capacity of buffer, do not
+        // need to copy the character into local buffer.
+        if (nbytes > _buffer_limit - _buffer_offset) {
+            return _reader->readat(position, nbytes, bytes_read, out);
+        }
+        _buffer_offset = position;
+        RETURN_IF_ERROR(fill());
+        if (position >= _buffer_limit) {

Review comment:
       When will `position >= _buffer_limit` hold here, right after fill()?
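
The body of `fill()` is not shown in the quoted hunk, so the following is only a model under an assumption: after a refill at `position`, the buffer window starts at `position` and ends where the bytes actually read end. Under that assumption the check can only succeed when the refill returned zero bytes, that is, when `position` is at or past the end of the file. `Window`, `refill_at`, and `hits_eof` are hypothetical names for this illustration.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Hypothetical model of the buffer window: [offset, limit) in file positions.
struct Window {
    int64_t offset;
    int64_t limit;
};

// Models a refill that starts at `position` on a file of `file_size` bytes.
Window refill_at(int64_t position, int64_t buffer_size, int64_t file_size) {
    // Bytes the underlying reader can still return; zero at or past EOF.
    const int64_t available = std::max<int64_t>(0, file_size - position);
    const int64_t got = std::min(buffer_size, available);
    return Window{position, position + got};
}

// Mirrors the check under discussion: true when the refill produced no data.
bool hits_eof(const Window& w, int64_t position) {
    return position >= w.limit;
}
```

In this model the window's limit equals `position` exactly when nothing was read, so the comparison never holds with strict inequality; `>=` and `==` would behave the same.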






[GitHub] [incubator-doris] xy720 commented on a change in pull request #3878: [optimize] Optimize spark load/broker load reading parquet format file

Posted by GitBox <gi...@apache.org>.
xy720 commented on a change in pull request #3878:
URL: https://github.com/apache/incubator-doris/pull/3878#discussion_r443316601



##########
File path: be/src/exec/buffered_reader.cpp
##########
@@ -0,0 +1,162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/buffered_reader.h"
+
+#include <sstream>
+#include <algorithm>
+
+#include "common/logging.h"
+
+namespace doris {
+
+// buffered reader
+
+BufferedReader::BufferedReader(FileReader* reader)
+        : _reader(reader),
+          _buffer_size(1024 * 1024),
+          _buffer_offset(0),
+          _buffer_limit(0),
+          _cur_offset(0) {
+    _buffer = new char[_buffer_size];
+}

Review comment:
       good



