You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2020/03/09 03:10:42 UTC

[GitHub] [incubator-doris] yangzhg opened a new pull request #3056: implement except node

yangzhg opened a new pull request #3056: implement except node
URL: https://github.com/apache/incubator-doris/pull/3056
 
 
   Implement the except node,
   supporting statements like
   ``` 
   select a from t1 except select b from t2
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] wutiangan commented on a change in pull request #3056: implement except node

Posted by GitBox <gi...@apache.org>.
wutiangan commented on a change in pull request #3056: implement except node
URL: https://github.com/apache/incubator-doris/pull/3056#discussion_r390729921
 
 

 ##########
 File path: be/src/exec/except_node.cpp
 ##########
 @@ -56,4 +40,157 @@ Status ExceptNode::init(const TPlanNode& tnode, RuntimeState* state) {
     }
     return Status::OK();
 }
-}
\ No newline at end of file
+
+Status ExceptNode::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(ExecNode::prepare(state));
+    _build_pool.reset(new MemPool(mem_tracker()));
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    for (size_t i = 0; i < _child_expr_lists.size(); ++i) {
+        RETURN_IF_ERROR(Expr::prepare(_child_expr_lists[i], state, child(i)->row_desc(),
+                                      expr_mem_tracker()));
+    }
+    _build_tuple_size = child(0)->row_desc().tuple_descriptors().size();
+    _build_tuple_row_size = _build_tuple_size * sizeof(Tuple*);
+    _build_tuple_idx.reserve(_build_tuple_size);
+
+    for (int i = 0; i < _build_tuple_size; ++i) {
+        TupleDescriptor* build_tuple_desc = child(0)->row_desc().tuple_descriptors()[i];
+        _build_tuple_idx.push_back(_row_descriptor.get_tuple_idx(build_tuple_desc->id()));
+    }
+    _find_nulls = std::vector<bool>(_child_expr_lists.size(), true);
+    return Status::OK();
+}
+Status ExceptNode::close(RuntimeState* state) {
+    if (is_closed()) {
+        return Status::OK();
+    }
+    for (auto& exprs : _child_expr_lists) {
+        Expr::close(exprs, state);
+    }
+
+    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::CLOSE));
+    // Must reset _probe_batch in close() to release resources
+    _probe_batch.reset(NULL);
+
+    if (_memory_used_counter != NULL && _hash_tbl.get() != NULL) {
+        COUNTER_UPDATE(_memory_used_counter, _build_pool->peak_allocated_bytes());
+        COUNTER_UPDATE(_memory_used_counter, _hash_tbl->byte_size());
+    }
+    if (_hash_tbl.get() != NULL) {
+        _hash_tbl->close();
+    }
+    if (_build_pool.get() != NULL) {
+        _build_pool->free_all();
+    }
+    return ExecNode::close(state);
+}
+Status ExceptNode::open(RuntimeState* state) {
+    RETURN_IF_ERROR(ExecNode::open(state));
+    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    RETURN_IF_CANCELLED(state);
+    // open result expr lists.
+    for (const vector<ExprContext*>& exprs : _child_expr_lists) {
+        RETURN_IF_ERROR(Expr::open(exprs, state));
+    }
+    // initial build hash table
+    _hash_tbl.reset(new HashTable(_child_expr_lists[0], _child_expr_lists[1], _build_tuple_size,
+                                  true, _find_nulls, id(), mem_tracker(), 1024));
+    RowBatch build_batch(child(0)->row_desc(), state->batch_size(), mem_tracker());
+    RETURN_IF_ERROR(child(0)->open(state));
+
+    bool eos = false;
+    while (!eos) {
+        RETURN_IF_CANCELLED(state);
+        RETURN_IF_ERROR(child(0)->get_next(state, &build_batch, &eos));
+        // take ownership of tuple data of build_batch
+        _build_pool->acquire_data(build_batch.tuple_data_pool(), false);
+        RETURN_IF_LIMIT_EXCEEDED(state, " Except, while constructing the hash table.");
+        for (int i = 0; i < build_batch.num_rows(); ++i) {
+            _hash_tbl->insert(build_batch.get_row(i));
+        }
+        VLOG_ROW << "hash table content: " << _hash_tbl->debug_string(true, &child(0)->row_desc());
+        build_batch.reset();
+    }
+    // if a table is empty, the result must be empty
+
+    if (_hash_tbl->size() == 0) {
+        _hash_tbl_iterator = _hash_tbl->begin();
+        return Status::OK();
+    }
+
+    for (int i = 1; i < _children.size(); ++i) {
+        // probe
+        _probe_batch.reset(new RowBatch(child(i)->row_desc(), state->batch_size(), mem_tracker()));
+        RETURN_IF_ERROR(child(i)->open(state));
+        eos = false;
+        while (!eos) {
+            RETURN_IF_CANCELLED(state);
+            RETURN_IF_ERROR(child(i)->get_next(state, _probe_batch.get(), &eos));
+            RETURN_IF_LIMIT_EXCEEDED(state, " Except , while probing the hash table.");
+            for (int j = 0; j < _probe_batch->num_rows(); ++j) {
+                _hash_tbl_iterator = _hash_tbl->find(_probe_batch->get_row(j));
+                if (_hash_tbl_iterator != _hash_tbl->end()) {
+                    _hash_tbl_iterator.set_matched();
+                }
+            }
+            _probe_batch->reset();
+        }
+        if (i != _children.size() - 1) {
+            // if this hash table is probed rebuild it
+            std::unique_ptr<HashTable> temp_tbl(
+                    new HashTable(_child_expr_lists[0], _child_expr_lists[i], _build_tuple_size,
+                                  true, _find_nulls, id(), mem_tracker(), 1024));
+            _hash_tbl_iterator = _hash_tbl->begin();
+            uint32_t previous_hash = -1;
+            while (_hash_tbl_iterator.has_next()) {
+                if (previous_hash != _hash_tbl_iterator.get_hash()) {
+                    previous_hash = _hash_tbl_iterator.get_hash();
+                    if (!_hash_tbl_iterator.matched()) {
 
 Review comment:
   Why do you use 'if (!_hash_tbl_iterator.matched())'?
   Maybe you should use 'if (_hash_tbl_iterator.matched())' instead;
   matched means exceptNode.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] yangzhg commented on a change in pull request #3056: implement except node

Posted by GitBox <gi...@apache.org>.
yangzhg commented on a change in pull request #3056: implement except node
URL: https://github.com/apache/incubator-doris/pull/3056#discussion_r392204572
 
 

 ##########
 File path: be/src/exec/except_node.cpp
 ##########
 @@ -56,4 +41,190 @@ Status ExceptNode::init(const TPlanNode& tnode, RuntimeState* state) {
     }
     return Status::OK();
 }
-}
\ No newline at end of file
+
+Status ExceptNode::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(ExecNode::prepare(state));
+    _build_timer = ADD_TIMER(runtime_profile(), "BuildTime");
+    _probe_timer = ADD_TIMER(runtime_profile(), "ProbeTime");
+    _build_pool.reset(new MemPool(mem_tracker()));
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    for (size_t i = 0; i < _child_expr_lists.size(); ++i) {
+        RETURN_IF_ERROR(Expr::prepare(_child_expr_lists[i], state, child(i)->row_desc(),
+                                      expr_mem_tracker()));
+    }
+    _build_tuple_size = child(0)->row_desc().tuple_descriptors().size();
+    _build_tuple_row_size = _build_tuple_size * sizeof(Tuple*);
+    _build_tuple_idx.reserve(_build_tuple_size);
+
+    for (int i = 0; i < _build_tuple_size; ++i) {
+        TupleDescriptor* build_tuple_desc = child(0)->row_desc().tuple_descriptors()[i];
+        _build_tuple_idx.push_back(_row_descriptor.get_tuple_idx(build_tuple_desc->id()));
+    }
+    _find_nulls = std::vector<bool>(_build_tuple_size, true);
+    return Status::OK();
+}
+Status ExceptNode::close(RuntimeState* state) {
+    if (is_closed()) {
+        return Status::OK();
+    }
+    for (auto& exprs : _child_expr_lists) {
+        Expr::close(exprs, state);
+    }
+
+    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::CLOSE));
+    // Must reset _probe_batch in close() to release resources
+    _probe_batch.reset(NULL);
+
+    if (_memory_used_counter != NULL && _hash_tbl.get() != NULL) {
+        COUNTER_UPDATE(_memory_used_counter, _build_pool->peak_allocated_bytes());
+        COUNTER_UPDATE(_memory_used_counter, _hash_tbl->byte_size());
+    }
+    if (_hash_tbl.get() != NULL) {
+        _hash_tbl->close();
+    }
+    if (_build_pool.get() != NULL) {
+        _build_pool->free_all();
+    }
+    return ExecNode::close(state);
+}
+Status ExceptNode::open(RuntimeState* state) {
+    RETURN_IF_ERROR(ExecNode::open(state));
+    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    RETURN_IF_CANCELLED(state);
+    // open result expr lists.
+    for (const vector<ExprContext*>& exprs : _child_expr_lists) {
+        RETURN_IF_ERROR(Expr::open(exprs, state));
+    }
+    // initial build hash table, use _child_expr_lists[0] as probe is used for remove duplicted
+    _hash_tbl.reset(new HashTable(_child_expr_lists[0], _child_expr_lists[0], _build_tuple_size,
+                                  true, _find_nulls, id(), mem_tracker(), 1024));
+    RowBatch build_batch(child(0)->row_desc(), state->batch_size(), mem_tracker());
+    RETURN_IF_ERROR(child(0)->open(state));
+
+    bool eos = false;
+    while (!eos) {
+        SCOPED_TIMER(_build_timer);
+        RETURN_IF_CANCELLED(state);
+        RETURN_IF_ERROR(child(0)->get_next(state, &build_batch, &eos));
+        // take ownership of tuple data of build_batch
+        _build_pool->acquire_data(build_batch.tuple_data_pool(), false);
+        RETURN_IF_LIMIT_EXCEEDED(state, " Except, while constructing the hash table.");
+        // build hash table and remvoe duplicate items
+        for (int i = 0; i < build_batch.num_rows(); ++i) {
+            _hash_tbl->insert_unique(build_batch.get_row(i));
+        }
+        VLOG_ROW << "hash table content: " << _hash_tbl->debug_string(true, &child(0)->row_desc());
+        build_batch.reset();
+    }
+    // if a table is empty, the result must be empty
+
+    if (_hash_tbl->size() == 0) {
+        _hash_tbl_iterator = _hash_tbl->begin();
+        return Status::OK();
+    }
+
+    for (int i = 1; i < _children.size(); ++i) {
+        // rebuid hash table, for first time will rebuild with the no duplicated _hash_tbl,
+        if (i > 1) {
+            SCOPED_TIMER(_build_timer);
+            std::unique_ptr<HashTable> temp_tbl(
+                    new HashTable(_child_expr_lists[0], _child_expr_lists[i], _build_tuple_size,
+                                  true, _find_nulls, id(), mem_tracker(), 1024));
+            _hash_tbl_iterator = _hash_tbl->begin();
+            uint32_t previous_hash = -1;
+            while (_hash_tbl_iterator.has_next()) {
+                if (previous_hash != _hash_tbl_iterator.get_hash()) {
+                    previous_hash = _hash_tbl_iterator.get_hash();
+                    if (!_hash_tbl_iterator.matched()) {
+                        temp_tbl->insert(_hash_tbl_iterator.get_row());
+                    }
+                }
+                _hash_tbl_iterator.next<false>();
+            }
+            _hash_tbl.swap(temp_tbl);
+            temp_tbl->close();
+        }
+        // probe
+        _probe_batch.reset(new RowBatch(child(i)->row_desc(), state->batch_size(), mem_tracker()));
+        ScopedTimer<MonotonicStopWatch> probe_timer(_probe_timer);
+        RETURN_IF_ERROR(child(i)->open(state));
+        eos = false;
+        while (!eos) {
+            RETURN_IF_CANCELLED(state);
+            RETURN_IF_ERROR(child(i)->get_next(state, _probe_batch.get(), &eos));
+            RETURN_IF_LIMIT_EXCEEDED(state, " Except , while probing the hash table.");
+            for (int j = 0; j < _probe_batch->num_rows(); ++j) {
+                _hash_tbl_iterator = _hash_tbl->find(_probe_batch->get_row(j));
+                if (_hash_tbl_iterator != _hash_tbl->end()) {
+                    _hash_tbl_iterator.set_matched();
+                }
+            }
+            _probe_batch->reset();
+        }
+        // if a table is empty, the result must be empty
+        if (_hash_tbl->size() == 0) {
+            break;
+        }
+    }
+    _hash_tbl_iterator = _hash_tbl->begin();
+    return Status::OK();
+}
+
+Status ExceptNode::get_next(RuntimeState* state, RowBatch* out_batch, bool* eos) {
+    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
+    RETURN_IF_CANCELLED(state);
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    *eos = true;
+    if (reached_limit()) {
+        return Status::OK();
+    }
+    uint32_t previous_hash = -1;
+    TupleRow* previous_row = nullptr;
+    while (_hash_tbl_iterator.has_next()) {
+        if (!_hash_tbl_iterator.matched()) {
+            if (previous_hash != _hash_tbl_iterator.get_hash() ||
+                !equals(previous_row, _hash_tbl_iterator.get_row())) {
+                int row_idx = out_batch->add_row();
+                TupleRow* out_row = out_batch->get_row(row_idx);
+                uint8_t* out_ptr = reinterpret_cast<uint8_t*>(out_row);
+                memcpy(out_ptr, _hash_tbl_iterator.get_row(), _build_tuple_row_size);
+                out_batch->commit_last_row();
+                ++_num_rows_returned;
+            }
+        }
+        previous_hash = _hash_tbl_iterator.get_hash();
+        previous_row = _hash_tbl_iterator.get_row();
+        _hash_tbl_iterator.next<false>();
+
+        *eos = !_hash_tbl_iterator.has_next() || reached_limit();
+        if (out_batch->is_full() || out_batch->at_resource_limit() || *eos) {
+            return Status::OK();
+        }
+    }
+    return Status::OK();
+}
+
+bool ExceptNode::equals(TupleRow* row, TupleRow* other) {
+    if (row == nullptr && other == nullptr) {
 
 Review comment:
   This is a safeguard.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] kangkaisen commented on a change in pull request #3056: implement except node

Posted by GitBox <gi...@apache.org>.
kangkaisen commented on a change in pull request #3056: implement except node
URL: https://github.com/apache/incubator-doris/pull/3056#discussion_r392185880
 
 

 ##########
 File path: be/src/exec/except_node.cpp
 ##########
 @@ -56,4 +41,190 @@ Status ExceptNode::init(const TPlanNode& tnode, RuntimeState* state) {
     }
     return Status::OK();
 }
-}
\ No newline at end of file
+
+Status ExceptNode::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(ExecNode::prepare(state));
+    _build_timer = ADD_TIMER(runtime_profile(), "BuildTime");
+    _probe_timer = ADD_TIMER(runtime_profile(), "ProbeTime");
+    _build_pool.reset(new MemPool(mem_tracker()));
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    for (size_t i = 0; i < _child_expr_lists.size(); ++i) {
+        RETURN_IF_ERROR(Expr::prepare(_child_expr_lists[i], state, child(i)->row_desc(),
+                                      expr_mem_tracker()));
+    }
+    _build_tuple_size = child(0)->row_desc().tuple_descriptors().size();
+    _build_tuple_row_size = _build_tuple_size * sizeof(Tuple*);
+    _build_tuple_idx.reserve(_build_tuple_size);
+
+    for (int i = 0; i < _build_tuple_size; ++i) {
+        TupleDescriptor* build_tuple_desc = child(0)->row_desc().tuple_descriptors()[i];
+        _build_tuple_idx.push_back(_row_descriptor.get_tuple_idx(build_tuple_desc->id()));
+    }
+    _find_nulls = std::vector<bool>(_build_tuple_size, true);
+    return Status::OK();
+}
+Status ExceptNode::close(RuntimeState* state) {
+    if (is_closed()) {
+        return Status::OK();
+    }
+    for (auto& exprs : _child_expr_lists) {
+        Expr::close(exprs, state);
+    }
+
+    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::CLOSE));
+    // Must reset _probe_batch in close() to release resources
+    _probe_batch.reset(NULL);
+
+    if (_memory_used_counter != NULL && _hash_tbl.get() != NULL) {
+        COUNTER_UPDATE(_memory_used_counter, _build_pool->peak_allocated_bytes());
+        COUNTER_UPDATE(_memory_used_counter, _hash_tbl->byte_size());
+    }
+    if (_hash_tbl.get() != NULL) {
+        _hash_tbl->close();
+    }
+    if (_build_pool.get() != NULL) {
+        _build_pool->free_all();
+    }
+    return ExecNode::close(state);
+}
+Status ExceptNode::open(RuntimeState* state) {
+    RETURN_IF_ERROR(ExecNode::open(state));
+    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    RETURN_IF_CANCELLED(state);
+    // open result expr lists.
+    for (const vector<ExprContext*>& exprs : _child_expr_lists) {
+        RETURN_IF_ERROR(Expr::open(exprs, state));
+    }
+    // initial build hash table, use _child_expr_lists[0] as probe is used for remove duplicted
+    _hash_tbl.reset(new HashTable(_child_expr_lists[0], _child_expr_lists[0], _build_tuple_size,
+                                  true, _find_nulls, id(), mem_tracker(), 1024));
+    RowBatch build_batch(child(0)->row_desc(), state->batch_size(), mem_tracker());
+    RETURN_IF_ERROR(child(0)->open(state));
+
+    bool eos = false;
+    while (!eos) {
+        SCOPED_TIMER(_build_timer);
+        RETURN_IF_CANCELLED(state);
+        RETURN_IF_ERROR(child(0)->get_next(state, &build_batch, &eos));
+        // take ownership of tuple data of build_batch
+        _build_pool->acquire_data(build_batch.tuple_data_pool(), false);
+        RETURN_IF_LIMIT_EXCEEDED(state, " Except, while constructing the hash table.");
+        // build hash table and remvoe duplicate items
+        for (int i = 0; i < build_batch.num_rows(); ++i) {
+            _hash_tbl->insert_unique(build_batch.get_row(i));
+        }
+        VLOG_ROW << "hash table content: " << _hash_tbl->debug_string(true, &child(0)->row_desc());
+        build_batch.reset();
+    }
+    // if a table is empty, the result must be empty
+
+    if (_hash_tbl->size() == 0) {
+        _hash_tbl_iterator = _hash_tbl->begin();
+        return Status::OK();
+    }
+
+    for (int i = 1; i < _children.size(); ++i) {
+        // rebuid hash table, for first time will rebuild with the no duplicated _hash_tbl,
+        if (i > 1) {
+            SCOPED_TIMER(_build_timer);
+            std::unique_ptr<HashTable> temp_tbl(
+                    new HashTable(_child_expr_lists[0], _child_expr_lists[i], _build_tuple_size,
+                                  true, _find_nulls, id(), mem_tracker(), 1024));
+            _hash_tbl_iterator = _hash_tbl->begin();
+            uint32_t previous_hash = -1;
+            while (_hash_tbl_iterator.has_next()) {
+                if (previous_hash != _hash_tbl_iterator.get_hash()) {
+                    previous_hash = _hash_tbl_iterator.get_hash();
+                    if (!_hash_tbl_iterator.matched()) {
+                        temp_tbl->insert(_hash_tbl_iterator.get_row());
+                    }
+                }
+                _hash_tbl_iterator.next<false>();
+            }
+            _hash_tbl.swap(temp_tbl);
+            temp_tbl->close();
+        }
+        // probe
+        _probe_batch.reset(new RowBatch(child(i)->row_desc(), state->batch_size(), mem_tracker()));
+        ScopedTimer<MonotonicStopWatch> probe_timer(_probe_timer);
+        RETURN_IF_ERROR(child(i)->open(state));
+        eos = false;
+        while (!eos) {
+            RETURN_IF_CANCELLED(state);
+            RETURN_IF_ERROR(child(i)->get_next(state, _probe_batch.get(), &eos));
+            RETURN_IF_LIMIT_EXCEEDED(state, " Except , while probing the hash table.");
+            for (int j = 0; j < _probe_batch->num_rows(); ++j) {
+                _hash_tbl_iterator = _hash_tbl->find(_probe_batch->get_row(j));
+                if (_hash_tbl_iterator != _hash_tbl->end()) {
+                    _hash_tbl_iterator.set_matched();
+                }
+            }
+            _probe_batch->reset();
+        }
+        // if a table is empty, the result must be empty
+        if (_hash_tbl->size() == 0) {
+            break;
+        }
+    }
+    _hash_tbl_iterator = _hash_tbl->begin();
+    return Status::OK();
+}
+
+Status ExceptNode::get_next(RuntimeState* state, RowBatch* out_batch, bool* eos) {
+    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
+    RETURN_IF_CANCELLED(state);
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    *eos = true;
+    if (reached_limit()) {
+        return Status::OK();
+    }
+    uint32_t previous_hash = -1;
+    TupleRow* previous_row = nullptr;
+    while (_hash_tbl_iterator.has_next()) {
+        if (!_hash_tbl_iterator.matched()) {
+            if (previous_hash != _hash_tbl_iterator.get_hash() ||
+                !equals(previous_row, _hash_tbl_iterator.get_row())) {
+                int row_idx = out_batch->add_row();
+                TupleRow* out_row = out_batch->get_row(row_idx);
+                uint8_t* out_ptr = reinterpret_cast<uint8_t*>(out_row);
+                memcpy(out_ptr, _hash_tbl_iterator.get_row(), _build_tuple_row_size);
+                out_batch->commit_last_row();
+                ++_num_rows_returned;
+            }
+        }
+        previous_hash = _hash_tbl_iterator.get_hash();
+        previous_row = _hash_tbl_iterator.get_row();
+        _hash_tbl_iterator.next<false>();
+
+        *eos = !_hash_tbl_iterator.has_next() || reached_limit();
+        if (out_batch->is_full() || out_batch->at_resource_limit() || *eos) {
+            return Status::OK();
+        }
+    }
+    return Status::OK();
+}
+
+bool ExceptNode::equals(TupleRow* row, TupleRow* other) {
+    if (row == nullptr && other == nullptr) {
+        return false;
+    }
+    if (row == nullptr || other == nullptr) {
+        return false;
+    }
+    for (int i = 0; i < _child_expr_lists[0].size(); ++i) {
+        void* val_row = _child_expr_lists[0][i]->get_value(row);
+        void* val_other = _child_expr_lists[0][i]->get_value(other);
+        if (val_row == nullptr && val_other == nullptr) {
+            continue;
 
 Review comment:
   Why `continue` here, when both values are NULL?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] morningman commented on a change in pull request #3056: implement except node

Posted by GitBox <gi...@apache.org>.
morningman commented on a change in pull request #3056: implement except node
URL: https://github.com/apache/incubator-doris/pull/3056#discussion_r390323844
 
 

 ##########
 File path: be/src/exec/except_node.cpp
 ##########
 @@ -56,4 +40,158 @@ Status ExceptNode::init(const TPlanNode& tnode, RuntimeState* state) {
     }
     return Status::OK();
 }
-}
\ No newline at end of file
+
+Status ExceptNode::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(ExecNode::prepare(state));
+    _build_pool.reset(new MemPool(mem_tracker()));
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    for (size_t i = 0; i < _child_expr_lists.size(); ++i) {
+        RETURN_IF_ERROR(Expr::prepare(_child_expr_lists[i], state, child(i)->row_desc(),
+                                      expr_mem_tracker()));
+    }
+    _build_tuple_size = child(0)->row_desc().tuple_descriptors().size();
+    _build_tuple_row_size = _build_tuple_size * sizeof(Tuple*);
+    _build_tuple_idx.reserve(_build_tuple_size);
+
+    for (int i = 0; i < _build_tuple_size; ++i) {
+        TupleDescriptor* build_tuple_desc = child(0)->row_desc().tuple_descriptors()[i];
+        _build_tuple_idx.push_back(_row_descriptor.get_tuple_idx(build_tuple_desc->id()));
+    }
+    _find_nulls = std::vector<bool>(_child_expr_lists.size(), true);
+    return Status::OK();
+}
+Status ExceptNode::close(RuntimeState* state) {
+    if (is_closed()) {
+        return Status::OK();
+    }
+    for (auto& exprs : _child_expr_lists) {
+        Expr::close(exprs, state);
+    }
+
+    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::CLOSE));
+    // Must reset _probe_batch in close() to release resources
+    _probe_batch.reset(NULL);
+
+    if (_memory_used_counter != NULL && _hash_tbl.get() != NULL) {
+        COUNTER_UPDATE(_memory_used_counter, _build_pool->peak_allocated_bytes());
+        COUNTER_UPDATE(_memory_used_counter, _hash_tbl->byte_size());
+    }
+    if (_hash_tbl.get() != NULL) {
+        _hash_tbl->close();
+    }
+    if (_build_pool.get() != NULL) {
+        _build_pool->free_all();
+    }
+
+    return ExecNode::close(state);
+}
 
 Review comment:
   ```suggestion
   }
   
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] wutiangan commented on a change in pull request #3056: implement except node

Posted by GitBox <gi...@apache.org>.
wutiangan commented on a change in pull request #3056: implement except node
URL: https://github.com/apache/incubator-doris/pull/3056#discussion_r390731473
 
 

 ##########
 File path: be/src/exec/except_node.cpp
 ##########
 @@ -56,4 +40,157 @@ Status ExceptNode::init(const TPlanNode& tnode, RuntimeState* state) {
     }
     return Status::OK();
 }
-}
\ No newline at end of file
+
+Status ExceptNode::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(ExecNode::prepare(state));
+    _build_pool.reset(new MemPool(mem_tracker()));
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    for (size_t i = 0; i < _child_expr_lists.size(); ++i) {
+        RETURN_IF_ERROR(Expr::prepare(_child_expr_lists[i], state, child(i)->row_desc(),
+                                      expr_mem_tracker()));
+    }
+    _build_tuple_size = child(0)->row_desc().tuple_descriptors().size();
+    _build_tuple_row_size = _build_tuple_size * sizeof(Tuple*);
+    _build_tuple_idx.reserve(_build_tuple_size);
+
+    for (int i = 0; i < _build_tuple_size; ++i) {
+        TupleDescriptor* build_tuple_desc = child(0)->row_desc().tuple_descriptors()[i];
+        _build_tuple_idx.push_back(_row_descriptor.get_tuple_idx(build_tuple_desc->id()));
+    }
+    _find_nulls = std::vector<bool>(_child_expr_lists.size(), true);
+    return Status::OK();
+}
+Status ExceptNode::close(RuntimeState* state) {
+    if (is_closed()) {
+        return Status::OK();
+    }
+    for (auto& exprs : _child_expr_lists) {
+        Expr::close(exprs, state);
+    }
+
+    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::CLOSE));
+    // Must reset _probe_batch in close() to release resources
+    _probe_batch.reset(NULL);
+
+    if (_memory_used_counter != NULL && _hash_tbl.get() != NULL) {
+        COUNTER_UPDATE(_memory_used_counter, _build_pool->peak_allocated_bytes());
+        COUNTER_UPDATE(_memory_used_counter, _hash_tbl->byte_size());
+    }
+    if (_hash_tbl.get() != NULL) {
+        _hash_tbl->close();
+    }
+    if (_build_pool.get() != NULL) {
+        _build_pool->free_all();
+    }
+    return ExecNode::close(state);
+}
+Status ExceptNode::open(RuntimeState* state) {
+    RETURN_IF_ERROR(ExecNode::open(state));
+    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    RETURN_IF_CANCELLED(state);
+    // open result expr lists.
+    for (const vector<ExprContext*>& exprs : _child_expr_lists) {
+        RETURN_IF_ERROR(Expr::open(exprs, state));
+    }
+    // initial build hash table
+    _hash_tbl.reset(new HashTable(_child_expr_lists[0], _child_expr_lists[1], _build_tuple_size,
+                                  true, _find_nulls, id(), mem_tracker(), 1024));
+    RowBatch build_batch(child(0)->row_desc(), state->batch_size(), mem_tracker());
+    RETURN_IF_ERROR(child(0)->open(state));
+
+    bool eos = false;
+    while (!eos) {
+        RETURN_IF_CANCELLED(state);
+        RETURN_IF_ERROR(child(0)->get_next(state, &build_batch, &eos));
+        // take ownership of tuple data of build_batch
+        _build_pool->acquire_data(build_batch.tuple_data_pool(), false);
+        RETURN_IF_LIMIT_EXCEEDED(state, " Except, while constructing the hash table.");
+        for (int i = 0; i < build_batch.num_rows(); ++i) {
+            _hash_tbl->insert(build_batch.get_row(i));
+        }
+        VLOG_ROW << "hash table content: " << _hash_tbl->debug_string(true, &child(0)->row_desc());
+        build_batch.reset();
+    }
+    // if a table is empty, the result must be empty
+
+    if (_hash_tbl->size() == 0) {
+        _hash_tbl_iterator = _hash_tbl->begin();
+        return Status::OK();
+    }
+
+    for (int i = 1; i < _children.size(); ++i) {
+        // probe
+        _probe_batch.reset(new RowBatch(child(i)->row_desc(), state->batch_size(), mem_tracker()));
+        RETURN_IF_ERROR(child(i)->open(state));
+        eos = false;
+        while (!eos) {
+            RETURN_IF_CANCELLED(state);
+            RETURN_IF_ERROR(child(i)->get_next(state, _probe_batch.get(), &eos));
+            RETURN_IF_LIMIT_EXCEEDED(state, " Except , while probing the hash table.");
+            for (int j = 0; j < _probe_batch->num_rows(); ++j) {
+                _hash_tbl_iterator = _hash_tbl->find(_probe_batch->get_row(j));
+                if (_hash_tbl_iterator != _hash_tbl->end()) {
+                    _hash_tbl_iterator.set_matched();
+                }
+            }
+            _probe_batch->reset();
+        }
+        if (i != _children.size() - 1) {
+            // if this hash table is probed rebuild it
+            std::unique_ptr<HashTable> temp_tbl(
+                    new HashTable(_child_expr_lists[0], _child_expr_lists[i], _build_tuple_size,
+                                  true, _find_nulls, id(), mem_tracker(), 1024));
+            _hash_tbl_iterator = _hash_tbl->begin();
+            uint32_t previous_hash = -1;
+            while (_hash_tbl_iterator.has_next()) {
+                if (previous_hash != _hash_tbl_iterator.get_hash()) {
+                    previous_hash = _hash_tbl_iterator.get_hash();
+                    if (!_hash_tbl_iterator.matched()) {
+                        temp_tbl->insert(_hash_tbl_iterator.get_row());
+                    }
+                }
+                _hash_tbl_iterator.next<false>();
+            }
+            _hash_tbl.swap(temp_tbl);
+            temp_tbl->close();
+        }
+        // if a table is empty, the result must be empty
+        if (_hash_tbl->size() == 0) {
+            break;
+        }
+    }
+    _hash_tbl_iterator = _hash_tbl->begin();
+    return Status::OK();
+}
+
+Status ExceptNode::get_next(RuntimeState* state, RowBatch* out_batch, bool* eos) {
+    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
+    RETURN_IF_CANCELLED(state);
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    *eos = true;
+    if (reached_limit()) {
+        return Status::OK();
+    }
+    uint32_t previous_hash = -1;
+    while (_hash_tbl_iterator.has_next()) {
 
 Review comment:
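   What if an entry's hash value in the hash table is -1? `previous_hash` is a `uint32_t` initialized to -1 (i.e. 0xFFFFFFFF), so an entry whose hash happens to be 0xFFFFFFFF would be indistinguishable from the initial sentinel.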

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] wutiangan commented on a change in pull request #3056: implement except node

Posted by GitBox <gi...@apache.org>.
wutiangan commented on a change in pull request #3056: implement except node
URL: https://github.com/apache/incubator-doris/pull/3056#discussion_r390731473
 
 

 ##########
 File path: be/src/exec/except_node.cpp
 ##########
 @@ -56,4 +40,157 @@ Status ExceptNode::init(const TPlanNode& tnode, RuntimeState* state) {
     }
     return Status::OK();
 }
-}
\ No newline at end of file
+
+Status ExceptNode::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(ExecNode::prepare(state));
+    _build_pool.reset(new MemPool(mem_tracker()));
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    for (size_t i = 0; i < _child_expr_lists.size(); ++i) {
+        RETURN_IF_ERROR(Expr::prepare(_child_expr_lists[i], state, child(i)->row_desc(),
+                                      expr_mem_tracker()));
+    }
+    _build_tuple_size = child(0)->row_desc().tuple_descriptors().size();
+    _build_tuple_row_size = _build_tuple_size * sizeof(Tuple*);
+    _build_tuple_idx.reserve(_build_tuple_size);
+
+    for (int i = 0; i < _build_tuple_size; ++i) {
+        TupleDescriptor* build_tuple_desc = child(0)->row_desc().tuple_descriptors()[i];
+        _build_tuple_idx.push_back(_row_descriptor.get_tuple_idx(build_tuple_desc->id()));
+    }
+    _find_nulls = std::vector<bool>(_child_expr_lists.size(), true);
+    return Status::OK();
+}
+Status ExceptNode::close(RuntimeState* state) {
+    if (is_closed()) {
+        return Status::OK();
+    }
+    for (auto& exprs : _child_expr_lists) {
+        Expr::close(exprs, state);
+    }
+
+    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::CLOSE));
+    // Must reset _probe_batch in close() to release resources
+    _probe_batch.reset(NULL);
+
+    if (_memory_used_counter != NULL && _hash_tbl.get() != NULL) {
+        COUNTER_UPDATE(_memory_used_counter, _build_pool->peak_allocated_bytes());
+        COUNTER_UPDATE(_memory_used_counter, _hash_tbl->byte_size());
+    }
+    if (_hash_tbl.get() != NULL) {
+        _hash_tbl->close();
+    }
+    if (_build_pool.get() != NULL) {
+        _build_pool->free_all();
+    }
+    return ExecNode::close(state);
+}
+Status ExceptNode::open(RuntimeState* state) {
+    RETURN_IF_ERROR(ExecNode::open(state));
+    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    RETURN_IF_CANCELLED(state);
+    // open result expr lists.
+    for (const vector<ExprContext*>& exprs : _child_expr_lists) {
+        RETURN_IF_ERROR(Expr::open(exprs, state));
+    }
+    // initial build hash table
+    _hash_tbl.reset(new HashTable(_child_expr_lists[0], _child_expr_lists[1], _build_tuple_size,
+                                  true, _find_nulls, id(), mem_tracker(), 1024));
+    RowBatch build_batch(child(0)->row_desc(), state->batch_size(), mem_tracker());
+    RETURN_IF_ERROR(child(0)->open(state));
+
+    bool eos = false;
+    while (!eos) {
+        RETURN_IF_CANCELLED(state);
+        RETURN_IF_ERROR(child(0)->get_next(state, &build_batch, &eos));
+        // take ownership of tuple data of build_batch
+        _build_pool->acquire_data(build_batch.tuple_data_pool(), false);
+        RETURN_IF_LIMIT_EXCEEDED(state, " Except, while constructing the hash table.");
+        for (int i = 0; i < build_batch.num_rows(); ++i) {
+            _hash_tbl->insert(build_batch.get_row(i));
+        }
+        VLOG_ROW << "hash table content: " << _hash_tbl->debug_string(true, &child(0)->row_desc());
+        build_batch.reset();
+    }
+    // if a table is empty, the result must be empty
+
+    if (_hash_tbl->size() == 0) {
+        _hash_tbl_iterator = _hash_tbl->begin();
+        return Status::OK();
+    }
+
+    for (int i = 1; i < _children.size(); ++i) {
+        // probe
+        _probe_batch.reset(new RowBatch(child(i)->row_desc(), state->batch_size(), mem_tracker()));
+        RETURN_IF_ERROR(child(i)->open(state));
+        eos = false;
+        while (!eos) {
+            RETURN_IF_CANCELLED(state);
+            RETURN_IF_ERROR(child(i)->get_next(state, _probe_batch.get(), &eos));
+            RETURN_IF_LIMIT_EXCEEDED(state, " Except , while probing the hash table.");
+            for (int j = 0; j < _probe_batch->num_rows(); ++j) {
+                _hash_tbl_iterator = _hash_tbl->find(_probe_batch->get_row(j));
+                if (_hash_tbl_iterator != _hash_tbl->end()) {
+                    _hash_tbl_iterator.set_matched();
+                }
+            }
+            _probe_batch->reset();
+        }
+        if (i != _children.size() - 1) {
+            // if this hash table is probed rebuild it
+            std::unique_ptr<HashTable> temp_tbl(
+                    new HashTable(_child_expr_lists[0], _child_expr_lists[i], _build_tuple_size,
+                                  true, _find_nulls, id(), mem_tracker(), 1024));
+            _hash_tbl_iterator = _hash_tbl->begin();
+            uint32_t previous_hash = -1;
+            while (_hash_tbl_iterator.has_next()) {
+                if (previous_hash != _hash_tbl_iterator.get_hash()) {
+                    previous_hash = _hash_tbl_iterator.get_hash();
+                    if (!_hash_tbl_iterator.matched()) {
+                        temp_tbl->insert(_hash_tbl_iterator.get_row());
+                    }
+                }
+                _hash_tbl_iterator.next<false>();
+            }
+            _hash_tbl.swap(temp_tbl);
+            temp_tbl->close();
+        }
+        // if a table is empty, the result must be empty
+        if (_hash_tbl->size() == 0) {
+            break;
+        }
+    }
+    _hash_tbl_iterator = _hash_tbl->begin();
+    return Status::OK();
+}
+
+Status ExceptNode::get_next(RuntimeState* state, RowBatch* out_batch, bool* eos) {
+    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
+    RETURN_IF_CANCELLED(state);
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    *eos = true;
+    if (reached_limit()) {
+        return Status::OK();
+    }
+    uint32_t previous_hash = -1;
+    while (_hash_tbl_iterator.has_next()) {
 
 Review comment:
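   What if the hash value is -1? `previous_hash` is a `uint32_t` initialized to -1 (i.e. 0xFFFFFFFF), so an entry whose hash happens to be 0xFFFFFFFF would be indistinguishable from the initial sentinel.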

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] yangzhg commented on a change in pull request #3056: implement except node

Posted by GitBox <gi...@apache.org>.
yangzhg commented on a change in pull request #3056: implement except node
URL: https://github.com/apache/incubator-doris/pull/3056#discussion_r393403598
 
 

 ##########
 File path: be/src/exec/except_node.cpp
 ##########
 @@ -56,4 +41,190 @@ Status ExceptNode::init(const TPlanNode& tnode, RuntimeState* state) {
     }
     return Status::OK();
 }
-}
\ No newline at end of file
+
+Status ExceptNode::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(ExecNode::prepare(state));
+    _build_timer = ADD_TIMER(runtime_profile(), "BuildTime");
+    _probe_timer = ADD_TIMER(runtime_profile(), "ProbeTime");
+    _build_pool.reset(new MemPool(mem_tracker()));
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    for (size_t i = 0; i < _child_expr_lists.size(); ++i) {
+        RETURN_IF_ERROR(Expr::prepare(_child_expr_lists[i], state, child(i)->row_desc(),
+                                      expr_mem_tracker()));
+    }
+    _build_tuple_size = child(0)->row_desc().tuple_descriptors().size();
+    _build_tuple_row_size = _build_tuple_size * sizeof(Tuple*);
+    _build_tuple_idx.reserve(_build_tuple_size);
+
+    for (int i = 0; i < _build_tuple_size; ++i) {
+        TupleDescriptor* build_tuple_desc = child(0)->row_desc().tuple_descriptors()[i];
+        _build_tuple_idx.push_back(_row_descriptor.get_tuple_idx(build_tuple_desc->id()));
+    }
+    _find_nulls = std::vector<bool>(_build_tuple_size, true);
+    return Status::OK();
+}
+Status ExceptNode::close(RuntimeState* state) {
+    if (is_closed()) {
+        return Status::OK();
+    }
+    for (auto& exprs : _child_expr_lists) {
+        Expr::close(exprs, state);
+    }
+
+    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::CLOSE));
+    // Must reset _probe_batch in close() to release resources
+    _probe_batch.reset(NULL);
+
+    if (_memory_used_counter != NULL && _hash_tbl.get() != NULL) {
+        COUNTER_UPDATE(_memory_used_counter, _build_pool->peak_allocated_bytes());
+        COUNTER_UPDATE(_memory_used_counter, _hash_tbl->byte_size());
+    }
+    if (_hash_tbl.get() != NULL) {
+        _hash_tbl->close();
+    }
+    if (_build_pool.get() != NULL) {
+        _build_pool->free_all();
+    }
+    return ExecNode::close(state);
+}
+Status ExceptNode::open(RuntimeState* state) {
+    RETURN_IF_ERROR(ExecNode::open(state));
+    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    RETURN_IF_CANCELLED(state);
+    // open result expr lists.
+    for (const vector<ExprContext*>& exprs : _child_expr_lists) {
+        RETURN_IF_ERROR(Expr::open(exprs, state));
+    }
+    // initial build hash table, use _child_expr_lists[0] as probe is used for remove duplicted
+    _hash_tbl.reset(new HashTable(_child_expr_lists[0], _child_expr_lists[0], _build_tuple_size,
+                                  true, _find_nulls, id(), mem_tracker(), 1024));
+    RowBatch build_batch(child(0)->row_desc(), state->batch_size(), mem_tracker());
+    RETURN_IF_ERROR(child(0)->open(state));
+
+    bool eos = false;
+    while (!eos) {
+        SCOPED_TIMER(_build_timer);
+        RETURN_IF_CANCELLED(state);
+        RETURN_IF_ERROR(child(0)->get_next(state, &build_batch, &eos));
+        // take ownership of tuple data of build_batch
+        _build_pool->acquire_data(build_batch.tuple_data_pool(), false);
+        RETURN_IF_LIMIT_EXCEEDED(state, " Except, while constructing the hash table.");
+        // build hash table and remvoe duplicate items
+        for (int i = 0; i < build_batch.num_rows(); ++i) {
+            _hash_tbl->insert_unique(build_batch.get_row(i));
+        }
+        VLOG_ROW << "hash table content: " << _hash_tbl->debug_string(true, &child(0)->row_desc());
+        build_batch.reset();
+    }
+    // if a table is empty, the result must be empty
+
+    if (_hash_tbl->size() == 0) {
+        _hash_tbl_iterator = _hash_tbl->begin();
+        return Status::OK();
+    }
+
+    for (int i = 1; i < _children.size(); ++i) {
+        // rebuid hash table, for first time will rebuild with the no duplicated _hash_tbl,
+        if (i > 1) {
+            SCOPED_TIMER(_build_timer);
+            std::unique_ptr<HashTable> temp_tbl(
+                    new HashTable(_child_expr_lists[0], _child_expr_lists[i], _build_tuple_size,
+                                  true, _find_nulls, id(), mem_tracker(), 1024));
+            _hash_tbl_iterator = _hash_tbl->begin();
+            uint32_t previous_hash = -1;
+            while (_hash_tbl_iterator.has_next()) {
+                if (previous_hash != _hash_tbl_iterator.get_hash()) {
+                    previous_hash = _hash_tbl_iterator.get_hash();
+                    if (!_hash_tbl_iterator.matched()) {
+                        temp_tbl->insert(_hash_tbl_iterator.get_row());
+                    }
+                }
+                _hash_tbl_iterator.next<false>();
+            }
+            _hash_tbl.swap(temp_tbl);
+            temp_tbl->close();
+        }
+        // probe
+        _probe_batch.reset(new RowBatch(child(i)->row_desc(), state->batch_size(), mem_tracker()));
+        ScopedTimer<MonotonicStopWatch> probe_timer(_probe_timer);
+        RETURN_IF_ERROR(child(i)->open(state));
+        eos = false;
+        while (!eos) {
+            RETURN_IF_CANCELLED(state);
+            RETURN_IF_ERROR(child(i)->get_next(state, _probe_batch.get(), &eos));
+            RETURN_IF_LIMIT_EXCEEDED(state, " Except , while probing the hash table.");
+            for (int j = 0; j < _probe_batch->num_rows(); ++j) {
+                _hash_tbl_iterator = _hash_tbl->find(_probe_batch->get_row(j));
+                if (_hash_tbl_iterator != _hash_tbl->end()) {
+                    _hash_tbl_iterator.set_matched();
+                }
+            }
+            _probe_batch->reset();
+        }
+        // if a table is empty, the result must be empty
+        if (_hash_tbl->size() == 0) {
+            break;
+        }
+    }
+    _hash_tbl_iterator = _hash_tbl->begin();
+    return Status::OK();
+}
+
+Status ExceptNode::get_next(RuntimeState* state, RowBatch* out_batch, bool* eos) {
+    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
+    RETURN_IF_CANCELLED(state);
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    *eos = true;
+    if (reached_limit()) {
+        return Status::OK();
+    }
+    uint32_t previous_hash = -1;
+    TupleRow* previous_row = nullptr;
+    while (_hash_tbl_iterator.has_next()) {
+        if (!_hash_tbl_iterator.matched()) {
+            if (previous_hash != _hash_tbl_iterator.get_hash() ||
+                !equals(previous_row, _hash_tbl_iterator.get_row())) {
+                int row_idx = out_batch->add_row();
+                TupleRow* out_row = out_batch->get_row(row_idx);
+                uint8_t* out_ptr = reinterpret_cast<uint8_t*>(out_row);
+                memcpy(out_ptr, _hash_tbl_iterator.get_row(), _build_tuple_row_size);
+                out_batch->commit_last_row();
+                ++_num_rows_returned;
+            }
+        }
+        previous_hash = _hash_tbl_iterator.get_hash();
+        previous_row = _hash_tbl_iterator.get_row();
+        _hash_tbl_iterator.next<false>();
+
+        *eos = !_hash_tbl_iterator.has_next() || reached_limit();
+        if (out_batch->is_full() || out_batch->at_resource_limit() || *eos) {
+            return Status::OK();
+        }
+    }
+    return Status::OK();
+}
+
+bool ExceptNode::equals(TupleRow* row, TupleRow* other) {
+    if (row == nullptr && other == nullptr) {
+        return false;
+    }
+    if (row == nullptr || other == nullptr) {
+        return false;
+    }
+    for (int i = 0; i < _child_expr_lists[0].size(); ++i) {
+        void* val_row = _child_expr_lists[0][i]->get_value(row);
+        void* val_other = _child_expr_lists[0][i]->get_value(other);
+        if (val_row == nullptr && val_other == nullptr) {
+            continue;
 
 Review comment:
   If table a is
   ```
   c1
   ---
   null
   ```
   
   and table b is
   ```
   c2
   ---
   null
   ```
   then the clause
   `select c1 from a except select c2 from b`
   will hit the `val_row == nullptr && val_other == nullptr` branch.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] kangkaisen commented on a change in pull request #3056: implement except node

Posted by GitBox <gi...@apache.org>.
kangkaisen commented on a change in pull request #3056: implement except node
URL: https://github.com/apache/incubator-doris/pull/3056#discussion_r392209204
 
 

 ##########
 File path: be/src/exec/except_node.cpp
 ##########
 @@ -56,4 +41,190 @@ Status ExceptNode::init(const TPlanNode& tnode, RuntimeState* state) {
     }
     return Status::OK();
 }
-}
\ No newline at end of file
+
+Status ExceptNode::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(ExecNode::prepare(state));
+    _build_timer = ADD_TIMER(runtime_profile(), "BuildTime");
+    _probe_timer = ADD_TIMER(runtime_profile(), "ProbeTime");
+    _build_pool.reset(new MemPool(mem_tracker()));
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    for (size_t i = 0; i < _child_expr_lists.size(); ++i) {
+        RETURN_IF_ERROR(Expr::prepare(_child_expr_lists[i], state, child(i)->row_desc(),
+                                      expr_mem_tracker()));
+    }
+    _build_tuple_size = child(0)->row_desc().tuple_descriptors().size();
+    _build_tuple_row_size = _build_tuple_size * sizeof(Tuple*);
+    _build_tuple_idx.reserve(_build_tuple_size);
+
+    for (int i = 0; i < _build_tuple_size; ++i) {
+        TupleDescriptor* build_tuple_desc = child(0)->row_desc().tuple_descriptors()[i];
+        _build_tuple_idx.push_back(_row_descriptor.get_tuple_idx(build_tuple_desc->id()));
+    }
+    _find_nulls = std::vector<bool>(_build_tuple_size, true);
+    return Status::OK();
+}
+Status ExceptNode::close(RuntimeState* state) {
+    if (is_closed()) {
+        return Status::OK();
+    }
+    for (auto& exprs : _child_expr_lists) {
+        Expr::close(exprs, state);
+    }
+
+    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::CLOSE));
+    // Must reset _probe_batch in close() to release resources
+    _probe_batch.reset(NULL);
+
+    if (_memory_used_counter != NULL && _hash_tbl.get() != NULL) {
+        COUNTER_UPDATE(_memory_used_counter, _build_pool->peak_allocated_bytes());
+        COUNTER_UPDATE(_memory_used_counter, _hash_tbl->byte_size());
+    }
+    if (_hash_tbl.get() != NULL) {
+        _hash_tbl->close();
+    }
+    if (_build_pool.get() != NULL) {
+        _build_pool->free_all();
+    }
+    return ExecNode::close(state);
+}
+Status ExceptNode::open(RuntimeState* state) {
+    RETURN_IF_ERROR(ExecNode::open(state));
+    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    RETURN_IF_CANCELLED(state);
+    // open result expr lists.
+    for (const vector<ExprContext*>& exprs : _child_expr_lists) {
+        RETURN_IF_ERROR(Expr::open(exprs, state));
+    }
+    // initial build hash table, use _child_expr_lists[0] as probe is used for remove duplicted
+    _hash_tbl.reset(new HashTable(_child_expr_lists[0], _child_expr_lists[0], _build_tuple_size,
+                                  true, _find_nulls, id(), mem_tracker(), 1024));
+    RowBatch build_batch(child(0)->row_desc(), state->batch_size(), mem_tracker());
+    RETURN_IF_ERROR(child(0)->open(state));
+
+    bool eos = false;
+    while (!eos) {
+        SCOPED_TIMER(_build_timer);
+        RETURN_IF_CANCELLED(state);
+        RETURN_IF_ERROR(child(0)->get_next(state, &build_batch, &eos));
+        // take ownership of tuple data of build_batch
+        _build_pool->acquire_data(build_batch.tuple_data_pool(), false);
+        RETURN_IF_LIMIT_EXCEEDED(state, " Except, while constructing the hash table.");
+        // build hash table and remvoe duplicate items
+        for (int i = 0; i < build_batch.num_rows(); ++i) {
+            _hash_tbl->insert_unique(build_batch.get_row(i));
+        }
+        VLOG_ROW << "hash table content: " << _hash_tbl->debug_string(true, &child(0)->row_desc());
+        build_batch.reset();
+    }
+    // if a table is empty, the result must be empty
+
+    if (_hash_tbl->size() == 0) {
+        _hash_tbl_iterator = _hash_tbl->begin();
+        return Status::OK();
+    }
+
+    for (int i = 1; i < _children.size(); ++i) {
+        // rebuid hash table, for first time will rebuild with the no duplicated _hash_tbl,
+        if (i > 1) {
+            SCOPED_TIMER(_build_timer);
+            std::unique_ptr<HashTable> temp_tbl(
+                    new HashTable(_child_expr_lists[0], _child_expr_lists[i], _build_tuple_size,
+                                  true, _find_nulls, id(), mem_tracker(), 1024));
+            _hash_tbl_iterator = _hash_tbl->begin();
+            uint32_t previous_hash = -1;
+            while (_hash_tbl_iterator.has_next()) {
+                if (previous_hash != _hash_tbl_iterator.get_hash()) {
+                    previous_hash = _hash_tbl_iterator.get_hash();
+                    if (!_hash_tbl_iterator.matched()) {
+                        temp_tbl->insert(_hash_tbl_iterator.get_row());
+                    }
+                }
+                _hash_tbl_iterator.next<false>();
+            }
+            _hash_tbl.swap(temp_tbl);
+            temp_tbl->close();
+        }
+        // probe
+        _probe_batch.reset(new RowBatch(child(i)->row_desc(), state->batch_size(), mem_tracker()));
+        ScopedTimer<MonotonicStopWatch> probe_timer(_probe_timer);
+        RETURN_IF_ERROR(child(i)->open(state));
+        eos = false;
+        while (!eos) {
+            RETURN_IF_CANCELLED(state);
+            RETURN_IF_ERROR(child(i)->get_next(state, _probe_batch.get(), &eos));
+            RETURN_IF_LIMIT_EXCEEDED(state, " Except , while probing the hash table.");
+            for (int j = 0; j < _probe_batch->num_rows(); ++j) {
+                _hash_tbl_iterator = _hash_tbl->find(_probe_batch->get_row(j));
+                if (_hash_tbl_iterator != _hash_tbl->end()) {
+                    _hash_tbl_iterator.set_matched();
+                }
+            }
+            _probe_batch->reset();
+        }
+        // if a table is empty, the result must be empty
+        if (_hash_tbl->size() == 0) {
+            break;
+        }
+    }
+    _hash_tbl_iterator = _hash_tbl->begin();
+    return Status::OK();
+}
+
+Status ExceptNode::get_next(RuntimeState* state, RowBatch* out_batch, bool* eos) {
+    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
+    RETURN_IF_CANCELLED(state);
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    *eos = true;
+    if (reached_limit()) {
+        return Status::OK();
+    }
+    uint32_t previous_hash = -1;
+    TupleRow* previous_row = nullptr;
+    while (_hash_tbl_iterator.has_next()) {
+        if (!_hash_tbl_iterator.matched()) {
+            if (previous_hash != _hash_tbl_iterator.get_hash() ||
+                !equals(previous_row, _hash_tbl_iterator.get_row())) {
+                int row_idx = out_batch->add_row();
+                TupleRow* out_row = out_batch->get_row(row_idx);
+                uint8_t* out_ptr = reinterpret_cast<uint8_t*>(out_row);
+                memcpy(out_ptr, _hash_tbl_iterator.get_row(), _build_tuple_row_size);
+                out_batch->commit_last_row();
+                ++_num_rows_returned;
+            }
+        }
+        previous_hash = _hash_tbl_iterator.get_hash();
+        previous_row = _hash_tbl_iterator.get_row();
+        _hash_tbl_iterator.next<false>();
+
+        *eos = !_hash_tbl_iterator.has_next() || reached_limit();
+        if (out_batch->is_full() || out_batch->at_resource_limit() || *eos) {
+            return Status::OK();
+        }
+    }
+    return Status::OK();
+}
+
+bool ExceptNode::equals(TupleRow* row, TupleRow* other) {
+    if (row == nullptr && other == nullptr) {
+        return false;
+    }
+    if (row == nullptr || other == nullptr) {
+        return false;
+    }
+    for (int i = 0; i < _child_expr_lists[0].size(); ++i) {
+        void* val_row = _child_expr_lists[0][i]->get_value(row);
+        void* val_other = _child_expr_lists[0][i]->get_value(other);
+        if (val_row == nullptr && val_other == nullptr) {
+            continue;
 
 Review comment:
   If `_child_expr_lists[0].size()` is 1, `ExceptNode::equals` will return true when both values are NULL. Is that what you expect? It seems to conflict with `else if (val_row == nullptr || val_other == nullptr) {
               return false;`

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] morningman commented on a change in pull request #3056: implement except node

Posted by GitBox <gi...@apache.org>.
morningman commented on a change in pull request #3056: implement except node
URL: https://github.com/apache/incubator-doris/pull/3056#discussion_r390335009
 
 

 ##########
 File path: fe/src/main/java/org/apache/doris/analysis/SetOperationStmt.java
 ##########
 @@ -649,6 +648,13 @@ public void analyze(Analyzer parent) throws AnalysisException, UserException {
             if (isAnalyzed()) {
                 return;
             }
+            if (queryStmt instanceof SelectStmt && ((SelectStmt) queryStmt).fromClause_.isEmpty()) {
 
 Review comment:
   What is this for? Could you add a comment to explain this rewrite?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] morningman commented on a change in pull request #3056: implement except node

Posted by GitBox <gi...@apache.org>.
morningman commented on a change in pull request #3056: implement except node
URL: https://github.com/apache/incubator-doris/pull/3056#discussion_r390332548
 
 

 ##########
 File path: be/src/exec/except_node.cpp
 ##########
 @@ -56,4 +40,158 @@ Status ExceptNode::init(const TPlanNode& tnode, RuntimeState* state) {
     }
     return Status::OK();
 }
-}
\ No newline at end of file
+
+Status ExceptNode::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(ExecNode::prepare(state));
+    _build_pool.reset(new MemPool(mem_tracker()));
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    for (size_t i = 0; i < _child_expr_lists.size(); ++i) {
+        RETURN_IF_ERROR(Expr::prepare(_child_expr_lists[i], state, child(i)->row_desc(),
+                                      expr_mem_tracker()));
+    }
+    _build_tuple_size = child(0)->row_desc().tuple_descriptors().size();
+    _build_tuple_row_size = _build_tuple_size * sizeof(Tuple*);
+    _build_tuple_idx.reserve(_build_tuple_size);
+
+    for (int i = 0; i < _build_tuple_size; ++i) {
+        TupleDescriptor* build_tuple_desc = child(0)->row_desc().tuple_descriptors()[i];
+        _build_tuple_idx.push_back(_row_descriptor.get_tuple_idx(build_tuple_desc->id()));
+    }
+    _find_nulls = std::vector<bool>(_child_expr_lists.size(), true);
+    return Status::OK();
+}
+Status ExceptNode::close(RuntimeState* state) {
+    if (is_closed()) {
+        return Status::OK();
+    }
+    for (auto& exprs : _child_expr_lists) {
+        Expr::close(exprs, state);
+    }
+
+    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::CLOSE));
+    // Must reset _probe_batch in close() to release resources
+    _probe_batch.reset(NULL);
+
+    if (_memory_used_counter != NULL && _hash_tbl.get() != NULL) {
+        COUNTER_UPDATE(_memory_used_counter, _build_pool->peak_allocated_bytes());
+        COUNTER_UPDATE(_memory_used_counter, _hash_tbl->byte_size());
+    }
+    if (_hash_tbl.get() != NULL) {
+        _hash_tbl->close();
+    }
+    if (_build_pool.get() != NULL) {
+        _build_pool->free_all();
+    }
+
+    return ExecNode::close(state);
+}
+Status ExceptNode::open(RuntimeState* state) {
+    RETURN_IF_ERROR(ExecNode::open(state));
+    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    RETURN_IF_CANCELLED(state);
+    // open result expr lists.
+    for (const vector<ExprContext*>& exprs : _child_expr_lists) {
+        RETURN_IF_ERROR(Expr::open(exprs, state));
+    }
+    // initial build hash table
+    _hash_tbl.reset(new HashTable(_child_expr_lists[0], _child_expr_lists[1], _build_tuple_size,
+                                  true, _find_nulls, id(), mem_tracker(), 1024));
+    RowBatch build_batch(child(0)->row_desc(), state->batch_size(), mem_tracker());
+    RETURN_IF_ERROR(child(0)->open(state));
+
+    bool eos = false;
+    while (!eos) {
+        RETURN_IF_CANCELLED(state);
+        RETURN_IF_ERROR(child(0)->get_next(state, &build_batch, &eos));
+        // take ownership of tuple data of build_batch
+        _build_pool->acquire_data(build_batch.tuple_data_pool(), false);
+        RETURN_IF_LIMIT_EXCEEDED(state, " Except, while constructing the hash table.");
+        for (int i = 0; i < build_batch.num_rows(); ++i) {
+            _hash_tbl->insert(build_batch.get_row(i));
+        }
+        VLOG_ROW << "hash table content: " << _hash_tbl->debug_string(true, &child(0)->row_desc());
+        build_batch.reset();
+    }
+    // if a table is empty, the result must be empty
+
+    if (_hash_tbl->size() == 0) {
+        _hash_tbl_iterator = _hash_tbl->begin();
+        return Status::OK();
+    }
+
+    for (int i = 1; i < _children.size(); ++i) {
+        // probe
+        _probe_batch.reset(new RowBatch(child(i)->row_desc(), state->batch_size(), mem_tracker()));
+        RETURN_IF_ERROR(child(i)->open(state));
+        eos = false;
+        while (!eos) {
+            RETURN_IF_CANCELLED(state);
+            RETURN_IF_ERROR(child(i)->get_next(state, _probe_batch.get(), &eos));
+            RETURN_IF_LIMIT_EXCEEDED(state, " Except , while probing the hash table.");
+            for (int j = 0; j < _probe_batch->num_rows(); ++j) {
+                _hash_tbl_iterator = _hash_tbl->find(_probe_batch->get_row(j));
+                if (_hash_tbl_iterator != _hash_tbl->end()) {
 
 Review comment:
   Should this condition be
   ```
   _hash_tbl_iterator == _hash_tbl->end()
   ```
   instead?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] yangzhg commented on a change in pull request #3056: implement except node

Posted by GitBox <gi...@apache.org>.
yangzhg commented on a change in pull request #3056: implement except node
URL: https://github.com/apache/incubator-doris/pull/3056#discussion_r392203155
 
 

 ##########
 File path: be/src/exec/except_node.cpp
 ##########
 @@ -56,4 +41,190 @@ Status ExceptNode::init(const TPlanNode& tnode, RuntimeState* state) {
     }
     return Status::OK();
 }
-}
\ No newline at end of file
+
+Status ExceptNode::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(ExecNode::prepare(state));
+    _build_timer = ADD_TIMER(runtime_profile(), "BuildTime");
+    _probe_timer = ADD_TIMER(runtime_profile(), "ProbeTime");
+    _build_pool.reset(new MemPool(mem_tracker()));
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    for (size_t i = 0; i < _child_expr_lists.size(); ++i) {
+        RETURN_IF_ERROR(Expr::prepare(_child_expr_lists[i], state, child(i)->row_desc(),
+                                      expr_mem_tracker()));
+    }
+    _build_tuple_size = child(0)->row_desc().tuple_descriptors().size();
+    _build_tuple_row_size = _build_tuple_size * sizeof(Tuple*);
+    _build_tuple_idx.reserve(_build_tuple_size);
+
+    for (int i = 0; i < _build_tuple_size; ++i) {
+        TupleDescriptor* build_tuple_desc = child(0)->row_desc().tuple_descriptors()[i];
+        _build_tuple_idx.push_back(_row_descriptor.get_tuple_idx(build_tuple_desc->id()));
+    }
+    _find_nulls = std::vector<bool>(_build_tuple_size, true);
+    return Status::OK();
+}
+Status ExceptNode::close(RuntimeState* state) {
+    if (is_closed()) {
+        return Status::OK();
+    }
+    for (auto& exprs : _child_expr_lists) {
+        Expr::close(exprs, state);
+    }
+
+    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::CLOSE));
+    // Must reset _probe_batch in close() to release resources
+    _probe_batch.reset(NULL);
+
+    if (_memory_used_counter != NULL && _hash_tbl.get() != NULL) {
+        COUNTER_UPDATE(_memory_used_counter, _build_pool->peak_allocated_bytes());
+        COUNTER_UPDATE(_memory_used_counter, _hash_tbl->byte_size());
+    }
+    if (_hash_tbl.get() != NULL) {
+        _hash_tbl->close();
+    }
+    if (_build_pool.get() != NULL) {
+        _build_pool->free_all();
+    }
+    return ExecNode::close(state);
+}
+Status ExceptNode::open(RuntimeState* state) {
+    RETURN_IF_ERROR(ExecNode::open(state));
+    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    RETURN_IF_CANCELLED(state);
+    // open result expr lists.
+    for (const vector<ExprContext*>& exprs : _child_expr_lists) {
+        RETURN_IF_ERROR(Expr::open(exprs, state));
+    }
+    // initial build hash table, use _child_expr_lists[0] as probe is used for remove duplicted
+    _hash_tbl.reset(new HashTable(_child_expr_lists[0], _child_expr_lists[0], _build_tuple_size,
+                                  true, _find_nulls, id(), mem_tracker(), 1024));
+    RowBatch build_batch(child(0)->row_desc(), state->batch_size(), mem_tracker());
+    RETURN_IF_ERROR(child(0)->open(state));
+
+    bool eos = false;
+    while (!eos) {
+        SCOPED_TIMER(_build_timer);
+        RETURN_IF_CANCELLED(state);
+        RETURN_IF_ERROR(child(0)->get_next(state, &build_batch, &eos));
+        // take ownership of tuple data of build_batch
+        _build_pool->acquire_data(build_batch.tuple_data_pool(), false);
+        RETURN_IF_LIMIT_EXCEEDED(state, " Except, while constructing the hash table.");
+        // build hash table and remvoe duplicate items
+        for (int i = 0; i < build_batch.num_rows(); ++i) {
+            _hash_tbl->insert_unique(build_batch.get_row(i));
+        }
+        VLOG_ROW << "hash table content: " << _hash_tbl->debug_string(true, &child(0)->row_desc());
+        build_batch.reset();
+    }
+    // if a table is empty, the result must be empty
+
+    if (_hash_tbl->size() == 0) {
+        _hash_tbl_iterator = _hash_tbl->begin();
+        return Status::OK();
+    }
+
+    for (int i = 1; i < _children.size(); ++i) {
+        // rebuid hash table, for first time will rebuild with the no duplicated _hash_tbl,
+        if (i > 1) {
+            SCOPED_TIMER(_build_timer);
+            std::unique_ptr<HashTable> temp_tbl(
+                    new HashTable(_child_expr_lists[0], _child_expr_lists[i], _build_tuple_size,
+                                  true, _find_nulls, id(), mem_tracker(), 1024));
+            _hash_tbl_iterator = _hash_tbl->begin();
+            uint32_t previous_hash = -1;
+            while (_hash_tbl_iterator.has_next()) {
+                if (previous_hash != _hash_tbl_iterator.get_hash()) {
+                    previous_hash = _hash_tbl_iterator.get_hash();
+                    if (!_hash_tbl_iterator.matched()) {
+                        temp_tbl->insert(_hash_tbl_iterator.get_row());
+                    }
+                }
+                _hash_tbl_iterator.next<false>();
+            }
+            _hash_tbl.swap(temp_tbl);
+            temp_tbl->close();
+        }
+        // probe
+        _probe_batch.reset(new RowBatch(child(i)->row_desc(), state->batch_size(), mem_tracker()));
+        ScopedTimer<MonotonicStopWatch> probe_timer(_probe_timer);
+        RETURN_IF_ERROR(child(i)->open(state));
+        eos = false;
+        while (!eos) {
+            RETURN_IF_CANCELLED(state);
+            RETURN_IF_ERROR(child(i)->get_next(state, _probe_batch.get(), &eos));
+            RETURN_IF_LIMIT_EXCEEDED(state, " Except , while probing the hash table.");
+            for (int j = 0; j < _probe_batch->num_rows(); ++j) {
+                _hash_tbl_iterator = _hash_tbl->find(_probe_batch->get_row(j));
+                if (_hash_tbl_iterator != _hash_tbl->end()) {
+                    _hash_tbl_iterator.set_matched();
+                }
+            }
+            _probe_batch->reset();
+        }
+        // if a table is empty, the result must be empty
+        if (_hash_tbl->size() == 0) {
+            break;
+        }
+    }
+    _hash_tbl_iterator = _hash_tbl->begin();
+    return Status::OK();
+}
+
+Status ExceptNode::get_next(RuntimeState* state, RowBatch* out_batch, bool* eos) {
+    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
+    RETURN_IF_CANCELLED(state);
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    *eos = true;
+    if (reached_limit()) {
+        return Status::OK();
+    }
+    uint32_t previous_hash = -1;
+    TupleRow* previous_row = nullptr;
+    while (_hash_tbl_iterator.has_next()) {
+        if (!_hash_tbl_iterator.matched()) {
+            if (previous_hash != _hash_tbl_iterator.get_hash() ||
+                !equals(previous_row, _hash_tbl_iterator.get_row())) {
+                int row_idx = out_batch->add_row();
+                TupleRow* out_row = out_batch->get_row(row_idx);
+                uint8_t* out_ptr = reinterpret_cast<uint8_t*>(out_row);
+                memcpy(out_ptr, _hash_tbl_iterator.get_row(), _build_tuple_row_size);
+                out_batch->commit_last_row();
+                ++_num_rows_returned;
+            }
+        }
+        previous_hash = _hash_tbl_iterator.get_hash();
+        previous_row = _hash_tbl_iterator.get_row();
+        _hash_tbl_iterator.next<false>();
+
+        *eos = !_hash_tbl_iterator.has_next() || reached_limit();
+        if (out_batch->is_full() || out_batch->at_resource_limit() || *eos) {
+            return Status::OK();
+        }
+    }
+    return Status::OK();
+}
+
+bool ExceptNode::equals(TupleRow* row, TupleRow* other) {
+    if (row == nullptr && other == nullptr) {
+        return false;
+    }
+    if (row == nullptr || other == nullptr) {
+        return false;
+    }
+    for (int i = 0; i < _child_expr_lists[0].size(); ++i) {
+        void* val_row = _child_expr_lists[0][i]->get_value(row);
+        void* val_other = _child_expr_lists[0][i]->get_value(other);
+        if (val_row == nullptr && val_other == nullptr) {
+            continue;
 
 Review comment:
   This branch handles nullable columns, for the case where both values are NULL.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] yangzhg commented on a change in pull request #3056: implement except node

Posted by GitBox <gi...@apache.org>.
yangzhg commented on a change in pull request #3056: implement except node
URL: https://github.com/apache/incubator-doris/pull/3056#discussion_r393403598
 
 

 ##########
 File path: be/src/exec/except_node.cpp
 ##########
 @@ -56,4 +41,190 @@ Status ExceptNode::init(const TPlanNode& tnode, RuntimeState* state) {
     }
     return Status::OK();
 }
-}
\ No newline at end of file
+
+Status ExceptNode::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(ExecNode::prepare(state));
+    _build_timer = ADD_TIMER(runtime_profile(), "BuildTime");
+    _probe_timer = ADD_TIMER(runtime_profile(), "ProbeTime");
+    _build_pool.reset(new MemPool(mem_tracker()));
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    for (size_t i = 0; i < _child_expr_lists.size(); ++i) {
+        RETURN_IF_ERROR(Expr::prepare(_child_expr_lists[i], state, child(i)->row_desc(),
+                                      expr_mem_tracker()));
+    }
+    _build_tuple_size = child(0)->row_desc().tuple_descriptors().size();
+    _build_tuple_row_size = _build_tuple_size * sizeof(Tuple*);
+    _build_tuple_idx.reserve(_build_tuple_size);
+
+    for (int i = 0; i < _build_tuple_size; ++i) {
+        TupleDescriptor* build_tuple_desc = child(0)->row_desc().tuple_descriptors()[i];
+        _build_tuple_idx.push_back(_row_descriptor.get_tuple_idx(build_tuple_desc->id()));
+    }
+    _find_nulls = std::vector<bool>(_build_tuple_size, true);
+    return Status::OK();
+}
+Status ExceptNode::close(RuntimeState* state) {
+    if (is_closed()) {
+        return Status::OK();
+    }
+    for (auto& exprs : _child_expr_lists) {
+        Expr::close(exprs, state);
+    }
+
+    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::CLOSE));
+    // Must reset _probe_batch in close() to release resources
+    _probe_batch.reset(NULL);
+
+    if (_memory_used_counter != NULL && _hash_tbl.get() != NULL) {
+        COUNTER_UPDATE(_memory_used_counter, _build_pool->peak_allocated_bytes());
+        COUNTER_UPDATE(_memory_used_counter, _hash_tbl->byte_size());
+    }
+    if (_hash_tbl.get() != NULL) {
+        _hash_tbl->close();
+    }
+    if (_build_pool.get() != NULL) {
+        _build_pool->free_all();
+    }
+    return ExecNode::close(state);
+}
+Status ExceptNode::open(RuntimeState* state) {
+    RETURN_IF_ERROR(ExecNode::open(state));
+    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    RETURN_IF_CANCELLED(state);
+    // open result expr lists.
+    for (const vector<ExprContext*>& exprs : _child_expr_lists) {
+        RETURN_IF_ERROR(Expr::open(exprs, state));
+    }
+    // initial build hash table, use _child_expr_lists[0] as probe is used for remove duplicted
+    _hash_tbl.reset(new HashTable(_child_expr_lists[0], _child_expr_lists[0], _build_tuple_size,
+                                  true, _find_nulls, id(), mem_tracker(), 1024));
+    RowBatch build_batch(child(0)->row_desc(), state->batch_size(), mem_tracker());
+    RETURN_IF_ERROR(child(0)->open(state));
+
+    bool eos = false;
+    while (!eos) {
+        SCOPED_TIMER(_build_timer);
+        RETURN_IF_CANCELLED(state);
+        RETURN_IF_ERROR(child(0)->get_next(state, &build_batch, &eos));
+        // take ownership of tuple data of build_batch
+        _build_pool->acquire_data(build_batch.tuple_data_pool(), false);
+        RETURN_IF_LIMIT_EXCEEDED(state, " Except, while constructing the hash table.");
+        // build hash table and remvoe duplicate items
+        for (int i = 0; i < build_batch.num_rows(); ++i) {
+            _hash_tbl->insert_unique(build_batch.get_row(i));
+        }
+        VLOG_ROW << "hash table content: " << _hash_tbl->debug_string(true, &child(0)->row_desc());
+        build_batch.reset();
+    }
+    // if a table is empty, the result must be empty
+
+    if (_hash_tbl->size() == 0) {
+        _hash_tbl_iterator = _hash_tbl->begin();
+        return Status::OK();
+    }
+
+    for (int i = 1; i < _children.size(); ++i) {
+        // rebuid hash table, for first time will rebuild with the no duplicated _hash_tbl,
+        if (i > 1) {
+            SCOPED_TIMER(_build_timer);
+            std::unique_ptr<HashTable> temp_tbl(
+                    new HashTable(_child_expr_lists[0], _child_expr_lists[i], _build_tuple_size,
+                                  true, _find_nulls, id(), mem_tracker(), 1024));
+            _hash_tbl_iterator = _hash_tbl->begin();
+            uint32_t previous_hash = -1;
+            while (_hash_tbl_iterator.has_next()) {
+                if (previous_hash != _hash_tbl_iterator.get_hash()) {
+                    previous_hash = _hash_tbl_iterator.get_hash();
+                    if (!_hash_tbl_iterator.matched()) {
+                        temp_tbl->insert(_hash_tbl_iterator.get_row());
+                    }
+                }
+                _hash_tbl_iterator.next<false>();
+            }
+            _hash_tbl.swap(temp_tbl);
+            temp_tbl->close();
+        }
+        // probe
+        _probe_batch.reset(new RowBatch(child(i)->row_desc(), state->batch_size(), mem_tracker()));
+        ScopedTimer<MonotonicStopWatch> probe_timer(_probe_timer);
+        RETURN_IF_ERROR(child(i)->open(state));
+        eos = false;
+        while (!eos) {
+            RETURN_IF_CANCELLED(state);
+            RETURN_IF_ERROR(child(i)->get_next(state, _probe_batch.get(), &eos));
+            RETURN_IF_LIMIT_EXCEEDED(state, " Except , while probing the hash table.");
+            for (int j = 0; j < _probe_batch->num_rows(); ++j) {
+                _hash_tbl_iterator = _hash_tbl->find(_probe_batch->get_row(j));
+                if (_hash_tbl_iterator != _hash_tbl->end()) {
+                    _hash_tbl_iterator.set_matched();
+                }
+            }
+            _probe_batch->reset();
+        }
+        // if a table is empty, the result must be empty
+        if (_hash_tbl->size() == 0) {
+            break;
+        }
+    }
+    _hash_tbl_iterator = _hash_tbl->begin();
+    return Status::OK();
+}
+
+Status ExceptNode::get_next(RuntimeState* state, RowBatch* out_batch, bool* eos) {
+    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
+    RETURN_IF_CANCELLED(state);
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    *eos = true;
+    if (reached_limit()) {
+        return Status::OK();
+    }
+    uint32_t previous_hash = -1;
+    TupleRow* previous_row = nullptr;
+    while (_hash_tbl_iterator.has_next()) {
+        if (!_hash_tbl_iterator.matched()) {
+            if (previous_hash != _hash_tbl_iterator.get_hash() ||
+                !equals(previous_row, _hash_tbl_iterator.get_row())) {
+                int row_idx = out_batch->add_row();
+                TupleRow* out_row = out_batch->get_row(row_idx);
+                uint8_t* out_ptr = reinterpret_cast<uint8_t*>(out_row);
+                memcpy(out_ptr, _hash_tbl_iterator.get_row(), _build_tuple_row_size);
+                out_batch->commit_last_row();
+                ++_num_rows_returned;
+            }
+        }
+        previous_hash = _hash_tbl_iterator.get_hash();
+        previous_row = _hash_tbl_iterator.get_row();
+        _hash_tbl_iterator.next<false>();
+
+        *eos = !_hash_tbl_iterator.has_next() || reached_limit();
+        if (out_batch->is_full() || out_batch->at_resource_limit() || *eos) {
+            return Status::OK();
+        }
+    }
+    return Status::OK();
+}
+
+bool ExceptNode::equals(TupleRow* row, TupleRow* other) {
+    if (row == nullptr && other == nullptr) {
+        return false;
+    }
+    if (row == nullptr || other == nullptr) {
+        return false;
+    }
+    for (int i = 0; i < _child_expr_lists[0].size(); ++i) {
+        void* val_row = _child_expr_lists[0][i]->get_value(row);
+        void* val_other = _child_expr_lists[0][i]->get_value(other);
+        if (val_row == nullptr && val_other == nullptr) {
+            continue;
 
 Review comment:
   If table a is
   ```
   c1
   ---
   null
   ```
   
   and table b is
   ```
   c2
   ---
   null
   ```
   then the query
   `select c1 from a except select c2 from b`
   will hit `val_row == nullptr && val_other == nullptr`.
   In that case NULL is treated as equal to NULL.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] kangkaisen commented on a change in pull request #3056: implement except node

Posted by GitBox <gi...@apache.org>.
kangkaisen commented on a change in pull request #3056: implement except node
URL: https://github.com/apache/incubator-doris/pull/3056#discussion_r392209204
 
 

 ##########
 File path: be/src/exec/except_node.cpp
 ##########
 @@ -56,4 +41,190 @@ Status ExceptNode::init(const TPlanNode& tnode, RuntimeState* state) {
     }
     return Status::OK();
 }
-}
\ No newline at end of file
+
+Status ExceptNode::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(ExecNode::prepare(state));
+    _build_timer = ADD_TIMER(runtime_profile(), "BuildTime");
+    _probe_timer = ADD_TIMER(runtime_profile(), "ProbeTime");
+    _build_pool.reset(new MemPool(mem_tracker()));
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    for (size_t i = 0; i < _child_expr_lists.size(); ++i) {
+        RETURN_IF_ERROR(Expr::prepare(_child_expr_lists[i], state, child(i)->row_desc(),
+                                      expr_mem_tracker()));
+    }
+    _build_tuple_size = child(0)->row_desc().tuple_descriptors().size();
+    _build_tuple_row_size = _build_tuple_size * sizeof(Tuple*);
+    _build_tuple_idx.reserve(_build_tuple_size);
+
+    for (int i = 0; i < _build_tuple_size; ++i) {
+        TupleDescriptor* build_tuple_desc = child(0)->row_desc().tuple_descriptors()[i];
+        _build_tuple_idx.push_back(_row_descriptor.get_tuple_idx(build_tuple_desc->id()));
+    }
+    _find_nulls = std::vector<bool>(_build_tuple_size, true);
+    return Status::OK();
+}
+Status ExceptNode::close(RuntimeState* state) {
+    if (is_closed()) {
+        return Status::OK();
+    }
+    for (auto& exprs : _child_expr_lists) {
+        Expr::close(exprs, state);
+    }
+
+    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::CLOSE));
+    // Must reset _probe_batch in close() to release resources
+    _probe_batch.reset(NULL);
+
+    if (_memory_used_counter != NULL && _hash_tbl.get() != NULL) {
+        COUNTER_UPDATE(_memory_used_counter, _build_pool->peak_allocated_bytes());
+        COUNTER_UPDATE(_memory_used_counter, _hash_tbl->byte_size());
+    }
+    if (_hash_tbl.get() != NULL) {
+        _hash_tbl->close();
+    }
+    if (_build_pool.get() != NULL) {
+        _build_pool->free_all();
+    }
+    return ExecNode::close(state);
+}
+Status ExceptNode::open(RuntimeState* state) {
+    RETURN_IF_ERROR(ExecNode::open(state));
+    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    RETURN_IF_CANCELLED(state);
+    // open result expr lists.
+    for (const vector<ExprContext*>& exprs : _child_expr_lists) {
+        RETURN_IF_ERROR(Expr::open(exprs, state));
+    }
+    // initial build hash table, use _child_expr_lists[0] as probe is used for remove duplicted
+    _hash_tbl.reset(new HashTable(_child_expr_lists[0], _child_expr_lists[0], _build_tuple_size,
+                                  true, _find_nulls, id(), mem_tracker(), 1024));
+    RowBatch build_batch(child(0)->row_desc(), state->batch_size(), mem_tracker());
+    RETURN_IF_ERROR(child(0)->open(state));
+
+    bool eos = false;
+    while (!eos) {
+        SCOPED_TIMER(_build_timer);
+        RETURN_IF_CANCELLED(state);
+        RETURN_IF_ERROR(child(0)->get_next(state, &build_batch, &eos));
+        // take ownership of tuple data of build_batch
+        _build_pool->acquire_data(build_batch.tuple_data_pool(), false);
+        RETURN_IF_LIMIT_EXCEEDED(state, " Except, while constructing the hash table.");
+        // build hash table and remvoe duplicate items
+        for (int i = 0; i < build_batch.num_rows(); ++i) {
+            _hash_tbl->insert_unique(build_batch.get_row(i));
+        }
+        VLOG_ROW << "hash table content: " << _hash_tbl->debug_string(true, &child(0)->row_desc());
+        build_batch.reset();
+    }
+    // if a table is empty, the result must be empty
+
+    if (_hash_tbl->size() == 0) {
+        _hash_tbl_iterator = _hash_tbl->begin();
+        return Status::OK();
+    }
+
+    for (int i = 1; i < _children.size(); ++i) {
+        // rebuid hash table, for first time will rebuild with the no duplicated _hash_tbl,
+        if (i > 1) {
+            SCOPED_TIMER(_build_timer);
+            std::unique_ptr<HashTable> temp_tbl(
+                    new HashTable(_child_expr_lists[0], _child_expr_lists[i], _build_tuple_size,
+                                  true, _find_nulls, id(), mem_tracker(), 1024));
+            _hash_tbl_iterator = _hash_tbl->begin();
+            uint32_t previous_hash = -1;
+            while (_hash_tbl_iterator.has_next()) {
+                if (previous_hash != _hash_tbl_iterator.get_hash()) {
+                    previous_hash = _hash_tbl_iterator.get_hash();
+                    if (!_hash_tbl_iterator.matched()) {
+                        temp_tbl->insert(_hash_tbl_iterator.get_row());
+                    }
+                }
+                _hash_tbl_iterator.next<false>();
+            }
+            _hash_tbl.swap(temp_tbl);
+            temp_tbl->close();
+        }
+        // probe
+        _probe_batch.reset(new RowBatch(child(i)->row_desc(), state->batch_size(), mem_tracker()));
+        ScopedTimer<MonotonicStopWatch> probe_timer(_probe_timer);
+        RETURN_IF_ERROR(child(i)->open(state));
+        eos = false;
+        while (!eos) {
+            RETURN_IF_CANCELLED(state);
+            RETURN_IF_ERROR(child(i)->get_next(state, _probe_batch.get(), &eos));
+            RETURN_IF_LIMIT_EXCEEDED(state, " Except , while probing the hash table.");
+            for (int j = 0; j < _probe_batch->num_rows(); ++j) {
+                _hash_tbl_iterator = _hash_tbl->find(_probe_batch->get_row(j));
+                if (_hash_tbl_iterator != _hash_tbl->end()) {
+                    _hash_tbl_iterator.set_matched();
+                }
+            }
+            _probe_batch->reset();
+        }
+        // if a table is empty, the result must be empty
+        if (_hash_tbl->size() == 0) {
+            break;
+        }
+    }
+    _hash_tbl_iterator = _hash_tbl->begin();
+    return Status::OK();
+}
+
+Status ExceptNode::get_next(RuntimeState* state, RowBatch* out_batch, bool* eos) {
+    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
+    RETURN_IF_CANCELLED(state);
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    *eos = true;
+    if (reached_limit()) {
+        return Status::OK();
+    }
+    uint32_t previous_hash = -1;
+    TupleRow* previous_row = nullptr;
+    while (_hash_tbl_iterator.has_next()) {
+        if (!_hash_tbl_iterator.matched()) {
+            if (previous_hash != _hash_tbl_iterator.get_hash() ||
+                !equals(previous_row, _hash_tbl_iterator.get_row())) {
+                int row_idx = out_batch->add_row();
+                TupleRow* out_row = out_batch->get_row(row_idx);
+                uint8_t* out_ptr = reinterpret_cast<uint8_t*>(out_row);
+                memcpy(out_ptr, _hash_tbl_iterator.get_row(), _build_tuple_row_size);
+                out_batch->commit_last_row();
+                ++_num_rows_returned;
+            }
+        }
+        previous_hash = _hash_tbl_iterator.get_hash();
+        previous_row = _hash_tbl_iterator.get_row();
+        _hash_tbl_iterator.next<false>();
+
+        *eos = !_hash_tbl_iterator.has_next() || reached_limit();
+        if (out_batch->is_full() || out_batch->at_resource_limit() || *eos) {
+            return Status::OK();
+        }
+    }
+    return Status::OK();
+}
+
+bool ExceptNode::equals(TupleRow* row, TupleRow* other) {
+    if (row == nullptr && other == nullptr) {
+        return false;
+    }
+    if (row == nullptr || other == nullptr) {
+        return false;
+    }
+    for (int i = 0; i < _child_expr_lists[0].size(); ++i) {
+        void* val_row = _child_expr_lists[0][i]->get_value(row);
+        void* val_other = _child_expr_lists[0][i]->get_value(other);
+        if (val_row == nullptr && val_other == nullptr) {
+            continue;
 
 Review comment:
   If `_child_expr_lists[0].size()` is 1 and both values are null, `ExceptNode::equals` will return true. Is that what you expect? It seems to conflict with `else if (val_row == nullptr || val_other == nullptr) {
               return false;`

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] morningman commented on a change in pull request #3056: implement except node

Posted by GitBox <gi...@apache.org>.
morningman commented on a change in pull request #3056: implement except node
URL: https://github.com/apache/incubator-doris/pull/3056#discussion_r392244642
 
 

 ##########
 File path: be/src/exec/except_node.cpp
 ##########
 @@ -56,4 +41,190 @@ Status ExceptNode::init(const TPlanNode& tnode, RuntimeState* state) {
     }
     return Status::OK();
 }
-}
\ No newline at end of file
+
+Status ExceptNode::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(ExecNode::prepare(state));
+    _build_timer = ADD_TIMER(runtime_profile(), "BuildTime");
+    _probe_timer = ADD_TIMER(runtime_profile(), "ProbeTime");
+    _build_pool.reset(new MemPool(mem_tracker()));
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    for (size_t i = 0; i < _child_expr_lists.size(); ++i) {
+        RETURN_IF_ERROR(Expr::prepare(_child_expr_lists[i], state, child(i)->row_desc(),
+                                      expr_mem_tracker()));
+    }
+    _build_tuple_size = child(0)->row_desc().tuple_descriptors().size();
+    _build_tuple_row_size = _build_tuple_size * sizeof(Tuple*);
+    _build_tuple_idx.reserve(_build_tuple_size);
+
+    for (int i = 0; i < _build_tuple_size; ++i) {
+        TupleDescriptor* build_tuple_desc = child(0)->row_desc().tuple_descriptors()[i];
+        _build_tuple_idx.push_back(_row_descriptor.get_tuple_idx(build_tuple_desc->id()));
+    }
+    _find_nulls = std::vector<bool>(_build_tuple_size, true);
+    return Status::OK();
+}
+Status ExceptNode::close(RuntimeState* state) {
+    if (is_closed()) {
+        return Status::OK();
+    }
+    for (auto& exprs : _child_expr_lists) {
+        Expr::close(exprs, state);
+    }
+
+    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::CLOSE));
+    // Must reset _probe_batch in close() to release resources
+    _probe_batch.reset(NULL);
+
+    if (_memory_used_counter != NULL && _hash_tbl.get() != NULL) {
+        COUNTER_UPDATE(_memory_used_counter, _build_pool->peak_allocated_bytes());
+        COUNTER_UPDATE(_memory_used_counter, _hash_tbl->byte_size());
+    }
+    if (_hash_tbl.get() != NULL) {
+        _hash_tbl->close();
+    }
+    if (_build_pool.get() != NULL) {
+        _build_pool->free_all();
+    }
+    return ExecNode::close(state);
+}
+Status ExceptNode::open(RuntimeState* state) {
+    RETURN_IF_ERROR(ExecNode::open(state));
+    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    RETURN_IF_CANCELLED(state);
+    // open result expr lists.
+    for (const vector<ExprContext*>& exprs : _child_expr_lists) {
+        RETURN_IF_ERROR(Expr::open(exprs, state));
+    }
+    // initial build hash table, use _child_expr_lists[0] as probe is used for remove duplicted
+    _hash_tbl.reset(new HashTable(_child_expr_lists[0], _child_expr_lists[0], _build_tuple_size,
+                                  true, _find_nulls, id(), mem_tracker(), 1024));
+    RowBatch build_batch(child(0)->row_desc(), state->batch_size(), mem_tracker());
+    RETURN_IF_ERROR(child(0)->open(state));
+
+    bool eos = false;
+    while (!eos) {
+        SCOPED_TIMER(_build_timer);
+        RETURN_IF_CANCELLED(state);
+        RETURN_IF_ERROR(child(0)->get_next(state, &build_batch, &eos));
+        // take ownership of tuple data of build_batch
+        _build_pool->acquire_data(build_batch.tuple_data_pool(), false);
+        RETURN_IF_LIMIT_EXCEEDED(state, " Except, while constructing the hash table.");
+        // build hash table and remvoe duplicate items
+        for (int i = 0; i < build_batch.num_rows(); ++i) {
+            _hash_tbl->insert_unique(build_batch.get_row(i));
+        }
+        VLOG_ROW << "hash table content: " << _hash_tbl->debug_string(true, &child(0)->row_desc());
+        build_batch.reset();
+    }
+    // if a table is empty, the result must be empty
+
+    if (_hash_tbl->size() == 0) {
+        _hash_tbl_iterator = _hash_tbl->begin();
+        return Status::OK();
+    }
+
+    for (int i = 1; i < _children.size(); ++i) {
+        // rebuid hash table, for first time will rebuild with the no duplicated _hash_tbl,
+        if (i > 1) {
+            SCOPED_TIMER(_build_timer);
+            std::unique_ptr<HashTable> temp_tbl(
+                    new HashTable(_child_expr_lists[0], _child_expr_lists[i], _build_tuple_size,
+                                  true, _find_nulls, id(), mem_tracker(), 1024));
+            _hash_tbl_iterator = _hash_tbl->begin();
+            uint32_t previous_hash = -1;
+            while (_hash_tbl_iterator.has_next()) {
+                if (previous_hash != _hash_tbl_iterator.get_hash()) {
+                    previous_hash = _hash_tbl_iterator.get_hash();
+                    if (!_hash_tbl_iterator.matched()) {
+                        temp_tbl->insert(_hash_tbl_iterator.get_row());
+                    }
+                }
+                _hash_tbl_iterator.next<false>();
+            }
+            _hash_tbl.swap(temp_tbl);
+            temp_tbl->close();
+        }
+        // probe
+        _probe_batch.reset(new RowBatch(child(i)->row_desc(), state->batch_size(), mem_tracker()));
+        ScopedTimer<MonotonicStopWatch> probe_timer(_probe_timer);
+        RETURN_IF_ERROR(child(i)->open(state));
+        eos = false;
+        while (!eos) {
+            RETURN_IF_CANCELLED(state);
+            RETURN_IF_ERROR(child(i)->get_next(state, _probe_batch.get(), &eos));
+            RETURN_IF_LIMIT_EXCEEDED(state, " Except , while probing the hash table.");
+            for (int j = 0; j < _probe_batch->num_rows(); ++j) {
+                _hash_tbl_iterator = _hash_tbl->find(_probe_batch->get_row(j));
+                if (_hash_tbl_iterator != _hash_tbl->end()) {
+                    _hash_tbl_iterator.set_matched();
+                }
+            }
+            _probe_batch->reset();
+        }
+        // if a table is empty, the result must be empty
+        if (_hash_tbl->size() == 0) {
+            break;
+        }
+    }
+    _hash_tbl_iterator = _hash_tbl->begin();
+    return Status::OK();
+}
+
+Status ExceptNode::get_next(RuntimeState* state, RowBatch* out_batch, bool* eos) {
+    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
+    RETURN_IF_CANCELLED(state);
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    *eos = true;
+    if (reached_limit()) {
+        return Status::OK();
+    }
+    uint32_t previous_hash = -1;
+    TupleRow* previous_row = nullptr;
+    while (_hash_tbl_iterator.has_next()) {
+        if (!_hash_tbl_iterator.matched()) {
+            if (previous_hash != _hash_tbl_iterator.get_hash() ||
+                !equals(previous_row, _hash_tbl_iterator.get_row())) {
+                int row_idx = out_batch->add_row();
+                TupleRow* out_row = out_batch->get_row(row_idx);
+                uint8_t* out_ptr = reinterpret_cast<uint8_t*>(out_row);
+                memcpy(out_ptr, _hash_tbl_iterator.get_row(), _build_tuple_row_size);
+                out_batch->commit_last_row();
+                ++_num_rows_returned;
+            }
+        }
+        previous_hash = _hash_tbl_iterator.get_hash();
+        previous_row = _hash_tbl_iterator.get_row();
+        _hash_tbl_iterator.next<false>();
+
+        *eos = !_hash_tbl_iterator.has_next() || reached_limit();
+        if (out_batch->is_full() || out_batch->at_resource_limit() || *eos) {
+            return Status::OK();
+        }
+    }
+    return Status::OK();
+}
+
+bool ExceptNode::equals(TupleRow* row, TupleRow* other) {
+    if (row == nullptr && other == nullptr) {
 
 Review comment:
   > this for a safe guard
   
   I think kaisen means this `if (row == nullptr && other == nullptr)` clause is meaningless, because there is already another `if (row == nullptr || other == nullptr)` check below.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] morningman merged pull request #3056: implement except node

Posted by GitBox <gi...@apache.org>.
morningman merged pull request #3056: implement except node
URL: https://github.com/apache/incubator-doris/pull/3056
 
 
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] kangkaisen commented on a change in pull request #3056: implement except node

Posted by GitBox <gi...@apache.org>.
kangkaisen commented on a change in pull request #3056: implement except node
URL: https://github.com/apache/incubator-doris/pull/3056#discussion_r392178079
 
 

 ##########
 File path: be/src/exec/exec_node.cpp
 ##########
 @@ -446,9 +446,9 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN
         *node = pool->add(new IntersectNode(pool, tnode, descs));
         return Status::OK();
 
-    // case TPlanNodeType::EXCEPT_NODE:
-    //     *node = pool->add(new ExceptNode(pool, tnode, descs));
-    //     return Status::OK();
 
 Review comment:
   Remove this line.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] kangkaisen commented on a change in pull request #3056: implement except node

Posted by GitBox <gi...@apache.org>.
kangkaisen commented on a change in pull request #3056: implement except node
URL: https://github.com/apache/incubator-doris/pull/3056#discussion_r392206863
 
 

 ##########
 File path: be/src/exec/except_node.cpp
 ##########
 @@ -56,4 +41,190 @@ Status ExceptNode::init(const TPlanNode& tnode, RuntimeState* state) {
     }
     return Status::OK();
 }
-}
\ No newline at end of file
+
+Status ExceptNode::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(ExecNode::prepare(state));
+    _build_timer = ADD_TIMER(runtime_profile(), "BuildTime");
+    _probe_timer = ADD_TIMER(runtime_profile(), "ProbeTime");
+    _build_pool.reset(new MemPool(mem_tracker()));
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    for (size_t i = 0; i < _child_expr_lists.size(); ++i) {
+        RETURN_IF_ERROR(Expr::prepare(_child_expr_lists[i], state, child(i)->row_desc(),
+                                      expr_mem_tracker()));
+    }
+    _build_tuple_size = child(0)->row_desc().tuple_descriptors().size();
+    _build_tuple_row_size = _build_tuple_size * sizeof(Tuple*);
+    _build_tuple_idx.reserve(_build_tuple_size);
+
+    for (int i = 0; i < _build_tuple_size; ++i) {
+        TupleDescriptor* build_tuple_desc = child(0)->row_desc().tuple_descriptors()[i];
+        _build_tuple_idx.push_back(_row_descriptor.get_tuple_idx(build_tuple_desc->id()));
+    }
+    _find_nulls = std::vector<bool>(_build_tuple_size, true);
+    return Status::OK();
+}
+Status ExceptNode::close(RuntimeState* state) {
+    if (is_closed()) {
+        return Status::OK();
+    }
+    for (auto& exprs : _child_expr_lists) {
+        Expr::close(exprs, state);
+    }
+
+    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::CLOSE));
+    // Must reset _probe_batch in close() to release resources
+    _probe_batch.reset(NULL);
+
+    if (_memory_used_counter != NULL && _hash_tbl.get() != NULL) {
+        COUNTER_UPDATE(_memory_used_counter, _build_pool->peak_allocated_bytes());
+        COUNTER_UPDATE(_memory_used_counter, _hash_tbl->byte_size());
+    }
+    if (_hash_tbl.get() != NULL) {
+        _hash_tbl->close();
+    }
+    if (_build_pool.get() != NULL) {
+        _build_pool->free_all();
+    }
+    return ExecNode::close(state);
+}
+Status ExceptNode::open(RuntimeState* state) {
+    RETURN_IF_ERROR(ExecNode::open(state));
+    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    RETURN_IF_CANCELLED(state);
+    // open result expr lists.
+    for (const vector<ExprContext*>& exprs : _child_expr_lists) {
+        RETURN_IF_ERROR(Expr::open(exprs, state));
+    }
+    // initial build hash table, use _child_expr_lists[0] as probe is used for remove duplicted
+    _hash_tbl.reset(new HashTable(_child_expr_lists[0], _child_expr_lists[0], _build_tuple_size,
+                                  true, _find_nulls, id(), mem_tracker(), 1024));
+    RowBatch build_batch(child(0)->row_desc(), state->batch_size(), mem_tracker());
+    RETURN_IF_ERROR(child(0)->open(state));
+
+    bool eos = false;
+    while (!eos) {
+        SCOPED_TIMER(_build_timer);
+        RETURN_IF_CANCELLED(state);
+        RETURN_IF_ERROR(child(0)->get_next(state, &build_batch, &eos));
+        // take ownership of tuple data of build_batch
+        _build_pool->acquire_data(build_batch.tuple_data_pool(), false);
+        RETURN_IF_LIMIT_EXCEEDED(state, " Except, while constructing the hash table.");
+        // build hash table and remvoe duplicate items
+        for (int i = 0; i < build_batch.num_rows(); ++i) {
+            _hash_tbl->insert_unique(build_batch.get_row(i));
+        }
+        VLOG_ROW << "hash table content: " << _hash_tbl->debug_string(true, &child(0)->row_desc());
+        build_batch.reset();
+    }
+    // if a table is empty, the result must be empty
+
+    if (_hash_tbl->size() == 0) {
+        _hash_tbl_iterator = _hash_tbl->begin();
+        return Status::OK();
+    }
+
+    for (int i = 1; i < _children.size(); ++i) {
+        // rebuid hash table, for first time will rebuild with the no duplicated _hash_tbl,
+        if (i > 1) {
+            SCOPED_TIMER(_build_timer);
+            std::unique_ptr<HashTable> temp_tbl(
+                    new HashTable(_child_expr_lists[0], _child_expr_lists[i], _build_tuple_size,
+                                  true, _find_nulls, id(), mem_tracker(), 1024));
+            _hash_tbl_iterator = _hash_tbl->begin();
+            uint32_t previous_hash = -1;
+            while (_hash_tbl_iterator.has_next()) {
+                if (previous_hash != _hash_tbl_iterator.get_hash()) {
+                    previous_hash = _hash_tbl_iterator.get_hash();
+                    if (!_hash_tbl_iterator.matched()) {
+                        temp_tbl->insert(_hash_tbl_iterator.get_row());
+                    }
+                }
+                _hash_tbl_iterator.next<false>();
+            }
+            _hash_tbl.swap(temp_tbl);
+            temp_tbl->close();
+        }
+        // probe
+        _probe_batch.reset(new RowBatch(child(i)->row_desc(), state->batch_size(), mem_tracker()));
+        ScopedTimer<MonotonicStopWatch> probe_timer(_probe_timer);
+        RETURN_IF_ERROR(child(i)->open(state));
+        eos = false;
+        while (!eos) {
+            RETURN_IF_CANCELLED(state);
+            RETURN_IF_ERROR(child(i)->get_next(state, _probe_batch.get(), &eos));
+            RETURN_IF_LIMIT_EXCEEDED(state, " Except , while probing the hash table.");
+            for (int j = 0; j < _probe_batch->num_rows(); ++j) {
+                _hash_tbl_iterator = _hash_tbl->find(_probe_batch->get_row(j));
+                if (_hash_tbl_iterator != _hash_tbl->end()) {
+                    _hash_tbl_iterator.set_matched();
+                }
+            }
+            _probe_batch->reset();
+        }
+        // if a table is empty, the result must be empty
+        if (_hash_tbl->size() == 0) {
+            break;
+        }
+    }
+    _hash_tbl_iterator = _hash_tbl->begin();
+    return Status::OK();
+}
+
+Status ExceptNode::get_next(RuntimeState* state, RowBatch* out_batch, bool* eos) {
+    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
+    RETURN_IF_CANCELLED(state);
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    *eos = true;
+    if (reached_limit()) {
+        return Status::OK();
+    }
+    uint32_t previous_hash = -1;
+    TupleRow* previous_row = nullptr;
+    while (_hash_tbl_iterator.has_next()) {
+        if (!_hash_tbl_iterator.matched()) {
+            if (previous_hash != _hash_tbl_iterator.get_hash() ||
+                !equals(previous_row, _hash_tbl_iterator.get_row())) {
+                int row_idx = out_batch->add_row();
+                TupleRow* out_row = out_batch->get_row(row_idx);
+                uint8_t* out_ptr = reinterpret_cast<uint8_t*>(out_row);
+                memcpy(out_ptr, _hash_tbl_iterator.get_row(), _build_tuple_row_size);
+                out_batch->commit_last_row();
+                ++_num_rows_returned;
+            }
+        }
+        previous_hash = _hash_tbl_iterator.get_hash();
+        previous_row = _hash_tbl_iterator.get_row();
+        _hash_tbl_iterator.next<false>();
+
+        *eos = !_hash_tbl_iterator.has_next() || reached_limit();
+        if (out_batch->is_full() || out_batch->at_resource_limit() || *eos) {
+            return Status::OK();
+        }
+    }
+    return Status::OK();
+}
+
+bool ExceptNode::equals(TupleRow* row, TupleRow* other) {
+    if (row == nullptr && other == nullptr) {
 
 Review comment:
   I don't see the point of this check.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] yangzhg commented on a change in pull request #3056: implement except node

Posted by GitBox <gi...@apache.org>.
yangzhg commented on a change in pull request #3056: implement except node
URL: https://github.com/apache/incubator-doris/pull/3056#discussion_r390734537
 
 

 ##########
 File path: be/src/exec/except_node.cpp
 ##########
 @@ -56,4 +40,157 @@ Status ExceptNode::init(const TPlanNode& tnode, RuntimeState* state) {
     }
     return Status::OK();
 }
-}
\ No newline at end of file
+
+Status ExceptNode::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(ExecNode::prepare(state));
+    _build_pool.reset(new MemPool(mem_tracker()));
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    for (size_t i = 0; i < _child_expr_lists.size(); ++i) {
+        RETURN_IF_ERROR(Expr::prepare(_child_expr_lists[i], state, child(i)->row_desc(),
+                                      expr_mem_tracker()));
+    }
+    _build_tuple_size = child(0)->row_desc().tuple_descriptors().size();
+    _build_tuple_row_size = _build_tuple_size * sizeof(Tuple*);
+    _build_tuple_idx.reserve(_build_tuple_size);
+
+    for (int i = 0; i < _build_tuple_size; ++i) {
+        TupleDescriptor* build_tuple_desc = child(0)->row_desc().tuple_descriptors()[i];
+        _build_tuple_idx.push_back(_row_descriptor.get_tuple_idx(build_tuple_desc->id()));
+    }
+    _find_nulls = std::vector<bool>(_child_expr_lists.size(), true);
+    return Status::OK();
+}
+Status ExceptNode::close(RuntimeState* state) {
+    if (is_closed()) {
+        return Status::OK();
+    }
+    for (auto& exprs : _child_expr_lists) {
+        Expr::close(exprs, state);
+    }
+
+    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::CLOSE));
+    // Must reset _probe_batch in close() to release resources
+    _probe_batch.reset(NULL);
+
+    if (_memory_used_counter != NULL && _hash_tbl.get() != NULL) {
+        COUNTER_UPDATE(_memory_used_counter, _build_pool->peak_allocated_bytes());
+        COUNTER_UPDATE(_memory_used_counter, _hash_tbl->byte_size());
+    }
+    if (_hash_tbl.get() != NULL) {
+        _hash_tbl->close();
+    }
+    if (_build_pool.get() != NULL) {
+        _build_pool->free_all();
+    }
+    return ExecNode::close(state);
+}
+Status ExceptNode::open(RuntimeState* state) {
+    RETURN_IF_ERROR(ExecNode::open(state));
+    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    RETURN_IF_CANCELLED(state);
+    // open result expr lists.
+    for (const vector<ExprContext*>& exprs : _child_expr_lists) {
+        RETURN_IF_ERROR(Expr::open(exprs, state));
+    }
+    // initial build hash table
+    _hash_tbl.reset(new HashTable(_child_expr_lists[0], _child_expr_lists[1], _build_tuple_size,
+                                  true, _find_nulls, id(), mem_tracker(), 1024));
+    RowBatch build_batch(child(0)->row_desc(), state->batch_size(), mem_tracker());
+    RETURN_IF_ERROR(child(0)->open(state));
+
+    bool eos = false;
+    while (!eos) {
+        RETURN_IF_CANCELLED(state);
+        RETURN_IF_ERROR(child(0)->get_next(state, &build_batch, &eos));
+        // take ownership of tuple data of build_batch
+        _build_pool->acquire_data(build_batch.tuple_data_pool(), false);
+        RETURN_IF_LIMIT_EXCEEDED(state, " Except, while constructing the hash table.");
+        for (int i = 0; i < build_batch.num_rows(); ++i) {
+            _hash_tbl->insert(build_batch.get_row(i));
+        }
+        VLOG_ROW << "hash table content: " << _hash_tbl->debug_string(true, &child(0)->row_desc());
+        build_batch.reset();
+    }
+    // if a table is empty, the result must be empty
+
+    if (_hash_tbl->size() == 0) {
+        _hash_tbl_iterator = _hash_tbl->begin();
+        return Status::OK();
+    }
+
+    for (int i = 1; i < _children.size(); ++i) {
+        // probe
+        _probe_batch.reset(new RowBatch(child(i)->row_desc(), state->batch_size(), mem_tracker()));
+        RETURN_IF_ERROR(child(i)->open(state));
+        eos = false;
+        while (!eos) {
+            RETURN_IF_CANCELLED(state);
+            RETURN_IF_ERROR(child(i)->get_next(state, _probe_batch.get(), &eos));
+            RETURN_IF_LIMIT_EXCEEDED(state, " Except , while probing the hash table.");
+            for (int j = 0; j < _probe_batch->num_rows(); ++j) {
+                _hash_tbl_iterator = _hash_tbl->find(_probe_batch->get_row(j));
+                if (_hash_tbl_iterator != _hash_tbl->end()) {
+                    _hash_tbl_iterator.set_matched();
+                }
+            }
+            _probe_batch->reset();
+        }
+        if (i != _children.size() - 1) {
+            // if this hash table is probed rebuild it
+            std::unique_ptr<HashTable> temp_tbl(
+                    new HashTable(_child_expr_lists[0], _child_expr_lists[i], _build_tuple_size,
+                                  true, _find_nulls, id(), mem_tracker(), 1024));
+            _hash_tbl_iterator = _hash_tbl->begin();
+            uint32_t previous_hash = -1;
+            while (_hash_tbl_iterator.has_next()) {
+                if (previous_hash != _hash_tbl_iterator.get_hash()) {
+                    previous_hash = _hash_tbl_iterator.get_hash();
+                    if (!_hash_tbl_iterator.matched()) {
 
 Review comment:
    For `a EXCEPT b`, we use a to build the hash table and use b to probe it;
    the result is the set of elements in a that were never matched.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] yangzhg commented on a change in pull request #3056: implement except node

Posted by GitBox <gi...@apache.org>.
yangzhg commented on a change in pull request #3056: implement except node
URL: https://github.com/apache/incubator-doris/pull/3056#discussion_r392203989
 
 

 ##########
 File path: be/src/exec/exec_node.cpp
 ##########
 @@ -446,9 +446,9 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN
         *node = pool->add(new IntersectNode(pool, tnode, descs));
         return Status::OK();
 
-    // case TPlanNodeType::EXCEPT_NODE:
-    //     *node = pool->add(new ExceptNode(pool, tnode, descs));
-    //     return Status::OK();
 
 Review comment:
   Is this comment in the wrong place?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] kangkaisen commented on a change in pull request #3056: implement except node

Posted by GitBox <gi...@apache.org>.
kangkaisen commented on a change in pull request #3056: implement except node
URL: https://github.com/apache/incubator-doris/pull/3056#discussion_r392184633
 
 

 ##########
 File path: be/src/exec/except_node.cpp
 ##########
 @@ -56,4 +41,190 @@ Status ExceptNode::init(const TPlanNode& tnode, RuntimeState* state) {
     }
     return Status::OK();
 }
-}
\ No newline at end of file
+
+Status ExceptNode::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(ExecNode::prepare(state));
+    _build_timer = ADD_TIMER(runtime_profile(), "BuildTime");
+    _probe_timer = ADD_TIMER(runtime_profile(), "ProbeTime");
+    _build_pool.reset(new MemPool(mem_tracker()));
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    for (size_t i = 0; i < _child_expr_lists.size(); ++i) {
+        RETURN_IF_ERROR(Expr::prepare(_child_expr_lists[i], state, child(i)->row_desc(),
+                                      expr_mem_tracker()));
+    }
+    _build_tuple_size = child(0)->row_desc().tuple_descriptors().size();
+    _build_tuple_row_size = _build_tuple_size * sizeof(Tuple*);
+    _build_tuple_idx.reserve(_build_tuple_size);
+
+    for (int i = 0; i < _build_tuple_size; ++i) {
+        TupleDescriptor* build_tuple_desc = child(0)->row_desc().tuple_descriptors()[i];
+        _build_tuple_idx.push_back(_row_descriptor.get_tuple_idx(build_tuple_desc->id()));
+    }
+    _find_nulls = std::vector<bool>(_build_tuple_size, true);
+    return Status::OK();
+}
+Status ExceptNode::close(RuntimeState* state) {
+    if (is_closed()) {
+        return Status::OK();
+    }
+    for (auto& exprs : _child_expr_lists) {
+        Expr::close(exprs, state);
+    }
+
+    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::CLOSE));
+    // Must reset _probe_batch in close() to release resources
+    _probe_batch.reset(NULL);
+
+    if (_memory_used_counter != NULL && _hash_tbl.get() != NULL) {
+        COUNTER_UPDATE(_memory_used_counter, _build_pool->peak_allocated_bytes());
+        COUNTER_UPDATE(_memory_used_counter, _hash_tbl->byte_size());
+    }
+    if (_hash_tbl.get() != NULL) {
+        _hash_tbl->close();
+    }
+    if (_build_pool.get() != NULL) {
+        _build_pool->free_all();
+    }
+    return ExecNode::close(state);
+}
+Status ExceptNode::open(RuntimeState* state) {
+    RETURN_IF_ERROR(ExecNode::open(state));
+    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    RETURN_IF_CANCELLED(state);
+    // open result expr lists.
+    for (const vector<ExprContext*>& exprs : _child_expr_lists) {
+        RETURN_IF_ERROR(Expr::open(exprs, state));
+    }
+    // initial build hash table, use _child_expr_lists[0] as probe is used for remove duplicted
+    _hash_tbl.reset(new HashTable(_child_expr_lists[0], _child_expr_lists[0], _build_tuple_size,
+                                  true, _find_nulls, id(), mem_tracker(), 1024));
+    RowBatch build_batch(child(0)->row_desc(), state->batch_size(), mem_tracker());
+    RETURN_IF_ERROR(child(0)->open(state));
+
+    bool eos = false;
+    while (!eos) {
+        SCOPED_TIMER(_build_timer);
+        RETURN_IF_CANCELLED(state);
+        RETURN_IF_ERROR(child(0)->get_next(state, &build_batch, &eos));
+        // take ownership of tuple data of build_batch
+        _build_pool->acquire_data(build_batch.tuple_data_pool(), false);
+        RETURN_IF_LIMIT_EXCEEDED(state, " Except, while constructing the hash table.");
+        // build hash table and remvoe duplicate items
+        for (int i = 0; i < build_batch.num_rows(); ++i) {
+            _hash_tbl->insert_unique(build_batch.get_row(i));
+        }
+        VLOG_ROW << "hash table content: " << _hash_tbl->debug_string(true, &child(0)->row_desc());
+        build_batch.reset();
+    }
+    // if a table is empty, the result must be empty
+
+    if (_hash_tbl->size() == 0) {
+        _hash_tbl_iterator = _hash_tbl->begin();
+        return Status::OK();
+    }
+
+    for (int i = 1; i < _children.size(); ++i) {
+        // rebuid hash table, for first time will rebuild with the no duplicated _hash_tbl,
+        if (i > 1) {
+            SCOPED_TIMER(_build_timer);
+            std::unique_ptr<HashTable> temp_tbl(
+                    new HashTable(_child_expr_lists[0], _child_expr_lists[i], _build_tuple_size,
+                                  true, _find_nulls, id(), mem_tracker(), 1024));
+            _hash_tbl_iterator = _hash_tbl->begin();
+            uint32_t previous_hash = -1;
+            while (_hash_tbl_iterator.has_next()) {
+                if (previous_hash != _hash_tbl_iterator.get_hash()) {
+                    previous_hash = _hash_tbl_iterator.get_hash();
+                    if (!_hash_tbl_iterator.matched()) {
+                        temp_tbl->insert(_hash_tbl_iterator.get_row());
+                    }
+                }
+                _hash_tbl_iterator.next<false>();
+            }
+            _hash_tbl.swap(temp_tbl);
+            temp_tbl->close();
+        }
+        // probe
+        _probe_batch.reset(new RowBatch(child(i)->row_desc(), state->batch_size(), mem_tracker()));
+        ScopedTimer<MonotonicStopWatch> probe_timer(_probe_timer);
+        RETURN_IF_ERROR(child(i)->open(state));
+        eos = false;
+        while (!eos) {
+            RETURN_IF_CANCELLED(state);
+            RETURN_IF_ERROR(child(i)->get_next(state, _probe_batch.get(), &eos));
+            RETURN_IF_LIMIT_EXCEEDED(state, " Except , while probing the hash table.");
+            for (int j = 0; j < _probe_batch->num_rows(); ++j) {
+                _hash_tbl_iterator = _hash_tbl->find(_probe_batch->get_row(j));
+                if (_hash_tbl_iterator != _hash_tbl->end()) {
+                    _hash_tbl_iterator.set_matched();
+                }
+            }
+            _probe_batch->reset();
+        }
+        // if a table is empty, the result must be empty
+        if (_hash_tbl->size() == 0) {
+            break;
+        }
+    }
+    _hash_tbl_iterator = _hash_tbl->begin();
+    return Status::OK();
+}
+
+Status ExceptNode::get_next(RuntimeState* state, RowBatch* out_batch, bool* eos) {
+    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
+    RETURN_IF_CANCELLED(state);
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    *eos = true;
+    if (reached_limit()) {
+        return Status::OK();
+    }
+    uint32_t previous_hash = -1;
+    TupleRow* previous_row = nullptr;
+    while (_hash_tbl_iterator.has_next()) {
+        if (!_hash_tbl_iterator.matched()) {
+            if (previous_hash != _hash_tbl_iterator.get_hash() ||
+                !equals(previous_row, _hash_tbl_iterator.get_row())) {
+                int row_idx = out_batch->add_row();
+                TupleRow* out_row = out_batch->get_row(row_idx);
+                uint8_t* out_ptr = reinterpret_cast<uint8_t*>(out_row);
+                memcpy(out_ptr, _hash_tbl_iterator.get_row(), _build_tuple_row_size);
+                out_batch->commit_last_row();
+                ++_num_rows_returned;
+            }
+        }
+        previous_hash = _hash_tbl_iterator.get_hash();
+        previous_row = _hash_tbl_iterator.get_row();
+        _hash_tbl_iterator.next<false>();
+
+        *eos = !_hash_tbl_iterator.has_next() || reached_limit();
+        if (out_batch->is_full() || out_batch->at_resource_limit() || *eos) {
+            return Status::OK();
+        }
+    }
+    return Status::OK();
+}
+
+bool ExceptNode::equals(TupleRow* row, TupleRow* other) {
+    if (row == nullptr && other == nullptr) {
 
 Review comment:
   Could this if statement be removed?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] yangzhg commented on a change in pull request #3056: implement except node

Posted by GitBox <gi...@apache.org>.
yangzhg commented on a change in pull request #3056: implement except node
URL: https://github.com/apache/incubator-doris/pull/3056#discussion_r390734954
 
 

 ##########
 File path: be/src/exec/except_node.cpp
 ##########
 @@ -56,4 +40,157 @@ Status ExceptNode::init(const TPlanNode& tnode, RuntimeState* state) {
     }
     return Status::OK();
 }
-}
\ No newline at end of file
+
+Status ExceptNode::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(ExecNode::prepare(state));
+    _build_pool.reset(new MemPool(mem_tracker()));
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    for (size_t i = 0; i < _child_expr_lists.size(); ++i) {
+        RETURN_IF_ERROR(Expr::prepare(_child_expr_lists[i], state, child(i)->row_desc(),
+                                      expr_mem_tracker()));
+    }
+    _build_tuple_size = child(0)->row_desc().tuple_descriptors().size();
+    _build_tuple_row_size = _build_tuple_size * sizeof(Tuple*);
+    _build_tuple_idx.reserve(_build_tuple_size);
+
+    for (int i = 0; i < _build_tuple_size; ++i) {
+        TupleDescriptor* build_tuple_desc = child(0)->row_desc().tuple_descriptors()[i];
+        _build_tuple_idx.push_back(_row_descriptor.get_tuple_idx(build_tuple_desc->id()));
+    }
+    _find_nulls = std::vector<bool>(_child_expr_lists.size(), true);
+    return Status::OK();
+}
+Status ExceptNode::close(RuntimeState* state) {
+    if (is_closed()) {
+        return Status::OK();
+    }
+    for (auto& exprs : _child_expr_lists) {
+        Expr::close(exprs, state);
+    }
+
+    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::CLOSE));
+    // Must reset _probe_batch in close() to release resources
+    _probe_batch.reset(NULL);
+
+    if (_memory_used_counter != NULL && _hash_tbl.get() != NULL) {
+        COUNTER_UPDATE(_memory_used_counter, _build_pool->peak_allocated_bytes());
+        COUNTER_UPDATE(_memory_used_counter, _hash_tbl->byte_size());
+    }
+    if (_hash_tbl.get() != NULL) {
+        _hash_tbl->close();
+    }
+    if (_build_pool.get() != NULL) {
+        _build_pool->free_all();
+    }
+    return ExecNode::close(state);
+}
+Status ExceptNode::open(RuntimeState* state) {
+    RETURN_IF_ERROR(ExecNode::open(state));
+    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    RETURN_IF_CANCELLED(state);
+    // open result expr lists.
+    for (const vector<ExprContext*>& exprs : _child_expr_lists) {
+        RETURN_IF_ERROR(Expr::open(exprs, state));
+    }
+    // initial build hash table
+    _hash_tbl.reset(new HashTable(_child_expr_lists[0], _child_expr_lists[1], _build_tuple_size,
+                                  true, _find_nulls, id(), mem_tracker(), 1024));
+    RowBatch build_batch(child(0)->row_desc(), state->batch_size(), mem_tracker());
+    RETURN_IF_ERROR(child(0)->open(state));
+
+    bool eos = false;
+    while (!eos) {
+        RETURN_IF_CANCELLED(state);
+        RETURN_IF_ERROR(child(0)->get_next(state, &build_batch, &eos));
+        // take ownership of tuple data of build_batch
+        _build_pool->acquire_data(build_batch.tuple_data_pool(), false);
+        RETURN_IF_LIMIT_EXCEEDED(state, " Except, while constructing the hash table.");
+        for (int i = 0; i < build_batch.num_rows(); ++i) {
+            _hash_tbl->insert(build_batch.get_row(i));
+        }
+        VLOG_ROW << "hash table content: " << _hash_tbl->debug_string(true, &child(0)->row_desc());
+        build_batch.reset();
+    }
+    // if a table is empty, the result must be empty
+
+    if (_hash_tbl->size() == 0) {
+        _hash_tbl_iterator = _hash_tbl->begin();
+        return Status::OK();
+    }
+
+    for (int i = 1; i < _children.size(); ++i) {
+        // probe
+        _probe_batch.reset(new RowBatch(child(i)->row_desc(), state->batch_size(), mem_tracker()));
+        RETURN_IF_ERROR(child(i)->open(state));
+        eos = false;
+        while (!eos) {
+            RETURN_IF_CANCELLED(state);
+            RETURN_IF_ERROR(child(i)->get_next(state, _probe_batch.get(), &eos));
+            RETURN_IF_LIMIT_EXCEEDED(state, " Except , while probing the hash table.");
+            for (int j = 0; j < _probe_batch->num_rows(); ++j) {
+                _hash_tbl_iterator = _hash_tbl->find(_probe_batch->get_row(j));
+                if (_hash_tbl_iterator != _hash_tbl->end()) {
+                    _hash_tbl_iterator.set_matched();
+                }
+            }
+            _probe_batch->reset();
+        }
+        if (i != _children.size() - 1) {
+            // if this hash table is probed rebuild it
+            std::unique_ptr<HashTable> temp_tbl(
+                    new HashTable(_child_expr_lists[0], _child_expr_lists[i], _build_tuple_size,
+                                  true, _find_nulls, id(), mem_tracker(), 1024));
+            _hash_tbl_iterator = _hash_tbl->begin();
+            uint32_t previous_hash = -1;
+            while (_hash_tbl_iterator.has_next()) {
+                if (previous_hash != _hash_tbl_iterator.get_hash()) {
+                    previous_hash = _hash_tbl_iterator.get_hash();
+                    if (!_hash_tbl_iterator.matched()) {
+                        temp_tbl->insert(_hash_tbl_iterator.get_row());
+                    }
+                }
+                _hash_tbl_iterator.next<false>();
+            }
+            _hash_tbl.swap(temp_tbl);
+            temp_tbl->close();
+        }
+        // if a table is empty, the result must be empty
+        if (_hash_tbl->size() == 0) {
+            break;
+        }
+    }
+    _hash_tbl_iterator = _hash_tbl->begin();
+    return Status::OK();
+}
+
+Status ExceptNode::get_next(RuntimeState* state, RowBatch* out_batch, bool* eos) {
+    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
+    RETURN_IF_CANCELLED(state);
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    *eos = true;
+    if (reached_limit()) {
+        return Status::OK();
+    }
+    uint32_t previous_hash = -1;
+    while (_hash_tbl_iterator.has_next()) {
 
 Review comment:
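   Because the hash value is an unsigned int (`uint32_t`): assigning -1 converts it to the maximum value (0xFFFFFFFF), which is used as the initial "no previous hash" sentinel.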

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] wutiangan commented on a change in pull request #3056: implement except node

Posted by GitBox <gi...@apache.org>.
wutiangan commented on a change in pull request #3056: implement except node
URL: https://github.com/apache/incubator-doris/pull/3056#discussion_r390732016
 
 

 ##########
 File path: be/src/exec/except_node.cpp
 ##########
 @@ -56,4 +40,157 @@ Status ExceptNode::init(const TPlanNode& tnode, RuntimeState* state) {
     }
     return Status::OK();
 }
-}
\ No newline at end of file
+
+Status ExceptNode::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(ExecNode::prepare(state));
+    _build_pool.reset(new MemPool(mem_tracker()));
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    for (size_t i = 0; i < _child_expr_lists.size(); ++i) {
+        RETURN_IF_ERROR(Expr::prepare(_child_expr_lists[i], state, child(i)->row_desc(),
+                                      expr_mem_tracker()));
+    }
+    _build_tuple_size = child(0)->row_desc().tuple_descriptors().size();
+    _build_tuple_row_size = _build_tuple_size * sizeof(Tuple*);
+    _build_tuple_idx.reserve(_build_tuple_size);
+
+    for (int i = 0; i < _build_tuple_size; ++i) {
+        TupleDescriptor* build_tuple_desc = child(0)->row_desc().tuple_descriptors()[i];
+        _build_tuple_idx.push_back(_row_descriptor.get_tuple_idx(build_tuple_desc->id()));
+    }
+    _find_nulls = std::vector<bool>(_child_expr_lists.size(), true);
+    return Status::OK();
+}
+Status ExceptNode::close(RuntimeState* state) {
+    if (is_closed()) {
+        return Status::OK();
+    }
+    for (auto& exprs : _child_expr_lists) {
+        Expr::close(exprs, state);
+    }
+
+    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::CLOSE));
+    // Must reset _probe_batch in close() to release resources
+    _probe_batch.reset(NULL);
+
+    if (_memory_used_counter != NULL && _hash_tbl.get() != NULL) {
+        COUNTER_UPDATE(_memory_used_counter, _build_pool->peak_allocated_bytes());
+        COUNTER_UPDATE(_memory_used_counter, _hash_tbl->byte_size());
+    }
+    if (_hash_tbl.get() != NULL) {
+        _hash_tbl->close();
+    }
+    if (_build_pool.get() != NULL) {
+        _build_pool->free_all();
+    }
+    return ExecNode::close(state);
+}
+Status ExceptNode::open(RuntimeState* state) {
+    RETURN_IF_ERROR(ExecNode::open(state));
+    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    RETURN_IF_CANCELLED(state);
+    // open result expr lists.
+    for (const vector<ExprContext*>& exprs : _child_expr_lists) {
+        RETURN_IF_ERROR(Expr::open(exprs, state));
+    }
+    // initial build hash table
+    _hash_tbl.reset(new HashTable(_child_expr_lists[0], _child_expr_lists[1], _build_tuple_size,
+                                  true, _find_nulls, id(), mem_tracker(), 1024));
+    RowBatch build_batch(child(0)->row_desc(), state->batch_size(), mem_tracker());
+    RETURN_IF_ERROR(child(0)->open(state));
+
+    bool eos = false;
+    while (!eos) {
+        RETURN_IF_CANCELLED(state);
+        RETURN_IF_ERROR(child(0)->get_next(state, &build_batch, &eos));
+        // take ownership of tuple data of build_batch
+        _build_pool->acquire_data(build_batch.tuple_data_pool(), false);
+        RETURN_IF_LIMIT_EXCEEDED(state, " Except, while constructing the hash table.");
+        for (int i = 0; i < build_batch.num_rows(); ++i) {
+            _hash_tbl->insert(build_batch.get_row(i));
+        }
+        VLOG_ROW << "hash table content: " << _hash_tbl->debug_string(true, &child(0)->row_desc());
+        build_batch.reset();
+    }
+    // if a table is empty, the result must be empty
+
+    if (_hash_tbl->size() == 0) {
+        _hash_tbl_iterator = _hash_tbl->begin();
+        return Status::OK();
+    }
+
+    for (int i = 1; i < _children.size(); ++i) {
+        // probe
+        _probe_batch.reset(new RowBatch(child(i)->row_desc(), state->batch_size(), mem_tracker()));
+        RETURN_IF_ERROR(child(i)->open(state));
+        eos = false;
+        while (!eos) {
+            RETURN_IF_CANCELLED(state);
+            RETURN_IF_ERROR(child(i)->get_next(state, _probe_batch.get(), &eos));
+            RETURN_IF_LIMIT_EXCEEDED(state, " Except , while probing the hash table.");
+            for (int j = 0; j < _probe_batch->num_rows(); ++j) {
+                _hash_tbl_iterator = _hash_tbl->find(_probe_batch->get_row(j));
+                if (_hash_tbl_iterator != _hash_tbl->end()) {
+                    _hash_tbl_iterator.set_matched();
+                }
+            }
+            _probe_batch->reset();
+        }
+        if (i != _children.size() - 1) {
+            // if this hash table is probed rebuild it
+            std::unique_ptr<HashTable> temp_tbl(
+                    new HashTable(_child_expr_lists[0], _child_expr_lists[i], _build_tuple_size,
+                                  true, _find_nulls, id(), mem_tracker(), 1024));
+            _hash_tbl_iterator = _hash_tbl->begin();
+            uint32_t previous_hash = -1;
+            while (_hash_tbl_iterator.has_next()) {
+                if (previous_hash != _hash_tbl_iterator.get_hash()) {
+                    previous_hash = _hash_tbl_iterator.get_hash();
+                    if (!_hash_tbl_iterator.matched()) {
+                        temp_tbl->insert(_hash_tbl_iterator.get_row());
+                    }
+                }
+                _hash_tbl_iterator.next<false>();
+            }
+            _hash_tbl.swap(temp_tbl);
+            temp_tbl->close();
+        }
+        // if a table is empty, the result must be empty
+        if (_hash_tbl->size() == 0) {
+            break;
+        }
+    }
+    _hash_tbl_iterator = _hash_tbl->begin();
+    return Status::OK();
+}
+
+Status ExceptNode::get_next(RuntimeState* state, RowBatch* out_batch, bool* eos) {
+    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
+    RETURN_IF_CANCELLED(state);
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    *eos = true;
+    if (reached_limit()) {
+        return Status::OK();
+    }
+    uint32_t previous_hash = -1;
+    while (_hash_tbl_iterator.has_next()) {
+        if (previous_hash != _hash_tbl_iterator.get_hash()) {
+            previous_hash = _hash_tbl_iterator.get_hash();
+            if (!_hash_tbl_iterator.matched()) {
+                int row_idx = out_batch->add_row();
+                TupleRow* out_row = out_batch->get_row(row_idx);
+                uint8_t* out_ptr = reinterpret_cast<uint8_t*>(out_row);
+                memcpy(out_ptr, _hash_tbl_iterator.get_row(), _build_tuple_row_size);
+                out_batch->commit_last_row();
+            }
+        }
 
 Review comment:
   Equal hash values do not imply equal rows, because different rows can collide on the same hash. You need to compare the original values as well, not just the hashes.
   There may also be a problem with the logic of the `else` branch.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org