You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2016/04/19 18:44:19 UTC

[06/24] incubator-quickstep git commit: Adds backend support for hash semi/anti joins. (#164)

Adds backend support for hash semi/anti joins. (#164)

* Added implementations for HashSemiJoin and HashAntiJoin operators.

* Added component in ExecutionGenerator to convert semi/anti join nodes into relational operators.

* Add 'this' pointer in anonymous functionfor gcc build


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/a39ad965
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/a39ad965
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/a39ad965

Branch: refs/heads/master
Commit: a39ad9654f10fbe67a90cb7b65dc30dc797f55b3
Parents: 914f2d8
Author: Jianqiao Zhu <ji...@cs.wisc.edu>
Authored: Thu Apr 14 16:28:24 2016 -0500
Committer: Jignesh Patel <pa...@users.noreply.github.com>
Committed: Thu Apr 14 16:28:24 2016 -0500

----------------------------------------------------------------------
 query_optimizer/ExecutionGenerator.cpp          |  42 +-
 .../tests/execution_generator/Select.test       |  43 +++
 relational_operators/CMakeLists.txt             |   2 +
 relational_operators/HashJoinOperator.cpp       | 360 +++++++++++++++--
 relational_operators/HashJoinOperator.hpp       | 350 ++++++++++++++---
 relational_operators/WorkOrder.proto            |  58 ++-
 relational_operators/WorkOrderFactory.cpp       | 211 ++++++++--
 storage/HashTable.hpp                           | 387 +++++++++++++++++++
 storage/LinearOpenAddressingHashTable.hpp       |  72 ++++
 storage/SeparateChainingHashTable.hpp           |  54 +++
 .../SimpleScalarSeparateChainingHashTable.hpp   |  37 ++
 storage/StorageBlock.hpp                        |  20 +
 12 files changed, 1506 insertions(+), 130 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a39ad965/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index cf90be7..aa6b0dc 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -23,6 +23,7 @@
 #include <cstddef>
 #include <memory>
 #include <string>
+#include <type_traits>
 #include <unordered_map>
 #include <utility>
 #include <vector>
@@ -570,16 +571,19 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
     key_types.push_back(&left_attribute_type);
   }
 
-  // Choose the smaller table as the inner build table,
-  // and the other one as the outer probe table.
   std::size_t probe_cardinality = cost_model_->estimateCardinality(probe_physical);
   std::size_t build_cardinality = cost_model_->estimateCardinality(build_physical);
-  if (probe_cardinality < build_cardinality) {
-    // Switch the probe and build physical nodes.
-    std::swap(probe_physical, build_physical);
-    std::swap(probe_cardinality, build_cardinality);
-    std::swap(probe_attribute_ids, build_attribute_ids);
-    std::swap(any_probe_attributes_nullable, any_build_attributes_nullable);
+  // For inner join, we may swap the probe table and the build table.
+  if (physical_plan->join_type() == P::HashJoin::JoinType::kInnerJoin)  {
+    // Choose the smaller table as the inner build table,
+    // and the other one as the outer probe table.
+    if (probe_cardinality < build_cardinality) {
+      // Switch the probe and build physical nodes.
+      std::swap(probe_physical, build_physical);
+      std::swap(probe_cardinality, build_cardinality);
+      std::swap(probe_attribute_ids, build_attribute_ids);
+      std::swap(any_probe_attributes_nullable, any_build_attributes_nullable);
+    }
   }
 
   // Convert the residual predicate proto.
@@ -647,6 +651,25 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
                                  &output_relation,
                                  insert_destination_proto);
 
+  // Get JoinType
+  HashJoinOperator::JoinType join_type;
+  switch (physical_plan->join_type()) {
+    case P::HashJoin::JoinType::kInnerJoin:
+      join_type = HashJoinOperator::JoinType::kInnerJoin;
+      break;
+    case P::HashJoin::JoinType::kLeftSemiJoin:
+      join_type = HashJoinOperator::JoinType::kLeftSemiJoin;
+      break;
+    case P::HashJoin::JoinType::kLeftAntiJoin:
+      join_type = HashJoinOperator::JoinType::kLeftAntiJoin;
+      break;
+    default:
+      LOG(FATAL) << "Invalid physical::HashJoin::JoinType: "
+                 << static_cast<typename std::underlying_type<P::HashJoin::JoinType>::type>(
+                        physical_plan->join_type());
+  }
+
+  // Create hash join operator
   const QueryPlan::DAGNodeIndex join_operator_index =
       execution_plan_->addRelationalOperator(
           new HashJoinOperator(
@@ -659,7 +682,8 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
               insert_destination_index,
               join_hash_table_index,
               residual_predicate_index,
-              project_expressions_group_index));
+              project_expressions_group_index,
+              join_type));
   insert_destination_proto->set_relational_op_index(join_operator_index);
 
   const QueryPlan::DAGNodeIndex destroy_operator_index =

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a39ad965/query_optimizer/tests/execution_generator/Select.test
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/execution_generator/Select.test b/query_optimizer/tests/execution_generator/Select.test
index e1614cb..9bfa27c 100644
--- a/query_optimizer/tests/execution_generator/Select.test
+++ b/query_optimizer/tests/execution_generator/Select.test
@@ -721,6 +721,49 @@ WHERE CASE WHEN i < j THEN i
 +-----------+-----------+
 ==
 
