You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this plain-text copy.
Posted to commits@doris.apache.org by kx...@apache.org on 2023/07/31 03:12:10 UTC

[doris] branch branch-2.0 updated (e5519e6c6c -> b6fbc0ad3d)

This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a change to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


    from e5519e6c6c [fix](cast) fix wrong result of casting empty string to array date (#22281)
     new 8ac0c83df7 [improve](agg) support distinct agg node (#22169)
     new 88085d9076 [bug](distinct-agg) fix distinct-agg outblock columns size not equal key size (#22357)
     new b6fbc0ad3d [fix](ipv6)Remove restrictions from IPv4 when add backend (#22323)

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/exec/exec_node.cpp                          |   7 +-
 ...istinct_streaming_aggregation_sink_operator.cpp |  97 +++++++++++++++
 ...distinct_streaming_aggregation_sink_operator.h} |  27 ++--
 ...tinct_streaming_aggregation_source_operator.cpp |  92 ++++++++++++++
 ...stinct_streaming_aggregation_source_operator.h} |  15 ++-
 be/src/pipeline/pipeline_fragment_context.cpp      |  17 ++-
 be/src/vec/exec/distinct_vaggregation_node.cpp     | 136 +++++++++++++++++++++
 be/src/vec/exec/distinct_vaggregation_node.h       |  55 +++++++++
 be/src/vec/exec/vaggregation_node.cpp              |  25 ++--
 be/src/vec/exec/vaggregation_node.h                |  49 ++++----
 .../org/apache/doris/common/util/NetUtils.java     |  14 +--
 .../expressions/functions/agg/WindowFunnel.java    |   8 +-
 .../org/apache/doris/system/SystemInfoService.java |   5 -
 .../apache/doris/system/SystemInfoServiceTest.java |  32 +++++
 14 files changed, 509 insertions(+), 70 deletions(-)
 create mode 100644 be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp
 copy be/src/pipeline/exec/{streaming_aggregation_sink_operator.h => distinct_streaming_aggregation_sink_operator.h} (65%)
 create mode 100644 be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.cpp
 copy be/src/pipeline/exec/{streaming_aggregation_source_operator.h => distinct_streaming_aggregation_source_operator.h} (70%)
 create mode 100644 be/src/vec/exec/distinct_vaggregation_node.cpp
 create mode 100644 be/src/vec/exec/distinct_vaggregation_node.h


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[doris] 01/03: [improve](agg) support distinct agg node (#22169)

Posted by kx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 8ac0c83df744705292d08f1e0b8aeaaad9992903
Author: zhangstar333 <87...@users.noreply.github.com>
AuthorDate: Fri Jul 28 13:54:10 2023 +0800

    [improve](agg) support distinct agg node (#22169)
    
    For a query such as
        select c_name from customer union select c_name from customer
    the agg node is only used to get the distinct rows of c_name, so there is
    no need to wait until all data has been inserted into the hash map: each
    row can be output as soon as it has been inserted into the hash map successfully.
---
 be/src/exec/exec_node.cpp                          |   7 +-
 ...istinct_streaming_aggregation_sink_operator.cpp |  97 +++++++++++++++
 .../distinct_streaming_aggregation_sink_operator.h |  76 ++++++++++++
 ...tinct_streaming_aggregation_source_operator.cpp |  92 ++++++++++++++
 ...istinct_streaming_aggregation_source_operator.h |  67 ++++++++++
 be/src/pipeline/pipeline_fragment_context.cpp      |  17 ++-
 be/src/vec/exec/distinct_vaggregation_node.cpp     | 136 +++++++++++++++++++++
 be/src/vec/exec/distinct_vaggregation_node.h       |  55 +++++++++
 be/src/vec/exec/vaggregation_node.cpp              |  25 ++--
 be/src/vec/exec/vaggregation_node.h                |  49 ++++----
 .../expressions/functions/agg/WindowFunnel.java    |   8 +-
 11 files changed, 588 insertions(+), 41 deletions(-)

diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index c6b7826deb..bcb0771eda 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -44,6 +44,7 @@
 #include "util/uid_util.h"
 #include "vec/columns/column_nullable.h"
 #include "vec/core/block.h"
+#include "vec/exec/distinct_vaggregation_node.h"
 #include "vec/exec/join/vhash_join_node.h"
 #include "vec/exec/join/vnested_loop_join_node.h"
 #include "vec/exec/scan/new_es_scan_node.h"
@@ -371,7 +372,13 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN
         return Status::OK();
 
     case TPlanNodeType::AGGREGATION_NODE:
-        *node = pool->add(new vectorized::AggregationNode(pool, tnode, descs));
+        // No aggregate functions means a pure DISTINCT: with the pipeline engine use
+        // DistinctAggregationNode, which can output keys before all input is consumed.
+        if (tnode.agg_node.aggregate_functions.empty() && state->enable_pipeline_exec()) {
+            *node = pool->add(new vectorized::DistinctAggregationNode(pool, tnode, descs));
+        } else {
+            *node = pool->add(new vectorized::AggregationNode(pool, tnode, descs));
+        }
         return Status::OK();
 
     case TPlanNodeType::HASH_JOIN_NODE:
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp
new file mode 100644
index 0000000000..48695ed56f
--- /dev/null
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "distinct_streaming_aggregation_sink_operator.h"
+
+#include <gen_cpp/Metrics_types.h>
+
+#include <utility>
+
+#include "common/compiler_util.h" // IWYU pragma: keep
+#include "pipeline/exec/data_queue.h"
+#include "pipeline/exec/operator.h"
+#include "vec/exec/distinct_vaggregation_node.h"
+#include "vec/exec/vaggregation_node.h"
+
+namespace doris {
+class ExecNode;
+class RuntimeState;
+} // namespace doris
+
+namespace doris::pipeline {
+
+DistinctStreamingAggSinkOperator::DistinctStreamingAggSinkOperator(
+        OperatorBuilderBase* operator_builder, ExecNode* agg_node, std::shared_ptr<DataQueue> queue)
+        : StreamingOperator(operator_builder, agg_node), _data_queue(std::move(queue)) {}
+
+// Sink and source run in different threads: accept more input only while the
+// shared queue has room for another block.
+bool DistinctStreamingAggSinkOperator::can_write() {
+    return _data_queue->has_enough_space_to_push();
+}
+
+// Inserts the keys of `in_block` into the hash table and appends only the keys seen
+// for the first time to `_output_block`. That block is pushed to the source operator
+// once it holds at least batch_size() rows or once LIMIT is reached. Returns
+// END_OF_FILE after the final push (upstream finished or LIMIT reached).
+Status DistinctStreamingAggSinkOperator::sink(RuntimeState* state, vectorized::Block* in_block,
+                                              SourceState source_state) {
+    if (in_block && in_block->rows() > 0) {
+        if (_output_block == nullptr) {
+            // the previous block was moved into the queue; take a recycled one
+            _output_block = _data_queue->get_free_block();
+        }
+        RETURN_IF_ERROR(
+                _node->_distinct_pre_agg_with_serialized_key(in_block, _output_block.get()));
+
+        // get enough data or reached limit rows, need push block to queue
+        if (_node->limit() != -1 &&
+            (_output_block->rows() + _output_distinct_rows) >= _node->limit()) {
+            // Rows pushed in earlier blocks already count toward LIMIT, so only the
+            // remaining quota (limit - _output_distinct_rows) may be emitted here.
+            auto limit_rows = _node->limit() - _output_distinct_rows;
+            _output_block->set_num_rows(limit_rows);
+            _output_distinct_rows += limit_rows;
+            _data_queue->push_block(std::move(_output_block));
+        } else if (_output_block->rows() >= state->batch_size()) {
+            _output_distinct_rows += _output_block->rows();
+            _data_queue->push_block(std::move(_output_block));
+        }
+    }
+
+    // reach limit or source finish
+    if ((UNLIKELY(source_state == SourceState::FINISHED)) || reached_limited_rows()) {
+        if (_output_block != nullptr) { // flush the last, possibly partial, block
+            _output_distinct_rows += _output_block->rows();
+            _data_queue->push_block(std::move(_output_block));
+        }
+        _data_queue->set_finish();
+        return Status::Error<ErrorCode::END_OF_FILE>("");
+    }
+    return Status::OK();
+}
+
+Status DistinctStreamingAggSinkOperator::close(RuntimeState* state) {
+    if (_data_queue && !_data_queue->is_finish()) {
+        // sink() calls set_finish() on the normal path; an unfinished queue here
+        // means the pipeline failed, so mark the queue as canceled.
+        _data_queue->set_canceled();
+    }
+    return StreamingOperator::close(state);
+}
+
+DistinctStreamingAggSinkOperatorBuilder::DistinctStreamingAggSinkOperatorBuilder(
+        int32_t id, ExecNode* exec_node, std::shared_ptr<DataQueue> queue)
+        : OperatorBuilder(id, "DistinctStreamingAggSinkOperator", exec_node),
+          _data_queue(std::move(queue)) {}
+
+OperatorPtr DistinctStreamingAggSinkOperatorBuilder::build_operator() {
+    return std::make_shared<DistinctStreamingAggSinkOperator>(this, _node, _data_queue);
+}
+
+} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h
new file mode 100644
index 0000000000..ae7106178e
--- /dev/null
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stdint.h>
+
+#include <cstdint>
+#include <memory>
+
+#include "common/status.h"
+#include "operator.h"
+#include "util/runtime_profile.h"
+#include "vec/core/block.h"
+#include "vec/exec/distinct_vaggregation_node.h"
+#include "vec/exec/vaggregation_node.h"
+
+namespace doris {
+class ExecNode;
+class RuntimeState;
+
+namespace pipeline {
+class DataQueue;
+
+// Builds the sink half of the distinct streaming aggregation; the sink shares its
+// DataQueue with the matching source operator.
+class DistinctStreamingAggSinkOperatorBuilder final
+        : public OperatorBuilder<vectorized::DistinctAggregationNode> {
+public:
+    DistinctStreamingAggSinkOperatorBuilder(int32_t, ExecNode*, std::shared_ptr<DataQueue>);
+
+    OperatorPtr build_operator() override;
+
+    bool is_sink() const override { return true; }
+    bool is_source() const override { return false; }
+
+private:
+    std::shared_ptr<DataQueue> _data_queue;
+};
+
+// Deduplicates incoming rows with the DistinctAggregationNode hash table and pushes
+// blocks of newly seen keys into the DataQueue read by the source operator.
+class DistinctStreamingAggSinkOperator final
+        : public StreamingOperator<DistinctStreamingAggSinkOperatorBuilder> {
+public:
+    DistinctStreamingAggSinkOperator(OperatorBuilderBase* operator_builder, ExecNode*,
+                                     std::shared_ptr<DataQueue>);
+
+    Status sink(RuntimeState* state, vectorized::Block* block, SourceState source_state) override;
+
+    bool can_write() override;
+
+    Status close(RuntimeState* state) override;
+
+    // True once at least LIMIT rows have been pushed. `>` would miss the case where
+    // exactly LIMIT rows were emitted, so the sink would keep consuming input.
+    bool reached_limited_rows() {
+        return _node->limit() != -1 && _output_distinct_rows >= _node->limit();
+    }
+
+private:
+    // number of distinct rows already pushed to _data_queue
+    int64_t _output_distinct_rows = 0;
+    std::shared_ptr<DataQueue> _data_queue;
+    // block currently being filled by sink(); re-fetched from the queue when nullptr
+    std::unique_ptr<vectorized::Block> _output_block = vectorized::Block::create_unique();
+};
+
+} // namespace pipeline
+} // namespace doris
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.cpp b/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.cpp
new file mode 100644
index 0000000000..f91fd3fbe3
--- /dev/null
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.cpp
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "distinct_streaming_aggregation_source_operator.h"
+
+#include <utility>
+
+#include "pipeline/exec/data_queue.h"
+#include "pipeline/exec/operator.h"
+#include "runtime/descriptors.h"
+#include "util/runtime_profile.h"
+#include "vec/core/block.h"
+#include "vec/exec/distinct_vaggregation_node.h"
+#include "vec/exec/vaggregation_node.h"
+
+namespace doris {
+class ExecNode;
+class RuntimeState;
+
+namespace pipeline {
+DistinctStreamingAggSourceOperator::DistinctStreamingAggSourceOperator(
+        OperatorBuilderBase* templ, ExecNode* node, std::shared_ptr<DataQueue> queue)
+        : SourceOperator(templ, node), _data_queue(std::move(queue)) {}
+
+// Ready to run once the queue holds a block or the sink has finished.
+bool DistinctStreamingAggSourceOperator::can_read() {
+    return _data_queue->has_data_or_finished();
+}
+
+// Swaps one block of distinct keys taken from the queue (if any) into `block`, returns
+// the emptied queue block to the free list and sets *eos once the queue is exhausted.
+// The conjuncts (HAVING) are applied only when this is not a streaming pre-agg.
+Status DistinctStreamingAggSourceOperator::pull_data(RuntimeState* state, vectorized::Block* block,
+                                                     bool* eos) {
+    std::unique_ptr<vectorized::Block> agg_block;
+    RETURN_IF_ERROR(_data_queue->get_block_from_queue(&agg_block));
+    if (agg_block != nullptr) {
+        block->swap(*agg_block);
+        agg_block->clear_column_data(_node->row_desc().num_materialized_slots());
+        _data_queue->push_free_block(std::move(agg_block));
+    }
+    if (_data_queue->data_exhausted()) { //the sink is eos or reached limit
+        *eos = true;
+    }
+    _node->_make_nullable_output_key(block);
+    if (_node->is_streaming_preagg() == false) {
+        // dispose the having clause, should not be execute in prestreaming agg
+        RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_node->get_conjuncts(), block,
+                                                               block->columns()));
+    }
+
+    rows_have_returned += block->rows();
+    return Status::OK();
+}
+
+// Runs pull_data() through the node's get_next_after_projects(); at eos it records
+// the number of returned rows on the node and reports FINISHED.
+Status DistinctStreamingAggSourceOperator::get_block(RuntimeState* state, vectorized::Block* block,
+                                                     SourceState& source_state) {
+    bool eos = false;
+    RETURN_IF_ERROR(_node->get_next_after_projects(
+            state, block, &eos,
+            std::bind(&DistinctStreamingAggSourceOperator::pull_data, this, std::placeholders::_1,
+                      std::placeholders::_2, std::placeholders::_3)));
+    if (UNLIKELY(eos)) {
+        _node->set_num_rows_returned(rows_have_returned);
+        source_state = SourceState::FINISHED;
+    } else {
+        source_state = SourceState::DEPEND_ON_SOURCE;
+    }
+    return Status::OK();
+}
+
+DistinctStreamingAggSourceOperatorBuilder::DistinctStreamingAggSourceOperatorBuilder(
+        int32_t id, ExecNode* exec_node, std::shared_ptr<DataQueue> queue)
+        : OperatorBuilder(id, "DistinctStreamingAggSourceOperator", exec_node),
+          _data_queue(std::move(queue)) {}
+
+OperatorPtr DistinctStreamingAggSourceOperatorBuilder::build_operator() {
+    return std::make_shared<DistinctStreamingAggSourceOperator>(this, _node, _data_queue);
+}
+
+} // namespace pipeline
+} // namespace doris
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.h b/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.h
new file mode 100644
index 0000000000..3534193bf8
--- /dev/null
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.h
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <stdint.h>
+
+#include <cstdint>
+#include <memory>
+
+#include "common/status.h"
+#include "operator.h"
+#include "vec/exec/distinct_vaggregation_node.h"
+#include "vec/exec/vaggregation_node.h"
+
+namespace doris {
+class ExecNode;
+class RuntimeState;
+
+namespace vectorized {
+class Block;
+} // namespace vectorized
+namespace pipeline {
+class DataQueue;
+
+// Builds the source half of the distinct streaming aggregation; its operator reads the
+// blocks that DistinctStreamingAggSinkOperator pushes into the shared DataQueue.
+class DistinctStreamingAggSourceOperatorBuilder final
+        : public OperatorBuilder<vectorized::DistinctAggregationNode> {
+public:
+    DistinctStreamingAggSourceOperatorBuilder(int32_t, ExecNode*, std::shared_ptr<DataQueue>);
+
+    bool is_source() const override { return true; }
+
+    OperatorPtr build_operator() override;
+
+private:
+    std::shared_ptr<DataQueue> _data_queue;
+};
+
+// Outputs the blocks of distinct keys taken from the shared DataQueue.
+class DistinctStreamingAggSourceOperator final
+        : public SourceOperator<DistinctStreamingAggSourceOperatorBuilder> {
+public:
+    DistinctStreamingAggSourceOperator(OperatorBuilderBase*, ExecNode*, std::shared_ptr<DataQueue>);
+    bool can_read() override;
+    Status get_block(RuntimeState*, vectorized::Block*, SourceState& source_state) override;
+    Status open(RuntimeState*) override { return Status::OK(); }
+    // Takes the next block from the queue; invoked by get_block() via get_next_after_projects().
+    Status pull_data(RuntimeState* state, vectorized::Block* output_block, bool* eos);
+
+private:
+    // rows output so far; stored on the node via set_num_rows_returned() at eos
+    int64_t rows_have_returned = 0;
+    std::shared_ptr<DataQueue> _data_queue;
+};
+
+} // namespace pipeline
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 69848be82a..efbfe3ac0f 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -48,6 +48,8 @@
 #include "pipeline/exec/const_value_operator.h"
 #include "pipeline/exec/data_queue.h"
 #include "pipeline/exec/datagen_operator.h"
+#include "pipeline/exec/distinct_streaming_aggregation_sink_operator.h"
+#include "pipeline/exec/distinct_streaming_aggregation_source_operator.h"
 #include "pipeline/exec/empty_set_operator.h"
 #include "pipeline/exec/empty_source_operator.h"
 #include "pipeline/exec/exchange_sink_operator.h"
@@ -500,10 +502,24 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
         break;
     }
     case TPlanNodeType::AGGREGATION_NODE: {
-        auto* agg_node = assert_cast<vectorized::AggregationNode*>(node);
+        // node may also be the DistinctAggregationNode subclass (see ExecNode::create_node).
+        auto* agg_node = dynamic_cast<vectorized::AggregationNode*>(node);
         auto new_pipe = add_pipeline();
         RETURN_IF_ERROR(_build_pipelines(node->child(0), new_pipe));
-        if (agg_node->is_streaming_preagg()) {
+        if (agg_node->is_aggregate_evaluators_empty()) {
+            // Pure DISTINCT (no aggregate functions): the child pipeline (new_pipe) sinks into
+            // a DataQueue that the distinct streaming source added to cur_pipe reads from.
+            auto data_queue = std::make_shared<DataQueue>(1);
+            OperatorBuilderPtr pre_agg_sink =
+                    std::make_shared<DistinctStreamingAggSinkOperatorBuilder>(node->id(), agg_node,
+                                                                              data_queue);
+            RETURN_IF_ERROR(new_pipe->set_sink(pre_agg_sink));
+
+            OperatorBuilderPtr pre_agg_source =
+                    std::make_shared<DistinctStreamingAggSourceOperatorBuilder>(
+                            node->id(), agg_node, data_queue);
+            RETURN_IF_ERROR(cur_pipe->add_operator(pre_agg_source));
+        } else if (agg_node->is_streaming_preagg()) {
             auto data_queue = std::make_shared<DataQueue>(1);
             OperatorBuilderPtr pre_agg_sink = std::make_shared<StreamingAggSinkOperatorBuilder>(
                     node->id(), agg_node, data_queue);
diff --git a/be/src/vec/exec/distinct_vaggregation_node.cpp b/be/src/vec/exec/distinct_vaggregation_node.cpp
new file mode 100644
index 0000000000..bbbd196411
--- /dev/null
+++ b/be/src/vec/exec/distinct_vaggregation_node.cpp
@@ -0,0 +1,148 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/distinct_vaggregation_node.h"
+
+#include "runtime/runtime_state.h"
+#include "vec/aggregate_functions/aggregate_function_uniq.h"
+#include "vec/exec/vaggregation_node.h"
+
+namespace doris {
+class ObjectPool;
+} // namespace doris
+
+namespace doris::vectorized {
+
+// dummy_mapped_data is one pool-owned byte stored as the mapped value of every key:
+// only the hash table's keys matter for DISTINCT.
+DistinctAggregationNode::DistinctAggregationNode(ObjectPool* pool, const TPlanNode& tnode,
+                                                 const DescriptorTbl& descs)
+        : AggregationNode(pool, tnode, descs) {
+    dummy_mapped_data = pool->add(new char('A'));
+}
+
+// Evaluates the group-by key expressions on in_block, inserts the keys into the hash
+// table and appends to out_block the rows whose keys were inserted for the first time.
+// out_block's columns are appended to when it can be reused (mem_reuse and no keys to
+// make nullable); otherwise out_block is replaced by a new block built from the keys.
+Status DistinctAggregationNode::_distinct_pre_agg_with_serialized_key(
+        doris::vectorized::Block* in_block, doris::vectorized::Block* out_block) {
+    SCOPED_TIMER(_build_timer);
+    DCHECK(!_probe_expr_ctxs.empty());
+
+    size_t key_size = _probe_expr_ctxs.size();
+    ColumnRawPtrs key_columns(key_size);
+    {
+        SCOPED_TIMER(_expr_timer);
+        for (size_t i = 0; i < key_size; ++i) {
+            int result_column_id = -1;
+            RETURN_IF_ERROR(_probe_expr_ctxs[i]->execute(in_block, &result_column_id));
+            // expand const columns to full columns before hashing/selecting rows
+            in_block->get_by_position(result_column_id).column =
+                    in_block->get_by_position(result_column_id)
+                            .column->convert_to_full_column_if_const();
+            key_columns[i] = in_block->get_by_position(result_column_id).column.get();
+        }
+    }
+
+    int rows = in_block->rows();
+    // indices (into in_block) of the rows whose keys were inserted for the first time
+    IColumn::Selector distinct_row;
+    distinct_row.reserve(rows);
+
+    RETURN_IF_CATCH_EXCEPTION(
+            _emplace_into_hash_table_to_distinct(distinct_row, key_columns, rows));
+
+    bool mem_reuse = _make_nullable_keys.empty() && out_block->mem_reuse();
+    if (mem_reuse) {
+        for (int i = 0; i < key_size; ++i) {
+            auto dst = out_block->get_by_position(i).column->assume_mutable();
+            key_columns[i]->append_data_by_selector(dst, distinct_row);
+        }
+    } else {
+        ColumnsWithTypeAndName columns_with_schema;
+        for (int i = 0; i < key_size; ++i) {
+            auto distinct_column = key_columns[i]->clone_empty();
+            key_columns[i]->append_data_by_selector(distinct_column, distinct_row);
+            columns_with_schema.emplace_back(std::move(distinct_column),
+                                             _probe_expr_ctxs[i]->root()->data_type(),
+                                             _probe_expr_ctxs[i]->root()->expr_name());
+        }
+        out_block->swap(Block(columns_with_schema));
+    }
+    return Status::OK();
+}
+
+// Inserts the num_rows keys into the hash table and appends to distinct_row the index of
+// every row whose key was newly inserted. For phmap tables the hashes are computed up
+// front so the table can be prefetched HASH_MAP_PREFETCH_DIST rows ahead.
+void DistinctAggregationNode::_emplace_into_hash_table_to_distinct(IColumn::Selector& distinct_row,
+                                                                   ColumnRawPtrs& key_columns,
+                                                                   const size_t num_rows) {
+    std::visit(
+            [&](auto&& agg_method) -> void {
+                SCOPED_TIMER(_hash_table_compute_timer);
+                using HashMethodType = std::decay_t<decltype(agg_method)>;
+                using HashTableType = std::decay_t<decltype(agg_method.data)>;
+                using AggState = typename HashMethodType::State;
+                AggState state(key_columns, _probe_key_sz, nullptr);
+                _pre_serialize_key_if_need(state, agg_method, key_columns, num_rows);
+
+                if constexpr (HashTableTraits<HashTableType>::is_phmap) {
+                    if (_hash_values.size() < num_rows) {
+                        _hash_values.resize(num_rows);
+                    }
+                    if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits<
+                                          AggState>::value) {
+                        for (size_t i = 0; i < num_rows; ++i) {
+                            _hash_values[i] = agg_method.data.hash(agg_method.keys[i]);
+                        }
+                    } else {
+                        for (size_t i = 0; i < num_rows; ++i) {
+                            _hash_values[i] =
+                                    agg_method.data.hash(state.get_key_holder(i, *_agg_arena_pool));
+                        }
+                    }
+                }
+
+                /// For all rows.
+                COUNTER_UPDATE(_hash_table_input_counter, num_rows);
+                for (size_t i = 0; i < num_rows; ++i) {
+                    auto emplace_result = [&]() {
+                        if constexpr (HashTableTraits<HashTableType>::is_phmap) {
+                            if (LIKELY(i + HASH_MAP_PREFETCH_DIST < num_rows)) {
+                                agg_method.data.prefetch_by_hash(
+                                        _hash_values[i + HASH_MAP_PREFETCH_DIST]);
+                            }
+                            return state.emplace_key(agg_method.data, _hash_values[i], i,
+                                                     *_agg_arena_pool);
+                        } else {
+                            return state.emplace_key(agg_method.data, i, *_agg_arena_pool);
+                        }
+                    }();
+
+                    // first occurrence of this key: keep the row in the output
+                    if (emplace_result.is_inserted()) {
+                        emplace_result.set_mapped(dummy_mapped_data);
+                        distinct_row.push_back(i);
+                    }
+                }
+            },
+            _agg_data->_aggregated_method_variant);
+}
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/distinct_vaggregation_node.h b/be/src/vec/exec/distinct_vaggregation_node.h
new file mode 100644
index 0000000000..f5ca0ceebb
--- /dev/null
+++ b/be/src/vec/exec/distinct_vaggregation_node.h
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include <cstdint>
+#include <memory>
+
+#include "vec/exec/vaggregation_node.h"
+#include "vec/exprs/vectorized_agg_fn.h"
+#include "vec/exprs/vexpr.h"
+#include "vec/exprs/vexpr_context.h"
+#include "vec/exprs/vslot_ref.h"
+
+namespace doris {
+class TPlanNode;
+class DescriptorTbl;
+class ObjectPool;
+class RuntimeState;
+
+namespace vectorized {
+
+// Aggregation node used by the pipeline engine when there are no aggregate functions
+// (pure DISTINCT). For example, in
+//   select c_name from customer union select c_name from customer
+// the agg node only gets the distinct rows of c_name, so each row can be output as soon
+// as its key is first inserted into the hash map.
+// Example phase settings of such a plan:
+// phase1: (_is_merge:false, _needs_finalize:false, Streaming Preaggregation:true, agg size:0, limit:-1)
+// phase2: (_is_merge:false, _needs_finalize:true,  Streaming Preaggregation:false,agg size:0, limit:-1)
+class DistinctAggregationNode final : public AggregationNode {
+public:
+    DistinctAggregationNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
+    ~DistinctAggregationNode() override = default;
+    // Deduplicates the group-by keys of in_block against the hash table and writes the
+    // rows whose keys are new into out_block.
+    Status _distinct_pre_agg_with_serialized_key(Block* in_block, Block* out_block);
+    // Lets the pipeline source operator record how many rows it returned.
+    void set_num_rows_returned(int64_t rows) { _num_rows_returned = rows; }
+    // Returns a copy of the node's conjuncts (the pipeline source filters its output with them).
+    vectorized::VExprContextSPtrs get_conjuncts() { return _conjuncts; }
+
+private:
+    // pool-owned placeholder stored as the mapped value of every hash-table key
+    char* dummy_mapped_data = nullptr;
+    // Inserts num_rows keys into the hash table and appends to distinct_row the index
+    // of each row whose key was inserted for the first time.
+    void _emplace_into_hash_table_to_distinct(IColumn::Selector& distinct_row,
+                                              ColumnRawPtrs& key_columns, const size_t num_rows);
+};
+} // namespace vectorized
+} // namespace doris
diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp
index d7ebfe0a51..f4da6d9aaf 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -17,6 +17,7 @@
 
 #include "vec/exec/vaggregation_node.h"
 
+#include <fmt/format.h>
 #include <gen_cpp/Exprs_types.h>
 #include <gen_cpp/Metrics_types.h>
 #include <gen_cpp/PlanNodes_types.h>
@@ -25,6 +26,7 @@
 #include <array>
 #include <atomic>
 #include <memory>
+#include <string>
 
 #include "exec/exec_node.h"
 #include "runtime/block_spill_manager.h"
@@ -102,27 +104,27 @@ static constexpr int STREAMING_HT_MIN_REDUCTION_SIZE =
 AggregationNode::AggregationNode(ObjectPool* pool, const TPlanNode& tnode,
                                  const DescriptorTbl& descs)
         : ExecNode(pool, tnode, descs),
+          _hash_table_compute_timer(nullptr),
+          _hash_table_input_counter(nullptr),
+          _build_timer(nullptr),
+          _expr_timer(nullptr),
+          _exec_timer(nullptr),
           _intermediate_tuple_id(tnode.agg_node.intermediate_tuple_id),
           _intermediate_tuple_desc(nullptr),
           _output_tuple_id(tnode.agg_node.output_tuple_id),
           _output_tuple_desc(nullptr),
           _needs_finalize(tnode.agg_node.need_finalize),
           _is_merge(false),
-          _build_timer(nullptr),
           _serialize_key_timer(nullptr),
-          _exec_timer(nullptr),
           _merge_timer(nullptr),
-          _expr_timer(nullptr),
           _get_results_timer(nullptr),
           _serialize_data_timer(nullptr),
           _serialize_result_timer(nullptr),
           _deserialize_data_timer(nullptr),
-          _hash_table_compute_timer(nullptr),
           _hash_table_iterate_timer(nullptr),
           _insert_keys_to_column_timer(nullptr),
           _streaming_agg_timer(nullptr),
           _hash_table_size_counter(nullptr),
-          _hash_table_input_counter(nullptr),
           _max_row_size_counter(nullptr) {
     if (tnode.agg_node.__isset.use_streaming_preaggregation) {
         _is_streaming_preagg = tnode.agg_node.use_streaming_preaggregation;
@@ -454,7 +456,6 @@ Status AggregationNode::prepare_profile(RuntimeState* state) {
         }
 
         if (_is_streaming_preagg) {
-            runtime_profile()->append_exec_option("Streaming Preaggregation");
             _executor.pre_agg =
                     std::bind<Status>(&AggregationNode::_pre_agg_with_serialized_key, this,
                                       std::placeholders::_1, std::placeholders::_2);
@@ -478,6 +479,15 @@ Status AggregationNode::prepare_profile(RuntimeState* state) {
                                _needs_finalize;      // agg's finalize step
     }
 
+    // Summarize merge/finalize/streaming mode, aggregate count and limit in the profile.
+    fmt::memory_buffer msg;
+    fmt::format_to(msg,
+                   "(_is_merge: {}, _needs_finalize: {}, Streaming Preaggregation: {}, agg size: "
+                   "{}, limit: {})",
+                   _is_merge ? "true" : "false", _needs_finalize ? "true" : "false",
+                   _is_streaming_preagg ? "true" : "false",
+                   std::to_string(_aggregate_evaluators.size()), std::to_string(_limit));
+    runtime_profile()->add_info_string("AggInfos:", fmt::to_string(msg));
     return Status::OK();
 }
 
@@ -918,7 +928,9 @@ void AggregationNode::_emplace_into_hash_table(AggregateDataPtr* places, ColumnR
                 _pre_serialize_key_if_need(state, agg_method, key_columns, num_rows);
 
                 if constexpr (HashTableTraits<HashTableType>::is_phmap) {
-                    if (_hash_values.size() < num_rows) _hash_values.resize(num_rows);
+                    if (_hash_values.size() < num_rows) {
+                        _hash_values.resize(num_rows);
+                    }
                     if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits<
                                           AggState>::value) {
                         for (size_t i = 0; i < num_rows; ++i) {
diff --git a/be/src/vec/exec/vaggregation_node.h b/be/src/vec/exec/vaggregation_node.h
index 9d6f4c4979..3418a004bd 100644
--- a/be/src/vec/exec/vaggregation_node.h
+++ b/be/src/vec/exec/vaggregation_node.h
@@ -865,7 +865,7 @@ struct SpillPartitionHelper {
 };
 
 // not support spill
-class AggregationNode final : public ::doris::ExecNode {
+class AggregationNode : public ::doris::ExecNode {
 public:
     using Sizes = std::vector<size_t>;
 
@@ -883,18 +883,34 @@ public:
     Status sink(doris::RuntimeState* state, vectorized::Block* input_block, bool eos) override;
     Status do_pre_agg(vectorized::Block* input_block, vectorized::Block* output_block);
     bool is_streaming_preagg() const { return _is_streaming_preagg; }
+    bool is_aggregate_evaluators_empty() const { return _aggregate_evaluators.empty(); }
+    void _make_nullable_output_key(Block* block);
 
-private:
-    friend class pipeline::AggSinkOperator;
-    friend class pipeline::StreamingAggSinkOperator;
-    friend class pipeline::AggSourceOperator;
-    friend class pipeline::StreamingAggSourceOperator;
+protected:
+    bool _is_streaming_preagg;
+    bool _child_eos = false;
+    Block _preagg_block = Block();
+    ArenaUPtr _agg_arena_pool;
     // group by k1,k2
     VExprContextSPtrs _probe_expr_ctxs;
+    AggregatedDataVariantsUPtr _agg_data;
+
+    std::vector<size_t> _probe_key_sz;
+    std::vector<size_t> _hash_values;
     // left / full join will change the key nullable make output/input solt
     // nullable diff. so we need make nullable of it.
     std::vector<size_t> _make_nullable_keys;
-    std::vector<size_t> _probe_key_sz;
+    RuntimeProfile::Counter* _hash_table_compute_timer;
+    RuntimeProfile::Counter* _hash_table_input_counter;
+    RuntimeProfile::Counter* _build_timer;
+    RuntimeProfile::Counter* _expr_timer;
+    RuntimeProfile::Counter* _exec_timer;
+
+private:
+    friend class pipeline::AggSinkOperator;
+    friend class pipeline::StreamingAggSinkOperator;
+    friend class pipeline::AggSourceOperator;
+    friend class pipeline::StreamingAggSourceOperator;
 
     std::vector<AggFnEvaluator*> _aggregate_evaluators;
     bool _can_short_circuit = false;
@@ -920,47 +936,32 @@ private:
     size_t _external_agg_bytes_threshold;
     size_t _partitioned_threshold = 0;
 
-    AggregatedDataVariantsUPtr _agg_data;
-
     AggSpillContext _spill_context;
     std::unique_ptr<SpillPartitionHelper> _spill_partition_helper;
 
-    ArenaUPtr _agg_arena_pool;
-
-    RuntimeProfile::Counter* _build_timer;
     RuntimeProfile::Counter* _build_table_convert_timer;
     RuntimeProfile::Counter* _serialize_key_timer;
-    RuntimeProfile::Counter* _exec_timer;
     RuntimeProfile::Counter* _merge_timer;
-    RuntimeProfile::Counter* _expr_timer;
     RuntimeProfile::Counter* _get_results_timer;
     RuntimeProfile::Counter* _serialize_data_timer;
     RuntimeProfile::Counter* _serialize_result_timer;
     RuntimeProfile::Counter* _deserialize_data_timer;
-    RuntimeProfile::Counter* _hash_table_compute_timer;
     RuntimeProfile::Counter* _hash_table_iterate_timer;
     RuntimeProfile::Counter* _insert_keys_to_column_timer;
     RuntimeProfile::Counter* _streaming_agg_timer;
     RuntimeProfile::Counter* _hash_table_size_counter;
-    RuntimeProfile::Counter* _hash_table_input_counter;
     RuntimeProfile::Counter* _max_row_size_counter;
-
     RuntimeProfile::Counter* _memory_usage_counter;
     RuntimeProfile::Counter* _hash_table_memory_usage;
     RuntimeProfile::HighWaterMarkCounter* _serialize_key_arena_memory_usage;
 
-    bool _is_streaming_preagg;
-    Block _preagg_block = Block();
     bool _should_expand_hash_table = true;
-    bool _child_eos = false;
-
     bool _should_limit_output = false;
     bool _reach_limit = false;
     bool _agg_data_created_without_key = false;
 
     PODArray<AggregateDataPtr> _places;
     std::vector<char> _deserialize_buffer;
-    std::vector<size_t> _hash_values;
     std::vector<AggregateDataPtr> _values;
     std::unique_ptr<AggregateDataContainer> _aggregate_data_container;
 
@@ -971,8 +972,6 @@ private:
 
     size_t _get_hash_table_size();
 
-    void _make_nullable_output_key(Block* block);
-
     Status _create_agg_status(AggregateDataPtr data);
     Status _destroy_agg_status(AggregateDataPtr data);
 
@@ -1003,6 +1002,7 @@ private:
     void _close_with_serialized_key();
     void _init_hash_method(const VExprContextSPtrs& probe_exprs);
 
+protected:
     template <typename AggState, typename AggMethod>
     void _pre_serialize_key_if_need(AggState& state, AggMethod& agg_method,
                                     const ColumnRawPtrs& key_columns, const size_t num_rows) {
@@ -1017,6 +1017,7 @@ private:
         }
     }
 
+private:
     template <bool limit>
     Status _execute_with_serialized_key_helper(Block* block) {
         SCOPED_TIMER(_build_timer);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/WindowFunnel.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/WindowFunnel.java
index c37d66471d..d19f63f658 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/WindowFunnel.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/WindowFunnel.java
@@ -31,7 +31,6 @@ import org.apache.doris.nereids.types.DateType;
 import org.apache.doris.nereids.types.DateV2Type;
 import org.apache.doris.nereids.types.IntegerType;
 import org.apache.doris.nereids.types.StringType;
-import org.apache.doris.nereids.types.VarcharType;
 import org.apache.doris.nereids.util.ExpressionUtils;
 
 import com.google.common.base.Preconditions;
@@ -47,12 +46,7 @@ public class WindowFunnel extends AggregateFunction
 
     public static final List<FunctionSignature> SIGNATURES = ImmutableList.of(
             FunctionSignature.ret(IntegerType.INSTANCE)
-                    .varArgs(BigIntType.INSTANCE, StringType.INSTANCE, DateTimeType.INSTANCE, BooleanType.INSTANCE),
-            FunctionSignature.ret(VarcharType.SYSTEM_DEFAULT)
-                    .args(BigIntType.INSTANCE,
-                            StringType.INSTANCE,
-                            DateTimeV2Type.SYSTEM_DEFAULT,
-                            BooleanType.INSTANCE)
+                    .varArgs(BigIntType.INSTANCE, StringType.INSTANCE, DateTimeType.INSTANCE, BooleanType.INSTANCE)
     );
 
     /**


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[doris] 02/03: [bug](distinct-agg) fix distinct-agg outblock columns size not equal key size (#22357)

Posted by kx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 88085d9076a2ba921735039af37e2b6ccd894dee
Author: zhangstar333 <87...@users.noreply.github.com>
AuthorDate: Sat Jul 29 12:44:44 2023 +0800

    [bug](distinct-agg) fix distinct-agg outblock columns size not equal key size (#22357)
    
    * [improve](flex) support scientific notation (aEb) parser
    
    * update
    
    * [bug](distinct-agg) fix distinct-agg outblock columns size not equal key size
---
 be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.cpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.cpp b/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.cpp
index f91fd3fbe3..fb653bdcbd 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.cpp
@@ -46,7 +46,7 @@ Status DistinctStreamingAggSourceOperator::pull_data(RuntimeState* state, vector
     RETURN_IF_ERROR(_data_queue->get_block_from_queue(&agg_block));
     if (agg_block != nullptr) {
         block->swap(*agg_block);
-        agg_block->clear_column_data(_node->row_desc().num_materialized_slots());
+        agg_block->clear_column_data(block->columns());
         _data_queue->push_free_block(std::move(agg_block));
     }
     if (_data_queue->data_exhausted()) { //the sink is eos or reached limit


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[doris] 03/03: [fix](ipv6)Remove restrictions from IPv4 when add backend (#22323)

Posted by kx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit b6fbc0ad3d5d1ab4c73be1abb67308641e0a98e0
Author: zhangdong <49...@qq.com>
AuthorDate: Sun Jul 30 22:47:24 2023 +0800

    [fix](ipv6)Remove restrictions from IPv4 when add backend (#22323)
    
    When adding a BE, the host:port string was required to contain exactly one colon; otherwise an error was reported. However, IPv6 addresses contain multiple colons, so valid IPv6 hosts were rejected.
    
    ```
    String[] pair = hostPort.split(":");
    if (pair.length != 2) {
        throw new AnalysisException("Invalid host port: " + hostPort);
    }
    ```
---
 .../org/apache/doris/common/util/NetUtils.java     | 14 +++++-----
 .../org/apache/doris/system/SystemInfoService.java |  5 ----
 .../apache/doris/system/SystemInfoServiceTest.java | 32 ++++++++++++++++++++++
 3 files changed, 39 insertions(+), 12 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/NetUtils.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/NetUtils.java
index 3470cea203..334dd11564 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/NetUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/NetUtils.java
@@ -137,16 +137,16 @@ public class NetUtils {
     }
 
     public static SystemInfoService.HostInfo resolveHostInfoFromHostPort(String hostPort) throws AnalysisException {
+        String[] pair;
         if (hostPort.charAt(0) == '[') {
-            String[] pair = hostPort.substring(1).split("]:");
-            return new SystemInfoService.HostInfo(pair[0], Integer.valueOf(pair[1]));
+            pair = hostPort.substring(1).split("]:");
         } else {
-            String[] pair = hostPort.split(":");
-            if (pair.length != 2) {
-                throw new AnalysisException("invalid host port: " + hostPort);
-            }
-            return new SystemInfoService.HostInfo(pair[0], Integer.valueOf(pair[1]));
+            pair = hostPort.split(":");
+        }
+        if (pair.length != 2) {
+            throw new AnalysisException("invalid host port: " + hostPort);
         }
+        return new SystemInfoService.HostInfo(pair[0], Integer.valueOf(pair[1]));
     }
 
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
index f423981d2d..b2e5d7df40 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
@@ -718,11 +718,6 @@ public class SystemInfoService {
             throw new AnalysisException("Invalid host port: " + hostPort);
         }
 
-        String[] pair = hostPort.split(":");
-        if (pair.length != 2) {
-            throw new AnalysisException("Invalid host port: " + hostPort);
-        }
-
         HostInfo hostInfo = NetUtils.resolveHostInfoFromHostPort(hostPort);
 
         String host = hostInfo.getHost();
diff --git a/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java b/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
index 172fbd5594..d207e0ce2a 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
@@ -20,9 +20,11 @@ package org.apache.doris.system;
 import org.apache.doris.catalog.DiskInfo;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.ReplicaAllocation;
+import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.FeMetaVersion;
 import org.apache.doris.meta.MetaContext;
 import org.apache.doris.resource.Tag;
+import org.apache.doris.system.SystemInfoService.HostInfo;
 import org.apache.doris.thrift.TStorageMedium;
 
 import com.google.common.collect.ImmutableMap;
@@ -57,6 +59,35 @@ public class SystemInfoServiceTest {
         infoService.addBackend(backend);
     }
 
+    @Test
+    public void testGetHostAndPort() {
+        String ipv4 = "192.168.1.2:9050";
+        String ipv6 = "[fe80::5054:ff:fec9:dee0]:9050";
+        String ipv6Error = "fe80::5054:ff:fec9:dee0:9050";
+        try {
+            HostInfo hostAndPort = SystemInfoService.getHostAndPort(ipv4);
+            Assert.assertEquals("192.168.1.2", hostAndPort.getHost());
+            Assert.assertEquals(9050, hostAndPort.getPort());
+        } catch (AnalysisException e) {
+            e.printStackTrace();
+            Assert.fail();
+        }
+        try {
+            HostInfo hostAndPort = SystemInfoService.getHostAndPort(ipv6);
+            Assert.assertEquals("fe80::5054:ff:fec9:dee0", hostAndPort.getHost());
+            Assert.assertEquals(9050, hostAndPort.getPort());
+        } catch (AnalysisException e) {
+            e.printStackTrace();
+            Assert.fail();
+        }
+        try {
+            SystemInfoService.getHostAndPort(ipv6Error);
+            Assert.fail();
+        } catch (AnalysisException e) {
+            e.printStackTrace();
+        }
+    }
+
     @Test
     public void testBackendHbResponseSerialization() throws IOException {
         MetaContext metaContext = new MetaContext();
@@ -401,4 +432,5 @@ public class SystemInfoServiceTest {
         tagMap.put(Tag.TYPE_ROLE, Tag.VALUE_COMPUTATION);
         be.setTagMap(tagMap);
     }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org