You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/01/24 01:19:27 UTC
incubator-quickstep git commit: Added test for Partitioned Hash Join.
Repository: incubator-quickstep
Updated Branches:
refs/heads/partitioned-hash-join-test [created] 24fecfdad
Added test for Partitioned Hash Join.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/24fecfda
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/24fecfda
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/24fecfda
Branch: refs/heads/partitioned-hash-join-test
Commit: 24fecfdade8f0d7c778592cea5141d007017ae37
Parents: 7bce0b8
Author: Zuyu Zhang <zu...@apache.org>
Authored: Mon Jan 23 17:19:13 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Mon Jan 23 17:19:13 2017 -0800
----------------------------------------------------------------------
relational_operators/CMakeLists.txt | 2 +
.../tests/HashJoinOperator_unittest.cpp | 330 +++++++++++++++++++
2 files changed, 332 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/24fecfda/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index 66ea2d1..0d1cf6b 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -609,6 +609,8 @@ target_link_libraries(HashJoinOperator_unittest
quickstep_catalog_CatalogDatabase
quickstep_catalog_CatalogRelation
quickstep_catalog_CatalogTypedefs
+ quickstep_catalog_PartitionScheme
+ quickstep_catalog_PartitionSchemeHeader
quickstep_expressions_Expressions_proto
quickstep_expressions_predicate_ComparisonPredicate
quickstep_expressions_predicate_Predicate
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/24fecfda/relational_operators/tests/HashJoinOperator_unittest.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/tests/HashJoinOperator_unittest.cpp b/relational_operators/tests/HashJoinOperator_unittest.cpp
index 60f05ea..8f890df 100644
--- a/relational_operators/tests/HashJoinOperator_unittest.cpp
+++ b/relational_operators/tests/HashJoinOperator_unittest.cpp
@@ -34,6 +34,8 @@
#include "catalog/CatalogDatabase.hpp"
#include "catalog/CatalogRelation.hpp"
#include "catalog/CatalogTypedefs.hpp"
+#include "catalog/PartitionScheme.hpp"
+#include "catalog/PartitionSchemeHeader.hpp"
#include "expressions/Expressions.pb.h"
#include "expressions/predicate/ComparisonPredicate.hpp"
#include "expressions/predicate/Predicate.hpp"
@@ -83,6 +85,7 @@
using std::snprintf;
#endif
+using std::make_unique;
using std::unique_ptr;
namespace quickstep {
@@ -99,6 +102,9 @@ constexpr int kOpIndex = 0;
constexpr std::size_t kSinglePartition = 1;
+constexpr std::size_t kMultiplePartitions = 4;
+constexpr attribute_id kPartitionAttribute = 0;
+
} // namespace
class HashJoinOperatorTest : public ::testing::TestWithParam<HashTableImplType> {
@@ -1300,6 +1306,330 @@ INSTANTIATE_TEST_CASE_P(
HashTableImplType::kSeparateChaining,
HashTableImplType::kSimpleScalarSeparateChaining),); // NOLINT(whitespace/comma)
+// Unit test for hash joins with partitioned input relations.
+class PartitionedHashJoinOperatorTest : public ::testing::Test {
+ protected:
+ virtual void SetUp() {
+ thread_id_map_ = ClientIDMap::Instance();
+
+ bus_.Initialize();
+
+ const tmb::client_id worker_thread_client_id = bus_.Connect();
+ bus_.RegisterClientAsSender(worker_thread_client_id, kCatalogRelationNewBlockMessage);
+
+ // Usually the worker thread makes the following call. In this test setup,
+ // we don't have a worker thread hence we have to explicitly make the call.
+ thread_id_map_->addValue(worker_thread_client_id);
+
+ foreman_client_id_ = bus_.Connect();
+ bus_.RegisterClientAsSender(foreman_client_id_, kCatalogRelationNewBlockMessage);
+ bus_.RegisterClientAsReceiver(foreman_client_id_, kCatalogRelationNewBlockMessage);
+
+ storage_manager_ = make_unique<StorageManager>("./partitioned_hash_join_operator_test_data/");
+
+ // Create a database.
+ db_ = make_unique<CatalogDatabase>(nullptr, "database");
+
+ // Create tables, owned by db_.
+ dim_table_ = new CatalogRelation(NULL, "dim_table", 100);
+ db_->addRelation(dim_table_);
+
+ fact_table_ = new CatalogRelation(NULL, "fact_table", 101);
+ db_->addRelation(fact_table_);
+
+ // Add attributes.
+ const Type &long_type = LongType::InstanceNonNullable();
+ const Type &int_type = IntType::InstanceNonNullable();
+ const Type &char_type = CharType::InstanceNonNullable(kCharLength);
+ const Type &varchar_type = VarCharType::InstanceNonNullable(kCharLength);
+
+ CHECK_EQ(kPartitionAttribute,
+ dim_table_->addAttribute(new CatalogAttribute(dim_table_, "long", long_type)));
+ dim_table_->addAttribute(new CatalogAttribute(dim_table_, "int", int_type));
+ dim_table_->addAttribute(new CatalogAttribute(dim_table_, "char", char_type));
+ dim_table_->addAttribute(new CatalogAttribute(dim_table_, "varchar", varchar_type));
+
+ CHECK_EQ(kPartitionAttribute,
+ fact_table_->addAttribute(new CatalogAttribute(fact_table_, "long", long_type)));
+ fact_table_->addAttribute(new CatalogAttribute(fact_table_, "int", int_type));
+ fact_table_->addAttribute(new CatalogAttribute(fact_table_, "char", char_type));
+ fact_table_->addAttribute(new CatalogAttribute(fact_table_, "varchar", varchar_type));
+
+ // Set a hash PartitionScheme on each table, keyed on the first attribute.
+ dim_part_scheme_ = new PartitionScheme(new HashPartitionSchemeHeader(kMultiplePartitions, kPartitionAttribute));
+ dim_table_->setPartitionScheme(dim_part_scheme_);
+
+ fact_part_scheme_ = new PartitionScheme(new HashPartitionSchemeHeader(kMultiplePartitions, kPartitionAttribute));
+ fact_table_->setPartitionScheme(fact_part_scheme_);
+
+ // Create a StorageBlockLayout for each table.
+ std::unique_ptr<StorageBlockLayout> dim_layout(createStorageLayout(*dim_table_));
+ std::unique_ptr<StorageBlockLayout> fact_layout(createStorageLayout(*fact_table_));
+
+ // Create blocks per partition. The index is the partition id.
+ std::vector<MutableBlockReference> dim_partitioned_blocks;
+ for (partition_id part_id = 0; part_id < kMultiplePartitions; ++part_id) {
+ const block_id block = storage_manager_->createBlock(*dim_table_, *dim_layout);
+ dim_part_scheme_->addBlockToPartition(block, part_id);
+ // For a simpler teardown.
+ dim_table_->addBlock(block);
+
+ dim_partitioned_blocks.push_back(storage_manager_->getBlockMutable(block, *dim_table_));
+ }
+
+ // Insert tuples to dim table.
+ for (tuple_id tid = 0; tid < kNumDimTuples; ++tid) {
+ // First attribute (long): a sequence id.
+ // Second attribute (int): a looped value to test duplicate keys.
+ // Third attribute (char): an identical value to test Cartesian product.
+ // Fourth attribute (varchar): a value to test duplicate variable-length keys.
+ Tuple tuple(createTuple(*dim_table_, tid, tid % kBlockSize, 100, tid / 2 * 2));
+ EXPECT_TRUE(dim_partitioned_blocks[tid % kMultiplePartitions]->insertTupleInBatch(tuple));
+ }
+
+ for (int i = 0; i < dim_partitioned_blocks.size(); ++i) {
+ dim_partitioned_blocks[i]->rebuild();
+ }
+
+ // Create blocks per partition. The index is the partition id.
+ std::vector<MutableBlockReference> fact_partitioned_blocks;
+ for (partition_id part_id = 0; part_id < kMultiplePartitions; ++part_id) {
+ const block_id block = storage_manager_->createBlock(*fact_table_, *fact_layout);
+ fact_part_scheme_->addBlockToPartition(block, part_id);
+ // For a simpler teardown.
+ fact_table_->addBlock(block);
+
+ fact_partitioned_blocks.push_back(storage_manager_->getBlockMutable(block, *fact_table_));
+ }
+
+ // Insert tuples to fact table.
+ for (tuple_id tid = 0; tid < kNumFactTuples; ++tid) {
+ // First attribute (long): a sequence id to join with dim_table.long. Each tuple has
+ // exactly one match.
+ // Second attribute (int): a sequence id to join with dim_table.int. Each tuple in the
+ // first kBlockSize tuples has multiple matches. Other tuples
+ // have no match.
+ // Third attribute (char): an identical value to test Cartesian product.
+ // Fourth attribute (varchar): a sequence id to join with dim_table.var_char. Each tuple
+ // has two matches.
+ Tuple tuple(createTuple(*fact_table_, tid, tid, 100, tid));
+ EXPECT_TRUE(fact_partitioned_blocks[tid % kMultiplePartitions]->insertTupleInBatch(tuple));
+ }
+
+ for (int i = 0; i < fact_partitioned_blocks.size(); ++i) {
+ fact_partitioned_blocks[i]->rebuild();
+ }
+ }
+
+ virtual void TearDown() {
+ thread_id_map_->removeValue();
+
+ // Drop blocks from relations.
+ const std::vector<block_id> dim_blocks = dim_table_->getBlocksSnapshot();
+ for (const block_id block : dim_blocks) {
+ storage_manager_->deleteBlockOrBlobFile(block);
+ }
+
+ const std::vector<block_id> fact_blocks = fact_table_->getBlocksSnapshot();
+ for (const block_id block : fact_blocks) {
+ storage_manager_->deleteBlockOrBlobFile(block);
+ }
+ }
+
+ // Builds a single-slot compressed-packed-row-store layout for 'relation';
+ // caller owns the returned layout.
+ StorageBlockLayout* createStorageLayout(const CatalogRelation &relation) {
+ StorageBlockLayout *layout = new StorageBlockLayout(relation);
+ StorageBlockLayoutDescription *layout_desc = layout->getDescriptionMutable();
+
+ layout_desc->set_num_slots(1);
+
+ layout_desc->mutable_tuple_store_description()->set_sub_block_type(
+ TupleStorageSubBlockDescription::COMPRESSED_PACKED_ROW_STORE);
+
+ // Attempt to compress variable-length columns.
+ for (CatalogRelation::const_iterator attr_it = relation.begin();
+ attr_it != relation.end();
+ ++attr_it) {
+ if (attr_it->getType().isVariableLength()) {
+ layout_desc->mutable_tuple_store_description()->AddExtension(
+ CompressedPackedRowStoreTupleStorageSubBlockDescription::compressed_attribute_id,
+ attr_it->getID());
+ }
+ }
+
+ layout->finalize();
+ return layout;
+ }
+
+ // Builds a four-attribute (long, int, char, varchar) tuple; the char/varchar
+ // values are the decimal string renderings of char_val/varchar_val.
+ Tuple createTuple(const CatalogRelation &relation,
+ const std::int64_t long_val,
+ const std::int64_t int_val,
+ const std::int64_t char_val,
+ const std::int64_t varchar_val) {
+ static const Type &char_type = CharType::InstanceNonNullable(kCharLength);
+ static const Type &varchar_type = VarCharType::InstanceNonNullable(kCharLength);
+ char buffer[kCharLength];
+
+ std::vector<TypedValue> attr_values;
+ attr_values.emplace_back(long_val);
+ attr_values.emplace_back(static_cast<int>(int_val));
+
+ snprintf(buffer, kCharLength, "%" PRId64, char_val);
+ attr_values.emplace_back(char_type.getTypeID(), buffer, kCharLength);
+ attr_values.back().ensureNotReference();
+
+ snprintf(buffer, kCharLength, "%" PRId64, varchar_val);
+ attr_values.emplace_back(varchar_type.getTypeID(), buffer, std::strlen(buffer) + 1);
+ attr_values.back().ensureNotReference();
+
+ return Tuple(std::move(attr_values));
+ }
+
+ // Generates all WorkOrders for 'op' and executes them synchronously in-place.
+ void fetchAndExecuteWorkOrders(RelationalOperator *op) {
+ // Note: We treat each operator as an individual query plan DAG. The
+ // index for each operator should be set, so that the WorkOrdersContainer
+ // class' checks about operator index are successful.
+ op->setOperatorIndex(kOpIndex);
+ WorkOrdersContainer container(1, 0);
+ const std::size_t op_index = 0;
+ op->getAllWorkOrders(&container,
+ query_context_.get(),
+ storage_manager_.get(),
+ foreman_client_id_,
+ &bus_);
+
+ while (container.hasNormalWorkOrder(op_index)) {
+ WorkOrder *work_order = container.getNormalWorkOrder(op_index);
+ work_order->execute();
+ delete work_order;
+ }
+ }
+
+ // This map is needed for InsertDestination and some WorkOrders that send
+ // messages to Foreman directly. To know the reason behind the design of this
+ // map, see the note in InsertDestination.hpp.
+ ClientIDMap *thread_id_map_;
+
+ MessageBusImpl bus_;
+ tmb::client_id foreman_client_id_;
+
+ unique_ptr<QueryContext> query_context_;
+ unique_ptr<StorageManager> storage_manager_;
+
+ unique_ptr<CatalogDatabase> db_;
+ // The following CatalogRelations are owned by db_.
+ CatalogRelation *dim_table_, *fact_table_;
+ // The following PartitionSchemes are each owned by their CatalogRelation.
+ PartitionScheme *dim_part_scheme_, *fact_part_scheme_;
+};
+
+TEST_F(PartitionedHashJoinOperatorTest, SingleAttributePartitionedHashJoinTest) {
+ // Set up the hash table proto in the query context proto.
+ serialization::QueryContext query_context_proto;
+ query_context_proto.set_query_id(kQueryId);
+
+ const QueryContext::join_hash_table_id join_hash_table_index =
+ query_context_proto.join_hash_tables_size();
+
+ serialization::QueryContext::HashTableContext *hash_table_context_proto =
+ query_context_proto.add_join_hash_tables();
+ hash_table_context_proto->set_num_partitions(kMultiplePartitions);
+
+ serialization::HashTable *hash_table_proto =
+ hash_table_context_proto->mutable_join_hash_table();
+ hash_table_proto->set_hash_table_impl_type(
+ serialization::HashTableImplType::SEPARATE_CHAINING);
+
+ const Type &long_type = LongType::InstanceNonNullable();
+
+ hash_table_proto->add_key_types()->MergeFrom(long_type.getProto());
+ hash_table_proto->set_estimated_num_entries(kNumDimTuples);
+
+ // Create the builder operator.
+ unique_ptr<BuildHashOperator> builder(
+ new BuildHashOperator(kQueryId, *dim_table_, true /* is_stored */, { kPartitionAttribute },
+ long_type.isNullable(), kMultiplePartitions, join_hash_table_index));
+
+ // Create the prober operator with one selection attribute.
+ const QueryContext::scalar_group_id selection_index = query_context_proto.scalar_groups_size();
+ ScalarAttribute scalar_attr(*dim_table_->getAttributeByName("long"));
+ query_context_proto.add_scalar_groups()->add_scalars()->MergeFrom(scalar_attr.getProto());
+
+ // Create result_table, owned by db_.
+ CatalogRelation *result_table = new CatalogRelation(NULL, "result_table", 102);
+ result_table->addAttribute(new CatalogAttribute(result_table, "long", long_type));
+
+ const relation_id output_relation_id = db_->addRelation(result_table);
+
+ // Set up the InsertDestination proto in the query context proto.
+ const QueryContext::insert_destination_id output_destination_index =
+ query_context_proto.insert_destinations_size();
+ serialization::InsertDestination *insert_destination_proto = query_context_proto.add_insert_destinations();
+
+ insert_destination_proto->set_insert_destination_type(serialization::InsertDestinationType::BLOCK_POOL);
+ insert_destination_proto->set_relation_id(output_relation_id);
+ insert_destination_proto->set_relational_op_index(kOpIndex);
+
+ unique_ptr<HashJoinOperator> prober(
+ new HashJoinOperator(kQueryId, *dim_table_, *fact_table_, true /* is_stored */, { kPartitionAttribute },
+ long_type.isNullable(), kMultiplePartitions, *result_table, output_destination_index,
+ join_hash_table_index, QueryContext::kInvalidPredicateId /* residual_predicate_index */,
+ selection_index));
+
+ // Set up the QueryContext.
+ query_context_ =
+ make_unique<QueryContext>(query_context_proto, *db_, storage_manager_.get(), foreman_client_id_, &bus_);
+
+ // Execute the operators.
+ fetchAndExecuteWorkOrders(builder.get());
+
+ prober->informAllBlockingDependenciesMet();
+ fetchAndExecuteWorkOrders(prober.get());
+
+ // Check result values: each dim key in [0, kNumDimTuples) should appear exactly once.
+ // Note that the results might be in a different order.
+ std::size_t num_result_tuples = 0;
+ std::unique_ptr<std::size_t[]> counts(new std::size_t[kNumDimTuples]);
+ std::memset(counts.get(), 0, sizeof(std::size_t) * kNumDimTuples);
+
+ DCHECK(query_context_);
+ InsertDestination *insert_destination = query_context_->getInsertDestination(prober->getInsertDestinationID());
+ DCHECK(insert_destination);
+
+ const std::vector<block_id> &result_blocks = insert_destination->getTouchedBlocks();
+ for (std::size_t bid = 0; bid < result_blocks.size(); ++bid) {
+ BlockReference result_block = storage_manager_->getBlock(result_blocks[bid],
+ insert_destination->getRelation());
+ const TupleStorageSubBlock &result_tuple_sub_block = result_block->getTupleStorageSubBlock();
+ num_result_tuples += result_tuple_sub_block.numTuples();
+ for (tuple_id i = 0; i <= result_tuple_sub_block.getMaxTupleID(); ++i) {
+ if (result_tuple_sub_block.hasTupleWithID(i)) {
+ TypedValue typed_value = result_tuple_sub_block.getAttributeValueTyped(
+ i, result_table->getAttributeByName("long")->getID());
+ std::int64_t value = typed_value.getLiteral<std::int64_t>();
+ ASSERT_GE(value, 0);
+ ASSERT_LT(value, static_cast<std::int64_t>(kNumDimTuples));
+ ++counts[value];
+ }
+ }
+
+ // Drop the block.
+ result_block.release();
+ storage_manager_->deleteBlockOrBlobFile(result_blocks[bid]);
+ }
+ EXPECT_EQ(static_cast<std::size_t>(kNumDimTuples), num_result_tuples);
+
+ for (tuple_id i = 0; i < kNumDimTuples; ++i) {
+ EXPECT_EQ(1u, counts[i]);
+ }
+
+ // Create the cleaner operator to destroy the join hash table.
+ auto cleaner = make_unique<DestroyHashOperator>(kQueryId, kMultiplePartitions, join_hash_table_index);
+ cleaner->informAllBlockingDependenciesMet();
+ fetchAndExecuteWorkOrders(cleaner.get());
+
+ db_->dropRelationById(output_relation_id);
+}
+
} // namespace quickstep
int main(int argc, char* argv[]) {