You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ya...@apache.org on 2021/03/09 01:36:00 UTC

[incubator-doris] branch master updated: [Load] Support multi bytes LineDelimiter and ColumnSeparator (#5462)

This is an automated email from the ASF dual-hosted git repository.

yangzhg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new e023ef5  [Load] Support multi bytes LineDelimiter and ColumnSeparator (#5462)
e023ef5 is described below

commit e023ef5404493fa0b1aece6b72d1c2202f190dc2
Author: Zhengguo Yang <ya...@gmail.com>
AuthorDate: Tue Mar 9 09:35:39 2021 +0800

    [Load] Support multi bytes LineDelimiter and ColumnSeparator (#5462)
    
    * [Internal][Support Multibytes Separator] doris-1079
    Support multi-byte LineDelimiter and ColumnSeparator.
---
 be/CMakeLists.txt                                  |  2 +-
 be/src/exec/broker_scanner.cpp                     | 52 ++++++++++++++++------
 be/src/exec/broker_scanner.h                       |  6 ++-
 be/src/exec/plain_text_line_reader.cpp             | 19 ++++----
 be/src/exec/plain_text_line_reader.h               |  6 ++-
 be/src/http/action/stream_load.cpp                 |  3 ++
 be/src/http/http_common.h                          |  1 +
 be/test/exec/plain_text_line_reader_bzip_test.cpp  | 12 ++---
 be/test/exec/plain_text_line_reader_gzip_test.cpp  | 14 +++---
 .../exec/plain_text_line_reader_lz4frame_test.cpp  | 12 ++---
 be/test/exec/plain_text_line_reader_lzop_test.cpp  | 14 +++---
 .../plain_text_line_reader_uncompressed_test.cpp   | 16 +++----
 fe/fe-core/src/main/cup/sql_parser.cup             | 10 ++---
 .../doris/analysis/CreateRoutineLoadStmt.java      | 11 +++--
 .../org/apache/doris/analysis/DataDescription.java | 17 ++++---
 .../{ColumnSeparator.java => Separator.java}       |  9 ++--
 .../org/apache/doris/load/BrokerFileGroup.java     |  6 +--
 .../src/main/java/org/apache/doris/load/Load.java  | 21 ++++++---
 .../org/apache/doris/load/RoutineLoadDesc.java     | 14 ++++--
 .../doris/load/routineload/RoutineLoadJob.java     | 15 +++++--
 .../org/apache/doris/planner/BrokerScanNode.java   |  4 ++
 .../apache/doris/planner/StreamLoadScanNode.java   | 16 ++++++-
 .../java/org/apache/doris/qe/MultiLoadMgr.java     |  6 +--
 .../java/org/apache/doris/task/LoadTaskInfo.java   |  5 ++-
 .../java/org/apache/doris/task/StreamLoadTask.java | 21 +++++++--
 .../doris/analysis/CreateRoutineLoadStmtTest.java  |  4 +-
 .../apache/doris/analysis/DataDescriptionTest.java | 12 ++---
 ...ColumnSeparatorTest.java => SeparatorTest.java} | 26 +++++------
 .../load/routineload/KafkaRoutineLoadJobTest.java  |  8 ++--
 .../load/routineload/RoutineLoadManagerTest.java   |  6 +--
 gensrc/thrift/FrontendService.thrift               |  1 +
 gensrc/thrift/PlanNodes.thrift                     |  6 +++
 32 files changed, 240 insertions(+), 135 deletions(-)

diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt
index 5d624ac..8be4883 100644
--- a/be/CMakeLists.txt
+++ b/be/CMakeLists.txt
@@ -584,7 +584,7 @@ FUNCTION(ADD_BE_TEST TEST_NAME)
 
     ADD_EXECUTABLE(${TEST_FILE_NAME} ${TEST_NAME}.cpp ${ADDITIONAL_FILES})
     TARGET_LINK_LIBRARIES(${TEST_FILE_NAME} ${TEST_LINK_LIBS})
-    SET_TARGET_PROPERTIES(${TEST_FILE_NAME} PROPERTIES COMPILE_FLAGS "-fno-access-control")
+    SET_TARGET_PROPERTIES(${TEST_FILE_NAME} PROPERTIES COMPILE_FLAGS "-fno-access-control" ENABLE_EXPORTS 1)
     if (NOT "${TEST_DIR_NAME}" STREQUAL "")
         SET_TARGET_PROPERTIES(${TEST_FILE_NAME} PROPERTIES RUNTIME_OUTPUT_DIRECTORY "${BUILD_OUTPUT_ROOT_DIRECTORY}/${TEST_DIR_NAME}")
     endif()
diff --git a/be/src/exec/broker_scanner.cpp b/be/src/exec/broker_scanner.cpp
index 1844ba8..f732244 100644
--- a/be/src/exec/broker_scanner.cpp
+++ b/be/src/exec/broker_scanner.cpp
@@ -49,16 +49,28 @@ BrokerScanner::BrokerScanner(RuntimeState* state, RuntimeProfile* profile,
         : BaseScanner(state, profile, params, pre_filter_ctxs, counter),
           _ranges(ranges),
           _broker_addresses(broker_addresses),
-          // _splittable(params.splittable),
-          _value_separator(static_cast<char>(params.column_separator)),
-          _line_delimiter(static_cast<char>(params.line_delimiter)),
           _cur_file_reader(nullptr),
           _cur_line_reader(nullptr),
           _cur_decompressor(nullptr),
           _next_range(0),
           _cur_line_reader_eof(false),
           _scanner_eof(false),
-          _skip_next_line(false) {}
+          _skip_next_line(false) {
+    if (params.__isset.column_separator_length && params.column_separator_length > 1) {
+        _value_separator = params.column_separator_str;
+        _value_separator_length = params.column_separator_length;
+    } else {
+        _value_separator.push_back(static_cast<char>(params.column_separator));
+        _value_separator_length = 1;
+    }
+    if (params.__isset.line_delimiter_length && params.line_delimiter_length > 1) {
+        _line_delimiter = params.line_delimiter_str;
+        _line_delimiter_length = params.line_delimiter_length;
+    } else {
+        _line_delimiter.push_back(static_cast<char>(params.line_delimiter));
+        _line_delimiter_length = 1;
+    }
+}
 
 BrokerScanner::~BrokerScanner() {
     close();
@@ -255,7 +267,7 @@ Status BrokerScanner::open_line_reader() {
     case TFileFormatType::FORMAT_CSV_LZOP:
     case TFileFormatType::FORMAT_CSV_DEFLATE:
         _cur_line_reader = new PlainTextLineReader(_profile, _cur_file_reader, _cur_decompressor,
-                                                   size, _line_delimiter);
+                                                   size, _line_delimiter, _line_delimiter_length);
         break;
     default: {
         std::stringstream ss;
@@ -292,16 +304,24 @@ void BrokerScanner::close() {
 }
 
 void BrokerScanner::split_line(const Slice& line, std::vector<Slice>* values) {
-    // line-begin char and line-end char are considered to be 'delimiter'
     const char* value = line.data;
-    const char* ptr = line.data;
-    for (size_t i = 0; i < line.size; ++i, ++ptr) {
-        if (*ptr == _value_separator) {
-            values->emplace_back(value, ptr - value);
-            value = ptr + 1;
+    size_t i = 0;
+    // TODO improve the performance
+    while (i < line.size) {
+        if (i + _value_separator_length <= line.size) {
+            if (_value_separator.compare(0, _value_separator_length, line.data + i,
+                                         _value_separator_length) == 0) {
+                values->emplace_back(value, line.data + i - value);
+                value = line.data + i + _value_separator_length;
+                i += _value_separator_length;
+            } else {
+                ++i;
+            }
+        } else {
+            break;
         }
     }
-    values->emplace_back(value, ptr - value);
+    values->emplace_back(value, line.data + i - value);
 }
 
 void BrokerScanner::fill_fix_length_string(const Slice& value, MemPool* pool, char** new_value_p,
@@ -413,7 +433,9 @@ bool BrokerScanner::line_to_src_tuple(const Slice& line) {
     if (values.size() + columns_from_path.size() < _src_slot_descs.size()) {
         std::stringstream error_msg;
         error_msg << "actual column number is less than schema column number. "
-                  << "actual number: " << values.size() << " sep: " << _value_separator << ", "
+                  << "actual number: " << values.size() << " column separator: ["
+                  << _value_separator << "], "
+                  << "line delimiter: [" << _line_delimiter << "], "
                   << "schema number: " << _src_slot_descs.size() << "; ";
         _state->append_error_msg_to_file(std::string(line.data, line.size), error_msg.str());
         _counter->num_rows_filtered++;
@@ -421,7 +443,9 @@ bool BrokerScanner::line_to_src_tuple(const Slice& line) {
     } else if (values.size() + columns_from_path.size() > _src_slot_descs.size()) {
         std::stringstream error_msg;
         error_msg << "actual column number is more than schema column number. "
-                  << "actual number: " << values.size() << " sep: " << _value_separator << ", "
+                  << "actual number: " << values.size() << " column separator: ["
+                  << _value_separator << "], "
+                  << "line delimiter: [" << _line_delimiter << "], "
                   << "schema number: " << _src_slot_descs.size() << "; ";
         _state->append_error_msg_to_file(std::string(line.data, line.size), error_msg.str());
         _counter->num_rows_filtered++;
diff --git a/be/src/exec/broker_scanner.h b/be/src/exec/broker_scanner.h
index 2a2563e..da41f47 100644
--- a/be/src/exec/broker_scanner.h
+++ b/be/src/exec/broker_scanner.h
@@ -98,8 +98,10 @@ private:
 
     std::unique_ptr<TextConverter> _text_converter;
 
-    char _value_separator;
-    char _line_delimiter;
+    std::string _value_separator;
+    std::string _line_delimiter;
+    int _value_separator_length;
+    int _line_delimiter_length;
 
     // Reader
     FileReader* _cur_file_reader;
diff --git a/be/src/exec/plain_text_line_reader.cpp b/be/src/exec/plain_text_line_reader.cpp
index 2f0dcc3..d51fa33 100644
--- a/be/src/exec/plain_text_line_reader.cpp
+++ b/be/src/exec/plain_text_line_reader.cpp
@@ -34,13 +34,14 @@ namespace doris {
 
 PlainTextLineReader::PlainTextLineReader(RuntimeProfile* profile, FileReader* file_reader,
                                          Decompressor* decompressor, size_t length,
-                                         uint8_t line_delimiter)
+                                         const std::string& line_delimiter, size_t line_delimiter_length)
         : _profile(profile),
           _file_reader(file_reader),
           _decompressor(decompressor),
           _min_length(length),
           _total_read_bytes(0),
           _line_delimiter(line_delimiter),
+          _line_delimiter_length(line_delimiter_length),
           _input_buf(new uint8_t[INPUT_CHUNK]),
           _input_buf_size(INPUT_CHUNK),
           _input_buf_pos(0),
@@ -92,7 +93,7 @@ inline bool PlainTextLineReader::update_eof() {
 uint8_t* PlainTextLineReader::update_field_pos_and_find_line_delimiter(const uint8_t* start,
                                                                        size_t len) {
     // TODO: meanwhile find and save field pos
-    return (uint8_t*)memmem(start, len, &_line_delimiter, 1);
+    return (uint8_t*)memmem(start, len, _line_delimiter.c_str(), _line_delimiter_length);
 }
 
 // extend input buf if necessary only when _more_input_bytes > 0
@@ -195,10 +196,12 @@ Status PlainTextLineReader::read_line(const uint8_t** ptr, size_t* size, bool* e
 
         if (pos == nullptr) {
             // didn't find line delimiter, read more data from decompressor
-            // 1. point 'offset' to _output_buf_limit
-            offset = output_buf_read_remaining();
-
-            // 2. read from file reader
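+            // A multi-byte delimiter may be only partially present at the end of
+            // the buffered data, so 'offset' is advanced to the end of that data
+            // only for a single-byte delimiter; then read from the file reader.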
+            if (_line_delimiter_length == 1) {
+                offset = output_buf_read_remaining();
+            }
             extend_output_buf();
             if ((_input_buf_limit > _input_buf_pos) && _more_input_bytes == 0) {
                 // we still have data in input which is not decompressed.
@@ -266,7 +269,7 @@ Status PlainTextLineReader::read_line(const uint8_t** ptr, size_t* size, bool* e
 
             if (_decompressor != nullptr) {
                 SCOPED_TIMER(_decompress_timer);
-                // 2. decompress
+                // decompress
                 size_t input_read_bytes = 0;
                 size_t decompressed_len = 0;
                 _more_input_bytes = 0;
@@ -316,7 +319,7 @@ Status PlainTextLineReader::read_line(const uint8_t** ptr, size_t* size, bool* e
             // we found a complete line
             // ready to return
             offset = pos - cur_ptr;
-            found_line_delimiter = 1;
+            found_line_delimiter = _line_delimiter_length;
             break;
         }
     } // while (!done())
diff --git a/be/src/exec/plain_text_line_reader.h b/be/src/exec/plain_text_line_reader.h
index 01ed692..2b8aad6 100644
--- a/be/src/exec/plain_text_line_reader.h
+++ b/be/src/exec/plain_text_line_reader.h
@@ -29,7 +29,8 @@ class Status;
 class PlainTextLineReader : public LineReader {
 public:
     PlainTextLineReader(RuntimeProfile* profile, FileReader* file_reader,
-                        Decompressor* decompressor, size_t length, uint8_t line_delimiter);
+                        Decompressor* decompressor, size_t length,
+                        const std::string& line_delimiter, size_t line_delimiter_length);
 
     virtual ~PlainTextLineReader();
 
@@ -61,7 +62,8 @@ private:
     Decompressor* _decompressor;
     size_t _min_length;
     size_t _total_read_bytes;
-    uint8_t _line_delimiter;
+    std::string _line_delimiter;
+    size_t _line_delimiter_length;
 
     // save the data read from file reader
     uint8_t* _input_buf;
diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp
index 47bfdb3..fbcbba5 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -348,6 +348,9 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext*
     if (!http_req->header(HTTP_COLUMN_SEPARATOR).empty()) {
         request.__set_columnSeparator(http_req->header(HTTP_COLUMN_SEPARATOR));
     }
+    if (!http_req->header(HTTP_LINE_DELIMITER).empty()) {
+        request.__set_line_delimiter(http_req->header(HTTP_LINE_DELIMITER));
+    }
     if (!http_req->header(HTTP_PARTITIONS).empty()) {
         request.__set_partitions(http_req->header(HTTP_PARTITIONS));
         request.__set_isTempPartition(false);
diff --git a/be/src/http/http_common.h b/be/src/http/http_common.h
index 5681db6..d9bf046 100644
--- a/be/src/http/http_common.h
+++ b/be/src/http/http_common.h
@@ -28,6 +28,7 @@ static const std::string HTTP_FORMAT_KEY = "format";
 static const std::string HTTP_COLUMNS = "columns";
 static const std::string HTTP_WHERE = "where";
 static const std::string HTTP_COLUMN_SEPARATOR = "column_separator";
+static const std::string HTTP_LINE_DELIMITER = "line_delimiter";
 static const std::string HTTP_MAX_FILTER_RATIO = "max_filter_ratio";
 static const std::string HTTP_TIMEOUT = "timeout";
 static const std::string HTTP_PARTITIONS = "partitions";
diff --git a/be/test/exec/plain_text_line_reader_bzip_test.cpp b/be/test/exec/plain_text_line_reader_bzip_test.cpp
index 648e3f8..8a2868a 100644
--- a/be/test/exec/plain_text_line_reader_bzip_test.cpp
+++ b/be/test/exec/plain_text_line_reader_bzip_test.cpp
@@ -46,7 +46,7 @@ TEST_F(PlainTextLineReaderTest, bzip2_normal_use) {
     st = Decompressor::create_decompressor(CompressType::BZIP2, &decompressor);
     ASSERT_TRUE(st.ok());
 
-    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, -1, '\n');
+    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, -1, "\n", 1);
     const uint8_t* ptr;
     size_t size;
     bool eof;
@@ -97,7 +97,7 @@ TEST_F(PlainTextLineReaderTest, bzip2_test_limit) {
     st = Decompressor::create_decompressor(CompressType::BZIP2, &decompressor);
     ASSERT_TRUE(st.ok());
 
-    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 8, '\n');
+    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 8, "\n", 1);
     const uint8_t* ptr;
     size_t size;
     bool eof;
@@ -134,7 +134,7 @@ TEST_F(PlainTextLineReaderTest, bzip2_test_limit2) {
     st = Decompressor::create_decompressor(CompressType::BZIP2, &decompressor);
     ASSERT_TRUE(st.ok());
 
-    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 6, '\n');
+    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 6, "\n", 1);
     const uint8_t* ptr;
     size_t size;
     bool eof;
@@ -158,7 +158,7 @@ TEST_F(PlainTextLineReaderTest, bzip2_test_limit3) {
     st = Decompressor::create_decompressor(CompressType::BZIP2, &decompressor);
     ASSERT_TRUE(st.ok());
 
-    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, '\n');
+    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, "\n", 1);
     const uint8_t* ptr;
     size_t size;
     bool eof;
@@ -188,7 +188,7 @@ TEST_F(PlainTextLineReaderTest, bzip2_test_limit4) {
     st = Decompressor::create_decompressor(CompressType::BZIP2, &decompressor);
     ASSERT_TRUE(st.ok());
 
-    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, '\n');
+    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, "\n", 1);
     const uint8_t* ptr;
     size_t size;
     bool eof;
@@ -218,7 +218,7 @@ TEST_F(PlainTextLineReaderTest, bzip2_test_limit5) {
     st = Decompressor::create_decompressor(CompressType::BZIP2, &decompressor);
     ASSERT_TRUE(st.ok());
 
-    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 0, '\n');
+    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 0, "\n", 1);
     const uint8_t* ptr;
     size_t size;
     bool eof;
diff --git a/be/test/exec/plain_text_line_reader_gzip_test.cpp b/be/test/exec/plain_text_line_reader_gzip_test.cpp
index 8f75ef8..2f6d190 100644
--- a/be/test/exec/plain_text_line_reader_gzip_test.cpp
+++ b/be/test/exec/plain_text_line_reader_gzip_test.cpp
@@ -46,7 +46,7 @@ TEST_F(PlainTextLineReaderTest, gzip_normal_use) {
     st = Decompressor::create_decompressor(CompressType::GZIP, &decompressor);
     ASSERT_TRUE(st.ok());
 
-    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, -1, '\n');
+    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, -1, "\n", 1);
     const uint8_t* ptr;
     size_t size;
     bool eof;
@@ -98,7 +98,7 @@ TEST_F(PlainTextLineReaderTest, uncompressed_no_newline) {
     st = Decompressor::create_decompressor(CompressType::GZIP, &decompressor);
     ASSERT_TRUE(st.ok());
 
-    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, -1, '\n');
+    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, -1, "\n", 1);
     const uint8_t* ptr;
     size_t size;
     bool eof;
@@ -133,7 +133,7 @@ TEST_F(PlainTextLineReaderTest, gzip_test_limit) {
     st = Decompressor::create_decompressor(CompressType::GZIP, &decompressor);
     ASSERT_TRUE(st.ok());
 
-    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 8, '\n');
+    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 8, "\n", 1);
     const uint8_t* ptr;
     size_t size;
     bool eof;
@@ -169,7 +169,7 @@ TEST_F(PlainTextLineReaderTest, gzip_test_limit2) {
     st = Decompressor::create_decompressor(CompressType::GZIP, &decompressor);
     ASSERT_TRUE(st.ok());
 
-    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 6, '\n');
+    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 6, "\n", 1);
     const uint8_t* ptr;
     size_t size;
     bool eof;
@@ -194,7 +194,7 @@ TEST_F(PlainTextLineReaderTest, gzip_test_limit3) {
     st = Decompressor::create_decompressor(CompressType::GZIP, &decompressor);
     ASSERT_TRUE(st.ok());
 
-    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, '\n');
+    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, "\n", 1);
     const uint8_t* ptr;
     size_t size;
     bool eof;
@@ -224,7 +224,7 @@ TEST_F(PlainTextLineReaderTest, gzip_test_limit4) {
     st = Decompressor::create_decompressor(CompressType::GZIP, &decompressor);
     ASSERT_TRUE(st.ok());
 
-    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, '\n');
+    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, "\n", 1);
     const uint8_t* ptr;
     size_t size;
     bool eof;
@@ -254,7 +254,7 @@ TEST_F(PlainTextLineReaderTest, gzip_test_limit5) {
     st = Decompressor::create_decompressor(CompressType::GZIP, &decompressor);
     ASSERT_TRUE(st.ok());
 
-    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 0, '\n');
+    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 0, "\n", 1);
     const uint8_t* ptr;
     size_t size;
     bool eof;
diff --git a/be/test/exec/plain_text_line_reader_lz4frame_test.cpp b/be/test/exec/plain_text_line_reader_lz4frame_test.cpp
index 157fe12..f78d736 100644
--- a/be/test/exec/plain_text_line_reader_lz4frame_test.cpp
+++ b/be/test/exec/plain_text_line_reader_lz4frame_test.cpp
@@ -46,7 +46,7 @@ TEST_F(PlainTextLineReaderTest, lz4_normal_use) {
     st = Decompressor::create_decompressor(CompressType::LZ4FRAME, &decompressor);
     ASSERT_TRUE(st.ok());
 
-    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, -1, '\n');
+    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, -1, "\n", 1);
     const uint8_t* ptr;
     size_t size;
     bool eof;
@@ -97,7 +97,7 @@ TEST_F(PlainTextLineReaderTest, lz4_test_limit) {
     st = Decompressor::create_decompressor(CompressType::LZ4FRAME, &decompressor);
     ASSERT_TRUE(st.ok());
 
-    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 8, '\n');
+    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 8, "\n", 1);
     const uint8_t* ptr;
     size_t size;
     bool eof;
@@ -134,7 +134,7 @@ TEST_F(PlainTextLineReaderTest, lz4_test_limit2) {
     st = Decompressor::create_decompressor(CompressType::LZ4FRAME, &decompressor);
     ASSERT_TRUE(st.ok());
 
-    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 6, '\n');
+    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 6, "\n", 1);
     const uint8_t* ptr;
     size_t size;
     bool eof;
@@ -158,7 +158,7 @@ TEST_F(PlainTextLineReaderTest, lz4_test_limit3) {
     st = Decompressor::create_decompressor(CompressType::LZ4FRAME, &decompressor);
     ASSERT_TRUE(st.ok());
 
-    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, '\n');
+    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, "\n", 1);
     const uint8_t* ptr;
     size_t size;
     bool eof;
@@ -188,7 +188,7 @@ TEST_F(PlainTextLineReaderTest, lz4_test_limit4) {
     st = Decompressor::create_decompressor(CompressType::LZ4FRAME, &decompressor);
     ASSERT_TRUE(st.ok());
 
-    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, '\n');
+    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, "\n", 1);
     const uint8_t* ptr;
     size_t size;
     bool eof;
@@ -218,7 +218,7 @@ TEST_F(PlainTextLineReaderTest, lz4_test_limit5) {
     st = Decompressor::create_decompressor(CompressType::LZ4FRAME, &decompressor);
     ASSERT_TRUE(st.ok());
 
-    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 0, '\n');
+    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 0, "\n", 1);
     const uint8_t* ptr;
     size_t size;
     bool eof;
diff --git a/be/test/exec/plain_text_line_reader_lzop_test.cpp b/be/test/exec/plain_text_line_reader_lzop_test.cpp
index e06432c..5f83afc 100644
--- a/be/test/exec/plain_text_line_reader_lzop_test.cpp
+++ b/be/test/exec/plain_text_line_reader_lzop_test.cpp
@@ -46,7 +46,7 @@ TEST_F(PlainTextLineReaderTest, lzop_normal_use) {
     st = Decompressor::create_decompressor(CompressType::LZOP, &decompressor);
     ASSERT_TRUE(st.ok());
 
-    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, -1, '\n');
+    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, -1, "\n", 1);
     const uint8_t* ptr;
     size_t size;
     bool eof;
@@ -96,7 +96,7 @@ TEST_F(PlainTextLineReaderTest, lzop_test_limit) {
     st = Decompressor::create_decompressor(CompressType::LZOP, &decompressor);
     ASSERT_TRUE(st.ok());
 
-    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 8, '\n');
+    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 8, "\n", 1);
     const uint8_t* ptr;
     size_t size;
     bool eof;
@@ -132,7 +132,7 @@ TEST_F(PlainTextLineReaderTest, lzop_test_limit2) {
     st = Decompressor::create_decompressor(CompressType::LZOP, &decompressor);
     ASSERT_TRUE(st.ok());
 
-    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 6, '\n');
+    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 6, "\n", 1);
     const uint8_t* ptr;
     size_t size;
     bool eof;
@@ -155,7 +155,7 @@ TEST_F(PlainTextLineReaderTest, lzop_test_limit3) {
     st = Decompressor::create_decompressor(CompressType::LZOP, &decompressor);
     ASSERT_TRUE(st.ok());
 
-    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, '\n');
+    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, "\n", 1);
     const uint8_t* ptr;
     size_t size;
     bool eof;
@@ -184,7 +184,7 @@ TEST_F(PlainTextLineReaderTest, lzop_test_limit4) {
     st = Decompressor::create_decompressor(CompressType::LZOP, &decompressor);
     ASSERT_TRUE(st.ok());
 
-    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, '\n');
+    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, "\n", 1);
     const uint8_t* ptr;
     size_t size;
     bool eof;
@@ -213,7 +213,7 @@ TEST_F(PlainTextLineReaderTest, lzop_test_limit5) {
     st = Decompressor::create_decompressor(CompressType::LZOP, &decompressor);
     ASSERT_TRUE(st.ok());
 
-    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 0, '\n');
+    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 0, "\n", 1);
     const uint8_t* ptr;
     size_t size;
     bool eof;
@@ -233,7 +233,7 @@ TEST_F(PlainTextLineReaderTest, lzop_test_larger) {
     st = Decompressor::create_decompressor(CompressType::LZOP, &decompressor);
     ASSERT_TRUE(st.ok());
 
-    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, -1, '\n');
+    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, -1, "\n", 1);
     const uint8_t* ptr;
     size_t size;
     bool eof;
diff --git a/be/test/exec/plain_text_line_reader_uncompressed_test.cpp b/be/test/exec/plain_text_line_reader_uncompressed_test.cpp
index 807049d..6568b4f 100644
--- a/be/test/exec/plain_text_line_reader_uncompressed_test.cpp
+++ b/be/test/exec/plain_text_line_reader_uncompressed_test.cpp
@@ -46,7 +46,7 @@ TEST_F(PlainTextLineReaderTest, uncompressed_normal_use) {
     ASSERT_TRUE(st.ok());
     ASSERT_TRUE(decompressor == nullptr);
 
-    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, -1, '\n');
+    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, -1, "\n", 1);
     const uint8_t* ptr;
     size_t size;
     bool eof;
@@ -98,7 +98,7 @@ TEST_F(PlainTextLineReaderTest, uncompressed_no_newline) {
     ASSERT_TRUE(st.ok());
     ASSERT_TRUE(decompressor == nullptr);
 
-    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, -1, '\n');
+    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, -1, "\n", 1);
     const uint8_t* ptr;
     size_t size;
     bool eof;
@@ -133,7 +133,7 @@ TEST_F(PlainTextLineReaderTest, uncompressed_test_limit) {
     ASSERT_TRUE(st.ok());
     ASSERT_TRUE(decompressor == nullptr);
 
-    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 8, '\n');
+    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 8, "\n", 1);
     const uint8_t* ptr;
     size_t size;
     bool eof;
@@ -170,7 +170,7 @@ TEST_F(PlainTextLineReaderTest, uncompressed_test_limit2) {
     ASSERT_TRUE(st.ok());
     ASSERT_TRUE(decompressor == nullptr);
 
-    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 6, '\n');
+    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 6, "\n", 1);
     const uint8_t* ptr;
     size_t size;
     bool eof;
@@ -196,7 +196,7 @@ TEST_F(PlainTextLineReaderTest, uncompressed_test_limit3) {
     ASSERT_TRUE(st.ok());
     ASSERT_TRUE(decompressor == nullptr);
 
-    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, '\n');
+    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, "\n", 1);
     const uint8_t* ptr;
     size_t size;
     bool eof;
@@ -227,7 +227,7 @@ TEST_F(PlainTextLineReaderTest, uncompressed_test_limit4) {
     ASSERT_TRUE(st.ok());
     ASSERT_TRUE(decompressor == nullptr);
 
-    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, '\n');
+    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, "\n", 1);
     const uint8_t* ptr;
     size_t size;
     bool eof;
@@ -258,7 +258,7 @@ TEST_F(PlainTextLineReaderTest, uncompressed_test_limit5) {
     ASSERT_TRUE(st.ok());
     ASSERT_TRUE(decompressor == nullptr);
 
-    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 0, '\n');
+    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 0, "\n", 1);
     const uint8_t* ptr;
     size_t size;
     bool eof;
@@ -280,7 +280,7 @@ TEST_F(PlainTextLineReaderTest, uncompressed_test_empty) {
     ASSERT_TRUE(decompressor == nullptr);
 
     // set min length larger than 0 to test
-    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 10, '\n');
+    PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 10, "\n", 1);
     const uint8_t* ptr;
     size_t size;
     bool eof;
diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup
index 71bd5bc..c611932 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -439,7 +439,7 @@ nonterminal List<String> opt_col_list, opt_dup_keys, opt_columns_from_path;
 nonterminal List<ColWithComment> opt_col_with_comment_list, col_with_comment_list;
 nonterminal ColWithComment col_with_comment;
 nonterminal List<Expr> opt_col_mapping_list;
-nonterminal ColumnSeparator opt_field_term, column_separator;
+nonterminal Separator opt_field_term, separator;
 nonterminal String opt_user_role;
 nonterminal TablePattern tbl_pattern;
 nonterminal ResourcePattern resource_pattern;
@@ -1444,14 +1444,14 @@ opt_field_term ::=
     :}
     | KW_COLUMNS KW_TERMINATED KW_BY STRING_LITERAL:sep
     {:
-        RESULT = new ColumnSeparator(sep);
+        RESULT = new Separator(sep);
     :}
     ;
 
-column_separator ::=
+separator ::=
     KW_COLUMNS KW_TERMINATED KW_BY STRING_LITERAL:sep
     {:
-        RESULT = new ColumnSeparator(sep);
+        RESULT = new Separator(sep);
     :}
     ;
 
@@ -1597,7 +1597,7 @@ opt_load_property_list ::=
     ;
 
 load_property ::=
-    column_separator:colSep
+    separator:colSep
     {:
         RESULT = colSep;
     :}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
index 1960c28..568e938 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
@@ -345,7 +345,9 @@ public class CreateRoutineLoadStmt extends DdlStmt {
     }
 
     public void checkLoadProperties() throws UserException {
-        ColumnSeparator columnSeparator = null;
+        Separator columnSeparator = null;
+        // TODO(yangzhengguo01): add line delimiter to properties
+        Separator lineDelimiter = null;
         ImportColumnsStmt importColumnsStmt = null;
         ImportWhereStmt precedingImportWhereStmt = null;
         ImportWhereStmt importWhereStmt = null;
@@ -354,12 +356,12 @@ public class CreateRoutineLoadStmt extends DdlStmt {
         ImportDeleteOnStmt importDeleteOnStmt = null;
         if (loadPropertyList != null) {
             for (ParseNode parseNode : loadPropertyList) {
-                if (parseNode instanceof ColumnSeparator) {
+                if (parseNode instanceof Separator) {
                     // check column separator
                     if (columnSeparator != null) {
                         throw new AnalysisException("repeat setting of column separator");
                     }
-                    columnSeparator = (ColumnSeparator) parseNode;
+                    columnSeparator = (Separator) parseNode;
                     columnSeparator.analyze(null);
                 } else if (parseNode instanceof ImportColumnsStmt) {
                     // check columns info
@@ -403,7 +405,8 @@ public class CreateRoutineLoadStmt extends DdlStmt {
                 }
             }
         }
-        routineLoadDesc = new RoutineLoadDesc(columnSeparator, importColumnsStmt, precedingImportWhereStmt, importWhereStmt,
+        routineLoadDesc = new RoutineLoadDesc(columnSeparator, lineDelimiter, importColumnsStmt,
+                        precedingImportWhereStmt, importWhereStmt,
                         partitionNames, importDeleteOnStmt == null ? null : importDeleteOnStmt.getExpr(), mergeType,
                         importSequenceStmt == null ? null : importSequenceStmt.getSequenceColName());
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
index 7e31c2c..31046a1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
@@ -96,7 +96,7 @@ public class DataDescription {
     private final String tableName;
     private final PartitionNames partitionNames;
     private final List<String> filePaths;
-    private final ColumnSeparator columnSeparator;
+    private final Separator columnSeparator;
     private final String fileFormat;
     private final boolean isNegative;
     // column names in the path
@@ -112,7 +112,7 @@ public class DataDescription {
     private List<String> fileFieldNames;
     // Used for mini load
     private TNetworkAddress beAddr;
-    private String lineDelimiter;
+    private Separator lineDelimiter;
     private String columnDef;
     private long backendId;
     private boolean stripOuterArray = false;
@@ -141,7 +141,7 @@ public class DataDescription {
                            PartitionNames partitionNames,
                            List<String> filePaths,
                            List<String> columns,
-                           ColumnSeparator columnSeparator,
+                           Separator columnSeparator,
                            String fileFormat,
                            boolean isNegative,
                            List<Expr> columnMappingList) {
@@ -153,7 +153,7 @@ public class DataDescription {
                            PartitionNames partitionNames,
                            List<String> filePaths,
                            List<String> columns,
-                           ColumnSeparator columnSeparator,
+                           Separator columnSeparator,
                            String fileFormat,
                            List<String> columnsFromPath,
                            boolean isNegative,
@@ -428,7 +428,7 @@ public class DataDescription {
         if (columnSeparator == null) {
             return null;
         }
-        return columnSeparator.getColumnSeparator();
+        return columnSeparator.getSeparator();
     }
 
     public boolean isNegative() {
@@ -444,10 +444,13 @@ public class DataDescription {
     }
 
     public String getLineDelimiter() {
-        return lineDelimiter;
+        if (lineDelimiter == null) {
+            return null;
+        }
+        return lineDelimiter.getSeparator();
     }
 
-    public void setLineDelimiter(String lineDelimiter) {
+    public void setLineDelimiter(Separator lineDelimiter) {
         this.lineDelimiter = lineDelimiter;
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnSeparator.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Separator.java
similarity index 93%
rename from fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnSeparator.java
rename to fe/fe-core/src/main/java/org/apache/doris/analysis/Separator.java
index 3d2b271..efc61d9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnSeparator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Separator.java
@@ -23,18 +23,18 @@ import com.google.common.base.Strings;
 
 import java.io.StringWriter;
 
-public class ColumnSeparator implements ParseNode {
+public class Separator implements ParseNode {
     private static final String HEX_STRING = "0123456789ABCDEF";
 
     private final String oriSeparator;
     private String separator;
 
-    public ColumnSeparator(String separator) {
+    public Separator(String separator) {
         this.oriSeparator = separator;
         this.separator = null;
     }
 
-    public String getColumnSeparator() {
+    public String getSeparator() {
         return separator;
     }
 
@@ -69,7 +69,8 @@ public class ColumnSeparator implements ParseNode {
         }
 
         if (originStr.toUpperCase().startsWith("\\X")) {
-            String hexStr = originStr.substring(2);
+            // convert \x01\x02\x0a to 01020a
+            String hexStr = originStr.replaceAll("(?i)\\\\X", "");
             // check hex str
             if (hexStr.isEmpty()) {
                 throw new AnalysisException("Hex str is empty");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java
index e608887..272a226 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java
@@ -17,7 +17,7 @@
 
 package org.apache.doris.load;
 
-import org.apache.doris.analysis.ColumnSeparator;
+import org.apache.doris.analysis.Separator;
 import org.apache.doris.analysis.DataDescription;
 import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.ImportColumnDesc;
@@ -110,8 +110,8 @@ public class BrokerFileGroup implements Writable {
     // Used for broker table, no need to parse
     public BrokerFileGroup(BrokerTable table) throws AnalysisException {
         this.tableId = table.getId();
-        this.valueSeparator = ColumnSeparator.convertSeparator(table.getColumnSeparator());
-        this.lineDelimiter = table.getLineDelimiter();
+        this.valueSeparator = Separator.convertSeparator(table.getColumnSeparator());
+        this.lineDelimiter = Separator.convertSeparator(table.getLineDelimiter());
         this.isNegative = false;
         this.filePaths = table.getPaths();
         this.fileFormat = table.getFileFormat();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
index 2cb3273..db7b958 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
@@ -22,7 +22,7 @@ import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.BinaryPredicate;
 import org.apache.doris.analysis.CancelLoadStmt;
 import org.apache.doris.analysis.CastExpr;
-import org.apache.doris.analysis.ColumnSeparator;
+import org.apache.doris.analysis.Separator;
 import org.apache.doris.analysis.DataDescription;
 import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.ExprSubstitutionMap;
@@ -258,9 +258,9 @@ public class Load {
         // partitions | column names | separator | line delimiter
         List<String> partitionNames = null;
         List<String> columnNames = null;
-        ColumnSeparator columnSeparator = null;
+        Separator columnSeparator = null;
         List<String> hllColumnPairList = null;
-        String lineDelimiter = null;
+        Separator lineDelimiter = null;
         String formatType = null;
         if (params != null) {
             String specifiedPartitions = params.get(LoadStmt.KEY_IN_PARAM_PARTITIONS);
@@ -282,14 +282,25 @@ public class Load {
                 if (columnSeparatorStr.isEmpty()) {
                     columnSeparatorStr = "\t";
                 }
-                columnSeparator = new ColumnSeparator(columnSeparatorStr);
+                columnSeparator = new Separator(columnSeparatorStr);
                 try {
                     columnSeparator.analyze();
                 } catch (AnalysisException e) {
                     throw new DdlException(e.getMessage());
                 }
             }
-            lineDelimiter = params.get(LoadStmt.KEY_IN_PARAM_LINE_DELIMITER);
+            String lineDelimiterStr = params.get(LoadStmt.KEY_IN_PARAM_LINE_DELIMITER);
+            if (lineDelimiterStr != null) {
+                if (lineDelimiterStr.isEmpty()) {
+                    lineDelimiterStr = "\n";
+                }
+                lineDelimiter = new Separator(lineDelimiterStr);
+                try {
+                    lineDelimiter.analyze();
+                } catch (AnalysisException e) {
+                    throw new DdlException(e.getMessage());
+                }
+            }
             formatType = params.get(LoadStmt.KEY_IN_PARAM_FORMAT_TYPE);
         }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/RoutineLoadDesc.java b/fe/fe-core/src/main/java/org/apache/doris/load/RoutineLoadDesc.java
index bb7de5c..f9053bf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/RoutineLoadDesc.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/RoutineLoadDesc.java
@@ -18,7 +18,7 @@
 package org.apache.doris.load;
 
 import org.apache.doris.analysis.Analyzer;
-import org.apache.doris.analysis.ColumnSeparator;
+import org.apache.doris.analysis.Separator;
 import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.ImportColumnsStmt;
 import org.apache.doris.analysis.ImportWhereStmt;
@@ -30,7 +30,8 @@ import org.apache.doris.load.loadv2.LoadTask;
 import com.google.common.base.Strings;
 
 public class RoutineLoadDesc {
-    private final ColumnSeparator columnSeparator;
+    private final Separator columnSeparator;
+    private final Separator lineDelimiter;
     private final ImportColumnsStmt columnsInfo;
     private final ImportWhereStmt precedingFilter;
     private final ImportWhereStmt wherePredicate;
@@ -40,11 +41,12 @@ public class RoutineLoadDesc {
     private final PartitionNames partitionNames;
     private final String sequenceColName;
 
-    public RoutineLoadDesc(ColumnSeparator columnSeparator, ImportColumnsStmt columnsInfo,
+    public RoutineLoadDesc(Separator columnSeparator, Separator lineDelimiter, ImportColumnsStmt columnsInfo,
                            ImportWhereStmt precedingFilter, ImportWhereStmt wherePredicate,
                            PartitionNames partitionNames, Expr deleteCondition, LoadTask.MergeType mergeType,
                            String sequenceColName) {
         this.columnSeparator = columnSeparator;
+        this.lineDelimiter = lineDelimiter;
         this.columnsInfo = columnsInfo;
         this.precedingFilter = precedingFilter;
         this.wherePredicate = wherePredicate;
@@ -54,10 +56,14 @@ public class RoutineLoadDesc {
         this.sequenceColName = sequenceColName;
     }
 
-    public ColumnSeparator getColumnSeparator() {
+    public Separator getColumnSeparator() {
         return columnSeparator;
     }
 
+    public Separator getLineDelimiter() {
+        return lineDelimiter;
+    }
+
     public ImportColumnsStmt getColumnsInfo() {
         return columnsInfo;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index de9ff6d..812c053 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -18,7 +18,7 @@
 package org.apache.doris.load.routineload;
 
 import org.apache.doris.analysis.AlterRoutineLoadStmt;
-import org.apache.doris.analysis.ColumnSeparator;
+import org.apache.doris.analysis.Separator;
 import org.apache.doris.analysis.CreateRoutineLoadStmt;
 import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.ImportColumnDesc;
@@ -158,7 +158,8 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
     protected List<ImportColumnDesc> columnDescs; // optional
     protected Expr precedingFilter; // optional
     protected Expr whereExpr; // optional
-    protected ColumnSeparator columnSeparator; // optional
+    protected Separator columnSeparator; // optional
+    protected Separator lineDelimiter;
     protected int desireTaskConcurrentNum; // optional
     protected JobState state = JobState.NEED_SCHEDULE;
     protected LoadDataSourceType dataSourceType;
@@ -362,6 +363,9 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
             if (routineLoadDesc.getColumnSeparator() != null) {
                 columnSeparator = routineLoadDesc.getColumnSeparator();
             }
+            if (routineLoadDesc.getLineDelimiter() != null) {
+                lineDelimiter = routineLoadDesc.getLineDelimiter();
+            }
             if (routineLoadDesc.getPartitionNames() != null) {
                 partitions = routineLoadDesc.getPartitionNames();
             }
@@ -485,10 +489,14 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
         return whereExpr;
     }
 
-    public ColumnSeparator getColumnSeparator() {
+    public Separator getColumnSeparator() {
         return columnSeparator;
     }
 
+    public Separator getLineDelimiter() {
+        return lineDelimiter;
+    }
+
     public boolean isStrictMode() {
         String value = jobProperties.get(LoadStmt.STRICT_MODE);
         if (value == null) {
@@ -1350,6 +1358,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
             jobProperties.put("dataFormat", "json");
         } else {
             jobProperties.put("columnSeparator", columnSeparator == null ? "\t" : columnSeparator.toString());
+            jobProperties.put("lineDelimiter", lineDelimiter == null ? "\n" : lineDelimiter.toString());
         }
         jobProperties.put("maxErrorNum", String.valueOf(maxErrorNum));
         jobProperties.put("maxBatchIntervalS", String.valueOf(maxBatchIntervalS));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
index c5d0b31..e3e7dc5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
@@ -200,6 +200,10 @@ public class BrokerScanNode extends LoadScanNode {
         BrokerFileGroup fileGroup = context.fileGroup;
         params.setColumnSeparator(fileGroup.getValueSeparator().getBytes(Charset.forName("UTF-8"))[0]);
         params.setLineDelimiter(fileGroup.getLineDelimiter().getBytes(Charset.forName("UTF-8"))[0]);
+        params.setColumnSeparatorStr(fileGroup.getValueSeparator());
+        params.setLineDelimiterStr(fileGroup.getLineDelimiter());
+        params.setColumnSeparatorLength(fileGroup.getValueSeparator().getBytes(Charset.forName("UTF-8")).length);
+        params.setLineDelimiterLength(fileGroup.getLineDelimiter().getBytes(Charset.forName("UTF-8")).length);
         params.setStrictMode(strictMode);
         params.setProperties(brokerDesc.getProperties());
         deleteCondition = fileGroup.getDeleteCondition();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java
index 24d614b..112fa22 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java
@@ -144,12 +144,27 @@ public class StreamLoadScanNode extends LoadScanNode {
         createDefaultSmap(analyzer);
 
+        // Set each one-byte separator field together with its string and byte-length fields.
         if (taskInfo.getColumnSeparator() != null) {
-            String sep = taskInfo.getColumnSeparator().getColumnSeparator();
+            String sep = taskInfo.getColumnSeparator().getSeparator();
+            params.setColumnSeparatorStr(sep);
+            params.setColumnSeparatorLength(sep.getBytes(Charset.forName("UTF-8")).length);
             params.setColumnSeparator(sep.getBytes(Charset.forName("UTF-8"))[0]);
         } else {
             params.setColumnSeparator((byte) '\t');
+            params.setColumnSeparatorLength(1);
+            params.setColumnSeparatorStr("\t");
+        }
+        if (taskInfo.getLineDelimiter() != null) {
+            String sep = taskInfo.getLineDelimiter().getSeparator();
+            params.setLineDelimiterStr(sep);
+            params.setLineDelimiterLength(sep.getBytes(Charset.forName("UTF-8")).length);
+            params.setLineDelimiter(sep.getBytes(Charset.forName("UTF-8"))[0]);
+        } else {
+            params.setLineDelimiter((byte) '\n');
+            params.setLineDelimiterLength(1);
+            // Keep this default in step with the column-separator default, which also sets the string field.
+            params.setLineDelimiterStr("\n");
         }
-        params.setLineDelimiter((byte) '\n');
         params.setDestTupleId(desc.getId().asInt());
         brokerScanRange.setParams(params);
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java
index cd42c98..7ddd39f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java
@@ -19,7 +19,7 @@ package org.apache.doris.qe;
 
 import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.BrokerDesc;
-import org.apache.doris.analysis.ColumnSeparator;
+import org.apache.doris.analysis.Separator;
 import org.apache.doris.analysis.DataDescription;
 import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.ImportWhereStmt;
@@ -456,7 +456,7 @@ public class MultiLoadMgr {
                     fileSizes.add(pair.second);
                 });
             }
-            ColumnSeparator columnSeparator = null;
+            Separator columnSeparator = null;
             PartitionNames partitionNames = null;
             String fileFormat = properties.get(LoadStmt.KEY_IN_PARAM_FORMAT_TYPE);
             boolean isNegative = properties.get(LoadStmt.KEY_IN_PARAM_NEGATIVE) == null ? false :
@@ -475,7 +475,7 @@ public class MultiLoadMgr {
                 colString = properties.get(LoadStmt.KEY_IN_PARAM_COLUMNS);
                 String columnSeparatorStr = properties.get(LoadStmt.KEY_IN_PARAM_COLUMN_SEPARATOR);
                 if (columnSeparatorStr != null) {
-                    columnSeparator = new ColumnSeparator(columnSeparatorStr);
+                    columnSeparator = new Separator(columnSeparatorStr);
                     try {
                         columnSeparator.analyze();
                     } catch (AnalysisException e) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
index 983a944..c6dd020 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
@@ -17,7 +17,7 @@
 
 package org.apache.doris.task;
 
-import org.apache.doris.analysis.ColumnSeparator;
+import org.apache.doris.analysis.Separator;
 import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.ImportColumnDesc;
 import org.apache.doris.analysis.PartitionNames;
@@ -51,5 +51,6 @@ public interface LoadTaskInfo {
 
     public Expr getPrecedingFilter();
     public Expr getWhereExpr();
-    public ColumnSeparator getColumnSeparator();
+    public Separator getColumnSeparator();
+    public Separator getLineDelimiter();
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
index c79e5c7..57d07f4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
@@ -17,7 +17,7 @@
 
 package org.apache.doris.task;
 
-import org.apache.doris.analysis.ColumnSeparator;
+import org.apache.doris.analysis.Separator;
 import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.ImportColumnDesc;
 import org.apache.doris.analysis.ImportColumnsStmt;
@@ -62,7 +62,8 @@ public class StreamLoadTask implements LoadTaskInfo {
     // optional
     private List<ImportColumnDesc> columnExprDescs = Lists.newArrayList();
     private Expr whereExpr;
-    private ColumnSeparator columnSeparator;
+    private Separator columnSeparator;
+    private Separator lineDelimiter;
     private PartitionNames partitions;
     private String path;
     private boolean negative;
@@ -114,10 +115,14 @@ public class StreamLoadTask implements LoadTaskInfo {
         return whereExpr;
     }
 
-    public ColumnSeparator getColumnSeparator() {
+    public Separator getColumnSeparator() {
         return columnSeparator;
     }
 
+    public Separator getLineDelimiter() {
+        return lineDelimiter;
+    }
+
     public PartitionNames getPartitions() {
         return partitions;
     }
@@ -217,6 +222,9 @@ public class StreamLoadTask implements LoadTaskInfo {
         if (request.isSetColumnSeparator()) {
             setColumnSeparator(request.getColumnSeparator());
         }
+        if (request.isSetLineDelimiter()) {
+            setLineDelimiter(request.getLineDelimiter());
+        }
         if (request.isSetPartitions()) {
             String[] partNames = request.getPartitions().trim().split("\\s*,\\s*");
             if (request.isSetIsTempPartition()) {
@@ -331,10 +339,15 @@ public class StreamLoadTask implements LoadTaskInfo {
     }
 
     private void setColumnSeparator(String oriSeparator) throws AnalysisException {
-        columnSeparator = new ColumnSeparator(oriSeparator);
+        columnSeparator = new Separator(oriSeparator);
         columnSeparator.analyze();
     }
 
+    private void setLineDelimiter(String oriLineDelimiter) throws AnalysisException {
+        lineDelimiter = new Separator(oriLineDelimiter);
+        lineDelimiter.analyze();
+    }
+
     @Override
     public long getMemLimit() {
         return execMemLimit;
diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateRoutineLoadStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateRoutineLoadStmtTest.java
index d964502..8c3a1fa 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateRoutineLoadStmtTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateRoutineLoadStmtTest.java
@@ -94,7 +94,7 @@ public class CreateRoutineLoadStmtTest {
         List<String> partitionNameString = Lists.newArrayList();
         partitionNameString.add("p1");
         PartitionNames partitionNames = new PartitionNames(false, partitionNameString);
-        ColumnSeparator columnSeparator = new ColumnSeparator(",");
+        Separator columnSeparator = new Separator(",");
 
         // duplicate load property
         List<ParseNode> loadPropertyList = new ArrayList<>();
@@ -142,7 +142,7 @@ public class CreateRoutineLoadStmtTest {
         List<String> partitionNameString = Lists.newArrayList();
         partitionNameString.add("p1");
         PartitionNames partitionNames = new PartitionNames(false, partitionNameString);
-        ColumnSeparator columnSeparator = new ColumnSeparator(",");
+        Separator columnSeparator = new Separator(",");
 
         // duplicate load property
         TableName tableName = new TableName(dbName, tableNameString);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/DataDescriptionTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/DataDescriptionTest.java
index 9197c6b..03f21cc 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/analysis/DataDescriptionTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/DataDescriptionTest.java
@@ -119,7 +119,7 @@ public class DataDescriptionTest {
         Expr whereExpr = new BinaryPredicate(BinaryPredicate.Operator.EQ, new IntLiteral(1),  new IntLiteral(1));
 
         desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt"),
-                Lists.newArrayList("col1", "col2"), new ColumnSeparator(","), "csv", null, false, null, null, whereExpr, LoadTask.MergeType.MERGE, whereExpr, null);
+                Lists.newArrayList("col1", "col2"), new Separator(","), "csv", null, false, null, null, whereExpr, LoadTask.MergeType.MERGE, whereExpr, null);
         desc.analyze("testDb");
         Assert.assertEquals("MERGE DATA INFILE ('abc.txt') INTO TABLE testTable COLUMNS TERMINATED BY ',' (col1, col2) WHERE 1 = 1 DELETE ON 1 = 1", desc.toString());
         Assert.assertEquals("1 = 1", desc.getWhereExpr().toSql());
@@ -127,7 +127,7 @@ public class DataDescriptionTest {
         Assert.assertEquals(",", desc.getColumnSeparator());
 
         desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt", "bcd.txt"),
-                                                  Lists.newArrayList("col1", "col2"), new ColumnSeparator("\t"),
+                                                  Lists.newArrayList("col1", "col2"), new Separator("\t"),
                                                   null, true, null);
         desc.analyze("testDb");
         Assert.assertEquals("APPEND DATA INFILE ('abc.txt', 'bcd.txt') NEGATIVE INTO TABLE testTable"
@@ -136,7 +136,7 @@ public class DataDescriptionTest {
 
         // hive \x01 column separator
         desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt", "bcd.txt"),
-                                                  Lists.newArrayList("col1", "col2"), new ColumnSeparator("\\x01"),
+                                                  Lists.newArrayList("col1", "col2"), new Separator("\\x01"),
                                                   null, true, null);
         desc.analyze("testDb");
         Assert.assertEquals("APPEND DATA INFILE ('abc.txt', 'bcd.txt') NEGATIVE INTO TABLE testTable"
@@ -220,7 +220,7 @@ public class DataDescriptionTest {
         Expr whereExpr = new BinaryPredicate(BinaryPredicate.Operator.EQ, new IntLiteral(1),  new IntLiteral(1));
 
         DataDescription desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt"),
-                Lists.newArrayList("col1", "col2"), new ColumnSeparator(","), "csv", null, true, null, null, whereExpr, LoadTask.MergeType.MERGE, whereExpr, null);
+                Lists.newArrayList("col1", "col2"), new Separator(","), "csv", null, true, null, null, whereExpr, LoadTask.MergeType.MERGE, whereExpr, null);
         desc.analyze("testDb");
     }
 
@@ -311,7 +311,7 @@ public class DataDescriptionTest {
     @Test
     public void testAnalyzeSequenceColumnNormal() throws AnalysisException {
         DataDescription desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt"),
-                Lists.newArrayList("k1", "k2", "source_sequence", "v1"), new ColumnSeparator("\t"),
+                Lists.newArrayList("k1", "k2", "source_sequence", "v1"), new Separator("\t"),
                 null, null, false, null, null, null, LoadTask.MergeType.APPEND, null, "source_sequence");
         new Expectations() {
             {
@@ -330,7 +330,7 @@ public class DataDescriptionTest {
     @Test(expected = AnalysisException.class)
     public void testAnalyzeSequenceColumnWithoutSourceSequence() throws AnalysisException {
         DataDescription desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt"),
-                Lists.newArrayList("k1", "k2", "v1"), new ColumnSeparator("\t"),
+                Lists.newArrayList("k1", "k2", "v1"), new Separator("\t"),
                 null, null, false, null, null, null, LoadTask.MergeType.APPEND, null, "source_sequence");
         new Expectations() {
             {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/ColumnSeparatorTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/SeparatorTest.java
similarity index 70%
rename from fe/fe-core/src/test/java/org/apache/doris/analysis/ColumnSeparatorTest.java
rename to fe/fe-core/src/test/java/org/apache/doris/analysis/SeparatorTest.java
index 8426992..e499cc1 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/analysis/ColumnSeparatorTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/SeparatorTest.java
@@ -22,47 +22,52 @@ import org.apache.doris.common.AnalysisException;
 import org.junit.Assert;
 import org.junit.Test;
 
-public class ColumnSeparatorTest {
+public class SeparatorTest {
     @Test
     public void testNormal() throws AnalysisException {
         // \t
-        ColumnSeparator separator = new ColumnSeparator("\t");
+        Separator separator = new Separator("\t");
         separator.analyze();
         Assert.assertEquals("'\t'", separator.toSql());
-        Assert.assertEquals("\t", separator.getColumnSeparator());
+        Assert.assertEquals("\t", separator.getSeparator());
 
         // \x01
-        separator = new ColumnSeparator("\\x01");
+        separator = new Separator("\\x01");
         separator.analyze();
         Assert.assertEquals("'\\x01'", separator.toSql());
-        Assert.assertEquals("\1", separator.getColumnSeparator());
+        Assert.assertEquals("\1", separator.getSeparator());
 
         // \x00 \x01
-        separator = new ColumnSeparator("\\x0001");
+        separator = new Separator("\\x0001");
         separator.analyze();
         Assert.assertEquals("'\\x0001'", separator.toSql());
-        Assert.assertEquals("\0\1", separator.getColumnSeparator());
+        Assert.assertEquals("\0\1", separator.getSeparator());
 
-        separator = new ColumnSeparator("|");
+        separator = new Separator("|");
         separator.analyze();
         Assert.assertEquals("'|'", separator.toSql());
-        Assert.assertEquals("|", separator.getColumnSeparator());
+        Assert.assertEquals("|", separator.getSeparator());
 
-        separator = new ColumnSeparator("\\|");
+        separator = new Separator("\\|");
         separator.analyze();
         Assert.assertEquals("'\\|'", separator.toSql());
-        Assert.assertEquals("\\|", separator.getColumnSeparator());
+        Assert.assertEquals("\\|", separator.getSeparator());
+
+        // several \x escapes in a row (\x01\x02), the multi-byte form this change adds
+        separator = new Separator("\\x01\\x02");
+        separator.analyze();
+        Assert.assertEquals("\1\2", separator.getSeparator());
     }
 
     @Test(expected = AnalysisException.class)
     public void testHexFormatError() throws AnalysisException {
-        ColumnSeparator separator = new ColumnSeparator("\\x0g");
+        Separator separator = new Separator("\\x0g");
         separator.analyze();
     }
 
     @Test(expected = AnalysisException.class)
     public void testHexLengthError() throws AnalysisException {
-        ColumnSeparator separator = new ColumnSeparator("\\x011");
+        Separator separator = new Separator("\\x011");
         separator.analyze();
     }
 }
\ No newline at end of file
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
index 5bbab3a..701eb80 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.doris.load.routineload;
 
-import org.apache.doris.analysis.ColumnSeparator;
+import org.apache.doris.analysis.Separator;
 import org.apache.doris.analysis.CreateRoutineLoadStmt;
 import org.apache.doris.analysis.ImportSequenceStmt;
 import org.apache.doris.analysis.LabelName;
@@ -80,7 +80,7 @@ public class KafkaRoutineLoadJobTest {
 
     private PartitionNames partitionNames;
 
-    private ColumnSeparator columnSeparator = new ColumnSeparator(",");
+    private Separator columnSeparator = new Separator(",");
 
     private ImportSequenceStmt sequenceStmt = new ImportSequenceStmt("source_sequence");
 
@@ -244,7 +244,7 @@ public class KafkaRoutineLoadJobTest {
     public void testFromCreateStmtWithErrorTable(@Mocked Catalog catalog,
                                                  @Injectable Database database) throws LoadException {
         CreateRoutineLoadStmt createRoutineLoadStmt = initCreateRoutineLoadStmt();
-        RoutineLoadDesc routineLoadDesc = new RoutineLoadDesc(columnSeparator, null, null, null,
+        RoutineLoadDesc routineLoadDesc = new RoutineLoadDesc(columnSeparator, null, null, null, null,
                 partitionNames, null, LoadTask.MergeType.APPEND, null);
         Deencapsulation.setField(createRoutineLoadStmt, "routineLoadDesc", routineLoadDesc);
 
@@ -269,7 +269,7 @@ public class KafkaRoutineLoadJobTest {
                                    @Injectable Database database,
             @Injectable OlapTable table) throws UserException {
         CreateRoutineLoadStmt createRoutineLoadStmt = initCreateRoutineLoadStmt();
-        RoutineLoadDesc routineLoadDesc = new RoutineLoadDesc(columnSeparator, null, null, null, partitionNames, null,
+        RoutineLoadDesc routineLoadDesc = new RoutineLoadDesc(columnSeparator, null, null, null, null, partitionNames, null,
                 LoadTask.MergeType.APPEND, sequenceStmt.getSequenceColName());
         Deencapsulation.setField(createRoutineLoadStmt, "routineLoadDesc", routineLoadDesc);
         List<Pair<Integer, Long>> partitionIdToOffset = Lists.newArrayList();
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
index cd320f8..ea460e4 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.doris.load.routineload;
 
-import org.apache.doris.analysis.ColumnSeparator;
+import org.apache.doris.analysis.Separator;
 import org.apache.doris.analysis.CreateRoutineLoadStmt;
 import org.apache.doris.analysis.LabelName;
 import org.apache.doris.analysis.ParseNode;
@@ -82,7 +82,7 @@ public class RoutineLoadManagerTest {
         String tableNameString = "table1";
         TableName tableName = new TableName(dbName, tableNameString);
         List<ParseNode> loadPropertyList = new ArrayList<>();
-        ColumnSeparator columnSeparator = new ColumnSeparator(",");
+        Separator columnSeparator = new Separator(",");
         loadPropertyList.add(columnSeparator);
         Map<String, String> properties = Maps.newHashMap();
         properties.put(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY, "2");
@@ -152,7 +152,7 @@ public class RoutineLoadManagerTest {
         String tableNameString = "table1";
         TableName tableName = new TableName(dbName, tableNameString);
         List<ParseNode> loadPropertyList = new ArrayList<>();
-        ColumnSeparator columnSeparator = new ColumnSeparator(",");
+        Separator columnSeparator = new Separator(",");
         loadPropertyList.add(columnSeparator);
         Map<String, String> properties = Maps.newHashMap();
         properties.put(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY, "2");
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 4e6cfd1..a6d7387 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -578,6 +578,7 @@ struct TStreamLoadPutRequest {
     29: optional string sequence_col
     30: optional bool num_as_string
     31: optional bool fuzzy_parse
+    32: optional string line_delimiter
 }
 
 struct TStreamLoadPutResult {
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 58c09c8..96e1212 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -166,6 +166,12 @@ struct TBrokerScanRangeParams {
     // strictMode is a boolean
     // if strict mode is true, the incorrect data (the result of cast is null) will not be loaded
     10: optional bool strict_mode
+    // for multi-byte column separators and line delimiters
+    11: optional i32 column_separator_length = 1;
+    12: optional i32 line_delimiter_length = 1;
+    13: optional string column_separator_str;
+    14: optional string line_delimiter_str;
+
 }
 
 // Broker scan range


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org