Posted to commits@doris.apache.org by mo...@apache.org on 2021/09/24 02:07:22 UTC

[incubator-doris] branch master updated: [Outfile] Support hdfs in select outfile clause (#6644)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new bdc8c98  [Outfile] Support hdfs in select outfile clause (#6644)
bdc8c98 is described below

commit bdc8c98008fc25d5051c7114e0d6b36ef642d6d9
Author: EmmyMiao87 <52...@qq.com>
AuthorDate: Fri Sep 24 10:07:11 2021 +0800

    [Outfile] Support hdfs in select outfile clause (#6644)
    
    Support hdfs in select outfile clause without broker.
    This PR implements an HDFS writer in BE which writes HDFS files directly without going through a broker.
    A syntax check for the HDFS outfile clause has also been added in FE.
    The syntax:
    ```
    select * from xx into outfile "hdfs://user/outfile_" format as csv
    properties ("hdfs.fs.dafultFS" = "xxx", "hdfs.hdfs_user" = "xxx");
    ```
    Note that all HDFS configuration properties need to carry the prefix `hdfs.`.
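    
    For reference, a Kerberos-authenticated variant (based on the example this commit adds to
    docs/en/administrator-guide/outfile.md, but using the ticket cache path key accepted by
    HDFSStorage.checkHDFS and HDFSWriter; the namenode address, principal, and cache path
    below are placeholder values):
    ```
    SELECT * FROM tbl
    INTO OUTFILE "hdfs://path/to/result_"
    FORMAT AS CSV
    PROPERTIES
    (
        "hdfs.fs.defaultFS" = "hdfs://namenode_ip:namenode_port",
        "hdfs.hadoop.security.authentication" = "kerberos",
        "hdfs.kerberos_principal" = "doris@YOUR.COM",
        "hdfs.kerb_ticket_cache_path" = "/tmp/krb5cc_doris"
    );
    ```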
---
 be/src/exec/CMakeLists.txt                         |   1 +
 be/src/exec/hdfs_writer.cpp                        | 173 +++++++++++++++++++++
 be/src/exec/hdfs_writer.h                          |  57 +++++++
 be/src/runtime/file_result_writer.cpp              |   5 +-
 be/src/runtime/result_file_sink.cpp                |  14 +-
 docs/en/administrator-guide/outfile.md             |  30 +++-
 docs/zh-CN/administrator-guide/outfile.md          |  26 +++-
 .../org/apache/doris/analysis/OutFileClause.java   |  17 +-
 .../java/org/apache/doris/backup/HDFSStorage.java  |  57 +++++++
 9 files changed, 365 insertions(+), 15 deletions(-)

diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index 9e826cc..dd8b0e5 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -105,6 +105,7 @@ set(EXEC_FILES
     assert_num_rows_node.cpp
     s3_reader.cpp
     s3_writer.cpp
+    hdfs_writer.cpp
 )
 
 if (ARCH_AMD64)
diff --git a/be/src/exec/hdfs_writer.cpp b/be/src/exec/hdfs_writer.cpp
new file mode 100644
index 0000000..6c4b343
--- /dev/null
+++ b/be/src/exec/hdfs_writer.cpp
@@ -0,0 +1,173 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/hdfs_writer.h"
+
+#include "common/logging.h"
+
+namespace doris {
+const static std::string FS_KEY = "fs.defaultFS";
+const static std::string USER = "hdfs_user";
+const static std::string KERBEROS_PRINCIPAL = "kerberos_principal";
+const static std::string KERB_TICKET_CACHE_PATH = "kerb_ticket_cache_path";
+const static std::string TOKEN = "token";
+
+HDFSWriter::HDFSWriter(std::map<std::string, std::string>& properties, const std::string& path)
+        : _properties(properties),
+          _path(path),
+          _hdfs_fs(nullptr) {
+    _parse_properties(_properties);
+}
+
+HDFSWriter::~HDFSWriter() {
+    close();
+}
+
+Status HDFSWriter::open() {
+    RETURN_IF_ERROR(_connect());
+    if (_hdfs_fs == nullptr) {
+        return Status::InternalError("HDFS writer open without client");
+    }
+    int exists = hdfsExists(_hdfs_fs, _path.c_str());
+    if (exists == 0) {
+        // the path already exists
+        return Status::AlreadyExist(_path + " already exists.");
+    }
+    // open file
+    _hdfs_file = hdfsOpenFile(_hdfs_fs, _path.c_str(), O_WRONLY, 0, 0, 0);
+    if (_hdfs_file == nullptr) {
+        std::stringstream ss;
+        ss << "open file failed. namenode:" << _namenode << " path:" << _path;
+        return Status::InternalError(ss.str());
+    }
+    LOG(INFO) << "open file. namenode:" << _namenode << " path:" << _path;
+    return Status::OK();
+}
+
+Status HDFSWriter::write(const uint8_t* buf, size_t buf_len, size_t* written_len) {
+    if (buf_len == 0) {
+        *written_len = 0;
+        return Status::OK();
+    }
+    int32_t result = hdfsWrite(_hdfs_fs, _hdfs_file, buf, buf_len);
+    if (result < 0) {
+        std::stringstream ss;
+        ss << "write file failed. namenode:" << _namenode << " path:" << _path;
+        LOG(WARNING) << ss.str();
+        return Status::InternalError(ss.str());
+    }
+
+    *written_len = (unsigned int) result;
+    return Status::OK();
+}
+
+Status HDFSWriter::close() {
+    if (_closed) {
+        return Status::OK();
+    }
+    _closed = true;
+    if (_hdfs_fs == nullptr) {
+        return Status::OK();
+    }
+    if (_hdfs_file == nullptr) {
+        // Even if there is an error, the resources associated with the hdfsFS will be freed.
+        hdfsDisconnect(_hdfs_fs);
+        return Status::OK();
+    }
+    int result = hdfsFlush(_hdfs_fs, _hdfs_file);
+    if (result == -1) {
+        std::stringstream ss;
+        ss << "failed to flush hdfs file. namenode:" << _namenode << " path:" << _path;
+        LOG(WARNING) << ss.str();
+        return Status::InternalError(ss.str());
+    }
+    hdfsCloseFile(_hdfs_fs, _hdfs_file);
+    hdfsDisconnect(_hdfs_fs);
+
+    _hdfs_file = nullptr;
+    _hdfs_fs = nullptr;
+    return Status::OK();
+}
+
+Status HDFSWriter::_connect() {
+    hdfsBuilder* hdfs_builder = hdfsNewBuilder();
+    hdfsBuilderSetNameNode(hdfs_builder, _namenode.c_str());
+    // set hdfs user
+    if (!_user.empty()) {
+        hdfsBuilderSetUserName(hdfs_builder, _user.c_str());
+    }
+    // set kerberos conf
+    if (!_kerb_principal.empty()) {
+        hdfsBuilderSetPrincipal(hdfs_builder, _kerb_principal.c_str());
+    }
+    if (!_kerb_ticket_cache_path.empty()) {
+        hdfsBuilderSetKerbTicketCachePath(hdfs_builder, _kerb_ticket_cache_path.c_str());
+    }
+    // set token
+    if (!_token.empty()) {
+        hdfsBuilderSetToken(hdfs_builder, _token.c_str());
+    }
+    // set other conf
+    if (!_properties.empty()) {
+        std::map<std::string, std::string>::iterator iter;
+        for (iter = _properties.begin(); iter != _properties.end(); iter++) {
+            hdfsBuilderConfSetStr(hdfs_builder, iter->first.c_str(), iter->second.c_str());
+        }
+    }
+    _hdfs_fs = hdfsBuilderConnect(hdfs_builder);
+    if (_hdfs_fs == nullptr) {
+        std::stringstream ss;
+        ss << "connect failed. namenode:" << _namenode;
+        return Status::InternalError(ss.str());
+    }
+    return Status::OK();
+}
+
+Status HDFSWriter::_parse_properties(std::map<std::string, std::string>& prop) {
+    // Extract the known keys and erase them from the property map. Advance with the
+    // iterator returned by erase(), since erase() invalidates the erased iterator.
+    auto iter = prop.begin();
+    while (iter != prop.end()) {
+        if (iter->first.compare(FS_KEY) == 0) {
+            _namenode = iter->second;
+            iter = prop.erase(iter);
+        } else if (iter->first.compare(USER) == 0) {
+            _user = iter->second;
+            iter = prop.erase(iter);
+        } else if (iter->first.compare(KERBEROS_PRINCIPAL) == 0) {
+            _kerb_principal = iter->second;
+            iter = prop.erase(iter);
+        } else if (iter->first.compare(KERB_TICKET_CACHE_PATH) == 0) {
+            _kerb_ticket_cache_path = iter->second;
+            iter = prop.erase(iter);
+        } else if (iter->first.compare(TOKEN) == 0) {
+            _token = iter->second;
+            iter = prop.erase(iter);
+        } else {
+            ++iter;
+        }
+    }
+
+    if (_namenode.empty()) {
+        DCHECK(false) << "hdfs properties is incorrect.";
+        LOG(ERROR) << "hdfs properties is incorrect.";
+        return Status::InternalError("hdfs properties is incorrect");
+    }
+    return Status::OK();
+}
+
+}// end namespace doris
\ No newline at end of file
diff --git a/be/src/exec/hdfs_writer.h b/be/src/exec/hdfs_writer.h
new file mode 100644
index 0000000..19e0832
--- /dev/null
+++ b/be/src/exec/hdfs_writer.h
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <hdfs/hdfs.h>
+
+#include <map>
+#include <string>
+
+#include "exec/file_writer.h"
+
+namespace doris {
+class HDFSWriter : public FileWriter {
+
+public:
+    HDFSWriter(std::map<std::string, std::string>& properties, const std::string& path);
+    ~HDFSWriter();
+    Status open() override;
+
+    // Writes up to buf_len bytes from the buffer pointed to by buf to the file.
+    // NOTE: the number of bytes actually written may be less than buf_len.
+    Status write(const uint8_t* buf, size_t buf_len, size_t* written_len) override;
+
+    Status close() override;
+
+private:
+    Status _connect();
+    Status _parse_properties(std::map<std::string, std::string>& prop);
+
+    std::map<std::string, std::string> _properties;
+    std::string _user = "";
+    std::string _namenode = "";
+    std::string _path = "";
+    std::string _kerb_principal = "";
+    std::string _kerb_ticket_cache_path = "";
+    std::string _token = "";
+    hdfsFS _hdfs_fs = nullptr;
+    hdfsFile _hdfs_file = nullptr;
+    bool _closed = false;
+};
+
+}
\ No newline at end of file
diff --git a/be/src/runtime/file_result_writer.cpp b/be/src/runtime/file_result_writer.cpp
index 4edfdb5..5fd25b3 100644
--- a/be/src/runtime/file_result_writer.cpp
+++ b/be/src/runtime/file_result_writer.cpp
@@ -21,6 +21,7 @@
 #include "exec/local_file_writer.h"
 #include "exec/parquet_writer.h"
 #include "exec/s3_writer.h"
+#include "exec/hdfs_writer.h"
 #include "exprs/expr.h"
 #include "exprs/expr_context.h"
 #include "gen_cpp/PaloInternalService_types.h"
@@ -131,12 +132,14 @@ Status FileResultWriter::_create_next_file_writer() {
 Status FileResultWriter::_create_file_writer(const std::string& file_name) {
     if (_storage_type == TStorageBackendType::LOCAL) {
         _file_writer = new LocalFileWriter(file_name, 0 /* start offset */);
-    } else if (_storage_type == TStorageBackendType::BROKER){
+    } else if (_storage_type == TStorageBackendType::BROKER) {
         _file_writer =
                 new BrokerWriter(_state->exec_env(), _file_opts->broker_addresses,
                                  _file_opts->broker_properties, file_name, 0 /*start offset*/);
     } else if (_storage_type == TStorageBackendType::S3) {
         _file_writer =  new S3Writer(_file_opts->broker_properties, file_name, 0 /* offset */);
+    } else if (_storage_type == TStorageBackendType::HDFS) {
+        _file_writer = new HDFSWriter(const_cast<std::map<std::string, std::string>&>(_file_opts->broker_properties), file_name);
     }
     RETURN_IF_ERROR(_file_writer->open());
     switch (_file_opts->file_format) {
diff --git a/be/src/runtime/result_file_sink.cpp b/be/src/runtime/result_file_sink.cpp
index 090fe6f..41eedac 100644
--- a/be/src/runtime/result_file_sink.cpp
+++ b/be/src/runtime/result_file_sink.cpp
@@ -108,9 +108,6 @@ Status ResultFileSink::prepare(RuntimeState* state) {
         _mem_tracker = MemTracker::CreateTracker(
                 _profile, -1, "ResultFileSink:" + print_id(state->fragment_instance_id()),
                 state->instance_mem_tracker());
-        for (int i = 0; i < _channels.size(); ++i) {
-            RETURN_IF_ERROR(_channels[i]->init(state));
-        }
         // create writer
         _output_batch = new RowBatch(_output_row_descriptor, 1024, _mem_tracker.get());
         _writer.reset(new (std::nothrow) FileResultWriter(_file_opts.get(), _storage_type,
@@ -119,6 +116,9 @@ Status ResultFileSink::prepare(RuntimeState* state) {
 
     }
     RETURN_IF_ERROR(_writer->init(state));
+    for (int i = 0; i < _channels.size(); ++i) {
+        RETURN_IF_ERROR(_channels[i]->init(state));
+    }
     return Status::OK();
 }
 
@@ -155,9 +155,11 @@ Status ResultFileSink::close(RuntimeState* state, Status exec_status) {
                 time(NULL) + config::result_buffer_cancelled_interval_time,
                 state->fragment_instance_id());
     } else {
-        RETURN_IF_ERROR(serialize_batch(_output_batch, _current_pb_batch, _channels.size()));
-        for (auto channel : _channels) {
-            RETURN_IF_ERROR(channel->send_batch(_current_pb_batch));
+        if (final_status.ok()) {
+            RETURN_IF_ERROR(serialize_batch(_output_batch, _current_pb_batch, _channels.size()));
+            for (auto channel : _channels) {
+                RETURN_IF_ERROR(channel->send_batch(_current_pb_batch));
+            }
         }
         Status final_st = Status::OK();
         for (int i = 0; i < _channels.size(); ++i) {
diff --git a/docs/en/administrator-guide/outfile.md b/docs/en/administrator-guide/outfile.md
index 1e9d97c..b114bde 100644
--- a/docs/en/administrator-guide/outfile.md
+++ b/docs/en/administrator-guide/outfile.md
@@ -30,7 +30,7 @@ This document describes how to use the `SELECT INTO OUTFILE` command to export q
 
 ## Syntax
 
-The `SELECT INTO OUTFILE` statement can export the query results to a file. Currently supports export to remote storage through Broker process, or directly through S3 protocol such as HDFS, S3, BOS and COS(Tencent Cloud) through the Broker process. The syntax is as follows:
+The `SELECT INTO OUTFILE` statement can export the query results to a file. It currently supports exporting to remote storage such as HDFS, S3, BOS and COS (Tencent Cloud) through the Broker process, or directly through the S3 or HDFS protocol. The syntax is as follows:
 
 ```
 query_stmt
@@ -61,15 +61,18 @@ INTO OUTFILE "file_path"
 
 * `[properties]`
 
-    Specify the relevant attributes. Currently it supports exporting through the Broker process, or through the S3 protocol.
+    Specify the relevant attributes. Currently it supports exporting through the Broker process, or through the S3 or HDFS protocol.
 
     + Broker related attributes need to be prefixed with `broker.`. For details, please refer to [Broker Document](./broker.html).
+    + HDFS related properties need to be prefixed with `hdfs.`.
     + S3 protocol can directly execute S3 protocol configuration.
 
     ```
     PROPERTIES
     ("broker.prop_key" = "broker.prop_val", ...)
     or 
+    ("hdfs.fs.defaultFS" = "xxx", "hdfs.user" = "xxx")
+    or
     ("AWS_ENDPOINT" = "xxx", ...)
     ```
 
@@ -91,7 +94,7 @@ INTO OUTFILE "file_path"
 By default, the export of the query result set is non-concurrent, that is, a single point of export. If the user wants the query result set to be exported concurrently, the following conditions need to be met:
 
 1. session variable 'enable_parallel_outfile' to enable concurrent export: ```set enable_parallel_outfile = true;```
-2. The export method is S3 instead of using a broker
+2. The export method is S3 or HDFS, instead of using a broker
 3. The query can meet the needs of concurrent export, for example, the top level does not contain single point nodes such as sort. (I will give an example later, which is a query that does not export the result set concurrently)
 
 If the above three conditions are met, the concurrent export query result set can be triggered. Concurrency = ```be_instacne_num * parallel_fragment_exec_instance_num```
@@ -279,6 +282,27 @@ Planning example for concurrent export:
 
     **But because the query statement has a top-level sorting node, even if the query is enabled for concurrently exported session variables, it cannot be exported concurrently.**
 
+7. Example 7
+
+    Export simple query results to the file `hdfs://path/to/result.txt`. Specify the export format as CSV. Use the HDFS protocol directly and set Kerberos authentication information.
+    
+    ```
+    SELECT * FROM tbl
+    INTO OUTFILE "hdfs://path/to/result_"
+    FORMAT AS CSV
+    PROPERTIES
+    (
+        "hdfs.fs.defaultFS" = "hdfs://namenode_ip:namenode_port",
+        "hdfs.hadoop.security.authentication" = "kerberos",
+        "hdfs.kerberos_principal" = "doris@YOUR.COM",
+        "hdfs.kerberos_keytab" = "/home/doris/my.keytab",
+        "max_file_size" = "100MB"
+    );
+    ```
+    
+    If the result is less than 100MB, the file will be: `result_0.csv`.
+
+    If it is larger than 100MB, the files may be: `result_0.csv, result_1.csv, ...`.
 ## Return result
 
 The command is a synchronization command. The command returns, which means the operation is over.
diff --git a/docs/zh-CN/administrator-guide/outfile.md b/docs/zh-CN/administrator-guide/outfile.md
index f3274a1..2352cf8 100644
--- a/docs/zh-CN/administrator-guide/outfile.md
+++ b/docs/zh-CN/administrator-guide/outfile.md
@@ -30,7 +30,7 @@ under the License.
 
 ## 语法
 
-`SELECT INTO OUTFILE` 语句可以将查询结果导出到文件中。目前支持通过 Broker 进程, 或直接通过 S3 协议导出到远端存储,如 HDFS,S3,BOS,COS(腾讯云)上。语法如下
+`SELECT INTO OUTFILE` 语句可以将查询结果导出到文件中。目前支持通过 Broker 进程, 通过 S3 协议, 或直接通过 HDFS 协议,导出到远端存储,如 HDFS,S3,BOS,COS(腾讯云)上。语法如下
 
 ```
 query_stmt
@@ -65,10 +65,13 @@ INTO OUTFILE "file_path"
     指定相关属性。目前支持通过 Broker 进程, 或通过 S3 协议进行导出。
 
     + Broker 相关属性需加前缀 `broker.`。具体参阅[Broker 文档](./broker.html)。
+    + HDFS 相关属性需加前缀 `hdfs.`。
     + S3 协议则直接执行 S3 协议配置即可。
 
     ```
     ("broker.prop_key" = "broker.prop_val", ...)
+    or
+    ("hdfs.fs.defaultFS" = "xxx", "hdfs.hdfs_user" = "xxx")
     or 
     ("AWS_ENDPOINT" = "xxx", ...)
     ``` 
@@ -90,7 +93,7 @@ INTO OUTFILE "file_path"
 默认情况下,查询结果集的导出是非并发的,也就是单点导出。如果用户希望查询结果集可以并发导出,需要满足以下条件:
 
 1. session variable 'enable_parallel_outfile' 开启并发导出: ```set enable_parallel_outfile = true;```
-2. 导出方式为 S3 , 而不是使用 broker
+2. 导出方式为 S3 , 或者 HDFS, 而不是使用 broker
 3. 查询可以满足并发导出的需求,比如顶层不包含 sort 等单点节点。(后面会举例说明,哪种属于不可并发导出结果集的查询)
 
 满足以上三个条件,就能触发并发导出查询结果集了。并发度 = ```be_instacne_num * parallel_fragment_exec_instance_num```
@@ -136,7 +139,7 @@ explain select xxx from xxx where xxx  into outfile "s3://xxx" format as csv pro
 
 1. 示例1
 
-    将简单查询结果导出到文件 `hdfs:/path/to/result.txt`。指定导出格式为 CSV。使用 `my_broker` 并设置 kerberos 认证信息。指定列分隔符为 `,`,行分隔符为 `\n`。
+    使用 broker 方式导出,将简单查询结果导出到文件 `hdfs:/path/to/result.txt`。指定导出格式为 CSV。使用 `my_broker` 并设置 kerberos 认证信息。指定列分隔符为 `,`,行分隔符为 `\n`。
 
     ```
     SELECT * FROM tbl
@@ -278,6 +281,23 @@ explain select xxx from xxx where xxx  into outfile "s3://xxx" format as csv pro
 
     **但由于查询语句带了一个顶层的排序节点,所以这个查询即使开启并发导出的 session 变量,也是无法并发导出的。**
 
+7. 示例7
+
+    使用 hdfs 方式导出,将简单查询结果导出到文件 `hdfs:/path/to/result.txt`。指定导出格式为 CSV。使用并设置 kerberos 认证信息。
+
+    ```
+    SELECT * FROM tbl
+    INTO OUTFILE "hdfs://path/to/result_"
+    FORMAT AS CSV
+    PROPERTIES
+    (
+        "hdfs.fs.defaultFS" = "hdfs://namenode:port",
+        "hdfs.hadoop.security.authentication" = "kerberos",
+        "hdfs.kerberos_principal" = "doris@YOUR.COM",
+        "hdfs.kerberos_keytab" = "/home/doris/my.keytab"
+    );
+    ```
+    
 ## 返回结果
 
 导出命令为同步命令。命令返回,即表示操作结束。同时会返回一行结果来展示导出的执行结果。
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
index 5aecc2a..248ffd3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.analysis;
 
+import org.apache.doris.backup.HDFSStorage;
 import org.apache.doris.backup.S3Storage;
 import org.apache.doris.catalog.PrimitiveType;
 import org.apache.doris.catalog.Type;
@@ -84,6 +85,8 @@ public class OutFileClause {
 
     public static final String LOCAL_FILE_PREFIX = "file:///";
     private static final String S3_FILE_PREFIX = "S3://";
+    private static final String HDFS_FILE_PREFIX = "hdfs://";
+    private static final String HDFS_PROP_PREFIX = "hdfs.";
     private static final String BROKER_PROP_PREFIX = "broker.";
     private static final String PROP_BROKER_NAME = "broker.name";
     private static final String PROP_COLUMN_SEPARATOR = "column_separator";
@@ -369,6 +372,10 @@ public class OutFileClause {
         } else if (filePath.toUpperCase().startsWith(S3_FILE_PREFIX)) {
             brokerName = StorageBackend.StorageType.S3.name();
             storageType = StorageBackend.StorageType.S3;
+        } else if (filePath.toUpperCase().startsWith(HDFS_FILE_PREFIX.toUpperCase())) {
+            brokerName = StorageBackend.StorageType.HDFS.name();
+            storageType = StorageBackend.StorageType.HDFS;
+            filePath = filePath.substring(HDFS_FILE_PREFIX.length() - 1);
         } else {
             return;
         }
@@ -383,13 +390,19 @@ public class OutFileClause {
             } else if (entry.getKey().toUpperCase().startsWith(S3Storage.S3_PROPERTIES_PREFIX)) {
                 brokerProps.put(entry.getKey(), entry.getValue());
                 processedPropKeys.add(entry.getKey());
+            } else if (entry.getKey().startsWith(HDFS_PROP_PREFIX)
+                    && storageType == StorageBackend.StorageType.HDFS) {
+                brokerProps.put(entry.getKey().substring(HDFS_PROP_PREFIX.length()), entry.getValue());
+                processedPropKeys.add(entry.getKey());
             }
         }
-
-        brokerDesc = new BrokerDesc(brokerName, storageType, brokerProps);
         if (storageType == StorageBackend.StorageType.S3) {
             S3Storage.checkS3(new CaseInsensitiveMap(brokerProps));
+        } else if (storageType == StorageBackend.StorageType.HDFS) {
+            HDFSStorage.checkHDFS(brokerProps);
         }
+
+        brokerDesc = new BrokerDesc(brokerName, storageType, brokerProps);
     }
 
     /**
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/HDFSStorage.java b/fe/fe-core/src/main/java/org/apache/doris/backup/HDFSStorage.java
new file mode 100644
index 0000000..3948828
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/HDFSStorage.java
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.backup;
+
+import org.apache.doris.common.UserException;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+// TODO: extend BlobStorage
+public class HDFSStorage {
+    public static final String HDFS_DEFAULT_FS = "fs.defaultFS";
+    public static final String USER = "hdfs_user";
+    public static final String NAME_SERVICES = "dfs.nameservices";
+    public static final String NAME_NODES = "dfs.ha.namenodes";
+    public static final String RPC_ADDRESS = "dfs.namenode.rpc-address";
+    public static final String FAILOVER_PROXY = "dfs.client.failover.proxy.provider";
+    public static final String AUTHENTICATION = "hadoop.security.authentication";
+    public static final String KERBEROS_PRINCIPAL = "kerberos_principal";
+    public static final String KERB_TICKET_CACHE_PATH = "kerb_ticket_cache_path";
+    public static final String TOKEN = "token";
+
+    public static Set<String> keySets = new HashSet<>(Arrays.asList(HDFS_DEFAULT_FS, USER,
+            NAME_SERVICES, NAME_NODES, RPC_ADDRESS, FAILOVER_PROXY,
+            AUTHENTICATION,
+            KERBEROS_PRINCIPAL, KERB_TICKET_CACHE_PATH,
+            TOKEN));
+
+
+    public static void checkHDFS(Map<String, String> properties) throws UserException {
+        if (!properties.containsKey(HDFS_DEFAULT_FS)) {
+            throw new UserException(HDFS_DEFAULT_FS + " not found. This is a required field.");
+        }
+        for (String key : properties.keySet()) {
+            if (!keySets.contains(key)) {
+                throw new UserException("Unknown properties " + key);
+            }
+        }
+    }
+}
