You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2020/06/16 15:57:22 UTC

[GitHub] [incubator-doris] morningman commented on a change in pull request #3878: [optimize] Optimize spark load/broker load reading parquet format file

morningman commented on a change in pull request #3878:
URL: https://github.com/apache/incubator-doris/pull/3878#discussion_r440953854



##########
File path: be/src/exec/buffered_reader.h
##########
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stdint.h>
+
+#include "common/status.h"
+#include "olap/olap_define.h"
+#include "exec/file_reader.h"
+
+namespace doris {
+
+// Buffered Reader of broker
+class BufferedReader : public FileReader {
+public:
+    // If the reader need the file size, set it when construct BrokerReader.
+    // There is no other way to set the file size.
+    BufferedReader(FileReader* reader);
+    BufferedReader(FileReader* reader, int64_t buffer_size);
+    virtual ~BufferedReader();
+
+    virtual Status open() override;
+
+    // Read 
+    virtual Status read(uint8_t* buf, size_t* buf_len, bool* eof) override;
+    virtual Status readat(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) override;
+    virtual Status read_one_message(uint8_t** buf, size_t* length) override;
+    virtual int64_t size() override;
+    virtual Status seek(int64_t position) override;
+    virtual Status tell(int64_t* position) override;
+    virtual void close() override;
+    virtual bool closed() override;
+
+private:
+    Status fill();

Review comment:
       ```suggestion
       Status _fill();
   ```

##########
File path: be/src/exec/buffered_reader.h
##########
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stdint.h>
+
+#include "common/status.h"
+#include "olap/olap_define.h"
+#include "exec/file_reader.h"
+
+namespace doris {
+
+// Buffered Reader of broker
+class BufferedReader : public FileReader {
+public:
+    // If the reader need the file size, set it when construct BrokerReader.

Review comment:
       ```suggestion
       // If the reader need the file size, set it when construct FileReader.
   ```

##########
File path: be/src/exec/buffered_reader.cpp
##########
@@ -0,0 +1,162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/buffered_reader.h"
+
+#include <sstream>
+#include <algorithm>
+
+#include "common/logging.h"
+
+namespace doris {
+
+// buffered reader
+
+BufferedReader::BufferedReader(FileReader* reader)
+        : _reader(reader),
+          _buffer_size(1024 * 1024),
+          _buffer_offset(0),
+          _buffer_limit(0),
+          _cur_offset(0) {
+    _buffer = new char[_buffer_size];
+}
+
+BufferedReader::BufferedReader(FileReader* reader, int64_t buffer_size)
+        : _reader(reader),
+          _buffer_size(buffer_size),
+          _buffer_offset(0),
+          _buffer_limit(0),
+          _cur_offset(0) {
+    _buffer = new char[_buffer_size];
+}
+
+BufferedReader::~BufferedReader() {
+    close();
+}
+
+Status BufferedReader::open() {
+    if (!_reader) {
+        std::stringstream ss;
+        ss << "Open buffered reader failed, reader is null";
+        LOG(WARNING) << ss.str();
+        return Status::InternalError(ss.str());
+    }
+    RETURN_IF_ERROR(_reader->open());
+    RETURN_IF_ERROR(fill());
+    return Status::OK();
+}
+
+//not support
+Status BufferedReader::read_one_message(uint8_t** buf, size_t* length) {
+    return Status::NotSupported("Not support");
+}
+
+Status BufferedReader::read(uint8_t* buf, size_t* buf_len, bool* eof) {
+    DCHECK_NE(*buf_len, 0);
+    int64_t bytes_read;
+    RETURN_IF_ERROR(readat(_cur_offset, (int64_t)*buf_len, &bytes_read, buf));
+    if (bytes_read == 0) {
+        *eof = true;
+    } else {
+        *eof = false;
+    }
+    return Status::OK();
+}
+
+Status BufferedReader::readat(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) {
+    if (nbytes <= 0) {
+        *bytes_read = 0;
+        return Status::OK();
+    }
+    RETURN_IF_ERROR(read_once(position, nbytes, bytes_read, out));
+    //EOF
+    if (*bytes_read <= 0) {
+        return Status::OK();
+    }
+    while (*bytes_read < nbytes) {
+        int64_t len;
+        RETURN_IF_ERROR(read_once(position + *bytes_read, nbytes - *bytes_read, &len, reinterpret_cast<char*>(out) + *bytes_read));
+        // EOF
+        if (len <= 0) {
+            break;
+        }
+        *bytes_read += len;
+    }
+    return Status::OK();
+}
+
+Status BufferedReader::read_once(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) {
+    // requested bytes missed the local buffer
+    if (position >= _buffer_limit || position < _buffer_offset) {
+        // if requested length is larger than the capacity of buffer, do not
+        // need to copy the character into local buffer.
+        if (nbytes > _buffer_limit - _buffer_offset) {
+            return _reader->readat(position, nbytes, bytes_read, out);
+        }
+        _buffer_offset = position;
+        RETURN_IF_ERROR(fill());
+        if (position >= _buffer_limit) {
+            *bytes_read = 0;
+            return Status::OK();
+        }
+    } 
+    int64_t len = std::min(_buffer_limit - position, nbytes);
+    int64_t off = position - _buffer_offset;
+    memcpy(out, _buffer + off, len);
+    *bytes_read = len;
+    _cur_offset = position + *bytes_read;
+    return Status::OK();
+}
+
+Status BufferedReader::fill() {
+    if (_buffer_offset >= 0) {
+        int64_t bytes_read;
+        int retry_times = 1;

Review comment:
       What is this retry for? Please add a comment explaining it.

##########
File path: be/src/exec/buffered_reader.cpp
##########
@@ -0,0 +1,162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/buffered_reader.h"
+
+#include <sstream>
+#include <algorithm>
+
+#include "common/logging.h"
+
+namespace doris {
+
+// buffered reader
+
+BufferedReader::BufferedReader(FileReader* reader)
+        : _reader(reader),
+          _buffer_size(1024 * 1024),
+          _buffer_offset(0),
+          _buffer_limit(0),
+          _cur_offset(0) {
+    _buffer = new char[_buffer_size];
+}
+
+BufferedReader::BufferedReader(FileReader* reader, int64_t buffer_size)
+        : _reader(reader),
+          _buffer_size(buffer_size),
+          _buffer_offset(0),
+          _buffer_limit(0),
+          _cur_offset(0) {
+    _buffer = new char[_buffer_size];
+}
+
+BufferedReader::~BufferedReader() {
+    close();
+}
+
+Status BufferedReader::open() {
+    if (!_reader) {
+        std::stringstream ss;
+        ss << "Open buffered reader failed, reader is null";
+        LOG(WARNING) << ss.str();

Review comment:
       Let the caller log this error instead of logging it here.

##########
File path: be/src/exec/buffered_reader.h
##########
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stdint.h>
+
+#include "common/status.h"
+#include "olap/olap_define.h"
+#include "exec/file_reader.h"
+
+namespace doris {
+
+// Buffered Reader of broker
+class BufferedReader : public FileReader {
+public:
+    // If the reader need the file size, set it when construct BrokerReader.
+    // There is no other way to set the file size.
+    BufferedReader(FileReader* reader);
+    BufferedReader(FileReader* reader, int64_t buffer_size);
+    virtual ~BufferedReader();
+
+    virtual Status open() override;
+
+    // Read 
+    virtual Status read(uint8_t* buf, size_t* buf_len, bool* eof) override;
+    virtual Status readat(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) override;
+    virtual Status read_one_message(uint8_t** buf, size_t* length) override;
+    virtual int64_t size() override;
+    virtual Status seek(int64_t position) override;
+    virtual Status tell(int64_t* position) override;
+    virtual void close() override;
+    virtual bool closed() override;
+
+private:
+    Status fill();
+    Status read_once(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out);

Review comment:
       ```suggestion
       Status _read_once(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out);
   ```

##########
File path: be/src/exec/buffered_reader.cpp
##########
@@ -0,0 +1,162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/buffered_reader.h"
+
+#include <sstream>
+#include <algorithm>
+
+#include "common/logging.h"
+
+namespace doris {
+
+// buffered reader
+
+BufferedReader::BufferedReader(FileReader* reader)
+        : _reader(reader),
+          _buffer_size(1024 * 1024),
+          _buffer_offset(0),
+          _buffer_limit(0),
+          _cur_offset(0) {
+    _buffer = new char[_buffer_size];
+}
+
+BufferedReader::BufferedReader(FileReader* reader, int64_t buffer_size)
+        : _reader(reader),
+          _buffer_size(buffer_size),
+          _buffer_offset(0),
+          _buffer_limit(0),
+          _cur_offset(0) {
+    _buffer = new char[_buffer_size];
+}
+
+BufferedReader::~BufferedReader() {
+    close();
+}
+
+Status BufferedReader::open() {
+    if (!_reader) {
+        std::stringstream ss;
+        ss << "Open buffered reader failed, reader is null";
+        LOG(WARNING) << ss.str();
+        return Status::InternalError(ss.str());
+    }
+    RETURN_IF_ERROR(_reader->open());
+    RETURN_IF_ERROR(fill());

Review comment:
       Is it necessary to call `fill()` in `open()`?

##########
File path: be/src/exec/buffered_reader.cpp
##########
@@ -0,0 +1,162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/buffered_reader.h"
+
+#include <sstream>
+#include <algorithm>
+
+#include "common/logging.h"
+
+namespace doris {
+
+// buffered reader
+
+BufferedReader::BufferedReader(FileReader* reader)
+        : _reader(reader),
+          _buffer_size(1024 * 1024),
+          _buffer_offset(0),
+          _buffer_limit(0),
+          _cur_offset(0) {
+    _buffer = new char[_buffer_size];
+}
+
+BufferedReader::BufferedReader(FileReader* reader, int64_t buffer_size)
+        : _reader(reader),
+          _buffer_size(buffer_size),
+          _buffer_offset(0),
+          _buffer_limit(0),
+          _cur_offset(0) {
+    _buffer = new char[_buffer_size];
+}
+
+BufferedReader::~BufferedReader() {
+    close();
+}
+
+Status BufferedReader::open() {
+    if (!_reader) {
+        std::stringstream ss;
+        ss << "Open buffered reader failed, reader is null";
+        LOG(WARNING) << ss.str();
+        return Status::InternalError(ss.str());
+    }
+    RETURN_IF_ERROR(_reader->open());
+    RETURN_IF_ERROR(fill());
+    return Status::OK();
+}
+
+//not support
+Status BufferedReader::read_one_message(uint8_t** buf, size_t* length) {
+    return Status::NotSupported("Not support");
+}
+
+Status BufferedReader::read(uint8_t* buf, size_t* buf_len, bool* eof) {
+    DCHECK_NE(*buf_len, 0);
+    int64_t bytes_read;
+    RETURN_IF_ERROR(readat(_cur_offset, (int64_t)*buf_len, &bytes_read, buf));
+    if (bytes_read == 0) {
+        *eof = true;
+    } else {
+        *eof = false;
+    }
+    return Status::OK();
+}
+
+Status BufferedReader::readat(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) {
+    if (nbytes <= 0) {
+        *bytes_read = 0;
+        return Status::OK();
+    }
+    RETURN_IF_ERROR(read_once(position, nbytes, bytes_read, out));
+    //EOF
+    if (*bytes_read <= 0) {
+        return Status::OK();
+    }
+    while (*bytes_read < nbytes) {
+        int64_t len;
+        RETURN_IF_ERROR(read_once(position + *bytes_read, nbytes - *bytes_read, &len, reinterpret_cast<char*>(out) + *bytes_read));
+        // EOF
+        if (len <= 0) {
+            break;
+        }
+        *bytes_read += len;
+    }
+    return Status::OK();
+}
+
+Status BufferedReader::read_once(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) {
+    // requested bytes missed the local buffer
+    if (position >= _buffer_limit || position < _buffer_offset) {
+        // if requested length is larger than the capacity of buffer, do not
+        // need to copy the character into local buffer.
+        if (nbytes > _buffer_limit - _buffer_offset) {
+            return _reader->readat(position, nbytes, bytes_read, out);
+        }
+        _buffer_offset = position;
+        RETURN_IF_ERROR(fill());
+        if (position >= _buffer_limit) {

Review comment:
       When can `position > _buffer_limit` happen here?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org