Posted to commits@quickstep.apache.org by ji...@apache.org on 2017/01/29 06:41:10 UTC
[53/54] incubator-quickstep git commit: Added tests for Partitioned Hash Join.
Added tests for Partitioned Hash Join.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/3210500b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/3210500b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/3210500b
Branch: refs/heads/reorder-attrs
Commit: 3210500b87a90a9ed302a0bafbb1dd1d1a6081ed
Parents: 66178d7
Author: Zuyu Zhang <zu...@apache.org>
Authored: Mon Jan 23 17:19:13 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Sat Jan 28 18:14:42 2017 -0800
----------------------------------------------------------------------
relational_operators/CMakeLists.txt | 2 +
.../tests/HashJoinOperator_unittest.cpp | 693 +++++++++++++++++--
2 files changed, 645 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
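To make the diff below easier to follow, here is a minimal, self-contained C++ sketch of the build/probe pattern that the new partitioned tests exercise. This is plain standard C++, not the Quickstep API; the constant values are hypothetical stand-ins, and routing a tuple by tid % kMultiplePartitions mirrors how the tests place tuples into per-partition blocks (a real PartitionScheme would hash the partitioning attribute).

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <unordered_map>
#include <vector>

// Hypothetical stand-ins for the unittest constants.
constexpr std::size_t kMultiplePartitions = 4;
constexpr std::int64_t kNumDimTuples = 8;
constexpr std::int64_t kNumFactTuples = 16;

int main() {
  // Build side: one hash table per partition, keyed on the "long" column.
  // A tuple with key k lands in partition k % kMultiplePartitions.
  std::vector<std::unordered_multimap<std::int64_t, std::int64_t>> build(kMultiplePartitions);
  for (std::int64_t tid = 0; tid < kNumDimTuples; ++tid) {
    build[tid % kMultiplePartitions].emplace(tid /* key */, tid /* payload */);
  }

  // Probe side: each probe tuple consults only the hash table of its own
  // partition, so the partitions can be processed by independent work orders.
  std::size_t num_matches = 0;
  for (std::int64_t tid = 0; tid < kNumFactTuples; ++tid) {
    const auto &table = build[tid % kMultiplePartitions];
    const auto range = table.equal_range(tid);
    for (auto it = range.first; it != range.second; ++it) {
      ++num_matches;
    }
  }

  // Every dim key matches exactly one fact key here, so this prints 8,
  // i.e. kNumDimTuples -- the same invariant the LongKey test asserts.
  std::cout << "matches: " << num_matches << std::endl;
  return 0;
}

----------------------------------------------------------------------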
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3210500b/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index c2db4ec..c1caaa3 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -606,6 +606,8 @@ target_link_libraries(HashJoinOperator_unittest
quickstep_catalog_CatalogDatabase
quickstep_catalog_CatalogRelation
quickstep_catalog_CatalogTypedefs
+ quickstep_catalog_PartitionScheme
+ quickstep_catalog_PartitionSchemeHeader
quickstep_expressions_Expressions_proto
quickstep_expressions_predicate_ComparisonPredicate
quickstep_expressions_predicate_Predicate
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3210500b/relational_operators/tests/HashJoinOperator_unittest.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/tests/HashJoinOperator_unittest.cpp b/relational_operators/tests/HashJoinOperator_unittest.cpp
index 60f05ea..03350d4 100644
--- a/relational_operators/tests/HashJoinOperator_unittest.cpp
+++ b/relational_operators/tests/HashJoinOperator_unittest.cpp
@@ -34,6 +34,8 @@
#include "catalog/CatalogDatabase.hpp"
#include "catalog/CatalogRelation.hpp"
#include "catalog/CatalogTypedefs.hpp"
+#include "catalog/PartitionScheme.hpp"
+#include "catalog/PartitionSchemeHeader.hpp"
#include "expressions/Expressions.pb.h"
#include "expressions/predicate/ComparisonPredicate.hpp"
#include "expressions/predicate/Predicate.hpp"
@@ -83,6 +85,8 @@
using std::snprintf;
#endif
+using std::make_unique;
+using std::size_t;
using std::unique_ptr;
namespace quickstep {
@@ -98,6 +102,7 @@ constexpr std::size_t kQueryId = 0;
constexpr int kOpIndex = 0;
constexpr std::size_t kSinglePartition = 1;
+constexpr std::size_t kMultiplePartitions = 4;
} // namespace
@@ -144,56 +149,6 @@ class HashJoinOperatorTest : public ::testing::TestWithParam<HashTableImplType>
fact_table_->addAttribute(new CatalogAttribute(fact_table_, "int", int_type));
fact_table_->addAttribute(new CatalogAttribute(fact_table_, "char", char_type));
fact_table_->addAttribute(new CatalogAttribute(fact_table_, "varchar", varchar_type));
-
- // Create StorageLayout
- std::unique_ptr<StorageBlockLayout> dim_layout(createStorageLayout(*dim_table_));
- std::unique_ptr<StorageBlockLayout> fact_layout(createStorageLayout(*fact_table_));
-
- // Insert tuples into the dim table.
- std::unique_ptr<Tuple> tuple;
- MutableBlockReference storage_block;
- for (tuple_id i = 0; i < kNumDimTuples; i += kBlockSize) {
- // Create block.
- block_id block_id = storage_manager_->createBlock(*dim_table_, *dim_layout);
- storage_block = storage_manager_->getBlockMutable(block_id, *dim_table_);
- dim_table_->addBlock(block_id);
-
- // Insert tuples.
- tuple_id block_bound = i + kBlockSize < kNumDimTuples ? i + kBlockSize : kNumDimTuples;
- for (tuple_id tid = i; tid < block_bound; ++tid) {
- // First attribute (long): a sequence id.
- // Second attribute (int): a looped value to test duplicate keys.
- // Third attribute (char): an identical value to test Cartesian product.
- // Fourth attribute (varchar): a value to test duplicate variable-length keys.
- tuple.reset(createTuple(*dim_table_, tid, tid % kBlockSize, 100, tid / 2 * 2));
- EXPECT_TRUE(storage_block->insertTupleInBatch(*tuple));
- }
- storage_block->rebuild();
- }
-
- // Insert tuples into the fact table.
- for (tuple_id i = 0; i < kNumFactTuples; i += kBlockSize) {
- // Create block
- block_id block_id = storage_manager_->createBlock(*fact_table_, *fact_layout);
- storage_block = storage_manager_->getBlockMutable(block_id, *fact_table_);
- fact_table_->addBlock(block_id);
-
- // Insert tuples
- tuple_id block_bound = i + kBlockSize < kNumFactTuples ? i + kBlockSize : kNumFactTuples;
- for (tuple_id tid = i; tid < block_bound; ++tid) {
- // First attribute (long): a sequence id to join with dim_table.long. Each tuple has
- // exactly one match.
- // Second attribute (int): a sequence id to join with dim_table.int. Each tuple in the
- // first kBlockSize tuples has multiple matches. Other tuples
- // have no match.
- // Third attribute (char): an identical value to test Cartesian product.
- // Fourth attribute (varchar): a sequence id to join with dim_table.varchar. Each tuple
- // has two matches.
- tuple.reset(createTuple(*fact_table_, tid, tid, 100, tid));
- EXPECT_TRUE(storage_block->insertTupleInBatch(*tuple));
- }
- storage_block->rebuild();
- }
}
virtual void TearDown() {
@@ -259,6 +214,127 @@ class HashJoinOperatorTest : public ::testing::TestWithParam<HashTableImplType>
return new Tuple(std::move(attr_values));
}
+ void insertTuplesWithoutPartitions() {
+ // Create StorageLayout
+ std::unique_ptr<StorageBlockLayout> dim_layout(createStorageLayout(*dim_table_));
+ std::unique_ptr<StorageBlockLayout> fact_layout(createStorageLayout(*fact_table_));
+
+ // Insert tuples into the dim table.
+ std::unique_ptr<Tuple> tuple;
+ MutableBlockReference storage_block;
+ for (tuple_id i = 0; i < kNumDimTuples; i += kBlockSize) {
+ // Create block.
+ block_id block_id = storage_manager_->createBlock(*dim_table_, *dim_layout);
+ storage_block = storage_manager_->getBlockMutable(block_id, *dim_table_);
+ dim_table_->addBlock(block_id);
+
+ // Insert tuples.
+ tuple_id block_bound = i + kBlockSize < kNumDimTuples ? i + kBlockSize : kNumDimTuples;
+ for (tuple_id tid = i; tid < block_bound; ++tid) {
+ // First attribute (long): a sequence id.
+ // Second attribute (int): a looped value to test duplicate keys.
+ // Third attribute (char): an identical value to test Cartesian product.
+ // Fourth attribute (varchar): a value to test duplicate variable-length keys.
+ tuple.reset(createTuple(*dim_table_, tid, tid % kBlockSize, 100, tid / 2 * 2));
+ EXPECT_TRUE(storage_block->insertTupleInBatch(*tuple));
+ }
+ storage_block->rebuild();
+ }
+
+ // Insert tuples into the fact table.
+ for (tuple_id i = 0; i < kNumFactTuples; i += kBlockSize) {
+ // Create block
+ block_id block_id = storage_manager_->createBlock(*fact_table_, *fact_layout);
+ storage_block = storage_manager_->getBlockMutable(block_id, *fact_table_);
+ fact_table_->addBlock(block_id);
+
+ // Insert tuples
+ tuple_id block_bound = i + kBlockSize < kNumFactTuples ? i + kBlockSize : kNumFactTuples;
+ for (tuple_id tid = i; tid < block_bound; ++tid) {
+ // First attribute (long): a sequence id to join with dim_table.long. Each tuple has
+ // exactly one match.
+ // Second attribute (int): a sequence id to join with dim_table.int. Each tuple in the
+ // first kBlockSize tuples has multiple matches. Other tuples
+ // have no match.
+ // Third attribute (char): an identical value to test Cartesian product.
+ // Fourth attribute (varchar): a sequence id to join with dim_table.varchar. Each tuple
+ // has two matches.
+ tuple.reset(createTuple(*fact_table_, tid, tid, 100, tid));
+ EXPECT_TRUE(storage_block->insertTupleInBatch(*tuple));
+ }
+ storage_block->rebuild();
+ }
+ }
+
+ void insertTuplesWithSingleAttributePartitions() {
+ // Set PartitionScheme.
+ dim_part_scheme_ = new PartitionScheme(
+ new HashPartitionSchemeHeader(kMultiplePartitions, dim_table_->getAttributeByName("long")->getID()));
+ dim_table_->setPartitionScheme(dim_part_scheme_);
+
+ fact_part_scheme_ = new PartitionScheme(
+ new HashPartitionSchemeHeader(kMultiplePartitions, fact_table_->getAttributeByName("long")->getID()));
+ fact_table_->setPartitionScheme(fact_part_scheme_);
+
+ // Create StorageLayout
+ std::unique_ptr<StorageBlockLayout> dim_layout(createStorageLayout(*dim_table_));
+ std::unique_ptr<StorageBlockLayout> fact_layout(createStorageLayout(*fact_table_));
+
+ // Create blocks per partition. The index is the partition id.
+ std::vector<MutableBlockReference> dim_partitioned_blocks;
+ for (partition_id part_id = 0; part_id < kMultiplePartitions; ++part_id) {
+ const block_id block = storage_manager_->createBlock(*dim_table_, *dim_layout);
+ dim_part_scheme_->addBlockToPartition(block, part_id);
+ // For a simpler teardown.
+ dim_table_->addBlock(block);
+
+ dim_partitioned_blocks.push_back(storage_manager_->getBlockMutable(block, *dim_table_));
+ }
+
+ // Insert tuples into the dim table.
+ for (tuple_id tid = 0; tid < kNumDimTuples; ++tid) {
+ // First attribute (long): a sequence id.
+ // Second attribute (int): a looped value to test duplicate keys.
+ // Third attribute (char): an identical value to test Cartesian product.
+ // Fourth attribute (varchar): a value to test duplicate variable-length keys.
+ unique_ptr<Tuple> tuple(createTuple(*dim_table_, tid, tid % kBlockSize, 100, tid / 2 * 2));
+ EXPECT_TRUE(dim_partitioned_blocks[tid % kMultiplePartitions]->insertTupleInBatch(*tuple));
+ }
+
+ for (size_t i = 0; i < dim_partitioned_blocks.size(); ++i) {
+ dim_partitioned_blocks[i]->rebuild();
+ }
+
+ // Create blocks per partition. The index is the partition id.
+ std::vector<MutableBlockReference> fact_partitioned_blocks;
+ for (partition_id part_id = 0; part_id < kMultiplePartitions; ++part_id) {
+ const block_id block = storage_manager_->createBlock(*fact_table_, *fact_layout);
+ fact_part_scheme_->addBlockToPartition(block, part_id);
+ // For a simpler teardown.
+ fact_table_->addBlock(block);
+
+ fact_partitioned_blocks.push_back(storage_manager_->getBlockMutable(block, *fact_table_));
+ }
+
+ // Insert tuples into the fact table.
+ for (tuple_id tid = 0; tid < kNumFactTuples; ++tid) {
+ // First attribute (long): a sequence id to join with dim_table.long. Each tuple has
+ // exactly one match.
+ // Second attribute (int): a sequence id to join with dim_table.int. Each tuple in the
+ // first kBlockSize tuples has multiple matches. Other tuples
+ // have no match.
+ // Third attribute (char): an identical value to test Cartesian product.
+ // Fourth attribute (varchar): a sequence id to join with dim_table.varchar. Each tuple
+ // has two matches.
+ unique_ptr<Tuple> tuple(createTuple(*fact_table_, tid, tid, 100, tid));
+ EXPECT_TRUE(fact_partitioned_blocks[tid % kMultiplePartitions]->insertTupleInBatch(*tuple));
+ }
+
+ for (size_t i = 0; i < fact_partitioned_blocks.size(); ++i) {
+ fact_partitioned_blocks[i]->rebuild();
+ }
+ }
+
void fetchAndExecuteWorkOrders(RelationalOperator *op) {
// Note: We treat each operator as an individual query plan DAG. The
// index for each operator should be set, so that the WorkOrdersContainer
@@ -293,9 +369,13 @@ class HashJoinOperatorTest : public ::testing::TestWithParam<HashTableImplType>
unique_ptr<CatalogDatabase> db_;
// The following CatalogRelations are owned by db_.
CatalogRelation *dim_table_, *fact_table_;
+ // The following PartitionSchemes are owned by its own CatalogRelation, respectively.
+ PartitionScheme *dim_part_scheme_ = nullptr, *fact_part_scheme_ = nullptr;
};
TEST_P(HashJoinOperatorTest, LongKeyHashJoinTest) {
+ insertTuplesWithoutPartitions();
+
// Setup the hash table proto in the query context proto.
serialization::QueryContext query_context_proto;
query_context_proto.set_query_id(kQueryId);
@@ -439,6 +519,8 @@ TEST_P(HashJoinOperatorTest, LongKeyHashJoinTest) {
}
TEST_P(HashJoinOperatorTest, IntDuplicateKeyHashJoinTest) {
+ insertTuplesWithoutPartitions();
+
// Setup the hash table proto in the query context proto.
serialization::QueryContext query_context_proto;
query_context_proto.set_query_id(kQueryId);
@@ -612,6 +694,8 @@ TEST_P(HashJoinOperatorTest, IntDuplicateKeyHashJoinTest) {
}
TEST_P(HashJoinOperatorTest, CharKeyCartesianProductHashJoinTest) {
+ insertTuplesWithoutPartitions();
+
// Setup the hash table proto in the query context proto.
serialization::QueryContext query_context_proto;
query_context_proto.set_query_id(kQueryId);
@@ -750,6 +834,8 @@ TEST_P(HashJoinOperatorTest, CharKeyCartesianProductHashJoinTest) {
}
TEST_P(HashJoinOperatorTest, VarCharDuplicateKeyHashJoinTest) {
+ insertTuplesWithoutPartitions();
+
// Setup the hash table proto in the query context proto.
serialization::QueryContext query_context_proto;
query_context_proto.set_query_id(kQueryId);
@@ -920,6 +1006,8 @@ TEST_P(HashJoinOperatorTest, VarCharDuplicateKeyHashJoinTest) {
}
TEST_P(HashJoinOperatorTest, CompositeKeyHashJoinTest) {
+ insertTuplesWithoutPartitions();
+
// Setup the hash table proto in the query context proto.
serialization::QueryContext query_context_proto;
query_context_proto.set_query_id(kQueryId);
@@ -1100,6 +1188,8 @@ TEST_P(HashJoinOperatorTest, CompositeKeyHashJoinTest) {
// Same as above test, but add an additional residual filter predicate.
TEST_P(HashJoinOperatorTest, CompositeKeyHashJoinWithResidualPredicateTest) {
+ insertTuplesWithoutPartitions();
+
// Setup the hash table proto in the query context proto.
serialization::QueryContext query_context_proto;
query_context_proto.set_query_id(kQueryId);
@@ -1288,6 +1378,509 @@ TEST_P(HashJoinOperatorTest, CompositeKeyHashJoinWithResidualPredicateTest) {
db_->dropRelationById(output_relation_id);
}
+// Hash join tests with single-attribute partitions.
+TEST_P(HashJoinOperatorTest, SingleAttributePartitionedLongKeyHashJoinTest) {
+ insertTuplesWithSingleAttributePartitions();
+
+ // Setup the hash table proto in the query context proto.
+ serialization::QueryContext query_context_proto;
+ query_context_proto.set_query_id(kQueryId);
+
+ const QueryContext::join_hash_table_id join_hash_table_index =
+ query_context_proto.join_hash_tables_size();
+
+ serialization::QueryContext::HashTableContext *hash_table_context_proto =
+ query_context_proto.add_join_hash_tables();
+ hash_table_context_proto->set_num_partitions(kMultiplePartitions);
+
+ serialization::HashTable *hash_table_proto =
+ hash_table_context_proto->mutable_join_hash_table();
+ switch (GetParam()) {
+ case HashTableImplType::kLinearOpenAddressing:
+ hash_table_proto->set_hash_table_impl_type(
+ serialization::HashTableImplType::LINEAR_OPEN_ADDRESSING);
+ break;
+ case HashTableImplType::kSeparateChaining:
+ hash_table_proto->set_hash_table_impl_type(
+ serialization::HashTableImplType::SEPARATE_CHAINING);
+ break;
+ case HashTableImplType::kSimpleScalarSeparateChaining:
+ if (TypedValue::HashIsReversible(kLong)) {
+ hash_table_proto->set_hash_table_impl_type(
+ serialization::HashTableImplType::SIMPLE_SCALAR_SEPARATE_CHAINING);
+ break;
+ } else {
+ // Can't use SimpleScalarSeparateChainingHashTable for long keys on
+ // this platform.
+ return;
+ }
+ default:
+ FATAL_ERROR("Unknown HashTable type requested for join.");
+ }
+
+ const Type &long_type = LongType::InstanceNonNullable();
+
+ hash_table_proto->add_key_types()->MergeFrom(long_type.getProto());
+ hash_table_proto->set_estimated_num_entries(kNumDimTuples);
+
+ const CatalogAttribute &dim_col_long = *dim_table_->getAttributeByName("long");
+ const CatalogAttribute &fact_col_long = *fact_table_->getAttributeByName("long");
+
+ // Create the builder operator.
+ unique_ptr<BuildHashOperator> builder(new BuildHashOperator(
+ kQueryId,
+ *dim_table_,
+ true /* is_stored */,
+ { dim_col_long.getID() },
+ dim_col_long.getType().isNullable(),
+ kMultiplePartitions,
+ join_hash_table_index));
+
+ // Create the prober operator with one selection attribute.
+ const QueryContext::scalar_group_id selection_index = query_context_proto.scalar_groups_size();
+ ScalarAttribute scalar_attr(dim_col_long);
+ query_context_proto.add_scalar_groups()->add_scalars()->MergeFrom(scalar_attr.getProto());
+
+ // Create result_table, owned by db_.
+ CatalogRelation *result_table = new CatalogRelation(NULL, "result_table", 102);
+ result_table->addAttribute(new CatalogAttribute(result_table, "long", long_type));
+
+ const relation_id output_relation_id = db_->addRelation(result_table);
+
+ // Setup the InsertDestination proto in the query context proto.
+ const QueryContext::insert_destination_id output_destination_index =
+ query_context_proto.insert_destinations_size();
+ serialization::InsertDestination *insert_destination_proto = query_context_proto.add_insert_destinations();
+
+ insert_destination_proto->set_insert_destination_type(serialization::InsertDestinationType::BLOCK_POOL);
+ insert_destination_proto->set_relation_id(output_relation_id);
+ insert_destination_proto->set_relational_op_index(kOpIndex);
+
+ unique_ptr<HashJoinOperator> prober(new HashJoinOperator(
+ kQueryId,
+ *dim_table_,
+ *fact_table_,
+ true /* is_stored */,
+ { fact_col_long.getID() },
+ fact_col_long.getType().isNullable(),
+ kMultiplePartitions,
+ *result_table,
+ output_destination_index,
+ join_hash_table_index,
+ QueryContext::kInvalidPredicateId /* residual_predicate_index */,
+ selection_index));
+
+ // Set up the QueryContext.
+ query_context_ =
+ make_unique<QueryContext>(query_context_proto, *db_, storage_manager_.get(), foreman_client_id_, &bus_);
+
+ // Execute the operators.
+ fetchAndExecuteWorkOrders(builder.get());
+
+ prober->informAllBlockingDependenciesMet();
+ fetchAndExecuteWorkOrders(prober.get());
+
+ // Check result values
+ // Note that the results might be in a different order.
+ std::size_t num_result_tuples = 0;
+ std::unique_ptr<std::size_t[]> counts(new std::size_t[kNumDimTuples]);
+ std::memset(counts.get(), 0, sizeof(std::size_t) * kNumDimTuples);
+
+ DCHECK(query_context_);
+ InsertDestination *insert_destination = query_context_->getInsertDestination(prober->getInsertDestinationID());
+ DCHECK(insert_destination);
+
+ const std::vector<block_id> &result_blocks = insert_destination->getTouchedBlocks();
+ for (std::size_t bid = 0; bid < result_blocks.size(); ++bid) {
+ BlockReference result_block = storage_manager_->getBlock(result_blocks[bid],
+ insert_destination->getRelation());
+ const TupleStorageSubBlock &result_tuple_sub_block = result_block->getTupleStorageSubBlock();
+ num_result_tuples += result_tuple_sub_block.numTuples();
+ for (tuple_id i = 0; i <= result_tuple_sub_block.getMaxTupleID(); ++i) {
+ if (result_tuple_sub_block.hasTupleWithID(i)) {
+ TypedValue typed_value = result_tuple_sub_block.getAttributeValueTyped(
+ i, result_table->getAttributeByName("long")->getID());
+ std::int64_t value = typed_value.getLiteral<std::int64_t>();
+ ASSERT_GE(value, 0);
+ ASSERT_LT(value, static_cast<std::int64_t>(kNumDimTuples));
+ ++counts[value];
+ }
+ }
+
+ // Drop the block.
+ result_block.release();
+ storage_manager_->deleteBlockOrBlobFile(result_blocks[bid]);
+ }
+ EXPECT_EQ(static_cast<std::size_t>(kNumDimTuples), num_result_tuples);
+
+ for (tuple_id i = 0; i < kNumDimTuples; ++i) {
+ EXPECT_EQ(1u, counts[i]);
+ }
+
+ // Create cleaner operator.
+ auto cleaner = make_unique<DestroyHashOperator>(kQueryId, kMultiplePartitions, join_hash_table_index);
+ cleaner->informAllBlockingDependenciesMet();
+ fetchAndExecuteWorkOrders(cleaner.get());
+
+ db_->dropRelationById(output_relation_id);
+}
+
+TEST_P(HashJoinOperatorTest, SingleAttributePartitionedCompositeKeyHashJoinTest) {
+ insertTuplesWithSingleAttributePartitions();
+
+ // Setup the hash table proto in the query context proto.
+ serialization::QueryContext query_context_proto;
+ query_context_proto.set_query_id(kQueryId);
+
+ const QueryContext::join_hash_table_id join_hash_table_index =
+ query_context_proto.join_hash_tables_size();
+
+ serialization::QueryContext::HashTableContext *hash_table_context_proto =
+ query_context_proto.add_join_hash_tables();
+ hash_table_context_proto->set_num_partitions(kMultiplePartitions);
+
+ serialization::HashTable *hash_table_proto =
+ hash_table_context_proto->mutable_join_hash_table();
+ switch (GetParam()) {
+ case HashTableImplType::kLinearOpenAddressing:
+ hash_table_proto->set_hash_table_impl_type(
+ serialization::HashTableImplType::LINEAR_OPEN_ADDRESSING);
+ break;
+ case HashTableImplType::kSeparateChaining:
+ hash_table_proto->set_hash_table_impl_type(
+ serialization::HashTableImplType::SEPARATE_CHAINING);
+ break;
+ case HashTableImplType::kSimpleScalarSeparateChaining:
+ // Can't use SimpleScalarSeparateChainingHashTable with composite keys.
+ return;
+ default:
+ FATAL_ERROR("Unknown HashTable type requested for join.");
+ }
+
+ const Type &long_type = LongType::InstanceNonNullable();
+ const Type &varchar_type = VarCharType::InstanceNonNullable(kCharLength);
+
+ hash_table_proto->add_key_types()->MergeFrom(long_type.getProto());
+ hash_table_proto->add_key_types()->MergeFrom(varchar_type.getProto());
+ hash_table_proto->set_estimated_num_entries(kNumDimTuples);
+
+ const CatalogAttribute &dim_col_long = *dim_table_->getAttributeByName("long");
+ const CatalogAttribute &dim_col_varchar = *dim_table_->getAttributeByName("varchar");
+ const CatalogAttribute &fact_col_long = *fact_table_->getAttributeByName("long");
+ const CatalogAttribute &fact_col_varchar = *fact_table_->getAttributeByName("varchar");
+
+ // Create the builder operator.
+ unique_ptr<BuildHashOperator> builder(new BuildHashOperator(
+ kQueryId,
+ *dim_table_,
+ true /* is_stored */,
+ { dim_col_long.getID(), dim_col_varchar.getID() },
+ dim_col_long.getType().isNullable() || dim_col_varchar.getType().isNullable(),
+ kMultiplePartitions,
+ join_hash_table_index));
+
+ // Create the prober operator with two selection attributes.
+ const QueryContext::scalar_group_id selection_index = query_context_proto.scalar_groups_size();
+ serialization::QueryContext::ScalarGroup *scalar_group_proto = query_context_proto.add_scalar_groups();
+
+ ScalarAttribute scalar_attr_dim(dim_col_long);
+ scalar_group_proto->add_scalars()->MergeFrom(scalar_attr_dim.getProto());
+ ScalarAttribute scalar_attr_fact(fact_col_long);
+ scalar_group_proto->add_scalars()->MergeFrom(scalar_attr_fact.getProto());
+
+ // Create result_table, owned by db_.
+ CatalogRelation *result_table = new CatalogRelation(NULL, "result_table", 102);
+ result_table->addAttribute(new CatalogAttribute(result_table, "dim_long", long_type));
+ result_table->addAttribute(new CatalogAttribute(result_table, "fact_long", long_type));
+
+ const relation_id output_relation_id = db_->addRelation(result_table);
+
+ // Setup the InsertDestination proto in the query context proto.
+ const QueryContext::insert_destination_id output_destination_index =
+ query_context_proto.insert_destinations_size();
+ serialization::InsertDestination *insert_destination_proto = query_context_proto.add_insert_destinations();
+
+ insert_destination_proto->set_insert_destination_type(serialization::InsertDestinationType::BLOCK_POOL);
+ insert_destination_proto->set_relation_id(output_relation_id);
+ insert_destination_proto->set_relational_op_index(kOpIndex);
+
+ unique_ptr<HashJoinOperator> prober(new HashJoinOperator(
+ kQueryId,
+ *dim_table_,
+ *fact_table_,
+ true /* is_stored */,
+ { fact_col_long.getID(), fact_col_varchar.getID() },
+ fact_col_long.getType().isNullable() || fact_col_varchar.getType().isNullable(),
+ kMultiplePartitions,
+ *result_table,
+ output_destination_index,
+ join_hash_table_index,
+ QueryContext::kInvalidPredicateId /* residual_predicate_index */,
+ selection_index));
+
+ // Set up the QueryContext.
+ query_context_ =
+ make_unique<QueryContext>(query_context_proto, *db_, storage_manager_.get(), foreman_client_id_, &bus_);
+
+ // Execute the operators.
+ fetchAndExecuteWorkOrders(builder.get());
+
+ prober->informAllBlockingDependenciesMet();
+ fetchAndExecuteWorkOrders(prober.get());
+
+ // Check result values
+ // Note that the results might be in a different order.
+ std::size_t num_result_tuples = 0;
+
+ std::unique_ptr<std::size_t[]> dim_counts(new std::size_t[kNumDimTuples]);
+ std::memset(dim_counts.get(), 0, sizeof(std::size_t) * kNumDimTuples);
+
+ std::unique_ptr<std::size_t[]> fact_counts(new std::size_t[kNumFactTuples]);
+ std::memset(fact_counts.get(), 0, sizeof(std::size_t) * kNumFactTuples);
+
+ InsertDestination *insert_destination = query_context_->getInsertDestination(prober->getInsertDestinationID());
+ DCHECK(insert_destination);
+
+ const std::vector<block_id> &result_blocks = insert_destination->getTouchedBlocks();
+ for (std::size_t bid = 0; bid < result_blocks.size(); ++bid) {
+ BlockReference result_block = storage_manager_->getBlock(result_blocks[bid],
+ insert_destination->getRelation());
+ const TupleStorageSubBlock &result_tuple_sub_block = result_block->getTupleStorageSubBlock();
+ num_result_tuples += result_tuple_sub_block.numTuples();
+ for (tuple_id i = 0; i <= result_tuple_sub_block.getMaxTupleID(); ++i) {
+ if (result_tuple_sub_block.hasTupleWithID(i)) {
+ TypedValue typed_value = result_tuple_sub_block.getAttributeValueTyped(
+ i, result_table->getAttributeByName("dim_long")->getID());
+ std::int64_t value = typed_value.getLiteral<std::int64_t>();
+ ASSERT_GE(value, 0);
+ ASSERT_LT(value, static_cast<std::int64_t>(kNumDimTuples));
+ ++dim_counts[value];
+
+ typed_value = result_tuple_sub_block.getAttributeValueTyped(
+ i, result_table->getAttributeByName("fact_long")->getID());
+ value = typed_value.getLiteral<std::int64_t>();
+ ASSERT_GE(value, 0);
+ ASSERT_LT(value, static_cast<std::int64_t>(kNumFactTuples));
+ ++fact_counts[value];
+ }
+ }
+
+ // Drop the block.
+ result_block.release();
+ storage_manager_->deleteBlockOrBlobFile(result_blocks[bid]);
+ }
+ EXPECT_EQ(static_cast<std::size_t>(kNumDimTuples) / 2, num_result_tuples);
+
+ for (tuple_id i = 0; i < kNumDimTuples; ++i) {
+ if (i & 0x1) {
+ EXPECT_EQ(0u, dim_counts[i]);
+ } else {
+ EXPECT_EQ(1u, dim_counts[i]);
+ }
+ }
+ for (tuple_id i = 0; i < kNumFactTuples; ++i) {
+ if (i >= kNumDimTuples) {
+ EXPECT_EQ(0u, fact_counts[i]);
+ } else {
+ if (i & 0x1) {
+ EXPECT_EQ(0u, fact_counts[i]);
+ } else {
+ EXPECT_EQ(1u, fact_counts[i]);
+ }
+ }
+ }
+
+ // Create cleaner operator.
+ auto cleaner = make_unique<DestroyHashOperator>(kQueryId, kMultiplePartitions, join_hash_table_index);
+ cleaner->informAllBlockingDependenciesMet();
+ fetchAndExecuteWorkOrders(cleaner.get());
+
+ db_->dropRelationById(output_relation_id);
+}
+
+TEST_P(HashJoinOperatorTest, SingleAttributePartitionedCompositeKeyHashJoinWithResidualPredicateTest) {
+ insertTuplesWithSingleAttributePartitions();
+
+ // Setup the hash table proto in the query context proto.
+ serialization::QueryContext query_context_proto;
+ query_context_proto.set_query_id(kQueryId);
+
+ const QueryContext::join_hash_table_id join_hash_table_index =
+ query_context_proto.join_hash_tables_size();
+
+ serialization::QueryContext::HashTableContext *hash_table_context_proto =
+ query_context_proto.add_join_hash_tables();
+ hash_table_context_proto->set_num_partitions(kMultiplePartitions);
+
+ serialization::HashTable *hash_table_proto =
+ hash_table_context_proto->mutable_join_hash_table();
+ switch (GetParam()) {
+ case HashTableImplType::kLinearOpenAddressing:
+ hash_table_proto->set_hash_table_impl_type(
+ serialization::HashTableImplType::LINEAR_OPEN_ADDRESSING);
+ break;
+ case HashTableImplType::kSeparateChaining:
+ hash_table_proto->set_hash_table_impl_type(
+ serialization::HashTableImplType::SEPARATE_CHAINING);
+ break;
+ case HashTableImplType::kSimpleScalarSeparateChaining:
+ // Can't use SimpleScalarSeparateChainingHashTable with composite keys.
+ return;
+ default:
+ FATAL_ERROR("Unknown HashTable type requested for join.");
+ }
+
+ const Type &long_type = LongType::InstanceNonNullable();
+ const Type &varchar_type = VarCharType::InstanceNonNullable(kCharLength);
+
+ hash_table_proto->add_key_types()->MergeFrom(long_type.getProto());
+ hash_table_proto->add_key_types()->MergeFrom(varchar_type.getProto());
+ hash_table_proto->set_estimated_num_entries(kNumDimTuples);
+
+ const CatalogAttribute &dim_col_long = *dim_table_->getAttributeByName("long");
+ const CatalogAttribute &dim_col_varchar = *dim_table_->getAttributeByName("varchar");
+ const CatalogAttribute &fact_col_long = *fact_table_->getAttributeByName("long");
+ const CatalogAttribute &fact_col_varchar = *fact_table_->getAttributeByName("varchar");
+
+ // Create the builder operator.
+ unique_ptr<BuildHashOperator> builder(new BuildHashOperator(
+ kQueryId,
+ *dim_table_,
+ true /* is_stored */,
+ { dim_col_long.getID(), dim_col_varchar.getID() },
+ dim_col_long.getType().isNullable() || dim_col_varchar.getType().isNullable(),
+ kMultiplePartitions,
+ join_hash_table_index));
+
+ // Create prober operator with two selection attributes.
+ const QueryContext::scalar_group_id selection_index = query_context_proto.scalar_groups_size();
+ serialization::QueryContext::ScalarGroup *scalar_group_proto = query_context_proto.add_scalar_groups();
+
+ ScalarAttribute scalar_attr_dim(dim_col_long);
+ scalar_group_proto->add_scalars()->MergeFrom(scalar_attr_dim.getProto());
+ ScalarAttribute scalar_attr_fact(fact_col_long);
+ scalar_group_proto->add_scalars()->MergeFrom(scalar_attr_fact.getProto());
+
+ // Create result_table, owned by db_.
+ CatalogRelation *result_table = new CatalogRelation(NULL, "result_table", 102);
+ result_table->addAttribute(new CatalogAttribute(result_table, "dim_long", long_type));
+ result_table->addAttribute(new CatalogAttribute(result_table, "fact_long", long_type));
+
+ const relation_id output_relation_id = db_->addRelation(result_table);
+
+ // Setup the InsertDestination proto in the query context proto.
+ const QueryContext::insert_destination_id output_destination_index =
+ query_context_proto.insert_destinations_size();
+ serialization::InsertDestination *insert_destination_proto = query_context_proto.add_insert_destinations();
+
+ insert_destination_proto->set_insert_destination_type(serialization::InsertDestinationType::BLOCK_POOL);
+ insert_destination_proto->set_relation_id(output_relation_id);
+ insert_destination_proto->set_relational_op_index(kOpIndex);
+
+ // Include a residual predicate that selects a subset of the joined tuples.
+ unique_ptr<Predicate> residual_pred(new ComparisonPredicate(
+ ComparisonFactory::GetComparison(
+ ComparisonID::kLess),
+ new ScalarAttribute(dim_col_long),
+ new ScalarLiteral(TypedValue(static_cast<std::int64_t>(15)), LongType::InstanceNonNullable())));
+
+ const QueryContext::predicate_id residual_pred_index = query_context_proto.predicates_size();
+ query_context_proto.add_predicates()->MergeFrom(residual_pred->getProto());
+
+ unique_ptr<HashJoinOperator> prober(new HashJoinOperator(
+ kQueryId,
+ *dim_table_,
+ *fact_table_,
+ true /* is_stored */,
+ { fact_col_long.getID(), fact_col_varchar.getID() },
+ fact_col_long.getType().isNullable() || fact_col_varchar.getType().isNullable(),
+ kMultiplePartitions,
+ *result_table,
+ output_destination_index,
+ join_hash_table_index,
+ residual_pred_index,
+ selection_index));
+
+ // Set up the QueryContext.
+ query_context_ =
+ make_unique<QueryContext>(query_context_proto, *db_, storage_manager_.get(), foreman_client_id_, &bus_);
+
+ // Execute the operators.
+ fetchAndExecuteWorkOrders(builder.get());
+
+ prober->informAllBlockingDependenciesMet();
+ fetchAndExecuteWorkOrders(prober.get());
+
+ // Check result values
+ // Note that the results might be in a different order.
+ std::size_t num_result_tuples = 0;
+
+ std::unique_ptr<std::size_t[]> dim_counts(new std::size_t[kNumDimTuples]);
+ std::memset(dim_counts.get(), 0, sizeof(std::size_t) * kNumDimTuples);
+
+ std::unique_ptr<std::size_t[]> fact_counts(new std::size_t[kNumFactTuples]);
+ std::memset(fact_counts.get(), 0, sizeof(std::size_t) * kNumFactTuples);
+
+ InsertDestination *insert_destination = query_context_->getInsertDestination(prober->getInsertDestinationID());
+ DCHECK(insert_destination);
+
+ const std::vector<block_id> &result_blocks = insert_destination->getTouchedBlocks();
+ for (std::size_t bid = 0; bid < result_blocks.size(); ++bid) {
+ BlockReference result_block = storage_manager_->getBlock(result_blocks[bid],
+ insert_destination->getRelation());
+ const TupleStorageSubBlock &result_tuple_sub_block = result_block->getTupleStorageSubBlock();
+ num_result_tuples += result_tuple_sub_block.numTuples();
+ for (tuple_id i = 0; i <= result_tuple_sub_block.getMaxTupleID(); ++i) {
+ if (result_tuple_sub_block.hasTupleWithID(i)) {
+ TypedValue typed_value = result_tuple_sub_block.getAttributeValueTyped(
+ i, result_table->getAttributeByName("dim_long")->getID());
+ std::int64_t value = typed_value.getLiteral<std::int64_t>();
+ ASSERT_GE(value, 0);
+ ASSERT_LT(value, static_cast<std::int64_t>(kNumDimTuples));
+ ++dim_counts[value];
+
+ typed_value = result_tuple_sub_block.getAttributeValueTyped(
+ i, result_table->getAttributeByName("fact_long")->getID());
+ value = typed_value.getLiteral<std::int64_t>();
+ ASSERT_GE(value, 0);
+ ASSERT_LT(value, static_cast<std::int64_t>(kNumFactTuples));
+ ++fact_counts[value];
+ }
+ }
+
+ // Drop the block.
+ result_block.release();
+ storage_manager_->deleteBlockOrBlobFile(result_blocks[bid]);
+ }
+ EXPECT_EQ(8u, num_result_tuples);
+
+ for (tuple_id i = 0; i < kNumDimTuples; ++i) {
+ if ((i & 0x1) || (i >= 15)) {
+ EXPECT_EQ(0u, dim_counts[i]);
+ } else {
+ EXPECT_EQ(1u, dim_counts[i]);
+ }
+ }
+ for (tuple_id i = 0; i < kNumFactTuples; ++i) {
+ if (i >= 15) {
+ EXPECT_EQ(0u, fact_counts[i]);
+ } else {
+ if (i & 0x1) {
+ EXPECT_EQ(0u, fact_counts[i]);
+ } else {
+ EXPECT_EQ(1u, fact_counts[i]);
+ }
+ }
+ }
+
+ // Create cleaner operator.
+ auto cleaner = make_unique<DestroyHashOperator>(kQueryId, kMultiplePartitions, join_hash_table_index);
+ cleaner->informAllBlockingDependenciesMet();
+ fetchAndExecuteWorkOrders(cleaner.get());
+
+ db_->dropRelationById(output_relation_id);
+}
+
// Note: INSTANTIATE_TEST_CASE_P has a variadic arguments part. If the variadic
// part is empty, the C++11 standard says it should produce a warning. Because we
// compile with -Werror, that warning becomes an error and breaks the Travis build.
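
For reference, the instantiation that this note motivates typically supplies a deliberately non-empty variadic part. A hedged sketch of that pattern (assuming gtest's INSTANTIATE_TEST_CASE_P macro; the trailing comma, which cpplint is told to ignore, is a common workaround and not necessarily the exact code that follows in this file):

INSTANTIATE_TEST_CASE_P(
    HashTableImplType, HashJoinOperatorTest,
    ::testing::Values(HashTableImplType::kLinearOpenAddressing,
                      HashTableImplType::kSeparateChaining,
                      HashTableImplType::kSimpleScalarSeparateChaining),);  // NOLINT(whitespace/comma)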