Posted to commits@doris.apache.org by mo...@apache.org on 2022/05/22 14:51:25 UTC

[incubator-doris] branch dev-1.0.1 updated (727f08ca5b -> 25ee329f52)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a change to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


    from 727f08ca5b [fix](planner) unnecessary cast will be added on children in CaseExpr sometimes (#9600)
     new b6489024a0 [improvement](stream-load) adjust read unit of http to optimize stream load (#9154)
     new 131a0ac56e [fix](broker-scan-node) Remove trailing spaces in broker_scanner. Make it consistent with hive and trino behavior. (#9190)
     new 733fb3a92a [Feature] CTAS support insert data (#9271)
     new 2cc31df3c5 [fix] UT MathFunctionTest.round_test fix (#9447)
     new 571608dee5 [Enhancement]  improve parquet reader via arrow's prefetch and multi thread (#9472)
     new 8490c38906 [Bug][Vectorized] fix schema change add varchar type column default value get wrong result (#9523)
     new 8a375dcafb [deps] libhdfs3 build enable kerberos support (#9524)
     new 0e38fa4e9d [refactor](load) add tablet errors when close_wait return error (#9619)
     new b28b77a32f [FeConfig](Project) Project optimization is enabled by default (#9667)
     new e52e27ba66 [fix] NullPredicate should implement evaluate_vec (#9689)
     new c4c4364534 [config](checksum) Disable consistency checker by default (#9699)
     new 05bd5c82f9 [refactor](fe): remove unused code (#8986)
     new 25ee329f52 [improvement](planner) Backfill the original predicate pushdown code (#9703)

The 13 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/CMakeLists.txt                                  |  50 ++++-
 be/src/common/config.h                             |   3 +
 be/src/exec/broker_scanner.cpp                     |  29 ++-
 be/src/exec/parquet_reader.cpp                     | 137 ++++++++----
 be/src/exec/parquet_reader.h                       |  23 +-
 be/src/http/action/stream_load.cpp                 |   4 +-
 be/src/olap/column_predicate.h                     |   5 +-
 be/src/olap/delta_writer.cpp                       |  21 +-
 be/src/olap/delta_writer.h                         |   6 +-
 be/src/olap/null_predicate.cpp                     |  12 +
 be/src/olap/null_predicate.h                       |   2 +
 be/src/olap/rowset/segment_v2/column_reader.cpp    |  14 +-
 be/src/olap/rowset/segment_v2/column_reader.h      |   2 +-
 be/src/runtime/fragment_mgr.cpp                    |   2 +-
 be/src/runtime/runtime_state.h                     |   4 +
 be/src/runtime/stream_load/stream_load_pipe.h      |   6 +-
 be/src/runtime/tablets_channel.cpp                 |  21 +-
 be/src/runtime/tablets_channel.h                   |   6 +-
 be/src/vec/functions/function_binary_arithmetic.h  |  30 ++-
 be/test/olap/delta_writer_test.cpp                 | 246 ++++++++++++++++++++-
 build.sh                                           |   6 +-
 docs/en/administrator-guide/config/fe_config.md    |  10 +-
 docs/zh-CN/administrator-guide/config/fe_config.md |  10 +-
 .../java/org/apache/doris/analysis/Analyzer.java   |  39 +++-
 .../doris/analysis/CreateTableAsSelectStmt.java    |  44 ++--
 .../java/org/apache/doris/catalog/Catalog.java     |   2 +-
 .../main/java/org/apache/doris/common/Config.java  |   7 +-
 .../org/apache/doris/planner/AnalyticEvalNode.java |   3 -
 .../apache/doris/planner/PredicatePushDown.java    |  35 ++-
 .../apache/doris/planner/SingleNodePlanner.java    |   3 +
 .../main/java/org/apache/doris/qe/Coordinator.java |   3 +-
 .../java/org/apache/doris/qe/SessionVariable.java  |  16 +-
 .../java/org/apache/doris/qe/StmtExecutor.java     |  36 ++-
 .../org/apache/doris/rewrite/InferFiltersRule.java |   8 +
 .../apache/doris/rewrite/InferFiltersRuleTest.java |  23 +-
 gensrc/thrift/PaloInternalService.thrift           |   3 +
 run-be-ut.sh                                       |   1 +
 thirdparty/build-thirdparty.sh                     |  37 +++-
 thirdparty/download-thirdparty.sh                  |  12 +-
 thirdparty/patches/libevent.patch                  | 126 +++++++----
 thirdparty/patches/libgsasl-1.8.0.patch            |  24 ++
 thirdparty/vars.sh                                 |  20 +-
 42 files changed, 881 insertions(+), 210 deletions(-)
 create mode 100644 thirdparty/patches/libgsasl-1.8.0.patch


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[incubator-doris] 08/13: [refactor](load) add tablet errors when close_wait return error (#9619)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 0e38fa4e9d4b18746987560d84f8b44c59501fa0
Author: pengxiangyu <di...@163.com>
AuthorDate: Sun May 22 21:27:42 2022 +0800

    [refactor](load) add tablet errors when close_wait return error (#9619)
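
The core of the refactor: DeltaWriter::close_wait() no longer fills in PTabletInfo/PTabletError
itself; it simply propagates its status (see the RETURN_NOT_OK call in the diff below), and the
caller, TabletsChannel, decides whether to record a success or an error for the tablet. A sketch
of the usual shape of such a status-propagation macro (an assumption based on common
Kudu-style status macros, not copied from the Doris tree):

    // Hypothetical definition; the macro in the tree may differ in detail.
    #define RETURN_NOT_OK(stmt)                  \
        do {                                     \
            OLAPStatus _s = (stmt);              \
            if (_s != OLAP_SUCCESS) return _s;   \
        } while (0)
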
---
 be/src/olap/delta_writer.cpp       |  21 +---
 be/src/olap/delta_writer.h         |   6 +-
 be/src/runtime/tablets_channel.cpp |  21 +++-
 be/src/runtime/tablets_channel.h   |   6 +-
 be/test/olap/delta_writer_test.cpp | 246 ++++++++++++++++++++++++++++++++++++-
 5 files changed, 265 insertions(+), 35 deletions(-)

diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 198019561f..58b89e5d64 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -279,9 +279,7 @@ OLAPStatus DeltaWriter::close() {
     return OLAP_SUCCESS;
 }
 
-OLAPStatus DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec,
-                               google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors,
-                               bool is_broken) {
+OLAPStatus DeltaWriter::close_wait() {
     std::lock_guard<std::mutex> l(_lock);
     DCHECK(_is_init)
             << "delta writer is supposed be to initialized before close_wait() being called";
@@ -291,14 +289,7 @@ OLAPStatus DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInf
     }
 
     // return error if previous flush failed
-    OLAPStatus st = _flush_token->wait();
-    if (st != OLAP_SUCCESS) {
-        PTabletError* tablet_error = tablet_errors->Add();
-        tablet_error->set_tablet_id(_tablet->tablet_id());
-        tablet_error->set_msg("flush failed");
-        return st;
-    }
-    DCHECK_EQ(_mem_tracker->consumption(), 0);
+    RETURN_NOT_OK(_flush_token->wait());
 
     // use rowset meta manager to save meta
     _cur_rowset = _rowset_writer->build();
@@ -314,14 +305,6 @@ OLAPStatus DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInf
         return res;
     }
 
-#ifndef BE_TEST
-    if (!is_broken) {
-        PTabletInfo* tablet_info = tablet_vec->Add();
-        tablet_info->set_tablet_id(_tablet->tablet_id());
-        tablet_info->set_schema_hash(_tablet->schema_hash());
-    }
-#endif
-
     _delta_written_success = true;
 
     const FlushStatistic& stat = _flush_token->get_stats();
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index cf5a2729d2..70164e56c8 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -67,9 +67,7 @@ public:
     OLAPStatus close();
     // wait for all memtables to be flushed.
     // mem_consumption() should be 0 after this function returns.
-    OLAPStatus close_wait(google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec,
-                      google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors,
-                      bool is_broken);
+    OLAPStatus close_wait();
 
     // abandon current memtable and wait for all pending-flushing memtables to be destructed.
     // mem_consumption() should be 0 after this function returns.
@@ -91,6 +89,8 @@ public:
 
     int64_t tablet_id() { return _tablet->tablet_id(); }
 
+    int32_t schema_hash() { return _tablet->schema_hash(); }
+
     int64_t save_mem_consumption_snapshot();
 
     int64_t get_mem_consumption_snapshot() const;
diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp
index 926cba7aaa..19f342360c 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -198,9 +198,7 @@ Status TabletsChannel::close(int sender_id, int64_t backend_id, bool* finished,
         for (auto writer : need_wait_writers) {
             // close may fail, but there is no need to handle it here.
             // tablet_vec will only contain successfully closed tablets; the FE will judge the rest.
-            writer->close_wait(
-                    tablet_vec, tablet_errors,
-                    (_broken_tablets.find(writer->tablet_id()) != _broken_tablets.end()));
+            _close_wait(writer, tablet_vec, tablet_errors);
         }
         // TODO(gaodayue) clear and destruct all delta writers to make sure all memory are freed
         // DCHECK_EQ(_mem_tracker->consumption(), 0);
@@ -208,6 +206,23 @@ Status TabletsChannel::close(int sender_id, int64_t backend_id, bool* finished,
     return Status::OK();
 }
 
+void TabletsChannel::_close_wait(DeltaWriter* writer,
+                                 google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec,
+                                 google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors) {
+    OLAPStatus st = writer->close_wait();
+    if (st == OLAP_SUCCESS) {
+        // only tablets that closed successfully and are not broken are reported back
+        if (_broken_tablets.find(writer->tablet_id()) == _broken_tablets.end()) {
+            PTabletInfo* tablet_info = tablet_vec->Add();
+            tablet_info->set_tablet_id(writer->tablet_id());
+            tablet_info->set_schema_hash(writer->schema_hash());
+        }
+    } else {
+        PTabletError* tablet_error = tablet_errors->Add();
+        tablet_error->set_tablet_id(writer->tablet_id());
+        tablet_error->set_msg("close wait failed: " + std::to_string(st));
+    }
+}
+
 Status TabletsChannel::reduce_mem_usage(int64_t mem_limit) {
     std::lock_guard<std::mutex> l(_lock);
     if (_state == kFinished) {
diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h
index 360242ae88..cd5ee67867 100644
--- a/be/src/runtime/tablets_channel.h
+++ b/be/src/runtime/tablets_channel.h
@@ -89,7 +89,11 @@ private:
     // open all writer
     Status _open_all_writers(const PTabletWriterOpenRequest& request);
 
-private:
+    // Handle DeltaWriter::close_wait(): on success the tablet goes into tablet_vec,
+    // on failure an entry is added to tablet_errors for the FE to judge.
+    void _close_wait(DeltaWriter* writer,
+                     google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec,
+                     google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors);
+
     // id of this load channel
     TabletsChannelKey _key;
 
diff --git a/be/test/olap/delta_writer_test.cpp b/be/test/olap/delta_writer_test.cpp
index 1e2cab2e1e..e51dbe9605 100644
--- a/be/test/olap/delta_writer_test.cpp
+++ b/be/test/olap/delta_writer_test.cpp
@@ -371,9 +371,18 @@ TEST_F(TestDeltaWriter, open) {
     DeltaWriter::open(&write_req, k_mem_tracker, &delta_writer);
     ASSERT_NE(delta_writer, nullptr);
     res = delta_writer->close();
-    ASSERT_EQ(OLAP_SUCCESS, res);
-    res = delta_writer->close_wait(nullptr, false);
-    ASSERT_EQ(OLAP_SUCCESS, res);
+    EXPECT_EQ(OLAP_SUCCESS, res);
+    res = delta_writer->close_wait();
+    EXPECT_EQ(OLAP_SUCCESS, res);
+    SAFE_DELETE(delta_writer);
+
+    // test vec delta writer
+    DeltaWriter::open(&write_req, &delta_writer, true);
+    EXPECT_NE(delta_writer, nullptr);
+    res = delta_writer->close();
+    EXPECT_EQ(OLAP_SUCCESS, res);
+    res = delta_writer->close_wait();
+    EXPECT_EQ(OLAP_SUCCESS, res);
     SAFE_DELETE(delta_writer);
 
     TDropTabletReq drop_request;
@@ -470,9 +479,143 @@ TEST_F(TestDeltaWriter, write) {
     }
 
     res = delta_writer->close();
-    ASSERT_EQ(OLAP_SUCCESS, res);
-    res = delta_writer->close_wait(nullptr, false);
-    ASSERT_EQ(OLAP_SUCCESS, res);
+    EXPECT_EQ(Status::OK(), res);
+    res = delta_writer->close_wait();
+    EXPECT_EQ(Status::OK(), res);
+
+    // publish version success
+    TabletSharedPtr tablet =
+            k_engine->tablet_manager()->get_tablet(write_req.tablet_id, write_req.schema_hash);
+    OlapMeta* meta = tablet->data_dir()->get_meta();
+    Version version;
+    version.first = tablet->rowset_with_max_version()->end_version() + 1;
+    version.second = tablet->rowset_with_max_version()->end_version() + 1;
+    std::map<TabletInfo, RowsetSharedPtr> tablet_related_rs;
+    StorageEngine::instance()->txn_manager()->get_txn_related_tablets(
+            write_req.txn_id, write_req.partition_id, &tablet_related_rs);
+    for (auto& tablet_rs : tablet_related_rs) {
+        RowsetSharedPtr rowset = tablet_rs.second;
+        res = k_engine->txn_manager()->publish_txn(meta, write_req.partition_id, write_req.txn_id,
+                                                   write_req.tablet_id, write_req.schema_hash,
+                                                   tablet_rs.first.tablet_uid, version);
+        EXPECT_EQ(Status::OK(), res);
+        res = tablet->add_inc_rowset(rowset);
+        EXPECT_EQ(Status::OK(), res);
+    }
+    EXPECT_EQ(1, tablet->num_rows());
+
+    auto tablet_id = 10003;
+    auto schema_hash = 270068375;
+    res = k_engine->tablet_manager()->drop_tablet(tablet_id, schema_hash);
+    EXPECT_EQ(Status::OK(), res);
+    delete delta_writer;
+}
+
+TEST_F(TestDeltaWriter, vec_write) {
+    TCreateTabletReq request;
+    create_tablet_request(10004, 270068376, &request);
+    Status res = k_engine->create_tablet(request);
+    ASSERT_TRUE(res.ok());
+
+    TDescriptorTable tdesc_tbl = create_descriptor_tablet();
+    ObjectPool obj_pool;
+    DescriptorTbl* desc_tbl = nullptr;
+    DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
+    TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
+    //     const std::vector<SlotDescriptor*>& slots = tuple_desc->slots();
+
+    PUniqueId load_id;
+    load_id.set_hi(0);
+    load_id.set_lo(0);
+    WriteRequest write_req = {10004, 270068376, WriteType::LOAD, 20002,
+                              30002, load_id,   tuple_desc,      &(tuple_desc->slots())};
+    DeltaWriter* delta_writer = nullptr;
+    DeltaWriter::open(&write_req, &delta_writer, true);
+    ASSERT_NE(delta_writer, nullptr);
+
+    auto tracker = std::make_shared<MemTracker>();
+    MemPool pool(tracker.get());
+
+    vectorized::Block block;
+    for (const auto& slot_desc : tuple_desc->slots()) {
+        block.insert(vectorized::ColumnWithTypeAndName(slot_desc->get_empty_mutable_column(),
+                                                       slot_desc->get_data_type_ptr(),
+                                                       slot_desc->col_name()));
+    }
+
+    auto columns = block.mutate_columns();
+    {
+        int8_t k1 = -127;
+        columns[0]->insert_data((const char*)&k1, sizeof(k1));
+
+        int16_t k2 = -32767;
+        columns[1]->insert_data((const char*)&k2, sizeof(k2));
+
+        int32_t k3 = -2147483647;
+        columns[2]->insert_data((const char*)&k3, sizeof(k3));
+
+        int64_t k4 = -9223372036854775807L;
+        columns[3]->insert_data((const char*)&k4, sizeof(k4));
+
+        int128_t k5 = -90000;
+        columns[4]->insert_data((const char*)&k5, sizeof(k5));
+
+        DateTimeValue k6;
+        k6.from_date_str("2048-11-10", 10);
+        auto k6_int = k6.to_int64();
+        columns[5]->insert_data((const char*)&k6_int, sizeof(k6_int));
+
+        DateTimeValue k7;
+        k7.from_date_str("2636-08-16 19:39:43", 19);
+        auto k7_int = k7.to_int64();
+        columns[6]->insert_data((const char*)&k7_int, sizeof(k7_int));
+
+        columns[7]->insert_data("abcd", 4);
+        columns[8]->insert_data("abcde", 5);
+
+        DecimalV2Value decimal_value;
+        decimal_value.assign_from_double(1.1);
+        columns[9]->insert_data((const char*)&decimal_value, sizeof(decimal_value));
+
+        int8_t v1 = -127;
+        columns[10]->insert_data((const char*)&v1, sizeof(v1));
+
+        int16_t v2 = -32767;
+        columns[11]->insert_data((const char*)&v2, sizeof(v2));
+
+        int32_t v3 = -2147483647;
+        columns[12]->insert_data((const char*)&v3, sizeof(v3));
+
+        int64_t v4 = -9223372036854775807L;
+        columns[13]->insert_data((const char*)&v4, sizeof(v4));
+
+        int128_t v5 = -90000;
+        columns[14]->insert_data((const char*)&v5, sizeof(v5));
+
+        DateTimeValue v6;
+        v6.from_date_str("2048-11-10", 10);
+        auto v6_int = v6.to_int64();
+        columns[15]->insert_data((const char*)&v6_int, sizeof(v6_int));
+
+        DateTimeValue v7;
+        v7.from_date_str("2636-08-16 19:39:43", 19);
+        auto v7_int = v7.to_int64();
+        columns[16]->insert_data((const char*)&v7_int, sizeof(v7_int));
+
+        columns[17]->insert_data("abcd", 4);
+        columns[18]->insert_data("abcde", 5);
+
+        decimal_value.assign_from_double(1.1);
+        columns[19]->insert_data((const char*)&decimal_value, sizeof(decimal_value));
+
+        res = delta_writer->write(&block, {0});
+        ASSERT_TRUE(res.ok());
+    }
+
+    res = delta_writer->close();
+    ASSERT_TRUE(res.ok());
+    res = delta_writer->close_wait();
+    ASSERT_TRUE(res.ok());
 
     // publish version success
     TabletSharedPtr tablet =
@@ -548,9 +691,94 @@ TEST_F(TestDeltaWriter, sequence_col) {
     }
 
     res = delta_writer->close();
-    ASSERT_EQ(OLAP_SUCCESS, res);
-    res = delta_writer->close_wait(nullptr, false);
-    ASSERT_EQ(OLAP_SUCCESS, res);
+    EXPECT_EQ(Status::OK(), res);
+    res = delta_writer->close_wait();
+    EXPECT_EQ(Status::OK(), res);
+
+    // publish version success
+    TabletSharedPtr tablet =
+            k_engine->tablet_manager()->get_tablet(write_req.tablet_id, write_req.schema_hash);
+    OlapMeta* meta = tablet->data_dir()->get_meta();
+    Version version;
+    version.first = tablet->rowset_with_max_version()->end_version() + 1;
+    version.second = tablet->rowset_with_max_version()->end_version() + 1;
+    std::map<TabletInfo, RowsetSharedPtr> tablet_related_rs;
+    StorageEngine::instance()->txn_manager()->get_txn_related_tablets(
+            write_req.txn_id, write_req.partition_id, &tablet_related_rs);
+    for (auto& tablet_rs : tablet_related_rs) {
+        RowsetSharedPtr rowset = tablet_rs.second;
+        res = k_engine->txn_manager()->publish_txn(meta, write_req.partition_id, write_req.txn_id,
+                                                   write_req.tablet_id, write_req.schema_hash,
+                                                   tablet_rs.first.tablet_uid, version);
+        EXPECT_EQ(Status::OK(), res);
+        res = tablet->add_inc_rowset(rowset);
+        EXPECT_EQ(Status::OK(), res);
+    }
+    EXPECT_EQ(1, tablet->num_rows());
+
+    auto tablet_id = 10005;
+    auto schema_hash = 270068377;
+    res = k_engine->tablet_manager()->drop_tablet(tablet_id, schema_hash);
+    EXPECT_EQ(Status::OK(), res);
+    delete delta_writer;
+}
+
+TEST_F(TestDeltaWriter, vec_sequence_col) {
+    TCreateTabletReq request;
+    sleep(20);
+    create_tablet_request_with_sequence_col(10005, 270068377, &request);
+    Status res = k_engine->create_tablet(request);
+    ASSERT_TRUE(res.ok());
+
+    TDescriptorTable tdesc_tbl = create_descriptor_tablet_with_sequence_col();
+    ObjectPool obj_pool;
+    DescriptorTbl* desc_tbl = nullptr;
+    DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
+    TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
+
+    PUniqueId load_id;
+    load_id.set_hi(0);
+    load_id.set_lo(0);
+    WriteRequest write_req = {10005, 270068377, WriteType::LOAD, 20003,
+                              30003, load_id,   tuple_desc,      &(tuple_desc->slots())};
+    DeltaWriter* delta_writer = nullptr;
+    DeltaWriter::open(&write_req, &delta_writer, true);
+    ASSERT_NE(delta_writer, nullptr);
+
+    MemTracker tracker;
+    MemPool pool(&tracker);
+
+    vectorized::Block block;
+    for (const auto& slot_desc : tuple_desc->slots()) {
+        block.insert(vectorized::ColumnWithTypeAndName(slot_desc->get_empty_mutable_column(),
+                                                       slot_desc->get_data_type_ptr(),
+                                                       slot_desc->col_name()));
+    }
+
+    auto columns = block.mutate_columns();
+    {
+        int8_t c1 = 123;
+        columns[0]->insert_data((const char*)&c1, sizeof(c1));
+
+        int16_t c2 = 456;
+        columns[1]->insert_data((const char*)&c2, sizeof(c2));
+
+        int32_t c3 = 1;
+        columns[2]->insert_data((const char*)&c3, sizeof(c3));
+
+        DateTimeValue c4;
+        c4.from_date_str("2020-07-16 19:39:43", 19);
+        int64_t c4_int = c4.to_int64();
+        columns[3]->insert_data((const char*)&c4_int, sizeof(c4_int));
+
+        res = delta_writer->write(&block, {0});
+        ASSERT_TRUE(res.ok());
+    }
+
+    res = delta_writer->close();
+    ASSERT_TRUE(res.ok());
+    res = delta_writer->close_wait();
+    ASSERT_TRUE(res.ok());
 
     // publish version success
     TabletSharedPtr tablet =


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[incubator-doris] 06/13: [Bug][Vectorized] fix schema change add varchar type column default value get wrong result (#9523)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 8490c3890662ff9d108533446138cec2e4050857
Author: Pxl <px...@qq.com>
AuthorDate: Thu May 19 23:38:57 2022 +0800

    [Bug][Vectorized] fix schema change add varchar type column default value get wrong result (#9523)
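
Judging from the diff below, the root cause is that for CHAR/VARCHAR/STRING columns the cached
default value is stored as a Slice, so the pointer must be dereferenced to reach the actual
bytes; the generic fallback treated the Slice object itself as the data. A minimal standalone
illustration of that distinction (the Slice here is a hypothetical stand-in, not Doris code):

    #include <cstddef>
    #include <iostream>
    #include <string>

    // Hypothetical stand-in for Doris' Slice: a pointer/length view of bytes.
    struct Slice {
        const char* data;
        size_t size;
    };

    int main() {
        std::string def = "default_val";
        Slice slice{def.data(), def.size()};
        const void* mem_value = &slice; // what the iterator holds for VARCHAR

        // Pre-fix reading: treats the Slice object itself as the string
        // bytes, yielding garbage of the wrong length.
        const char* bad = static_cast<const char*>(mem_value);
        (void)bad;

        // Post-fix reading: go through the Slice to reach the real bytes.
        const Slice* s = static_cast<const Slice*>(mem_value);
        std::cout << std::string(s->data, s->size) << "\n"; // default_val
    }
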
---
 be/src/olap/rowset/segment_v2/column_reader.cpp | 14 +++++++++++---
 be/src/olap/rowset/segment_v2/column_reader.h   |  2 +-
 2 files changed, 12 insertions(+), 4 deletions(-)

diff --git a/be/src/olap/rowset/segment_v2/column_reader.cpp b/be/src/olap/rowset/segment_v2/column_reader.cpp
index 982de671d7..63c4f0d36f 100644
--- a/be/src/olap/rowset/segment_v2/column_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/column_reader.cpp
@@ -346,10 +346,10 @@ Status ColumnReader::new_iterator(ColumnIterator** iterator) {
         auto type = (FieldType)_meta.type();
         switch (type) {
         case FieldType::OLAP_FIELD_TYPE_ARRAY: {
-            ColumnIterator* item_iterator;
+            ColumnIterator* item_iterator = nullptr;
             RETURN_IF_ERROR(_sub_readers[0]->new_iterator(&item_iterator));
 
-            ColumnIterator* offset_iterator;
+            ColumnIterator* offset_iterator = nullptr;
             RETURN_IF_ERROR(_sub_readers[1]->new_iterator(&offset_iterator));
 
             ColumnIterator* null_iterator = nullptr;
@@ -485,7 +485,7 @@ Status FileColumnIterator::seek_to_page_start() {
     return seek_to_ordinal(_page.first_ordinal);
 }
 
-void FileColumnIterator::_seek_to_pos_in_page(ParsedPage* page, ordinal_t offset_in_page) {
+void FileColumnIterator::_seek_to_pos_in_page(ParsedPage* page, ordinal_t offset_in_page) const {
     if (page->offset_in_page == offset_in_page) {
         // fast path, do nothing
         return;
@@ -819,6 +819,14 @@ void DefaultValueColumnIterator::insert_default_data(vectorized::MutableColumnPt
             insert_column_data();
             break;
         }
+        case OLAP_FIELD_TYPE_STRING:
+        case OLAP_FIELD_TYPE_VARCHAR:
+        case OLAP_FIELD_TYPE_CHAR: {
+            data_ptr = ((Slice*)_mem_value)->data;
+            data_len = ((Slice*)_mem_value)->size;
+            insert_column_data();
+            break;
+        }
         default: {
             data_ptr = (char *) _mem_value;
             data_len = _type_size;
diff --git a/be/src/olap/rowset/segment_v2/column_reader.h b/be/src/olap/rowset/segment_v2/column_reader.h
index 99206bd53a..76c41ae991 100644
--- a/be/src/olap/rowset/segment_v2/column_reader.h
+++ b/be/src/olap/rowset/segment_v2/column_reader.h
@@ -278,7 +278,7 @@ public:
     bool is_nullable() { return _reader->is_nullable(); }
 
 private:
-    void _seek_to_pos_in_page(ParsedPage* page, ordinal_t offset_in_page);
+    void _seek_to_pos_in_page(ParsedPage* page, ordinal_t offset_in_page) const;
     Status _load_next_page(bool* eos);
     Status _read_data_page(const OrdinalPageIndexIterator& iter);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[incubator-doris] 12/13: [refactor](fe): remove unused code (#8986)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 05bd5c82f9dc9ef092610e5bd3507d263111ef09
Author: jakevin <30...@users.noreply.github.com>
AuthorDate: Thu Apr 14 11:44:21 2022 +0800

    [refactor](fe): remove unused code (#8986)
---
 .../org/apache/doris/planner/AnalyticEvalNode.java |   3 -
 .../apache/doris/planner/PredicatePushDown.java    | 122 ---------------------
 2 files changed, 125 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/AnalyticEvalNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/AnalyticEvalNode.java
index 239a1ce112..3e53db5a2d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/AnalyticEvalNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AnalyticEvalNode.java
@@ -96,9 +96,6 @@ public class AnalyticEvalNode extends PlanNode {
         nullableTupleIds = Sets.newHashSet(input.getNullableTupleIds());
     }
 
-    public boolean isBlockingNode() {
-        return true;
-    }
     public List<Expr> getPartitionExprs() {
         return partitionExprs;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PredicatePushDown.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PredicatePushDown.java
deleted file mode 100644
index 28abd58f52..0000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PredicatePushDown.java
+++ /dev/null
@@ -1,122 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.planner;
-
-import org.apache.doris.analysis.Analyzer;
-import org.apache.doris.analysis.BinaryPredicate;
-import org.apache.doris.analysis.Expr;
-import org.apache.doris.analysis.InPredicate;
-import org.apache.doris.analysis.JoinOperator;
-import org.apache.doris.analysis.Predicate;
-import org.apache.doris.analysis.SlotRef;
-import org.apache.doris.analysis.TupleId;
-
-import org.apache.directory.api.util.Strings;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.util.List;
-
-public class PredicatePushDown {
-    private final static Logger LOG = LogManager.getLogger(PredicatePushDown.class);
-
-    public static PlanNode visitScanNode(ScanNode scanNode, JoinOperator joinOp, Analyzer analyzer) {
-        switch (joinOp) {
-            case INNER_JOIN:
-            case LEFT_OUTER_JOIN:
-                predicateFromLeftSidePropagatesToRightSide(scanNode, analyzer);
-                break;
-            // TODO
-            default:
-                break;
-        }
-        return scanNode;
-    }
-
-    private static void predicateFromLeftSidePropagatesToRightSide(ScanNode scanNode, Analyzer analyzer) {
-        List<TupleId> tupleIdList = scanNode.getTupleIds();
-        if (tupleIdList.size() != 1) {
-            LOG.info("The predicate pushdown is not reflected "
-                            + "because the scan node involves more then one tuple:{}",
-                    Strings.listToString(tupleIdList));
-            return;
-        }
-        TupleId rightSideTuple = tupleIdList.get(0);
-        List<Expr> unassignedRightSideConjuncts = analyzer.getUnassignedConjuncts(scanNode);
-        List<Expr> eqJoinPredicates = analyzer.getEqJoinConjuncts(rightSideTuple);
-        if (eqJoinPredicates != null) {
-            List<Expr> allConjuncts = analyzer.getConjuncts(analyzer.getAllTupleIds());
-            allConjuncts.removeAll(unassignedRightSideConjuncts);
-            for (Expr conjunct : allConjuncts) {
-                if (!Predicate.canPushDownPredicate(conjunct)) {
-                    continue;
-                }
-                for (Expr eqJoinPredicate : eqJoinPredicates) {
-                    // we can ensure slot is left node, because NormalizeBinaryPredicatesRule
-                    SlotRef otherSlot = conjunct.getChild(0).unwrapSlotRef();
-
-                    // ensure the children for eqJoinPredicate both be SlotRef
-                    if (eqJoinPredicate.getChild(0).unwrapSlotRef() == null
-                            || eqJoinPredicate.getChild(1).unwrapSlotRef() == null) {
-                        continue;
-                    }
-
-                    SlotRef leftSlot = eqJoinPredicate.getChild(0).unwrapSlotRef();
-                    SlotRef rightSlot = eqJoinPredicate.getChild(1).unwrapSlotRef();
-                    // ensure the type is match
-                    if (!leftSlot.getDesc().getType().matchesType(rightSlot.getDesc().getType())) {
-                        continue;
-                    }
-
-                    // example: t1.id = t2.id and t1.id = 1  => t2.id =1
-                    if (otherSlot.isBound(leftSlot.getSlotId())
-                            && rightSlot.isBound(rightSideTuple)) {
-                        Expr pushDownConjunct = rewritePredicate(analyzer, conjunct, rightSlot);
-                        LOG.debug("pushDownConjunct: {}", pushDownConjunct);
-                        scanNode.addConjunct(pushDownConjunct);
-                    } else if (otherSlot.isBound(rightSlot.getSlotId())
-                            && leftSlot.isBound(rightSideTuple)) {
-                        Expr pushDownConjunct = rewritePredicate(analyzer, conjunct, leftSlot);
-                        LOG.debug("pushDownConjunct: {}", pushDownConjunct);
-                        scanNode.addConjunct(pushDownConjunct);
-                    }
-                }
-            }
-        }
-    }
-
-    // Rewrite the oldPredicate with new leftChild
-    // For example: oldPredicate is t1.id = 1, leftChild is t2.id, will return t2.id = 1
-    private static Expr rewritePredicate(Analyzer analyzer, Expr oldPredicate, Expr leftChild) {
-        if (oldPredicate instanceof BinaryPredicate) {
-            BinaryPredicate oldBP = (BinaryPredicate) oldPredicate;
-            BinaryPredicate bp = new BinaryPredicate(oldBP.getOp(), leftChild, oldBP.getChild(1));
-            bp.analyzeNoThrow(analyzer);
-            return bp;
-        }
-
-        if (oldPredicate instanceof InPredicate) {
-            InPredicate oldIP = (InPredicate) oldPredicate;
-            InPredicate ip = new InPredicate(leftChild, oldIP.getListChildren(), oldIP.isNotIn());
-            ip.analyzeNoThrow(analyzer);
-            return ip;
-        }
-
-        return oldPredicate;
-    }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[incubator-doris] 03/13: [Feature] CTAS support insert data (#9271)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 733fb3a92af8b0180e8e52f49d7a390479c608d7
Author: Stalary <45...@qq.com>
AuthorDate: Sat May 7 08:51:54 2022 +0800

    [Feature] CTAS support insert data (#9271)
---
 .../doris/analysis/CreateTableAsSelectStmt.java    | 44 +++++++++++-----------
 .../java/org/apache/doris/catalog/Catalog.java     |  2 +-
 .../java/org/apache/doris/qe/StmtExecutor.java     | 36 +++++++++++++++++-
 3 files changed, 58 insertions(+), 24 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableAsSelectStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableAsSelectStmt.java
index a5d17632f6..235e497e80 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableAsSelectStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableAsSelectStmt.java
@@ -21,28 +21,42 @@ import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.UserException;
 
+import lombok.Getter;
+
 import java.util.ArrayList;
 import java.util.List;
 
 /**
- * Represents a CREATE TABLE AS SELECT (CTAS) statement
- *  Syntax:
- *      CREATE TABLE table_name [( column_name_list )]
- *          opt_engine opt_partition opt_properties KW_AS query_stmt
+ * Represents a CREATE TABLE AS SELECT (CTAS) statement.
+ * Syntax:
+ * CREATE TABLE table_name [( column_name_list )]
+ * opt_engine opt_partition opt_properties KW_AS query_stmt
  */
 public class CreateTableAsSelectStmt extends DdlStmt {
+
+    @Getter
     private final CreateTableStmt createTableStmt;
+
+    @Getter
     private final List<String> columnNames;
+
+    @Getter
     private QueryStmt queryStmt;
-    
-    public CreateTableAsSelectStmt(CreateTableStmt createTableStmt,
-                                   List<String> columnNames, QueryStmt queryStmt) {
+
+    @Getter
+    private final InsertStmt insertStmt;
+
+    protected CreateTableAsSelectStmt(CreateTableStmt createTableStmt,
+                                      List<String> columnNames, QueryStmt queryStmt) {
         this.createTableStmt = createTableStmt;
         this.columnNames = columnNames;
         this.queryStmt = queryStmt;
-        // Insert is not currently supported
+        this.insertStmt = new InsertStmt(createTableStmt.getDbTbl(), queryStmt.clone());
     }
-    
+
+    /**
+     * Cannot analyze insertStmt because the table has not been created yet.
+     */
     @Override
     public void analyze(Analyzer analyzer) throws UserException {
         // first: we analyze queryStmt before create table.
@@ -63,16 +77,4 @@ public class CreateTableAsSelectStmt extends DdlStmt {
             ErrorReport.reportAnalysisException(ErrorCode.ERR_COL_NUMBER_NOT_MATCH);
         }
     }
-    
-    public CreateTableStmt getCreateTableStmt() {
-        return createTableStmt;
-    }
-    
-    public List<String> getColumnNames() {
-        return columnNames;
-    }
-    
-    public QueryStmt getQueryStmt() {
-        return queryStmt;
-    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
index 3bb59eac55..ffded947bf 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -3146,7 +3146,7 @@ public class Catalog {
             createTableStmt.analyze(dummyRootAnalyzer);
             createTable(createTableStmt);
         } catch (UserException e) {
-            throw new DdlException("Failed to execute CREATE TABLE AS SELECT Reason: " + e.getMessage());
+            throw new DdlException("Failed to execute CTAS Reason: " + e.getMessage());
         }
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index e913f9cc96..6ba7364f1a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -20,6 +20,7 @@ package org.apache.doris.qe;
 import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.CreateTableAsSelectStmt;
 import org.apache.doris.analysis.DdlStmt;
+import org.apache.doris.analysis.DropTableStmt;
 import org.apache.doris.analysis.EnterStmt;
 import org.apache.doris.analysis.ExplainOptions;
 import org.apache.doris.analysis.ExportStmt;
@@ -414,6 +415,8 @@ public class StmtExecutor implements ProfileWriter {
                 handleUseStmt();
             } else if (parsedStmt instanceof TransactionStmt) {
                 handleTransactionStmt();
+            } else if (parsedStmt instanceof CreateTableAsSelectStmt) {
+                handleCtasStmt();
            } else if (parsedStmt instanceof InsertStmt) { // Must be ahead of DdlStmt because InsertStmt is its subclass
                 try {
                     handleInsertStmt();
@@ -1237,7 +1240,6 @@ public class StmtExecutor implements ProfileWriter {
         context.getMysqlChannel().reset();
         // create plan
         InsertStmt insertStmt = (InsertStmt) parsedStmt;
-
         if (insertStmt.getQueryStmt().hasOutFileClause()) {
             throw new DdlException("Not support OUTFILE clause in INSERT statement");
         }
@@ -1558,6 +1560,37 @@ public class StmtExecutor implements ProfileWriter {
         context.getCatalog().getExportMgr().addExportJob(exportStmt);
     }
 
+    private void handleCtasStmt() {
+        CreateTableAsSelectStmt ctasStmt = (CreateTableAsSelectStmt) this.parsedStmt;
+        try {
+            // create table
+            DdlExecutor.execute(context.getCatalog(), ctasStmt);
+            context.getState().setOk();
+        } catch (Exception e) {
+            // An exception here most likely indicates an internal bug.
+            LOG.warn("CTAS create table error, stmt={}", originStmt.originStmt, e);
+            context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Unexpected exception: " + e.getMessage());
+        }
+        // after the table is created successfully, insert the data
+        if (MysqlStateType.OK.equals(context.getState().getStateType())) {
+            try {
+                parsedStmt = ctasStmt.getInsertStmt();
+                execute();
+            } catch (Exception e) {
+                LOG.warn("CTAS insert data error, stmt={}", parsedStmt.toSql(), e);
+                // the insert failed, so drop the table that was just created
+                DropTableStmt dropTableStmt = new DropTableStmt(true, ctasStmt.getCreateTableStmt().getDbTbl(), true);
+                try {
+                    DdlExecutor.execute(context.getCatalog(), dropTableStmt);
+                } catch (Exception ex) {
+                    LOG.warn("CTAS drop table error, stmt={}", parsedStmt.toSql(), ex);
+                    context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR,
+                            "Unexpected exception: " + ex.getMessage());
+                }
+            }
+        }
+    }
+
     public Data.PQueryStatistics getQueryStatisticsForAuditLog() {
         if (statisticsForAuditLog == null) {
             statisticsForAuditLog = Data.PQueryStatistics.newBuilder();
@@ -1581,4 +1614,3 @@ public class StmtExecutor implements ProfileWriter {
         return exprs.stream().map(e -> e.getType().getPrimitiveType()).collect(Collectors.toList());
     }
 }
-


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[incubator-doris] 10/13: [fix] NullPredicate should implement evaluate_vec (#9689)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit e52e27ba6636640931abf97b69cc3d646e642795
Author: Yongqiang YANG <98...@users.noreply.github.com>
AuthorDate: Sun May 22 21:29:53 2022 +0800

    [fix] NullPredicate should implement evaluate_vec (#9689)
    
    select column from table where column is null
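
A minimal standalone sketch of the flag-filling rule the new evaluate_vec applies (illustrative
only, not Doris code): for a nullable column, a row passes exactly when its null bit matches the
predicate's polarity.

    #include <cstdint>
    #include <iostream>
    #include <vector>

    // Mirrors the loop added in null_predicate.cpp below: null_map[i] is 1
    // when the row's value is NULL; is_null is true for "IS NULL" and false
    // for "IS NOT NULL".
    void evaluate_vec_sketch(const std::vector<uint8_t>& null_map, bool is_null,
                             std::vector<bool>* flags) {
        for (size_t i = 0; i < null_map.size(); ++i) {
            (*flags)[i] = (null_map[i] == static_cast<uint8_t>(is_null));
        }
    }

    int main() {
        std::vector<uint8_t> null_map = {0, 1, 0, 1};
        std::vector<bool> flags(null_map.size());
        evaluate_vec_sketch(null_map, /*is_null=*/true, &flags); // "IS NULL"
        for (bool f : flags) std::cout << f << ' '; // prints: 0 1 0 1
    }
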
---
 be/src/olap/column_predicate.h |  5 ++++-
 be/src/olap/null_predicate.cpp | 12 ++++++++++++
 be/src/olap/null_predicate.h   |  2 ++
 3 files changed, 18 insertions(+), 1 deletion(-)

diff --git a/be/src/olap/column_predicate.h b/be/src/olap/column_predicate.h
index 56f817d9f3..bb5f468303 100644
--- a/be/src/olap/column_predicate.h
+++ b/be/src/olap/column_predicate.h
@@ -83,7 +83,10 @@ public:
     // used to evaluate pre-read columns in lazy materialization
     // now only support integer/float
     // a vectorized eval way
-    virtual void evaluate_vec(vectorized::IColumn& column, uint16_t size, bool* flags) const {};
+    virtual void evaluate_vec(vectorized::IColumn& column, uint16_t size, bool* flags) const {
+        DCHECK(false) << "should not reach here";
+    }
+
     uint32_t column_id() const { return _column_id; }
 
 protected:
diff --git a/be/src/olap/null_predicate.cpp b/be/src/olap/null_predicate.cpp
index 43cfbcaab3..631eb67a86 100644
--- a/be/src/olap/null_predicate.cpp
+++ b/be/src/olap/null_predicate.cpp
@@ -165,4 +165,16 @@ void NullPredicate::evaluate_and(IColumn& column, uint16_t* sel, uint16_t size,
         if (_is_null) memset(flags, false, size);
     }
 }
+
+void NullPredicate::evaluate_vec(vectorized::IColumn& column, uint16_t size, bool* flags) const {
+    if (auto* nullable = check_and_get_column<ColumnNullable>(column)) {
+        auto& null_map = nullable->get_null_map_data();
+        for (uint16_t i = 0; i < size; ++i) {
+            flags[i] = (null_map[i] == _is_null);
+        }
+    } else {
+        if (_is_null) memset(flags, false, size);
+    }
+}
+
 } //namespace doris
diff --git a/be/src/olap/null_predicate.h b/be/src/olap/null_predicate.h
index 7b90ffbf92..f7e92d9fe4 100644
--- a/be/src/olap/null_predicate.h
+++ b/be/src/olap/null_predicate.h
@@ -53,6 +53,8 @@ public:
     void evaluate_and(vectorized::IColumn& column, uint16_t* sel, uint16_t size,
                       bool* flags) const override;
 
+    void evaluate_vec(vectorized::IColumn& column, uint16_t size, bool* flags) const override;
+
 private:
     bool _is_null; //true for null, false for not null
 };


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[incubator-doris] 02/13: [fix](broker-scan-node) Remove trailing spaces in broker_scanner. Make it consistent with hive and trino behavior. (#9190)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 131a0ac56e5cb4c06122b99be63021199f9a72e4
Author: Jibing-Li <64...@users.noreply.github.com>
AuthorDate: Fri May 20 09:55:13 2022 +0800

    [fix](broker-scan-node) Remove trailing spaces in broker_scanner. Make it consistent with hive and trino behavior. (#9190)
    
    Hive and Trino/Presto automatically trim trailing spaces, but Doris doesn't,
    so the same query can return different results from Hive.
    
    Add a new session variable "trim_tailing_spaces_for_external_table_query".
    If set to true, the broker scan node will trim the trailing spaces of each column when reading csv.
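
A standalone sketch of the trimming rule itself (illustrative only): walk back from the end of
each split column value while the preceding character is a space, exactly as the backward scan
in broker_scanner.cpp below does.

    #include <cstddef>
    #include <iostream>
    #include <string>

    // Trims trailing spaces from a column value occupying [start, end)
    // of a line buffer; returns the new exclusive end position.
    size_t trim_trailing_spaces(const char* value, size_t start, size_t end) {
        size_t non_space = end;
        while (non_space > start && *(value + non_space - 1) == ' ') {
            non_space--;
        }
        return non_space;
    }

    int main() {
        std::string col = "1000   ";
        size_t end = trim_trailing_spaces(col.data(), 0, col.size());
        std::cout << '[' << col.substr(0, end) << "]\n"; // prints: [1000]
    }
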
---
 be/src/exec/broker_scanner.cpp                     | 29 ++++++++++++++++------
 be/src/runtime/runtime_state.h                     |  4 +++
 .../java/org/apache/doris/qe/SessionVariable.java  | 14 +++++++++++
 gensrc/thrift/PaloInternalService.thrift           |  3 +++
 4 files changed, 43 insertions(+), 7 deletions(-)

diff --git a/be/src/exec/broker_scanner.cpp b/be/src/exec/broker_scanner.cpp
index 83b0794aa3..aad3f5deef 100644
--- a/be/src/exec/broker_scanner.cpp
+++ b/be/src/exec/broker_scanner.cpp
@@ -337,19 +337,20 @@ void BrokerScanner::split_line(const Slice& line) {
         delete[] ptr;
     } else {
         const char* value = line.data;
-        size_t start = 0;  // point to the start pos of next col value.
-        size_t curpos = 0; // point to the start pos of separator matching sequence.
-        size_t p1 = 0;     // point to the current pos of separator matching sequence.
+        size_t start = 0;     // point to the start pos of next col value.
+        size_t curpos = 0;    // point to the start pos of separator matching sequence.
+        size_t p1 = 0;        // point to the current pos of separator matching sequence.
+        size_t non_space = 0; // point to the pos after the last non-space character.
 
         // Separator: AAAA
         //
-        //   curpos
+        //    p1
         //     ▼
         //     AAAA
         //   1000AAAA2000AAAA
         //   ▲   ▲
         // Start │
-        //       p1
+        //     curpos
 
         while (curpos < line.size) {
             if (*(value + curpos + p1) != _value_separator[p1]) {
@@ -360,16 +361,30 @@ void BrokerScanner::split_line(const Slice& line) {
                 p1++;
                 if (p1 == _value_separator_length) {
                     // Match a separator
-                    _split_values.emplace_back(value + start, curpos - start);
+                    non_space = curpos;
+                    // Trim trailing spaces. Be consistent with hive and trino's behavior.
+                    if (_state->trim_tailing_spaces_for_external_table_query()) {
+                        while (non_space > start && *(value + non_space - 1) == ' ') {
+                            non_space--;
+                        }
+                    }
+                    _split_values.emplace_back(value + start, non_space - start);
                     start = curpos + _value_separator_length;
                     curpos = start;
                     p1 = 0;
+                    non_space = 0;
                 }
             }
         }
 
         CHECK(curpos == line.size) << curpos << " vs " << line.size;
-        _split_values.emplace_back(value + start, curpos - start);
+        non_space = curpos;
+        if (_state->trim_tailing_spaces_for_external_table_query()) {
+            while (non_space > start && *(value + non_space - 1) == ' ') {
+                non_space--;
+            }
+        }
+        _split_values.emplace_back(value + start, non_space - start);
     }
 }
 
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 449b4c2a17..32f4bb97d4 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -349,6 +349,10 @@ public:
 
     bool enable_vectorized_exec() const { return _query_options.enable_vectorized_engine; }
 
+    bool trim_tailing_spaces_for_external_table_query() const {
+        return _query_options.trim_tailing_spaces_for_external_table_query;
+    }
+
     bool return_object_data_as_binary() const {
         return _query_options.return_object_data_as_binary;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index e2aa801283..a3ee7f5baf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -180,6 +180,8 @@ public class SessionVariable implements Serializable, Writable {
 
     public static final String ENABLE_PROJECTION = "enable_projection";
 
+    public static final String TRIM_TAILING_SPACES_FOR_EXTERNAL_TABLE_QUERY = "trim_tailing_spaces_for_external_table_query";
+
     // session origin value
     public Map<Field, String> sessionOriginValue = new HashMap<Field, String>();
     // check stmt is or not [select /*+ SET_VAR(...)*/ ...]
@@ -439,6 +441,9 @@ public class SessionVariable implements Serializable, Writable {
     @VariableMgr.VarAttr(name = ENABLE_PROJECTION)
     private boolean enableProjection = false;
 
+    @VariableMgr.VarAttr(name = TRIM_TAILING_SPACES_FOR_EXTERNAL_TABLE_QUERY, needForward = true)
+    public boolean trimTailingSpacesForExternalTableQuery = false;
+
     public String getBlockEncryptionMode() {
         return blockEncryptionMode;
     }
@@ -895,6 +900,14 @@ public class SessionVariable implements Serializable, Writable {
         return enableProjection;
     }
 
+    public boolean isTrimTailingSpacesForExternalTableQuery() {
+        return trimTailingSpacesForExternalTableQuery;
+    }
+
+    public void setTrimTailingSpacesForExternalTableQuery(boolean trimTailingSpacesForExternalTableQuery) {
+        this.trimTailingSpacesForExternalTableQuery = trimTailingSpacesForExternalTableQuery;
+    }
+
     // Serialize to thrift object
     // used for rest api
     public TQueryOptions toThrift() {
@@ -912,6 +925,7 @@ public class SessionVariable implements Serializable, Writable {
         tResult.setCodegenLevel(codegenLevel);
         tResult.setEnableVectorizedEngine(enableVectorizedEngine);
         tResult.setReturnObjectDataAsBinary(returnObjectDataAsBinary);
+        tResult.setTrimTailingSpacesForExternalTableQuery(trimTailingSpacesForExternalTableQuery);
 
         tResult.setBatchSize(batchSize);
         tResult.setDisableStreamPreaggregations(disableStreamPreaggregations);
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index 713487f53d..19ae35a64e 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -160,6 +160,9 @@ struct TQueryOptions {
   // show bitmap data in result, if use this in mysql cli may make the terminal
   // output corrupted character
   43: optional bool return_object_data_as_binary = false
+
+  // trim trailing spaces while querying external tables and in stream load
+  44: optional bool trim_tailing_spaces_for_external_table_query = false
 }
     
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[incubator-doris] 05/13: [Enhancement] improve parquet reader via arrow's prefetch and multi thread (#9472)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 571608dee59742811864ccc9bc161c6ae4fabd67
Author: Lightman <31...@users.noreply.github.com>
AuthorDate: Thu May 19 23:52:01 2022 +0800

    [Enhancement]  improve parquet reader via arrow's prefetch and multi thread (#9472)
    
    * add ArrowReaderProperties to parquet::arrow::FileReader
    
    * support prefetch batch
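
The diff below shows the producer side (prefetch_batch) feeding a bounded queue from a detached
thread; the matching consumer, read_next_batch, falls outside the shown hunks. A self-contained
mock of that producer/consumer handshake (an int stands in for a record batch; the structure
mirrors the members visible in the diff, but this is an illustration, not the committed
implementation):

    #include <chrono>
    #include <condition_variable>
    #include <deque>
    #include <iostream>
    #include <mutex>
    #include <thread>

    // Bounded prefetch queue in the style of the ParquetReaderWrap members
    // added below (_mtx, _queue, _max_queue_size, two condition variables).
    struct PrefetchQueue {
        std::mutex mtx;
        std::condition_variable reader_cond;
        std::condition_variable writer_cond;
        std::deque<int> queue;
        const size_t max_queue_size = 2;
        bool closed = false;

        void push(int batch) { // producer: block while the queue is full
            std::unique_lock<std::mutex> lock(mtx);
            while (!closed && queue.size() == max_queue_size) {
                writer_cond.wait_for(lock, std::chrono::seconds(1));
            }
            if (closed) return;
            queue.push_back(batch);
            reader_cond.notify_one();
        }

        bool pop(int* batch) { // consumer: block while the queue is empty
            std::unique_lock<std::mutex> lock(mtx);
            while (!closed && queue.empty()) {
                reader_cond.wait_for(lock, std::chrono::seconds(1));
            }
            if (queue.empty()) return false; // producer finished or failed
            *batch = queue.front();
            queue.pop_front();
            writer_cond.notify_one();
            return true;
        }

        void close() { // producer signals end-of-stream (or an error)
            std::lock_guard<std::mutex> lock(mtx);
            closed = true;
            reader_cond.notify_one();
            writer_cond.notify_one();
        }
    };

    int main() {
        PrefetchQueue q;
        std::thread producer([&q] {
            for (int group = 0; group < 5; ++group) q.push(group);
            q.close();
        });
        int batch;
        while (q.pop(&batch)) std::cout << "got batch " << batch << "\n";
        producer.join();
    }
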
---
 be/src/common/config.h         |   3 +
 be/src/exec/parquet_reader.cpp | 137 ++++++++++++++++++++++++++++-------------
 be/src/exec/parquet_reader.h   |  23 ++++++-
 3 files changed, 117 insertions(+), 46 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index b37fbc048a..48582beabe 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -715,6 +715,9 @@ CONF_Validator(string_type_length_soft_limit_bytes,
 // is greater than object_pool_buffer_size, release the object in the unused_object_pool.
 CONF_Int32(object_pool_buffer_size, "100");
 
+// ParquetReaderWrap prefetch buffer size
+CONF_Int32(parquet_reader_max_buffer_size, "50");
+
 } // namespace config
 
 } // namespace doris
diff --git a/be/src/exec/parquet_reader.cpp b/be/src/exec/parquet_reader.cpp
index ddb3531e17..61eb4d8e19 100644
--- a/be/src/exec/parquet_reader.cpp
+++ b/be/src/exec/parquet_reader.cpp
@@ -18,9 +18,15 @@
 
 #include <arrow/array.h>
 #include <arrow/status.h>
+#include <arrow/type_fwd.h>
 #include <time.h>
 
+#include <algorithm>
+#include <mutex>
+#include <thread>
+
 #include "common/logging.h"
+#include "common/status.h"
 #include "exec/file_reader.h"
 #include "gen_cpp/PaloBrokerService_types.h"
 #include "gen_cpp/TPaloBrokerService.h"
@@ -44,9 +50,6 @@ ParquetReaderWrap::ParquetReaderWrap(FileReader* file_reader, int32_t num_of_col
           _current_line_of_group(0),
           _current_line_of_batch(0) {
     _parquet = std::shared_ptr<ParquetFile>(new ParquetFile(file_reader));
-    _properties = parquet::ReaderProperties();
-    _properties.enable_buffered_stream();
-    _properties.set_buffer_size(65535);
 }
 
 ParquetReaderWrap::~ParquetReaderWrap() {
@@ -55,10 +58,23 @@ ParquetReaderWrap::~ParquetReaderWrap() {
 Status ParquetReaderWrap::init_parquet_reader(const std::vector<SlotDescriptor*>& tuple_slot_descs,
                                               const std::string& timezone) {
     try {
-        // new file reader for parquet file
-        auto st = parquet::arrow::FileReader::Make(
-                arrow::default_memory_pool(),
-                parquet::ParquetFileReader::Open(_parquet, _properties), &_reader);
+        parquet::ArrowReaderProperties arrow_reader_properties =
+                parquet::default_arrow_reader_properties();
+        arrow_reader_properties.set_pre_buffer(true);
+        arrow_reader_properties.set_use_threads(true);
+        // Open Parquet file reader
+        auto reader_builder = parquet::arrow::FileReaderBuilder();
+        reader_builder.properties(arrow_reader_properties);
+
+        auto st = reader_builder.Open(_parquet);
+
+        if (!st.ok()) {
+            LOG(WARNING) << "failed to create parquet file reader, errmsg=" << st.ToString();
+            return Status::InternalError("Failed to create file reader");
+        }
+
+        st = reader_builder.Build(&_reader);
+
         if (!st.ok()) {
             LOG(WARNING) << "failed to create parquet file reader, errmsg=" << st.ToString();
             return Status::InternalError("Failed to create file reader");
@@ -85,31 +101,23 @@ Status ParquetReaderWrap::init_parquet_reader(const std::vector<SlotDescriptor*>
 
         _timezone = timezone;
 
-        if (_current_line_of_group == 0) { // the first read
-            RETURN_IF_ERROR(column_indices(tuple_slot_descs));
-            // read batch
-            arrow::Status status = _reader->GetRecordBatchReader({_current_group},
-                                                                 _parquet_column_ids, &_rb_batch);
-            if (!status.ok()) {
-                LOG(WARNING) << "Get RecordBatch Failed. " << status.ToString();
-                return Status::InternalError(status.ToString());
-            }
-            status = _rb_batch->ReadNext(&_batch);
-            if (!status.ok()) {
-                LOG(WARNING) << "The first read record. " << status.ToString();
-                return Status::InternalError(status.ToString());
-            }
-            _current_line_of_batch = 0;
-            //save column type
-            std::shared_ptr<arrow::Schema> field_schema = _batch->schema();
-            for (int i = 0; i < _parquet_column_ids.size(); i++) {
-                std::shared_ptr<arrow::Field> field = field_schema->field(i);
-                if (!field) {
-                    LOG(WARNING) << "Get field schema failed. Column order:" << i;
-                    return Status::InternalError(status.ToString());
-                }
-                _parquet_column_type.emplace_back(field->type()->id());
+        RETURN_IF_ERROR(column_indices(tuple_slot_descs));
+
+        std::thread thread(&ParquetReaderWrap::prefetch_batch, this);
+        thread.detach();
+
+        // read batch
+        RETURN_IF_ERROR(read_next_batch());
+        _current_line_of_batch = 0;
+        // save column type
+        std::shared_ptr<arrow::Schema> field_schema = _batch->schema();
+        for (int i = 0; i < _parquet_column_ids.size(); i++) {
+            std::shared_ptr<arrow::Field> field = field_schema->field(i);
+            if (!field) {
+                LOG(WARNING) << "Get field schema failed. Column order:" << i;
+                return Status::InternalError(_status.ToString());
             }
+            _parquet_column_type.emplace_back(field->type()->id());
         }
         return Status::OK();
     } catch (parquet::ParquetException& e) {
@@ -121,6 +129,8 @@ Status ParquetReaderWrap::init_parquet_reader(const std::vector<SlotDescriptor*>
 }
 
 void ParquetReaderWrap::close() {
+    _closed = true;
+    _queue_writer_cond.notify_one();
     arrow::Status st = _parquet->Close();
     if (!st.ok()) {
         LOG(WARNING) << "close parquet file error: " << st.ToString();
@@ -195,25 +205,15 @@ Status ParquetReaderWrap::read_record_batch(const std::vector<SlotDescriptor*>&
         _rows_of_group = _file_metadata->RowGroup(_current_group)
                                  ->num_rows(); //get rows of the current row group
         // read batch
-        arrow::Status status =
-                _reader->GetRecordBatchReader({_current_group}, _parquet_column_ids, &_rb_batch);
-        if (!status.ok()) {
-            return Status::InternalError("Get RecordBatchReader Failed.");
-        }
-        status = _rb_batch->ReadNext(&_batch);
-        if (!status.ok()) {
-            return Status::InternalError("Read Batch Error With Libarrow.");
-        }
+        RETURN_IF_ERROR(read_next_batch());
         _current_line_of_batch = 0;
     } else if (_current_line_of_batch >= _batch->num_rows()) {
         VLOG_DEBUG << "read_record_batch, current group id:" << _current_group
                    << " current line of batch:" << _current_line_of_batch
                    << " is larger than batch size:" << _batch->num_rows()
                    << ". start to read next batch";
-        arrow::Status status = _rb_batch->ReadNext(&_batch);
-        if (!status.ok()) {
-            return Status::InternalError("Read Batch Error With Libarrow.");
-        }
+        // read batch
+        RETURN_IF_ERROR(read_next_batch());
         _current_line_of_batch = 0;
     }
     return Status::OK();
@@ -537,6 +537,55 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector<SlotDescriptor*>&
     return read_record_batch(tuple_slot_descs, eof);
 }
 
+void ParquetReaderWrap::prefetch_batch() {
+    auto insert_batch = [this](const auto& batch) {
+        std::unique_lock<std::mutex> lock(_mtx);
+        while (!_closed && _queue.size() == _max_queue_size) {
+            _queue_writer_cond.wait_for(lock, std::chrono::seconds(1));
+        }
+        if (UNLIKELY(_closed)) {
+            return;
+        }
+        _queue.push_back(batch);
+        _queue_reader_cond.notify_one();
+    };
+    int current_group = 0;
+    while (true) {
+        if (_closed || current_group >= _total_groups) {
+            return;
+        }
+        _status = _reader->GetRecordBatchReader({current_group}, _parquet_column_ids, &_rb_batch);
+        if (!_status.ok()) {
+            _closed = true;
+            return;
+        }
+        arrow::RecordBatchVector batches;
+        _status = _rb_batch->ReadAll(&batches);
+        if (!_status.ok()) {
+            _closed = true;
+            return;
+        }
+        std::for_each(batches.begin(), batches.end(), insert_batch);
+        current_group++;
+    }
+}
+
+Status ParquetReaderWrap::read_next_batch() {
+    std::unique_lock<std::mutex> lock(_mtx);
+    while (!_closed && _queue.empty()) {
+        _queue_reader_cond.wait_for(lock, std::chrono::seconds(1));
+    }
+
+    if (UNLIKELY(_closed)) {
+        return Status::InternalError(_status.message());
+    }
+
+    _batch = _queue.front();
+    _queue.pop_front();
+    _queue_writer_cond.notify_one();
+    return Status::OK();
+}
+
 ParquetFile::ParquetFile(FileReader* file) : _file(file) {}
 
 ParquetFile::~ParquetFile() {
diff --git a/be/src/exec/parquet_reader.h b/be/src/exec/parquet_reader.h
index 2bd5b5a802..93f1a2b2dd 100644
--- a/be/src/exec/parquet_reader.h
+++ b/be/src/exec/parquet_reader.h
@@ -22,6 +22,7 @@
 #include <arrow/io/api.h>
 #include <arrow/io/file.h>
 #include <arrow/io/interfaces.h>
+#include <arrow/status.h>
 #include <parquet/api/reader.h>
 #include <parquet/api/writer.h>
 #include <parquet/arrow/reader.h>
@@ -29,10 +30,16 @@
 #include <parquet/exception.h>
 #include <stdint.h>
 
+#include <atomic>
+#include <condition_variable>
+#include <list>
 #include <map>
+#include <mutex>
 #include <string>
+#include <thread>
 
 #include "common/status.h"
+#include "common/config.h"
 #include "gen_cpp/PaloBrokerService_types.h"
 #include "gen_cpp/PlanNodes_types.h"
 #include "gen_cpp/Types_types.h"
@@ -51,7 +58,7 @@ class FileReader;
 class ParquetFile : public arrow::io::RandomAccessFile {
 public:
     ParquetFile(FileReader* file);
-    virtual ~ParquetFile();
+    ~ParquetFile() override;
     arrow::Result<int64_t> Read(int64_t nbytes, void* buffer) override;
     arrow::Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) override;
     arrow::Result<int64_t> GetSize() override;
@@ -89,9 +96,12 @@ private:
     Status handle_timestamp(const std::shared_ptr<arrow::TimestampArray>& ts_array, uint8_t* buf,
                             int32_t* wbtyes);
 
+private:
+    void prefetch_batch();
+    Status read_next_batch();
+
 private:
     const int32_t _num_of_columns_from_file;
-    parquet::ReaderProperties _properties;
     std::shared_ptr<ParquetFile> _parquet;
 
     // parquet file reader object
@@ -110,6 +120,15 @@ private:
     int _current_line_of_batch;
 
     std::string _timezone;
+
+private:
+    std::atomic<bool> _closed = false;
+    arrow::Status _status;
+    std::mutex _mtx;
+    std::condition_variable _queue_reader_cond;
+    std::condition_variable _queue_writer_cond;
+    std::list<std::shared_ptr<arrow::RecordBatch>> _queue;
+    const size_t _max_queue_size = config::parquet_reader_max_buffer_size;
 };
 
 } // namespace doris
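
The new prefetch_batch()/read_next_batch() pair above is a classic bounded-queue handoff between a detached producer thread and the reading thread. Below is a minimal, self-contained C++ sketch of that pattern; Batch, PrefetchQueue, and the queue bound of 32 are illustrative stand-ins rather than the actual Doris types, and the _status error plumbing is reduced to returning nullptr.

```
#include <chrono>
#include <condition_variable>
#include <list>
#include <memory>
#include <mutex>

struct Batch {
    int id;
};

class PrefetchQueue {
public:
    // Producer side: block while the queue is full; give up once closed.
    void push(std::shared_ptr<Batch> batch) {
        std::unique_lock<std::mutex> lock(_mtx);
        while (!_closed && _queue.size() == _max_queue_size) {
            _writer_cond.wait_for(lock, std::chrono::seconds(1));
        }
        if (_closed) {
            return;
        }
        _queue.push_back(std::move(batch));
        _reader_cond.notify_one();
    }

    // Consumer side: block while the queue is empty; fail once closed.
    std::shared_ptr<Batch> pop() {
        std::unique_lock<std::mutex> lock(_mtx);
        while (!_closed && _queue.empty()) {
            _reader_cond.wait_for(lock, std::chrono::seconds(1));
        }
        if (_closed) {
            return nullptr; // the real code surfaces _status as an InternalError here
        }
        auto batch = _queue.front();
        _queue.pop_front();
        _writer_cond.notify_one();
        return batch;
    }

    // Shutdown: wake both sides so neither can block forever.
    void close() {
        std::lock_guard<std::mutex> lock(_mtx);
        _closed = true;
        _reader_cond.notify_one();
        _writer_cond.notify_one();
    }

private:
    bool _closed = false;
    std::mutex _mtx;
    std::condition_variable _reader_cond;
    std::condition_variable _writer_cond;
    std::list<std::shared_ptr<Batch>> _queue;
    const size_t _max_queue_size = 32; // config::parquet_reader_max_buffer_size in the diff
};
```

In the diff, prefetch_batch() is the push side iterating over row groups, read_next_batch() is the pop side, and a failed arrow call sets _status and flips _closed so the reader returns an InternalError instead of hanging; the timed wait_for() doubles as a safety net in case a wakeup is missed around shutdown.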




[incubator-doris] 13/13: [improvement](planner) Backfill the original predicate pushdown code (#9703)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 25ee329f52dbe0d5d11f561d3838f917529e8f81
Author: zhengshiJ <32...@users.noreply.github.com>
AuthorDate: Sun May 22 21:35:32 2022 +0800

    [improvement](planner) Backfill the original predicate pushdown code (#9703)
    
    Due to the current architecture, predicate derivation at rewrite time cannot satisfy all cases,
    because rewriting processes the ON clause first and then the WHERE clause, and when there are subqueries, not all cases can be derived.
    So the predicate pushdown method is kept here.
    
    eg.
    select * from t1 left join t2 on t1 = t2 where t1 = 1;
    
    InferFiltersRule can't infer t2 = 1, because doing so is outside its specification.
    
    The expression (t2 = 1) can actually be deduced and pushed down to the scan node.
---
 .../java/org/apache/doris/analysis/Analyzer.java   |  39 ++++--
 .../apache/doris/planner/PredicatePushDown.java    | 151 +++++++++++++++++++++
 .../apache/doris/planner/SingleNodePlanner.java    |   3 +
 .../org/apache/doris/rewrite/InferFiltersRule.java |   8 ++
 .../apache/doris/rewrite/InferFiltersRuleTest.java |  23 +++-
 5 files changed, 213 insertions(+), 11 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
index 4fceedf384..b7da3f004b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
@@ -265,31 +265,37 @@ public class Analyzer {
         public final Map<Pair<TupleId, TupleId>, JoinOperator> anyTwoTalesJoinOperator = Maps.newHashMap();
 
         // slotEqSlotExpr: Record existing and infer equivalent connections
-        public final List<Expr> onSlotEqSlotExpr = new ArrayList<>();
+        private final List<Expr> onSlotEqSlotExpr = new ArrayList<>();
 
         // slotEqSlotDeDuplication: De-Duplication for slotEqSlotExpr
-        public final Set<Pair<Expr, Expr>> onSlotEqSlotDeDuplication = Sets.newHashSet();
+        private final Set<Pair<Expr, Expr>> onSlotEqSlotDeDuplication = Sets.newHashSet();
 
         // slotToLiteralExpr: Record existing and infer expr which slot and literal are equal
-        public final List<Expr> onSlotToLiteralExpr = new ArrayList<>();
+        private final List<Expr> onSlotToLiteralExpr = new ArrayList<>();
 
         // slotToLiteralDeDuplication: De-Duplication for slotToLiteralExpr
-        public final Set<Pair<Expr, Expr>> onSlotToLiteralDeDuplication = Sets.newHashSet();
+        private final Set<Pair<Expr, Expr>> onSlotToLiteralDeDuplication = Sets.newHashSet();
 
         // inExpr: Record existing and infer expr which in predicate
-        public final List<Expr> onInExpr = new ArrayList<>();
+        private final List<Expr> onInExpr = new ArrayList<>();
 
         // inExprDeDuplication: De-Duplication for inExpr
-        public final Set<Expr> onInDeDuplication = Sets.newHashSet();
+        private final Set<Expr> onInDeDuplication = Sets.newHashSet();
 
         // isNullExpr: Record existing and infer not null predicate
-        public final List<Expr> onIsNullExpr = new ArrayList<>();
+        private final List<Expr> onIsNullExpr = new ArrayList<>();
 
         //isNullDeDuplication: De-Duplication for isNullExpr
-        public final Set<Expr> onIsNullDeDuplication = Sets.newHashSet();
+        private final Set<Expr> onIsNullDeDuplication = Sets.newHashSet();
+
+        // globalSlotToLiteralDeDuplication: De-Duplication for slotToLiteralExpr. Contains both on and where.
+        private final Set<Pair<Expr, Expr>> globalSlotToLiteralDeDuplication = Sets.newHashSet();
+
+        // globalInDeDuplication: De-Duplication for inExpr. Contains both on and where.
+        private final Set<Expr> globalInDeDuplication = Sets.newHashSet();
 
         // map from slot id to the analyzer/block in which it was registered
-        public final Map<SlotId, Analyzer> blockBySlot = Maps.newHashMap();
+        private final Map<SlotId, Analyzer> blockBySlot = Maps.newHashMap();
 
         // Expr rewriter for normalizing and rewriting expressions.
         private final ExprRewriter exprRewriter_;
@@ -989,6 +995,14 @@ public class Analyzer {
         globalState.onIsNullDeDuplication.add(expr);
     }
 
+    public void registerGlobalSlotToLiteralDeDuplication(Pair<Expr, Expr> pair) {
+        globalState.globalSlotToLiteralDeDuplication.add(pair);
+    }
+
+    public void registerGlobalInDeDuplication(Expr expr) {
+        globalState.globalInDeDuplication.add(expr);
+    }
+
     public void registerConjunct(Expr e, TupleId tupleId) throws AnalysisException {
         final List<Expr> exprs = Lists.newArrayList();
         exprs.add(e);
@@ -1446,6 +1460,13 @@ public class Analyzer {
         return Sets.newHashSet(globalState.onIsNullDeDuplication);
     }
 
+    public Set<Pair<Expr, Expr>> getGlobalSlotToLiteralDeDuplication() {
+        return Sets.newHashSet(globalState.globalSlotToLiteralDeDuplication);
+    }
+
+    public Set<Expr> getGlobalInDeDuplication() {
+        return Sets.newHashSet(globalState.globalInDeDuplication);
+    }
     /**
      * Makes the given semi-joined tuple visible such that its slots can be referenced.
      * If tid is null, makes the currently visible semi-joined tuple invisible again.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PredicatePushDown.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PredicatePushDown.java
new file mode 100644
index 0000000000..76c2ace45f
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PredicatePushDown.java
@@ -0,0 +1,151 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner;
+
+import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.analysis.BinaryPredicate;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.InPredicate;
+import org.apache.doris.analysis.JoinOperator;
+import org.apache.doris.analysis.Predicate;
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.TupleId;
+import org.apache.doris.common.Pair;
+
+import org.apache.directory.api.util.Strings;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+
+/**
+ * Due to the current architecture, predicate derivation at rewrite time cannot satisfy all cases,
+ * because rewriting processes the ON clause first and then the WHERE clause, and when there are subqueries, not all cases can be derived.
+ * So the predicate pushdown method is kept here.
+ *
+ * <p>
+ *     eg:
+ *      origin: select * from t1 left join t2 on t1 = t2 where t1 = 1;
+ *      after: the function will derive t2 = 1
+ * </p>
+ *
+ */
+public class PredicatePushDown {
+    private static final Logger LOG = LogManager.getLogger(PredicatePushDown.class);
+
+    /**
+     * Desc: Predicate pushdown for inner and left join.
+     *
+     * @param scanNode ScanNode to be judged
+     * @param joinOp join Operator
+     * @param analyzer global context
+     * @return {@link PlanNode}
+     */
+    public static PlanNode visitScanNode(ScanNode scanNode, JoinOperator joinOp, Analyzer analyzer) {
+        switch (joinOp) {
+            case INNER_JOIN:
+            case LEFT_OUTER_JOIN:
+                predicateFromLeftSidePropagatesToRightSide(scanNode, analyzer);
+                break;
+            // TODO
+            default:
+                break;
+        }
+        return scanNode;
+    }
+
+    private static void predicateFromLeftSidePropagatesToRightSide(ScanNode scanNode, Analyzer analyzer) {
+        List<TupleId> tupleIdList = scanNode.getTupleIds();
+        if (tupleIdList.size() != 1) {
+        LOG.info("The predicate pushdown is not applied "
+                        + "because the scan node involves more than one tuple:{}",
+                    Strings.listToString(tupleIdList));
+            return;
+        }
+        TupleId rightSideTuple = tupleIdList.get(0);
+        List<Expr> unassignedRightSideConjuncts = analyzer.getUnassignedConjuncts(scanNode);
+        List<Expr> eqJoinPredicates = analyzer.getEqJoinConjuncts(rightSideTuple);
+        if (eqJoinPredicates != null) {
+            List<Expr> allConjuncts = analyzer.getConjuncts(analyzer.getAllTupleIds());
+            allConjuncts.removeAll(unassignedRightSideConjuncts);
+            for (Expr conjunct : allConjuncts) {
+                if (!Predicate.canPushDownPredicate(conjunct)) {
+                    continue;
+                }
+                for (Expr eqJoinPredicate : eqJoinPredicates) {
+                    // we can ensure the slot is the left child, thanks to NormalizeBinaryPredicatesRule
+                    SlotRef otherSlot = conjunct.getChild(0).unwrapSlotRef();
+
+                    // ensure the children for eqJoinPredicate both be SlotRef
+                    if (eqJoinPredicate.getChild(0).unwrapSlotRef() == null
+                            || eqJoinPredicate.getChild(1).unwrapSlotRef() == null) {
+                        continue;
+                    }
+
+                    SlotRef leftSlot = eqJoinPredicate.getChild(0).unwrapSlotRef();
+                    SlotRef rightSlot = eqJoinPredicate.getChild(1).unwrapSlotRef();
+                    // ensure the types match
+                    if (!leftSlot.getDesc().getType().matchesType(rightSlot.getDesc().getType())) {
+                        continue;
+                    }
+
+                    // example: t1.id = t2.id and t1.id = 1 => t2.id = 1
+                    if (otherSlot.isBound(leftSlot.getSlotId())
+                            && rightSlot.isBound(rightSideTuple)) {
+                        Expr pushDownConjunct = rewritePredicate(analyzer, conjunct, rightSlot);
+                        LOG.debug("pushDownConjunct: {}", pushDownConjunct);
+                        if (!analyzer.getGlobalInDeDuplication().contains(pushDownConjunct)
+                                && !analyzer.getGlobalSlotToLiteralDeDuplication()
+                                .contains(new Pair<>(pushDownConjunct.getChild(0), pushDownConjunct.getChild(1)))) {
+                            scanNode.addConjunct(pushDownConjunct);
+                        }
+                    } else if (otherSlot.isBound(rightSlot.getSlotId())
+                            && leftSlot.isBound(rightSideTuple)) {
+                        Expr pushDownConjunct = rewritePredicate(analyzer, conjunct, leftSlot);
+                        LOG.debug("pushDownConjunct: {}", pushDownConjunct);
+                        if (!analyzer.getGlobalInDeDuplication().contains(pushDownConjunct)
+                                && !analyzer.getGlobalSlotToLiteralDeDuplication()
+                                .contains(new Pair<>(pushDownConjunct.getChild(0), pushDownConjunct.getChild(1)))) {
+                            scanNode.addConjunct(pushDownConjunct);
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    // Rewrite the oldPredicate with a new leftChild.
+    // For example: if oldPredicate is t1.id = 1 and leftChild is t2.id, this returns t2.id = 1.
+    private static Expr rewritePredicate(Analyzer analyzer, Expr oldPredicate, Expr leftChild) {
+        if (oldPredicate instanceof BinaryPredicate) {
+            BinaryPredicate oldBP = (BinaryPredicate) oldPredicate;
+            BinaryPredicate bp = new BinaryPredicate(oldBP.getOp(), leftChild, oldBP.getChild(1));
+            bp.analyzeNoThrow(analyzer);
+            return bp;
+        }
+
+        if (oldPredicate instanceof InPredicate) {
+            InPredicate oldIP = (InPredicate) oldPredicate;
+            InPredicate ip = new InPredicate(leftChild, oldIP.getListChildren(), oldIP.isNotIn());
+            ip.analyzeNoThrow(analyzer);
+            return ip;
+        }
+
+        return oldPredicate;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
index 0f92c4d803..5236776f51 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
@@ -1703,6 +1703,9 @@ public class SingleNodePlanner {
                 break;
         }
         if (scanNode instanceof OlapScanNode || scanNode instanceof EsScanNode || scanNode instanceof HiveScanNode) {
+            if (analyzer.enableInferPredicate()) {
+                PredicatePushDown.visitScanNode(scanNode, tblRef.getJoinOp(), analyzer);
+            }
             scanNode.setSortColumn(tblRef.getSortColumn());
         }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/rewrite/InferFiltersRule.java b/fe/fe-core/src/main/java/org/apache/doris/rewrite/InferFiltersRule.java
index 3ccd390144..0dd90ed502 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rewrite/InferFiltersRule.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rewrite/InferFiltersRule.java
@@ -181,6 +181,7 @@ public class InferFiltersRule implements ExprRewriteRule {
                         analyzer.registerOnSlotToLiteralDeDuplication(pair);
                         analyzer.registerOnSlotToLiteralExpr(conjunct);
                     }
+                    analyzer.registerGlobalSlotToLiteralDeDuplication(pair);
                 }
             } else if (conjunct.getChild(0).unwrapSlotRef() instanceof SlotRef
                     && conjunct.getChild(1).unwrapSlotRef() instanceof SlotRef) {
@@ -220,6 +221,7 @@ public class InferFiltersRule implements ExprRewriteRule {
                     analyzer.registerInExpr(conjunct);
                     analyzer.registerInDeDuplication(conjunct.getChild(0).unwrapSlotRef());
                 }
+                analyzer.registerGlobalInDeDuplication(conjunct.getChild(0).unwrapSlotRef());
             }
         }
     }
@@ -498,6 +500,9 @@ public class InferFiltersRule implements ExprRewriteRule {
                     analyzer.registerOnSlotToLiteralDeDuplication(pair);
                     analyzer.registerOnSlotToLiteralExpr(newBP);
                 }
+                if (needAddnewExprWithState) {
+                    analyzer.registerGlobalSlotToLiteralDeDuplication(pair);
+                }
             }
         }
     }
@@ -666,6 +671,9 @@ public class InferFiltersRule implements ExprRewriteRule {
                     analyzer.registerInDeDuplication(newIP);
                     analyzer.registerInExpr(newIP);
                 }
+                if (needAddnewExprWithState) {
+                    analyzer.registerGlobalInDeDuplication(newIP);
+                }
             }
         }
     }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/rewrite/InferFiltersRuleTest.java b/fe/fe-core/src/test/java/org/apache/doris/rewrite/InferFiltersRuleTest.java
index c3c14f9ca5..bfaefc8287 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/rewrite/InferFiltersRuleTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/rewrite/InferFiltersRuleTest.java
@@ -301,7 +301,7 @@ public class InferFiltersRuleTest {
     }
 
     @Test
-    public void testOnAndWhere2TablesLeftJoin() throws Exception {
+    public void testOnAndWhere2TablesLeftJoin2ndIsLiteral() throws Exception {
         SessionVariable sessionVariable = dorisAssert.getSessionVariable();
         sessionVariable.setEnableInferPredicate(true);
         Assert.assertTrue(sessionVariable.isEnableInferPredicate());
@@ -311,7 +311,7 @@ public class InferFiltersRuleTest {
     }
 
     @Test
-    public void testOnAndWhere2TablesInnerJoin() throws Exception {
+    public void testOnAndWhere2TablesInnerJoin2ndIsLiteral() throws Exception {
         SessionVariable sessionVariable = dorisAssert.getSessionVariable();
         sessionVariable.setEnableInferPredicate(true);
         Assert.assertTrue(sessionVariable.isEnableInferPredicate());
@@ -320,4 +320,23 @@ public class InferFiltersRuleTest {
         Assert.assertTrue(planString.contains("`tb1`.`k1` = 1"));
     }
 
+    @Test
+    public void testOnAndWhere2TableLeftJoin1stIsLiteral() throws Exception {
+        SessionVariable sessionVariable = dorisAssert.getSessionVariable();
+        sessionVariable.setEnableInferPredicate(true);
+        Assert.assertTrue(sessionVariable.isEnableInferPredicate());
+        String query = "select * from tb1 left join tb2 on tb1.k1 = tb2.k1 where tb1.k1 = 1";
+        String planString = dorisAssert.query(query).explainQuery();
+        Assert.assertTrue(planString.contains("`tb2`.`k1` = 1"));
+    }
+
+    @Test
+    public void testOnAndWhere2TablesInnerJoin1stIsLiteral() throws Exception {
+        SessionVariable sessionVariable = dorisAssert.getSessionVariable();
+        sessionVariable.setEnableInferPredicate(true);
+        Assert.assertTrue(sessionVariable.isEnableInferPredicate());
+        String query = "select * from tb1 inner join tb2 on tb1.k1 = tb2.k1 where tb1.k1 = 1";
+        String planString = dorisAssert.query(query).explainQuery();
+        Assert.assertTrue(planString.contains("`tb2`.`k1` = 1"));
+    }
 }




[incubator-doris] 01/13: [improvement](stream-load) adjust read unit of http to optimize stream load (#9154)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit b6489024a0e7dd837e5dbc3736d03fdbe446f160
Author: Yongqiang YANG <98...@users.noreply.github.com>
AuthorDate: Fri May 20 09:52:36 2022 +0800

    [improvement](stream-load) adjust read unit of http to optimize stream load (#9154)
---
 be/src/http/action/stream_load.cpp            |   4 +-
 be/src/runtime/fragment_mgr.cpp               |   2 +-
 be/src/runtime/stream_load/stream_load_pipe.h |   6 +-
 thirdparty/patches/libevent.patch             | 126 ++++++++++++++++++--------
 4 files changed, 94 insertions(+), 44 deletions(-)

diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp
index 65b0edcf3a..d97ad43edc 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -340,7 +340,7 @@ void StreamLoadAction::on_chunk_data(HttpRequest* req) {
 
     int64_t start_read_data_time = MonotonicNanos();
     while (evbuffer_get_length(evbuf) > 0) {
-        auto bb = ByteBuffer::allocate(4096);
+        auto bb = ByteBuffer::allocate(128 * 1024);
         auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity);
         bb->pos = remove_bytes;
         bb->flip();
@@ -383,7 +383,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext*
     request.formatType = ctx->format;
     request.__set_loadId(ctx->id.to_thrift());
     if (ctx->use_streaming) {
-        auto pipe = std::make_shared<StreamLoadPipe>(1024 * 1024 /* max_buffered_bytes */,
+        auto pipe = std::make_shared<StreamLoadPipe>(kMaxPipeBufferedBytes /* max_buffered_bytes */,
                                                      64 * 1024 /* min_chunk_size */,
                                                      ctx->body_bytes /* total_length */);
         RETURN_IF_ERROR(_exec_env->load_stream_mgr()->put(ctx->id, pipe));
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 88fe4c7963..02bf9d5bec 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -502,7 +502,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params) {
         stream_load_cxt->need_commit_self = true;
         stream_load_cxt->need_rollback = true;
         // total_length == -1 means read one message from the pipe at a time, regardless of the length.
-        auto pipe = std::make_shared<StreamLoadPipe>(1024 * 1024 /* max_buffered_bytes */,
+        auto pipe = std::make_shared<StreamLoadPipe>(kMaxPipeBufferedBytes /* max_buffered_bytes */,
                                                      64 * 1024 /* min_chunk_size */,
                                                      -1 /* total_length */, true /* use_proto */);
         stream_load_cxt->body_sink = pipe;
diff --git a/be/src/runtime/stream_load/stream_load_pipe.h b/be/src/runtime/stream_load/stream_load_pipe.h
index d5d3006aed..c0e573b078 100644
--- a/be/src/runtime/stream_load/stream_load_pipe.h
+++ b/be/src/runtime/stream_load/stream_load_pipe.h
@@ -29,12 +29,14 @@
 
 namespace doris {
 
+const size_t kMaxPipeBufferedBytes = 4 * 1024 * 1024;
 // StreamLoadPipe is used to transfer data from producer to consumer.
 // Data in the pipe is stored in chunks.
 class StreamLoadPipe : public MessageBodySink, public FileReader {
 public:
-    StreamLoadPipe(size_t max_buffered_bytes = 1024 * 1024, size_t min_chunk_size = 64 * 1024,
-                   int64_t total_length = -1, bool use_proto = false)
+    StreamLoadPipe(size_t max_buffered_bytes = kMaxPipeBufferedBytes,
+                   size_t min_chunk_size = 64 * 1024, int64_t total_length = -1,
+                   bool use_proto = false)
             : _buffered_bytes(0),
               _proto_buffered_bytes(0),
               _max_buffered_bytes(max_buffered_bytes),
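
Note: the 128 KB ByteBuffer now allocated in stream_load.cpp matches the EVBUFFER_MAX_READ raised to 128 * 1024 in the libevent patch below, so a single evbuffer_remove() call can presumably drain one full socket read.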
diff --git a/thirdparty/patches/libevent.patch b/thirdparty/patches/libevent.patch
index 83e426d62e..a545897cf1 100644
--- a/thirdparty/patches/libevent.patch
+++ b/thirdparty/patches/libevent.patch
@@ -1,6 +1,75 @@
-diff -uprN a/http.c b/http.c
---- a/http.c	2020-07-05 20:02:46.000000000 +0800
-+++ b/http.c	2021-09-28 13:56:14.045159153 +0800
+diff --git a/CMakeLists.txt b/CMakeLists.txt
+index 676727f1..833fbf70 100644
+--- a/CMakeLists.txt
++++ b/CMakeLists.txt
+@@ -200,7 +200,7 @@ endif()
+ if (("${CMAKE_C_COMPILER_ID}" STREQUAL "GNU") OR (${CLANG}))
+     set(GNUC 1)
+ endif()
+-if (("${CMAKE_C_COMPILER_ID}" STREQUAL "MSVC") OR (${CLANG}))
++if (("${CMAKE_C_COMPILER_ID}" STREQUAL "MSVC") OR ("${CMAKE_C_SIMULATE_ID}" STREQUAL "MSVC"))
+     set(MSVC 1)
+ endif()
+ 
+diff --git a/buffer.c b/buffer.c
+index 3524b350..e5d97458 100644
+--- a/buffer.c
++++ b/buffer.c
+@@ -2204,9 +2204,9 @@ evbuffer_expand(struct evbuffer *buf, size_t datlen)
+ #define IOV_LEN_TYPE unsigned long
+ #endif
+ #endif
+-#define NUM_READ_IOVEC 4
++#define NUM_READ_IOVEC 8
+ 
+-#define EVBUFFER_MAX_READ	4096
++#define EVBUFFER_MAX_READ	(128 * 1024)
+ 
+ /** Helper function to figure out which space to use for reading data into
+     an evbuffer.  Internal use only.
+diff --git a/bufferevent_async.c b/bufferevent_async.c
+index 40c7c5e8..c1624878 100644
+--- a/bufferevent_async.c
++++ b/bufferevent_async.c
+@@ -275,7 +275,7 @@ bev_async_consider_reading(struct bufferevent_async *beva)
+ 		}
+ 		at_most = read_high - cur_size;
+ 	} else {
+-		at_most = 16384; /* FIXME totally magic. */
++		at_most = 128 * 1024; /* FIXME totally magic. */
+ 	}
+ 
+ 	/* XXXX This over-commits. */
+diff --git a/bufferevent_ratelim.c b/bufferevent_ratelim.c
+index 25874968..9bc2b577 100644
+--- a/bufferevent_ratelim.c
++++ b/bufferevent_ratelim.c
+@@ -179,7 +179,7 @@ ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
+ }
+ 
+ /* Default values for max_single_read & max_single_write variables. */
+-#define MAX_SINGLE_READ_DEFAULT 16384
++#define MAX_SINGLE_READ_DEFAULT (128 * 1024)
+ #define MAX_SINGLE_WRITE_DEFAULT 16384
+ 
+ #define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0)
+diff --git a/http-internal.h b/http-internal.h
+index feaf436d..9f9b5ab5 100644
+--- a/http-internal.h
++++ b/http-internal.h
+@@ -167,6 +167,8 @@ struct evhttp {
+ 	void *gencbarg;
+ 	struct bufferevent* (*bevcb)(struct event_base *, void *);
+ 	void *bevcbarg;
++	int (*newreqcb)(struct evhttp_request *req, void *);
++	void *newreqcbarg;
+ 
+ 	struct event_base *base;
+ };
+diff --git a/http.c b/http.c
+index 04f089bc..53951cba 100644
+--- a/http.c
++++ b/http.c
 @@ -3975,6 +3975,14 @@ evhttp_set_bevcb(struct evhttp *http,
  	http->bevcbarg = cbarg;
  }
@@ -16,7 +85,7 @@ diff -uprN a/http.c b/http.c
  /*
   * Request related functions
   */
-@@ -4036,6 +4044,8 @@ evhttp_request_free(struct evhttp_reques
+@@ -4036,6 +4044,8 @@ evhttp_request_free(struct evhttp_request *req)
  		req->flags |= EVHTTP_REQ_NEEDS_FREE;
  		return;
  	}
@@ -25,7 +94,7 @@ diff -uprN a/http.c b/http.c
  
  	if (req->remote_host != NULL)
  		mm_free(req->remote_host);
-@@ -4116,6 +4126,15 @@ evhttp_request_set_on_complete_cb(struct
+@@ -4116,6 +4126,15 @@ evhttp_request_set_on_complete_cb(struct evhttp_request *req,
  	req->on_complete_cb_arg = cb_arg;
  }
  
@@ -41,7 +110,7 @@ diff -uprN a/http.c b/http.c
  /*
   * Allows for inspection of the request URI
   */
-@@ -4307,10 +4326,15 @@ evhttp_associate_new_request_with_connec
+@@ -4307,10 +4326,15 @@ evhttp_associate_new_request_with_connection(struct evhttp_connection *evcon)
  	 */
  	req->userdone = 1;
  
@@ -59,25 +128,15 @@ diff -uprN a/http.c b/http.c
  
  	evhttp_start_read_(evcon);
  
-diff -uprN a/http-internal.h b/http-internal.h
---- a/http-internal.h	2020-07-05 20:02:46.000000000 +0800
-+++ b/http-internal.h	2021-09-28 13:56:13.925151028 +0800
-@@ -167,6 +167,8 @@ struct evhttp {
- 	void *gencbarg;
- 	struct bufferevent* (*bevcb)(struct event_base *, void *);
- 	void *bevcbarg;
-+	int (*newreqcb)(struct evhttp_request *req, void *);
-+	void *newreqcbarg;
- 
- 	struct event_base *base;
- };
-diff -uprN a/include/event2/http.h b/include/event2/http.h
---- a/include/event2/http.h	2020-07-05 20:02:46.000000000 +0800
-+++ b/include/event2/http.h	2021-09-28 13:56:13.928151231 +0800
-@@ -299,6 +299,20 @@ void evhttp_set_bevcb(struct evhttp *htt
+diff --git a/include/event2/http.h b/include/event2/http.h
+index 2a41303e..e80bab9a 100644
+--- a/include/event2/http.h
++++ b/include/event2/http.h
+@@ -298,6 +298,20 @@ EVENT2_EXPORT_SYMBOL
+ void evhttp_set_bevcb(struct evhttp *http,
      struct bufferevent *(*cb)(struct event_base *, void *), void *arg);
  
- /**
++/**
 +   Set a callback which allows the user to note or throttle incoming requests.
 +   The requests are not populated with HTTP level information. They
 +   are just associated to a connection.
@@ -91,10 +150,9 @@ diff -uprN a/include/event2/http.h b/include/event2/http.h
 +void evhttp_set_newreqcb(struct evhttp *http,
 +    int (*cb)(struct evhttp_request*, void *), void *arg);
 +
-+/**
+ /**
     Adds a virtual host to the http server.
  
-    A virtual host is a newly initialized evhttp object that has request
 @@ -624,6 +638,20 @@ EVENT2_EXPORT_SYMBOL
  void evhttp_request_set_on_complete_cb(struct evhttp_request *req,
      void (*cb)(struct evhttp_request *, void *), void *cb_arg);
@@ -116,9 +174,10 @@ diff -uprN a/include/event2/http.h b/include/event2/http.h
  /** Frees the request object and removes associated events. */
  EVENT2_EXPORT_SYMBOL
  void evhttp_request_free(struct evhttp_request *req);
-diff -uprN a/include/event2/http_struct.h b/include/event2/http_struct.h
---- a/include/event2/http_struct.h	2020-07-05 20:02:46.000000000 +0800
-+++ b/include/event2/http_struct.h	2021-09-28 13:56:13.928151231 +0800
+diff --git a/include/event2/http_struct.h b/include/event2/http_struct.h
+index 4bf5b1ff..0762cabd 100644
+--- a/include/event2/http_struct.h
++++ b/include/event2/http_struct.h
 @@ -142,6 +142,12 @@ struct {
  	 */
  	void (*on_complete_cb)(struct evhttp_request *, void *);
@@ -132,14 +191,3 @@ diff -uprN a/include/event2/http_struct.h b/include/event2/http_struct.h
  };
  
  #ifdef __cplusplus
-diff -uprN a/CMakeLists.txt b/CMakeLists.txt
---- a/CMakeLists.txt	2020-07-05 20:02:46.000000000 +0800
-+++ b/CMakeLists.txt	2022-01-10 13:29:32.912883436 +0800
-@@ -200,6 +200,6 @@ endif()
- if (("${CMAKE_C_COMPILER_ID}" STREQUAL "GNU") OR (${CLANG}))
-     set(GNUC 1)
- endif()
--if (("${CMAKE_C_COMPILER_ID}" STREQUAL "MSVC") OR (${CLANG}))
-+if (("${CMAKE_C_COMPILER_ID}" STREQUAL "MSVC") OR ("${CMAKE_C_SIMULATE_ID}" STREQUAL "MSVC"))
-     set(MSVC 1)
- endif()
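
For context, the hook added by this patch is exercised via evhttp_set_newreqcb(). Here is a hypothetical C++ sketch of registering it, assuming (per the patched header comment) that the callback fires when a request is associated with a connection, before any HTTP-level parsing, and that returning 0 accepts the request; the counter and port are illustrative, not the actual Doris wiring.

```
#include <event2/event.h>
#include <event2/http.h>

#include <atomic>
#include <cstdio>

// Hypothetical throttle hook: note each request as soon as it is
// associated with a connection, before HTTP parsing has happened.
static std::atomic<long> g_inflight{0};

static int on_new_request(struct evhttp_request* /*req*/, void* /*arg*/) {
    std::printf("request associated, inflight=%ld\n", ++g_inflight);
    return 0; // assumed "accept" convention; a throttler could veto here
}

int main() {
    struct event_base* base = event_base_new();
    struct evhttp* http = evhttp_new(base);
    evhttp_bind_socket(http, "0.0.0.0", 8030);
    evhttp_set_newreqcb(http, on_new_request, nullptr); // API added by this patch
    event_base_dispatch(base);
    evhttp_free(http);
    event_base_free(base);
    return 0;
}
```

The patch also touches evhttp_request_free() and http_struct.h, which appears to add a matching free-time callback so a throttler can release its count when the request goes away.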




[incubator-doris] 11/13: [config](checksum) Disable consistency checker by default (#9699)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit c4c436453495d293c700e10bd0aced53b38f86be
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Sun May 22 21:31:43 2022 +0800

    [config](checksum) Disable consistency checker by default (#9699)
    
    Disabled by default because the current checksum logic has some bugs
    and also brings some overhead.
---
 docs/en/administrator-guide/config/fe_config.md               | 10 +++++++---
 docs/zh-CN/administrator-guide/config/fe_config.md            | 10 +++++++---
 fe/fe-core/src/main/java/org/apache/doris/common/Config.java  |  7 +++++--
 fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java |  3 ++-
 4 files changed, 21 insertions(+), 9 deletions(-)

diff --git a/docs/en/administrator-guide/config/fe_config.md b/docs/en/administrator-guide/config/fe_config.md
index e68e6a56ee..bfef450d17 100644
--- a/docs/en/administrator-guide/config/fe_config.md
+++ b/docs/en/administrator-guide/config/fe_config.md
@@ -1021,17 +1021,21 @@ IsMutable:true
 
 MasterOnly:true
 
-Consistency checker will run from *consistency_check_start_time* to *consistency_check_end_time*. Default is from 23:00 to 04:00 
+Consistency checker will run from *consistency_check_start_time* to *consistency_check_end_time*.
+
+If the two times are the same, no consistency check will be triggered.
 
 ### consistency_check_end_time
 
-Default:04
+Default:23
 
 IsMutable:true
 
 MasterOnly:true
 
-Consistency checker will run from *consistency_check_start_time* to *consistency_check_end_time*. Default is from 23:00 to 04:00 
+Consistency checker will run from *consistency_check_start_time* to *consistency_check_end_time*.
+
+If the two times are the same, no consistency check will be triggered.
 
 ### export_tablet_num_per_task
 
diff --git a/docs/zh-CN/administrator-guide/config/fe_config.md b/docs/zh-CN/administrator-guide/config/fe_config.md
index e082155212..0528430a60 100644
--- a/docs/zh-CN/administrator-guide/config/fe_config.md
+++ b/docs/zh-CN/administrator-guide/config/fe_config.md
@@ -1021,11 +1021,13 @@ colocote join PlanFragment instance 的 memory_limit = exec_mem_limit / min (que
 
 一致性检查开始时间
 
-一致性检查器将从 `consistency_check_start_time` 运行到 `consistency_check_end_time`。 默认为 23:00 至 04:00
+一致性检查器将从 `consistency_check_start_time` 运行到 `consistency_check_end_time`。
+
+如果两个时间相同,则不会触发一致性检查。
 
 ### `consistency_check_end_time`
 
-默认值:04
+默认值:23
 
 是否可以动态配置:true
 
@@ -1033,7 +1035,9 @@ colocote join PlanFragment instance 的 memory_limit = exec_mem_limit / min (que
 
 一致性检查结束时间
 
-一致性检查器将从 `consistency_check_start_time` 运行到 `consistency_check_end_time`。 默认为 23:00 至 04:00
+一致性检查器将从 `consistency_check_start_time` 运行到 `consistency_check_end_time`。
+
+如果两个时间相同,则不会触发一致性检查。
 
 ### `export_tablet_num_per_task`
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index 14e827f3ab..6660d5e63c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -841,12 +841,15 @@ public class Config extends ConfigBase {
     // Configurations for consistency check
     /**
      * Consistency checker will run from *consistency_check_start_time* to *consistency_check_end_time*.
-     * Default is from 23:00 to 04:00
+     * If start time == end time, the checker will stop scheduling.
+     * It is disabled by default.
+     * TODO(cmy): disabled by default because the current checksum logic has some bugs
+     * and also brings some overhead.
      */
     @ConfField(mutable = true, masterOnly = true)
     public static String consistency_check_start_time = "23";
     @ConfField(mutable = true, masterOnly = true)
-    public static String consistency_check_end_time = "4";
+    public static String consistency_check_end_time = "23";
     /**
      * Default timeout of a single consistency check task. Set long enough to fit your tablet size.
      */
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 91083f2e1f..7c050bdc2c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -640,7 +640,8 @@ public class Coordinator {
                         switch (code) {
                             case TIMEOUT:
                                 throw new RpcException(pair.first.backend.getHost(), "send fragment timeout. backend id: "
-                                        + pair.first.backend.getId());
+                                        + pair.first.backend.getId() + " fragment: " +
+                                        DebugUtil.printId(pair.first.rpcParams.params.fragment_instance_id));
                             case THRIFT_RPC_ERROR:
                                 SimpleScheduler.addToBlacklist(pair.first.backend.getId(), errMsg);
                                 throw new RpcException(pair.first.backend.getHost(), "rpc failed");




[incubator-doris] 09/13: [FeConfig](Project) Project optimization is enabled by default (#9667)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit b28b77a32f506e06491ed2a74814809665e2b0c3
Author: EmmyMiao87 <52...@qq.com>
AuthorDate: Thu May 19 14:03:14 2022 +0800

    [FeConfig](Project) Project optimization is enabled by default (#9667)
---
 fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index a3ee7f5baf..577ec2825d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -439,7 +439,7 @@ public class SessionVariable implements Serializable, Writable {
     public double autoBroadcastJoinThreshold = 0.8;
   
     @VariableMgr.VarAttr(name = ENABLE_PROJECTION)
-    private boolean enableProjection = false;
+    private boolean enableProjection = true;
 
     @VariableMgr.VarAttr(name = TRIM_TAILING_SPACES_FOR_EXTERNAL_TABLE_QUERY, needForward = true)
     public boolean trimTailingSpacesForExternalTableQuery = false;




[incubator-doris] 07/13: [deps] libhdfs3 build enable kerberos support (#9524)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 8a375dcafb27e962e73fe757484a24286e38c515
Author: gtchaos <gs...@gmail.com>
AuthorDate: Sun May 22 20:58:19 2022 +0800

    [deps] libhdfs3 build enable kerberos support (#9524)
    
    Currently, the libhdfs3 library integrated into the Doris BE does not support accessing clusters with Kerberos authentication
    enabled, because the Kerberos-related dependencies (gsasl and krb5) were not added when building libhdfs3.

    So this PR enables Kerberos support and rebuilds libhdfs3 with the gsasl and krb5 dependencies:
    
    - gsasl version: 1.8.0
    - krb5 version: 1.19
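
    Note: per the build.sh change below, WITH_KERBEROS defaults to ON when left unset, in which case the BE links the kerberized libhdfs3 from libhdfs_with_kerberos/; presumably exporting WITH_KERBEROS=OFF before invoking build.sh keeps the previous non-Kerberos linkage (run-be-ut.sh builds with it OFF).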
---
 be/CMakeLists.txt                       | 50 +++++++++++++++++++++++++++++----
 build.sh                                |  6 +++-
 run-be-ut.sh                            |  1 +
 thirdparty/build-thirdparty.sh          | 37 ++++++++++++++++++++++--
 thirdparty/download-thirdparty.sh       | 12 +++++++-
 thirdparty/patches/libgsasl-1.8.0.patch | 24 ++++++++++++++++
 thirdparty/vars.sh                      | 20 +++++++++++--
 7 files changed, 139 insertions(+), 11 deletions(-)

diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt
index 687a9629ca..af3f4ac7e1 100644
--- a/be/CMakeLists.txt
+++ b/be/CMakeLists.txt
@@ -81,6 +81,7 @@ set(BASE_DIR "${CMAKE_CURRENT_SOURCE_DIR}")
 set(ENV{DORIS_HOME} "${BASE_DIR}/../")
 set(BUILD_DIR "${CMAKE_CURRENT_BINARY_DIR}")
 set(THIRDPARTY_DIR "$ENV{DORIS_THIRDPARTY}/installed/")
+set(HDFS3_KRB5_INSTALL_DIR "$ENV{DORIS_THIRDPARTY}/installed/libhdfs_with_kerberos/")
 set(GENSRC_DIR "${BASE_DIR}/../gensrc/build/")
 set(SRC_DIR "${BASE_DIR}/src/")
 set(TEST_DIR "${CMAKE_SOURCE_DIR}/test/")
@@ -93,6 +94,7 @@ else()
 endif()
 message(STATUS "make test: ${MAKE_TEST}")
 option(WITH_MYSQL "Support access MySQL" ON)
+option(WITH_KERBEROS "Enable or disable Kerberos support" ${WITH_KERBEROS})
 
 # Check gcc
 if (CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
@@ -321,17 +323,40 @@ set_target_properties(minizip PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib
 add_library(idn STATIC IMPORTED)
 set_target_properties(idn PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib64/libidn.a)
 
-add_library(gsasl STATIC IMPORTED)
-set_target_properties(gsasl PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib64/libgsasl.a)
-
 add_library(breakpad STATIC IMPORTED)
 set_target_properties(breakpad PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib64/libbreakpad_client.a)
 
-if (ARCH_AMD64)
-    # libhdfs3 only support x86 or amd64
+if (WITH_KERBEROS)
+    # kerberos lib for libhdfs3 
+    add_library(gsasl STATIC IMPORTED)
+    set_target_properties(gsasl PROPERTIES IMPORTED_LOCATION ${HDFS3_KRB5_INSTALL_DIR}/lib/libgsasl.a)
+
+    add_library(krb5support STATIC IMPORTED)
+    set_target_properties(krb5support PROPERTIES IMPORTED_LOCATION ${HDFS3_KRB5_INSTALL_DIR}/lib/libkrb5support.a)
+    
+    add_library(krb5 STATIC IMPORTED)
+    set_target_properties(krb5 PROPERTIES IMPORTED_LOCATION ${HDFS3_KRB5_INSTALL_DIR}/lib/libkrb5.a)
+
+    add_library(com_err STATIC IMPORTED)
+    set_target_properties(com_err PROPERTIES IMPORTED_LOCATION ${HDFS3_KRB5_INSTALL_DIR}/lib/libcom_err.a)
+
+    add_library(k5crypto STATIC IMPORTED)
+    set_target_properties(k5crypto PROPERTIES IMPORTED_LOCATION ${HDFS3_KRB5_INSTALL_DIR}/lib/libk5crypto.a)
+
+    add_library(gssapi_krb5 STATIC IMPORTED)
+    set_target_properties(gssapi_krb5 PROPERTIES IMPORTED_LOCATION ${HDFS3_KRB5_INSTALL_DIR}/lib/libgssapi_krb5.a)
+
+    add_library(hdfs3 STATIC IMPORTED)
+    set_target_properties(hdfs3 PROPERTIES IMPORTED_LOCATION ${HDFS3_KRB5_INSTALL_DIR}/lib/libhdfs3.a)
+else()
+    add_library(gsasl STATIC IMPORTED)
+    set_target_properties(gsasl PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib64/libgsasl.a)
+
     add_library(hdfs3 STATIC IMPORTED)
     set_target_properties(hdfs3 PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib64/libhdfs3.a)
+endif()
 
+if (ARCH_AMD64)
     add_library(xml2 STATIC IMPORTED)
     set_target_properties(xml2 PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib64/libxml2.a)
 
@@ -478,6 +503,14 @@ include_directories(
 set(WL_START_GROUP "-Wl,--start-group")
 set(WL_END_GROUP "-Wl,--end-group")
 
+
+set(KRB5_LIBS
+    krb5support
+    krb5
+    com_err
+    gssapi_krb5
+    k5crypto)
+
 set(AWS_LIBS
     aws-sdk-s3
     aws-sdk-core
@@ -618,6 +651,12 @@ if (WITH_MYSQL)
         )
 endif()
 
+if (WITH_KERBEROS)
+    set(DORIS_DEPENDENCIES ${DORIS_DEPENDENCIES}
+    	${KRB5_LIBS}
+        )
+endif()
+
 set(DORIS_DEPENDENCIES ${DORIS_DEPENDENCIES} ${WL_END_GROUP})
 
 message(STATUS "DORIS_DEPENDENCIES is ${DORIS_DEPENDENCIES}")
@@ -629,6 +668,7 @@ set(DORIS_LINK_LIBS ${DORIS_LINK_LIBS}
     -static-libstdc++
     -static-libgcc
     -lstdc++fs
+    -lresolv
 )
 
 if ("${CMAKE_CXX_COMPILER_ID}" STREQUAL "GNU")
diff --git a/build.sh b/build.sh
index f5be3971fb..00123b90e2 100755
--- a/build.sh
+++ b/build.sh
@@ -175,7 +175,9 @@ if [[ ${HELP} -eq 1 ]]; then
     usage
     exit
 fi
-
+if [[ -z ${WITH_KERBEROS} ]]; then
+    WITH_KERBEROS=ON
+fi
 # build thirdparty libraries if necessary
 if [[ ! -f ${DORIS_THIRDPARTY}/installed/lib/libbacktrace.a ]]; then
     echo "Thirdparty libraries need to be build ..."
@@ -225,6 +227,7 @@ echo "Get params:
     PARALLEL            -- $PARALLEL
     CLEAN               -- $CLEAN
     WITH_MYSQL          -- $WITH_MYSQL
+    WITH_KERBEROS       -- $WITH_KERBEROS
     WITH_LZO            -- $WITH_LZO
     GLIBC_COMPATIBILITY -- $GLIBC_COMPATIBILITY
     USE_AVX2            -- $USE_AVX2
@@ -263,6 +266,7 @@ if [ ${BUILD_BE} -eq 1 ] ; then
             -DMAKE_TEST=OFF \
             ${CMAKE_USE_CCACHE} \
             -DWITH_MYSQL=${WITH_MYSQL} \
+            -DWITH_KERBEROS=${WITH_KERBEROS} \
             -DWITH_LZO=${WITH_LZO} \
             -DUSE_LIBCPP=${USE_LIBCPP} \
             -DBUILD_META_TOOL=${BUILD_META_TOOL} \
diff --git a/run-be-ut.sh b/run-be-ut.sh
index 85b2fc851d..af8ae5bac7 100755
--- a/run-be-ut.sh
+++ b/run-be-ut.sh
@@ -141,6 +141,7 @@ ${CMAKE_CMD} -G "${GENERATOR}" \
     -DGLIBC_COMPATIBILITY="${GLIBC_COMPATIBILITY}" \
     -DBUILD_META_TOOL=OFF \
     -DWITH_MYSQL=OFF \
+    -DWITH_KERBEROS=OFF \
     ${CMAKE_USE_CCACHE} ../
 ${BUILD_SYSTEM} -j ${PARALLEL} $RUN_FILE
 
diff --git a/thirdparty/build-thirdparty.sh b/thirdparty/build-thirdparty.sh
index 3b7a108c28..d29ae4bf8e 100755
--- a/thirdparty/build-thirdparty.sh
+++ b/thirdparty/build-thirdparty.sh
@@ -904,16 +904,46 @@ build_gsasl() {
     make -j $PARALLEL && make install
 }
 
+# build_gsasl2 is only for libgsasl 1.8.0
+build_gsasl2() {
+    check_if_source_exist $GSASL2_SOURCE
+    cd $TP_SOURCE_DIR/$GSASL2_SOURCE
+    mkdir -p $BUILD_DIR && cd $BUILD_DIR
+    ../configure --prefix=$HDFS3_KRB5_INSTALL_DIR --with-gssapi-impl=mit --enable-shared=no --with-pic --with-libidn-prefix=$TP_INSTALL_DIR
+    make -j $PARALLEL && make install
+}
+
+# krb5
+build_krb5() {
+    check_if_source_exist $KRB5_SOURCE
+    cd $TP_SOURCE_DIR/$KRB5_SOURCE/src
+    mkdir -p $BUILD_DIR && cd $BUILD_DIR
+    CFLAGS="-fcommon" \
+    ../configure --prefix=$HDFS3_KRB5_INSTALL_DIR --disable-shared --enable-static
+    make -j $PARALLEL && make install
+}
+
 # hdfs3
 build_hdfs3() {
     check_if_source_exist $HDFS3_SOURCE
     cd $TP_SOURCE_DIR/$HDFS3_SOURCE
-    mkdir -p $BUILD_DIR && cd $BUILD_DIR
-    ../bootstrap --dependency=$TP_INSTALL_DIR --prefix=$TP_INSTALL_DIR
+    mkdir -p $BUILD_DIR && cd $BUILD_DIR && rm ./* -rf
+    # build libhdfs3 without kerberos
+    ../bootstrap --dependency="$TP_INSTALL_DIR" --prefix=$TP_INSTALL_DIR
     make CXXFLAGS="$libhdfs_cxx17" -j $PARALLEL
     make install
 }
 
+# hdfs3_with_kerberos
+build_hdfs3_with_kerberos() {
+    check_if_source_exist $HDFS3_SOURCE
+    cd $TP_SOURCE_DIR/$HDFS3_SOURCE
+    mkdir -p $BUILD_DIR && cd $BUILD_DIR && rm ./* -rf
+    # build libhdfs3 with kerberos support
+    ../bootstrap --dependency="$HDFS3_KRB5_INSTALL_DIR:$TP_INSTALL_DIR -DWITH_KERBEROS=true" --prefix=$HDFS3_KRB5_INSTALL_DIR
+    make CXXFLAGS="$libhdfs_cxx17" -j $PARALLEL
+    make install
+}
 # benchmark
 build_benchmark() {
     check_if_source_exist $BENCHMARK_SOURCE
@@ -999,7 +1029,10 @@ build_lzma
 build_xml2
 build_idn
 build_gsasl
+build_gsasl2
+build_krb5
 build_hdfs3
+build_hdfs3_with_kerberos
 build_benchmark
 build_breakpad
 build_simdjson
diff --git a/thirdparty/download-thirdparty.sh b/thirdparty/download-thirdparty.sh
index b17c6be0bd..6014611ab5 100755
--- a/thirdparty/download-thirdparty.sh
+++ b/thirdparty/download-thirdparty.sh
@@ -156,7 +156,7 @@ do
 done
 echo "===== Downloading thirdparty archives...done"
 
-# check if all tp archievs exists
+# check if all tp archives exist
 echo "===== Checking all thirdpart archives..."
 for TP_ARCH in ${TP_ARCHIVES[*]}
 do
@@ -271,6 +271,16 @@ fi
 cd -
 echo "Finished patching $S2_SOURCE"
 
+# gsasl2 patch to fix link errors such as multiple function definitions
+# when linking targets with kerberos
+cd $TP_SOURCE_DIR/$GSASL2_SOURCE
+if [ ! -f $PATCHED_MARK ]; then
+    patch -p1 < $TP_PATCH_DIR/libgsasl-1.8.0.patch
+    touch $PATCHED_MARK
+fi
+cd -
+echo "Finished patching $GSASL2_SOURCE"
+
 # hdfs3 patch to fix compile error
 cd $TP_SOURCE_DIR/$HDFS3_SOURCE
 if [ ! -f $PATCHED_MARK ]; then
diff --git a/thirdparty/patches/libgsasl-1.8.0.patch b/thirdparty/patches/libgsasl-1.8.0.patch
new file mode 100644
index 0000000000..7fe5ebaeee
--- /dev/null
+++ b/thirdparty/patches/libgsasl-1.8.0.patch
@@ -0,0 +1,24 @@
+--- a/config.h.in	2012-05-29 01:11:28.000000000 +0800
++++ b/config.h.in	2022-05-11 18:06:02.000000000 +0800
+@@ -176,7 +176,7 @@
+ #undef HAVE_GSSAPI_H
+
+ /* Define to 1 if you have the `GSS_C_NT_HOSTBASED_SERVICE' function. */
+-#undef HAVE_GSS_C_NT_HOSTBASED_SERVICE
++#define HAVE_GSS_C_NT_HOSTBASED_SERVICE 1
+
+ /* Define to 1 if you have the `gss_decapsulate_token' function. */
+ #undef HAVE_GSS_DECAPSULATE_TOKEN
+@@ -185,10 +185,10 @@
+ #undef HAVE_GSS_ENCAPSULATE_TOKEN
+
+ /* Define to 1 if you have the `gss_inquire_mech_for_saslname' function. */
+-#undef HAVE_GSS_INQUIRE_MECH_FOR_SASLNAME
++#define HAVE_GSS_INQUIRE_MECH_FOR_SASLNAME 1
+
+ /* Define to 1 if you have the `gss_oid_equal' function. */
+-#undef HAVE_GSS_OID_EQUAL
++#define HAVE_GSS_OID_EQUAL 1
+
+ /* Define if you have the iconv() function and it works. */
+ #undef HAVE_ICONV
diff --git a/thirdparty/vars.sh b/thirdparty/vars.sh
index 65dbc0df23..12ece01216 100755
--- a/thirdparty/vars.sh
+++ b/thirdparty/vars.sh
@@ -32,6 +32,9 @@ export TP_SOURCE_DIR=$TP_DIR/src
 # thirdparties will be installed to here
 export TP_INSTALL_DIR=$TP_DIR/installed
 
+# libhdfs3-with-kerberos will be installed to here
+export HDFS3_KRB5_INSTALL_DIR=$TP_INSTALL_DIR/libhdfs_with_kerberos
+
 # patches for all thirdparties
 export TP_PATCH_DIR=$TP_DIR/patches
 
@@ -44,8 +47,8 @@ export TP_LIB_DIR=$TP_INSTALL_DIR/lib
 # all java libraries will be unpacked to here
 export TP_JAR_DIR=$TP_INSTALL_DIR/lib/jar
 
-# source of all dependencies
-export REPOSITORY_URL=https://doris-thirdparty-repo.bj.bcebos.com/thirdparty
+# source of all dependencies, unused by default
+# export REPOSITORY_URL=https://doris-thirdparty-repo.bj.bcebos.com/thirdparty
 
 #####################################################
 # Download url, filename and unpacked filename
@@ -326,6 +329,17 @@ GSASL_NAME="libgsasl-1.10.0.tar.gz"
 GSASL_SOURCE="libgsasl-1.10.0"
 GSASL_MD5SUM="9c8fc632da4ce108fb7581b33de2a5ce"
 
+GSASL2_DOWNLOAD="https://ftp.gnu.org/gnu/gsasl/libgsasl-1.8.0.tar.gz"
+GSASL2_NAME="libgsasl-1.8.0.tar.gz"
+GSASL2_SOURCE="libgsasl-1.8.0"
+GSASL2_MD5SUM="5dbdf859f6e60e05813370e2b193b92b"
+
+# krb5
+KRB5_DOWNLOAD="https://kerberos.org/dist/krb5/1.19/krb5-1.19.tar.gz"
+KRB5_NAME="krb5-1.19.tar.gz"
+KRB5_SOURCE="krb5-1.19"
+KRB5_MD5SUM="aaf18447a5a014aa3b7e81814923f4c9"
+
 # hdfs3
 HDFS3_DOWNLOAD="https://doris-thirdparty-repo.bj.bcebos.com/thirdparty/libhdfs3-master.zip"
 HDFS3_NAME="libhdfs3-master.zip"
@@ -423,6 +437,8 @@ LZMA
 XML2
 IDN
 GSASL
+GSASL2
+KRB5
 HDFS3
 LIBDIVIDE
 PDQSORT




[incubator-doris] 04/13: [fix] UT MathFunctionTest.round_test fix (#9447)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 2cc31df3c53071226f1c2d5bce9c702c3f744bfa
Author: camby <10...@qq.com>
AuthorDate: Mon May 9 09:37:27 2022 +0800

    [fix] UT MathFunctionTest.round_test fix (#9447)
    
    Function round supports two forms, round(double) and round(double, int), so its argument list is variadic.
    But FunctionBinaryArithmetic does not support variadic arguments yet, which makes get_function fail for round(double, int).
    
    Steps to reproduce:
    1. set enable_vectorized_engine=true;
    2. try to call round(double, int);
    ```
    > select round(10.12345,2);
    ERROR 1105 (HY000): errCode = 2, detailMessage = Function round is not implemented
    ```
---
 be/src/vec/functions/function_binary_arithmetic.h | 30 ++++++++++++++---------
 1 file changed, 19 insertions(+), 11 deletions(-)

diff --git a/be/src/vec/functions/function_binary_arithmetic.h b/be/src/vec/functions/function_binary_arithmetic.h
index 74b7df4260..ad3ad38bdf 100644
--- a/be/src/vec/functions/function_binary_arithmetic.h
+++ b/be/src/vec/functions/function_binary_arithmetic.h
@@ -58,19 +58,20 @@ struct ModuloImpl;
 template <template <typename, typename> typename Operation>
 struct OperationTraits {
     using T = UInt8;
-    static constexpr bool is_plus_minus = std::is_same_v<Operation<T, T>, PlusImpl<T, T>> ||
-                                          std::is_same_v<Operation<T, T>, MinusImpl<T, T>>;
-    static constexpr bool is_multiply = std::is_same_v<Operation<T, T>, MultiplyImpl<T, T>>;
-    static constexpr bool is_division = std::is_same_v<Operation<T, T>, DivideFloatingImpl<T, T>> ||
-                                        std::is_same_v<Operation<T, T>, DivideIntegralImpl<T, T>>;
+    using Op = Operation<T, T>;
+    static constexpr bool is_plus_minus =
+            std::is_same_v<Op, PlusImpl<T, T>> || std::is_same_v<Op, MinusImpl<T, T>>;
+    static constexpr bool is_multiply = std::is_same_v<Op, MultiplyImpl<T, T>>;
+    static constexpr bool is_division = std::is_same_v<Op, DivideFloatingImpl<T, T>> ||
+                                        std::is_same_v<Op, DivideIntegralImpl<T, T>>;
     static constexpr bool allow_decimal =
-            std::is_same_v<Operation<T, T>, PlusImpl<T, T>> ||
-            std::is_same_v<Operation<T, T>, MinusImpl<T, T>> ||
-            std::is_same_v<Operation<T, T>, MultiplyImpl<T, T>> ||
-            std::is_same_v<Operation<T, T>, ModuloImpl<T, T>> ||
-            std::is_same_v<Operation<T, T>, DivideFloatingImpl<T, T>> ||
-            std::is_same_v<Operation<T, T>, DivideIntegralImpl<T, T>>;
+            std::is_same_v<Op, PlusImpl<T, T>> || std::is_same_v<Op, MinusImpl<T, T>> ||
+            std::is_same_v<Op, MultiplyImpl<T, T>> || std::is_same_v<Op, ModuloImpl<T, T>> ||
+            std::is_same_v<Op, DivideFloatingImpl<T, T>> ||
+            std::is_same_v<Op, DivideIntegralImpl<T, T>>;
     static constexpr bool can_overflow = is_plus_minus || is_multiply;
+    static constexpr bool has_variadic_argument =
+            !std::is_void_v<decltype(has_variadic_argument_types(std::declval<Op>()))>;
 };
 
 template <typename A, typename B, typename Op, typename ResultType = typename Op::ResultType>
@@ -756,6 +757,13 @@ public:
 
     size_t get_number_of_arguments() const override { return 2; }
 
+    DataTypes get_variadic_argument_types_impl() const override {
+        if constexpr (OpTraits::has_variadic_argument) {
+            return OpTraits::Op::get_variadic_argument_types();
+        }
+        return {};
+    }
+
     DataTypePtr get_return_type_impl(const DataTypes& arguments) const override {
         DataTypePtr type_res;
         bool valid = cast_both_types(
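
The has_variadic_argument trait above is plain overload-resolution detection: a catch-all has_variadic_argument_types(...) declaration returns void, and operations that really provide variadic argument types supply a better-matching overload, so the decltype is non-void exactly for them. A stripped-down C++17 sketch of the idiom, with illustrative types rather than the real Doris operations:

```
#include <type_traits>
#include <utility>

// Catch-all fallback: matches any operation and "returns" void.
// Never defined or called at runtime; only used inside decltype.
void has_variadic_argument_types(...);

struct PlainOp {};

struct VariadicOp {};
// Marker overload, declared only for operations with variadic arguments.
int has_variadic_argument_types(VariadicOp);

template <typename Op>
constexpr bool has_variadic_argument =
        !std::is_void_v<decltype(has_variadic_argument_types(std::declval<Op>()))>;

static_assert(!has_variadic_argument<PlainOp>);
static_assert(has_variadic_argument<VariadicOp>);
```

With that trait, get_variadic_argument_types_impl() can expose the extra argument type of round(double, int) only when the operation defines one, so function lookup succeeds for the two-argument form.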


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org