You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2022/12/08 08:13:14 UTC

[GitHub] [doris] BePPPower commented on a diff in pull request #14875: [feature](file reader) Merge hdfs reader to the new file reader

BePPPower commented on code in PR #14875:
URL: https://github.com/apache/doris/pull/14875#discussion_r1043046468


##########
be/src/io/fs/hdfs_file_system.h:
##########
@@ -0,0 +1,159 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <gen_cpp/PlanNodes_types.h>
+#include <hdfs/hdfs.h>
+
+#include "common/status.h"
+#include "io/fs/remote_file_system.h"
+#include "io/hdfs_file_reader.h"
+namespace doris {
+
+namespace io {
+
+class HdfsFileSystemHandle { // Bundles one hdfsFS connection with a reference count, a last-access timestamp and an invalid flag.
+public:
+    HdfsFileSystemHandle(hdfsFS fs, bool cached) // Starts with zero references, last-access time 0 and the invalid flag cleared.
+            : hdfs_fs(fs), from_cache(cached), _ref_cnt(0), _last_access_time(0), _invalid(false) {}
+
+    ~HdfsFileSystemHandle() {
+        DCHECK(_ref_cnt == 0); // No references are expected to remain when the handle is destroyed.
+        if (hdfs_fs != nullptr) {
+            // Even if hdfsDisconnect reports an error, the resources associated with the hdfsFS will be freed.
+            hdfsDisconnect(hdfs_fs);
+        }
+        hdfs_fs = nullptr;
+    }
+
+    int64_t last_access_time() { return _last_access_time; } // Time of the last inc_ref()/dec_ref() in ms since the system_clock epoch (0 if never touched). NOTE(review): member is uint64_t but the return type is int64_t.
+
+    void inc_ref() { // Adds one reference and refreshes the last-access time.
+        _ref_cnt++;
+        _last_access_time = _now();
+    }
+
+    void dec_ref() { // Drops one reference and refreshes the last-access time.
+        _ref_cnt--;
+        _last_access_time = _now();
+    }
+
+    int ref_cnt() { return _ref_cnt; } // Current reference count.
+
+    bool invalid() { return _invalid; } // True once set_invalid() has been called.
+
+    void set_invalid() { _invalid = true; } // Marks the handle invalid; nothing in this class clears the flag again.
+
+    hdfsFS hdfs_fs;
+    // When the cache is full and all handles are in use, HdfsFileSystemCache will return an uncached handle.
+    // The client should delete the handle in that case.
+    const bool from_cache;
+
+private:
+    // The number of clients currently referencing this handle.
+    std::atomic<int> _ref_cnt;
+    // HdfsFileSystemCache tries to remove the oldest handle when the cache is full.
+    std::atomic<uint64_t> _last_access_time;
+    // A client sets this flag when an error is thrown, and HdfsFileSystemCache will not reuse an invalid handle.
+    std::atomic<bool> _invalid;
+
+    uint64_t _now() { // Current system_clock time in milliseconds since the epoch.
+        return std::chrono::duration_cast<std::chrono::milliseconds>(
+                       std::chrono::system_clock::now().time_since_epoch())
+                .count();
+    }
+};
+
+// Singleton cache of HdfsFileSystemHandle objects, owned through unique_ptr and stored in a map keyed by a uint64 value.
+class HdfsFileSystemCache {
+public:
+    static int MAX_CACHE_HANDLE; // Defined outside this header; presumably the maximum number of cached handles (confirm in the .cpp).
+
+    static HdfsFileSystemCache* instance() { // Returns a pointer to the single function-local static instance.
+        static HdfsFileSystemCache s_instance;
+        return &s_instance;
+    }
+
+    HdfsFileSystemCache(const HdfsFileSystemCache&) = delete; // Not copy-constructible.
+    const HdfsFileSystemCache& operator=(const HdfsFileSystemCache&) = delete; // Not copy-assignable.
+
+    // This function is thread-safe. The handle is handed back through the fs_handle out-parameter.
+    Status get_connection(THdfsParams& hdfs_params, HdfsFileSystemHandle** fs_handle);
+
+private:
+    std::mutex _lock; // NOTE(review): presumably guards _cache; confirm in the .cpp.
+    std::unordered_map<uint64, std::unique_ptr<HdfsFileSystemHandle>> _cache; // Key is presumably the value from _hdfs_hash_code (confirm in the .cpp).
+
+    HdfsFileSystemCache() = default; // Private: instances are only obtained through instance().
+
+    uint64 _hdfs_hash_code(THdfsParams& hdfs_params); // Produces a uint64 code from hdfs_params; defined in the .cpp.
+    Status _create_fs(THdfsParams& hdfs_params, hdfsFS* fs); // Outputs an hdfsFS through fs; defined in the .cpp.
+    void _clean_invalid(); // Presumably evicts handles whose invalid flag is set (confirm in the .cpp).
+    void _clean_oldest(); // Presumably evicts the handle with the oldest last-access time (confirm in the .cpp).
+};
+
+class HdfsFileSystem final : public RemoteFileSystem {
+public:
+    HdfsFileSystem(THdfsParams hdfs_params, const std::string& path);

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org