Posted to commits@quickstep.apache.org by ji...@apache.org on 2017/01/29 17:58:45 UTC

[53/54] [abbrv] incubator-quickstep git commit: Added tests for Partitioned Hash Join.

Added tests for Partitioned Hash Join.
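
For context, a minimal standalone sketch of the pattern these tests exercise
(plain C++ with illustrative names and values, not the Quickstep operator
API): the build side is split into one hash table per partition, and each
probe-side tuple consults only the table of its own partition.

    #include <cstddef>
    #include <cstdint>
    #include <iostream>
    #include <unordered_map>
    #include <vector>

    int main() {
      constexpr std::size_t kNumPartitions = 4;
      const std::vector<std::int64_t> dim = {0, 1, 2, 3, 4, 5, 6, 7};
      const std::vector<std::int64_t> fact = {0, 2, 4, 6, 8, 10};

      // Build side: one hash table per partition, keyed on the join attribute.
      std::vector<std::unordered_multimap<std::int64_t, std::int64_t>> tables(kNumPartitions);
      for (const std::int64_t v : dim) {
        tables[static_cast<std::size_t>(v) % kNumPartitions].emplace(v, v);
      }

      // Probe side: each tuple visits only the hash table of its own
      // partition, so partitions can be built and probed independently.
      std::size_t num_matches = 0;
      for (const std::int64_t v : fact) {
        const auto range = tables[static_cast<std::size_t>(v) % kNumPartitions].equal_range(v);
        for (auto it = range.first; it != range.second; ++it) {
          ++num_matches;
        }
      }
      std::cout << num_matches << " matches\n";  // 0, 2, 4, and 6 match: prints "4 matches".
      return 0;
    }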


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/3210500b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/3210500b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/3210500b

Branch: refs/heads/exact-stat-unittest
Commit: 3210500b87a90a9ed302a0bafbb1dd1d1a6081ed
Parents: 66178d7
Author: Zuyu Zhang <zu...@apache.org>
Authored: Mon Jan 23 17:19:13 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Sat Jan 28 18:14:42 2017 -0800

----------------------------------------------------------------------
 relational_operators/CMakeLists.txt             |   2 +
 .../tests/HashJoinOperator_unittest.cpp         | 693 +++++++++++++++++--
 2 files changed, 645 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3210500b/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index c2db4ec..c1caaa3 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -606,6 +606,8 @@ target_link_libraries(HashJoinOperator_unittest
                       quickstep_catalog_CatalogDatabase
                       quickstep_catalog_CatalogRelation
                       quickstep_catalog_CatalogTypedefs
+                      quickstep_catalog_PartitionScheme
+                      quickstep_catalog_PartitionSchemeHeader
                       quickstep_expressions_Expressions_proto
                       quickstep_expressions_predicate_ComparisonPredicate
                       quickstep_expressions_predicate_Predicate
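
The two new link targets follow this file's one-target-per-included-header
convention: the unittest now includes catalog/PartitionScheme.hpp and
catalog/PartitionSchemeHeader.hpp (see the next hunk), so it links
quickstep_catalog_PartitionScheme and quickstep_catalog_PartitionSchemeHeader.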

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3210500b/relational_operators/tests/HashJoinOperator_unittest.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/tests/HashJoinOperator_unittest.cpp b/relational_operators/tests/HashJoinOperator_unittest.cpp
index 60f05ea..03350d4 100644
--- a/relational_operators/tests/HashJoinOperator_unittest.cpp
+++ b/relational_operators/tests/HashJoinOperator_unittest.cpp
@@ -34,6 +34,8 @@
 #include "catalog/CatalogDatabase.hpp"
 #include "catalog/CatalogRelation.hpp"
 #include "catalog/CatalogTypedefs.hpp"
+#include "catalog/PartitionScheme.hpp"
+#include "catalog/PartitionSchemeHeader.hpp"
 #include "expressions/Expressions.pb.h"
 #include "expressions/predicate/ComparisonPredicate.hpp"
 #include "expressions/predicate/Predicate.hpp"
@@ -83,6 +85,8 @@
 using std::snprintf;
 #endif
 
+using std::make_unique;
+using std::size_t;
 using std::unique_ptr;
 
 namespace quickstep {
@@ -98,6 +102,7 @@ constexpr std::size_t kQueryId = 0;
 constexpr int kOpIndex = 0;
 
 constexpr std::size_t kSinglePartition = 1;
+constexpr std::size_t kMultiplePartitions = 4;
 
 }  // namespace
 
@@ -144,56 +149,6 @@ class HashJoinOperatorTest : public ::testing::TestWithParam<HashTableImplType>
     fact_table_->addAttribute(new CatalogAttribute(fact_table_, "int", int_type));
     fact_table_->addAttribute(new CatalogAttribute(fact_table_, "char", char_type));
     fact_table_->addAttribute(new CatalogAttribute(fact_table_, "varchar", varchar_type));
-
-    // Create StorageLayout
-    std::unique_ptr<StorageBlockLayout> dim_layout(createStorageLayout(*dim_table_));
-    std::unique_ptr<StorageBlockLayout> fact_layout(createStorageLayout(*fact_table_));
-
-    // Insert tuples to dim table.
-    std::unique_ptr<Tuple> tuple;
-    MutableBlockReference storage_block;
-    for (tuple_id i = 0; i < kNumDimTuples; i += kBlockSize) {
-      // Create block.
-      block_id block_id = storage_manager_->createBlock(*dim_table_, *dim_layout);
-      storage_block = storage_manager_->getBlockMutable(block_id, *dim_table_);
-      dim_table_->addBlock(block_id);
-
-      // Insert tuples.
-      tuple_id block_bound = i + kBlockSize < kNumDimTuples ? i + kBlockSize : kNumDimTuples;
-      for (tuple_id tid = i; tid < block_bound; ++tid) {
-        // First attribute (long): a sequence id.
-        // Second attribute (int): a looped value to test duplicate keys.
-        // Third attribute (char): an identical value to test Cartesian product.
-        // Fourth attribute (varchar): a value to test duplicate variable-length keys.
-        tuple.reset(createTuple(*dim_table_, tid, tid % kBlockSize, 100, tid / 2 * 2));
-        EXPECT_TRUE(storage_block->insertTupleInBatch(*tuple));
-      }
-      storage_block->rebuild();
-    }
-
-    // Insert tuples to fact table.
-    for (tuple_id i = 0; i < kNumFactTuples; i += kBlockSize) {
-      // Create block
-      block_id block_id = storage_manager_->createBlock(*fact_table_, *fact_layout);
-      storage_block = storage_manager_->getBlockMutable(block_id, *fact_table_);
-      fact_table_->addBlock(block_id);
-
-      // Insert tuples
-      tuple_id block_bound = i + kBlockSize < kNumFactTuples ? i + kBlockSize : kNumFactTuples;
-      for (tuple_id tid = i; tid < block_bound; ++tid) {
-        // First attribute (long): a sequence id to join with dim_table.long. Each tuple has
-        //                         exactly one match.
-        // Second attribute (int): a sequence id to join with dim_table.int. Each tuple in the
-        //                         first kBlockSize tuples has multiple matches. Other tuples
-        //                         have no match.
-        // Third attribute (char): an identical value to test Cartesian product.
-        // Fourth attribute (varchar): a sequence id to join with dim_table.varchar. Each tuple
-        //                             with an even value has two matches.
-        tuple.reset(createTuple(*fact_table_, tid, tid, 100, tid));
-        EXPECT_TRUE(storage_block->insertTupleInBatch(*tuple));
-      }
-      storage_block->rebuild();
-    }
   }
 
   virtual void TearDown() {
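
With the bulk loading removed from SetUp(), the fixture itself no longer
inserts any data. Each test now begins by calling one of the two helpers added
in the next hunk, insertTuplesWithoutPartitions() or
insertTuplesWithSingleAttributePartitions(), so non-partitioned and
partitioned layouts can share a single fixture.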
@@ -259,6 +214,127 @@ class HashJoinOperatorTest : public ::testing::TestWithParam<HashTableImplType>
     return new Tuple(std::move(attr_values));
   }
 
+  void insertTuplesWithoutPartitions() {
+    // Create StorageLayout
+    std::unique_ptr<StorageBlockLayout> dim_layout(createStorageLayout(*dim_table_));
+    std::unique_ptr<StorageBlockLayout> fact_layout(createStorageLayout(*fact_table_));
+
+    // Insert tuples to dim table.
+    std::unique_ptr<Tuple> tuple;
+    MutableBlockReference storage_block;
+    for (tuple_id i = 0; i < kNumDimTuples; i += kBlockSize) {
+      // Create block.
+      block_id block_id = storage_manager_->createBlock(*dim_table_, *dim_layout);
+      storage_block = storage_manager_->getBlockMutable(block_id, *dim_table_);
+      dim_table_->addBlock(block_id);
+
+      // Insert tuples.
+      tuple_id block_bound = i + kBlockSize < kNumDimTuples ? i + kBlockSize : kNumDimTuples;
+      for (tuple_id tid = i; tid < block_bound; ++tid) {
+        // First attribute (long): a sequence id.
+        // Second attribute (int): a looped value to test duplicate keys.
+        // Third attribute (char): an identical value to test Cartesian product.
+        // Fourth attribute (varchar): a value to test duplicate variable-length keys.
+        tuple.reset(createTuple(*dim_table_, tid, tid % kBlockSize, 100, tid / 2 * 2));
+        EXPECT_TRUE(storage_block->insertTupleInBatch(*tuple));
+      }
+      storage_block->rebuild();
+    }
+
+    // Insert tuples to fact table.
+    for (tuple_id i = 0; i < kNumFactTuples; i += kBlockSize) {
+      // Create block
+      block_id block_id = storage_manager_->createBlock(*fact_table_, *fact_layout);
+      storage_block = storage_manager_->getBlockMutable(block_id, *fact_table_);
+      fact_table_->addBlock(block_id);
+
+      // Insert tuples
+      tuple_id block_bound = i + kBlockSize < kNumFactTuples ? i + kBlockSize : kNumFactTuples;
+      for (tuple_id tid = i; tid < block_bound; ++tid) {
+        // First attribute (long): a sequence id to join with dim_table.long. Each tuple has
+        //                         exactly one match.
+        // Second attribute (int): a sequence id to join with dim_table.int. Each tuple in the
+        //                         first kBlockSize tuples has multiple matches. Other tuples
+        //                         have no match.
+        // Third attribute (char): an identical value to test Cartesian product.
+        // Fourth attribute (varchar): a sequence id to join with dim_table.varchar. Each tuple
+        //                             with an even value has two matches.
+        tuple.reset(createTuple(*fact_table_, tid, tid, 100, tid));
+        EXPECT_TRUE(storage_block->insertTupleInBatch(*tuple));
+      }
+      storage_block->rebuild();
+    }
+  }
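+
+  // Worked example of the fixture data (values shown are the raw integers
+  // passed to createTuple(); the char and varchar attributes presumably
+  // store their printed forms):
+  //   dim:  tid=0 -> (long=0, int=0, char=100, varchar=0)
+  //         tid=1 -> (long=1, int=1, char=100, varchar=0)
+  //         tid=2 -> (long=2, int=2, char=100, varchar=2)
+  //   fact: tid=0 -> (long=0, int=0, char=100, varchar=0)
+  //         tid=1 -> (long=1, int=1, char=100, varchar=1)
+  // On "long", fact tuple tid matches exactly the dim tuple with the same id
+  // (when one exists). On "varchar", a fact tuple with even tid matches dim
+  // tuples tid and tid + 1, while an odd tid matches nothing, since dim
+  // varchars are always rounded down to even.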
+
+  void insertTuplesWithSingleAttributePartitions() {
+    // Set PartitionScheme.
+    dim_part_scheme_ = new PartitionScheme(
+        new HashPartitionSchemeHeader(kMultiplePartitions, dim_table_->getAttributeByName("long")->getID()));
+    dim_table_->setPartitionScheme(dim_part_scheme_);
+
+    fact_part_scheme_ = new PartitionScheme(
+        new HashPartitionSchemeHeader(kMultiplePartitions, fact_table_->getAttributeByName("long")->getID()));
+    fact_table_->setPartitionScheme(fact_part_scheme_);
+
+    // Create StorageLayout
+    std::unique_ptr<StorageBlockLayout> dim_layout(createStorageLayout(*dim_table_));
+    std::unique_ptr<StorageBlockLayout> fact_layout(createStorageLayout(*fact_table_));
+
+    // Create one block per partition; the vector index is the partition id.
+    std::vector<MutableBlockReference> dim_partitioned_blocks;
+    for (partition_id part_id = 0; part_id < kMultiplePartitions; ++part_id) {
+      const block_id block = storage_manager_->createBlock(*dim_table_, *dim_layout);
+      dim_part_scheme_->addBlockToPartition(block, part_id);
+      // For a simpler teardown.
+      dim_table_->addBlock(block);
+
+      dim_partitioned_blocks.push_back(storage_manager_->getBlockMutable(block, *dim_table_));
+    }
+
+    // Insert tuples to dim table.
+    for (tuple_id tid = 0; tid < kNumDimTuples; ++tid) {
+      // First attribute (long): a sequence id.
+      // Second attribute (int): a looped value to test duplicate keys.
+      // Third attribute (char): an identical value to test Cartesian product.
+      // Fourth attribute (varchar): a value to test duplicate variable-length keys.
+      unique_ptr<Tuple> tuple(createTuple(*dim_table_, tid, tid % kBlockSize, 100, tid / 2 * 2));
+      EXPECT_TRUE(dim_partitioned_blocks[tid % kMultiplePartitions]->insertTupleInBatch(*tuple));
+    }
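+    // Note: placing tuple tid in block tid % kMultiplePartitions presumes
+    // that a long key hashes to its own value, so this block assignment
+    // agrees with the partition HashPartitionSchemeHeader would compute for
+    // the "long" attribute (an assumption, not a quoted API guarantee).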
+
+    for (size_t i = 0; i < dim_partitioned_blocks.size(); ++i) {
+      dim_partitioned_blocks[i]->rebuild();
+    }
+
+    // Create one block per partition; the vector index is the partition id.
+    std::vector<MutableBlockReference> fact_partitioned_blocks;
+    for (partition_id part_id = 0; part_id < kMultiplePartitions; ++part_id) {
+      const block_id block = storage_manager_->createBlock(*fact_table_, *fact_layout);
+      fact_part_scheme_->addBlockToPartition(block, part_id);
+      // For a simpler teardown.
+      fact_table_->addBlock(block);
+
+      fact_partitioned_blocks.push_back(storage_manager_->getBlockMutable(block, *fact_table_));
+    }
+
+    // Insert tuples to fact table.
+    for (tuple_id tid = 0; tid < kNumFactTuples; ++tid) {
+      // First attribute (long): a sequence id to join with dim_table.long. Each tuple has
+      //                         exactly one match.
+      // Second attribute (int): a sequence id to join with dim_table.int. Each tuple in the
+      //                         first kBlockSize tuples has multiple matches. Other tuples
+      //                         have no match.
+      // Third attribute (char): an identical value to test Cartesian product.
+      // Fourth attribute (varchar): a sequence id to join with dim_table.varchar. Each tuple
+      //                             with an even value has two matches.
+      unique_ptr<Tuple> tuple(createTuple(*fact_table_, tid, tid, 100, tid));
+      EXPECT_TRUE(fact_partitioned_blocks[tid % kMultiplePartitions]->insertTupleInBatch(*tuple));
+    }
+
+    for (size_t i = 0; i < fact_partitioned_blocks.size(); ++i) {
+      fact_partitioned_blocks[i]->rebuild();
+    }
+  }
+
   void fetchAndExecuteWorkOrders(RelationalOperator *op) {
     // Note: We treat each operator as an individual query plan DAG. The
     // index for each operator should be set, so that the WorkOrdersContainer
@@ -293,9 +369,13 @@ class HashJoinOperatorTest : public ::testing::TestWithParam<HashTableImplType>
   unique_ptr<CatalogDatabase> db_;
   // The following CatalogRelations are owned by db_.
   CatalogRelation *dim_table_, *fact_table_;
+  // The following PartitionSchemes are each owned by their respective CatalogRelation.
+  PartitionScheme *dim_part_scheme_ = nullptr, *fact_part_scheme_ = nullptr;
 };
 
 TEST_P(HashJoinOperatorTest, LongKeyHashJoinTest) {
+  insertTuplesWithoutPartitions();
+
   // Setup the hash table proto in the query context proto.
   serialization::QueryContext query_context_proto;
   query_context_proto.set_query_id(kQueryId);
@@ -439,6 +519,8 @@ TEST_P(HashJoinOperatorTest, LongKeyHashJoinTest) {
 }
 
 TEST_P(HashJoinOperatorTest, IntDuplicateKeyHashJoinTest) {
+  insertTuplesWithoutPartitions();
+
   // Setup the hash table proto in the query context proto.
   serialization::QueryContext query_context_proto;
   query_context_proto.set_query_id(kQueryId);
@@ -612,6 +694,8 @@ TEST_P(HashJoinOperatorTest, IntDuplicateKeyHashJoinTest) {
 }
 
 TEST_P(HashJoinOperatorTest, CharKeyCartesianProductHashJoinTest) {
+  insertTuplesWithoutPartitions();
+
   // Setup the hash table proto in the query context proto.
   serialization::QueryContext query_context_proto;
   query_context_proto.set_query_id(kQueryId);
@@ -750,6 +834,8 @@ TEST_P(HashJoinOperatorTest, CharKeyCartesianProductHashJoinTest) {
 }
 
 TEST_P(HashJoinOperatorTest, VarCharDuplicateKeyHashJoinTest) {
+  insertTuplesWithoutPartitions();
+
   // Setup the hash table proto in the query context proto.
   serialization::QueryContext query_context_proto;
   query_context_proto.set_query_id(kQueryId);
@@ -920,6 +1006,8 @@ TEST_P(HashJoinOperatorTest, VarCharDuplicateKeyHashJoinTest) {
 }
 
 TEST_P(HashJoinOperatorTest, CompositeKeyHashJoinTest) {
+  insertTuplesWithoutPartitions();
+
   // Setup the hash table proto in the query context proto.
   serialization::QueryContext query_context_proto;
   query_context_proto.set_query_id(kQueryId);
@@ -1100,6 +1188,8 @@ TEST_P(HashJoinOperatorTest, CompositeKeyHashJoinTest) {
 
 // Same as above test, but add an additional residual filter predicate.
 TEST_P(HashJoinOperatorTest, CompositeKeyHashJoinWithResidualPredicateTest) {
+  insertTuplesWithoutPartitions();
+
   // Setup the hash table proto in the query context proto.
   serialization::QueryContext query_context_proto;
   query_context_proto.set_query_id(kQueryId);
@@ -1288,6 +1378,509 @@ TEST_P(HashJoinOperatorTest, CompositeKeyHashJoinWithResidualPredicateTest) {
   db_->dropRelationById(output_relation_id);
 }
 
+// Hash join tests with single-attribute partitions.
+TEST_P(HashJoinOperatorTest, SingleAttributePartitionedLongKeyHashJoinTest) {
+  insertTuplesWithSingleAttributePartitions();
+
+  // Setup the hash table proto in the query context proto.
+  serialization::QueryContext query_context_proto;
+  query_context_proto.set_query_id(kQueryId);
+
+  const QueryContext::join_hash_table_id join_hash_table_index =
+      query_context_proto.join_hash_tables_size();
+
+  serialization::QueryContext::HashTableContext *hash_table_context_proto =
+      query_context_proto.add_join_hash_tables();
+  hash_table_context_proto->set_num_partitions(kMultiplePartitions);
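+  // Setting num_partitions here should make the QueryContext materialize one
+  // join hash table per partition under join_hash_table_index (inferred from
+  // the partition-aware operator constructors below, not a quoted contract).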
+
+  serialization::HashTable *hash_table_proto =
+      hash_table_context_proto->mutable_join_hash_table();
+  switch (GetParam()) {
+    case HashTableImplType::kLinearOpenAddressing:
+      hash_table_proto->set_hash_table_impl_type(
+          serialization::HashTableImplType::LINEAR_OPEN_ADDRESSING);
+      break;
+    case HashTableImplType::kSeparateChaining:
+      hash_table_proto->set_hash_table_impl_type(
+          serialization::HashTableImplType::SEPARATE_CHAINING);
+      break;
+    case HashTableImplType::kSimpleScalarSeparateChaining:
+      if (TypedValue::HashIsReversible(kLong)) {
+        hash_table_proto->set_hash_table_impl_type(
+            serialization::HashTableImplType::SIMPLE_SCALAR_SEPARATE_CHAINING);
+        break;
+      } else {
+        // Can't use SimpleScalarSeparateChainingHashTable for long keys on
+        // this platform.
+        return;
+      }
+    default:
+      FATAL_ERROR("Unknown HashTable type requested for join.");
+  }
+
+  const Type &long_type = LongType::InstanceNonNullable();
+
+  hash_table_proto->add_key_types()->MergeFrom(long_type.getProto());
+  hash_table_proto->set_estimated_num_entries(kNumDimTuples);
+
+  const CatalogAttribute &dim_col_long = *dim_table_->getAttributeByName("long");
+  const CatalogAttribute &fact_col_long = *fact_table_->getAttributeByName("long");
+
+  // Create the builder operator.
+  unique_ptr<BuildHashOperator> builder(new BuildHashOperator(
+      kQueryId,
+      *dim_table_,
+      true /* is_stored */,
+      { dim_col_long.getID() },
+      dim_col_long.getType().isNullable(),
+      kMultiplePartitions,
+      join_hash_table_index));
+
+  // Create the prober operator with one selection attribute.
+  const QueryContext::scalar_group_id selection_index = query_context_proto.scalar_groups_size();
+  ScalarAttribute scalar_attr(dim_col_long);
+  query_context_proto.add_scalar_groups()->add_scalars()->MergeFrom(scalar_attr.getProto());
+
+  // Create result_table, owned by db_.
+  CatalogRelation *result_table = new CatalogRelation(NULL, "result_table", 102);
+  result_table->addAttribute(new CatalogAttribute(result_table, "long", long_type));
+
+  const relation_id output_relation_id = db_->addRelation(result_table);
+
+  // Setup the InsertDestination proto in the query context proto.
+  const QueryContext::insert_destination_id output_destination_index =
+      query_context_proto.insert_destinations_size();
+  serialization::InsertDestination *insert_destination_proto = query_context_proto.add_insert_destinations();
+
+  insert_destination_proto->set_insert_destination_type(serialization::InsertDestinationType::BLOCK_POOL);
+  insert_destination_proto->set_relation_id(output_relation_id);
+  insert_destination_proto->set_relational_op_index(kOpIndex);
+
+  unique_ptr<HashJoinOperator> prober(new HashJoinOperator(
+      kQueryId,
+      *dim_table_,
+      *fact_table_,
+      true /* is_stored */,
+      { fact_col_long.getID() },
+      fact_col_long.getType().isNullable(),
+      kMultiplePartitions,
+      *result_table,
+      output_destination_index,
+      join_hash_table_index,
+      QueryContext::kInvalidPredicateId /* residual_predicate_index */,
+      selection_index));
+
+  // Set up the QueryContext.
+  query_context_ =
+      make_unique<QueryContext>(query_context_proto, *db_, storage_manager_.get(), foreman_client_id_, &bus_);
+
+  // Execute the operators.
+  fetchAndExecuteWorkOrders(builder.get());
+
+  prober->informAllBlockingDependenciesMet();
+  fetchAndExecuteWorkOrders(prober.get());
+
+  // Check result values
+  // Note that the results might be in a different order.
+  std::size_t num_result_tuples = 0;
+  std::unique_ptr<std::size_t[]> counts(new std::size_t[kNumDimTuples]);
+  std::memset(counts.get(), 0, sizeof(std::size_t) * kNumDimTuples);
+
+  DCHECK(query_context_);
+  InsertDestination *insert_destination = query_context_->getInsertDestination(prober->getInsertDestinationID());
+  DCHECK(insert_destination);
+
+  const std::vector<block_id> &result_blocks = insert_destination->getTouchedBlocks();
+  for (std::size_t bid = 0; bid < result_blocks.size(); ++bid) {
+    BlockReference result_block = storage_manager_->getBlock(result_blocks[bid],
+                                                             insert_destination->getRelation());
+    const TupleStorageSubBlock &result_tuple_sub_block = result_block->getTupleStorageSubBlock();
+    num_result_tuples += result_tuple_sub_block.numTuples();
+    for (tuple_id i = 0; i <= result_tuple_sub_block.getMaxTupleID(); ++i) {
+      if (result_tuple_sub_block.hasTupleWithID(i)) {
+        TypedValue typed_value = result_tuple_sub_block.getAttributeValueTyped(
+            i, result_table->getAttributeByName("long")->getID());
+        std::int64_t value = typed_value.getLiteral<std::int64_t>();
+        ASSERT_GE(value, 0);
+        ASSERT_LT(value, static_cast<std::int64_t>(kNumDimTuples));
+        ++counts[value];
+      }
+    }
+
+    // Drop the block.
+    result_block.release();
+    storage_manager_->deleteBlockOrBlobFile(result_blocks[bid]);
+  }
+  EXPECT_EQ(static_cast<std::size_t>(kNumDimTuples), num_result_tuples);
+
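+  // Each dim tuple should be counted exactly once: fact.long and dim.long
+  // are both the sequence id tid, and hash partitioning on "long" places
+  // every matching pair in the same partition, so no match is lost.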
+  for (tuple_id i = 0; i < kNumDimTuples; ++i) {
+    EXPECT_EQ(1u, counts[i]);
+  }
+
+  // Create cleaner operator.
+  auto cleaner = make_unique<DestroyHashOperator>(kQueryId, kMultiplePartitions, join_hash_table_index);
+  cleaner->informAllBlockingDependenciesMet();
+  fetchAndExecuteWorkOrders(cleaner.get());
+
+  db_->dropRelationById(output_relation_id);
+}
+
+TEST_P(HashJoinOperatorTest, SingleAttributePartitionedCompositeKeyHashJoinTest) {
+  insertTuplesWithSingleAttributePartitions();
+
+  // Setup the hash table proto in the query context proto.
+  serialization::QueryContext query_context_proto;
+  query_context_proto.set_query_id(kQueryId);
+
+  const QueryContext::join_hash_table_id join_hash_table_index =
+      query_context_proto.join_hash_tables_size();
+
+  serialization::QueryContext::HashTableContext *hash_table_context_proto =
+      query_context_proto.add_join_hash_tables();
+  hash_table_context_proto->set_num_partitions(kMultiplePartitions);
+
+  serialization::HashTable *hash_table_proto =
+      hash_table_context_proto->mutable_join_hash_table();
+  switch (GetParam()) {
+    case HashTableImplType::kLinearOpenAddressing:
+      hash_table_proto->set_hash_table_impl_type(
+          serialization::HashTableImplType::LINEAR_OPEN_ADDRESSING);
+      break;
+    case HashTableImplType::kSeparateChaining:
+      hash_table_proto->set_hash_table_impl_type(
+          serialization::HashTableImplType::SEPARATE_CHAINING);
+      break;
+    case HashTableImplType::kSimpleScalarSeparateChaining:
+      // Can't use SimpleScalarSeparateChainingHashTable with composite keys.
+      return;
+    default:
+      FATAL_ERROR("Unknown HashTable type requested for join.");
+  }
+
+  const Type &long_type = LongType::InstanceNonNullable();
+  const Type &varchar_type = VarCharType::InstanceNonNullable(kCharLength);
+
+  hash_table_proto->add_key_types()->MergeFrom(long_type.getProto());
+  hash_table_proto->add_key_types()->MergeFrom(varchar_type.getProto());
+  hash_table_proto->set_estimated_num_entries(kNumDimTuples);
+
+  const CatalogAttribute &dim_col_long = *dim_table_->getAttributeByName("long");
+  const CatalogAttribute &dim_col_varchar = *dim_table_->getAttributeByName("varchar");
+  const CatalogAttribute &fact_col_long = *fact_table_->getAttributeByName("long");
+  const CatalogAttribute &fact_col_varchar = *fact_table_->getAttributeByName("varchar");
+
+  // Create the builder operator.
+  unique_ptr<BuildHashOperator> builder(new BuildHashOperator(
+      kQueryId,
+      *dim_table_,
+      true /* is_stored */,
+      { dim_col_long.getID(), dim_col_varchar.getID() },
+      dim_col_long.getType().isNullable() || dim_col_varchar.getType().isNullable(),
+      kMultiplePartitions,
+      join_hash_table_index));
+
+  // Create the prober operator with two selection attributes.
+  const QueryContext::scalar_group_id selection_index = query_context_proto.scalar_groups_size();
+  serialization::QueryContext::ScalarGroup *scalar_group_proto = query_context_proto.add_scalar_groups();
+
+  ScalarAttribute scalar_attr_dim(dim_col_long);
+  scalar_group_proto->add_scalars()->MergeFrom(scalar_attr_dim.getProto());
+  ScalarAttribute scalar_attr_fact(fact_col_long);
+  scalar_group_proto->add_scalars()->MergeFrom(scalar_attr_fact.getProto());
+
+  // Create result_table, owned by db_.
+  CatalogRelation *result_table = new CatalogRelation(NULL, "result_table", 102);
+  result_table->addAttribute(new CatalogAttribute(result_table, "dim_long", long_type));
+  result_table->addAttribute(new CatalogAttribute(result_table, "fact_long", long_type));
+
+  const relation_id output_relation_id = db_->addRelation(result_table);
+
+  // Setup the InsertDestination proto in the query context proto.
+  const QueryContext::insert_destination_id output_destination_index =
+      query_context_proto.insert_destinations_size();
+  serialization::InsertDestination *insert_destination_proto = query_context_proto.add_insert_destinations();
+
+  insert_destination_proto->set_insert_destination_type(serialization::InsertDestinationType::BLOCK_POOL);
+  insert_destination_proto->set_relation_id(output_relation_id);
+  insert_destination_proto->set_relational_op_index(kOpIndex);
+
+  unique_ptr<HashJoinOperator> prober(new HashJoinOperator(
+      kQueryId,
+      *dim_table_,
+      *fact_table_,
+      true /* is_stored */,
+      { fact_col_long.getID(), fact_col_varchar.getID() },
+      fact_col_long.getType().isNullable() || fact_col_varchar.getType().isNullable(),
+      kMultiplePartitions,
+      *result_table,
+      output_destination_index,
+      join_hash_table_index,
+      QueryContext::kInvalidPredicateId /* residual_predicate_index */,
+      selection_index));
+
+  // Set up the QueryContext.
+  query_context_ =
+      make_unique<QueryContext>(query_context_proto, *db_, storage_manager_.get(), foreman_client_id_, &bus_);
+
+  // Execute the operators.
+  fetchAndExecuteWorkOrders(builder.get());
+
+  prober->informAllBlockingDependenciesMet();
+  fetchAndExecuteWorkOrders(prober.get());
+
+  // Check result values
+  // Note that the results might be in a different order.
+  std::size_t num_result_tuples = 0;
+
+  std::unique_ptr<std::size_t[]> dim_counts(new std::size_t[kNumDimTuples]);
+  std::memset(dim_counts.get(), 0, sizeof(std::size_t) * kNumDimTuples);
+
+  std::unique_ptr<std::size_t[]> fact_counts(new std::size_t[kNumFactTuples]);
+  std::memset(fact_counts.get(), 0, sizeof(std::size_t) * kNumFactTuples);
+
+  InsertDestination *insert_destination = query_context_->getInsertDestination(prober->getInsertDestinationID());
+  DCHECK(insert_destination);
+
+  const std::vector<block_id> &result_blocks = insert_destination->getTouchedBlocks();
+  for (std::size_t bid = 0; bid < result_blocks.size(); ++bid) {
+    BlockReference result_block = storage_manager_->getBlock(result_blocks[bid],
+                                                             insert_destination->getRelation());
+    const TupleStorageSubBlock &result_tuple_sub_block = result_block->getTupleStorageSubBlock();
+    num_result_tuples += result_tuple_sub_block.numTuples();
+    for (tuple_id i = 0; i <= result_tuple_sub_block.getMaxTupleID(); ++i) {
+      if (result_tuple_sub_block.hasTupleWithID(i)) {
+        TypedValue typed_value = result_tuple_sub_block.getAttributeValueTyped(
+            i, result_table->getAttributeByName("dim_long")->getID());
+        std::int64_t value = typed_value.getLiteral<std::int64_t>();
+        ASSERT_GE(value, 0);
+        ASSERT_LT(value, static_cast<std::int64_t>(kNumDimTuples));
+        ++dim_counts[value];
+
+        typed_value = result_tuple_sub_block.getAttributeValueTyped(
+            i, result_table->getAttributeByName("fact_long")->getID());
+        value = typed_value.getLiteral<std::int64_t>();
+        ASSERT_GE(value, 0);
+        ASSERT_LT(value, static_cast<std::int64_t>(kNumFactTuples));
+        ++fact_counts[value];
+      }
+    }
+
+    // Drop the block.
+    result_block.release();
+    storage_manager_->deleteBlockOrBlobFile(result_blocks[bid]);
+  }
+  EXPECT_EQ(static_cast<std::size_t>(kNumDimTuples) / 2, num_result_tuples);
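+  // Only even tids survive the composite key: dim stores (long=tid,
+  // varchar=tid / 2 * 2) while fact stores (long=tid, varchar=tid), and the
+  // two keys agree exactly when tid is even. For example, tid=4 gives (4, 4)
+  // on both sides, but tid=5 gives dim (5, 4) vs. fact (5, 5). Hence
+  // kNumDimTuples / 2 results.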
+
+  for (tuple_id i = 0; i < kNumDimTuples; ++i) {
+    if (i & 0x1) {
+      EXPECT_EQ(0u, dim_counts[i]);
+    } else {
+      EXPECT_EQ(1u, dim_counts[i]);
+    }
+  }
+  for (tuple_id i = 0; i < kNumFactTuples; ++i) {
+    if (i >= kNumDimTuples) {
+      EXPECT_EQ(0u, fact_counts[i]);
+    } else {
+      if (i & 0x1) {
+        EXPECT_EQ(0u, fact_counts[i]);
+      } else {
+        EXPECT_EQ(1u, fact_counts[i]);
+      }
+    }
+  }
+
+  // Create cleaner operator.
+  auto cleaner = make_unique<DestroyHashOperator>(kQueryId, kMultiplePartitions, join_hash_table_index);
+  cleaner->informAllBlockingDependenciesMet();
+  fetchAndExecuteWorkOrders(cleaner.get());
+
+  db_->dropRelationById(output_relation_id);
+}
+
+TEST_P(HashJoinOperatorTest, SingleAttributePartitionedCompositeKeyHashJoinWithResidualPredicateTest) {
+  insertTuplesWithSingleAttributePartitions();
+
+  // Setup the hash table proto in the query context proto.
+  serialization::QueryContext query_context_proto;
+  query_context_proto.set_query_id(kQueryId);
+
+  const QueryContext::join_hash_table_id join_hash_table_index =
+      query_context_proto.join_hash_tables_size();
+
+  serialization::QueryContext::HashTableContext *hash_table_context_proto =
+      query_context_proto.add_join_hash_tables();
+  hash_table_context_proto->set_num_partitions(kMultiplePartitions);
+
+  serialization::HashTable *hash_table_proto =
+      hash_table_context_proto->mutable_join_hash_table();
+  switch (GetParam()) {
+    case HashTableImplType::kLinearOpenAddressing:
+      hash_table_proto->set_hash_table_impl_type(
+          serialization::HashTableImplType::LINEAR_OPEN_ADDRESSING);
+      break;
+    case HashTableImplType::kSeparateChaining:
+      hash_table_proto->set_hash_table_impl_type(
+          serialization::HashTableImplType::SEPARATE_CHAINING);
+      break;
+    case HashTableImplType::kSimpleScalarSeparateChaining:
+      // Can't use SimpleScalarSeparateChainingHashTable with composite keys.
+      return;
+    default:
+      FATAL_ERROR("Unknown HashTable type requested for join.");
+  }
+
+  const Type &long_type = LongType::InstanceNonNullable();
+  const Type &varchar_type = VarCharType::InstanceNonNullable(kCharLength);
+
+  hash_table_proto->add_key_types()->MergeFrom(long_type.getProto());
+  hash_table_proto->add_key_types()->MergeFrom(varchar_type.getProto());
+  hash_table_proto->set_estimated_num_entries(kNumDimTuples);
+
+  const CatalogAttribute &dim_col_long = *dim_table_->getAttributeByName("long");
+  const CatalogAttribute &dim_col_varchar = *dim_table_->getAttributeByName("varchar");
+  const CatalogAttribute &fact_col_long = *fact_table_->getAttributeByName("long");
+  const CatalogAttribute &fact_col_varchar = *fact_table_->getAttributeByName("varchar");
+
+  // Create the builder operator.
+  unique_ptr<BuildHashOperator> builder(new BuildHashOperator(
+      kQueryId,
+      *dim_table_,
+      true /* is_stored */,
+      { dim_col_long.getID(), dim_col_varchar.getID() },
+      dim_col_long.getType().isNullable() || dim_col_varchar.getType().isNullable(),
+      kMultiplePartitions,
+      join_hash_table_index));
+
+  // Create prober operator with two selection attributes.
+  const QueryContext::scalar_group_id selection_index = query_context_proto.scalar_groups_size();
+  serialization::QueryContext::ScalarGroup *scalar_group_proto = query_context_proto.add_scalar_groups();
+
+  ScalarAttribute scalar_attr_dim(dim_col_long);
+  scalar_group_proto->add_scalars()->MergeFrom(scalar_attr_dim.getProto());
+  ScalarAttribute scalar_attr_fact(fact_col_long);
+  scalar_group_proto->add_scalars()->MergeFrom(scalar_attr_fact.getProto());
+
+  // Create result_table, owned by db_.
+  CatalogRelation *result_table = new CatalogRelation(NULL, "result_table", 102);
+  result_table->addAttribute(new CatalogAttribute(result_table, "dim_long", long_type));
+  result_table->addAttribute(new CatalogAttribute(result_table, "fact_long", long_type));
+
+  const relation_id output_relation_id = db_->addRelation(result_table);
+
+  // Setup the InsertDestination proto in the query context proto.
+  const QueryContext::insert_destination_id output_destination_index =
+      query_context_proto.insert_destinations_size();
+  serialization::InsertDestination *insert_destination_proto = query_context_proto.add_insert_destinations();
+
+  insert_destination_proto->set_insert_destination_type(serialization::InsertDestinationType::BLOCK_POOL);
+  insert_destination_proto->set_relation_id(output_relation_id);
+  insert_destination_proto->set_relational_op_index(kOpIndex);
+
+  // Include a residual predicate that selects a subset of the joined tuples.
+  unique_ptr<Predicate> residual_pred(new ComparisonPredicate(
+      ComparisonFactory::GetComparison(ComparisonID::kLess),
+      new ScalarAttribute(dim_col_long),
+      new ScalarLiteral(TypedValue(static_cast<std::int64_t>(15)),
+                        LongType::InstanceNonNullable())));
+
+  const QueryContext::predicate_id residual_pred_index = query_context_proto.predicates_size();
+  query_context_proto.add_predicates()->MergeFrom(residual_pred->getProto());
+
+  unique_ptr<HashJoinOperator> prober(new HashJoinOperator(
+      kQueryId,
+      *dim_table_,
+      *fact_table_,
+      true /* is_stored */,
+      { fact_col_long.getID(), fact_col_varchar.getID() },
+      fact_col_long.getType().isNullable() || fact_col_varchar.getType().isNullable(),
+      kMultiplePartitions,
+      *result_table,
+      output_destination_index,
+      join_hash_table_index,
+      residual_pred_index,
+      selection_index));
+
+  // Set up the QueryContext.
+  query_context_ =
+      make_unique<QueryContext>(query_context_proto, *db_, storage_manager_.get(), foreman_client_id_, &bus_);
+
+  // Execute the operators.
+  fetchAndExecuteWorkOrders(builder.get());
+
+  prober->informAllBlockingDependenciesMet();
+  fetchAndExecuteWorkOrders(prober.get());
+
+  // Check result values
+  // Note that the results might be in a different order.
+  std::size_t num_result_tuples = 0;
+
+  std::unique_ptr<std::size_t[]> dim_counts(new std::size_t[kNumDimTuples]);
+  std::memset(dim_counts.get(), 0, sizeof(std::size_t) * kNumDimTuples);
+
+  std::unique_ptr<std::size_t[]> fact_counts(new std::size_t[kNumFactTuples]);
+  std::memset(fact_counts.get(), 0, sizeof(std::size_t) * kNumFactTuples);
+
+  InsertDestination *insert_destination = query_context_->getInsertDestination(prober->getInsertDestinationID());
+  DCHECK(insert_destination);
+
+  const std::vector<block_id> &result_blocks = insert_destination->getTouchedBlocks();
+  for (std::size_t bid = 0; bid < result_blocks.size(); ++bid) {
+    BlockReference result_block = storage_manager_->getBlock(result_blocks[bid],
+                                                             insert_destination->getRelation());
+    const TupleStorageSubBlock &result_tuple_sub_block = result_block->getTupleStorageSubBlock();
+    num_result_tuples += result_tuple_sub_block.numTuples();
+    for (tuple_id i = 0; i <= result_tuple_sub_block.getMaxTupleID(); ++i) {
+      if (result_tuple_sub_block.hasTupleWithID(i)) {
+        TypedValue typed_value = result_tuple_sub_block.getAttributeValueTyped(
+            i, result_table->getAttributeByName("dim_long")->getID());
+        std::int64_t value = typed_value.getLiteral<std::int64_t>();
+        ASSERT_GE(value, 0);
+        ASSERT_LT(value, static_cast<std::int64_t>(kNumDimTuples));
+        ++dim_counts[value];
+
+        typed_value = result_tuple_sub_block.getAttributeValueTyped(
+            i, result_table->getAttributeByName("fact_long")->getID());
+        value = typed_value.getLiteral<std::int64_t>();
+        ASSERT_GE(value, 0);
+        ASSERT_LT(value, static_cast<std::int64_t>(kNumFactTuples));
+        ++fact_counts[value];
+      }
+    }
+
+    // Drop the block.
+    result_block.release();
+    storage_manager_->deleteBlockOrBlobFile(result_blocks[bid]);
+  }
+  EXPECT_EQ(8u, num_result_tuples);
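+  // On top of the even-tid composite-key matches, the residual predicate
+  // dim_long < 15 keeps tids 0, 2, 4, 6, 8, 10, 12, and 14: eight results.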
+
+  for (tuple_id i = 0; i < kNumDimTuples; ++i) {
+    if ((i & 0x1) || (i >= 15)) {
+      EXPECT_EQ(0u, dim_counts[i]);
+    } else {
+      EXPECT_EQ(1u, dim_counts[i]);
+    }
+  }
+  for (tuple_id i = 0; i < kNumFactTuples; ++i) {
+    if (i >= 15) {
+      EXPECT_EQ(0u, fact_counts[i]);
+    } else {
+      if (i & 0x1) {
+        EXPECT_EQ(0u, fact_counts[i]);
+      } else {
+        EXPECT_EQ(1u, fact_counts[i]);
+      }
+    }
+  }
+
+  // Create cleaner operator.
+  auto cleaner = make_unique<DestroyHashOperator>(kQueryId, kMultiplePartitions, join_hash_table_index);
+  cleaner->informAllBlockingDependenciesMet();
+  fetchAndExecuteWorkOrders(cleaner.get());
+
+  db_->dropRelationById(output_relation_id);
+}
+
 // Note: INSTANTIATE_TEST_CASE_P has a variadic arguments part. If the variadic argument part
 //       is empty, the C++11 standard says it should produce a warning. That warning is promoted
 //       to an error because we compile with -Werror, which causes the Travis build to fail.