You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2016/04/19 18:44:19 UTC
[06/24] incubator-quickstep git commit: Adds backend support for hash
semi/anti joins. (#164)
Adds backend support for hash semi/anti joins. (#164)
* Added implementations for HashSemiJoin and HashAntiJoin operators.
* Added component in ExecutionGenerator to convert semi/anti join nodes into relational operators.
* Add 'this' pointer in anonymous function for gcc build
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/a39ad965
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/a39ad965
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/a39ad965
Branch: refs/heads/master
Commit: a39ad9654f10fbe67a90cb7b65dc30dc797f55b3
Parents: 914f2d8
Author: Jianqiao Zhu <ji...@cs.wisc.edu>
Authored: Thu Apr 14 16:28:24 2016 -0500
Committer: Jignesh Patel <pa...@users.noreply.github.com>
Committed: Thu Apr 14 16:28:24 2016 -0500
----------------------------------------------------------------------
query_optimizer/ExecutionGenerator.cpp | 42 +-
.../tests/execution_generator/Select.test | 43 +++
relational_operators/CMakeLists.txt | 2 +
relational_operators/HashJoinOperator.cpp | 360 +++++++++++++++--
relational_operators/HashJoinOperator.hpp | 350 ++++++++++++++---
relational_operators/WorkOrder.proto | 58 ++-
relational_operators/WorkOrderFactory.cpp | 211 ++++++++--
storage/HashTable.hpp | 387 +++++++++++++++++++
storage/LinearOpenAddressingHashTable.hpp | 72 ++++
storage/SeparateChainingHashTable.hpp | 54 +++
.../SimpleScalarSeparateChainingHashTable.hpp | 37 ++
storage/StorageBlock.hpp | 20 +
12 files changed, 1506 insertions(+), 130 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a39ad965/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index cf90be7..aa6b0dc 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -23,6 +23,7 @@
#include <cstddef>
#include <memory>
#include <string>
+#include <type_traits>
#include <unordered_map>
#include <utility>
#include <vector>
@@ -570,16 +571,19 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
key_types.push_back(&left_attribute_type);
}
- // Choose the smaller table as the inner build table,
- // and the other one as the outer probe table.
std::size_t probe_cardinality = cost_model_->estimateCardinality(probe_physical);
std::size_t build_cardinality = cost_model_->estimateCardinality(build_physical);
- if (probe_cardinality < build_cardinality) {
- // Switch the probe and build physical nodes.
- std::swap(probe_physical, build_physical);
- std::swap(probe_cardinality, build_cardinality);
- std::swap(probe_attribute_ids, build_attribute_ids);
- std::swap(any_probe_attributes_nullable, any_build_attributes_nullable);
+ // For inner join, we may swap the probe table and the build table.
+ if (physical_plan->join_type() == P::HashJoin::JoinType::kInnerJoin) {
+ // Choose the smaller table as the inner build table,
+ // and the other one as the outer probe table.
+ if (probe_cardinality < build_cardinality) {
+ // Switch the probe and build physical nodes.
+ std::swap(probe_physical, build_physical);
+ std::swap(probe_cardinality, build_cardinality);
+ std::swap(probe_attribute_ids, build_attribute_ids);
+ std::swap(any_probe_attributes_nullable, any_build_attributes_nullable);
+ }
}
// Convert the residual predicate proto.
@@ -647,6 +651,25 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
&output_relation,
insert_destination_proto);
+ // Get JoinType
+ HashJoinOperator::JoinType join_type;
+ switch (physical_plan->join_type()) {
+ case P::HashJoin::JoinType::kInnerJoin:
+ join_type = HashJoinOperator::JoinType::kInnerJoin;
+ break;
+ case P::HashJoin::JoinType::kLeftSemiJoin:
+ join_type = HashJoinOperator::JoinType::kLeftSemiJoin;
+ break;
+ case P::HashJoin::JoinType::kLeftAntiJoin:
+ join_type = HashJoinOperator::JoinType::kLeftAntiJoin;
+ break;
+ default:
+ LOG(FATAL) << "Invalid physical::HashJoin::JoinType: "
+ << static_cast<typename std::underlying_type<P::HashJoin::JoinType>::type>(
+ physical_plan->join_type());
+ }
+
+ // Create hash join operator
const QueryPlan::DAGNodeIndex join_operator_index =
execution_plan_->addRelationalOperator(
new HashJoinOperator(
@@ -659,7 +682,8 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
insert_destination_index,
join_hash_table_index,
residual_predicate_index,
- project_expressions_group_index));
+ project_expressions_group_index,
+ join_type));
insert_destination_proto->set_relational_op_index(join_operator_index);
const QueryPlan::DAGNodeIndex destroy_operator_index =
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a39ad965/query_optimizer/tests/execution_generator/Select.test
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/execution_generator/Select.test b/query_optimizer/tests/execution_generator/Select.test
index e1614cb..9bfa27c 100644
--- a/query_optimizer/tests/execution_generator/Select.test
+++ b/query_optimizer/tests/execution_generator/Select.test
@@ -721,6 +721,49 @@ WHERE CASE WHEN i < j THEN i
+-----------+-----------+
==
+SELECT i AS odd
+FROM generate_series(0, 10, 1) AS gs1(i)
+WHERE
+ NOT EXISTS (
+ SELECT *
+ FROM generate_series(0, 10, 2) AS gs2(even)
+ WHERE i = even
+ );
+--
++-----------+
+|odd |
++-----------+
+| 1|
+| 3|
+| 5|
+| 7|
+| 9|
++-----------+
+==
+
+SELECT i
+FROM generate_series(0, 100, 3) AS gs1(i)
+WHERE
+ EXISTS (
+ SELECT *
+ FROM generate_series(0, 100, 5) AS gs2(i)
+ WHERE gs1.i = gs2.i
+ )
+ AND NOT EXISTS (
+ SELECT *
+ FROM generate_series(0, 100, 10) AS gs3(i)
+ WHERE gs1.i = gs3.i
+ )
+ AND (i < 40 OR i > 60);
+--
++-----------+
+|i |
++-----------+
+| 15|
+| 75|
++-----------+
+==
+
# TODO(team): Support uncorrelated queries.
# SELECT COUNT(*)
# FROM test
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a39ad965/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index 17a9a6f..b02bc6b 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -176,11 +176,13 @@ target_link_libraries(quickstep_relationaloperators_HashJoinOperator
quickstep_storage_StorageBlock
quickstep_storage_StorageBlockInfo
quickstep_storage_StorageManager
+ quickstep_storage_SubBlocksReference
quickstep_storage_TupleReference
quickstep_storage_TupleStorageSubBlock
quickstep_storage_ValueAccessor
quickstep_types_containers_ColumnVectorsValueAccessor
quickstep_utility_Macros
+ quickstep_utility_PtrList
tmb)
target_link_libraries(quickstep_relationaloperators_InsertOperator
glog
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a39ad965/relational_operators/HashJoinOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.cpp b/relational_operators/HashJoinOperator.cpp
index f7bbf38..e0076e3 100644
--- a/relational_operators/HashJoinOperator.cpp
+++ b/relational_operators/HashJoinOperator.cpp
@@ -35,6 +35,7 @@
#include "storage/StorageBlock.hpp"
#include "storage/StorageBlockInfo.hpp"
#include "storage/StorageManager.hpp"
+#include "storage/SubBlocksReference.hpp"
#include "storage/TupleReference.hpp"
#include "storage/TupleStorageSubBlock.hpp"
#include "storage/ValueAccessor.hpp"
@@ -85,7 +86,7 @@ class MapBasedJoinedTupleCollector {
// Consolidation is a no-op for this version, but we provide this trivial
// call so that MapBasedJoinedTupleCollector and
// VectorBasedJoinedTupleCollector have the same interface and can both be
- // used in the templated HashJoinWorkOrder::executeWithCollectorType() method.
+ // used in the templated HashInnerJoinWorkOrder::executeWithCollectorType() method.
inline void consolidate() const {
}
@@ -183,6 +184,25 @@ class VectorBasedJoinedTupleCollector {
consolidated_joined_tuples_;
};
+class SemiAntiJoinTupleCollector {  // Probe-block tuple filter shared by semi- and anti-joins.
+ public:
+ explicit SemiAntiJoinTupleCollector(const TupleStorageSubBlock &tuple_store) {
+ filter_.reset(tuple_store.getExistenceMap());  // Takes ownership; initial filter = existence map.
+ }
+
+ template <typename ValueAccessorT>
+ inline void operator()(const ValueAccessorT &accessor) {  // Drops accessor's current tuple.
+ filter_->set(accessor.getCurrentPosition(), false);
+ }
+
+ const TupleIdSequence* filter() const {  // Non-owning; valid while this collector lives.
+ return filter_.get();
+ }
+
+ private:
+ std::unique_ptr<TupleIdSequence> filter_;
+};
+
} // namespace
bool HashJoinOperator::getAllWorkOrders(
@@ -191,31 +211,53 @@ bool HashJoinOperator::getAllWorkOrders(
StorageManager *storage_manager,
const tmb::client_id scheduler_client_id,
tmb::MessageBus *bus) {
+ switch (join_type_) {
+ case JoinType::kInnerJoin:
+ return getAllNonOuterJoinWorkOrders<HashInnerJoinWorkOrder>(
+ container, query_context, storage_manager);
+ case JoinType::kLeftSemiJoin:
+ return getAllNonOuterJoinWorkOrders<HashSemiJoinWorkOrder>(
+ container, query_context, storage_manager);
+ case JoinType::kLeftAntiJoin:
+ return getAllNonOuterJoinWorkOrders<HashAntiJoinWorkOrder>(
+ container, query_context, storage_manager);
+ default:
+ LOG(FATAL) << "Unknown join type in HashJoinOperator::getAllWorkOrders()";
+ }
+}
+
+template <class JoinWorkOrderClass>
+bool HashJoinOperator::getAllNonOuterJoinWorkOrders(
+ WorkOrdersContainer *container,
+ QueryContext *query_context,
+ StorageManager *storage_manager) {
// We wait until the building of global hash table is complete.
if (blocking_dependencies_met_) {
DCHECK(query_context != nullptr);
- const Predicate *residual_predicate = query_context->getPredicate(residual_predicate_index_);
+ const Predicate *residual_predicate =
+ query_context->getPredicate(residual_predicate_index_);
const vector<unique_ptr<const Scalar>> &selection =
query_context->getScalarGroup(selection_index_);
InsertDestination *output_destination =
query_context->getInsertDestination(output_destination_index_);
- JoinHashTable *hash_table = query_context->getJoinHashTable(hash_table_index_);
+ const JoinHashTable &hash_table =
+ *(query_context->getJoinHashTable(hash_table_index_));
if (probe_relation_is_stored_) {
if (!started_) {
for (const block_id probe_block_id : probe_relation_block_ids_) {
container->addNormalWorkOrder(
- new HashJoinWorkOrder(build_relation_,
- probe_relation_,
- join_key_attributes_,
- any_join_key_attributes_nullable_,
- probe_block_id,
- residual_predicate,
- selection,
- output_destination,
- hash_table,
- storage_manager),
+ new JoinWorkOrderClass(build_relation_,
+ probe_relation_,
+ join_key_attributes_,
+ any_join_key_attributes_nullable_,
+ probe_block_id,
+ residual_predicate,
+ selection,
+ hash_table,
+ output_destination,
+ storage_manager),
op_index_);
}
started_ = true;
@@ -224,27 +266,26 @@ bool HashJoinOperator::getAllWorkOrders(
} else {
while (num_workorders_generated_ < probe_relation_block_ids_.size()) {
container->addNormalWorkOrder(
- new HashJoinWorkOrder(
- build_relation_,
- probe_relation_,
- join_key_attributes_,
- any_join_key_attributes_nullable_,
- probe_relation_block_ids_[num_workorders_generated_],
- residual_predicate,
- selection,
- output_destination,
- hash_table,
- storage_manager),
+ new JoinWorkOrderClass(build_relation_,
+ probe_relation_,
+ join_key_attributes_,
+ any_join_key_attributes_nullable_,
+ probe_relation_block_ids_[num_workorders_generated_],
+ residual_predicate,
+ selection,
+ hash_table,
+ output_destination,
+ storage_manager),
op_index_);
++num_workorders_generated_;
} // end while
return done_feeding_input_relation_;
- } // end else (input_relation_is_stored is false)
- } // end if (blocking_dependencies_met)
+ } // end else (probe_relation_is_stored_)
+ } // end if (blocking_dependencies_met_)
return false;
}
-void HashJoinWorkOrder::execute() {
+void HashInnerJoinWorkOrder::execute() {
if (FLAGS_vector_based_joined_tuple_collector) {
executeWithCollectorType<VectorBasedJoinedTupleCollector>();
} else {
@@ -253,7 +294,7 @@ void HashJoinWorkOrder::execute() {
}
template <typename CollectorT>
-void HashJoinWorkOrder::executeWithCollectorType() {
+void HashInnerJoinWorkOrder::executeWithCollectorType() {
BlockReference probe_block(
storage_manager_->getBlock(block_id_, probe_relation_));
const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock();
@@ -261,13 +302,13 @@ void HashJoinWorkOrder::executeWithCollectorType() {
std::unique_ptr<ValueAccessor> probe_accessor(probe_store.createValueAccessor());
CollectorT collector;
if (join_key_attributes_.size() == 1) {
- hash_table_->getAllFromValueAccessor(
+ hash_table_.getAllFromValueAccessor(
probe_accessor.get(),
join_key_attributes_.front(),
any_join_key_attributes_nullable_,
&collector);
} else {
- hash_table_->getAllFromValueAccessorCompositeKey(
+ hash_table_.getAllFromValueAccessorCompositeKey(
probe_accessor.get(),
join_key_attributes_,
any_join_key_attributes_nullable_,
@@ -348,4 +389,263 @@ void HashJoinWorkOrder::executeWithCollectorType() {
}
}
+void HashSemiJoinWorkOrder::execute() {  // Dispatch on presence of a residual predicate.
+ if (residual_predicate_ == nullptr) {  // Key existence alone decides the result.
+ executeWithoutResidualPredicate();
+ } else {  // Each key-matching pair must also pass the residual predicate.
+ executeWithResidualPredicate();
+ }
+}
+
+void HashSemiJoinWorkOrder::executeWithResidualPredicate() {
+ const relation_id build_relation_id = build_relation_.getID();
+ const relation_id probe_relation_id = probe_relation_.getID();
+
+ BlockReference probe_block = storage_manager_->getBlock(block_id_,
+ probe_relation_);
+ const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock();
+
+ std::unique_ptr<ValueAccessor> probe_accessor(probe_store.createValueAccessor());
+
+ // TODO(harshad) - Make this function work with both types of collectors.
+
+ // We collect all (build, probe) tuple-ID pairs that match on the join key, as
+ // the residual predicate must be applied to each pair after collecting them.
+ MapBasedJoinedTupleCollector collector;
+ if (join_key_attributes_.size() == 1) {
+ hash_table_.getAllFromValueAccessor(
+ probe_accessor.get(),
+ join_key_attributes_.front(),
+ any_join_key_attributes_nullable_,
+ &collector);
+ } else {
+ hash_table_.getAllFromValueAccessorCompositeKey(
+ probe_accessor.get(),
+ join_key_attributes_,
+ any_join_key_attributes_nullable_,
+ &collector);
+ }
+
+ // Output filter over probe tuple IDs: all false now; set once a match passes the predicate.
+ TupleIdSequence filter(probe_store.getMaxTupleID() + 1);
+ filter.setRange(0, filter.length(), false);
+ for (const std::pair<const block_id,
+ std::vector<std::pair<tuple_id, tuple_id>>>
+ &build_block_entry : *collector.getJoinedTuples()) {
+ // First element of the pair build_block_entry is the build block ID
+ // 2nd element of the pair is a vector of pairs, in each of which -
+ // 1st element is a matching tuple ID from the inner (build) relation.
+ // 2nd element is a matching tuple ID from the outer (probe) relation.
+
+ // Get the build block holding the build-side tuples of these matched pairs.
+ BlockReference build_block =
+ storage_manager_->getBlock(build_block_entry.first, build_relation_);
+ const TupleStorageSubBlock &build_store =
+ build_block->getTupleStorageSubBlock();
+ std::unique_ptr<ValueAccessor> build_accessor(
+ build_store.createValueAccessor());
+ for (const std::pair<tuple_id, tuple_id> &hash_match
+ : build_block_entry.second) {
+ // For each pair, 1st element is a tuple ID from the build relation in the
+ // given build block, 2nd element is a tuple ID from the probe relation.
+ if (filter.get(hash_match.second)) {
+ // This probe tuple already has a match that passed the residual predicate,
+ // and a semi-join emits each probe tuple at most once, so skip it.
+ continue;
+ }
+ if (residual_predicate_->matchesForJoinedTuples(*build_accessor,
+ build_relation_id,
+ hash_match.first,
+ *probe_accessor,
+ probe_relation_id,
+ hash_match.second)) {
+ filter.set(hash_match.second, true);
+ }
+ }
+ }
+
+ SubBlocksReference sub_blocks_ref(probe_store,
+ probe_block->getIndices(),
+ probe_block->getIndicesConsistent());
+
+ std::unique_ptr<ValueAccessor> probe_accessor_with_filter(
+ probe_store.createValueAccessor(&filter));  // Only probe tuples with a qualifying match.
+ ColumnVectorsValueAccessor temp_result;
+ for (vector<unique_ptr<const Scalar>>::const_iterator selection_it = selection_.begin();
+ selection_it != selection_.end();
+ ++selection_it) {
+ temp_result.addColumn((*selection_it)->getAllValues(
+ probe_accessor_with_filter.get(), &sub_blocks_ref));
+ }
+
+ output_destination_->bulkInsertTuples(&temp_result);
+}
+
+void HashSemiJoinWorkOrder::executeWithoutResidualPredicate() {
+ DCHECK(residual_predicate_ == nullptr);
+
+ BlockReference probe_block = storage_manager_->getBlock(block_id_,
+ probe_relation_);
+ const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock();
+
+ std::unique_ptr<ValueAccessor> probe_accessor(probe_store.createValueAccessor());
+ SemiAntiJoinTupleCollector collector(probe_store);
+ // We keep only the probe tuples with at least one matching build tuple. As a
+ // performance optimization, the hash table only checks whether each probing
+ // key exists, and the collector clears the filter bit of each probe tuple whose
+ // key has no match (valid only because there is no residual predicate here).
+ // NOTE(review): confirm that probe tuples with NULL join keys are also cleared,
+ // since the filter starts from the existence map and NULL can never match.
+ if (join_key_attributes_.size() == 1u) {
+ // Call the collector to clear the bit of every probe tuple whose key has no match.
+ hash_table_.runOverKeysFromValueAccessorIfMatchNotFound(
+ probe_accessor.get(),
+ join_key_attributes_.front(),
+ any_join_key_attributes_nullable_,
+ &collector);
+ } else {
+ // Call the collector to clear the bit of every probe tuple whose key has no match.
+ hash_table_.runOverKeysFromValueAccessorIfMatchNotFoundCompositeKey(
+ probe_accessor.get(),
+ join_key_attributes_,
+ any_join_key_attributes_nullable_,
+ &collector);
+ }
+
+ SubBlocksReference sub_blocks_ref(probe_store,
+ probe_block->getIndices(),
+ probe_block->getIndicesConsistent());
+
+ std::unique_ptr<ValueAccessor> probe_accessor_with_filter(
+ probe_store.createValueAccessor(collector.filter()));  // Probe tuples whose key matched.
+ ColumnVectorsValueAccessor temp_result;
+ for (vector<unique_ptr<const Scalar>>::const_iterator selection_it = selection_.begin();
+ selection_it != selection_.end(); ++selection_it) {
+ temp_result.addColumn((*selection_it)->getAllValues(
+ probe_accessor_with_filter.get(), &sub_blocks_ref));
+ }
+
+ output_destination_->bulkInsertTuples(&temp_result);
+}
+
+void HashAntiJoinWorkOrder::executeWithoutResidualPredicate() {
+ DCHECK(residual_predicate_ == nullptr);
+
+ BlockReference probe_block = storage_manager_->getBlock(block_id_,
+ probe_relation_);
+ const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock();
+
+ std::unique_ptr<ValueAccessor> probe_accessor(probe_store.createValueAccessor());
+ SemiAntiJoinTupleCollector collector(probe_store);
+ // Start from the probe block's existence map and drop every probe tuple whose key
+ // has an entry in the hash table; without a residual predicate, existence suffices.
+ if (join_key_attributes_.size() == 1) {
+ // Call the collector to clear the bit of every probe tuple whose key has a match.
+ hash_table_.runOverKeysFromValueAccessorIfMatchFound(
+ probe_accessor.get(),
+ join_key_attributes_.front(),
+ any_join_key_attributes_nullable_,
+ &collector);
+ } else {
+ // Call the collector to clear the bit of every probe tuple whose key has a match.
+ hash_table_.runOverKeysFromValueAccessorIfMatchFoundCompositeKey(
+ probe_accessor.get(),
+ join_key_attributes_,
+ any_join_key_attributes_nullable_,
+ &collector);
+ }
+
+ SubBlocksReference sub_blocks_ref(probe_store,
+ probe_block->getIndices(),
+ probe_block->getIndicesConsistent());
+
+ std::unique_ptr<ValueAccessor> probe_accessor_with_filter(
+ probe_store.createValueAccessor(collector.filter()));  // Probe tuples with no match.
+ ColumnVectorsValueAccessor temp_result;
+ for (vector<unique_ptr<const Scalar>>::const_iterator selection_it = selection_.begin();
+ selection_it != selection_.end(); ++selection_it) {
+ temp_result.addColumn((*selection_it)->getAllValues(
+ probe_accessor_with_filter.get(), &sub_blocks_ref));
+ }
+
+ output_destination_->bulkInsertTuples(&temp_result);
+}
+
+void HashAntiJoinWorkOrder::executeWithResidualPredicate() {
+ const relation_id build_relation_id = build_relation_.getID();
+ const relation_id probe_relation_id = probe_relation_.getID();
+
+ BlockReference probe_block = storage_manager_->getBlock(block_id_,
+ probe_relation_);
+ const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock();
+
+ std::unique_ptr<ValueAccessor> probe_accessor(probe_store.createValueAccessor());
+ // TODO(harshad) - Make the following code work with both types of collectors.
+ MapBasedJoinedTupleCollector collector;
+ // We probe the hash table and collect every (build, probe) pair matching on the
+ // join key. Unlike executeWithoutResidualPredicate(), key existence is not
+ // enough: the residual predicate must still be evaluated on each pair before
+ // a probe tuple can be excluded.
+ if (join_key_attributes_.size() == 1) {
+ hash_table_.getAllFromValueAccessor(
+ probe_accessor.get(),
+ join_key_attributes_.front(),
+ any_join_key_attributes_nullable_,
+ &collector);
+ } else {
+ hash_table_.getAllFromValueAccessorCompositeKey(
+ probe_accessor.get(),
+ join_key_attributes_,
+ any_join_key_attributes_nullable_,
+ &collector);
+ }
+
+ // Output filter: starts as the probe block's existence map; matches clear bits.
+ std::unique_ptr<TupleIdSequence> filter(probe_store.getExistenceMap());
+ for (const std::pair<const block_id, std::vector<std::pair<tuple_id, tuple_id>>>
+ &build_block_entry : *collector.getJoinedTuples()) {
+ // First element of the pair build_block_entry is the build block ID
+ // 2nd element of the pair is a vector of pairs, in each of which -
+ // 1st element is a matching tuple ID from the inner (build) relation.
+ // 2nd element is a matching tuple ID from the outer (probe) relation.
+ BlockReference build_block = storage_manager_->getBlock(build_block_entry.first,
+ build_relation_);
+ const TupleStorageSubBlock &build_store = build_block->getTupleStorageSubBlock();
+ std::unique_ptr<ValueAccessor> build_accessor(build_store.createValueAccessor());
+ for (const std::pair<tuple_id, tuple_id> &hash_match
+ : build_block_entry.second) {
+ if (!filter->get(hash_match.second)) {
+ // Already excluded by an earlier match that passed the predicate; skip it.
+ continue;
+ }
+ if (residual_predicate_->matchesForJoinedTuples(*build_accessor,
+ build_relation_id,
+ hash_match.first,
+ *probe_accessor,
+ probe_relation_id,
+ hash_match.second)) {
+ // A probe tuple with any qualifying match is excluded from the anti-join
+ // result, so its filter bit is cleared.
+ filter->set(hash_match.second, false);
+ }
+ }
+ }
+
+ SubBlocksReference sub_blocks_ref(probe_store,
+ probe_block->getIndices(),
+ probe_block->getIndicesConsistent());
+
+ std::unique_ptr<ValueAccessor> probe_accessor_with_filter(
+ probe_store.createValueAccessor(filter.get()));  // Probe tuples with no qualifying match.
+ ColumnVectorsValueAccessor temp_result;
+ for (vector<unique_ptr<const Scalar>>::const_iterator selection_it = selection_.begin();
+ selection_it != selection_.end();
+ ++selection_it) {
+ temp_result.addColumn((*selection_it)->getAllValues(probe_accessor_with_filter.get(),
+ &sub_blocks_ref));
+ }
+
+ output_destination_->bulkInsertTuples(&temp_result);
+}
+
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a39ad965/relational_operators/HashJoinOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.hpp b/relational_operators/HashJoinOperator.hpp
index a00e590..c22f435 100644
--- a/relational_operators/HashJoinOperator.hpp
+++ b/relational_operators/HashJoinOperator.hpp
@@ -31,6 +31,7 @@
#include "storage/HashTable.hpp"
#include "storage/StorageBlockInfo.hpp"
#include "utility/Macros.hpp"
+#include "utility/PtrList.hpp"
#include "glog/logging.h"
@@ -52,11 +53,17 @@ class WorkOrdersContainer;
*/
/**
- * @brief An operator which performs a hash-join on two
- * relations.
+ * @brief An operator which performs a hash-join, including inner-join,
+ * semi-join and anti-join, on two relations (outer-join is not supported yet).
**/
class HashJoinOperator : public RelationalOperator {
public:
+ enum class JoinType {
+ kInnerJoin = 0,
+ kLeftSemiJoin,
+ kLeftAntiJoin
+ };
+
/**
* @brief Constructor.
*
@@ -98,6 +105,7 @@ class HashJoinOperator : public RelationalOperator {
* corresponding to the attributes of the relation referred by
* output_relation_id. Each Scalar is evaluated for the joined tuples,
* and the resulting value is inserted into the join result.
+ * @param join_type The type of join corresponding to this operator.
**/
HashJoinOperator(const CatalogRelation &build_relation,
const CatalogRelation &probe_relation,
@@ -108,21 +116,24 @@ class HashJoinOperator : public RelationalOperator {
const QueryContext::insert_destination_id output_destination_index,
const QueryContext::join_hash_table_id hash_table_index,
const QueryContext::predicate_id residual_predicate_index,
- const QueryContext::scalar_group_id selection_index)
- : build_relation_(build_relation),
- probe_relation_(probe_relation),
- probe_relation_is_stored_(probe_relation_is_stored),
- join_key_attributes_(join_key_attributes),
- any_join_key_attributes_nullable_(any_join_key_attributes_nullable),
- output_relation_(output_relation),
- output_destination_index_(output_destination_index),
- hash_table_index_(hash_table_index),
- residual_predicate_index_(residual_predicate_index),
- selection_index_(selection_index),
- probe_relation_block_ids_(probe_relation_is_stored ? probe_relation.getBlocksSnapshot()
- : std::vector<block_id>()),
- num_workorders_generated_(0),
- started_(false) {}
+ const QueryContext::scalar_group_id selection_index,
+ const JoinType join_type = JoinType::kInnerJoin)
+ : build_relation_(build_relation),
+ probe_relation_(probe_relation),
+ probe_relation_is_stored_(probe_relation_is_stored),
+ join_key_attributes_(join_key_attributes),
+ any_join_key_attributes_nullable_(any_join_key_attributes_nullable),
+ output_relation_(output_relation),
+ output_destination_index_(output_destination_index),
+ hash_table_index_(hash_table_index),
+ residual_predicate_index_(residual_predicate_index),
+ selection_index_(selection_index),
+ join_type_(join_type),
+ probe_relation_block_ids_(probe_relation_is_stored
+ ? probe_relation.getBlocksSnapshot()
+ : std::vector<block_id>()),
+ num_workorders_generated_(0),
+ started_(false) {}
~HashJoinOperator() override {}
@@ -164,6 +175,11 @@ class HashJoinOperator : public RelationalOperator {
}
private:
+ template <class JoinWorkOrderClass>
+ bool getAllNonOuterJoinWorkOrders(WorkOrdersContainer *container,
+ QueryContext *query_context,
+ StorageManager *storage_manager);
+
const CatalogRelation &build_relation_;
const CatalogRelation &probe_relation_;
const bool probe_relation_is_stored_;
@@ -174,6 +190,7 @@ class HashJoinOperator : public RelationalOperator {
const QueryContext::join_hash_table_id hash_table_index_;
const QueryContext::predicate_id residual_predicate_index_;
const QueryContext::scalar_group_id selection_index_;
+ const JoinType join_type_;
std::vector<block_id> probe_relation_block_ids_;
std::size_t num_workorders_generated_;
@@ -184,9 +201,9 @@ class HashJoinOperator : public RelationalOperator {
};
/**
- * @brief A WorkOrder produced by HashJoinOperator.
+ * @brief An inner join WorkOrder produced by HashJoinOperator.
**/
-class HashJoinWorkOrder : public WorkOrder {
+class HashInnerJoinWorkOrder : public WorkOrder {
public:
/**
* @brief Constructor.
@@ -206,20 +223,20 @@ class HashJoinWorkOrder : public WorkOrder {
* @param selection A list of Scalars corresponding to the relation attributes
* in \c output_destination. Each Scalar is evaluated for the joined
* tuples, and the resulting value is inserted into the join result.
- * @param output_destination The InsertDestination to insert the join results.
* @param hash_table The JoinHashTable to use.
+ * @param output_destination The InsertDestination to insert the join results.
* @param storage_manager The StorageManager to use.
**/
- HashJoinWorkOrder(const CatalogRelationSchema &build_relation,
- const CatalogRelationSchema &probe_relation,
- const std::vector<attribute_id> &join_key_attributes,
- const bool any_join_key_attributes_nullable,
- const block_id lookup_block_id,
- const Predicate *residual_predicate,
- const std::vector<std::unique_ptr<const Scalar>> &selection,
- InsertDestination *output_destination,
- JoinHashTable *hash_table,
- StorageManager *storage_manager)
+ HashInnerJoinWorkOrder(const CatalogRelationSchema &build_relation,
+ const CatalogRelationSchema &probe_relation,
+ const std::vector<attribute_id> &join_key_attributes,
+ const bool any_join_key_attributes_nullable,
+ const block_id lookup_block_id,
+ const Predicate *residual_predicate,
+ const std::vector<std::unique_ptr<const Scalar>> &selection,
+ const JoinHashTable &hash_table,
+ InsertDestination *output_destination,
+ StorageManager *storage_manager)
: build_relation_(build_relation),
probe_relation_(probe_relation),
join_key_attributes_(join_key_attributes),
@@ -227,8 +244,8 @@ class HashJoinWorkOrder : public WorkOrder {
block_id_(lookup_block_id),
residual_predicate_(residual_predicate),
selection_(selection),
+ hash_table_(hash_table),
output_destination_(DCHECK_NOTNULL(output_destination)),
- hash_table_(DCHECK_NOTNULL(hash_table)),
storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
/**
@@ -249,20 +266,20 @@ class HashJoinWorkOrder : public WorkOrder {
* @param selection A list of Scalars corresponding to the relation attributes
* in \c output_destination. Each Scalar is evaluated for the joined
* tuples, and the resulting value is inserted into the join result.
- * @param output_destination The InsertDestination to insert the join results.
* @param hash_table The JoinHashTable to use.
+ * @param output_destination The InsertDestination to insert the join results.
* @param storage_manager The StorageManager to use.
**/
- HashJoinWorkOrder(const CatalogRelationSchema &build_relation,
- const CatalogRelationSchema &probe_relation,
- std::vector<attribute_id> &&join_key_attributes,
- const bool any_join_key_attributes_nullable,
- const block_id lookup_block_id,
- const Predicate *residual_predicate,
- const std::vector<std::unique_ptr<const Scalar>> &selection,
- InsertDestination *output_destination,
- JoinHashTable *hash_table,
- StorageManager *storage_manager)
+ HashInnerJoinWorkOrder(const CatalogRelationSchema &build_relation,
+ const CatalogRelationSchema &probe_relation,
+ std::vector<attribute_id> &&join_key_attributes,
+ const bool any_join_key_attributes_nullable,
+ const block_id lookup_block_id,
+ const Predicate *residual_predicate,
+ const std::vector<std::unique_ptr<const Scalar>> &selection,
+ const JoinHashTable &hash_table,
+ InsertDestination *output_destination,
+ StorageManager *storage_manager)
: build_relation_(build_relation),
probe_relation_(probe_relation),
join_key_attributes_(std::move(join_key_attributes)),
@@ -270,11 +287,11 @@ class HashJoinWorkOrder : public WorkOrder {
block_id_(lookup_block_id),
residual_predicate_(residual_predicate),
selection_(selection),
+ hash_table_(hash_table),
output_destination_(DCHECK_NOTNULL(output_destination)),
- hash_table_(DCHECK_NOTNULL(hash_table)),
storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
- ~HashJoinWorkOrder() override {}
+ ~HashInnerJoinWorkOrder() override {}
/**
* @exception TupleTooLargeForBlock A tuple produced by this join was too
@@ -290,18 +307,257 @@ class HashJoinWorkOrder : public WorkOrder {
template <typename CollectorT>
void executeWithCollectorType();
- const CatalogRelationSchema &build_relation_, &probe_relation_;
+ const CatalogRelationSchema &build_relation_;
+ const CatalogRelationSchema &probe_relation_;
+ const std::vector<attribute_id> join_key_attributes_;
+ const bool any_join_key_attributes_nullable_;
+ const block_id block_id_;
+ const Predicate *residual_predicate_;
+ const std::vector<std::unique_ptr<const Scalar>> &selection_;
+ const JoinHashTable &hash_table_;
+
+ InsertDestination *output_destination_;
+ StorageManager *storage_manager_;
+
+ DISALLOW_COPY_AND_ASSIGN(HashInnerJoinWorkOrder);
+};
+
+/**
+ * @brief Left semi-join WorkOrder emitted by HashJoinOperator to evaluate an
+ * EXISTS() clause over a single block of the probe relation.
+ **/
+class HashSemiJoinWorkOrder : public WorkOrder {
+ public:
+ /**
+ * @brief Constructor that copies the join key attribute list.
+ *
+ * Forwards to the rvalue-reference constructor below with a private copy
+ * of \c join_key_attributes.
+ *
+ * @param build_relation Relation the hash table was built from (the inner
+ * side of the join).
+ * @param probe_relation Relation whose tuples probe the hash table (the
+ * outer side of the join).
+ * @param join_key_attributes IDs of the equijoin attributes in
+ * \c probe_relation.
+ * @param any_join_key_attributes_nullable Whether any join key attribute is
+ * nullable.
+ * @param lookup_block_id ID of the \c probe_relation block to process.
+ * @param residual_predicate Optional (may be null) extra filter applied to
+ * tuple pairs whose keys match; when present, the effective join
+ * predicate is key equality AND this predicate.
+ * @param selection Scalars, one per attribute of the relation written by
+ * \c output_destination, evaluated to build each result tuple. Held by
+ * reference, so it must outlive this WorkOrder.
+ * @param hash_table The JoinHashTable to probe. Held by reference, so it
+ * must outlive this WorkOrder.
+ * @param output_destination Where result tuples are inserted. Must not be
+ * null.
+ * @param storage_manager The StorageManager to use. Must not be null.
+ **/
+ HashSemiJoinWorkOrder(const CatalogRelationSchema &build_relation,
+ const CatalogRelationSchema &probe_relation,
+ const std::vector<attribute_id> &join_key_attributes,
+ const bool any_join_key_attributes_nullable,
+ const block_id lookup_block_id,
+ const Predicate *residual_predicate,
+ const std::vector<std::unique_ptr<const Scalar>> &selection,
+ const JoinHashTable &hash_table,
+ InsertDestination *output_destination,
+ StorageManager *storage_manager)
+ : HashSemiJoinWorkOrder(build_relation,
+ probe_relation,
+ std::vector<attribute_id>(join_key_attributes),
+ any_join_key_attributes_nullable,
+ lookup_block_id,
+ residual_predicate,
+ selection,
+ hash_table,
+ output_destination,
+ storage_manager) {}
+
+ /**
+ * @brief Constructor that takes ownership of the join key attribute list
+ * (used by the distributed version).
+ *
+ * Parameters have the same meaning as in the copying constructor above;
+ * \c join_key_attributes is moved into this WorkOrder.
+ **/
+ HashSemiJoinWorkOrder(const CatalogRelationSchema &build_relation,
+ const CatalogRelationSchema &probe_relation,
+ std::vector<attribute_id> &&join_key_attributes,
+ const bool any_join_key_attributes_nullable,
+ const block_id lookup_block_id,
+ const Predicate *residual_predicate,
+ const std::vector<std::unique_ptr<const Scalar>> &selection,
+ const JoinHashTable &hash_table,
+ InsertDestination *output_destination,
+ StorageManager *storage_manager)
+ : build_relation_(build_relation),
+ probe_relation_(probe_relation),
+ join_key_attributes_(std::move(join_key_attributes)),
+ any_join_key_attributes_nullable_(any_join_key_attributes_nullable),
+ block_id_(lookup_block_id),
+ residual_predicate_(residual_predicate),
+ selection_(selection),
+ hash_table_(hash_table),
+ output_destination_(DCHECK_NOTNULL(output_destination)),
+ storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+
+ ~HashSemiJoinWorkOrder() override = default;
+
+ // Runs the semi-join for block_id_ (defined out of line).
+ void execute() override;
+
+ private:
+ // Helpers for the no-residual-predicate and residual-predicate cases.
+ void executeWithoutResidualPredicate();
+
+ void executeWithResidualPredicate();
+
+ // Declaration order matters: it is the member initialization order.
+ const CatalogRelationSchema &build_relation_;
+ const CatalogRelationSchema &probe_relation_;
+ const std::vector<attribute_id> join_key_attributes_;
+ const bool any_join_key_attributes_nullable_;
+ const block_id block_id_;
+ const Predicate *residual_predicate_;
+ const std::vector<std::unique_ptr<const Scalar>> &selection_;
+ const JoinHashTable &hash_table_;
+
+ // Non-owning; checked non-null at construction.
+ InsertDestination *output_destination_;
+ StorageManager *storage_manager_;
+
+ DISALLOW_COPY_AND_ASSIGN(HashSemiJoinWorkOrder);
+};
+
+/**
+ * @brief A left anti-join WorkOrder produced by the HashJoinOperator to execute
+ * NOT EXISTS() clause over one block of the probe relation.
+ **/
+class HashAntiJoinWorkOrder : public WorkOrder {
+ public:
+ /**
+ * @brief Constructor (copies the join key attribute list).
+ *
+ * @param build_relation The relation that the hash table was originally built
+ * on (i.e. the inner relation in the join).
+ * @param probe_relation The relation to probe the hash table with (i.e. the
+ * outer relation in the join).
+ * @param join_key_attributes The IDs of equijoin attributes in \c
+ * probe_relation.
+ * @param any_join_key_attributes_nullable If any attribute is nullable.
+ * @param lookup_block_id The block id of the probe_relation.
+ * @param residual_predicate If non-null, apply as an additional filter to
+ * pairs of tuples that match the hash-join (i.e. key equality)
+ * predicate. Effectively, this makes the join predicate the
+ * conjunction of the key-equality predicate and residual_predicate.
+ * @param selection A list of Scalars corresponding to the relation attributes
+ * in \c output_destination. Each Scalar is evaluated for the joined
+ * tuples, and the resulting value is inserted into the join result.
+ * Held by reference; must outlive this WorkOrder.
+ * @param hash_table The JoinHashTable to use. Held by reference; must
+ * outlive this WorkOrder.
+ * @param output_destination The InsertDestination to insert the join results.
+ * Must not be null.
+ * @param storage_manager The StorageManager to use. Must not be null.
+ **/
+ HashAntiJoinWorkOrder(const CatalogRelationSchema &build_relation,
+ const CatalogRelationSchema &probe_relation,
+ const std::vector<attribute_id> &join_key_attributes,
+ const bool any_join_key_attributes_nullable,
+ const block_id lookup_block_id,
+ const Predicate *residual_predicate,
+ const std::vector<std::unique_ptr<const Scalar>> &selection,
+ const JoinHashTable &hash_table,
+ InsertDestination *output_destination,
+ StorageManager *storage_manager)
+ : build_relation_(build_relation),
+ probe_relation_(probe_relation),
+ join_key_attributes_(join_key_attributes),
+ any_join_key_attributes_nullable_(any_join_key_attributes_nullable),
+ block_id_(lookup_block_id),
+ residual_predicate_(residual_predicate),
+ selection_(selection),
+ hash_table_(hash_table),
+ output_destination_(DCHECK_NOTNULL(output_destination)),
+ storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+
+ /**
+ * @brief Constructor for the distributed version (takes ownership of the
+ * join key attribute list).
+ *
+ * Parameters have the same meaning as in the copying constructor above;
+ * \c join_key_attributes is moved into this WorkOrder rather than copied.
+ **/
+ HashAntiJoinWorkOrder(const CatalogRelationSchema &build_relation,
+ const CatalogRelationSchema &probe_relation,
+ std::vector<attribute_id> &&join_key_attributes,
+ const bool any_join_key_attributes_nullable,
+ const block_id lookup_block_id,
+ const Predicate *residual_predicate,
+ const std::vector<std::unique_ptr<const Scalar>> &selection,
+ const JoinHashTable &hash_table,
+ InsertDestination *output_destination,
+ StorageManager *storage_manager)
+ : build_relation_(build_relation),
+ probe_relation_(probe_relation),
+ // A named rvalue reference is an lvalue: without std::move this
+ // would copy the vector instead of stealing its buffer.
+ join_key_attributes_(std::move(join_key_attributes)),
+ any_join_key_attributes_nullable_(any_join_key_attributes_nullable),
+ block_id_(lookup_block_id),
+ residual_predicate_(residual_predicate),
+ selection_(selection),
+ hash_table_(hash_table),
+ output_destination_(DCHECK_NOTNULL(output_destination)),
+ storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+
+ ~HashAntiJoinWorkOrder() override {}
+
+ /**
+ * @brief Run the anti-join, dispatching on whether a residual predicate was
+ * supplied.
+ **/
+ void execute() override {
+ if (residual_predicate_ == nullptr) {
+ executeWithoutResidualPredicate();
+ } else {
+ executeWithResidualPredicate();
+ }
+ }
+
+ private:
+ void executeWithoutResidualPredicate();
+
+ void executeWithResidualPredicate();
+
+ // Declaration order matters: it is the member initialization order.
+ const CatalogRelationSchema &build_relation_;
+ const CatalogRelationSchema &probe_relation_;
const std::vector<attribute_id> join_key_attributes_;
const bool any_join_key_attributes_nullable_;
const block_id block_id_;
const Predicate *residual_predicate_;
const std::vector<std::unique_ptr<const Scalar>> &selection_;
+ const JoinHashTable &hash_table_;
InsertDestination *output_destination_;
- JoinHashTable *hash_table_;
StorageManager *storage_manager_;
- DISALLOW_COPY_AND_ASSIGN(HashJoinWorkOrder);
+ DISALLOW_COPY_AND_ASSIGN(HashAntiJoinWorkOrder);
};
/** @} */
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a39ad965/relational_operators/WorkOrder.proto
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrder.proto b/relational_operators/WorkOrder.proto
index e0ec19d..1a0bcd1 100644
--- a/relational_operators/WorkOrder.proto
+++ b/relational_operators/WorkOrder.proto
@@ -29,18 +29,20 @@ enum WorkOrderType {
DESTROY_HASH = 6;
DROP_TABLE = 7;
FINALIZE_AGGREGATION = 8;
- HASH_JOIN = 9;
- INSERT = 10;
- NESTED_LOOP_JOIN = 11;
- SAMPLE = 12;
- SAVE_BLOCKS = 13;
- SELECT = 14;
- SORT_MERGE_RUN = 15;
- SORT_RUN_GENERATION = 16;
- TABLE_GENERATOR = 17;
- TEXT_SCAN = 18;
- TEXT_SPLIT = 19;
- UPDATE = 20;
+ HASH_INNER_JOIN = 9;
+ HASH_SEMI_JOIN = 10;
+ HASH_ANTI_JOIN = 11;
+ INSERT = 12;
+ NESTED_LOOP_JOIN = 13;
+ SAMPLE = 14;
+ SAVE_BLOCKS = 15;
+ SELECT = 16;
+ SORT_MERGE_RUN = 17;
+ SORT_RUN_GENERATION = 18;
+ TABLE_GENERATOR = 19;
+ TEXT_SCAN = 20;
+ TEXT_SPLIT = 21;
+ UPDATE = 22;
}
message WorkOrder {
@@ -104,7 +106,7 @@ message FinalizeAggregationWorkOrder {
}
}
-message HashJoinWorkOrder {
+message HashInnerJoinWorkOrder {
extend WorkOrder {
// All required.
optional int32 build_relation_id = 160;
@@ -119,6 +121,36 @@ message HashJoinWorkOrder {
}
}
+// WorkOrder extensions used to serialize a HashAntiJoinWorkOrder (left
+// anti-join over one block of the probe relation). Uses extension numbers
+// 350-358.
+message HashAntiJoinWorkOrder {
+ extend WorkOrder {
+ // All required.
+ optional int32 build_relation_id = 350;
+ optional int32 probe_relation_id = 351;
+ repeated int32 join_key_attributes = 352; // Equijoin attribute IDs in the probe relation.
+ optional bool any_join_key_attributes_nullable = 353;
+ optional int32 insert_destination_index = 354; // InsertDestination index in the QueryContext.
+ optional uint32 join_hash_table_index = 355; // JoinHashTable index in the QueryContext.
+ optional int32 residual_predicate_index = 356; // Predicate index in the QueryContext.
+ optional int32 selection_index = 357; // Scalar group index in the QueryContext.
+ optional fixed64 block_id = 358; // Probe-relation block to process.
+ }
+}
+
+// WorkOrder extensions used to serialize a HashSemiJoinWorkOrder (left
+// semi-join over one block of the probe relation). Uses extension numbers
+// 360-368.
+message HashSemiJoinWorkOrder {
+ extend WorkOrder {
+ // All required.
+ optional int32 build_relation_id = 360;
+ optional int32 probe_relation_id = 361;
+ repeated int32 join_key_attributes = 362; // Equijoin attribute IDs in the probe relation.
+ optional bool any_join_key_attributes_nullable = 363;
+ optional int32 insert_destination_index = 364; // InsertDestination index in the QueryContext.
+ optional uint32 join_hash_table_index = 365; // JoinHashTable index in the QueryContext.
+ optional int32 residual_predicate_index = 366; // Predicate index in the QueryContext.
+ optional int32 selection_index = 367; // Scalar group index in the QueryContext.
+ optional fixed64 block_id = 368; // Probe-relation block to process.
+ }
+}
+
message InsertWorkOrder {
extend WorkOrder {
// All required.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a39ad965/relational_operators/WorkOrderFactory.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp
index 4713681..92c1140 100644
--- a/relational_operators/WorkOrderFactory.cpp
+++ b/relational_operators/WorkOrderFactory.cpp
@@ -135,30 +135,88 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
query_context->getInsertDestination(
proto.GetExtension(serialization::FinalizeAggregationWorkOrder::insert_destination_index)));
}
- case serialization::HASH_JOIN: {
- LOG(INFO) << "Creating HashJoinWorkOrder";
+ case serialization::HASH_INNER_JOIN: {
+ LOG(INFO) << "Creating HashInnerJoinWorkOrder";
+ // Both the element count and the elements of join_key_attributes must be
+ // read from this message's own (HashInnerJoinWorkOrder) extension; another
+ // message's extension is unset here and would report zero keys.
vector<attribute_id> join_key_attributes;
- for (int i = 0; i < proto.ExtensionSize(serialization::HashJoinWorkOrder::join_key_attributes); ++i) {
+ const int join_key_attributes_size =
+ proto.ExtensionSize(serialization::HashInnerJoinWorkOrder::join_key_attributes);
+ for (int i = 0; i < join_key_attributes_size; ++i) {
join_key_attributes.push_back(
- proto.GetExtension(serialization::HashJoinWorkOrder::join_key_attributes, i));
+ proto.GetExtension(serialization::HashInnerJoinWorkOrder::join_key_attributes, i));
}
- return new HashJoinWorkOrder(
+ return new HashInnerJoinWorkOrder(
catalog_database->getRelationSchemaById(
- proto.GetExtension(serialization::HashJoinWorkOrder::build_relation_id)),
+ proto.GetExtension(serialization::HashInnerJoinWorkOrder::build_relation_id)),
catalog_database->getRelationSchemaById(
- proto.GetExtension(serialization::HashJoinWorkOrder::probe_relation_id)),
+ proto.GetExtension(serialization::HashInnerJoinWorkOrder::probe_relation_id)),
move(join_key_attributes),
- proto.GetExtension(serialization::HashJoinWorkOrder::any_join_key_attributes_nullable),
- proto.GetExtension(serialization::HashJoinWorkOrder::block_id),
+ proto.GetExtension(serialization::HashInnerJoinWorkOrder::any_join_key_attributes_nullable),
+ proto.GetExtension(serialization::HashInnerJoinWorkOrder::block_id),
query_context->getPredicate(
- proto.GetExtension(serialization::HashJoinWorkOrder::residual_predicate_index)),
+ proto.GetExtension(serialization::HashInnerJoinWorkOrder::residual_predicate_index)),
query_context->getScalarGroup(
- proto.GetExtension(serialization::HashJoinWorkOrder::selection_index)),
+ proto.GetExtension(serialization::HashInnerJoinWorkOrder::selection_index)),
+ *query_context->getJoinHashTable(
+ proto.GetExtension(serialization::HashInnerJoinWorkOrder::join_hash_table_index)),
query_context->getInsertDestination(
- proto.GetExtension(serialization::HashJoinWorkOrder::insert_destination_index)),
- query_context->getJoinHashTable(
- proto.GetExtension(serialization::HashJoinWorkOrder::join_hash_table_index)),
+ proto.GetExtension(serialization::HashInnerJoinWorkOrder::insert_destination_index)),
+ storage_manager);
+ }
+ case serialization::HASH_SEMI_JOIN: {
+ LOG(INFO) << "Creating HashSemiJoinWorkOrder";
+ // Both the element count and the elements of join_key_attributes must be
+ // read from this message's own (HashSemiJoinWorkOrder) extension; another
+ // message's extension is unset here and would report zero keys.
+ vector<attribute_id> join_key_attributes;
+ const int join_key_attributes_size =
+ proto.ExtensionSize(serialization::HashSemiJoinWorkOrder::join_key_attributes);
+ for (int i = 0; i < join_key_attributes_size; ++i) {
+ join_key_attributes.push_back(
+ proto.GetExtension(serialization::HashSemiJoinWorkOrder::join_key_attributes, i));
+ }
+
+ return new HashSemiJoinWorkOrder(
+ catalog_database->getRelationSchemaById(
+ proto.GetExtension(serialization::HashSemiJoinWorkOrder::build_relation_id)),
+ catalog_database->getRelationSchemaById(
+ proto.GetExtension(serialization::HashSemiJoinWorkOrder::probe_relation_id)),
+ move(join_key_attributes),
+ proto.GetExtension(serialization::HashSemiJoinWorkOrder::any_join_key_attributes_nullable),
+ proto.GetExtension(serialization::HashSemiJoinWorkOrder::block_id),
+ query_context->getPredicate(
+ proto.GetExtension(serialization::HashSemiJoinWorkOrder::residual_predicate_index)),
+ query_context->getScalarGroup(
+ proto.GetExtension(serialization::HashSemiJoinWorkOrder::selection_index)),
+ *query_context->getJoinHashTable(
+ proto.GetExtension(serialization::HashSemiJoinWorkOrder::join_hash_table_index)),
+ query_context->getInsertDestination(
+ proto.GetExtension(serialization::HashSemiJoinWorkOrder::insert_destination_index)),
+ storage_manager);
+ }
+ case serialization::HASH_ANTI_JOIN: {
+ LOG(INFO) << "Creating HashAntiJoinWorkOrder";
+ // Every field, including the join key count, is read from the
+ // HashAntiJoinWorkOrder extensions of this proto.
+ vector<attribute_id> join_key_attributes;
+ const int join_key_attributes_size =
+ proto.ExtensionSize(serialization::HashAntiJoinWorkOrder::join_key_attributes);
+ for (int i = 0; i < join_key_attributes_size; ++i) {
+ join_key_attributes.push_back(
+ proto.GetExtension(serialization::HashAntiJoinWorkOrder::join_key_attributes, i));
+ }
+
+ // Arguments follow the HashAntiJoinWorkOrder constructor's positional order:
+ // relations, keys, nullability, block, predicate, selection, hash table,
+ // insert destination, storage manager.
+ return new HashAntiJoinWorkOrder(
+ catalog_database->getRelationSchemaById(
+ proto.GetExtension(serialization::HashAntiJoinWorkOrder::build_relation_id)),
+ catalog_database->getRelationSchemaById(
+ proto.GetExtension(serialization::HashAntiJoinWorkOrder::probe_relation_id)),
+ move(join_key_attributes),
+ proto.GetExtension(serialization::HashAntiJoinWorkOrder::any_join_key_attributes_nullable),
+ proto.GetExtension(serialization::HashAntiJoinWorkOrder::block_id),
+ query_context->getPredicate(
+ proto.GetExtension(serialization::HashAntiJoinWorkOrder::residual_predicate_index)),
+ query_context->getScalarGroup(
+ proto.GetExtension(serialization::HashAntiJoinWorkOrder::selection_index)),
+ *query_context->getJoinHashTable(
+ proto.GetExtension(serialization::HashAntiJoinWorkOrder::join_hash_table_index)),
+ query_context->getInsertDestination(
+ proto.GetExtension(serialization::HashAntiJoinWorkOrder::insert_destination_index)),
storage_manager);
}
case serialization::INSERT: {
@@ -394,46 +452,137 @@ bool WorkOrderFactory::ProtoIsValid(const serialization::WorkOrder &proto,
query_context.isValidInsertDestinationId(
proto.GetExtension(serialization::FinalizeAggregationWorkOrder::insert_destination_index));
}
- case serialization::HASH_JOIN: {
- if (!proto.HasExtension(serialization::HashJoinWorkOrder::build_relation_id) ||
- !proto.HasExtension(serialization::HashJoinWorkOrder::probe_relation_id)) {
+ case serialization::HASH_INNER_JOIN: {
+ // Both relation IDs must be present and name relations in the catalog.
+ const bool has_relation_ids =
+ proto.HasExtension(serialization::HashInnerJoinWorkOrder::build_relation_id) &&
+ proto.HasExtension(serialization::HashInnerJoinWorkOrder::probe_relation_id);
+ if (!has_relation_ids) {
+ return false;
+ }
+ const relation_id build_relation_id =
+ proto.GetExtension(serialization::HashInnerJoinWorkOrder::build_relation_id);
+ const relation_id probe_relation_id =
+ proto.GetExtension(serialization::HashInnerJoinWorkOrder::probe_relation_id);
+ if (!catalog_database.hasRelationWithId(build_relation_id) ||
+ !catalog_database.hasRelationWithId(probe_relation_id)) {
+ return false;
+ }
+
+ // Every join key attribute ID must exist in both relations.
+ const CatalogRelationSchema &build_relation = catalog_database.getRelationSchemaById(build_relation_id);
+ const CatalogRelationSchema &probe_relation = catalog_database.getRelationSchemaById(probe_relation_id);
+ const int num_join_keys =
+ proto.ExtensionSize(serialization::HashInnerJoinWorkOrder::join_key_attributes);
+ for (int key_index = 0; key_index < num_join_keys; ++key_index) {
+ const attribute_id key_attr_id =
+ proto.GetExtension(serialization::HashInnerJoinWorkOrder::join_key_attributes, key_index);
+ const bool in_both_relations =
+ build_relation.hasAttributeWithId(key_attr_id) && probe_relation.hasAttributeWithId(key_attr_id);
+ if (!in_both_relations) {
+ return false;
+ }
+ }
+
+ // Remaining fields must be present, and each index must be valid in the QueryContext.
+ if (!proto.HasExtension(serialization::HashInnerJoinWorkOrder::any_join_key_attributes_nullable)) {
+ return false;
+ }
+ if (!proto.HasExtension(serialization::HashInnerJoinWorkOrder::insert_destination_index) ||
+ !query_context.isValidInsertDestinationId(
+ proto.GetExtension(serialization::HashInnerJoinWorkOrder::insert_destination_index))) {
+ return false;
+ }
+ if (!proto.HasExtension(serialization::HashInnerJoinWorkOrder::join_hash_table_index) ||
+ !query_context.isValidJoinHashTableId(
+ proto.GetExtension(serialization::HashInnerJoinWorkOrder::join_hash_table_index))) {
+ return false;
+ }
+ if (!proto.HasExtension(serialization::HashInnerJoinWorkOrder::residual_predicate_index) ||
+ !query_context.isValidPredicate(
+ proto.GetExtension(serialization::HashInnerJoinWorkOrder::residual_predicate_index))) {
+ return false;
+ }
+ if (!proto.HasExtension(serialization::HashInnerJoinWorkOrder::selection_index) ||
+ !query_context.isValidScalarGroupId(
+ proto.GetExtension(serialization::HashInnerJoinWorkOrder::selection_index))) {
+ return false;
+ }
+ return proto.HasExtension(serialization::HashInnerJoinWorkOrder::block_id);
+ }
+ case serialization::HASH_SEMI_JOIN: {
+ // Both relation IDs must be present and name relations in the catalog.
+ const bool has_relation_ids =
+ proto.HasExtension(serialization::HashSemiJoinWorkOrder::build_relation_id) &&
+ proto.HasExtension(serialization::HashSemiJoinWorkOrder::probe_relation_id);
+ if (!has_relation_ids) {
+ return false;
+ }
+ const relation_id build_relation_id =
+ proto.GetExtension(serialization::HashSemiJoinWorkOrder::build_relation_id);
+ const relation_id probe_relation_id =
+ proto.GetExtension(serialization::HashSemiJoinWorkOrder::probe_relation_id);
+ if (!catalog_database.hasRelationWithId(build_relation_id) ||
+ !catalog_database.hasRelationWithId(probe_relation_id)) {
+ return false;
+ }
+
+ // Every join key attribute ID must exist in both relations.
+ const CatalogRelationSchema &build_relation = catalog_database.getRelationSchemaById(build_relation_id);
+ const CatalogRelationSchema &probe_relation = catalog_database.getRelationSchemaById(probe_relation_id);
+ const int num_join_keys =
+ proto.ExtensionSize(serialization::HashSemiJoinWorkOrder::join_key_attributes);
+ for (int key_index = 0; key_index < num_join_keys; ++key_index) {
+ const attribute_id key_attr_id =
+ proto.GetExtension(serialization::HashSemiJoinWorkOrder::join_key_attributes, key_index);
+ const bool in_both_relations =
+ build_relation.hasAttributeWithId(key_attr_id) && probe_relation.hasAttributeWithId(key_attr_id);
+ if (!in_both_relations) {
+ return false;
+ }
+ }
+
+ // Remaining fields must be present, and each index must be valid in the QueryContext.
+ if (!proto.HasExtension(serialization::HashSemiJoinWorkOrder::any_join_key_attributes_nullable)) {
+ return false;
+ }
+ if (!proto.HasExtension(serialization::HashSemiJoinWorkOrder::insert_destination_index) ||
+ !query_context.isValidInsertDestinationId(
+ proto.GetExtension(serialization::HashSemiJoinWorkOrder::insert_destination_index))) {
+ return false;
+ }
+ if (!proto.HasExtension(serialization::HashSemiJoinWorkOrder::join_hash_table_index) ||
+ !query_context.isValidJoinHashTableId(
+ proto.GetExtension(serialization::HashSemiJoinWorkOrder::join_hash_table_index))) {
+ return false;
+ }
+ if (!proto.HasExtension(serialization::HashSemiJoinWorkOrder::residual_predicate_index) ||
+ !query_context.isValidPredicate(
+ proto.GetExtension(serialization::HashSemiJoinWorkOrder::residual_predicate_index))) {
+ return false;
+ }
+ if (!proto.HasExtension(serialization::HashSemiJoinWorkOrder::selection_index) ||
+ !query_context.isValidScalarGroupId(
+ proto.GetExtension(serialization::HashSemiJoinWorkOrder::selection_index))) {
+ return false;
+ }
+ return proto.HasExtension(serialization::HashSemiJoinWorkOrder::block_id);
+ }
+ case serialization::HASH_ANTI_JOIN: {
+ // Both relation IDs must be present and name relations in the catalog.
+ if (!proto.HasExtension(serialization::HashAntiJoinWorkOrder::build_relation_id) ||
+ !proto.HasExtension(serialization::HashAntiJoinWorkOrder::probe_relation_id)) {
return false;
}
- const relation_id build_relation_id = proto.GetExtension(serialization::HashJoinWorkOrder::build_relation_id);
+ const relation_id build_relation_id =
+ proto.GetExtension(serialization::HashAntiJoinWorkOrder::build_relation_id);
if (!catalog_database.hasRelationWithId(build_relation_id)) {
return false;
}
- const relation_id probe_relation_id = proto.GetExtension(serialization::HashJoinWorkOrder::probe_relation_id);
+ const relation_id probe_relation_id =
+ proto.GetExtension(serialization::HashAntiJoinWorkOrder::probe_relation_id);
if (!catalog_database.hasRelationWithId(probe_relation_id)) {
return false;
}
+ // Every join key attribute ID must exist in both relations.
const CatalogRelationSchema &build_relation = catalog_database.getRelationSchemaById(build_relation_id);
const CatalogRelationSchema &probe_relation = catalog_database.getRelationSchemaById(probe_relation_id);
- for (int i = 0; i < proto.ExtensionSize(serialization::HashJoinWorkOrder::join_key_attributes); ++i) {
- const attribute_id attr_id = proto.GetExtension(serialization::HashJoinWorkOrder::join_key_attributes, i);
+ for (int i = 0; i < proto.ExtensionSize(serialization::HashAntiJoinWorkOrder::join_key_attributes); ++i) {
+ const attribute_id attr_id =
+ proto.GetExtension(serialization::HashAntiJoinWorkOrder::join_key_attributes, i);
if (!build_relation.hasAttributeWithId(attr_id) ||
!probe_relation.hasAttributeWithId(attr_id)) {
return false;
}
}
- return proto.HasExtension(serialization::HashJoinWorkOrder::any_join_key_attributes_nullable) &&
- proto.HasExtension(serialization::HashJoinWorkOrder::insert_destination_index) &&
+ // Remaining fields must be present, and each index must be valid in the QueryContext.
+ return proto.HasExtension(serialization::HashAntiJoinWorkOrder::any_join_key_attributes_nullable) &&
+ proto.HasExtension(serialization::HashAntiJoinWorkOrder::insert_destination_index) &&
query_context.isValidInsertDestinationId(
- proto.GetExtension(serialization::HashJoinWorkOrder::insert_destination_index)) &&
- proto.HasExtension(serialization::HashJoinWorkOrder::join_hash_table_index) &&
+ proto.GetExtension(serialization::HashAntiJoinWorkOrder::insert_destination_index)) &&
+ proto.HasExtension(serialization::HashAntiJoinWorkOrder::join_hash_table_index) &&
query_context.isValidJoinHashTableId(
- proto.GetExtension(serialization::HashJoinWorkOrder::join_hash_table_index)) &&
- proto.HasExtension(serialization::HashJoinWorkOrder::residual_predicate_index) &&
+ proto.GetExtension(serialization::HashAntiJoinWorkOrder::join_hash_table_index)) &&
+ proto.HasExtension(serialization::HashAntiJoinWorkOrder::residual_predicate_index) &&
query_context.isValidPredicate(
- proto.GetExtension(serialization::HashJoinWorkOrder::residual_predicate_index)) &&
- proto.HasExtension(serialization::HashJoinWorkOrder::selection_index) &&
+ proto.GetExtension(serialization::HashAntiJoinWorkOrder::residual_predicate_index)) &&
+ proto.HasExtension(serialization::HashAntiJoinWorkOrder::selection_index) &&
query_context.isValidScalarGroupId(
- proto.GetExtension(serialization::HashJoinWorkOrder::selection_index)) &&
- proto.HasExtension(serialization::HashJoinWorkOrder::block_id);
+ proto.GetExtension(serialization::HashAntiJoinWorkOrder::selection_index)) &&
+ proto.HasExtension(serialization::HashAntiJoinWorkOrder::block_id);
}
case serialization::INSERT: {
return proto.HasExtension(serialization::InsertWorkOrder::insert_destination_index) &&
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a39ad965/storage/HashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTable.hpp b/storage/HashTable.hpp
index ab74e2c..ef79d11 100644
--- a/storage/HashTable.hpp
+++ b/storage/HashTable.hpp
@@ -707,6 +707,89 @@ class HashTable : public HashTableBase<resizable,
FunctorT *functor) const;
/**
+ * @brief Look up (multiple) keys from a ValueAccessor, apply a functor to
+ * the matching values and additionally call a hasMatch() function of
+ * the functor when the first match for a key is found.
+ * @warning This method assumes that no concurrent calls to put(),
+ * putCompositeKey(), putValueAccessor(),
+ * putValueAccessorCompositeKey(), upsert(), upsertCompositeKey(),
+ * upsertValueAccessor(), or upsertValueAccessorCompositeKey() are
+ * taking place (i.e. that this HashTable is immutable for the
+ * duration of the call and for as long as any value reference
+ * passed to the functor is used). Concurrent calls to getSingle(),
+ * getSingleCompositeKey(), getAll(), getAllCompositeKey(),
+ * getAllFromValueAccessor(), getAllFromValueAccessorCompositeKey(),
+ * forEach(), and forEachCompositeKey() are safe.
+ * @note This version is for single scalar keys. See also
+ * getAllFromValueAccessorCompositeKeyWithExtraWorkForFirstMatch().
+ *
+ * @param accessor A ValueAccessor which will be used to access keys.
+ * beginIteration() should be called on accessor before calling this
+ * method.
+ * @param key_attr_id The attribute ID of the keys to be read from accessor.
+ * @param check_for_null_keys If true, each key will be checked to see if it
+ * is null before looking it up (null keys are skipped). This must be
+ * set to true if some of the keys that will be read from accessor may
+ * be null.
+ * @param functor A pointer to a functor, which should provide two functions:
+ * 1) An operator that takes 2 arguments: const ValueAccessor& (or better
+ * yet, a templated call operator which takes a const reference to
+ * some subclass of ValueAccessor as its first argument) and
+ * const ValueT&. The operator will be invoked once for each pair of a
+ * key taken from accessor and matching value.
+ * 2) A function hasMatch that takes 1 argument: const ValueAccessor&.
+ * The function will be called only once for a key from accessor when
+ * the first match is found.
+ */
+ template <typename FunctorT>
+ void getAllFromValueAccessorWithExtraWorkForFirstMatch(
+ ValueAccessor *accessor,
+ const attribute_id key_attr_id,
+ const bool check_for_null_keys,
+ FunctorT *functor) const;
+
+ /**
+ * @brief Look up (multiple) keys from a ValueAccessor, apply a functor to
+ * the matching values and additionally call a hasMatch() function of
+ * the functor when the first match for a key is found. Composite key
+ * version.
+ * @warning This method assumes that no concurrent calls to put(),
+ * putCompositeKey(), putValueAccessor(),
+ * putValueAccessorCompositeKey(), upsert(), upsertCompositeKey(),
+ * upsertValueAccessor(), or upsertValueAccessorCompositeKey() are
+ * taking place (i.e. that this HashTable is immutable for the
+ * duration of the call and for as long as any value reference
+ * passed to the functor is used). Concurrent calls to getSingle(),
+ * getSingleCompositeKey(), getAll(), getAllCompositeKey(),
+ * getAllFromValueAccessor(), getAllFromValueAccessorCompositeKey(),
+ * forEach(), and forEachCompositeKey() are safe.
+ * @note This version is for composite keys. See also
+ * getAllFromValueAccessorWithExtraWorkForFirstMatch().
+ *
+ * @param accessor A ValueAccessor which will be used to access keys.
+ * beginIteration() should be called on accessor before calling this
+ * method.
+ * @param key_attr_ids The attribute IDs of each key component to be read
+ * from accessor.
+ * @param check_for_null_keys If true, each key will be checked to see if it
+ * is null before looking it up (null keys are skipped). This must be
+ * set to true if some of the keys that will be read from accessor may
+ * be null.
+ * @param functor A pointer to a functor, which should provide two functions:
+ * 1) An operator that takes 2 arguments: const ValueAccessor& (or better
+ * yet, a templated call operator which takes a const reference to
+ * some subclass of ValueAccessor as its first argument) and
+ * const ValueT&. The operator will be invoked once for each pair of a
+ * key taken from accessor and matching value.
+ * 2) A function hasMatch that takes 1 argument: const ValueAccessor&.
+ * The function will be called only once for a key from accessor when
+ * the first match is found.
+ */
+ template <typename FunctorT>
+ void getAllFromValueAccessorCompositeKeyWithExtraWorkForFirstMatch(
+ ValueAccessor *accessor,
+ const std::vector<attribute_id> &key_attr_ids,
+ const bool check_for_null_keys,
+ FunctorT *functor) const;
+
+ /**
* @brief Lookup (multiple) keys from a ValueAccessor and apply a functor to
* the matching values. Composite key version.
*
@@ -746,6 +829,113 @@ class HashTable : public HashTableBase<resizable,
FunctorT *functor) const;
/**
+   * @brief Invoke a functor for every key read from a ValueAccessor that has
+   *        at least one matching entry in this hash table.
+   *
+   * @param accessor A ValueAccessor which will be used to access keys.
+   *        beginIteration() should be called on accessor before calling this
+   *        method.
+   * @param key_attr_id The attribute ID of the keys to be read from accessor.
+   * @param check_for_null_keys If true, each key will be checked to see if it
+   *        is null before looking it up (null keys are skipped). This must be
+   *        set to true if some of the keys that will be read from accessor may
+   *        be null.
+   * @param functor A pointer to a functor providing a call operator that
+   *        takes 1 argument: const ValueAccessor&. It is invoked at most once
+   *        per key read from accessor, and only when that key has a match,
+   *        no matter how many matching entries the table holds.
+   */
+  template <typename FunctorT>
+  void runOverKeysFromValueAccessorIfMatchFound(ValueAccessor *accessor,
+                                                const attribute_id key_attr_id,
+                                                const bool check_for_null_keys,
+                                                FunctorT *functor) const {
+    // Delegate to the shared helper, selecting the "match found" branch.
+    runOverKeysFromValueAccessor<true>(
+        accessor, key_attr_id, check_for_null_keys, functor);
+  }
+
+  /**
+   * @brief Invoke a functor for every composite key read from a ValueAccessor
+   *        that has at least one matching entry in this hash table.
+   *
+   * @param accessor A ValueAccessor which will be used to access keys.
+   *        beginIteration() should be called on accessor before calling this
+   *        method.
+   * @param key_attr_ids The attribute IDs of the composite key's components
+   *        to be read from accessor, one per component.
+   * @param check_for_null_keys If true, each key component will be checked to
+   *        see if it is null before the key is looked up (keys with a null
+   *        component are skipped). This must be set to true if some of the
+   *        keys that will be read from accessor may be null.
+   * @param functor A pointer to a functor providing a call operator that
+   *        takes 1 argument: const ValueAccessor&. It is invoked at most once
+   *        per key read from accessor, and only when that key has a match.
+   */
+  template <typename FunctorT>
+  void runOverKeysFromValueAccessorIfMatchFoundCompositeKey(
+      ValueAccessor *accessor,
+      const std::vector<attribute_id> &key_attr_ids,
+      const bool check_for_null_keys,
+      FunctorT *functor) const {
+    runOverKeysFromValueAccessorCompositeKey<true>(
+        accessor, key_attr_ids, check_for_null_keys, functor);
+  }
+
+  /**
+   * @brief Invoke a functor for every key read from a ValueAccessor that has
+   *        no matching entry in this hash table.
+   *
+   * @param accessor A ValueAccessor which will be used to access keys.
+   *        beginIteration() should be called on accessor before calling this
+   *        method.
+   * @param key_attr_id The attribute ID of the keys to be read from accessor.
+   * @param check_for_null_keys If true, each key will be checked to see if it
+   *        is null before looking it up. A null key is NOT skipped here: it is
+   *        not looked up but treated as having no match, so the functor IS
+   *        invoked for it. This must be set to true if some of the keys that
+   *        will be read from accessor may be null.
+   * @param functor A pointer to a functor providing a call operator that
+   *        takes 1 argument: const ValueAccessor&. It is invoked at most once
+   *        per key read from accessor, and only when that key has no match.
+   */
+  template <typename FunctorT>
+  void runOverKeysFromValueAccessorIfMatchNotFound(
+      ValueAccessor *accessor,
+      const attribute_id key_attr_id,
+      const bool check_for_null_keys,
+      FunctorT *functor) const {
+    runOverKeysFromValueAccessor<false>(
+        accessor, key_attr_id, check_for_null_keys, functor);
+  }
+
+  /**
+   * @brief Invoke a functor for every composite key read from a ValueAccessor
+   *        that has no matching entry in this hash table.
+   *
+   * @param accessor A ValueAccessor which will be used to access keys.
+   *        beginIteration() should be called on accessor before calling this
+   *        method.
+   * @param key_attr_ids The attribute IDs of the composite key's components
+   *        to be read from accessor, one per component.
+   * @param check_for_null_keys If true, each key component will be checked to
+   *        see if it is null before the key is looked up. A key with a null
+   *        component is NOT skipped here: it is not looked up but treated as
+   *        having no match, so the functor IS invoked for it. This must be set
+   *        to true if some of the keys that will be read from accessor may be
+   *        null.
+   * @param functor A pointer to a functor providing a call operator that
+   *        takes 1 argument: const ValueAccessor&. It is invoked at most once
+   *        per key read from accessor, and only when that key has no match.
+   */
+  template <typename FunctorT>
+  void runOverKeysFromValueAccessorIfMatchNotFoundCompositeKey(
+      ValueAccessor *accessor,
+      const std::vector<attribute_id> &key_attr_ids,
+      const bool check_for_null_keys,
+      FunctorT *functor) const {
+    runOverKeysFromValueAccessorCompositeKey<false>(
+        accessor, key_attr_ids, check_for_null_keys, functor);
+  }
+
+ /**
* @brief Apply a functor to each key, value pair in this hash table.
*
* @warning This method assumes that no concurrent calls to put(),
@@ -965,6 +1155,10 @@ class HashTable : public HashTableBase<resizable,
const ValueT **value,
std::size_t *entry_num) const = 0;
+  // Returns true if 'key' is present in this hash table.
+  virtual bool hasKey(const TypedValue &key) const = 0;
+  // Composite-key counterpart of hasKey(): 'key' holds one TypedValue per key
+  // component.
+  virtual bool hasCompositeKey(const std::vector<TypedValue> &key) const = 0;
+
// For a resizable HashTable, grow to accomodate more entries. If
// 'extra_buckets' is not zero, it may serve as a "hint" to implementations
// that at least the requested number of extra buckets are required when
@@ -1048,6 +1242,21 @@ class HashTable : public HashTableBase<resizable,
return false;
}
+  // Shared implementation behind the runOverKeysFromValueAccessorIfMatch*()
+  // wrappers. For each key read from accessor, the functor is applied when the
+  // key's presence in this table agrees with run_if_match_found: on a hit when
+  // it is true, on a miss when it is false.
+  template <bool run_if_match_found, typename FunctorT>
+  void runOverKeysFromValueAccessor(
+      ValueAccessor *accessor,
+      const attribute_id key_attr_id,
+      const bool check_for_null_keys,
+      FunctorT *functor) const;
+
+  // Composite-key counterpart of runOverKeysFromValueAccessor().
+  template <bool run_if_match_found, typename FunctorT>
+  void runOverKeysFromValueAccessorCompositeKey(
+      ValueAccessor *accessor,
+      const std::vector<attribute_id> &key_attr_ids,
+      const bool check_for_null_keys,
+      FunctorT *functor) const;
+
// Method containing the actual logic implementing getAllFromValueAccessor().
// Has extra template parameters that control behavior to avoid some
// inner-loop branching.
@@ -1678,6 +1887,184 @@ template <typename ValueT,
bool force_key_copy,
bool allow_duplicate_keys>
template <typename FunctorT>
+void HashTable<ValueT,
+               resizable,
+               serializable,
+               force_key_copy,
+               allow_duplicate_keys>::
+    getAllFromValueAccessorWithExtraWorkForFirstMatch(
+        ValueAccessor *accessor,
+        const attribute_id key_attr_id,
+        const bool check_for_null_keys,
+        FunctorT *functor) const {
+  // Virtual member functions are called through an explicit 'this->' inside
+  // the generic lambda below, as the runOverKeys* helpers do for GCC builds.
+  InvokeOnAnyValueAccessor(
+      accessor,
+      [&](auto *accessor) -> void {  // NOLINT(build/c++11)
+    while (accessor->next()) {
+      TypedValue key = accessor->getTypedValue(key_attr_id);
+      if (check_for_null_keys && key.isNull()) {
+        continue;  // Null keys are skipped when requested.
+      }
+      const std::size_t hash_code =
+          adjust_hashes_ ? AdjustHash(key.getHash()) : key.getHash();
+      std::size_t entry_num = 0;
+      const ValueT *value;
+      if (!this->getNextEntryForKey(key, hash_code, &value, &entry_num)) {
+        continue;  // No match for this key.
+      }
+      // First match: do the per-key extra work exactly once, then visit it.
+      functor->recordMatch(*accessor);
+      (*functor)(*accessor, *value);
+      if (allow_duplicate_keys) {
+        // Visit every remaining entry that shares this key.
+        while (this->getNextEntryForKey(key, hash_code, &value, &entry_num)) {
+          (*functor)(*accessor, *value);
+        }
+      }
+    }
+  });  // NOLINT(whitespace/parens)
+}
+
+template <typename ValueT,
+          bool resizable,
+          bool serializable,
+          bool force_key_copy,
+          bool allow_duplicate_keys>
+template <typename FunctorT>
+void HashTable<ValueT, resizable, serializable, force_key_copy, allow_duplicate_keys>
+    ::getAllFromValueAccessorCompositeKeyWithExtraWorkForFirstMatch(
+        ValueAccessor *accessor,
+        const std::vector<attribute_id> &key_attr_ids,
+        const bool check_for_null_keys,
+        FunctorT *functor) const {
+  DEBUG_ASSERT(key_types_.size() == key_attr_ids.size());
+  // Scratch buffer reused for every tuple, one slot per key component. The
+  // component loop is bounded by this buffer's size (== key_attr_ids.size()),
+  // so a release build never indexes key_attr_ids out of range even if the
+  // DEBUG_ASSERT above would have fired.
+  std::vector<TypedValue> key_vector(key_attr_ids.size());
+  // Virtual member functions are called through an explicit 'this->' inside
+  // the generic lambda below, as the runOverKeys* helpers do for GCC builds.
+  InvokeOnAnyValueAccessor(
+      accessor,
+      [&](auto *accessor) -> void {  // NOLINT(build/c++11)
+    while (accessor->next()) {
+      bool null_key = false;
+      for (std::vector<TypedValue>::size_type key_idx = 0;
+           key_idx < key_vector.size();
+           ++key_idx) {
+        key_vector[key_idx] = accessor->getTypedValue(key_attr_ids[key_idx]);
+        if (check_for_null_keys && key_vector[key_idx].isNull()) {
+          null_key = true;
+          break;
+        }
+      }
+      if (null_key) {
+        continue;  // Keys with a null component are skipped when requested.
+      }
+
+      const std::size_t raw_hash = hashCompositeKey(key_vector);
+      const std::size_t hash_code = adjust_hashes_ ? AdjustHash(raw_hash) : raw_hash;
+      std::size_t entry_num = 0;
+      const ValueT *value;
+      if (!this->getNextEntryForCompositeKey(key_vector, hash_code, &value, &entry_num)) {
+        continue;  // No match for this key.
+      }
+      // First match: do the per-key extra work exactly once, then visit it.
+      functor->recordMatch(*accessor);
+      (*functor)(*accessor, *value);
+      if (allow_duplicate_keys) {
+        // Visit every remaining entry that shares this key.
+        while (this->getNextEntryForCompositeKey(key_vector, hash_code, &value, &entry_num)) {
+          (*functor)(*accessor, *value);
+        }
+      }
+    }
+  });  // NOLINT(whitespace/parens)
+}
+
+template <typename ValueT,
+          bool resizable,
+          bool serializable,
+          bool force_key_copy,
+          bool allow_duplicate_keys>
+template <bool run_if_match_found, typename FunctorT>
+void HashTable<ValueT,
+               resizable,
+               serializable,
+               force_key_copy,
+               allow_duplicate_keys>::
+    runOverKeysFromValueAccessor(ValueAccessor *accessor,
+                                 const attribute_id key_attr_id,
+                                 const bool check_for_null_keys,
+                                 FunctorT *functor) const {
+  // Applies functor to each key read from accessor whose presence in this
+  // table equals run_if_match_found. 'this->' is explicit inside the generic
+  // lambda for GCC builds.
+  InvokeOnAnyValueAccessor(
+      accessor,
+      [&](auto *accessor) -> void {  // NOLINT(build/c++11)
+    while (accessor->next()) {
+      TypedValue key = accessor->getTypedValue(key_attr_id);
+      if (check_for_null_keys && key.isNull()) {
+        // A null key is never probed against the table; it is treated as
+        // having no match. It is therefore reported only when looking for
+        // misses, and skipped (as documented) when looking for hits.
+        if (!run_if_match_found) {
+          (*functor)(*accessor);
+        }
+        continue;
+      }
+      if (this->hasKey(key) == run_if_match_found) {
+        (*functor)(*accessor);
+      }
+    }
+  });  // NOLINT(whitespace/parens)
+}
+
+template <typename ValueT,
+          bool resizable,
+          bool serializable,
+          bool force_key_copy,
+          bool allow_duplicate_keys>
+template <bool run_if_match_found, typename FunctorT>
+void HashTable<ValueT, resizable, serializable, force_key_copy, allow_duplicate_keys>
+    ::runOverKeysFromValueAccessorCompositeKey(ValueAccessor *accessor,
+                                               const std::vector<attribute_id> &key_attr_ids,
+                                               const bool check_for_null_keys,
+                                               FunctorT *functor) const {
+  DEBUG_ASSERT(key_types_.size() == key_attr_ids.size());
+  // Scratch buffer reused for every tuple, one slot per key component. The
+  // component loop is bounded by this buffer's size (== key_attr_ids.size()),
+  // so a release build never indexes key_attr_ids out of range.
+  std::vector<TypedValue> key_vector(key_attr_ids.size());
+  InvokeOnAnyValueAccessor(
+      accessor,
+      [&](auto *accessor) -> void {  // NOLINT(build/c++11)
+    while (accessor->next()) {
+      bool null_key = false;
+      for (std::vector<TypedValue>::size_type key_idx = 0;
+           key_idx < key_vector.size();
+           ++key_idx) {
+        key_vector[key_idx] = accessor->getTypedValue(key_attr_ids[key_idx]);
+        if (check_for_null_keys && key_vector[key_idx].isNull()) {
+          null_key = true;
+          break;
+        }
+      }
+      if (null_key) {
+        // A key with a null component is never probed against the table; it
+        // is treated as having no match. It is therefore reported only when
+        // looking for misses, and skipped when looking for hits.
+        if (!run_if_match_found) {
+          (*functor)(*accessor);
+        }
+        continue;
+      }
+      // 'this->' is explicit inside the generic lambda for GCC builds.
+      if (this->hasCompositeKey(key_vector) == run_if_match_found) {
+        (*functor)(*accessor);
+      }
+    }
+  });  // NOLINT(whitespace/parens)
+}
+
+template <typename ValueT,
+ bool resizable,
+ bool serializable,
+ bool force_key_copy,
+ bool allow_duplicate_keys>
+template <typename FunctorT>
std::size_t HashTable<ValueT, resizable, serializable, force_key_copy, allow_duplicate_keys>
::forEach(FunctorT *functor) const {
std::size_t entries_visited = 0;