You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ya...@apache.org on 2021/03/09 01:36:00 UTC
[incubator-doris] branch master updated: [Load] Support multi bytes
LineDelimiter and ColumnSeparator (#5462)
This is an automated email from the ASF dual-hosted git repository.
yangzhg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new e023ef5 [Load] Support multi bytes LineDelimiter and ColumnSeparator (#5462)
e023ef5 is described below
commit e023ef5404493fa0b1aece6b72d1c2202f190dc2
Author: Zhengguo Yang <ya...@gmail.com>
AuthorDate: Tue Mar 9 09:35:39 2021 +0800
[Load] Support multi bytes LineDelimiter and ColumnSeparator (#5462)
* [Internal][Support Multibytes Separator] doris-1079
Support multi-byte LineDelimiter and ColumnSeparator.
---
be/CMakeLists.txt | 2 +-
be/src/exec/broker_scanner.cpp | 52 ++++++++++++++++------
be/src/exec/broker_scanner.h | 6 ++-
be/src/exec/plain_text_line_reader.cpp | 19 ++++----
be/src/exec/plain_text_line_reader.h | 6 ++-
be/src/http/action/stream_load.cpp | 3 ++
be/src/http/http_common.h | 1 +
be/test/exec/plain_text_line_reader_bzip_test.cpp | 12 ++---
be/test/exec/plain_text_line_reader_gzip_test.cpp | 14 +++---
.../exec/plain_text_line_reader_lz4frame_test.cpp | 12 ++---
be/test/exec/plain_text_line_reader_lzop_test.cpp | 14 +++---
.../plain_text_line_reader_uncompressed_test.cpp | 16 +++----
fe/fe-core/src/main/cup/sql_parser.cup | 10 ++---
.../doris/analysis/CreateRoutineLoadStmt.java | 11 +++--
.../org/apache/doris/analysis/DataDescription.java | 17 ++++---
.../{ColumnSeparator.java => Separator.java} | 9 ++--
.../org/apache/doris/load/BrokerFileGroup.java | 6 +--
.../src/main/java/org/apache/doris/load/Load.java | 21 ++++++---
.../org/apache/doris/load/RoutineLoadDesc.java | 14 ++++--
.../doris/load/routineload/RoutineLoadJob.java | 15 +++++--
.../org/apache/doris/planner/BrokerScanNode.java | 4 ++
.../apache/doris/planner/StreamLoadScanNode.java | 16 ++++++-
.../java/org/apache/doris/qe/MultiLoadMgr.java | 6 +--
.../java/org/apache/doris/task/LoadTaskInfo.java | 5 ++-
.../java/org/apache/doris/task/StreamLoadTask.java | 21 +++++++--
.../doris/analysis/CreateRoutineLoadStmtTest.java | 4 +-
.../apache/doris/analysis/DataDescriptionTest.java | 12 ++---
...ColumnSeparatorTest.java => SeparatorTest.java} | 26 +++++------
.../load/routineload/KafkaRoutineLoadJobTest.java | 8 ++--
.../load/routineload/RoutineLoadManagerTest.java | 6 +--
gensrc/thrift/FrontendService.thrift | 1 +
gensrc/thrift/PlanNodes.thrift | 6 +++
32 files changed, 240 insertions(+), 135 deletions(-)
diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt
index 5d624ac..8be4883 100644
--- a/be/CMakeLists.txt
+++ b/be/CMakeLists.txt
@@ -584,7 +584,7 @@ FUNCTION(ADD_BE_TEST TEST_NAME)
ADD_EXECUTABLE(${TEST_FILE_NAME} ${TEST_NAME}.cpp ${ADDITIONAL_FILES})
TARGET_LINK_LIBRARIES(${TEST_FILE_NAME} ${TEST_LINK_LIBS})
- SET_TARGET_PROPERTIES(${TEST_FILE_NAME} PROPERTIES COMPILE_FLAGS "-fno-access-control")
+ SET_TARGET_PROPERTIES(${TEST_FILE_NAME} PROPERTIES COMPILE_FLAGS "-fno-access-control" ENABLE_EXPORTS 1)
if (NOT "${TEST_DIR_NAME}" STREQUAL "")
SET_TARGET_PROPERTIES(${TEST_FILE_NAME} PROPERTIES RUNTIME_OUTPUT_DIRECTORY "${BUILD_OUTPUT_ROOT_DIRECTORY}/${TEST_DIR_NAME}")
endif()
diff --git a/be/src/exec/broker_scanner.cpp b/be/src/exec/broker_scanner.cpp
index 1844ba8..f732244 100644
--- a/be/src/exec/broker_scanner.cpp
+++ b/be/src/exec/broker_scanner.cpp
@@ -49,16 +49,28 @@ BrokerScanner::BrokerScanner(RuntimeState* state, RuntimeProfile* profile,
: BaseScanner(state, profile, params, pre_filter_ctxs, counter),
_ranges(ranges),
_broker_addresses(broker_addresses),
- // _splittable(params.splittable),
- _value_separator(static_cast<char>(params.column_separator)),
- _line_delimiter(static_cast<char>(params.line_delimiter)),
_cur_file_reader(nullptr),
_cur_line_reader(nullptr),
_cur_decompressor(nullptr),
_next_range(0),
_cur_line_reader_eof(false),
_scanner_eof(false),
- _skip_next_line(false) {}
+ _skip_next_line(false) {
+ if (params.__isset.column_separator_length && params.column_separator_length > 1) {
+ _value_separator = params.column_separator_str;
+ _value_separator_length = params.column_separator_length;
+ } else {
+ _value_separator.push_back(static_cast<char>(params.column_separator));
+ _value_separator_length = 1;
+ }
+ if (params.__isset.line_delimiter_length && params.line_delimiter_length > 1) {
+ _line_delimiter = params.line_delimiter_str;
+ _line_delimiter_length = params.line_delimiter_length;
+ } else {
+ _line_delimiter.push_back(static_cast<char>(params.line_delimiter));
+ _line_delimiter_length = 1;
+ }
+}
BrokerScanner::~BrokerScanner() {
close();
@@ -255,7 +267,7 @@ Status BrokerScanner::open_line_reader() {
case TFileFormatType::FORMAT_CSV_LZOP:
case TFileFormatType::FORMAT_CSV_DEFLATE:
_cur_line_reader = new PlainTextLineReader(_profile, _cur_file_reader, _cur_decompressor,
- size, _line_delimiter);
+ size, _line_delimiter, _line_delimiter_length);
break;
default: {
std::stringstream ss;
@@ -292,16 +304,24 @@ void BrokerScanner::close() {
}
void BrokerScanner::split_line(const Slice& line, std::vector<Slice>* values) {
- // line-begin char and line-end char are considered to be 'delimiter'
const char* value = line.data;
- const char* ptr = line.data;
- for (size_t i = 0; i < line.size; ++i, ++ptr) {
- if (*ptr == _value_separator) {
- values->emplace_back(value, ptr - value);
- value = ptr + 1;
+ size_t i = 0;
+ // TODO improve the performance
+ while (i < line.size) {
+ if (i + _value_separator_length <= line.size) {
+ if (_value_separator.compare(0, _value_separator_length, line.data + i,
+ _value_separator_length) == 0) {
+ values->emplace_back(value, line.data + i - value);
+ value = line.data + i + _value_separator_length;
+ i += _value_separator_length;
+ } else {
+ ++i;
+ }
+ } else {
+ break;
}
}
- values->emplace_back(value, ptr - value);
+ values->emplace_back(value, line.data + line.size - value); // last field runs to line end even if the loop broke early
}
void BrokerScanner::fill_fix_length_string(const Slice& value, MemPool* pool, char** new_value_p,
@@ -413,7 +433,9 @@ bool BrokerScanner::line_to_src_tuple(const Slice& line) {
if (values.size() + columns_from_path.size() < _src_slot_descs.size()) {
std::stringstream error_msg;
error_msg << "actual column number is less than schema column number. "
- << "actual number: " << values.size() << " sep: " << _value_separator << ", "
+ << "actual number: " << values.size() << " column separator: ["
+ << _value_separator << "], "
+ << "line delimiter: [" << _line_delimiter << "], "
<< "schema number: " << _src_slot_descs.size() << "; ";
_state->append_error_msg_to_file(std::string(line.data, line.size), error_msg.str());
_counter->num_rows_filtered++;
@@ -421,7 +443,9 @@ bool BrokerScanner::line_to_src_tuple(const Slice& line) {
} else if (values.size() + columns_from_path.size() > _src_slot_descs.size()) {
std::stringstream error_msg;
error_msg << "actual column number is more than schema column number. "
- << "actual number: " << values.size() << " sep: " << _value_separator << ", "
+ << "actual number: " << values.size() << " column separator: ["
+ << _value_separator << "], "
+ << "line delimiter: [" << _line_delimiter << "], "
<< "schema number: " << _src_slot_descs.size() << "; ";
_state->append_error_msg_to_file(std::string(line.data, line.size), error_msg.str());
_counter->num_rows_filtered++;
diff --git a/be/src/exec/broker_scanner.h b/be/src/exec/broker_scanner.h
index 2a2563e..da41f47 100644
--- a/be/src/exec/broker_scanner.h
+++ b/be/src/exec/broker_scanner.h
@@ -98,8 +98,10 @@ private:
std::unique_ptr<TextConverter> _text_converter;
- char _value_separator;
- char _line_delimiter;
+ std::string _value_separator;
+ std::string _line_delimiter;
+ int _value_separator_length;
+ int _line_delimiter_length;
// Reader
FileReader* _cur_file_reader;
diff --git a/be/src/exec/plain_text_line_reader.cpp b/be/src/exec/plain_text_line_reader.cpp
index 2f0dcc3..d51fa33 100644
--- a/be/src/exec/plain_text_line_reader.cpp
+++ b/be/src/exec/plain_text_line_reader.cpp
@@ -34,13 +34,14 @@ namespace doris {
PlainTextLineReader::PlainTextLineReader(RuntimeProfile* profile, FileReader* file_reader,
Decompressor* decompressor, size_t length,
- uint8_t line_delimiter)
+ const std::string& line_delimiter, size_t line_delimiter_length)
: _profile(profile),
_file_reader(file_reader),
_decompressor(decompressor),
_min_length(length),
_total_read_bytes(0),
_line_delimiter(line_delimiter),
+ _line_delimiter_length(line_delimiter_length),
_input_buf(new uint8_t[INPUT_CHUNK]),
_input_buf_size(INPUT_CHUNK),
_input_buf_pos(0),
@@ -92,7 +93,7 @@ inline bool PlainTextLineReader::update_eof() {
uint8_t* PlainTextLineReader::update_field_pos_and_find_line_delimiter(const uint8_t* start,
size_t len) {
// TODO: meanwhile find and save field pos
- return (uint8_t*)memmem(start, len, &_line_delimiter, 1);
+ return (uint8_t*)memmem(start, len, _line_delimiter.c_str(), _line_delimiter_length);
}
// extend input buf if necessary only when _more_input_bytes > 0
@@ -195,10 +196,12 @@ Status PlainTextLineReader::read_line(const uint8_t** ptr, size_t* size, bool* e
if (pos == nullptr) {
// didn't find line delimiter, read more data from decompressor
- // 1. point 'offset' to _output_buf_limit
- offset = output_buf_read_remaining();
-
- // 2. read from file reader
+ // for multi bytes delimiter we cannot set offset to avoid incomplete
+ // delimiter
+ // read from file reader
+ if (_line_delimiter_length == 1) {
+ offset = output_buf_read_remaining();
+ }
extend_output_buf();
if ((_input_buf_limit > _input_buf_pos) && _more_input_bytes == 0) {
// we still have data in input which is not decompressed.
@@ -266,7 +269,7 @@ Status PlainTextLineReader::read_line(const uint8_t** ptr, size_t* size, bool* e
if (_decompressor != nullptr) {
SCOPED_TIMER(_decompress_timer);
- // 2. decompress
+ // decompress
size_t input_read_bytes = 0;
size_t decompressed_len = 0;
_more_input_bytes = 0;
@@ -316,7 +319,7 @@ Status PlainTextLineReader::read_line(const uint8_t** ptr, size_t* size, bool* e
// we found a complete line
// ready to return
offset = pos - cur_ptr;
- found_line_delimiter = 1;
+ found_line_delimiter = _line_delimiter_length;
break;
}
} // while (!done())
diff --git a/be/src/exec/plain_text_line_reader.h b/be/src/exec/plain_text_line_reader.h
index 01ed692..2b8aad6 100644
--- a/be/src/exec/plain_text_line_reader.h
+++ b/be/src/exec/plain_text_line_reader.h
@@ -29,7 +29,8 @@ class Status;
class PlainTextLineReader : public LineReader {
public:
PlainTextLineReader(RuntimeProfile* profile, FileReader* file_reader,
- Decompressor* decompressor, size_t length, uint8_t line_delimiter);
+ Decompressor* decompressor, size_t length,
+ const std::string& line_delimiter, size_t line_delimiter_length);
virtual ~PlainTextLineReader();
@@ -61,7 +62,8 @@ private:
Decompressor* _decompressor;
size_t _min_length;
size_t _total_read_bytes;
- uint8_t _line_delimiter;
+ std::string _line_delimiter;
+ size_t _line_delimiter_length;
// save the data read from file reader
uint8_t* _input_buf;
diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp
index 47bfdb3..fbcbba5 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -348,6 +348,9 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext*
if (!http_req->header(HTTP_COLUMN_SEPARATOR).empty()) {
request.__set_columnSeparator(http_req->header(HTTP_COLUMN_SEPARATOR));
}
+ if (!http_req->header(HTTP_LINE_DELIMITER).empty()) {
+ request.__set_line_delimiter(http_req->header(HTTP_LINE_DELIMITER));
+ }
if (!http_req->header(HTTP_PARTITIONS).empty()) {
request.__set_partitions(http_req->header(HTTP_PARTITIONS));
request.__set_isTempPartition(false);
diff --git a/be/src/http/http_common.h b/be/src/http/http_common.h
index 5681db6..d9bf046 100644
--- a/be/src/http/http_common.h
+++ b/be/src/http/http_common.h
@@ -28,6 +28,7 @@ static const std::string HTTP_FORMAT_KEY = "format";
static const std::string HTTP_COLUMNS = "columns";
static const std::string HTTP_WHERE = "where";
static const std::string HTTP_COLUMN_SEPARATOR = "column_separator";
+static const std::string HTTP_LINE_DELIMITER = "line_delimiter";
static const std::string HTTP_MAX_FILTER_RATIO = "max_filter_ratio";
static const std::string HTTP_TIMEOUT = "timeout";
static const std::string HTTP_PARTITIONS = "partitions";
diff --git a/be/test/exec/plain_text_line_reader_bzip_test.cpp b/be/test/exec/plain_text_line_reader_bzip_test.cpp
index 648e3f8..8a2868a 100644
--- a/be/test/exec/plain_text_line_reader_bzip_test.cpp
+++ b/be/test/exec/plain_text_line_reader_bzip_test.cpp
@@ -46,7 +46,7 @@ TEST_F(PlainTextLineReaderTest, bzip2_normal_use) {
st = Decompressor::create_decompressor(CompressType::BZIP2, &decompressor);
ASSERT_TRUE(st.ok());
- PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, -1, '\n');
+ PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, -1, "\n", 1);
const uint8_t* ptr;
size_t size;
bool eof;
@@ -97,7 +97,7 @@ TEST_F(PlainTextLineReaderTest, bzip2_test_limit) {
st = Decompressor::create_decompressor(CompressType::BZIP2, &decompressor);
ASSERT_TRUE(st.ok());
- PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 8, '\n');
+ PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 8, "\n", 1);
const uint8_t* ptr;
size_t size;
bool eof;
@@ -134,7 +134,7 @@ TEST_F(PlainTextLineReaderTest, bzip2_test_limit2) {
st = Decompressor::create_decompressor(CompressType::BZIP2, &decompressor);
ASSERT_TRUE(st.ok());
- PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 6, '\n');
+ PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 6, "\n", 1);
const uint8_t* ptr;
size_t size;
bool eof;
@@ -158,7 +158,7 @@ TEST_F(PlainTextLineReaderTest, bzip2_test_limit3) {
st = Decompressor::create_decompressor(CompressType::BZIP2, &decompressor);
ASSERT_TRUE(st.ok());
- PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, '\n');
+ PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, "\n", 1);
const uint8_t* ptr;
size_t size;
bool eof;
@@ -188,7 +188,7 @@ TEST_F(PlainTextLineReaderTest, bzip2_test_limit4) {
st = Decompressor::create_decompressor(CompressType::BZIP2, &decompressor);
ASSERT_TRUE(st.ok());
- PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, '\n');
+ PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, "\n", 1);
const uint8_t* ptr;
size_t size;
bool eof;
@@ -218,7 +218,7 @@ TEST_F(PlainTextLineReaderTest, bzip2_test_limit5) {
st = Decompressor::create_decompressor(CompressType::BZIP2, &decompressor);
ASSERT_TRUE(st.ok());
- PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 0, '\n');
+ PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 0, "\n", 1);
const uint8_t* ptr;
size_t size;
bool eof;
diff --git a/be/test/exec/plain_text_line_reader_gzip_test.cpp b/be/test/exec/plain_text_line_reader_gzip_test.cpp
index 8f75ef8..2f6d190 100644
--- a/be/test/exec/plain_text_line_reader_gzip_test.cpp
+++ b/be/test/exec/plain_text_line_reader_gzip_test.cpp
@@ -46,7 +46,7 @@ TEST_F(PlainTextLineReaderTest, gzip_normal_use) {
st = Decompressor::create_decompressor(CompressType::GZIP, &decompressor);
ASSERT_TRUE(st.ok());
- PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, -1, '\n');
+ PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, -1, "\n", 1);
const uint8_t* ptr;
size_t size;
bool eof;
@@ -98,7 +98,7 @@ TEST_F(PlainTextLineReaderTest, uncompressed_no_newline) {
st = Decompressor::create_decompressor(CompressType::GZIP, &decompressor);
ASSERT_TRUE(st.ok());
- PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, -1, '\n');
+ PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, -1, "\n", 1);
const uint8_t* ptr;
size_t size;
bool eof;
@@ -133,7 +133,7 @@ TEST_F(PlainTextLineReaderTest, gzip_test_limit) {
st = Decompressor::create_decompressor(CompressType::GZIP, &decompressor);
ASSERT_TRUE(st.ok());
- PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 8, '\n');
+ PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 8, "\n", 1);
const uint8_t* ptr;
size_t size;
bool eof;
@@ -169,7 +169,7 @@ TEST_F(PlainTextLineReaderTest, gzip_test_limit2) {
st = Decompressor::create_decompressor(CompressType::GZIP, &decompressor);
ASSERT_TRUE(st.ok());
- PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 6, '\n');
+ PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 6, "\n", 1);
const uint8_t* ptr;
size_t size;
bool eof;
@@ -194,7 +194,7 @@ TEST_F(PlainTextLineReaderTest, gzip_test_limit3) {
st = Decompressor::create_decompressor(CompressType::GZIP, &decompressor);
ASSERT_TRUE(st.ok());
- PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, '\n');
+ PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, "\n", 1);
const uint8_t* ptr;
size_t size;
bool eof;
@@ -224,7 +224,7 @@ TEST_F(PlainTextLineReaderTest, gzip_test_limit4) {
st = Decompressor::create_decompressor(CompressType::GZIP, &decompressor);
ASSERT_TRUE(st.ok());
- PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, '\n');
+ PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, "\n", 1);
const uint8_t* ptr;
size_t size;
bool eof;
@@ -254,7 +254,7 @@ TEST_F(PlainTextLineReaderTest, gzip_test_limit5) {
st = Decompressor::create_decompressor(CompressType::GZIP, &decompressor);
ASSERT_TRUE(st.ok());
- PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 0, '\n');
+ PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 0, "\n", 1);
const uint8_t* ptr;
size_t size;
bool eof;
diff --git a/be/test/exec/plain_text_line_reader_lz4frame_test.cpp b/be/test/exec/plain_text_line_reader_lz4frame_test.cpp
index 157fe12..f78d736 100644
--- a/be/test/exec/plain_text_line_reader_lz4frame_test.cpp
+++ b/be/test/exec/plain_text_line_reader_lz4frame_test.cpp
@@ -46,7 +46,7 @@ TEST_F(PlainTextLineReaderTest, lz4_normal_use) {
st = Decompressor::create_decompressor(CompressType::LZ4FRAME, &decompressor);
ASSERT_TRUE(st.ok());
- PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, -1, '\n');
+ PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, -1, "\n", 1);
const uint8_t* ptr;
size_t size;
bool eof;
@@ -97,7 +97,7 @@ TEST_F(PlainTextLineReaderTest, lz4_test_limit) {
st = Decompressor::create_decompressor(CompressType::LZ4FRAME, &decompressor);
ASSERT_TRUE(st.ok());
- PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 8, '\n');
+ PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 8, "\n", 1);
const uint8_t* ptr;
size_t size;
bool eof;
@@ -134,7 +134,7 @@ TEST_F(PlainTextLineReaderTest, lz4_test_limit2) {
st = Decompressor::create_decompressor(CompressType::LZ4FRAME, &decompressor);
ASSERT_TRUE(st.ok());
- PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 6, '\n');
+ PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 6, "\n", 1);
const uint8_t* ptr;
size_t size;
bool eof;
@@ -158,7 +158,7 @@ TEST_F(PlainTextLineReaderTest, lz4_test_limit3) {
st = Decompressor::create_decompressor(CompressType::LZ4FRAME, &decompressor);
ASSERT_TRUE(st.ok());
- PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, '\n');
+ PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, "\n", 1);
const uint8_t* ptr;
size_t size;
bool eof;
@@ -188,7 +188,7 @@ TEST_F(PlainTextLineReaderTest, lz4_test_limit4) {
st = Decompressor::create_decompressor(CompressType::LZ4FRAME, &decompressor);
ASSERT_TRUE(st.ok());
- PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, '\n');
+ PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, "\n", 1);
const uint8_t* ptr;
size_t size;
bool eof;
@@ -218,7 +218,7 @@ TEST_F(PlainTextLineReaderTest, lz4_test_limit5) {
st = Decompressor::create_decompressor(CompressType::LZ4FRAME, &decompressor);
ASSERT_TRUE(st.ok());
- PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 0, '\n');
+ PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 0, "\n", 1);
const uint8_t* ptr;
size_t size;
bool eof;
diff --git a/be/test/exec/plain_text_line_reader_lzop_test.cpp b/be/test/exec/plain_text_line_reader_lzop_test.cpp
index e06432c..5f83afc 100644
--- a/be/test/exec/plain_text_line_reader_lzop_test.cpp
+++ b/be/test/exec/plain_text_line_reader_lzop_test.cpp
@@ -46,7 +46,7 @@ TEST_F(PlainTextLineReaderTest, lzop_normal_use) {
st = Decompressor::create_decompressor(CompressType::LZOP, &decompressor);
ASSERT_TRUE(st.ok());
- PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, -1, '\n');
+ PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, -1, "\n", 1);
const uint8_t* ptr;
size_t size;
bool eof;
@@ -96,7 +96,7 @@ TEST_F(PlainTextLineReaderTest, lzop_test_limit) {
st = Decompressor::create_decompressor(CompressType::LZOP, &decompressor);
ASSERT_TRUE(st.ok());
- PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 8, '\n');
+ PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 8, "\n", 1);
const uint8_t* ptr;
size_t size;
bool eof;
@@ -132,7 +132,7 @@ TEST_F(PlainTextLineReaderTest, lzop_test_limit2) {
st = Decompressor::create_decompressor(CompressType::LZOP, &decompressor);
ASSERT_TRUE(st.ok());
- PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 6, '\n');
+ PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 6, "\n", 1);
const uint8_t* ptr;
size_t size;
bool eof;
@@ -155,7 +155,7 @@ TEST_F(PlainTextLineReaderTest, lzop_test_limit3) {
st = Decompressor::create_decompressor(CompressType::LZOP, &decompressor);
ASSERT_TRUE(st.ok());
- PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, '\n');
+ PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, "\n", 1);
const uint8_t* ptr;
size_t size;
bool eof;
@@ -184,7 +184,7 @@ TEST_F(PlainTextLineReaderTest, lzop_test_limit4) {
st = Decompressor::create_decompressor(CompressType::LZOP, &decompressor);
ASSERT_TRUE(st.ok());
- PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, '\n');
+ PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, "\n", 1);
const uint8_t* ptr;
size_t size;
bool eof;
@@ -213,7 +213,7 @@ TEST_F(PlainTextLineReaderTest, lzop_test_limit5) {
st = Decompressor::create_decompressor(CompressType::LZOP, &decompressor);
ASSERT_TRUE(st.ok());
- PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 0, '\n');
+ PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 0, "\n", 1);
const uint8_t* ptr;
size_t size;
bool eof;
@@ -233,7 +233,7 @@ TEST_F(PlainTextLineReaderTest, lzop_test_larger) {
st = Decompressor::create_decompressor(CompressType::LZOP, &decompressor);
ASSERT_TRUE(st.ok());
- PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, -1, '\n');
+ PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, -1, "\n", 1);
const uint8_t* ptr;
size_t size;
bool eof;
diff --git a/be/test/exec/plain_text_line_reader_uncompressed_test.cpp b/be/test/exec/plain_text_line_reader_uncompressed_test.cpp
index 807049d..6568b4f 100644
--- a/be/test/exec/plain_text_line_reader_uncompressed_test.cpp
+++ b/be/test/exec/plain_text_line_reader_uncompressed_test.cpp
@@ -46,7 +46,7 @@ TEST_F(PlainTextLineReaderTest, uncompressed_normal_use) {
ASSERT_TRUE(st.ok());
ASSERT_TRUE(decompressor == nullptr);
- PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, -1, '\n');
+ PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, -1, "\n", 1);
const uint8_t* ptr;
size_t size;
bool eof;
@@ -98,7 +98,7 @@ TEST_F(PlainTextLineReaderTest, uncompressed_no_newline) {
ASSERT_TRUE(st.ok());
ASSERT_TRUE(decompressor == nullptr);
- PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, -1, '\n');
+ PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, -1, "\n", 1);
const uint8_t* ptr;
size_t size;
bool eof;
@@ -133,7 +133,7 @@ TEST_F(PlainTextLineReaderTest, uncompressed_test_limit) {
ASSERT_TRUE(st.ok());
ASSERT_TRUE(decompressor == nullptr);
- PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 8, '\n');
+ PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 8, "\n", 1);
const uint8_t* ptr;
size_t size;
bool eof;
@@ -170,7 +170,7 @@ TEST_F(PlainTextLineReaderTest, uncompressed_test_limit2) {
ASSERT_TRUE(st.ok());
ASSERT_TRUE(decompressor == nullptr);
- PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 6, '\n');
+ PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 6, "\n", 1);
const uint8_t* ptr;
size_t size;
bool eof;
@@ -196,7 +196,7 @@ TEST_F(PlainTextLineReaderTest, uncompressed_test_limit3) {
ASSERT_TRUE(st.ok());
ASSERT_TRUE(decompressor == nullptr);
- PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, '\n');
+ PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, "\n", 1);
const uint8_t* ptr;
size_t size;
bool eof;
@@ -227,7 +227,7 @@ TEST_F(PlainTextLineReaderTest, uncompressed_test_limit4) {
ASSERT_TRUE(st.ok());
ASSERT_TRUE(decompressor == nullptr);
- PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, '\n');
+ PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, "\n", 1);
const uint8_t* ptr;
size_t size;
bool eof;
@@ -258,7 +258,7 @@ TEST_F(PlainTextLineReaderTest, uncompressed_test_limit5) {
ASSERT_TRUE(st.ok());
ASSERT_TRUE(decompressor == nullptr);
- PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 0, '\n');
+ PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 0, "\n", 1);
const uint8_t* ptr;
size_t size;
bool eof;
@@ -280,7 +280,7 @@ TEST_F(PlainTextLineReaderTest, uncompressed_test_empty) {
ASSERT_TRUE(decompressor == nullptr);
// set min length larger than 0 to test
- PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 10, '\n');
+ PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 10, "\n", 1);
const uint8_t* ptr;
size_t size;
bool eof;
diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup
index 71bd5bc..c611932 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -439,7 +439,7 @@ nonterminal List<String> opt_col_list, opt_dup_keys, opt_columns_from_path;
nonterminal List<ColWithComment> opt_col_with_comment_list, col_with_comment_list;
nonterminal ColWithComment col_with_comment;
nonterminal List<Expr> opt_col_mapping_list;
-nonterminal ColumnSeparator opt_field_term, column_separator;
+nonterminal Separator opt_field_term, separator;
nonterminal String opt_user_role;
nonterminal TablePattern tbl_pattern;
nonterminal ResourcePattern resource_pattern;
@@ -1444,14 +1444,14 @@ opt_field_term ::=
:}
| KW_COLUMNS KW_TERMINATED KW_BY STRING_LITERAL:sep
{:
- RESULT = new ColumnSeparator(sep);
+ RESULT = new Separator(sep);
:}
;
-column_separator ::=
+separator ::=
KW_COLUMNS KW_TERMINATED KW_BY STRING_LITERAL:sep
{:
- RESULT = new ColumnSeparator(sep);
+ RESULT = new Separator(sep);
:}
;
@@ -1597,7 +1597,7 @@ opt_load_property_list ::=
;
load_property ::=
- column_separator:colSep
+ separator:colSep
{:
RESULT = colSep;
:}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
index 1960c28..568e938 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
@@ -345,7 +345,9 @@ public class CreateRoutineLoadStmt extends DdlStmt {
}
public void checkLoadProperties() throws UserException {
- ColumnSeparator columnSeparator = null;
+ Separator columnSeparator = null;
+ // TODO(yangzhengguo01): add line delimiter to properties
+ Separator lineDelimiter = null;
ImportColumnsStmt importColumnsStmt = null;
ImportWhereStmt precedingImportWhereStmt = null;
ImportWhereStmt importWhereStmt = null;
@@ -354,12 +356,12 @@ public class CreateRoutineLoadStmt extends DdlStmt {
ImportDeleteOnStmt importDeleteOnStmt = null;
if (loadPropertyList != null) {
for (ParseNode parseNode : loadPropertyList) {
- if (parseNode instanceof ColumnSeparator) {
+ if (parseNode instanceof Separator) {
// check column separator
if (columnSeparator != null) {
throw new AnalysisException("repeat setting of column separator");
}
- columnSeparator = (ColumnSeparator) parseNode;
+ columnSeparator = (Separator) parseNode;
columnSeparator.analyze(null);
} else if (parseNode instanceof ImportColumnsStmt) {
// check columns info
@@ -403,7 +405,8 @@ public class CreateRoutineLoadStmt extends DdlStmt {
}
}
}
- routineLoadDesc = new RoutineLoadDesc(columnSeparator, importColumnsStmt, precedingImportWhereStmt, importWhereStmt,
+ routineLoadDesc = new RoutineLoadDesc(columnSeparator, lineDelimiter, importColumnsStmt,
+ precedingImportWhereStmt, importWhereStmt,
partitionNames, importDeleteOnStmt == null ? null : importDeleteOnStmt.getExpr(), mergeType,
importSequenceStmt == null ? null : importSequenceStmt.getSequenceColName());
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
index 7e31c2c..31046a1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
@@ -96,7 +96,7 @@ public class DataDescription {
private final String tableName;
private final PartitionNames partitionNames;
private final List<String> filePaths;
- private final ColumnSeparator columnSeparator;
+ private final Separator columnSeparator;
private final String fileFormat;
private final boolean isNegative;
// column names in the path
@@ -112,7 +112,7 @@ public class DataDescription {
private List<String> fileFieldNames;
// Used for mini load
private TNetworkAddress beAddr;
- private String lineDelimiter;
+ private Separator lineDelimiter;
private String columnDef;
private long backendId;
private boolean stripOuterArray = false;
@@ -141,7 +141,7 @@ public class DataDescription {
PartitionNames partitionNames,
List<String> filePaths,
List<String> columns,
- ColumnSeparator columnSeparator,
+ Separator columnSeparator,
String fileFormat,
boolean isNegative,
List<Expr> columnMappingList) {
@@ -153,7 +153,7 @@ public class DataDescription {
PartitionNames partitionNames,
List<String> filePaths,
List<String> columns,
- ColumnSeparator columnSeparator,
+ Separator columnSeparator,
String fileFormat,
List<String> columnsFromPath,
boolean isNegative,
@@ -428,7 +428,7 @@ public class DataDescription {
if (columnSeparator == null) {
return null;
}
- return columnSeparator.getColumnSeparator();
+ return columnSeparator.getSeparator();
}
public boolean isNegative() {
@@ -444,10 +444,13 @@ public class DataDescription {
}
public String getLineDelimiter() {
- return lineDelimiter;
+ if (lineDelimiter == null) {
+ return null;
+ }
+ return lineDelimiter.getSeparator();
}
- public void setLineDelimiter(String lineDelimiter) {
+ public void setLineDelimiter(Separator lineDelimiter) {
this.lineDelimiter = lineDelimiter;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnSeparator.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Separator.java
similarity index 93%
rename from fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnSeparator.java
rename to fe/fe-core/src/main/java/org/apache/doris/analysis/Separator.java
index 3d2b271..efc61d9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnSeparator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Separator.java
@@ -23,18 +23,18 @@ import com.google.common.base.Strings;
import java.io.StringWriter;
-public class ColumnSeparator implements ParseNode {
+public class Separator implements ParseNode {
private static final String HEX_STRING = "0123456789ABCDEF";
private final String oriSeparator;
private String separator;
- public ColumnSeparator(String separator) {
+ public Separator(String separator) {
this.oriSeparator = separator;
this.separator = null;
}
- public String getColumnSeparator() {
+ public String getSeparator() {
return separator;
}
@@ -69,7 +69,8 @@ public class ColumnSeparator implements ParseNode {
}
if (originStr.toUpperCase().startsWith("\\X")) {
- String hexStr = originStr.substring(2);
+ // convert \x01\x02\x0a to 01020a
+ String hexStr = originStr.replaceAll("(?i)\\\\X", "");
// check hex str
if (hexStr.isEmpty()) {
throw new AnalysisException("Hex str is empty");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java
index e608887..272a226 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java
@@ -17,7 +17,7 @@
package org.apache.doris.load;
-import org.apache.doris.analysis.ColumnSeparator;
+import org.apache.doris.analysis.Separator;
import org.apache.doris.analysis.DataDescription;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.ImportColumnDesc;
@@ -110,8 +110,8 @@ public class BrokerFileGroup implements Writable {
// Used for broker table, no need to parse
public BrokerFileGroup(BrokerTable table) throws AnalysisException {
this.tableId = table.getId();
- this.valueSeparator = ColumnSeparator.convertSeparator(table.getColumnSeparator());
- this.lineDelimiter = table.getLineDelimiter();
+ this.valueSeparator = Separator.convertSeparator(table.getColumnSeparator());
+ this.lineDelimiter = Separator.convertSeparator(table.getLineDelimiter());
this.isNegative = false;
this.filePaths = table.getPaths();
this.fileFormat = table.getFileFormat();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
index 2cb3273..db7b958 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
@@ -22,7 +22,7 @@ import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.BinaryPredicate;
import org.apache.doris.analysis.CancelLoadStmt;
import org.apache.doris.analysis.CastExpr;
-import org.apache.doris.analysis.ColumnSeparator;
+import org.apache.doris.analysis.Separator;
import org.apache.doris.analysis.DataDescription;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.ExprSubstitutionMap;
@@ -258,9 +258,9 @@ public class Load {
// partitions | column names | separator | line delimiter
List<String> partitionNames = null;
List<String> columnNames = null;
- ColumnSeparator columnSeparator = null;
+ Separator columnSeparator = null;
List<String> hllColumnPairList = null;
- String lineDelimiter = null;
+ Separator lineDelimiter = null;
String formatType = null;
if (params != null) {
String specifiedPartitions = params.get(LoadStmt.KEY_IN_PARAM_PARTITIONS);
@@ -282,14 +282,25 @@ public class Load {
if (columnSeparatorStr.isEmpty()) {
columnSeparatorStr = "\t";
}
- columnSeparator = new ColumnSeparator(columnSeparatorStr);
+ columnSeparator = new Separator(columnSeparatorStr);
try {
columnSeparator.analyze();
} catch (AnalysisException e) {
throw new DdlException(e.getMessage());
}
}
- lineDelimiter = params.get(LoadStmt.KEY_IN_PARAM_LINE_DELIMITER);
+ String lineDelimiterStr = params.get(LoadStmt.KEY_IN_PARAM_LINE_DELIMITER);
+ if (lineDelimiterStr != null) {
+ if (lineDelimiterStr.isEmpty()) {
+ lineDelimiterStr = "\n";
+ }
+ lineDelimiter = new Separator(lineDelimiterStr);
+ try {
+ lineDelimiter.analyze();
+ } catch (AnalysisException e) {
+ throw new DdlException(e.getMessage());
+ }
+ }
formatType = params.get(LoadStmt.KEY_IN_PARAM_FORMAT_TYPE);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/RoutineLoadDesc.java b/fe/fe-core/src/main/java/org/apache/doris/load/RoutineLoadDesc.java
index bb7de5c..f9053bf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/RoutineLoadDesc.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/RoutineLoadDesc.java
@@ -18,7 +18,7 @@
package org.apache.doris.load;
import org.apache.doris.analysis.Analyzer;
-import org.apache.doris.analysis.ColumnSeparator;
+import org.apache.doris.analysis.Separator;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.ImportColumnsStmt;
import org.apache.doris.analysis.ImportWhereStmt;
@@ -30,7 +30,8 @@ import org.apache.doris.load.loadv2.LoadTask;
import com.google.common.base.Strings;
public class RoutineLoadDesc {
- private final ColumnSeparator columnSeparator;
+ private final Separator columnSeparator;
+ private final Separator lineDelimiter;
private final ImportColumnsStmt columnsInfo;
private final ImportWhereStmt precedingFilter;
private final ImportWhereStmt wherePredicate;
@@ -40,11 +41,12 @@ public class RoutineLoadDesc {
private final PartitionNames partitionNames;
private final String sequenceColName;
- public RoutineLoadDesc(ColumnSeparator columnSeparator, ImportColumnsStmt columnsInfo,
+ public RoutineLoadDesc(Separator columnSeparator, Separator lineDelimiter, ImportColumnsStmt columnsInfo,
ImportWhereStmt precedingFilter, ImportWhereStmt wherePredicate,
PartitionNames partitionNames, Expr deleteCondition, LoadTask.MergeType mergeType,
String sequenceColName) {
this.columnSeparator = columnSeparator;
+ this.lineDelimiter = lineDelimiter;
this.columnsInfo = columnsInfo;
this.precedingFilter = precedingFilter;
this.wherePredicate = wherePredicate;
@@ -54,10 +56,14 @@ public class RoutineLoadDesc {
this.sequenceColName = sequenceColName;
}
- public ColumnSeparator getColumnSeparator() {
+ public Separator getColumnSeparator() {
return columnSeparator;
}
+ public Separator getLineDelimiter() {
+ return lineDelimiter;
+ }
+
public ImportColumnsStmt getColumnsInfo() {
return columnsInfo;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index de9ff6d..812c053 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -18,7 +18,7 @@
package org.apache.doris.load.routineload;
import org.apache.doris.analysis.AlterRoutineLoadStmt;
-import org.apache.doris.analysis.ColumnSeparator;
+import org.apache.doris.analysis.Separator;
import org.apache.doris.analysis.CreateRoutineLoadStmt;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.ImportColumnDesc;
@@ -158,7 +158,8 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
protected List<ImportColumnDesc> columnDescs; // optional
protected Expr precedingFilter; // optional
protected Expr whereExpr; // optional
- protected ColumnSeparator columnSeparator; // optional
+ protected Separator columnSeparator; // optional
+ protected Separator lineDelimiter;
protected int desireTaskConcurrentNum; // optional
protected JobState state = JobState.NEED_SCHEDULE;
protected LoadDataSourceType dataSourceType;
@@ -362,6 +363,9 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
if (routineLoadDesc.getColumnSeparator() != null) {
columnSeparator = routineLoadDesc.getColumnSeparator();
}
+ if (routineLoadDesc.getLineDelimiter() != null) {
+ lineDelimiter = routineLoadDesc.getLineDelimiter();
+ }
if (routineLoadDesc.getPartitionNames() != null) {
partitions = routineLoadDesc.getPartitionNames();
}
@@ -485,10 +489,14 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
return whereExpr;
}
- public ColumnSeparator getColumnSeparator() {
+ public Separator getColumnSeparator() {
return columnSeparator;
}
+ public Separator getLineDelimiter() {
+ return lineDelimiter;
+ }
+
public boolean isStrictMode() {
String value = jobProperties.get(LoadStmt.STRICT_MODE);
if (value == null) {
@@ -1350,6 +1358,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
jobProperties.put("dataFormat", "json");
} else {
jobProperties.put("columnSeparator", columnSeparator == null ? "\t" : columnSeparator.toString());
+ jobProperties.put("lineDelimiter", lineDelimiter == null ? "\n" : lineDelimiter.toString());
}
jobProperties.put("maxErrorNum", String.valueOf(maxErrorNum));
jobProperties.put("maxBatchIntervalS", String.valueOf(maxBatchIntervalS));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
index c5d0b31..e3e7dc5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
@@ -200,6 +200,10 @@ public class BrokerScanNode extends LoadScanNode {
BrokerFileGroup fileGroup = context.fileGroup;
params.setColumnSeparator(fileGroup.getValueSeparator().getBytes(Charset.forName("UTF-8"))[0]);
params.setLineDelimiter(fileGroup.getLineDelimiter().getBytes(Charset.forName("UTF-8"))[0]);
+ params.setColumnSeparatorStr(fileGroup.getValueSeparator());
+ params.setLineDelimiterStr(fileGroup.getLineDelimiter());
+ params.setColumnSeparatorLength(fileGroup.getValueSeparator().getBytes(Charset.forName("UTF-8")).length);
+ params.setLineDelimiterLength(fileGroup.getLineDelimiter().getBytes(Charset.forName("UTF-8")).length);
params.setStrictMode(strictMode);
params.setProperties(brokerDesc.getProperties());
deleteCondition = fileGroup.getDeleteCondition();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java
index 24d614b..112fa22 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java
@@ -144,12 +144,24 @@ public class StreamLoadScanNode extends LoadScanNode {
createDefaultSmap(analyzer);
if (taskInfo.getColumnSeparator() != null) {
- String sep = taskInfo.getColumnSeparator().getColumnSeparator();
+ String sep = taskInfo.getColumnSeparator().getSeparator();
+ params.setColumnSeparatorStr(sep);
+ params.setColumnSeparatorLength(sep.getBytes(Charset.forName("UTF-8")).length);
params.setColumnSeparator(sep.getBytes(Charset.forName("UTF-8"))[0]);
} else {
params.setColumnSeparator((byte) '\t');
+ params.setColumnSeparatorLength(1);
+ params.setColumnSeparatorStr("\t");
+ }
+ if (taskInfo.getLineDelimiter() != null) {
+ String sep = taskInfo.getLineDelimiter().getSeparator();
+ params.setLineDelimiterStr(sep);
+ params.setLineDelimiterLength(sep.getBytes(Charset.forName("UTF-8")).length);
+ params.setLineDelimiter(sep.getBytes(Charset.forName("UTF-8"))[0]);
+ } else {
+ params.setLineDelimiter((byte) '\n');
+ params.setLineDelimiterLength(1);
}
- params.setLineDelimiter((byte) '\n');
params.setDestTupleId(desc.getId().asInt());
brokerScanRange.setParams(params);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java
index cd42c98..7ddd39f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java
@@ -19,7 +19,7 @@ package org.apache.doris.qe;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.BrokerDesc;
-import org.apache.doris.analysis.ColumnSeparator;
+import org.apache.doris.analysis.Separator;
import org.apache.doris.analysis.DataDescription;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.ImportWhereStmt;
@@ -456,7 +456,7 @@ public class MultiLoadMgr {
fileSizes.add(pair.second);
});
}
- ColumnSeparator columnSeparator = null;
+ Separator columnSeparator = null;
PartitionNames partitionNames = null;
String fileFormat = properties.get(LoadStmt.KEY_IN_PARAM_FORMAT_TYPE);
boolean isNegative = properties.get(LoadStmt.KEY_IN_PARAM_NEGATIVE) == null ? false :
@@ -475,7 +475,7 @@ public class MultiLoadMgr {
colString = properties.get(LoadStmt.KEY_IN_PARAM_COLUMNS);
String columnSeparatorStr = properties.get(LoadStmt.KEY_IN_PARAM_COLUMN_SEPARATOR);
if (columnSeparatorStr != null) {
- columnSeparator = new ColumnSeparator(columnSeparatorStr);
+ columnSeparator = new Separator(columnSeparatorStr);
try {
columnSeparator.analyze();
} catch (AnalysisException e) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
index 983a944..c6dd020 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
@@ -17,7 +17,7 @@
package org.apache.doris.task;
-import org.apache.doris.analysis.ColumnSeparator;
+import org.apache.doris.analysis.Separator;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.ImportColumnDesc;
import org.apache.doris.analysis.PartitionNames;
@@ -51,5 +51,6 @@ public interface LoadTaskInfo {
public Expr getPrecedingFilter();
public Expr getWhereExpr();
- public ColumnSeparator getColumnSeparator();
+ public Separator getColumnSeparator();
+ public Separator getLineDelimiter();
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
index c79e5c7..57d07f4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
@@ -17,7 +17,7 @@
package org.apache.doris.task;
-import org.apache.doris.analysis.ColumnSeparator;
+import org.apache.doris.analysis.Separator;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.ImportColumnDesc;
import org.apache.doris.analysis.ImportColumnsStmt;
@@ -62,7 +62,8 @@ public class StreamLoadTask implements LoadTaskInfo {
// optional
private List<ImportColumnDesc> columnExprDescs = Lists.newArrayList();
private Expr whereExpr;
- private ColumnSeparator columnSeparator;
+ private Separator columnSeparator;
+ private Separator lineDelimiter;
private PartitionNames partitions;
private String path;
private boolean negative;
@@ -114,10 +115,14 @@ public class StreamLoadTask implements LoadTaskInfo {
return whereExpr;
}
- public ColumnSeparator getColumnSeparator() {
+ public Separator getColumnSeparator() {
return columnSeparator;
}
+ public Separator getLineDelimiter() {
+ return lineDelimiter;
+ }
+
public PartitionNames getPartitions() {
return partitions;
}
@@ -217,6 +222,9 @@ public class StreamLoadTask implements LoadTaskInfo {
if (request.isSetColumnSeparator()) {
setColumnSeparator(request.getColumnSeparator());
}
+ if (request.isSetLineDelimiter()) {
+ setLineDelimiter(request.getLineDelimiter());
+ }
if (request.isSetPartitions()) {
String[] partNames = request.getPartitions().trim().split("\\s*,\\s*");
if (request.isSetIsTempPartition()) {
@@ -331,10 +339,15 @@ public class StreamLoadTask implements LoadTaskInfo {
}
private void setColumnSeparator(String oriSeparator) throws AnalysisException {
- columnSeparator = new ColumnSeparator(oriSeparator);
+ columnSeparator = new Separator(oriSeparator);
columnSeparator.analyze();
}
+ private void setLineDelimiter(String oriLineDelimiter) throws AnalysisException {
+ lineDelimiter = new Separator(oriLineDelimiter);
+ lineDelimiter.analyze();
+ }
+
@Override
public long getMemLimit() {
return execMemLimit;
diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateRoutineLoadStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateRoutineLoadStmtTest.java
index d964502..8c3a1fa 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateRoutineLoadStmtTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateRoutineLoadStmtTest.java
@@ -94,7 +94,7 @@ public class CreateRoutineLoadStmtTest {
List<String> partitionNameString = Lists.newArrayList();
partitionNameString.add("p1");
PartitionNames partitionNames = new PartitionNames(false, partitionNameString);
- ColumnSeparator columnSeparator = new ColumnSeparator(",");
+ Separator columnSeparator = new Separator(",");
// duplicate load property
List<ParseNode> loadPropertyList = new ArrayList<>();
@@ -142,7 +142,7 @@ public class CreateRoutineLoadStmtTest {
List<String> partitionNameString = Lists.newArrayList();
partitionNameString.add("p1");
PartitionNames partitionNames = new PartitionNames(false, partitionNameString);
- ColumnSeparator columnSeparator = new ColumnSeparator(",");
+ Separator columnSeparator = new Separator(",");
// duplicate load property
TableName tableName = new TableName(dbName, tableNameString);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/DataDescriptionTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/DataDescriptionTest.java
index 9197c6b..03f21cc 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/analysis/DataDescriptionTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/DataDescriptionTest.java
@@ -119,7 +119,7 @@ public class DataDescriptionTest {
Expr whereExpr = new BinaryPredicate(BinaryPredicate.Operator.EQ, new IntLiteral(1), new IntLiteral(1));
desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt"),
- Lists.newArrayList("col1", "col2"), new ColumnSeparator(","), "csv", null, false, null, null, whereExpr, LoadTask.MergeType.MERGE, whereExpr, null);
+ Lists.newArrayList("col1", "col2"), new Separator(","), "csv", null, false, null, null, whereExpr, LoadTask.MergeType.MERGE, whereExpr, null);
desc.analyze("testDb");
Assert.assertEquals("MERGE DATA INFILE ('abc.txt') INTO TABLE testTable COLUMNS TERMINATED BY ',' (col1, col2) WHERE 1 = 1 DELETE ON 1 = 1", desc.toString());
Assert.assertEquals("1 = 1", desc.getWhereExpr().toSql());
@@ -127,7 +127,7 @@ public class DataDescriptionTest {
Assert.assertEquals(",", desc.getColumnSeparator());
desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt", "bcd.txt"),
- Lists.newArrayList("col1", "col2"), new ColumnSeparator("\t"),
+ Lists.newArrayList("col1", "col2"), new Separator("\t"),
null, true, null);
desc.analyze("testDb");
Assert.assertEquals("APPEND DATA INFILE ('abc.txt', 'bcd.txt') NEGATIVE INTO TABLE testTable"
@@ -136,7 +136,7 @@ public class DataDescriptionTest {
// hive \x01 column separator
desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt", "bcd.txt"),
- Lists.newArrayList("col1", "col2"), new ColumnSeparator("\\x01"),
+ Lists.newArrayList("col1", "col2"), new Separator("\\x01"),
null, true, null);
desc.analyze("testDb");
Assert.assertEquals("APPEND DATA INFILE ('abc.txt', 'bcd.txt') NEGATIVE INTO TABLE testTable"
@@ -220,7 +220,7 @@ public class DataDescriptionTest {
Expr whereExpr = new BinaryPredicate(BinaryPredicate.Operator.EQ, new IntLiteral(1), new IntLiteral(1));
DataDescription desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt"),
- Lists.newArrayList("col1", "col2"), new ColumnSeparator(","), "csv", null, true, null, null, whereExpr, LoadTask.MergeType.MERGE, whereExpr, null);
+ Lists.newArrayList("col1", "col2"), new Separator(","), "csv", null, true, null, null, whereExpr, LoadTask.MergeType.MERGE, whereExpr, null);
desc.analyze("testDb");
}
@@ -311,7 +311,7 @@ public class DataDescriptionTest {
@Test
public void testAnalyzeSequenceColumnNormal() throws AnalysisException {
DataDescription desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt"),
- Lists.newArrayList("k1", "k2", "source_sequence", "v1"), new ColumnSeparator("\t"),
+ Lists.newArrayList("k1", "k2", "source_sequence", "v1"), new Separator("\t"),
null, null, false, null, null, null, LoadTask.MergeType.APPEND, null, "source_sequence");
new Expectations() {
{
@@ -330,7 +330,7 @@ public class DataDescriptionTest {
@Test(expected = AnalysisException.class)
public void testAnalyzeSequenceColumnWithoutSourceSequence() throws AnalysisException {
DataDescription desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt"),
- Lists.newArrayList("k1", "k2", "v1"), new ColumnSeparator("\t"),
+ Lists.newArrayList("k1", "k2", "v1"), new Separator("\t"),
null, null, false, null, null, null, LoadTask.MergeType.APPEND, null, "source_sequence");
new Expectations() {
{
diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/ColumnSeparatorTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/SeparatorTest.java
similarity index 70%
rename from fe/fe-core/src/test/java/org/apache/doris/analysis/ColumnSeparatorTest.java
rename to fe/fe-core/src/test/java/org/apache/doris/analysis/SeparatorTest.java
index 8426992..e499cc1 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/analysis/ColumnSeparatorTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/SeparatorTest.java
@@ -22,47 +22,47 @@ import org.apache.doris.common.AnalysisException;
import org.junit.Assert;
import org.junit.Test;
-public class ColumnSeparatorTest {
+public class SeparatorTest {
@Test
public void testNormal() throws AnalysisException {
// \t
- ColumnSeparator separator = new ColumnSeparator("\t");
+ Separator separator = new Separator("\t");
separator.analyze();
Assert.assertEquals("'\t'", separator.toSql());
- Assert.assertEquals("\t", separator.getColumnSeparator());
+ Assert.assertEquals("\t", separator.getSeparator());
// \x01
- separator = new ColumnSeparator("\\x01");
+ separator = new Separator("\\x01");
separator.analyze();
Assert.assertEquals("'\\x01'", separator.toSql());
- Assert.assertEquals("\1", separator.getColumnSeparator());
+ Assert.assertEquals("\1", separator.getSeparator());
// \x00 \x01
- separator = new ColumnSeparator("\\x0001");
+ separator = new Separator("\\x0001");
separator.analyze();
Assert.assertEquals("'\\x0001'", separator.toSql());
- Assert.assertEquals("\0\1", separator.getColumnSeparator());
+ Assert.assertEquals("\0\1", separator.getSeparator());
- separator = new ColumnSeparator("|");
+ separator = new Separator("|");
separator.analyze();
Assert.assertEquals("'|'", separator.toSql());
- Assert.assertEquals("|", separator.getColumnSeparator());
+ Assert.assertEquals("|", separator.getSeparator());
- separator = new ColumnSeparator("\\|");
+ separator = new Separator("\\|");
separator.analyze();
Assert.assertEquals("'\\|'", separator.toSql());
- Assert.assertEquals("\\|", separator.getColumnSeparator());
+ Assert.assertEquals("\\|", separator.getSeparator());
}
@Test(expected = AnalysisException.class)
public void testHexFormatError() throws AnalysisException {
- ColumnSeparator separator = new ColumnSeparator("\\x0g");
+ Separator separator = new Separator("\\x0g");
separator.analyze();
}
@Test(expected = AnalysisException.class)
public void testHexLengthError() throws AnalysisException {
- ColumnSeparator separator = new ColumnSeparator("\\x011");
+ Separator separator = new Separator("\\x011");
separator.analyze();
}
}
\ No newline at end of file
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
index 5bbab3a..701eb80 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
@@ -17,7 +17,7 @@
package org.apache.doris.load.routineload;
-import org.apache.doris.analysis.ColumnSeparator;
+import org.apache.doris.analysis.Separator;
import org.apache.doris.analysis.CreateRoutineLoadStmt;
import org.apache.doris.analysis.ImportSequenceStmt;
import org.apache.doris.analysis.LabelName;
@@ -80,7 +80,7 @@ public class KafkaRoutineLoadJobTest {
private PartitionNames partitionNames;
- private ColumnSeparator columnSeparator = new ColumnSeparator(",");
+ private Separator columnSeparator = new Separator(",");
private ImportSequenceStmt sequenceStmt = new ImportSequenceStmt("source_sequence");
@@ -244,7 +244,7 @@ public class KafkaRoutineLoadJobTest {
public void testFromCreateStmtWithErrorTable(@Mocked Catalog catalog,
@Injectable Database database) throws LoadException {
CreateRoutineLoadStmt createRoutineLoadStmt = initCreateRoutineLoadStmt();
- RoutineLoadDesc routineLoadDesc = new RoutineLoadDesc(columnSeparator, null, null, null,
+ RoutineLoadDesc routineLoadDesc = new RoutineLoadDesc(columnSeparator, null, null, null, null,
partitionNames, null, LoadTask.MergeType.APPEND, null);
Deencapsulation.setField(createRoutineLoadStmt, "routineLoadDesc", routineLoadDesc);
@@ -269,7 +269,7 @@ public class KafkaRoutineLoadJobTest {
@Injectable Database database,
@Injectable OlapTable table) throws UserException {
CreateRoutineLoadStmt createRoutineLoadStmt = initCreateRoutineLoadStmt();
- RoutineLoadDesc routineLoadDesc = new RoutineLoadDesc(columnSeparator, null, null, null, partitionNames, null,
+ RoutineLoadDesc routineLoadDesc = new RoutineLoadDesc(columnSeparator, null, null, null, null, partitionNames, null,
LoadTask.MergeType.APPEND, sequenceStmt.getSequenceColName());
Deencapsulation.setField(createRoutineLoadStmt, "routineLoadDesc", routineLoadDesc);
List<Pair<Integer, Long>> partitionIdToOffset = Lists.newArrayList();
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
index cd320f8..ea460e4 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
@@ -17,7 +17,7 @@
package org.apache.doris.load.routineload;
-import org.apache.doris.analysis.ColumnSeparator;
+import org.apache.doris.analysis.Separator;
import org.apache.doris.analysis.CreateRoutineLoadStmt;
import org.apache.doris.analysis.LabelName;
import org.apache.doris.analysis.ParseNode;
@@ -82,7 +82,7 @@ public class RoutineLoadManagerTest {
String tableNameString = "table1";
TableName tableName = new TableName(dbName, tableNameString);
List<ParseNode> loadPropertyList = new ArrayList<>();
- ColumnSeparator columnSeparator = new ColumnSeparator(",");
+ Separator columnSeparator = new Separator(",");
loadPropertyList.add(columnSeparator);
Map<String, String> properties = Maps.newHashMap();
properties.put(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY, "2");
@@ -152,7 +152,7 @@ public class RoutineLoadManagerTest {
String tableNameString = "table1";
TableName tableName = new TableName(dbName, tableNameString);
List<ParseNode> loadPropertyList = new ArrayList<>();
- ColumnSeparator columnSeparator = new ColumnSeparator(",");
+ Separator columnSeparator = new Separator(",");
loadPropertyList.add(columnSeparator);
Map<String, String> properties = Maps.newHashMap();
properties.put(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY, "2");
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 4e6cfd1..a6d7387 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -578,6 +578,7 @@ struct TStreamLoadPutRequest {
29: optional string sequence_col
30: optional bool num_as_string
31: optional bool fuzzy_parse
+ 32: optional string line_delimiter
}
struct TStreamLoadPutResult {
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 58c09c8..96e1212 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -166,6 +166,12 @@ struct TBrokerScanRangeParams {
// strictMode is a boolean
// if strict mode is true, the incorrect data (the result of cast is null) will not be loaded
10: optional bool strict_mode
+ // for multibytes separators
+ 11: optional i32 column_separator_length = 1;
+ 12: optional i32 line_delimiter_length = 1;
+ 13: optional string column_separator_str;
+ 14: optional string line_delimiter_str;
+
}
// Broker scan range
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org