You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/10/07 01:55:25 UTC
[doris] branch master updated: [feature-wip](new-scan)Add new jdbc scanner and new jdbc scan node (#12848)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new b41748efa1 [feature-wip](new-scan)Add new jdbc scanner and new jdbc scan node (#12848)
b41748efa1 is described below
commit b41748efa13dfb8628492c7450c26be263f359a8
Author: Tiewei Fang <43...@users.noreply.github.com>
AuthorDate: Fri Oct 7 09:55:17 2022 +0800
[feature-wip](new-scan)Add new jdbc scanner and new jdbc scan node (#12848)
Related pr: #11582
This pr is the new jdbc scan node and scanner.
---
be/src/exec/exec_node.cpp | 13 ++-
be/src/runtime/plan_fragment_executor.cpp | 7 +-
be/src/vec/CMakeLists.txt | 2 +
be/src/vec/exec/scan/new_jdbc_scan_node.cpp | 62 +++++++++++
be/src/vec/exec/scan/new_jdbc_scan_node.h | 46 ++++++++
be/src/vec/exec/scan/new_jdbc_scanner.cpp | 156 ++++++++++++++++++++++++++++
be/src/vec/exec/scan/new_jdbc_scanner.h | 58 +++++++++++
7 files changed, 341 insertions(+), 3 deletions(-)
diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index 4a9d648d1c..ff22f26b84 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -62,6 +62,7 @@
#include "vec/exec/file_scan_node.h"
#include "vec/exec/join/vhash_join_node.h"
#include "vec/exec/scan/new_file_scan_node.h"
+#include "vec/exec/scan/new_jdbc_scan_node.h"
#include "vec/exec/scan/new_odbc_scan_node.h"
#include "vec/exec/scan/new_olap_scan_node.h"
#include "vec/exec/vaggregation_node.h"
@@ -462,7 +463,11 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN
case TPlanNodeType::JDBC_SCAN_NODE:
if (state->enable_vectorized_exec()) {
#ifdef LIBJVM
- *node = pool->add(new vectorized::VJdbcScanNode(pool, tnode, descs));
+ if (config::enable_new_scan_node) {
+ *node = pool->add(new vectorized::NewJdbcScanNode(pool, tnode, descs));
+ } else {
+ *node = pool->add(new vectorized::VJdbcScanNode(pool, tnode, descs));
+ }
#else
return Status::InternalError("Jdbc scan node is disabled since no libjvm is found!");
#endif
@@ -731,7 +736,11 @@ void ExecNode::try_do_aggregate_serde_improve() {
ExecNode* child0 = agg_node[0]->_children[0];
if (typeid(*child0) == typeid(vectorized::NewOlapScanNode) ||
typeid(*child0) == typeid(vectorized::NewFileScanNode) ||
- typeid(*child0) == typeid(vectorized::NewOdbcScanNode)) {
+ typeid(*child0) == typeid(vectorized::NewOdbcScanNode)
+#ifdef LIBJVM
+ || typeid(*child0) == typeid(vectorized::NewJdbcScanNode)
+#endif
+ ) {
vectorized::VScanNode* scan_node =
static_cast<vectorized::VScanNode*>(agg_node[0]->_children[0]);
scan_node->set_no_agg_finalize();
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index 2fe491a374..cff6630c23 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -47,6 +47,7 @@
#include "util/uid_util.h"
#include "vec/core/block.h"
#include "vec/exec/scan/new_file_scan_node.h"
+#include "vec/exec/scan/new_jdbc_scan_node.h"
#include "vec/exec/scan/new_odbc_scan_node.h"
#include "vec/exec/scan/new_olap_scan_node.h"
#include "vec/exec/vexchange_node.h"
@@ -170,7 +171,11 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request,
ExecNode* node = scan_nodes[i];
if (typeid(*node) == typeid(vectorized::NewOlapScanNode) ||
typeid(*node) == typeid(vectorized::NewFileScanNode) ||
- typeid(*node) == typeid(vectorized::NewOdbcScanNode)) {
+ typeid(*node) == typeid(vectorized::NewOdbcScanNode)
+#ifdef LIBJVM
+ || typeid(*node) == typeid(vectorized::NewJdbcScanNode)
+#endif
+ ) {
vectorized::VScanNode* scan_node = static_cast<vectorized::VScanNode*>(scan_nodes[i]);
const std::vector<TScanRangeParams>& scan_ranges =
find_with_default(params.per_node_scan_ranges, scan_node->id(), no_scan_ranges);
diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt
index 9632fa13d3..c69cdeb01d 100644
--- a/be/src/vec/CMakeLists.txt
+++ b/be/src/vec/CMakeLists.txt
@@ -254,6 +254,8 @@ set(VEC_FILES
exec/scan/vfile_scanner.cpp
exec/scan/new_odbc_scanner.cpp
exec/scan/new_odbc_scan_node.cpp
+ exec/scan/new_jdbc_scanner.cpp
+ exec/scan/new_jdbc_scan_node.cpp
)
add_library(Vec STATIC
diff --git a/be/src/vec/exec/scan/new_jdbc_scan_node.cpp b/be/src/vec/exec/scan/new_jdbc_scan_node.cpp
new file mode 100644
index 0000000000..da76859915
--- /dev/null
+++ b/be/src/vec/exec/scan/new_jdbc_scan_node.cpp
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/scan/new_jdbc_scan_node.h"
+#ifdef LIBJVM
+
+#include "vec/exec/scan/new_jdbc_scanner.h"
+#include "vec/exec/scan/vscanner.h"
+namespace doris::vectorized {
+// Builds the scan node from the JDBC section of the thrift plan node: the table
+// name, the tuple id and the query string are all taken from tnode.jdbc_scan_node.
+NewJdbcScanNode::NewJdbcScanNode(ObjectPool* pool, const TPlanNode& tnode,
+                                 const DescriptorTbl& descs)
+        : VScanNode(pool, tnode, descs),
+          _table_name(tnode.jdbc_scan_node.table_name),
+          _tuple_id(tnode.jdbc_scan_node.tuple_id),
+          _query_string(tnode.jdbc_scan_node.query_string) {
+    // The output tuple id is the same id that was just stored in _tuple_id.
+    _output_tuple_id = _tuple_id;
+}
+
+// Returns the display name of this node, which embeds the JDBC table name.
+std::string NewJdbcScanNode::get_name() {
+    std::string node_name = fmt::format("VNewJdbcScanNode({0})", _table_name);
+    return node_name;
+}
+
+// Prepares the node: runs the base VScanNode::prepare() first and then creates
+// the MemTracker ("NewJdbcScanners") that _init_scanners() passes to the scanner.
+// Returns the base-class error if VScanNode::prepare() fails, otherwise OK.
+Status NewJdbcScanNode::prepare(RuntimeState* state) {
+ VLOG_CRITICAL << "VNewJdbcScanNode::Prepare";
+ // Any failure of the base-class preparation is propagated to the caller.
+ RETURN_IF_ERROR(VScanNode::prepare(state));
+ // NOTE(review): presumably accounts the allocation below against this node's
+ // own tracker for the rest of the scope -- confirm against the macro definition.
+ SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
+ _scanner_mem_tracker = std::make_unique<MemTracker>("NewJdbcScanners");
+ return Status::OK();
+}
+
+// Profile setup is delegated entirely to the base class; its status is returned as is.
+Status NewJdbcScanNode::_init_profile() {
+    return VScanNode::_init_profile();
+}
+
+// Creates and prepares the single JDBC scanner of this node and appends it to
+// `scanners`. Nothing is created when the node has already reached end of stream.
+Status NewJdbcScanNode::_init_scanners(std::list<VScanner*>* scanners) {
+    if (_eos) {
+        return Status::OK();
+    }
+
+    auto* jdbc_scanner = new NewJdbcScanner(_state, this, _limit_per_scanner,
+                                            _scanner_mem_tracker.get(), _tuple_id, _query_string);
+    // The scanner is handed to the pool before prepare() runs, so it is already
+    // registered there when prepare() fails and this function returns early.
+    _scanner_pool.add(jdbc_scanner);
+    RETURN_IF_ERROR(jdbc_scanner->prepare(_state));
+
+    VScanner* base_scanner = jdbc_scanner;
+    scanners->push_back(base_scanner);
+    return Status::OK();
+}
+} // namespace doris::vectorized
+#endif
diff --git a/be/src/vec/exec/scan/new_jdbc_scan_node.h b/be/src/vec/exec/scan/new_jdbc_scan_node.h
new file mode 100644
index 0000000000..287522fc0d
--- /dev/null
+++ b/be/src/vec/exec/scan/new_jdbc_scan_node.h
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#ifdef LIBJVM
+
+#include "runtime/runtime_state.h"
+#include "vec/exec/scan/vscan_node.h"
+
+namespace doris {
+namespace vectorized {
+// Scan node for JDBC tables built on top of VScanNode. It keeps the table name,
+// tuple id and query string taken from the plan node and creates a
+// NewJdbcScanner in _init_scanners().
+class NewJdbcScanNode : public VScanNode {
+public:
+ NewJdbcScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
+
+ // Runs the base-class prepare and creates the scanner memory tracker.
+ Status prepare(RuntimeState* state) override;
+ // Returns "VNewJdbcScanNode(<table name>)".
+ std::string get_name() override;
+
+protected:
+ // Forwards to VScanNode::_init_profile().
+ Status _init_profile() override;
+ // Appends the prepared JDBC scanner to `scanners` unless the node is already at eos.
+ Status _init_scanners(std::list<VScanner*>* scanners) override;
+
+private:
+ // Name of the JDBC table; only used to build the node name.
+ std::string _table_name;
+ // Id of the tuple produced by the scan; passed on to the scanner.
+ TupleId _tuple_id;
+ // Query string from the plan node; passed on to the scanner.
+ std::string _query_string;
+
+ // Memory tracker handed to the scanner created by this node.
+ std::unique_ptr<MemTracker> _scanner_mem_tracker;
+};
+} // namespace vectorized
+} // namespace doris
+#endif
diff --git a/be/src/vec/exec/scan/new_jdbc_scanner.cpp b/be/src/vec/exec/scan/new_jdbc_scanner.cpp
new file mode 100644
index 0000000000..c571714e94
--- /dev/null
+++ b/be/src/vec/exec/scan/new_jdbc_scanner.cpp
@@ -0,0 +1,156 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "new_jdbc_scanner.h"
+
+#ifdef LIBJVM
+
+namespace doris::vectorized {
+// Constructs an un-prepared scanner. `tuple_id` and `query_string` are only
+// stored here; the tuple descriptor and the connector are set up in prepare().
+NewJdbcScanner::NewJdbcScanner(RuntimeState* state, NewJdbcScanNode* parent, int64_t limit,
+                               MemTracker* mem_tracker, TupleId tuple_id, std::string query_string)
+        : VScanner(state, static_cast<VScanNode*>(parent), limit, mem_tracker),
+          _is_init(false),
+          _jdbc_eos(false),
+          _tuple_id(tuple_id),
+          // `query_string` is a by-value sink parameter: move it into the member
+          // instead of copying the string a second time.
+          _query_string(std::move(query_string)),
+          _tuple_desc(nullptr) {}
+
+// Resolves the tuple descriptor for _tuple_id, copies the connection settings of
+// the JDBC table into _jdbc_param and creates the JdbcConnector.
+//
+// Calling prepare() again after a successful call is a no-op. Returns an
+// InternalError when `state` is null, when the tuple descriptor or the table
+// descriptor cannot be obtained, or when the connector cannot be allocated.
+Status NewJdbcScanner::prepare(RuntimeState* state) {
+    VLOG_CRITICAL << "NewJdbcScanner::Prepare";
+    if (_is_init) {
+        return Status::OK();
+    }
+
+    if (state == nullptr) {
+        return Status::InternalError("input pointer is NULL of NewJdbcScanner::prepare.");
+    }
+
+    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
+
+    // get tuple desc
+    _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
+    if (_tuple_desc == nullptr) {
+        return Status::InternalError("Failed to get tuple descriptor.");
+    }
+
+    // get jdbc table info
+    const JdbcTableDescriptor* jdbc_table =
+            static_cast<const JdbcTableDescriptor*>(_tuple_desc->table_desc());
+    if (jdbc_table == nullptr) {
+        return Status::InternalError("jdbc table pointer is NULL of NewJdbcScanner::prepare.");
+    }
+    _jdbc_param.driver_class = jdbc_table->jdbc_driver_class();
+    _jdbc_param.driver_path = jdbc_table->jdbc_driver_url();
+    _jdbc_param.resource_name = jdbc_table->jdbc_resource_name();
+    _jdbc_param.driver_checksum = jdbc_table->jdbc_driver_checksum();
+    _jdbc_param.jdbc_url = jdbc_table->jdbc_url();
+    _jdbc_param.user = jdbc_table->jdbc_user();
+    _jdbc_param.passwd = jdbc_table->jdbc_passwd();
+    _jdbc_param.tuple_desc = _tuple_desc;
+    // _query_string is not read again after this point (prepare() returns early
+    // once _is_init is set), so its buffer can be handed over to the param.
+    _jdbc_param.query_string = std::move(_query_string);
+
+    // nothrow new: an allocation failure is reported as a Status, not an exception.
+    _jdbc_connector.reset(new (std::nothrow) JdbcConnector(_jdbc_param));
+    if (_jdbc_connector == nullptr) {
+        return Status::InternalError("new a jdbc connector failed.");
+    }
+
+    _is_init = true;
+    return Status::OK();
+}
+
+// Opens the scanner: runs VScanner::open(), then opens the JDBC connector and
+// issues the query. Requires a successful prepare() beforehand.
+//
+// Returns an InternalError when `state` is null or prepare() has not completed;
+// otherwise the first error reported by the cancellation check, the base class
+// or the connector.
+Status NewJdbcScanner::open(RuntimeState* state) {
+    VLOG_CRITICAL << "NewJdbcScanner::open";
+    if (state == nullptr) {
+        return Status::InternalError("input pointer is NULL of NewJdbcScanner::open.");
+    }
+
+    if (!_is_init) {
+        return Status::InternalError("used before initialize of NewJdbcScanner::open.");
+    }
+    RETURN_IF_CANCELLED(state);
+    RETURN_IF_ERROR(VScanner::open(state));
+    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
+    RETURN_IF_ERROR(_jdbc_connector->open());
+    RETURN_IF_ERROR(_jdbc_connector->query());
+    return Status::OK();
+}
+
+// Reads the next batch of rows from the JDBC connector into `block`.
+//
+// `block` is expected to be empty on entry. The loop keeps asking the connector
+// for rows until the block is non-empty or the connector reports end of stream;
+// `*eof` is set to true when end of stream was reached and the block holds no
+// rows. Returns an InternalError for null arguments or when prepare() has not
+// completed; otherwise the first error from the cancellation check or connector.
+Status NewJdbcScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eof) {
+    VLOG_CRITICAL << "NewJdbcScanner::_get_block_impl";
+    if (nullptr == state || nullptr == block || nullptr == eof) {
+        return Status::InternalError("input is NULL pointer");
+    }
+
+    if (!_is_init) {
+        return Status::InternalError("used before initialize of NewJdbcScanner::_get_block_impl.");
+    }
+
+    if (_jdbc_eos == true) {
+        *eof = true;
+        return Status::OK();
+    }
+
+    const auto& slots = _tuple_desc->slots();
+    const size_t column_size = slots.size();
+    std::vector<MutableColumnPtr> columns(column_size);
+    const bool mem_reuse = block->mem_reuse();
+    // only empty block should be here
+    DCHECK(block->rows() == 0);
+
+    do {
+        RETURN_IF_CANCELLED(state);
+
+        // `columns` is cleared at the end of a mem_reuse iteration, so bring it
+        // back to one entry per slot before refilling it.
+        columns.resize(column_size);
+        for (size_t i = 0; i < column_size; ++i) {
+            if (mem_reuse) {
+                columns[i] = std::move(*block->get_by_position(i).column).mutate();
+            } else {
+                columns[i] = slots[i]->get_empty_mutable_column();
+            }
+        }
+
+        RETURN_IF_ERROR(_jdbc_connector->get_next(&_jdbc_eos, columns, state->batch_size()));
+
+        if (_jdbc_eos == true) {
+            if (block->rows() == 0) {
+                *eof = true;
+            }
+            break;
+        }
+
+        // Before really use the Block, must clear other ptr of column in block
+        // So here need do std::move and clear in `columns`
+        if (!mem_reuse) {
+            for (size_t i = 0; i < column_size; ++i) {
+                block->insert(ColumnWithTypeAndName(std::move(columns[i]),
+                                                    slots[i]->get_data_type_ptr(),
+                                                    slots[i]->col_name()));
+            }
+        } else {
+            columns.clear();
+        }
+        VLOG_ROW << "NewJdbcScanNode output rows: " << block->rows();
+    } while (block->rows() == 0 && !(*eof));
+    return Status::OK();
+}
+
+// Closing is delegated to the base class; its status is returned unchanged.
+Status NewJdbcScanner::close(RuntimeState* state) {
+    return VScanner::close(state);
+}
+} // namespace doris::vectorized
+#endif
diff --git a/be/src/vec/exec/scan/new_jdbc_scanner.h b/be/src/vec/exec/scan/new_jdbc_scanner.h
new file mode 100644
index 0000000000..75dabcacfa
--- /dev/null
+++ b/be/src/vec/exec/scan/new_jdbc_scanner.h
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#ifdef LIBJVM
+
+#include "runtime/runtime_state.h"
+#include "vec/exec/scan/new_jdbc_scan_node.h"
+#include "vec/exec/scan/vscanner.h"
+#include "vec/exec/vjdbc_connector.h"
+namespace doris {
+namespace vectorized {
+// Scanner that reads the result of one JDBC query through a JdbcConnector.
+class NewJdbcScanner : public VScanner {
+public:
+    NewJdbcScanner(RuntimeState* state, NewJdbcScanNode* parent, int64_t limit,
+                   MemTracker* mem_tracker, TupleId tuple_id, std::string query_string);
+
+    // Resolves the tuple descriptor and creates the connector. Declared without
+    // `override`; it has to succeed before open() is called.
+    Status prepare(RuntimeState* state);
+
+    Status open(RuntimeState* state) override;
+    Status close(RuntimeState* state) override;
+
+protected:
+    Status _get_block_impl(RuntimeState* state, Block* block, bool* eos) override;
+
+private:
+    // Set once prepare() has completed successfully.
+    bool _is_init;
+
+    // Set by the connector when it has no more rows to return.
+    bool _jdbc_eos;
+
+    // Id used in prepare() to look up _tuple_desc.
+    TupleId _tuple_id;
+    // Query text; moved into _jdbc_param by prepare().
+    std::string _query_string;
+    // Descriptor of the tuples read from the JDBC table.
+    const TupleDescriptor* _tuple_desc;
+    // Connector used to open the connection, run the query and fetch rows.
+    std::unique_ptr<JdbcConnector> _jdbc_connector;
+    // Connection and query settings handed to the connector.
+    JdbcConnectorParam _jdbc_param;
+};
+} // namespace vectorized
+} // namespace doris
+#endif
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org