You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/10/07 01:55:25 UTC

[doris] branch master updated: [feature-wip](new-scan)Add new jdbc scanner and new jdbc scan node (#12848)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b41748efa1 [feature-wip](new-scan)Add new jdbc scanner and new jdbc scan node (#12848)
b41748efa1 is described below

commit b41748efa13dfb8628492c7450c26be263f359a8
Author: Tiewei Fang <43...@users.noreply.github.com>
AuthorDate: Fri Oct 7 09:55:17 2022 +0800

    [feature-wip](new-scan)Add new jdbc scanner and new jdbc scan node (#12848)
    
    Related PR: #11582
    This PR adds the new JDBC scan node (NewJdbcScanNode) and its scanner (NewJdbcScanner).
---
 be/src/exec/exec_node.cpp                   |  13 ++-
 be/src/runtime/plan_fragment_executor.cpp   |   7 +-
 be/src/vec/CMakeLists.txt                   |   2 +
 be/src/vec/exec/scan/new_jdbc_scan_node.cpp |  62 +++++++++++
 be/src/vec/exec/scan/new_jdbc_scan_node.h   |  46 ++++++++
 be/src/vec/exec/scan/new_jdbc_scanner.cpp   | 156 ++++++++++++++++++++++++++++
 be/src/vec/exec/scan/new_jdbc_scanner.h     |  58 +++++++++++
 7 files changed, 341 insertions(+), 3 deletions(-)

diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index 4a9d648d1c..ff22f26b84 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -62,6 +62,7 @@
 #include "vec/exec/file_scan_node.h"
 #include "vec/exec/join/vhash_join_node.h"
 #include "vec/exec/scan/new_file_scan_node.h"
+#include "vec/exec/scan/new_jdbc_scan_node.h"
 #include "vec/exec/scan/new_odbc_scan_node.h"
 #include "vec/exec/scan/new_olap_scan_node.h"
 #include "vec/exec/vaggregation_node.h"
@@ -462,7 +463,11 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN
     case TPlanNodeType::JDBC_SCAN_NODE:
         if (state->enable_vectorized_exec()) {
 #ifdef LIBJVM
-            *node = pool->add(new vectorized::VJdbcScanNode(pool, tnode, descs));
+            if (config::enable_new_scan_node) {
+                *node = pool->add(new vectorized::NewJdbcScanNode(pool, tnode, descs));
+            } else {
+                *node = pool->add(new vectorized::VJdbcScanNode(pool, tnode, descs));
+            }
 #else
             return Status::InternalError("Jdbc scan node is disabled since no libjvm is found!");
 #endif
@@ -731,7 +736,11 @@ void ExecNode::try_do_aggregate_serde_improve() {
     ExecNode* child0 = agg_node[0]->_children[0];
     if (typeid(*child0) == typeid(vectorized::NewOlapScanNode) ||
         typeid(*child0) == typeid(vectorized::NewFileScanNode) ||
-        typeid(*child0) == typeid(vectorized::NewOdbcScanNode)) {
+        typeid(*child0) == typeid(vectorized::NewOdbcScanNode)
+#ifdef LIBJVM
+        || typeid(*child0) == typeid(vectorized::NewJdbcScanNode)
+#endif
+    ) {
         vectorized::VScanNode* scan_node =
                 static_cast<vectorized::VScanNode*>(agg_node[0]->_children[0]);
         scan_node->set_no_agg_finalize();
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index 2fe491a374..cff6630c23 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -47,6 +47,7 @@
 #include "util/uid_util.h"
 #include "vec/core/block.h"
 #include "vec/exec/scan/new_file_scan_node.h"
+#include "vec/exec/scan/new_jdbc_scan_node.h"
 #include "vec/exec/scan/new_odbc_scan_node.h"
 #include "vec/exec/scan/new_olap_scan_node.h"
 #include "vec/exec/vexchange_node.h"
@@ -170,7 +171,11 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request,
         ExecNode* node = scan_nodes[i];
         if (typeid(*node) == typeid(vectorized::NewOlapScanNode) ||
             typeid(*node) == typeid(vectorized::NewFileScanNode) ||
-            typeid(*node) == typeid(vectorized::NewOdbcScanNode)) {
+            typeid(*node) == typeid(vectorized::NewOdbcScanNode)
+#ifdef LIBJVM
+            || typeid(*node) == typeid(vectorized::NewJdbcScanNode)
+#endif
+        ) {
             vectorized::VScanNode* scan_node = static_cast<vectorized::VScanNode*>(scan_nodes[i]);
             const std::vector<TScanRangeParams>& scan_ranges =
                     find_with_default(params.per_node_scan_ranges, scan_node->id(), no_scan_ranges);
diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt
index 9632fa13d3..c69cdeb01d 100644
--- a/be/src/vec/CMakeLists.txt
+++ b/be/src/vec/CMakeLists.txt
@@ -254,6 +254,8 @@ set(VEC_FILES
   exec/scan/vfile_scanner.cpp
   exec/scan/new_odbc_scanner.cpp
   exec/scan/new_odbc_scan_node.cpp
+  exec/scan/new_jdbc_scanner.cpp
+  exec/scan/new_jdbc_scan_node.cpp
   )
 
 add_library(Vec STATIC
diff --git a/be/src/vec/exec/scan/new_jdbc_scan_node.cpp b/be/src/vec/exec/scan/new_jdbc_scan_node.cpp
new file mode 100644
index 0000000000..da76859915
--- /dev/null
+++ b/be/src/vec/exec/scan/new_jdbc_scan_node.cpp
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/scan/new_jdbc_scan_node.h"
+#ifdef LIBJVM
+
+#include "vec/exec/scan/new_jdbc_scanner.h"
+#include "vec/exec/scan/vscanner.h"
+namespace doris::vectorized {
+NewJdbcScanNode::NewJdbcScanNode(ObjectPool* pool, const TPlanNode& tnode,
+                                 const DescriptorTbl& descs)
+        : VScanNode(pool, tnode, descs),
+          _table_name(tnode.jdbc_scan_node.table_name),
+          _tuple_id(tnode.jdbc_scan_node.tuple_id),
+          _query_string(tnode.jdbc_scan_node.query_string) {
+    _output_tuple_id = tnode.jdbc_scan_node.tuple_id;
+}
+
+std::string NewJdbcScanNode::get_name() {
+    return fmt::format("VNewJdbcScanNode({0})", _table_name);
+}
+
+Status NewJdbcScanNode::prepare(RuntimeState* state) {
+    VLOG_CRITICAL << "VNewJdbcScanNode::Prepare";
+    RETURN_IF_ERROR(VScanNode::prepare(state));
+    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
+    _scanner_mem_tracker = std::make_unique<MemTracker>("NewJdbcScanners");
+    return Status::OK();
+}
+
+Status NewJdbcScanNode::_init_profile() {
+    RETURN_IF_ERROR(VScanNode::_init_profile());
+    return Status::OK();
+}
+
+Status NewJdbcScanNode::_init_scanners(std::list<VScanner*>* scanners) {
+    if (_eos == true) {
+        return Status::OK();
+    }
+    NewJdbcScanner* scanner = new NewJdbcScanner(
+            _state, this, _limit_per_scanner, _scanner_mem_tracker.get(), _tuple_id, _query_string);
+    _scanner_pool.add(scanner);
+    RETURN_IF_ERROR(scanner->prepare(_state));
+    scanners->push_back(static_cast<VScanner*>(scanner));
+    return Status::OK();
+}
+} // namespace doris::vectorized
+#endif
diff --git a/be/src/vec/exec/scan/new_jdbc_scan_node.h b/be/src/vec/exec/scan/new_jdbc_scan_node.h
new file mode 100644
index 0000000000..287522fc0d
--- /dev/null
+++ b/be/src/vec/exec/scan/new_jdbc_scan_node.h
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#ifdef LIBJVM
+
+#include "runtime/runtime_state.h"
+#include "vec/exec/scan/vscan_node.h"
+
+namespace doris {
+namespace vectorized {
+class NewJdbcScanNode : public VScanNode {
+public:
+    NewJdbcScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
+
+    Status prepare(RuntimeState* state) override;
+    std::string get_name() override;
+
+protected:
+    Status _init_profile() override;
+    Status _init_scanners(std::list<VScanner*>* scanners) override;
+
+private:
+    std::string _table_name;
+    TupleId _tuple_id;
+    std::string _query_string;
+
+    std::unique_ptr<MemTracker> _scanner_mem_tracker;
+};
+} // namespace vectorized
+} // namespace doris
+#endif
diff --git a/be/src/vec/exec/scan/new_jdbc_scanner.cpp b/be/src/vec/exec/scan/new_jdbc_scanner.cpp
new file mode 100644
index 0000000000..c571714e94
--- /dev/null
+++ b/be/src/vec/exec/scan/new_jdbc_scanner.cpp
@@ -0,0 +1,156 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "new_jdbc_scanner.h"
+
+#ifdef LIBJVM
+
+namespace doris::vectorized {
+NewJdbcScanner::NewJdbcScanner(RuntimeState* state, NewJdbcScanNode* parent, int64_t limit,
+                               MemTracker* mem_tracker, TupleId tuple_id, std::string query_string)
+        : VScanner(state, static_cast<VScanNode*>(parent), limit, mem_tracker),
+          _is_init(false),
+          _jdbc_eos(false),
+          _tuple_id(tuple_id),
+          _query_string(query_string),
+          _tuple_desc(nullptr) {}
+
+Status NewJdbcScanner::prepare(RuntimeState* state) {
+    VLOG_CRITICAL << "NewJdbcScanner::Prepare";
+    if (_is_init) {
+        return Status::OK();
+    }
+
+    if (state == nullptr) {
+        return Status::InternalError("input pointer is NULL of VJdbcScanNode::prepare.");
+    }
+
+    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
+
+    // get tuple desc
+    _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
+    if (_tuple_desc == nullptr) {
+        return Status::InternalError("Failed to get tuple descriptor.");
+    }
+
+    // get jdbc table info
+    const JdbcTableDescriptor* jdbc_table =
+            static_cast<const JdbcTableDescriptor*>(_tuple_desc->table_desc());
+    if (jdbc_table == nullptr) {
+        return Status::InternalError("jdbc table pointer is NULL of VJdbcScanNode::prepare.");
+    }
+    _jdbc_param.driver_class = jdbc_table->jdbc_driver_class();
+    _jdbc_param.driver_path = jdbc_table->jdbc_driver_url();
+    _jdbc_param.resource_name = jdbc_table->jdbc_resource_name();
+    _jdbc_param.driver_checksum = jdbc_table->jdbc_driver_checksum();
+    _jdbc_param.jdbc_url = jdbc_table->jdbc_url();
+    _jdbc_param.user = jdbc_table->jdbc_user();
+    _jdbc_param.passwd = jdbc_table->jdbc_passwd();
+    _jdbc_param.tuple_desc = _tuple_desc;
+    _jdbc_param.query_string = std::move(_query_string);
+
+    _jdbc_connector.reset(new (std::nothrow) JdbcConnector(_jdbc_param));
+    if (_jdbc_connector == nullptr) {
+        return Status::InternalError("new a jdbc scanner failed.");
+    }
+
+    _is_init = true;
+    return Status::OK();
+}
+
+Status NewJdbcScanner::open(RuntimeState* state) {
+    VLOG_CRITICAL << "NewJdbcScanner::open";
+    if (state == nullptr) {
+        return Status::InternalError("input pointer is NULL of VJdbcScanNode::open.");
+    }
+
+    if (!_is_init) {
+        return Status::InternalError("used before initialize of VJdbcScanNode::open.");
+    }
+    RETURN_IF_CANCELLED(state);
+    RETURN_IF_ERROR(VScanner::open(state));
+    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
+    RETURN_IF_ERROR(_jdbc_connector->open());
+    RETURN_IF_ERROR(_jdbc_connector->query());
+    return Status::OK();
+}
+
+Status NewJdbcScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eof) {
+    VLOG_CRITICAL << "NewJdbcScanner::_get_block_impl";
+    if (nullptr == state || nullptr == block || nullptr == eof) {
+        return Status::InternalError("input is NULL pointer");
+    }
+
+    if (!_is_init) {
+        return Status::InternalError("used before initialize of VJdbcScanNode::get_next.");
+    }
+
+    if (_jdbc_eos == true) {
+        *eof = true;
+        return Status::OK();
+    }
+
+    auto column_size = _tuple_desc->slots().size();
+    std::vector<MutableColumnPtr> columns(column_size);
+    bool mem_reuse = block->mem_reuse();
+    // only empty block should be here
+    DCHECK(block->rows() == 0);
+
+    do {
+        RETURN_IF_CANCELLED(state);
+
+        columns.resize(column_size);
+        for (auto i = 0; i < column_size; i++) {
+            if (mem_reuse) {
+                columns[i] = std::move(*block->get_by_position(i).column).mutate();
+            } else {
+                columns[i] = _tuple_desc->slots()[i]->get_empty_mutable_column();
+            }
+        }
+
+        RETURN_IF_ERROR(_jdbc_connector->get_next(&_jdbc_eos, columns, state->batch_size()));
+
+        if (_jdbc_eos == true) {
+            if (block->rows() == 0) {
+                *eof = true;
+            }
+            break;
+        }
+
+        // Before really use the Block, must clear other ptr of column in block
+        // So here need do std::move and clear in `columns`
+        if (!mem_reuse) {
+            int column_index = 0;
+            for (const auto slot_desc : _tuple_desc->slots()) {
+                block->insert(ColumnWithTypeAndName(std::move(columns[column_index++]),
+                                                    slot_desc->get_data_type_ptr(),
+                                                    slot_desc->col_name()));
+            }
+        } else {
+            columns.clear();
+        }
+        VLOG_ROW << "NewJdbcScanNode output rows: " << block->rows();
+    } while (block->rows() == 0 && !(*eof));
+    return Status::OK();
+}
+
+Status NewJdbcScanner::close(RuntimeState* state) {
+    RETURN_IF_ERROR(VScanner::close(state));
+    return Status::OK();
+}
+} // namespace doris::vectorized
+#endif
diff --git a/be/src/vec/exec/scan/new_jdbc_scanner.h b/be/src/vec/exec/scan/new_jdbc_scanner.h
new file mode 100644
index 0000000000..75dabcacfa
--- /dev/null
+++ b/be/src/vec/exec/scan/new_jdbc_scanner.h
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#ifdef LIBJVM
+
+#include "runtime/runtime_state.h"
+#include "vec/exec/scan/new_jdbc_scan_node.h"
+#include "vec/exec/scan/vscanner.h"
+#include "vec/exec/vjdbc_connector.h"
+namespace doris {
+namespace vectorized {
+class NewJdbcScanner : public VScanner {
+public:
+    NewJdbcScanner(RuntimeState* state, NewJdbcScanNode* parent, int64_t limit,
+                   MemTracker* mem_tracker, TupleId tuple_id, std::string query_string);
+
+    Status open(RuntimeState* state) override;
+    Status close(RuntimeState* state) override;
+
+public:
+    Status prepare(RuntimeState* state);
+
+protected:
+    Status _get_block_impl(RuntimeState* state, Block* block, bool* eos) override;
+
+private:
+    bool _is_init;
+
+    bool _jdbc_eos;
+
+    // Tuple id resolved in prepare() to set _tuple_desc;
+    TupleId _tuple_id;
+    // SQL
+    std::string _query_string;
+    // Descriptor of tuples read from JDBC table.
+    const TupleDescriptor* _tuple_desc;
+    // JDBC connector used to open the connection, run the query and fetch rows.
+    std::unique_ptr<JdbcConnector> _jdbc_connector;
+    JdbcConnectorParam _jdbc_param;
+};
+} // namespace vectorized
+} // namespace doris
+#endif


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org