You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ya...@apache.org on 2021/03/23 01:37:24 UTC

[incubator-doris] branch master updated: [Bug] Fix bug that the last column may be null when using multibytes separator (#5534)

This is an automated email from the ASF dual-hosted git repository.

yangzhg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new cef3cbc  [Bug] Fix bug that the last column may be null when using multibytes separator (#5534)
cef3cbc is described below

commit cef3cbc53ac9a23cd578ac92af9bc4125d9ae837
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Tue Mar 23 09:35:30 2021 +0800

    [Bug] Fix bug that the last column may be null when using multibytes separator (#5534)
---
 be/src/exec/broker_scanner.cpp                     |  44 +++++---
 be/test/exec/CMakeLists.txt                        |   1 +
 be/test/exec/broker_scanner_test.cpp               |  42 ++++++++
 be/test/exec/multi_bytes_separator_test.cpp        | 117 +++++++++++++++++++++
 .../test_data/broker_scanner/multi_bytes_sep.csv   |   1 +
 .../doris/analysis/CreateRoutineLoadStmt.java      |   2 +-
 6 files changed, 192 insertions(+), 15 deletions(-)

diff --git a/be/src/exec/broker_scanner.cpp b/be/src/exec/broker_scanner.cpp
index e14f1fb..c4254c0 100644
--- a/be/src/exec/broker_scanner.cpp
+++ b/be/src/exec/broker_scanner.cpp
@@ -305,23 +305,39 @@ void BrokerScanner::close() {
 
 void BrokerScanner::split_line(const Slice& line, std::vector<Slice>* values) {
     const char* value = line.data;
-    size_t i = 0;
-    // TODO improve the performance
-    while (i < line.size) {
-        if (i + _value_separator_length <= line.size) {
-            if (_value_separator.compare(0, _value_separator_length, line.data + i,
-                                         _value_separator_length) == 0) {
-                values->emplace_back(value, line.data + i - value);
-                value = line.data + i + _value_separator_length;
-                i += _value_separator_length;
-            } else {
-                ++i;
-            }
+    size_t start = 0;  // start pos of the current column value.
+    size_t curpos = 0; // candidate start pos of the next separator.
+
+    // Separator: AAAA
+    //
+    //     curpos
+    //       ▼
+    //   1000AAAA2000AAAA
+    //   ▲
+    //   start
+    //
+    // Only look for a separator where it fits entirely inside the line, so no
+    // byte past line.size is ever read. An empty separator never splits.
+    while (_value_separator_length > 0 && curpos + _value_separator_length <= line.size) {
+        if (_value_separator.compare(0, _value_separator_length, value + curpos,
+                                     _value_separator_length) != 0) {
+            // Advance one byte only: jumping over partially matched bytes would
+            // miss separators with a repeated prefix (e.g. "AAB" in "AAAB").
+            ++curpos;
         } else {
-            break;
+            // Found a separator at curpos: the bytes in [start, curpos) form
+            // one column value, which is empty when two separators are
+            // adjacent or the line starts with a separator.
+            values->emplace_back(value + start, curpos - start);
+            // Resume right after the separator; matches never overlap, so
+            // "AAAAA" split by "AAAA" yields "" and "A".
+            start = curpos + _value_separator_length;
+            curpos = start;
         }
     }
-    values->emplace_back(value, line.data + i - value);
+
+    // Whatever follows the last separator (possibly nothing) is the last column.
+    values->emplace_back(value + start, line.size - start);
 }
 
 void BrokerScanner::fill_fix_length_string(const Slice& value, MemPool* pool, char** new_value_p,
diff --git a/be/test/exec/CMakeLists.txt b/be/test/exec/CMakeLists.txt
index 7cd6a93..a35c51f 100644
--- a/be/test/exec/CMakeLists.txt
+++ b/be/test/exec/CMakeLists.txt
@@ -73,3 +73,4 @@ ADD_BE_TEST(unix_odbc_test)
 #ADD_BE_TEST(schema_scanner/schema_collations_scanner_test)
 #ADD_BE_TEST(schema_scanner/schema_charsets_scanner_test)
 ADD_BE_TEST(s3_reader_test)
+ADD_BE_TEST(multi_bytes_separator_test)
diff --git a/be/test/exec/broker_scanner_test.cpp b/be/test/exec/broker_scanner_test.cpp
index 98f163b..ade80a1 100644
--- a/be/test/exec/broker_scanner_test.cpp
+++ b/be/test/exec/broker_scanner_test.cpp
@@ -657,6 +657,48 @@ TEST_F(BrokerScannerTest, normal9) {
     ASSERT_TRUE(eof);
 }
 
+TEST_F(BrokerScannerTest, multi_bytes_1) {
+    // Columns are separated by the 4-byte "AAAA", rows by the 2-byte "BB".
+    _params.column_separator_str = "AAAA";
+    _params.column_separator_length = 4;
+    _params.line_delimiter_str = "BB";
+    _params.line_delimiter_length = 2;
+
+    TBrokerRangeDesc desc;
+    desc.path = "./be/test/exec/test_data/broker_scanner/multi_bytes_sep.csv";
+    desc.start_offset = 0;
+    desc.size = 18;
+    desc.splittable = true;
+    desc.file_type = TFileType::FILE_LOCAL;
+    desc.format_type = TFileFormatType::FORMAT_CSV_PLAIN;
+    std::vector<TBrokerRangeDesc> scan_ranges {desc};
+    BrokerScanner scanner(&_runtime_state, _profile, _params, scan_ranges, _addresses, _pre_filter, &_counter);
+    auto status = scanner.open();
+    ASSERT_TRUE(status.ok());
+
+    MemPool pool(_tracker.get());
+    auto* row = reinterpret_cast<Tuple*>(pool.allocate(20));
+    bool at_eof = false;
+    // First row: 4 | 5 | 6
+    status = scanner.get_next(row, &pool, &at_eof);
+    ASSERT_TRUE(status.ok());
+    ASSERT_FALSE(at_eof);
+    ASSERT_EQ(4, *reinterpret_cast<int*>(row->get_slot(0)));
+    ASSERT_EQ(5, *reinterpret_cast<int*>(row->get_slot(4)));
+    ASSERT_EQ(6, *reinterpret_cast<int*>(row->get_slot(8)));
+    // Second row: 1 | 2 | 3
+    status = scanner.get_next(row, &pool, &at_eof);
+    ASSERT_TRUE(status.ok());
+    ASSERT_FALSE(at_eof);
+    ASSERT_EQ(1, *reinterpret_cast<int*>(row->get_slot(0)));
+    ASSERT_EQ(2, *reinterpret_cast<int*>(row->get_slot(4)));
+    ASSERT_EQ(3, *reinterpret_cast<int*>(row->get_slot(8)));
+    // No more rows: the scanner reports end of file.
+    status = scanner.get_next(row, &pool, &at_eof);
+    ASSERT_TRUE(status.ok());
+    ASSERT_TRUE(at_eof);
+}
+
 } // end namespace doris
 
 int main(int argc, char** argv) {
diff --git a/be/test/exec/multi_bytes_separator_test.cpp b/be/test/exec/multi_bytes_separator_test.cpp
new file mode 100644
index 0000000..c3fea0e
--- /dev/null
+++ b/be/test/exec/multi_bytes_separator_test.cpp
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/broker_scanner.h"
+
+#include <gtest/gtest.h>
+
+#include <map>
+#include <string>
+#include <vector>
+
+#include "common/object_pool.h"
+#include "exec/local_file_reader.h"
+#include "exprs/cast_functions.h"
+#include "gen_cpp/Descriptors_types.h"
+#include "gen_cpp/PlanNodes_types.h"
+#include "runtime/descriptors.h"
+#include "runtime/mem_tracker.h"
+#include "runtime/runtime_state.h"
+#include "runtime/tuple.h"
+#include "runtime/user_function_cache.h"
+
+namespace doris {
+
+// Stateless fixture; the test below constructs its own BrokerScanner.
+class MultiBytesSeparatorTest : public testing::Test {
+public:
+    MultiBytesSeparatorTest() = default;
+protected:
+    void SetUp() override {}
+    void TearDown() override {}
+};
+
+
+TEST_F(MultiBytesSeparatorTest, normal) {
+    TBrokerScanRangeParams params;
+    params.column_separator = ',';
+    params.line_delimiter = '\n';
+    params.column_separator_str = "AAAA";
+    params.line_delimiter_str = "BBB";
+    params.column_separator_length = 4;
+    params.line_delimiter_length = 3;
+
+    const std::vector<TBrokerRangeDesc> ranges;
+    const std::vector<TNetworkAddress> broker_addresses;
+    const std::vector<ExprContext*> pre_filter_ctxs;
+    BrokerScanner scanner(nullptr, nullptr, params, ranges, broker_addresses, pre_filter_ctxs, nullptr);
+
+    // split_line() is called directly; the scanner is never opened, so no file is read.
+
+    // 1. A line that is exactly one separator yields two empty values.
+    {
+        std::string line = "AAAA";
+        Slice s(line);
+        std::vector<Slice> values;
+        scanner.split_line(s, &values);
+        ASSERT_EQ(2, values.size());
+        ASSERT_EQ(0, values[0].size);
+        ASSERT_EQ(0, values[1].size);
+    }
+
+    // 2. No complete separator: the whole line is a single value.
+    {
+        std::string line = "ABAA";
+        Slice s(line);
+        std::vector<Slice> values;
+        scanner.split_line(s, &values);
+        ASSERT_EQ(1, values.size());
+        ASSERT_EQ(4, values[0].size);
+    }
+
+    // 3. An empty line yields a single empty value.
+    {
+        std::string line = "";
+        Slice s(line);
+        std::vector<Slice> values;
+        scanner.split_line(s, &values);
+        ASSERT_EQ(1, values.size());
+        ASSERT_EQ(0, values[0].size);
+    }
+
+    // 4. Partial separator as data, adjacent separators and a short last column.
+    {
+        // 1234, AAAB, , AA
+        std::string line = "1234AAAAAAABAAAAAAAAAA";
+        Slice s(line);
+        std::vector<Slice> values;
+        scanner.split_line(s, &values);
+        ASSERT_EQ(4, values.size());
+        ASSERT_EQ(4, values[0].size);
+        ASSERT_EQ(4, values[1].size);
+        ASSERT_EQ(0, values[2].size);
+        ASSERT_EQ(2, values[3].size);
+    }
+}
+
+
+} // end namespace doris
+
+int main(int argc, char* argv[]) {
+    testing::InitGoogleTest(&argc, argv);
+    return RUN_ALL_TESTS(); // gtest entry point: run the tests defined above
+}
diff --git a/be/test/exec/test_data/broker_scanner/multi_bytes_sep.csv b/be/test/exec/test_data/broker_scanner/multi_bytes_sep.csv
new file mode 100644
index 0000000..ae65330
--- /dev/null
+++ b/be/test/exec/test_data/broker_scanner/multi_bytes_sep.csv
@@ -0,0 +1 @@
+4AAAA5AAAA6BB1AAAA2AAAA3BB
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
index 568e938..a828b1a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
@@ -331,7 +331,7 @@ public class CreateRoutineLoadStmt extends DdlStmt {
         }
         Table table = db.getTable(tableName);
         if (table == null) {
-            throw new AnalysisException("table: " + dbName + " not found.");
+            throw new AnalysisException("table: " + tableName + " not found.");
         }
         if (mergeType != LoadTask.MergeType.APPEND
                 && (table.getType() != Table.TableType.OLAP

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org