+SELECT i AS odd
+FROM generate_series(0, 10, 1) AS gs1(i)
+WHERE
+  NOT EXISTS (
+    SELECT *
+    FROM generate_series(0, 10, 2) AS gs2(even)
+    WHERE i = even
+  );
+--
++-----------+
+|odd        |
++-----------+
+|          1|
+|          3|
+|          5|
+|          7|
+|          9|
++-----------+
+==
+
+SELECT i
+FROM generate_series(0, 100, 3) AS gs1(i)
+WHERE
+  EXISTS (
+    SELECT *
+    FROM generate_series(0, 100, 5) AS gs2(i)
+    WHERE gs1.i = gs2.i
+  )
+  AND NOT EXISTS (
+    SELECT *
+    FROM generate_series(0, 100, 10) AS gs3(i)
+    WHERE gs1.i = gs3.i
+  )
+  AND (i < 40 OR i > 60);
+--
++-----------+
+|i          |
++-----------+
+|         15|
+|         75|
++-----------+
+==
+
 # TODO(team): Support uncorrelated queries.
 # SELECT COUNT(*)
 # FROM test

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a39ad965/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index 17a9a6f..b02bc6b 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -176,11 +176,13 @@ target_link_libraries(quickstep_relationaloperators_HashJoinOperator
                       quickstep_storage_StorageBlock
                       quickstep_storage_StorageBlockInfo
                       quickstep_storage_StorageManager
+                      quickstep_storage_SubBlocksReference
                       quickstep_storage_TupleReference
                       quickstep_storage_TupleStorageSubBlock
                       quickstep_storage_ValueAccessor
                       quickstep_types_containers_ColumnVectorsValueAccessor
                       quickstep_utility_Macros
+                      quickstep_utility_PtrList
                       tmb)
 target_link_libraries(quickstep_relationaloperators_InsertOperator
                       glog

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a39ad965/relational_operators/HashJoinOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.cpp b/relational_operators/HashJoinOperator.cpp
index f7bbf38..e0076e3 100644
--- a/relational_operators/HashJoinOperator.cpp
+++ b/relational_operators/HashJoinOperator.cpp
@@ -35,6 +35,7 @@
 #include "storage/StorageBlock.hpp"
 #include "storage/StorageBlockInfo.hpp"
 #include "storage/StorageManager.hpp"
+#include "storage/SubBlocksReference.hpp"
 #include "storage/TupleReference.hpp"
 #include "storage/TupleStorageSubBlock.hpp"
 #include "storage/ValueAccessor.hpp"
@@ -85,7 +86,7 @@ class MapBasedJoinedTupleCollector {
   // Consolidation is a no-op for this version, but we provide this trivial
   // call so that MapBasedJoinedTupleCollector and
   // VectorBasedJoinedTupleCollector have the same interface and can both be
-  // used in the templated HashJoinWorkOrder::executeWithCollectorType() method.
+  // used in the templated HashInnerJoinWorkOrder::executeWithCollectorType() method.
   inline void consolidate() const {
   }
 
@@ -183,6 +184,25 @@ class VectorBasedJoinedTupleCollector {
       consolidated_joined_tuples_;
 };
 
+class SemiAntiJoinTupleCollector {
+ public:
+  explicit SemiAntiJoinTupleCollector(const TupleStorageSubBlock &tuple_store) {
+    filter_.reset(tuple_store.getExistenceMap());
+  }
+
+  template <typename ValueAccessorT>
+  inline void operator()(const ValueAccessorT &accessor) {
+    filter_->set(accessor.getCurrentPosition(), false);
+  }
+
+  const TupleIdSequence* filter() const {
+    return filter_.get();
+  }
+
+ private:
+  std::unique_ptr<TupleIdSequence> filter_;
+};
+
 }  // namespace
 
 bool HashJoinOperator::getAllWorkOrders(
@@ -191,31 +211,53 @@ bool HashJoinOperator::getAllWorkOrders(
     StorageManager *storage_manager,
     const tmb::client_id scheduler_client_id,
     tmb::MessageBus *bus) {
+  switch (join_type_) {
+    case JoinType::kInnerJoin:
+      return getAllNonOuterJoinWorkOrders<HashInnerJoinWorkOrder>(
+          container, query_context, storage_manager);
+    case JoinType::kLeftSemiJoin:
+      return getAllNonOuterJoinWorkOrders<HashSemiJoinWorkOrder>(
+          container, query_context, storage_manager);
+    case JoinType::kLeftAntiJoin:
+      return getAllNonOuterJoinWorkOrders<HashAntiJoinWorkOrder>(
+          container, query_context, storage_manager);
+    default:
+      LOG(FATAL) << "Unknown join type in HashJoinOperator::getAllWorkOrders()";
+  }
+}
+
+template <class JoinWorkOrderClass>
+bool HashJoinOperator::getAllNonOuterJoinWorkOrders(
+    WorkOrdersContainer *container,
+    QueryContext *query_context,
+    StorageManager *storage_manager) {
   // We wait until the building of global hash table is complete.
   if (blocking_dependencies_met_) {
     DCHECK(query_context != nullptr);
 
-    const Predicate *residual_predicate = query_context->getPredicate(residual_predicate_index_);
+    const Predicate *residual_predicate =
+        query_context->getPredicate(residual_predicate_index_);
     const vector<unique_ptr<const Scalar>> &selection =
         query_context->getScalarGroup(selection_index_);
     InsertDestination *output_destination =
         query_context->getInsertDestination(output_destination_index_);
-    JoinHashTable *hash_table = query_context->getJoinHashTable(hash_table_index_);
+    const JoinHashTable &hash_table =
+        *(query_context->getJoinHashTable(hash_table_index_));
 
     if (probe_relation_is_stored_) {
       if (!started_) {
         for (const block_id probe_block_id : probe_relation_block_ids_) {
           container->addNormalWorkOrder(
-              new HashJoinWorkOrder(build_relation_,
-                                    probe_relation_,
-                                    join_key_attributes_,
-                                    any_join_key_attributes_nullable_,
-                                    probe_block_id,
-                                    residual_predicate,
-                                    selection,
-                                    output_destination,
-                                    hash_table,
-                                    storage_manager),
+              new JoinWorkOrderClass(build_relation_,
+                                     probe_relation_,
+                                     join_key_attributes_,
+                                     any_join_key_attributes_nullable_,
+                                     probe_block_id,
+                                     residual_predicate,
+                                     selection,
+                                     hash_table,
+                                     output_destination,
+                                     storage_manager),
               op_index_);
         }
         started_ = true;
@@ -224,27 +266,26 @@ bool HashJoinOperator::getAllWorkOrders(
     } else {
       while (num_workorders_generated_ < probe_relation_block_ids_.size()) {
         container->addNormalWorkOrder(
-            new HashJoinWorkOrder(
-                build_relation_,
-                probe_relation_,
-                join_key_attributes_,
-                any_join_key_attributes_nullable_,
-                probe_relation_block_ids_[num_workorders_generated_],
-                residual_predicate,
-                selection,
-                output_destination,
-                hash_table,
-                storage_manager),
+            new JoinWorkOrderClass(build_relation_,
+                                   probe_relation_,
+                                   join_key_attributes_,
+                                   any_join_key_attributes_nullable_,
+                                   probe_relation_block_ids_[num_workorders_generated_],
+                                   residual_predicate,
+                                   selection,
+                                   hash_table,
+                                   output_destination,
+                                   storage_manager),
             op_index_);
         ++num_workorders_generated_;
       }  // end while
       return done_feeding_input_relation_;
-    }  // end else (input_relation_is_stored is false)
-  }  // end if (blocking_dependencies_met)
+    }  // end else (probe_relation_is_stored_)
+  }  // end if (blocking_dependencies_met_)
   return false;
 }
 
-void HashJoinWorkOrder::execute() {
+void HashInnerJoinWorkOrder::execute() {
   if (FLAGS_vector_based_joined_tuple_collector) {
     executeWithCollectorType<VectorBasedJoinedTupleCollector>();
   } else {
@@ -253,7 +294,7 @@ void HashJoinWorkOrder::execute() {
 }
 
 template <typename CollectorT>
-void HashJoinWorkOrder::executeWithCollectorType() {
+void HashInnerJoinWorkOrder::executeWithCollectorType() {
   BlockReference probe_block(
       storage_manager_->getBlock(block_id_, probe_relation_));
   const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock();
@@ -261,13 +302,13 @@ void HashJoinWorkOrder::executeWithCollectorType() {
   std::unique_ptr<ValueAccessor> probe_accessor(probe_store.createValueAccessor());
   CollectorT collector;
   if (join_key_attributes_.size() == 1) {
-    hash_table_->getAllFromValueAccessor(
+    hash_table_.getAllFromValueAccessor(
         probe_accessor.get(),
         join_key_attributes_.front(),
         any_join_key_attributes_nullable_,
         &collector);
   } else {
-    hash_table_->getAllFromValueAccessorCompositeKey(
+    hash_table_.getAllFromValueAccessorCompositeKey(
         probe_accessor.get(),
         join_key_attributes_,
         any_join_key_attributes_nullable_,
@@ -348,4 +389,263 @@ void HashJoinWorkOrder::executeWithCollectorType() {
   }
 }
 
+void HashSemiJoinWorkOrder::execute() {
+  if (residual_predicate_ == nullptr) {
+    executeWithoutResidualPredicate();
+  } else {
+    executeWithResidualPredicate();
+  }
+}
+
+void HashSemiJoinWorkOrder::executeWithResidualPredicate() {
+  const relation_id build_relation_id = build_relation_.getID();
+  const relation_id probe_relation_id = probe_relation_.getID();
+
+  BlockReference probe_block = storage_manager_->getBlock(block_id_,
+                                                          probe_relation_);
+  const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock();
+
+  std::unique_ptr<ValueAccessor> probe_accessor(probe_store.createValueAccessor());
+
+  // TODO(harshad) - Make this function work with both types of collectors.
+
+  // We collect all the matching probe relation tuples, as there's a residual
+  // preidcate that needs to be applied after collecting these matches.
+  MapBasedJoinedTupleCollector collector;
+  if (join_key_attributes_.size() == 1) {
+    hash_table_.getAllFromValueAccessor(
+        probe_accessor.get(),
+        join_key_attributes_.front(),
+        any_join_key_attributes_nullable_,
+        &collector);
+  } else {
+    hash_table_.getAllFromValueAccessorCompositeKey(
+        probe_accessor.get(),
+        join_key_attributes_,
+        any_join_key_attributes_nullable_,
+        &collector);
+  }
+
+  // Get a filter for tuples in the given probe block.
+  TupleIdSequence filter(probe_store.getMaxTupleID() + 1);
+  filter.setRange(0, filter.length(), false);
+  for (const std::pair<const block_id,
+                       std::vector<std::pair<tuple_id, tuple_id>>>
+           &build_block_entry : *collector.getJoinedTuples()) {
+    // First element of the pair build_block_entry is the build block ID
+    // 2nd element of the pair is a vector of pairs, in each of which -
+    // 1st element is a matching tuple ID from the inner (build) relation.
+    // 2nd element is a matching tuple ID from the outer (probe) relation.
+
+    // Get the block from the build relation for this pair of matched tuples.
+    BlockReference build_block =
+        storage_manager_->getBlock(build_block_entry.first, build_relation_);
+    const TupleStorageSubBlock &build_store =
+        build_block->getTupleStorageSubBlock();
+    std::unique_ptr<ValueAccessor> build_accessor(
+        build_store.createValueAccessor());
+    for (const std::pair<tuple_id, tuple_id> &hash_match
+         : build_block_entry.second) {
+      // For each pair, 1st element is a tuple ID from the build relation in the
+      // given build block, 2nd element is a tuple ID from the probe relation.
+      if (filter.get(hash_match.second)) {
+        // We have already found matches for this tuple that belongs to the
+        // probe side, skip it.
+        continue;
+      }
+      if (residual_predicate_->matchesForJoinedTuples(*build_accessor,
+                                                      build_relation_id,
+                                                      hash_match.first,
+                                                      *probe_accessor,
+                                                      probe_relation_id,
+                                                      hash_match.second)) {
+        filter.set(hash_match.second, true);
+      }
+    }
+  }
+
+  SubBlocksReference sub_blocks_ref(probe_store,
+                                    probe_block->getIndices(),
+                                    probe_block->getIndicesConsistent());
+
+  std::unique_ptr<ValueAccessor> probe_accessor_with_filter(
+      probe_store.createValueAccessor(&filter));
+  ColumnVectorsValueAccessor temp_result;
+  for (vector<unique_ptr<const Scalar>>::const_iterator selection_it = selection_.begin();
+       selection_it != selection_.end();
+       ++selection_it) {
+    temp_result.addColumn((*selection_it)->getAllValues(
+        probe_accessor_with_filter.get(), &sub_blocks_ref));
+  }
+
+  output_destination_->bulkInsertTuples(&temp_result);
+}
+
+void HashSemiJoinWorkOrder::executeWithoutResidualPredicate() {
+  DCHECK(residual_predicate_ == nullptr);
+
+  BlockReference probe_block = storage_manager_->getBlock(block_id_,
+                                                          probe_relation_);
+  const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock();
+
+  std::unique_ptr<ValueAccessor> probe_accessor(probe_store.createValueAccessor());
+  SemiAntiJoinTupleCollector collector(probe_store);
+  // We collect all the probe relation tuples which have at least one matching
+  // tuple in the build relation. As a performance optimization, the hash table
+  // just looks for the existence of the probing key in the hash table and sets
+  // the bit for the probing key in the collector. The optimization works
+  // because there is no residual predicate in this case, unlike
+  // executeWithResidualPredicate().
+  if (join_key_attributes_.size() == 1u) {
+    // Call the collector to set the bit to 0 for every key without a match.
+    hash_table_.runOverKeysFromValueAccessorIfMatchNotFound(
+        probe_accessor.get(),
+        join_key_attributes_.front(),
+        any_join_key_attributes_nullable_,
+        &collector);
+  } else {
+    // Call the collector to set the bit to 0 for every key without a match.
+    hash_table_.runOverKeysFromValueAccessorIfMatchNotFoundCompositeKey(
+        probe_accessor.get(),
+        join_key_attributes_,
+        any_join_key_attributes_nullable_,
+        &collector);
+  }
+
+  SubBlocksReference sub_blocks_ref(probe_store,
+                                    probe_block->getIndices(),
+                                    probe_block->getIndicesConsistent());
+
+  std::unique_ptr<ValueAccessor> probe_accessor_with_filter(
+      probe_store.createValueAccessor(collector.filter()));
+  ColumnVectorsValueAccessor temp_result;
+  for (vector<unique_ptr<const Scalar>>::const_iterator selection_it = selection_.begin();
+       selection_it != selection_.end(); ++selection_it) {
+    temp_result.addColumn((*selection_it)->getAllValues(
+        probe_accessor_with_filter.get(), &sub_blocks_ref));
+  }
+
+  output_destination_->bulkInsertTuples(&temp_result);
+}
+
+void HashAntiJoinWorkOrder::executeWithoutResidualPredicate() {
+  DCHECK(residual_predicate_ == nullptr);
+
+  BlockReference probe_block = storage_manager_->getBlock(block_id_,
+                                                          probe_relation_);
+  const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock();
+
+  std::unique_ptr<ValueAccessor> probe_accessor(probe_store.createValueAccessor());
+  SemiAntiJoinTupleCollector collector(probe_store);
+  // We probe the hash table to find the keys which have an entry in the
+  // hash table.
+  if (join_key_attributes_.size() == 1) {
+    // Call the collector to set the bit to 0 for every key with a match.
+    hash_table_.runOverKeysFromValueAccessorIfMatchFound(
+        probe_accessor.get(),
+        join_key_attributes_.front(),
+        any_join_key_attributes_nullable_,
+        &collector);
+  } else {
+    // Call the collector to set the bit to 0 for every key with a match.
+    hash_table_.runOverKeysFromValueAccessorIfMatchFoundCompositeKey(
+        probe_accessor.get(),
+        join_key_attributes_,
+        any_join_key_attributes_nullable_,
+        &collector);
+  }
+
+  SubBlocksReference sub_blocks_ref(probe_store,
+                                    probe_block->getIndices(),
+                                    probe_block->getIndicesConsistent());
+
+  std::unique_ptr<ValueAccessor> probe_accessor_with_filter(
+      probe_store.createValueAccessor(collector.filter()));
+  ColumnVectorsValueAccessor temp_result;
+  for (vector<unique_ptr<const Scalar>>::const_iterator selection_it = selection_.begin();
+       selection_it != selection_.end(); ++selection_it) {
+    temp_result.addColumn((*selection_it)->getAllValues(
+        probe_accessor_with_filter.get(), &sub_blocks_ref));
+  }
+
+  output_destination_->bulkInsertTuples(&temp_result);
+}
+
+void HashAntiJoinWorkOrder::executeWithResidualPredicate() {
+  const relation_id build_relation_id = build_relation_.getID();
+  const relation_id probe_relation_id = probe_relation_.getID();
+
+  BlockReference probe_block = storage_manager_->getBlock(block_id_,
+                                                          probe_relation_);
+  const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock();
+
+  std::unique_ptr<ValueAccessor> probe_accessor(probe_store.createValueAccessor());
+  // TODO(harshad) - Make the following code work with both types of collectors.
+  MapBasedJoinedTupleCollector collector;
+  // We probe the hash table and get all the matches. Unlike
+  // executeWithoutResidualPredicate(), we have to collect all the matching
+  // tuples, because after this step we still have to evalute the residual
+  // predicate.
+  if (join_key_attributes_.size() == 1) {
+    hash_table_.getAllFromValueAccessor(
+        probe_accessor.get(),
+        join_key_attributes_.front(),
+        any_join_key_attributes_nullable_,
+        &collector);
+  } else {
+    hash_table_.getAllFromValueAccessorCompositeKey(
+        probe_accessor.get(),
+        join_key_attributes_,
+        any_join_key_attributes_nullable_,
+        &collector);
+  }
+
+  // Create a filter for all the tuples from the given probe block.
+  std::unique_ptr<TupleIdSequence> filter(probe_store.getExistenceMap());
+  for (const std::pair<const block_id, std::vector<std::pair<tuple_id, tuple_id>>>
+           &build_block_entry : *collector.getJoinedTuples()) {
+    // First element of the pair build_block_entry is the build block ID
+    // 2nd element of the pair is a vector of pairs, in each of which -
+    // 1st element is a matching tuple ID from the inner (build) relation.
+    // 2nd element is a matching tuple ID from the outer (probe) relation.
+    BlockReference build_block = storage_manager_->getBlock(build_block_entry.first,
+                                                            build_relation_);
+    const TupleStorageSubBlock &build_store = build_block->getTupleStorageSubBlock();
+    std::unique_ptr<ValueAccessor> build_accessor(build_store.createValueAccessor());
+    for (const std::pair<tuple_id, tuple_id> &hash_match
+         : build_block_entry.second) {
+      if (!filter->get(hash_match.second)) {
+        // We have already seen this tuple, skip it.
+        continue;
+      }
+      if (residual_predicate_->matchesForJoinedTuples(*build_accessor,
+                                                      build_relation_id,
+                                                      hash_match.first,
+                                                      *probe_accessor,
+                                                      probe_relation_id,
+                                                      hash_match.second)) {
+        // Note that the filter marks a match as false, as needed by the anti
+        // join definition.
+        filter->set(hash_match.second, false);
+      }
+    }
+  }
+
+  SubBlocksReference sub_blocks_ref(probe_store,
+                                    probe_block->getIndices(),
+                                    probe_block->getIndicesConsistent());
+
+  std::unique_ptr<ValueAccessor> probe_accessor_with_filter(
+      probe_store.createValueAccessor(filter.get()));
+  ColumnVectorsValueAccessor temp_result;
+  for (vector<unique_ptr<const Scalar>>::const_iterator selection_it = selection_.begin();
+       selection_it != selection_.end();
+       ++selection_it) {
+    temp_result.addColumn((*selection_it)->getAllValues(probe_accessor_with_filter.get(),
+                                                     &sub_blocks_ref));
+  }
+
+  output_destination_->bulkInsertTuples(&temp_result);
+}
+
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a39ad965/relational_operators/HashJoinOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.hpp b/relational_operators/HashJoinOperator.hpp
index a00e590..c22f435 100644
--- a/relational_operators/HashJoinOperator.hpp
+++ b/relational_operators/HashJoinOperator.hpp
@@ -31,6 +31,7 @@
 #include "storage/HashTable.hpp"
 #include "storage/StorageBlockInfo.hpp"
 #include "utility/Macros.hpp"
+#include "utility/PtrList.hpp"
 
 #include "glog/logging.h"
 
@@ -52,11 +53,17 @@ class WorkOrdersContainer;
  */
 
 /**
- * @brief An operator which performs a hash-join on two
- *        relations.
+ * @brief An operator which performs a hash-join, including inner-join,
+ *        semi-join, anti-join and outer-join on two relations.
  **/
 class HashJoinOperator : public RelationalOperator {
  public:
+  enum class JoinType {
+    kInnerJoin = 0,
+    kLeftSemiJoin,
+    kLeftAntiJoin
+  };
+
   /**
    * @brief Constructor.
    *
@@ -98,6 +105,7 @@ class HashJoinOperator : public RelationalOperator {
    *        corresponding to the attributes of the relation referred by
    *        output_relation_id. Each Scalar is evaluated for the joined tuples,
    *        and the resulting value is inserted into the join result.
+   * @param join_type The type of join corresponding to this operator.
    **/
   HashJoinOperator(const CatalogRelation &build_relation,
                    const CatalogRelation &probe_relation,
@@ -108,21 +116,24 @@ class HashJoinOperator : public RelationalOperator {
                    const QueryContext::insert_destination_id output_destination_index,
                    const QueryContext::join_hash_table_id hash_table_index,
                    const QueryContext::predicate_id residual_predicate_index,
-                   const QueryContext::scalar_group_id selection_index)
-    : build_relation_(build_relation),
-      probe_relation_(probe_relation),
-      probe_relation_is_stored_(probe_relation_is_stored),
-      join_key_attributes_(join_key_attributes),
-      any_join_key_attributes_nullable_(any_join_key_attributes_nullable),
-      output_relation_(output_relation),
-      output_destination_index_(output_destination_index),
-      hash_table_index_(hash_table_index),
-      residual_predicate_index_(residual_predicate_index),
-      selection_index_(selection_index),
-      probe_relation_block_ids_(probe_relation_is_stored ? probe_relation.getBlocksSnapshot()
-                                                         : std::vector<block_id>()),
-      num_workorders_generated_(0),
-      started_(false) {}
+                   const QueryContext::scalar_group_id selection_index,
+                   const JoinType join_type = JoinType::kInnerJoin)
+      : build_relation_(build_relation),
+        probe_relation_(probe_relation),
+        probe_relation_is_stored_(probe_relation_is_stored),
+        join_key_attributes_(join_key_attributes),
+        any_join_key_attributes_nullable_(any_join_key_attributes_nullable),
+        output_relation_(output_relation),
+        output_destination_index_(output_destination_index),
+        hash_table_index_(hash_table_index),
+        residual_predicate_index_(residual_predicate_index),
+        selection_index_(selection_index),
+        join_type_(join_type),
+        probe_relation_block_ids_(probe_relation_is_stored
+                                      ? probe_relation.getBlocksSnapshot()
+                                      : std::vector<block_id>()),
+        num_workorders_generated_(0),
+        started_(false) {}
 
   ~HashJoinOperator() override {}
 
@@ -164,6 +175,11 @@ class HashJoinOperator : public RelationalOperator {
   }
 
  private:
+  template <class JoinWorkOrderClass>
+  bool getAllNonOuterJoinWorkOrders(WorkOrdersContainer *container,
+                                    QueryContext *query_context,
+                                    StorageManager *storage_manager);
+
   const CatalogRelation &build_relation_;
   const CatalogRelation &probe_relation_;
   const bool probe_relation_is_stored_;
@@ -174,6 +190,7 @@ class HashJoinOperator : public RelationalOperator {
   const QueryContext::join_hash_table_id hash_table_index_;
   const QueryContext::predicate_id residual_predicate_index_;
   const QueryContext::scalar_group_id selection_index_;
+  const JoinType join_type_;
 
   std::vector<block_id> probe_relation_block_ids_;
   std::size_t num_workorders_generated_;
@@ -184,9 +201,9 @@ class HashJoinOperator : public RelationalOperator {
 };
 
 /**
- * @brief A WorkOrder produced by HashJoinOperator.
+ * @brief An inner join WorkOrder produced by HashJoinOperator.
  **/
-class HashJoinWorkOrder : public WorkOrder {
+class HashInnerJoinWorkOrder : public WorkOrder {
  public:
   /**
    * @brief Constructor.
@@ -206,20 +223,20 @@ class HashJoinWorkOrder : public WorkOrder {
    * @param selection A list of Scalars corresponding to the relation attributes
    *        in \c output_destination. Each Scalar is evaluated for the joined
    *        tuples, and the resulting value is inserted into the join result.
-   * @param output_destination The InsertDestination to insert the join results.
    * @param hash_table The JoinHashTable to use.
+   * @param output_destination The InsertDestination to insert the join results.
    * @param storage_manager The StorageManager to use.
    **/
-  HashJoinWorkOrder(const CatalogRelationSchema &build_relation,
-                    const CatalogRelationSchema &probe_relation,
-                    const std::vector<attribute_id> &join_key_attributes,
-                    const bool any_join_key_attributes_nullable,
-                    const block_id lookup_block_id,
-                    const Predicate *residual_predicate,
-                    const std::vector<std::unique_ptr<const Scalar>> &selection,
-                    InsertDestination *output_destination,
-                    JoinHashTable *hash_table,
-                    StorageManager *storage_manager)
+  HashInnerJoinWorkOrder(const CatalogRelationSchema &build_relation,
+                         const CatalogRelationSchema &probe_relation,
+                         const std::vector<attribute_id> &join_key_attributes,
+                         const bool any_join_key_attributes_nullable,
+                         const block_id lookup_block_id,
+                         const Predicate *residual_predicate,
+                         const std::vector<std::unique_ptr<const Scalar>> &selection,
+                         const JoinHashTable &hash_table,
+                         InsertDestination *output_destination,
+                         StorageManager *storage_manager)
       : build_relation_(build_relation),
         probe_relation_(probe_relation),
         join_key_attributes_(join_key_attributes),
@@ -227,8 +244,8 @@ class HashJoinWorkOrder : public WorkOrder {
         block_id_(lookup_block_id),
         residual_predicate_(residual_predicate),
         selection_(selection),
+        hash_table_(hash_table),
         output_destination_(DCHECK_NOTNULL(output_destination)),
-        hash_table_(DCHECK_NOTNULL(hash_table)),
         storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
 
   /**
@@ -249,20 +266,20 @@ class HashJoinWorkOrder : public WorkOrder {
    * @param selection A list of Scalars corresponding to the relation attributes
    *        in \c output_destination. Each Scalar is evaluated for the joined
    *        tuples, and the resulting value is inserted into the join result.
-   * @param output_destination The InsertDestination to insert the join results.
    * @param hash_table The JoinHashTable to use.
+   * @param output_destination The InsertDestination to insert the join results.
    * @param storage_manager The StorageManager to use.
    **/
-  HashJoinWorkOrder(const CatalogRelationSchema &build_relation,
-                    const CatalogRelationSchema &probe_relation,
-                    std::vector<attribute_id> &&join_key_attributes,
-                    const bool any_join_key_attributes_nullable,
-                    const block_id lookup_block_id,
-                    const Predicate *residual_predicate,
-                    const std::vector<std::unique_ptr<const Scalar>> &selection,
-                    InsertDestination *output_destination,
-                    JoinHashTable *hash_table,
-                    StorageManager *storage_manager)
+  HashInnerJoinWorkOrder(const CatalogRelationSchema &build_relation,
+                         const CatalogRelationSchema &probe_relation,
+                         std::vector<attribute_id> &&join_key_attributes,
+                         const bool any_join_key_attributes_nullable,
+                         const block_id lookup_block_id,
+                         const Predicate *residual_predicate,
+                         const std::vector<std::unique_ptr<const Scalar>> &selection,
+                         const JoinHashTable &hash_table,
+                         InsertDestination *output_destination,
+                         StorageManager *storage_manager)
       : build_relation_(build_relation),
         probe_relation_(probe_relation),
         join_key_attributes_(std::move(join_key_attributes)),
@@ -270,11 +287,11 @@ class HashJoinWorkOrder : public WorkOrder {
         block_id_(lookup_block_id),
         residual_predicate_(residual_predicate),
         selection_(selection),
+        hash_table_(hash_table),
         output_destination_(DCHECK_NOTNULL(output_destination)),
-        hash_table_(DCHECK_NOTNULL(hash_table)),
         storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
 
-  ~HashJoinWorkOrder() override {}
+  ~HashInnerJoinWorkOrder() override {}
 
   /**
    * @exception TupleTooLargeForBlock A tuple produced by this join was too
@@ -290,18 +307,257 @@ class HashJoinWorkOrder : public WorkOrder {
   template <typename CollectorT>
   void executeWithCollectorType();
 
-  const CatalogRelationSchema &build_relation_, &probe_relation_;
+  const CatalogRelationSchema &build_relation_;
+  const CatalogRelationSchema &probe_relation_;
+  const std::vector<attribute_id> join_key_attributes_;
+  const bool any_join_key_attributes_nullable_;
+  const block_id block_id_;
+  const Predicate *residual_predicate_;
+  const std::vector<std::unique_ptr<const Scalar>> &selection_;
+  const JoinHashTable &hash_table_;
+
+  InsertDestination *output_destination_;
+  StorageManager *storage_manager_;
+
+  DISALLOW_COPY_AND_ASSIGN(HashInnerJoinWorkOrder);
+};
+
+/**
+ * @brief A left semi-join WorkOrder produced by the HashJoinOperator to execute
+ *        EXISTS() clause.
+ **/
+class HashSemiJoinWorkOrder : public WorkOrder {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param build_relation The relation that the hash table was originally built
+   *        on (i.e. the inner relation in the join).
+   * @param probe_relation The relation to probe the hash table with (i.e. the
+   *        outer relation in the join).
+   * @param join_key_attributes The IDs of equijoin attributes in \c
+   *        probe_relation.
+   * @param any_join_key_attributes_nullable If any attribute is nullable.
+   * @param lookup_block_id The block id of the probe_relation.
+   * @param residual_predicate If non-null, apply as an additional filter to
+   *        pairs of tuples that match the hash-join (i.e. key equality)
+   *        predicate. Effectively, this makes the join predicate the
+   *        conjunction of the key-equality predicate and residual_predicate.
+   * @param selection A list of Scalars corresponding to the relation attributes
+   *        in \c output_destination. Each Scalar is evaluated for the joined
+   *        tuples, and the resulting value is inserted into the join result.
+   * @param hash_table The JoinHashTable to use.
+   * @param output_destination The InsertDestination to insert the join results.
+   * @param storage_manager The StorageManager to use.
+   **/
+  HashSemiJoinWorkOrder(const CatalogRelationSchema &build_relation,
+                        const CatalogRelationSchema &probe_relation,
+                        const std::vector<attribute_id> &join_key_attributes,
+                        const bool any_join_key_attributes_nullable,
+                        const block_id lookup_block_id,
+                        const Predicate *residual_predicate,
+                        const std::vector<std::unique_ptr<const Scalar>> &selection,
+                        const JoinHashTable &hash_table,
+                        InsertDestination *output_destination,
+                        StorageManager *storage_manager)
+      : build_relation_(build_relation),
+        probe_relation_(probe_relation),
+        join_key_attributes_(join_key_attributes),
+        any_join_key_attributes_nullable_(any_join_key_attributes_nullable),
+        block_id_(lookup_block_id),
+        residual_predicate_(residual_predicate),
+        selection_(selection),
+        hash_table_(hash_table),
+        output_destination_(DCHECK_NOTNULL(output_destination)),
+        storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+
+  /**
+   * @brief Constructor for the distributed version.
+   *
+   * @param build_relation The relation that the hash table was originally built
+   *        on (i.e. the inner relation in the join).
+   * @param probe_relation The relation to probe the hash table with (i.e. the
+   *        outer relation in the join).
+   * @param join_key_attributes The IDs of equijoin attributes in \c
+   *        probe_relation.
+   * @param any_join_key_attributes_nullable If any attribute is nullable.
+   * @param lookup_block_id The block id of the probe_relation.
+   * @param residual_predicate If non-null, apply as an additional filter to
+   *        pairs of tuples that match the hash-join (i.e. key equality)
+   *        predicate. Effectively, this makes the join predicate the
+   *        conjunction of the key-equality predicate and residual_predicate.
+   * @param selection A list of Scalars corresponding to the relation attributes
+   *        in \c output_destination. Each Scalar is evaluated for the joined
+   *        tuples, and the resulting value is inserted into the join result.
+   * @param hash_table The JoinHashTable to use.
+   * @param output_destination The InsertDestination to insert the join results.
+   * @param storage_manager The StorageManager to use.
+   **/
+  HashSemiJoinWorkOrder(const CatalogRelationSchema &build_relation,
+                        const CatalogRelationSchema &probe_relation,
+                        std::vector<attribute_id> &&join_key_attributes,
+                        const bool any_join_key_attributes_nullable,
+                        const block_id lookup_block_id,
+                        const Predicate *residual_predicate,
+                        const std::vector<std::unique_ptr<const Scalar>> &selection,
+                        const JoinHashTable &hash_table,
+                        InsertDestination *output_destination,
+                        StorageManager *storage_manager)
+      : build_relation_(build_relation),
+        probe_relation_(probe_relation),
+        join_key_attributes_(std::move(join_key_attributes)),
+        any_join_key_attributes_nullable_(any_join_key_attributes_nullable),
+        block_id_(lookup_block_id),
+        residual_predicate_(residual_predicate),
+        selection_(selection),
+        hash_table_(hash_table),
+        output_destination_(DCHECK_NOTNULL(output_destination)),
+        storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+
+  ~HashSemiJoinWorkOrder() override {}
+
+  void execute() override;
+
+ private:
+  void executeWithoutResidualPredicate();
+
+  void executeWithResidualPredicate();
+
+  const CatalogRelationSchema &build_relation_;
+  const CatalogRelationSchema &probe_relation_;
+  const std::vector<attribute_id> join_key_attributes_;
+  const bool any_join_key_attributes_nullable_;
+  const block_id block_id_;
+  const Predicate *residual_predicate_;
+  const std::vector<std::unique_ptr<const Scalar>> &selection_;
+  const JoinHashTable &hash_table_;
+
+  InsertDestination *output_destination_;
+  StorageManager *storage_manager_;
+
+  DISALLOW_COPY_AND_ASSIGN(HashSemiJoinWorkOrder);
+};
+
+/**
+ * @brief A left anti-join WorkOrder produced by the HashJoinOperator to execute
+ *        NOT EXISTS() clause.
+ **/
+class HashAntiJoinWorkOrder : public WorkOrder {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param build_relation The relation that the hash table was originally built
+   *        on (i.e. the inner relation in the join).
+   * @param probe_relation The relation to probe the hash table with (i.e. the
+   *        outer relation in the join).
+   * @param join_key_attributes The IDs of equijoin attributes in \c
+   *        probe_relation.
+   * @param any_join_key_attributes_nullable If any attribute is nullable.
+   * @param lookup_block_id The block id of the probe_relation.
+   * @param residual_predicate If non-null, apply as an additional filter to
+   *        pairs of tuples that match the hash-join (i.e. key equality)
+   *        predicate. Effectively, this makes the join predicate the
+   *        conjunction of the key-equality predicate and residual_predicate.
+   * @param selection A list of Scalars corresponding to the relation attributes
+   *        in \c output_destination. Each Scalar is evaluated for the joined
+   *        tuples, and the resulting value is inserted into the join result.
+   * @param hash_table The JoinHashTable to use.
+   * @param output_destination The InsertDestination to insert the join results.
+   * @param storage_manager The StorageManager to use.
+   **/
+  HashAntiJoinWorkOrder(const CatalogRelationSchema &build_relation,
+                        const CatalogRelationSchema &probe_relation,
+                        const std::vector<attribute_id> &join_key_attributes,
+                        const bool any_join_key_attributes_nullable,
+                        const block_id lookup_block_id,
+                        const Predicate *residual_predicate,
+                        const std::vector<std::unique_ptr<const Scalar>> &selection,
+                        const JoinHashTable &hash_table,
+                        InsertDestination *output_destination,
+                        StorageManager *storage_manager)
+      : build_relation_(build_relation),
+        probe_relation_(probe_relation),
+        join_key_attributes_(join_key_attributes),
+        any_join_key_attributes_nullable_(any_join_key_attributes_nullable),
+        block_id_(lookup_block_id),
+        residual_predicate_(residual_predicate),
+        selection_(selection),
+        hash_table_(hash_table),
+        output_destination_(DCHECK_NOTNULL(output_destination)),
+        storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+
+  /**
+   * @brief Constructor for the distributed version.
+   *
+   * @param build_relation The relation that the hash table was originally built
+   *        on (i.e. the inner relation in the join).
+   * @param probe_relation The relation to probe the hash table with (i.e. the
+   *        outer relation in the join).
+   * @param join_key_attributes The IDs of equijoin attributes in \c
+   *        probe_relation.
+   * @param any_join_key_attributes_nullable If any attribute is nullable.
+   * @param lookup_block_id The block id of the probe_relation.
+   * @param residual_predicate If non-null, apply as an additional filter to
+   *        pairs of tuples that match the hash-join (i.e. key equality)
+   *        predicate. Effectively, this makes the join predicate the
+   *        conjunction of the key-equality predicate and residual_predicate.
+   * @param selection A list of Scalars corresponding to the relation attributes
+   *        in \c output_destination. Each Scalar is evaluated for the joined
+   *        tuples, and the resulting value is inserted into the join result.
+   * @param hash_table The JoinHashTable to use.
+   * @param output_destination The InsertDestination to insert the join results.
+   * @param storage_manager The StorageManager to use.
+   **/
+  HashAntiJoinWorkOrder(const CatalogRelationSchema &build_relation,
+                        const CatalogRelationSchema &probe_relation,
+                        std::vector<attribute_id> &&join_key_attributes,
+                        const bool any_join_key_attributes_nullable,
+                        const block_id lookup_block_id,
+                        const Predicate *residual_predicate,
+                        const std::vector<std::unique_ptr<const Scalar>> &selection,
+                        const JoinHashTable &hash_table,
+                        InsertDestination *output_destination,
+                        StorageManager *storage_manager)
+      : build_relation_(build_relation),
+        probe_relation_(probe_relation),
+        join_key_attributes_(join_key_attributes),
+        any_join_key_attributes_nullable_(any_join_key_attributes_nullable),
+        block_id_(lookup_block_id),
+        residual_predicate_(residual_predicate),
+        selection_(selection),
+        hash_table_(hash_table),
+        output_destination_(DCHECK_NOTNULL(output_destination)),
+        storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+
+  ~HashAntiJoinWorkOrder() override {}
+
+  void execute() override {
+    if (residual_predicate_ == nullptr) {
+      executeWithoutResidualPredicate();
+    } else {
+      executeWithResidualPredicate();
+    }
+  }
+
+ private:
+  void executeWithoutResidualPredicate();
+
+  void executeWithResidualPredicate();
+
+  const CatalogRelationSchema &build_relation_;
+  const CatalogRelationSchema &probe_relation_;
   const std::vector<attribute_id> join_key_attributes_;
   const bool any_join_key_attributes_nullable_;
   const block_id block_id_;
   const Predicate *residual_predicate_;
   const std::vector<std::unique_ptr<const Scalar>> &selection_;
+  const JoinHashTable &hash_table_;
 
   InsertDestination *output_destination_;
-  JoinHashTable *hash_table_;
   StorageManager *storage_manager_;
 
-  DISALLOW_COPY_AND_ASSIGN(HashJoinWorkOrder);
+  DISALLOW_COPY_AND_ASSIGN(HashAntiJoinWorkOrder);
 };
 
 /** @} */

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a39ad965/relational_operators/WorkOrder.proto
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrder.proto b/relational_operators/WorkOrder.proto
index e0ec19d..1a0bcd1 100644
--- a/relational_operators/WorkOrder.proto
+++ b/relational_operators/WorkOrder.proto
@@ -29,18 +29,20 @@ enum WorkOrderType {
   DESTROY_HASH = 6;
   DROP_TABLE = 7;
   FINALIZE_AGGREGATION = 8;
-  HASH_JOIN = 9;
-  INSERT = 10;
-  NESTED_LOOP_JOIN = 11;
-  SAMPLE = 12;
-  SAVE_BLOCKS = 13;
-  SELECT = 14;
-  SORT_MERGE_RUN = 15;
-  SORT_RUN_GENERATION = 16;
-  TABLE_GENERATOR = 17;
-  TEXT_SCAN = 18;
-  TEXT_SPLIT = 19;
-  UPDATE = 20;
+  HASH_INNER_JOIN = 9;
+  HASH_SEMI_JOIN = 10;
+  HASH_ANTI_JOIN = 11;
+  INSERT = 12;
+  NESTED_LOOP_JOIN = 13;
+  SAMPLE = 14;
+  SAVE_BLOCKS = 15;
+  SELECT = 16;
+  SORT_MERGE_RUN = 17;
+  SORT_RUN_GENERATION = 18;
+  TABLE_GENERATOR = 19;
+  TEXT_SCAN = 20;
+  TEXT_SPLIT = 21;
+  UPDATE = 22;
 }
 
 message WorkOrder {
@@ -104,7 +106,7 @@ message FinalizeAggregationWorkOrder {
   }
 }
 
-message HashJoinWorkOrder {
+message HashInnerJoinWorkOrder {
   extend WorkOrder {
     // All required.
     optional int32 build_relation_id = 160;
@@ -119,6 +121,36 @@ message HashJoinWorkOrder {
   }
 }
 
+message HashAntiJoinWorkOrder {
+  extend WorkOrder {
+    // All required.
+    optional int32 build_relation_id = 350;
+    optional int32 probe_relation_id = 351;
+    repeated int32 join_key_attributes = 352;
+    optional bool any_join_key_attributes_nullable = 353;
+    optional int32 insert_destination_index = 354;
+    optional uint32 join_hash_table_index = 355;
+    optional int32 residual_predicate_index = 356;
+    optional int32 selection_index = 357;
+    optional fixed64 block_id = 358;
+  }
+}
+
+message HashSemiJoinWorkOrder {
+  extend WorkOrder {
+    // All required.
+    optional int32 build_relation_id = 360;
+    optional int32 probe_relation_id = 361;
+    repeated int32 join_key_attributes = 362;
+    optional bool any_join_key_attributes_nullable = 363;
+    optional int32 insert_destination_index = 364;
+    optional uint32 join_hash_table_index = 365;
+    optional int32 residual_predicate_index = 366;
+    optional int32 selection_index = 367;
+    optional fixed64 block_id = 368;
+  }
+}
+
 message InsertWorkOrder {
   extend WorkOrder {
     // All required.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a39ad965/relational_operators/WorkOrderFactory.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp
index 4713681..92c1140 100644
--- a/relational_operators/WorkOrderFactory.cpp
+++ b/relational_operators/WorkOrderFactory.cpp
@@ -135,30 +135,88 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
           query_context->getInsertDestination(
               proto.GetExtension(serialization::FinalizeAggregationWorkOrder::insert_destination_index)));
     }
-    case serialization::HASH_JOIN: {
-      LOG(INFO) << "Creating HashJoinWorkOrder";
+    case serialization::HASH_INNER_JOIN: {
+      LOG(INFO) << "Creating HashInnerJoinWorkOrder";
       vector<attribute_id> join_key_attributes;
-      for (int i = 0; i < proto.ExtensionSize(serialization::HashJoinWorkOrder::join_key_attributes); ++i) {
+      const int join_key_attributes_size =
+          proto.ExtensionSize(serialization::HashAntiJoinWorkOrder::join_key_attributes);
+      for (int i = 0; i < join_key_attributes_size; ++i) {
         join_key_attributes.push_back(
-            proto.GetExtension(serialization::HashJoinWorkOrder::join_key_attributes, i));
+            proto.GetExtension(serialization::HashInnerJoinWorkOrder::join_key_attributes, i));
       }
 
-      return new HashJoinWorkOrder(
+      return new HashInnerJoinWorkOrder(
           catalog_database->getRelationSchemaById(
-              proto.GetExtension(serialization::HashJoinWorkOrder::build_relation_id)),
+              proto.GetExtension(serialization::HashInnerJoinWorkOrder::build_relation_id)),
           catalog_database->getRelationSchemaById(
-              proto.GetExtension(serialization::HashJoinWorkOrder::probe_relation_id)),
+              proto.GetExtension(serialization::HashInnerJoinWorkOrder::probe_relation_id)),
           move(join_key_attributes),
-          proto.GetExtension(serialization::HashJoinWorkOrder::any_join_key_attributes_nullable),
-          proto.GetExtension(serialization::HashJoinWorkOrder::block_id),
+          proto.GetExtension(serialization::HashInnerJoinWorkOrder::any_join_key_attributes_nullable),
+          proto.GetExtension(serialization::HashInnerJoinWorkOrder::block_id),
           query_context->getPredicate(
-              proto.GetExtension(serialization::HashJoinWorkOrder::residual_predicate_index)),
+              proto.GetExtension(serialization::HashInnerJoinWorkOrder::residual_predicate_index)),
           query_context->getScalarGroup(
-              proto.GetExtension(serialization::HashJoinWorkOrder::selection_index)),
+              proto.GetExtension(serialization::HashInnerJoinWorkOrder::selection_index)),
+          *query_context->getJoinHashTable(
+              proto.GetExtension(serialization::HashInnerJoinWorkOrder::join_hash_table_index)),
           query_context->getInsertDestination(
-              proto.GetExtension(serialization::HashJoinWorkOrder::insert_destination_index)),
-          query_context->getJoinHashTable(
-              proto.GetExtension(serialization::HashJoinWorkOrder::join_hash_table_index)),
+              proto.GetExtension(serialization::HashInnerJoinWorkOrder::insert_destination_index)),
+          storage_manager);
+    }
+    case serialization::HASH_SEMI_JOIN: {
+      LOG(INFO) << "Creating HashSemiJoinWorkOrder";
+      vector<attribute_id> join_key_attributes;
+      const int join_key_attributes_size =
+          proto.ExtensionSize(serialization::HashAntiJoinWorkOrder::join_key_attributes);
+      for (int i = 0; i < join_key_attributes_size; ++i) {
+        join_key_attributes.push_back(
+            proto.GetExtension(serialization::HashSemiJoinWorkOrder::join_key_attributes, i));
+      }
+
+      return new HashSemiJoinWorkOrder(
+          catalog_database->getRelationSchemaById(
+              proto.GetExtension(serialization::HashSemiJoinWorkOrder::build_relation_id)),
+          catalog_database->getRelationSchemaById(
+              proto.GetExtension(serialization::HashSemiJoinWorkOrder::probe_relation_id)),
+          move(join_key_attributes),
+          proto.GetExtension(serialization::HashSemiJoinWorkOrder::any_join_key_attributes_nullable),
+          proto.GetExtension(serialization::HashSemiJoinWorkOrder::block_id),
+          query_context->getPredicate(
+              proto.GetExtension(serialization::HashSemiJoinWorkOrder::residual_predicate_index)),
+          query_context->getScalarGroup(
+              proto.GetExtension(serialization::HashSemiJoinWorkOrder::selection_index)),
+          *query_context->getJoinHashTable(
+              proto.GetExtension(serialization::HashSemiJoinWorkOrder::join_hash_table_index)),
+          query_context->getInsertDestination(
+              proto.GetExtension(serialization::HashSemiJoinWorkOrder::insert_destination_index)),
+          storage_manager);
+    }
+    case serialization::HASH_ANTI_JOIN: {
+      LOG(INFO) << "Creating HashAntiJoinWorkOrder";
+      vector<attribute_id> join_key_attributes;
+      const int join_key_attributes_size =
+          proto.ExtensionSize(serialization::HashAntiJoinWorkOrder::join_key_attributes);
+      for (int i = 0; i < join_key_attributes_size; ++i) {
+        join_key_attributes.push_back(
+            proto.GetExtension(serialization::HashAntiJoinWorkOrder::join_key_attributes, i));
+      }
+
+      return new HashAntiJoinWorkOrder(
+          catalog_database->getRelationSchemaById(
+              proto.GetExtension(serialization::HashAntiJoinWorkOrder::build_relation_id)),
+          catalog_database->getRelationSchemaById(
+              proto.GetExtension(serialization::HashAntiJoinWorkOrder::probe_relation_id)),
+          move(join_key_attributes),
+          proto.GetExtension(serialization::HashAntiJoinWorkOrder::any_join_key_attributes_nullable),
+          proto.GetExtension(serialization::HashAntiJoinWorkOrder::block_id),
+          query_context->getPredicate(
+              proto.GetExtension(serialization::HashAntiJoinWorkOrder::residual_predicate_index)),
+          query_context->getScalarGroup(
+              proto.GetExtension(serialization::HashAntiJoinWorkOrder::selection_index)),
+          *query_context->getJoinHashTable(
+              proto.GetExtension(serialization::HashAntiJoinWorkOrder::join_hash_table_index)),
+          query_context->getInsertDestination(
+              proto.GetExtension(serialization::HashAntiJoinWorkOrder::insert_destination_index)),
           storage_manager);
     }
     case serialization::INSERT: {
@@ -394,46 +452,137 @@ bool WorkOrderFactory::ProtoIsValid(const serialization::WorkOrder &proto,
              query_context.isValidInsertDestinationId(
                  proto.GetExtension(serialization::FinalizeAggregationWorkOrder::insert_destination_index));
     }
-    case serialization::HASH_JOIN: {
-      if (!proto.HasExtension(serialization::HashJoinWorkOrder::build_relation_id) ||
-          !proto.HasExtension(serialization::HashJoinWorkOrder::probe_relation_id)) {
+    case serialization::HASH_INNER_JOIN: {
+      if (!proto.HasExtension(serialization::HashInnerJoinWorkOrder::build_relation_id) ||
+          !proto.HasExtension(serialization::HashInnerJoinWorkOrder::probe_relation_id)) {
+        return false;
+      }
+
+      const relation_id build_relation_id =
+          proto.GetExtension(serialization::HashInnerJoinWorkOrder::build_relation_id);
+      if (!catalog_database.hasRelationWithId(build_relation_id)) {
+        return false;
+      }
+
+      const relation_id probe_relation_id =
+          proto.GetExtension(serialization::HashInnerJoinWorkOrder::probe_relation_id);
+      if (!catalog_database.hasRelationWithId(probe_relation_id)) {
+        return false;
+      }
+
+      const CatalogRelationSchema &build_relation = catalog_database.getRelationSchemaById(build_relation_id);
+      const CatalogRelationSchema &probe_relation = catalog_database.getRelationSchemaById(probe_relation_id);
+      for (int i = 0; i < proto.ExtensionSize(serialization::HashInnerJoinWorkOrder::join_key_attributes); ++i) {
+        const attribute_id attr_id =
+            proto.GetExtension(serialization::HashInnerJoinWorkOrder::join_key_attributes, i);
+        if (!build_relation.hasAttributeWithId(attr_id) ||
+            !probe_relation.hasAttributeWithId(attr_id)) {
+          return false;
+        }
+      }
+
+      return proto.HasExtension(serialization::HashInnerJoinWorkOrder::any_join_key_attributes_nullable) &&
+             proto.HasExtension(serialization::HashInnerJoinWorkOrder::insert_destination_index) &&
+             query_context.isValidInsertDestinationId(
+                 proto.GetExtension(serialization::HashInnerJoinWorkOrder::insert_destination_index)) &&
+             proto.HasExtension(serialization::HashInnerJoinWorkOrder::join_hash_table_index) &&
+             query_context.isValidJoinHashTableId(
+                 proto.GetExtension(serialization::HashInnerJoinWorkOrder::join_hash_table_index)) &&
+             proto.HasExtension(serialization::HashInnerJoinWorkOrder::residual_predicate_index) &&
+             query_context.isValidPredicate(
+                 proto.GetExtension(serialization::HashInnerJoinWorkOrder::residual_predicate_index)) &&
+             proto.HasExtension(serialization::HashInnerJoinWorkOrder::selection_index) &&
+             query_context.isValidScalarGroupId(
+                 proto.GetExtension(serialization::HashInnerJoinWorkOrder::selection_index)) &&
+             proto.HasExtension(serialization::HashInnerJoinWorkOrder::block_id);
+    }
+    case serialization::HASH_SEMI_JOIN: {
+      if (!proto.HasExtension(serialization::HashSemiJoinWorkOrder::build_relation_id) ||
+          !proto.HasExtension(serialization::HashSemiJoinWorkOrder::probe_relation_id)) {
+        return false;
+      }
+
+      const relation_id build_relation_id =
+          proto.GetExtension(serialization::HashSemiJoinWorkOrder::build_relation_id);
+      if (!catalog_database.hasRelationWithId(build_relation_id)) {
+        return false;
+      }
+
+      const relation_id probe_relation_id =
+          proto.GetExtension(serialization::HashSemiJoinWorkOrder::probe_relation_id);
+      if (!catalog_database.hasRelationWithId(probe_relation_id)) {
+        return false;
+      }
+
+      const CatalogRelationSchema &build_relation = catalog_database.getRelationSchemaById(build_relation_id);
+      const CatalogRelationSchema &probe_relation = catalog_database.getRelationSchemaById(probe_relation_id);
+      for (int i = 0; i < proto.ExtensionSize(serialization::HashSemiJoinWorkOrder::join_key_attributes); ++i) {
+        const attribute_id attr_id =
+            proto.GetExtension(serialization::HashSemiJoinWorkOrder::join_key_attributes, i);
+        if (!build_relation.hasAttributeWithId(attr_id) ||
+            !probe_relation.hasAttributeWithId(attr_id)) {
+          return false;
+        }
+      }
+
+      return proto.HasExtension(serialization::HashSemiJoinWorkOrder::any_join_key_attributes_nullable) &&
+             proto.HasExtension(serialization::HashSemiJoinWorkOrder::insert_destination_index) &&
+             query_context.isValidInsertDestinationId(
+                 proto.GetExtension(serialization::HashSemiJoinWorkOrder::insert_destination_index)) &&
+             proto.HasExtension(serialization::HashSemiJoinWorkOrder::join_hash_table_index) &&
+             query_context.isValidJoinHashTableId(
+                 proto.GetExtension(serialization::HashSemiJoinWorkOrder::join_hash_table_index)) &&
+             proto.HasExtension(serialization::HashSemiJoinWorkOrder::residual_predicate_index) &&
+             query_context.isValidPredicate(
+                 proto.GetExtension(serialization::HashSemiJoinWorkOrder::residual_predicate_index)) &&
+             proto.HasExtension(serialization::HashSemiJoinWorkOrder::selection_index) &&
+             query_context.isValidScalarGroupId(
+                 proto.GetExtension(serialization::HashSemiJoinWorkOrder::selection_index)) &&
+             proto.HasExtension(serialization::HashSemiJoinWorkOrder::block_id);
+    }
+    case serialization::HASH_ANTI_JOIN: {
+      if (!proto.HasExtension(serialization::HashAntiJoinWorkOrder::build_relation_id) ||
+          !proto.HasExtension(serialization::HashAntiJoinWorkOrder::probe_relation_id)) {
         return false;
       }
 
-      const relation_id build_relation_id = proto.GetExtension(serialization::HashJoinWorkOrder::build_relation_id);
+      const relation_id build_relation_id =
+          proto.GetExtension(serialization::HashAntiJoinWorkOrder::build_relation_id);
       if (!catalog_database.hasRelationWithId(build_relation_id)) {
         return false;
       }
 
-      const relation_id probe_relation_id = proto.GetExtension(serialization::HashJoinWorkOrder::probe_relation_id);
+      const relation_id probe_relation_id =
+          proto.GetExtension(serialization::HashAntiJoinWorkOrder::probe_relation_id);
       if (!catalog_database.hasRelationWithId(probe_relation_id)) {
         return false;
       }
 
       const CatalogRelationSchema &build_relation = catalog_database.getRelationSchemaById(build_relation_id);
       const CatalogRelationSchema &probe_relation = catalog_database.getRelationSchemaById(probe_relation_id);
-      for (int i = 0; i < proto.ExtensionSize(serialization::HashJoinWorkOrder::join_key_attributes); ++i) {
-        const attribute_id attr_id = proto.GetExtension(serialization::HashJoinWorkOrder::join_key_attributes, i);
+      for (int i = 0; i < proto.ExtensionSize(serialization::HashAntiJoinWorkOrder::join_key_attributes); ++i) {
+        const attribute_id attr_id =
+            proto.GetExtension(serialization::HashAntiJoinWorkOrder::join_key_attributes, i);
         if (!build_relation.hasAttributeWithId(attr_id) ||
             !probe_relation.hasAttributeWithId(attr_id)) {
           return false;
         }
       }
 
-      return proto.HasExtension(serialization::HashJoinWorkOrder::any_join_key_attributes_nullable) &&
-             proto.HasExtension(serialization::HashJoinWorkOrder::insert_destination_index) &&
+      return proto.HasExtension(serialization::HashAntiJoinWorkOrder::any_join_key_attributes_nullable) &&
+             proto.HasExtension(serialization::HashAntiJoinWorkOrder::insert_destination_index) &&
              query_context.isValidInsertDestinationId(
-                 proto.GetExtension(serialization::HashJoinWorkOrder::insert_destination_index)) &&
-             proto.HasExtension(serialization::HashJoinWorkOrder::join_hash_table_index) &&
+                 proto.GetExtension(serialization::HashAntiJoinWorkOrder::insert_destination_index)) &&
+             proto.HasExtension(serialization::HashAntiJoinWorkOrder::join_hash_table_index) &&
              query_context.isValidJoinHashTableId(
-                 proto.GetExtension(serialization::HashJoinWorkOrder::join_hash_table_index)) &&
-             proto.HasExtension(serialization::HashJoinWorkOrder::residual_predicate_index) &&
+                 proto.GetExtension(serialization::HashAntiJoinWorkOrder::join_hash_table_index)) &&
+             proto.HasExtension(serialization::HashAntiJoinWorkOrder::residual_predicate_index) &&
              query_context.isValidPredicate(
-                 proto.GetExtension(serialization::HashJoinWorkOrder::residual_predicate_index)) &&
-             proto.HasExtension(serialization::HashJoinWorkOrder::selection_index) &&
+                 proto.GetExtension(serialization::HashAntiJoinWorkOrder::residual_predicate_index)) &&
+             proto.HasExtension(serialization::HashAntiJoinWorkOrder::selection_index) &&
              query_context.isValidScalarGroupId(
-                 proto.GetExtension(serialization::HashJoinWorkOrder::selection_index)) &&
-             proto.HasExtension(serialization::HashJoinWorkOrder::block_id);
+                 proto.GetExtension(serialization::HashAntiJoinWorkOrder::selection_index)) &&
+             proto.HasExtension(serialization::HashAntiJoinWorkOrder::block_id);
     }
     case serialization::INSERT: {
       return proto.HasExtension(serialization::InsertWorkOrder::insert_destination_index) &&

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a39ad965/storage/HashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTable.hpp b/storage/HashTable.hpp
index ab74e2c..ef79d11 100644
--- a/storage/HashTable.hpp
+++ b/storage/HashTable.hpp
@@ -707,6 +707,89 @@ class HashTable : public HashTableBase<resizable,
                                FunctorT *functor) const;
 
   /**
+   * @brief Lookup (multiple) keys from a ValueAccessor, apply a functor to
+   *        the matching values and additionally call a hasMatch() function of
+   *        the functor when the first match for a key is found.
+   * @warning This method assumes that no concurrent calls to put(),
+   *          putCompositeKey(), putValueAccessor(),
+   *          putValueAccessorCompositeKey(), upsert(), upsertCompositeKey(),
+   *          upsertValueAccessor(), or upsertValueAccessorCompositeKey() are
+   *          taking place (i.e. that this HashTable is immutable for the
+   *          duration of the call and as long as the returned pointer may be
+   *          dereferenced). Concurrent calls to getSingle(),
+   *          getSingleCompositeKey(), getAll(), getAllCompositeKey(),
+   *          getAllFromValueAccessor(), getAllFromValueAccessorCompositeKey(),
+   *          forEach(), and forEachCompositeKey() are safe.
+   * @note This version is for single scalar keys. See also
+   *       getAllFromValueAccessorCompositeKeyWithExtraWorkForFirstMatch().
+   *
+   * @param accessor A ValueAccessor which will be used to access keys.
+   *        beginIteration() should be called on accessor before calling this
+   *        method.
+   * @param key_attr_id The attribute ID of the keys to be read from accessor.
+   * @param check_for_null_keys If true, each key will be checked to see if it
+   *        is null before looking it up (null keys are skipped). This must be
+   *        set to true if some of the keys that will be read from accessor may
+   *        be null.
+   * @param functor A pointer to a functor, which should provide two functions:
+   *        1) An operator that takes 2 arguments: const ValueAccessor& (or better
+   *        yet, a templated call operator which takes a const reference to
+   *        some subclass of ValueAccessor as its first argument) and
+   *        const ValueT&. The operator will be invoked once for each pair of a
+   *        key taken from accessor and matching value.
+   *        2) A function hasMatch that takes 1 argument: const ValueAccessor&.
+   *        The function will be called only once for a key from accessor when
+   *        the first match is found.
+   */
+  template <typename FunctorT>
+  void getAllFromValueAccessorWithExtraWorkForFirstMatch(
+      ValueAccessor *accessor,
+      const attribute_id key_attr_id,
+      const bool check_for_null_keys,
+      FunctorT *functor) const;
+
+  /**
+   * @brief Lookup (multiple) keys from a ValueAccessor, apply a functor to
+   *        the matching values and additionally call a hasMatch() function of
+   *        the functor when the first match for a key is found. Composite key
+   *        version.
+   * @warning This method assumes that no concurrent calls to put(),
+   *          putCompositeKey(), putValueAccessor(),
+   *          putValueAccessorCompositeKey(), upsert(), upsertCompositeKey(),
+   *          upsertValueAccessor(), or upsertValueAccessorCompositeKey() are
+   *          taking place (i.e. that this HashTable is immutable for the
+   *          duration of the call and as long as the returned pointer may be
+   *          dereferenced). Concurrent calls to getSingle(),
+   *          getSingleCompositeKey(), getAll(), getAllCompositeKey(),
+   *          getAllFromValueAccessor(), getAllFromValueAccessorCompositeKey(),
+   *          forEach(), and forEachCompositeKey() are safe.
+   *
+   * @param accessor A ValueAccessor which will be used to access keys.
+   *        beginIteration() should be called on accessor before calling this
+   *        method.
+   * @param key_attr_id The attribute ID of the keys to be read from accessor.
+   * @param check_for_null_keys If true, each key will be checked to see if it
+   *        is null before looking it up (null keys are skipped). This must be
+   *        set to true if some of the keys that will be read from accessor may
+   *        be null.
+   * @param functor A pointer to a functor, which should provide two functions:
+   *        1) An operator that takes 2 arguments: const ValueAccessor& (or better
+   *        yet, a templated call operator which takes a const reference to
+   *        some subclass of ValueAccessor as its first argument) and
+   *        const ValueT&. The operator will be invoked once for each pair of a
+   *        key taken from accessor and matching value.
+   *        2) A function hasMatch that takes 1 argument: const ValueAccessor&.
+   *        The function will be called only once for a key from accessor when
+   *        the first match is found.
+   */
+  template <typename FunctorT>
+  void getAllFromValueAccessorCompositeKeyWithExtraWorkForFirstMatch(
+      ValueAccessor *accessor,
+      const std::vector<attribute_id> &key_attr_ids,
+      const bool check_for_null_keys,
+      FunctorT *functor) const;
+
+  /**
    * @brief Lookup (multiple) keys from a ValueAccessor and apply a functor to
    *        the matching values. Composite key version.
    *
@@ -746,6 +829,113 @@ class HashTable : public HashTableBase<resizable,
                                            FunctorT *functor) const;
 
   /**
+   * @brief Apply the functor to each key with a match in the hash table.
+   *
+   * @param accessor A ValueAccessor which will be used to access keys.
+   *        beginIteration() should be called on accessor before calling this
+   *        method.
+   * @param key_attr_id The attribute ID of the keys to be read from accessor.
+   * @param check_for_null_keys If true, each key will be checked to see if it
+   *        is null before looking it up (null keys are skipped). This must be
+   *        set to true if some of the keys that will be read from accessor may
+   *        be null.
+   * @param functor A pointer to a functor which should provide an operator that
+   *        takes 1 argument: const ValueAccessor&. The operator will be called
+   *        only once for a key from accessor if there is a match.
+   */
+  template <typename FunctorT>
+  void runOverKeysFromValueAccessorIfMatchFound(ValueAccessor *accessor,
+                                                const attribute_id key_attr_id,
+                                                const bool check_for_null_keys,
+                                                FunctorT *functor) const {
+    return runOverKeysFromValueAccessor<true>(accessor,
+                                              key_attr_id,
+                                              check_for_null_keys,
+                                              functor);
+  }
+
+  /**
+   * @brief Apply the functor to each key with a match in the hash table.
+   *
+   * @param accessor A ValueAccessor which will be used to access keys.
+   *        beginIteration() should be called on accessor before calling this
+   *        method.
+   * @param key_attr_id The attribute ID of the keys to be read from accessor.
+   * @param check_for_null_keys If true, each key will be checked to see if it
+   *        is null before looking it up (null keys are skipped). This must be
+   *        set to true if some of the keys that will be read from accessor may
+   *        be null.
+   * @param functor A pointer to a functor which should provide an operator that
+   *        takes 1 argument: const ValueAccessor&. The operator will be called
+   *        only once for a key from accessor if there is a match.
+   */
+  template <typename FunctorT>
+  void runOverKeysFromValueAccessorIfMatchFoundCompositeKey(
+      ValueAccessor *accessor,
+      const std::vector<attribute_id> &key_attr_ids,
+      const bool check_for_null_keys,
+      FunctorT *functor) const {
+    return runOverKeysFromValueAccessorCompositeKey<true>(accessor,
+                                                          key_attr_ids,
+                                                          check_for_null_keys,
+                                                          functor);
+  }
+
+  /**
+   * @brief Apply the functor to each key without a match in the hash table.
+   *
+   * @param accessor A ValueAccessor which will be used to access keys.
+   *        beginIteration() should be called on accessor before calling this
+   *        method.
+   * @param key_attr_id The attribute ID of the keys to be read from accessor.
+   * @param check_for_null_keys If true, each key will be checked to see if it
+   *        is null before looking it up (null keys are skipped). This must be
+   *        set to true if some of the keys that will be read from accessor may
+   *        be null.
+   * @param functor A pointer to a functor which should provide an operator that
+   *        takes 1 argument: const ValueAccessor&. The operator will be called
+   *        only once for a key from accessor if there is no match.
+   */
+  template <typename FunctorT>
+  void runOverKeysFromValueAccessorIfMatchNotFound(
+      ValueAccessor *accessor,
+      const attribute_id key_attr_id,
+      const bool check_for_null_keys,
+      FunctorT *functor) const {
+    return runOverKeysFromValueAccessor<false>(accessor,
+                                               key_attr_id,
+                                               check_for_null_keys,
+                                               functor);
+  }
+
+  /**
+   * @brief Apply the functor to each key without a match in the hash table.
+   *
+   * @param accessor A ValueAccessor which will be used to access keys.
+   *        beginIteration() should be called on accessor before calling this
+   *        method.
+   * @param key_attr_id The attribute ID of the keys to be read from accessor.
+   * @param check_for_null_keys If true, each key will be checked to see if it
+   *        is null before looking it up (null keys are skipped). This must be
+   *        set to true if some of the keys that will be read from accessor may
+   *        be null.
+   * @param functor A pointer to a functor which should provide an operator that
+   *        takes 1 argument: const ValueAccessor&. The operator will be called
+   *        only once for a key from accessor if there is no match.
+   */
+  template <typename FunctorT>
+  void runOverKeysFromValueAccessorIfMatchNotFoundCompositeKey(
+      ValueAccessor *accessor,
+      const std::vector<attribute_id> &key_attr_ids,
+      const bool check_for_null_keys,
+      FunctorT *functor) const {
+    return runOverKeysFromValueAccessorCompositeKey<false>(accessor,
+                                                           key_attr_ids,
+                                                           check_for_null_keys,
+                                                           functor);
+  }
+
+  /**
    * @brief Apply a functor to each key, value pair in this hash table.
    *
    * @warning This method assumes that no concurrent calls to put(),
@@ -965,6 +1155,10 @@ class HashTable : public HashTableBase<resizable,
                                            const ValueT **value,
                                            std::size_t *entry_num) const = 0;
 
+  // Return true if key exists in the hash table.
+  virtual bool hasKey(const TypedValue &key) const = 0;
+  virtual bool hasCompositeKey(const std::vector<TypedValue> &key) const = 0;
+
   // For a resizable HashTable, grow to accomodate more entries. If
   // 'extra_buckets' is not zero, it may serve as a "hint" to implementations
   // that at least the requested number of extra buckets are required when
@@ -1048,6 +1242,21 @@ class HashTable : public HashTableBase<resizable,
     return false;
   }
 
+  // If run_if_match_found is true, apply the functor to each key if a match is
+  // found; otherwise, apply the functor if no match is found.
+  template <bool run_if_match_found, typename FunctorT>
+  void runOverKeysFromValueAccessor(ValueAccessor *accessor,
+                                    const attribute_id key_attr_id,
+                                    const bool check_for_null_keys,
+                                    FunctorT *functor) const;
+
+  template <bool run_if_match_found, typename FunctorT>
+  void runOverKeysFromValueAccessorCompositeKey(
+      ValueAccessor *accessor,
+      const std::vector<attribute_id> &key_attr_ids,
+      const bool check_for_null_keys,
+      FunctorT *functor) const;
+
   // Method containing the actual logic implementing getAllFromValueAccessor().
   // Has extra template parameters that control behavior to avoid some
   // inner-loop branching.
@@ -1678,6 +1887,184 @@ template <typename ValueT,
           bool force_key_copy,
           bool allow_duplicate_keys>
 template <typename FunctorT>
+void HashTable<ValueT,
+               resizable,
+               serializable,
+               force_key_copy,
+               allow_duplicate_keys>::
+    getAllFromValueAccessorWithExtraWorkForFirstMatch(
+        ValueAccessor *accessor,
+        const attribute_id key_attr_id,
+        const bool check_for_null_keys,
+        FunctorT *functor) const {
+  InvokeOnAnyValueAccessor(
+      accessor,
+      [&](auto *accessor) -> void {  // NOLINT(build/c++11)
+    while (accessor->next()) {
+      TypedValue key = accessor->getTypedValue(key_attr_id);
+      if (check_for_null_keys && key.isNull()) {
+        continue;
+      }
+      const std::size_t hash_code = adjust_hashes_ ? AdjustHash(key.getHash())
+                                                   : key.getHash();
+      std::size_t entry_num = 0;
+      const ValueT *value;
+      if (getNextEntryForKey(key, hash_code, &value, &entry_num)) {
+        functor->recordMatch(*accessor);
+        (*functor)(*accessor, *value);
+        if (!allow_duplicate_keys) {
+           continue;
+        }
+        while (getNextEntryForKey(key, hash_code, &value, &entry_num)) {
+          (*functor)(*accessor, *value);
+        }
+      }
+    }
+  });  // NOLINT(whitespace/parens)
+}
+
+template <typename ValueT,
+          bool resizable,
+          bool serializable,
+          bool force_key_copy,
+          bool allow_duplicate_keys>
+template <typename FunctorT>
+void HashTable<ValueT, resizable, serializable, force_key_copy, allow_duplicate_keys>
+    ::getAllFromValueAccessorCompositeKeyWithExtraWorkForFirstMatch(
+        ValueAccessor *accessor,
+        const std::vector<attribute_id> &key_attr_ids,
+        const bool check_for_null_keys,
+        FunctorT *functor) const {
+  DEBUG_ASSERT(key_types_.size() == key_attr_ids.size());
+  std::vector<TypedValue> key_vector;
+  key_vector.resize(key_attr_ids.size());
+  InvokeOnAnyValueAccessor(
+      accessor,
+      [&](auto *accessor) -> void {  // NOLINT(build/c++11)
+    while (accessor->next()) {
+      bool null_key = false;
+      for (std::vector<attribute_id>::size_type key_idx = 0;
+           key_idx < key_types_.size();
+           ++key_idx) {
+        key_vector[key_idx] = accessor->getTypedValue(key_attr_ids[key_idx]);
+        if (check_for_null_keys && key_vector[key_idx].isNull()) {
+          null_key = true;
+          break;
+        }
+      }
+      if (null_key) {
+        continue;
+      }
+
+      const std::size_t hash_code = adjust_hashes_ ? AdjustHash(hashCompositeKey(key_vector))
+                                                   : hashCompositeKey(key_vector);
+      std::size_t entry_num = 0;
+      const ValueT *value;
+      if (getNextEntryForCompositeKey(key_vector, hash_code, &value, &entry_num)) {
+        functor->recordMatch(*accessor);
+        (*functor)(*accessor, *value);
+        if (!allow_duplicate_keys) {
+          continue;
+        }
+        while (getNextEntryForCompositeKey(key_vector, hash_code, &value, &entry_num)) {
+          (*functor)(*accessor, *value);
+        }
+      }
+    }
+  });  // NOLINT(whitespace/parens)
+}
+
+template <typename ValueT,
+          bool resizable,
+          bool serializable,
+          bool force_key_copy,
+          bool allow_duplicate_keys>
+template <bool run_if_match_found, typename FunctorT>
+void HashTable<ValueT,
+               resizable,
+               serializable,
+               force_key_copy,
+               allow_duplicate_keys>::
+    runOverKeysFromValueAccessor(ValueAccessor *accessor,
+                                 const attribute_id key_attr_id,
+                                 const bool check_for_null_keys,
+                                 FunctorT *functor) const {
+  InvokeOnAnyValueAccessor(
+      accessor,
+      [&](auto *accessor) -> void {  // NOLINT(build/c++11)
+    while (accessor->next()) {
+      TypedValue key = accessor->getTypedValue(key_attr_id);
+      if (check_for_null_keys && key.isNull()) {
+        if (!run_if_match_found) {
+          (*functor)(*accessor);
+          continue;
+        }
+      }
+      if (run_if_match_found) {
+        if (this->hasKey(key)) {
+          (*functor)(*accessor);
+        }
+      } else {
+        if (!this->hasKey(key)) {
+          (*functor)(*accessor);
+        }
+      }
+    }
+  });  // NOLINT(whitespace/parens)
+}
+
+template <typename ValueT,
+          bool resizable,
+          bool serializable,
+          bool force_key_copy,
+          bool allow_duplicate_keys>
+template <bool run_if_match_found, typename FunctorT>
+void HashTable<ValueT, resizable, serializable, force_key_copy, allow_duplicate_keys>
+    ::runOverKeysFromValueAccessorCompositeKey(ValueAccessor *accessor,
+                                               const std::vector<attribute_id> &key_attr_ids,
+                                               const bool check_for_null_keys,
+                                               FunctorT *functor) const {
+  DEBUG_ASSERT(key_types_.size() == key_attr_ids.size());
+  std::vector<TypedValue> key_vector;
+  key_vector.resize(key_attr_ids.size());
+  InvokeOnAnyValueAccessor(
+      accessor,
+      [&](auto *accessor) -> void {  // NOLINT(build/c++11)
+    while (accessor->next()) {
+      bool null_key = false;
+      for (std::vector<attribute_id>::size_type key_idx = 0;
+           key_idx < key_types_.size();
+           ++key_idx) {
+        key_vector[key_idx] = accessor->getTypedValue(key_attr_ids[key_idx]);
+        if (check_for_null_keys && key_vector[key_idx].isNull()) {
+          null_key = true;
+          break;
+        }
+      }
+      if (null_key) {
+        if (!run_if_match_found) {
+          (*functor)(*accessor);
+          continue;
+        }
+      }
+
+      if (run_if_match_found) {
+        if (this->hasCompositeKey(key_vector)) {
+          (*functor)(*accessor);
+        }
+      } else if (!this->hasCompositeKey(key_vector)) {
+        (*functor)(*accessor);
+      }
+    }
+  });  // NOLINT(whitespace/parens)
+}
+
+template <typename ValueT,
+          bool resizable,
+          bool serializable,
+          bool force_key_copy,
+          bool allow_duplicate_keys>
+template <typename FunctorT>
 std::size_t HashTable<ValueT, resizable, serializable, force_key_copy, allow_duplicate_keys>
     ::forEach(FunctorT *functor) const {
   std::size_t entries_visited = 0;