You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2020/03/11 04:00:48 UTC

[GitHub] [incubator-doris] yangzhg commented on a change in pull request #3056: implement except node

yangzhg commented on a change in pull request #3056: implement except node
URL: https://github.com/apache/incubator-doris/pull/3056#discussion_r390734537
 
 

 ##########
 File path: be/src/exec/except_node.cpp
 ##########
 @@ -56,4 +40,157 @@ Status ExceptNode::init(const TPlanNode& tnode, RuntimeState* state) {
     }
     return Status::OK();
 }
-}
\ No newline at end of file
+
+Status ExceptNode::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(ExecNode::prepare(state));
+    _build_pool.reset(new MemPool(mem_tracker()));
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    for (size_t i = 0; i < _child_expr_lists.size(); ++i) {
+        RETURN_IF_ERROR(Expr::prepare(_child_expr_lists[i], state, child(i)->row_desc(),
+                                      expr_mem_tracker()));
+    }
+    _build_tuple_size = child(0)->row_desc().tuple_descriptors().size();
+    _build_tuple_row_size = _build_tuple_size * sizeof(Tuple*);
+    _build_tuple_idx.reserve(_build_tuple_size);
+
+    for (int i = 0; i < _build_tuple_size; ++i) {
+        TupleDescriptor* build_tuple_desc = child(0)->row_desc().tuple_descriptors()[i];
+        _build_tuple_idx.push_back(_row_descriptor.get_tuple_idx(build_tuple_desc->id()));
+    }
+    _find_nulls = std::vector<bool>(_child_expr_lists.size(), true);
+    return Status::OK();
+}
+Status ExceptNode::close(RuntimeState* state) {
+    if (is_closed()) {
+        return Status::OK();
+    }
+    for (auto& exprs : _child_expr_lists) {
+        Expr::close(exprs, state);
+    }
+
+    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::CLOSE));
+    // Must reset _probe_batch in close() to release resources
+    _probe_batch.reset(NULL);
+
+    if (_memory_used_counter != NULL && _hash_tbl.get() != NULL) {
+        COUNTER_UPDATE(_memory_used_counter, _build_pool->peak_allocated_bytes());
+        COUNTER_UPDATE(_memory_used_counter, _hash_tbl->byte_size());
+    }
+    if (_hash_tbl.get() != NULL) {
+        _hash_tbl->close();
+    }
+    if (_build_pool.get() != NULL) {
+        _build_pool->free_all();
+    }
+    return ExecNode::close(state);
+}
+Status ExceptNode::open(RuntimeState* state) {
+    RETURN_IF_ERROR(ExecNode::open(state));
+    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    RETURN_IF_CANCELLED(state);
+    // open result expr lists.
+    for (const vector<ExprContext*>& exprs : _child_expr_lists) {
+        RETURN_IF_ERROR(Expr::open(exprs, state));
+    }
+    // initial build hash table
+    _hash_tbl.reset(new HashTable(_child_expr_lists[0], _child_expr_lists[1], _build_tuple_size,
+                                  true, _find_nulls, id(), mem_tracker(), 1024));
+    RowBatch build_batch(child(0)->row_desc(), state->batch_size(), mem_tracker());
+    RETURN_IF_ERROR(child(0)->open(state));
+
+    bool eos = false;
+    while (!eos) {
+        RETURN_IF_CANCELLED(state);
+        RETURN_IF_ERROR(child(0)->get_next(state, &build_batch, &eos));
+        // take ownership of tuple data of build_batch
+        _build_pool->acquire_data(build_batch.tuple_data_pool(), false);
+        RETURN_IF_LIMIT_EXCEEDED(state, " Except, while constructing the hash table.");
+        for (int i = 0; i < build_batch.num_rows(); ++i) {
+            _hash_tbl->insert(build_batch.get_row(i));
+        }
+        VLOG_ROW << "hash table content: " << _hash_tbl->debug_string(true, &child(0)->row_desc());
+        build_batch.reset();
+    }
+    // if a table is empty, the result must be empty
+
+    if (_hash_tbl->size() == 0) {
+        _hash_tbl_iterator = _hash_tbl->begin();
+        return Status::OK();
+    }
+
+    for (int i = 1; i < _children.size(); ++i) {
+        // probe
+        _probe_batch.reset(new RowBatch(child(i)->row_desc(), state->batch_size(), mem_tracker()));
+        RETURN_IF_ERROR(child(i)->open(state));
+        eos = false;
+        while (!eos) {
+            RETURN_IF_CANCELLED(state);
+            RETURN_IF_ERROR(child(i)->get_next(state, _probe_batch.get(), &eos));
+            RETURN_IF_LIMIT_EXCEEDED(state, " Except , while probing the hash table.");
+            for (int j = 0; j < _probe_batch->num_rows(); ++j) {
+                _hash_tbl_iterator = _hash_tbl->find(_probe_batch->get_row(j));
+                if (_hash_tbl_iterator != _hash_tbl->end()) {
+                    _hash_tbl_iterator.set_matched();
+                }
+            }
+            _probe_batch->reset();
+        }
+        if (i != _children.size() - 1) {
+            // if this hash table is probed rebuild it
+            std::unique_ptr<HashTable> temp_tbl(
+                    new HashTable(_child_expr_lists[0], _child_expr_lists[i], _build_tuple_size,
+                                  true, _find_nulls, id(), mem_tracker(), 1024));
+            _hash_tbl_iterator = _hash_tbl->begin();
+            uint32_t previous_hash = -1;
+            while (_hash_tbl_iterator.has_next()) {
+                if (previous_hash != _hash_tbl_iterator.get_hash()) {
+                    previous_hash = _hash_tbl_iterator.get_hash();
+                    if (!_hash_tbl_iterator.matched()) {
 
 Review comment:
    For "a EXCEPT b", use a to build the hash table and use b to probe it;
    the result is the set of elements in a that were never matched.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org