You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ya...@apache.org on 2021/03/23 01:37:24 UTC
[incubator-doris] branch master updated: [Bug] Fix bug that the
last column may be null when using multibytes separator (#5534)
This is an automated email from the ASF dual-hosted git repository.
yangzhg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new cef3cbc [Bug] Fix bug that the last column may be null when using multibytes separator (#5534)
cef3cbc is described below
commit cef3cbc53ac9a23cd578ac92af9bc4125d9ae837
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Tue Mar 23 09:35:30 2021 +0800
[Bug] Fix bug that the last column may be null when using multibytes separator (#5534)
---
be/src/exec/broker_scanner.cpp | 44 +++++---
be/test/exec/CMakeLists.txt | 1 +
be/test/exec/broker_scanner_test.cpp | 42 ++++++++
be/test/exec/multi_bytes_separator_test.cpp | 117 +++++++++++++++++++++
.../test_data/broker_scanner/multi_bytes_sep.csv | 1 +
.../doris/analysis/CreateRoutineLoadStmt.java | 2 +-
6 files changed, 192 insertions(+), 15 deletions(-)
diff --git a/be/src/exec/broker_scanner.cpp b/be/src/exec/broker_scanner.cpp
index e14f1fb..c4254c0 100644
--- a/be/src/exec/broker_scanner.cpp
+++ b/be/src/exec/broker_scanner.cpp
@@ -305,23 +305,39 @@ void BrokerScanner::close() {
void BrokerScanner::split_line(const Slice& line, std::vector<Slice>* values) {
const char* value = line.data;
- size_t i = 0;
- // TODO improve the performance
- while (i < line.size) {
- if (i + _value_separator_length <= line.size) {
- if (_value_separator.compare(0, _value_separator_length, line.data + i,
- _value_separator_length) == 0) {
- values->emplace_back(value, line.data + i - value);
- value = line.data + i + _value_separator_length;
- i += _value_separator_length;
- } else {
- ++i;
- }
+ size_t start = 0; // offset of the first byte of the column being collected.
+ size_t curpos = 0; // offset at which the current separator match attempt begins.
+ size_t p1 = 0; // number of separator bytes matched so far, counted from curpos.
+
+ // Split `line` on every occurrence of the (possibly multi-byte) column
+ // separator and append one Slice per column to `values`. Each Slice is
+ // built from a pointer into line.data plus a length. N separators always
+ // produce N + 1 values, so an empty line produces one empty value.
+ //
+ // With separator "AAAA":
+ // "1000AAAA2000" -> "1000", "2000"
+ // "AAAA" -> "", ""
+ // "ABAA" -> "ABAA"
+
+ while (curpos < line.size) {
+ if (curpos + p1 == line.size || *(value + curpos + p1) != _value_separator[p1]) {
+ // End of line or mismatch: stay in bounds and retry one byte on, so overlapping separators are found.
+ ++curpos;
+ p1 = 0;
} else {
- break;
+ p1++;
+ if (p1 == _value_separator_length) {
+ // Matched a whole separator: emit the column that ends at curpos.
+ values->emplace_back(value + start, curpos - start);
+ start = curpos + _value_separator_length;
+ curpos = start;
+ p1 = 0;
+ }
}
}
- values->emplace_back(value, line.data + i - value);
+ // curpos moves one byte at a time or jumps past a fully matched separator, so it stops exactly at line.size.
+ CHECK(curpos == line.size) << curpos << " vs " << line.size;
+ values->emplace_back(value + start, curpos - start);
}
void BrokerScanner::fill_fix_length_string(const Slice& value, MemPool* pool, char** new_value_p,
diff --git a/be/test/exec/CMakeLists.txt b/be/test/exec/CMakeLists.txt
index 7cd6a93..a35c51f 100644
--- a/be/test/exec/CMakeLists.txt
+++ b/be/test/exec/CMakeLists.txt
@@ -73,3 +73,4 @@ ADD_BE_TEST(unix_odbc_test)
#ADD_BE_TEST(schema_scanner/schema_collations_scanner_test)
#ADD_BE_TEST(schema_scanner/schema_charsets_scanner_test)
ADD_BE_TEST(s3_reader_test)
+ADD_BE_TEST(multi_bytes_separator_test)
diff --git a/be/test/exec/broker_scanner_test.cpp b/be/test/exec/broker_scanner_test.cpp
index 98f163b..ade80a1 100644
--- a/be/test/exec/broker_scanner_test.cpp
+++ b/be/test/exec/broker_scanner_test.cpp
@@ -657,6 +657,48 @@ TEST_F(BrokerScannerTest, normal9) {
ASSERT_TRUE(eof);
}
+TEST_F(BrokerScannerTest, multi_bytes_1) { // Scans a CSV file whose column separator and line delimiter are both multi-byte.
+ std::vector<TBrokerRangeDesc> ranges;
+ TBrokerRangeDesc range;
+ range.path = "./be/test/exec/test_data/broker_scanner/multi_bytes_sep.csv";
+ range.start_offset = 0;
+ range.size = 18; // NOTE(review): the data file holds two 13-byte rows; confirm why 18 is used here.
+ range.splittable = true;
+ range.file_type = TFileType::FILE_LOCAL;
+ range.format_type = TFileFormatType::FORMAT_CSV_PLAIN;
+ ranges.push_back(range);
+ // Columns are separated by "AAAA" and rows by "BB"; each length is passed alongside its string.
+ _params.column_separator_str = "AAAA";
+ _params.line_delimiter_str = "BB";
+ _params.column_separator_length = 4;
+ _params.line_delimiter_length = 2;
+ BrokerScanner scanner(&_runtime_state, _profile, _params, ranges, _addresses, _pre_filter, &_counter);
+ auto st = scanner.open();
+ ASSERT_TRUE(st.ok());
+ // Tuple buffer handed to every get_next(); the asserts below read int values from slots 0, 4 and 8.
+ MemPool tuple_pool(_tracker.get());
+ Tuple* tuple = (Tuple*)tuple_pool.allocate(20);
+ bool eof = false;
+ // First row of the data file: 4, 5, 6.
+ st = scanner.get_next(tuple, &tuple_pool, &eof);
+ ASSERT_TRUE(st.ok());
+ ASSERT_FALSE(eof);
+ ASSERT_EQ(4, *(int*)tuple->get_slot(0));
+ ASSERT_EQ(5, *(int*)tuple->get_slot(4));
+ ASSERT_EQ(6, *(int*)tuple->get_slot(8));
+ // Second row of the data file: 1, 2, 3.
+ st = scanner.get_next(tuple, &tuple_pool, &eof);
+ ASSERT_TRUE(st.ok());
+ ASSERT_FALSE(eof);
+ ASSERT_EQ(1, *(int*)tuple->get_slot(0));
+ ASSERT_EQ(2, *(int*)tuple->get_slot(4));
+ ASSERT_EQ(3, *(int*)tuple->get_slot(8));
+ // No rows left: the call must still succeed and report eof.
+ st = scanner.get_next(tuple, &tuple_pool, &eof);
+ ASSERT_TRUE(st.ok());
+ ASSERT_TRUE(eof);
+}
+
} // end namespace doris
int main(int argc, char** argv) {
diff --git a/be/test/exec/multi_bytes_separator_test.cpp b/be/test/exec/multi_bytes_separator_test.cpp
new file mode 100644
index 0000000..c3fea0e
--- /dev/null
+++ b/be/test/exec/multi_bytes_separator_test.cpp
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/broker_scanner.h"
+
+#include <gtest/gtest.h>
+
+#include <map>
+#include <string>
+#include <vector>
+
+#include "common/object_pool.h"
+#include "exec/local_file_reader.h"
+#include "exprs/cast_functions.h"
+#include "gen_cpp/Descriptors_types.h"
+#include "gen_cpp/PlanNodes_types.h"
+#include "runtime/descriptors.h"
+#include "runtime/mem_tracker.h"
+#include "runtime/runtime_state.h"
+#include "runtime/tuple.h"
+#include "runtime/user_function_cache.h"
+
+namespace doris {
+
+class MultiBytesSeparatorTest: public testing::Test { // gtest fixture used by the split_line test in this file.
+public:
+ MultiBytesSeparatorTest() {}
+ // The fixture holds no members, so set-up and tear-down have nothing to do.
+protected:
+ virtual void SetUp() {}
+ virtual void TearDown() {}
+};
+
+
+TEST_F(MultiBytesSeparatorTest, normal) {
+ TBrokerScanRangeParams params;
+ params.column_separator = ',';
+ params.line_delimiter = '\n';
+ params.column_separator_str = "AAAA";
+ params.line_delimiter_str = "BBB";
+ params.column_separator_length = 4;
+ params.line_delimiter_length = 3;
+ // Empty ranges, brokers and filters suffice: the scanner is never opened, only split_line() is called.
+ const std::vector<TBrokerRangeDesc> ranges;
+ const std::vector<TNetworkAddress> broker_addresses;
+ const std::vector<ExprContext*> pre_filter_ctxs;
+ BrokerScanner scanner(nullptr, nullptr, params, ranges, broker_addresses, pre_filter_ctxs, nullptr);
+
+ // Each numbered case feeds one line to split_line() and checks the count and sizes of the columns.
+
+ // 1. The line is exactly one separator: an empty column on each side of it.
+ {
+ std::string line = "AAAA";
+ Slice s(line);
+ std::vector<Slice> values;
+ scanner.split_line(s, &values);
+ ASSERT_EQ(2, values.size());
+ ASSERT_EQ(0, values[0].size);
+ ASSERT_EQ(0, values[1].size);
+ }
+
+ // 2. No complete separator in the line: the whole line is a single column.
+ {
+ std::string line = "ABAA";
+ Slice s(line);
+ std::vector<Slice> values;
+ scanner.split_line(s, &values);
+ ASSERT_EQ(1, values.size());
+ ASSERT_EQ(4, values[0].size);
+ }
+
+ // 3. Empty line: still exactly one (empty) column.
+ {
+ std::string line = "";
+ Slice s(line);
+ std::vector<Slice> values;
+ scanner.split_line(s, &values);
+ ASSERT_EQ(1, values.size());
+ ASSERT_EQ(0, values[0].size);
+ }
+
+ // 4. Data that looks like a partial separator, back-to-back separators and a trailing partial separator.
+ {
+ // Expected columns: "1234", "AAAB", "", "AA"
+ std::string line = "1234AAAAAAABAAAAAAAAAA";
+ Slice s(line);
+ std::vector<Slice> values;
+ scanner.split_line(s, &values);
+ ASSERT_EQ(4, values.size());
+ ASSERT_EQ(4, values[0].size);
+ ASSERT_EQ(4, values[1].size);
+ ASSERT_EQ(0, values[2].size);
+ ASSERT_EQ(2, values[3].size);
+ }
+}
+
+
+} // end namespace doris
+
+int main(int argc, char** argv) { // Entry point of the multi_bytes_separator_test binary.
+ ::testing::InitGoogleTest(&argc, argv); // Hand the command line to gtest before any test runs.
+ return RUN_ALL_TESTS(); // The result of the test run becomes the process exit code.
+}
diff --git a/be/test/exec/test_data/broker_scanner/multi_bytes_sep.csv b/be/test/exec/test_data/broker_scanner/multi_bytes_sep.csv
new file mode 100644
index 0000000..ae65330
--- /dev/null
+++ b/be/test/exec/test_data/broker_scanner/multi_bytes_sep.csv
@@ -0,0 +1 @@
+4AAAA5AAAA6BB1AAAA2AAAA3BB
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
index 568e938..a828b1a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
@@ -331,7 +331,7 @@ public class CreateRoutineLoadStmt extends DdlStmt {
}
Table table = db.getTable(tableName);
if (table == null) {
- throw new AnalysisException("table: " + dbName + " not found.");
+ throw new AnalysisException("table: " + tableName + " not found.");
}
if (mergeType != LoadTask.MergeType.APPEND
&& (table.getType() != Table.TableType.OLAP
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org