You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/01/24 01:19:27 UTC

incubator-quickstep git commit: Added test for Partitioned Hash Join.

Repository: incubator-quickstep
Updated Branches:
  refs/heads/partitioned-hash-join-test [created] 24fecfdad


Added test for Partitioned Hash Join.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/24fecfda
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/24fecfda
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/24fecfda

Branch: refs/heads/partitioned-hash-join-test
Commit: 24fecfdade8f0d7c778592cea5141d007017ae37
Parents: 7bce0b8
Author: Zuyu Zhang <zu...@apache.org>
Authored: Mon Jan 23 17:19:13 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Mon Jan 23 17:19:13 2017 -0800

----------------------------------------------------------------------
 relational_operators/CMakeLists.txt             |   2 +
 .../tests/HashJoinOperator_unittest.cpp         | 330 +++++++++++++++++++
 2 files changed, 332 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/24fecfda/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index 66ea2d1..0d1cf6b 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -609,6 +609,8 @@ target_link_libraries(HashJoinOperator_unittest
                       quickstep_catalog_CatalogDatabase
                       quickstep_catalog_CatalogRelation
                       quickstep_catalog_CatalogTypedefs
+                      quickstep_catalog_PartitionScheme
+                      quickstep_catalog_PartitionSchemeHeader
                       quickstep_expressions_Expressions_proto
                       quickstep_expressions_predicate_ComparisonPredicate
                       quickstep_expressions_predicate_Predicate

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/24fecfda/relational_operators/tests/HashJoinOperator_unittest.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/tests/HashJoinOperator_unittest.cpp b/relational_operators/tests/HashJoinOperator_unittest.cpp
index 60f05ea..8f890df 100644
--- a/relational_operators/tests/HashJoinOperator_unittest.cpp
+++ b/relational_operators/tests/HashJoinOperator_unittest.cpp
@@ -34,6 +34,8 @@
 #include "catalog/CatalogDatabase.hpp"
 #include "catalog/CatalogRelation.hpp"
 #include "catalog/CatalogTypedefs.hpp"
+#include "catalog/PartitionScheme.hpp"
+#include "catalog/PartitionSchemeHeader.hpp"
 #include "expressions/Expressions.pb.h"
 #include "expressions/predicate/ComparisonPredicate.hpp"
 #include "expressions/predicate/Predicate.hpp"
@@ -83,6 +85,7 @@
 using std::snprintf;
 #endif
 
+using std::make_unique;
 using std::unique_ptr;
 
 namespace quickstep {
@@ -99,6 +102,9 @@ constexpr int kOpIndex = 0;
 
 constexpr std::size_t kSinglePartition = 1;
 
+constexpr std::size_t kMultiplePartitions = 4;
+constexpr attribute_id kPartitionAttribute = 0;
+
 }  // namespace
 
 class HashJoinOperatorTest : public ::testing::TestWithParam<HashTableImplType> {
@@ -1300,6 +1306,330 @@ INSTANTIATE_TEST_CASE_P(
         HashTableImplType::kSeparateChaining,
         HashTableImplType::kSimpleScalarSeparateChaining),);  // NOLINT(whitespace/comma)
 
+// Unit test for hash joins with partitioned input relations.
+class PartitionedHashJoinOperatorTest : public ::testing::Test {
+ protected:
+  void SetUp() override {
+    thread_id_map_ = ClientIDMap::Instance();
+
+    bus_.Initialize();
+
+    const tmb::client_id worker_thread_client_id = bus_.Connect();
+    bus_.RegisterClientAsSender(worker_thread_client_id, kCatalogRelationNewBlockMessage);
+
+    // Usually the worker thread makes the following call. In this test setup,
+    // we don't have a worker thread hence we have to explicitly make the call.
+    thread_id_map_->addValue(worker_thread_client_id);
+
+    foreman_client_id_ = bus_.Connect();
+    bus_.RegisterClientAsSender(foreman_client_id_, kCatalogRelationNewBlockMessage);
+    bus_.RegisterClientAsReceiver(foreman_client_id_, kCatalogRelationNewBlockMessage);
+
+    storage_manager_ = make_unique<StorageManager>("./partitioned_hash_join_operator_test_data/");
+
+    // Create a database.
+    db_ = make_unique<CatalogDatabase>(nullptr, "database");
+
+    // Create tables, owned by db_.
+    dim_table_ = new CatalogRelation(nullptr, "dim_table", 100);
+    db_->addRelation(dim_table_);
+
+    fact_table_ = new CatalogRelation(nullptr, "fact_table", 101);
+    db_->addRelation(fact_table_);
+
+    // Add attributes.
+    const Type &long_type = LongType::InstanceNonNullable();
+    const Type &int_type = IntType::InstanceNonNullable();
+    const Type &char_type = CharType::InstanceNonNullable(kCharLength);
+    const Type &varchar_type = VarCharType::InstanceNonNullable(kCharLength);
+
+    CHECK_EQ(kPartitionAttribute,
+             dim_table_->addAttribute(new CatalogAttribute(dim_table_, "long", long_type)));
+    dim_table_->addAttribute(new CatalogAttribute(dim_table_, "int", int_type));
+    dim_table_->addAttribute(new CatalogAttribute(dim_table_, "char", char_type));
+    dim_table_->addAttribute(new CatalogAttribute(dim_table_, "varchar", varchar_type));
+
+    CHECK_EQ(kPartitionAttribute,
+             fact_table_->addAttribute(new CatalogAttribute(fact_table_, "long", long_type)));
+    fact_table_->addAttribute(new CatalogAttribute(fact_table_, "int", int_type));
+    fact_table_->addAttribute(new CatalogAttribute(fact_table_, "char", char_type));
+    fact_table_->addAttribute(new CatalogAttribute(fact_table_, "varchar", varchar_type));
+
+    // Set a PartitionScheme (owned by the relation) on both tables.
+    dim_part_scheme_ = new PartitionScheme(new HashPartitionSchemeHeader(kMultiplePartitions, kPartitionAttribute));
+    dim_table_->setPartitionScheme(dim_part_scheme_);
+
+    fact_part_scheme_ = new PartitionScheme(new HashPartitionSchemeHeader(kMultiplePartitions, kPartitionAttribute));
+    fact_table_->setPartitionScheme(fact_part_scheme_);
+
+    // Create the storage layouts.
+    std::unique_ptr<StorageBlockLayout> dim_layout(createStorageLayout(*dim_table_));
+    std::unique_ptr<StorageBlockLayout> fact_layout(createStorageLayout(*fact_table_));
+
+    // Create one block per partition. The vector index is the partition id.
+    std::vector<MutableBlockReference> dim_partitioned_blocks;
+    for (partition_id part_id = 0; part_id < kMultiplePartitions; ++part_id) {
+      const block_id block = storage_manager_->createBlock(*dim_table_, *dim_layout);
+      dim_part_scheme_->addBlockToPartition(block, part_id);
+      // Also register the block with the relation so TearDown() finds it via getBlocksSnapshot().
+      dim_table_->addBlock(block);
+
+      dim_partitioned_blocks.push_back(storage_manager_->getBlockMutable(block, *dim_table_));
+    }
+
+    // Insert tuples to dim table, placing tuple tid in partition tid % kMultiplePartitions.
+    for (tuple_id tid = 0; tid < kNumDimTuples; ++tid) {
+      // First attribute (long): a sequence id.
+      // Second attribute (int): a looped value to test duplicate keys.
+      // Third attribute (char): an identical value to test Cartesian product.
+      // Fourth attribute (varchar): a value to test duplicate variable-length keys.
+      Tuple tuple(createTuple(*dim_table_, tid, tid % kBlockSize, 100, tid / 2 * 2));
+      EXPECT_TRUE(dim_partitioned_blocks[tid % kMultiplePartitions]->insertTupleInBatch(tuple));
+    }
+
+    for (MutableBlockReference &dim_block : dim_partitioned_blocks) {
+      dim_block->rebuild();
+    }
+
+    // Create one block per partition. The vector index is the partition id.
+    std::vector<MutableBlockReference> fact_partitioned_blocks;
+    for (partition_id part_id = 0; part_id < kMultiplePartitions; ++part_id) {
+      const block_id block = storage_manager_->createBlock(*fact_table_, *fact_layout);
+      fact_part_scheme_->addBlockToPartition(block, part_id);
+      // Also register the block with the relation so TearDown() finds it via getBlocksSnapshot().
+      fact_table_->addBlock(block);
+
+      fact_partitioned_blocks.push_back(storage_manager_->getBlockMutable(block, *fact_table_));
+    }
+
+    // Insert tuples to fact table, using the same tid % kMultiplePartitions placement as dim table.
+    for (tuple_id tid = 0; tid < kNumFactTuples; ++tid) {
+      // First attribute (long): a sequence id to join with dim_table.long. Each tuple with
+      //                         tid < kNumDimTuples has exactly one match.
+      // Second attribute (int): a sequence id to join with dim_table.int. Each tuple in the
+      //                         first kBlockSize tuples has multiple matches. Other tuples
+      //                         have no match.
+      // Third attribute (char): an identical value to test Cartesian product.
+      // Fourth attribute (varchar): a sequence id to join with dim_table.varchar. Each even
+      //                             value below kNumDimTuples has up to two matches; others none.
+      Tuple tuple(createTuple(*fact_table_, tid, tid, 100, tid));
+      EXPECT_TRUE(fact_partitioned_blocks[tid % kMultiplePartitions]->insertTupleInBatch(tuple));
+    }
+
+    for (MutableBlockReference &fact_block : fact_partitioned_blocks) {
+      fact_block->rebuild();
+    }
+  }
+
+  void TearDown() override {
+    thread_id_map_->removeValue();
+
+    // Drop blocks from relations.
+    const std::vector<block_id> dim_blocks = dim_table_->getBlocksSnapshot();
+    for (const block_id block : dim_blocks) {
+      storage_manager_->deleteBlockOrBlobFile(block);
+    }
+
+    const std::vector<block_id> fact_blocks = fact_table_->getBlocksSnapshot();
+    for (const block_id block : fact_blocks) {
+      storage_manager_->deleteBlockOrBlobFile(block);
+    }
+  }
+
+  StorageBlockLayout* createStorageLayout(const CatalogRelation &relation) {
+    StorageBlockLayout *layout = new StorageBlockLayout(relation);
+    StorageBlockLayoutDescription *layout_desc = layout->getDescriptionMutable();
+
+    layout_desc->set_num_slots(1);
+
+    layout_desc->mutable_tuple_store_description()->set_sub_block_type(
+        TupleStorageSubBlockDescription::COMPRESSED_PACKED_ROW_STORE);
+
+    // Attempt to compress variable-length columns.
+    for (CatalogRelation::const_iterator attr_it = relation.begin();
+         attr_it != relation.end();
+         ++attr_it) {
+      if (attr_it->getType().isVariableLength()) {
+        layout_desc->mutable_tuple_store_description()->AddExtension(
+            CompressedPackedRowStoreTupleStorageSubBlockDescription::compressed_attribute_id,
+            attr_it->getID());
+      }
+    }
+
+    layout->finalize();
+    return layout;
+  }
+
+  Tuple createTuple(const CatalogRelation &relation,
+                    const std::int64_t long_val,
+                    const std::int64_t int_val,
+                    const std::int64_t char_val,
+                    const std::int64_t varchar_val) {
+    static const Type &char_type = CharType::InstanceNonNullable(kCharLength);
+    static const Type &varchar_type = VarCharType::InstanceNonNullable(kCharLength);
+    char buffer[kCharLength];
+
+    std::vector<TypedValue> attr_values;
+    attr_values.emplace_back(long_val);
+    attr_values.emplace_back(static_cast<int>(int_val));
+
+    snprintf(buffer, kCharLength, "%" PRId64, char_val);
+    attr_values.emplace_back(char_type.getTypeID(), buffer, kCharLength);
+    attr_values.back().ensureNotReference();
+
+    snprintf(buffer, kCharLength, "%" PRId64, varchar_val);
+    attr_values.emplace_back(varchar_type.getTypeID(), buffer, std::strlen(buffer) + 1);
+    attr_values.back().ensureNotReference();
+
+    return Tuple(std::move(attr_values));
+  }
+
+  void fetchAndExecuteWorkOrders(RelationalOperator *op) {
+    // Note: We treat each operator as an individual query plan DAG. The
+    // index for each operator should be set, so that the WorkOrdersContainer
+    // class' checks about operator index are successful.
+    op->setOperatorIndex(kOpIndex);
+    WorkOrdersContainer container(1, 0);
+    const std::size_t op_index = kOpIndex;
+    op->getAllWorkOrders(&container,
+                         query_context_.get(),
+                         storage_manager_.get(),
+                         foreman_client_id_,
+                         &bus_);
+
+    while (container.hasNormalWorkOrder(op_index)) {
+      WorkOrder *work_order = container.getNormalWorkOrder(op_index);
+      work_order->execute();
+      delete work_order;
+    }
+  }
+
+  // This map is needed for InsertDestination and some WorkOrders that send
+  // messages to Foreman directly. To know the reason behind the design of this
+  // map, see the note in InsertDestination.hpp.
+  ClientIDMap *thread_id_map_;
+
+  MessageBusImpl bus_;
+  tmb::client_id foreman_client_id_;
+
+  unique_ptr<QueryContext> query_context_;
+  unique_ptr<StorageManager> storage_manager_;
+
+  unique_ptr<CatalogDatabase> db_;
+  // The following CatalogRelations are owned by db_.
+  CatalogRelation *dim_table_, *fact_table_;
+  // The following PartitionSchemes are owned by their respective CatalogRelations.
+  PartitionScheme *dim_part_scheme_, *fact_part_scheme_;
+};
+
+TEST_F(PartitionedHashJoinOperatorTest, SingleAttributePartitionedHashJoinTest) {
+  // Set up the hash table proto in the query context proto.
+  serialization::QueryContext query_context_proto;
+  query_context_proto.set_query_id(kQueryId);
+
+  const QueryContext::join_hash_table_id join_hash_table_index =
+      query_context_proto.join_hash_tables_size();
+
+  serialization::QueryContext::HashTableContext *hash_table_context_proto =
+      query_context_proto.add_join_hash_tables();
+  hash_table_context_proto->set_num_partitions(kMultiplePartitions);
+
+  serialization::HashTable *hash_table_proto =
+      hash_table_context_proto->mutable_join_hash_table();
+  hash_table_proto->set_hash_table_impl_type(
+      serialization::HashTableImplType::SEPARATE_CHAINING);
+
+  const Type &long_type = LongType::InstanceNonNullable();
+
+  hash_table_proto->add_key_types()->MergeFrom(long_type.getProto());
+  hash_table_proto->set_estimated_num_entries(kNumDimTuples);
+
+  // Create the builder operator.
+  unique_ptr<BuildHashOperator> builder(
+      new BuildHashOperator(kQueryId, *dim_table_, true /* is_stored */, { kPartitionAttribute },
+                            long_type.isNullable(), kMultiplePartitions, join_hash_table_index));
+
+  // Create the prober operator with one selection attribute.
+  const QueryContext::scalar_group_id selection_index = query_context_proto.scalar_groups_size();
+  ScalarAttribute scalar_attr(*dim_table_->getAttributeByName("long"));
+  query_context_proto.add_scalar_groups()->add_scalars()->MergeFrom(scalar_attr.getProto());
+
+  // Create result_table, owned by db_.
+  CatalogRelation *result_table = new CatalogRelation(nullptr, "result_table", 102);
+  result_table->addAttribute(new CatalogAttribute(result_table, "long", long_type));
+
+  const relation_id output_relation_id = db_->addRelation(result_table);
+
+  // Set up the InsertDestination proto in the query context proto.
+  const QueryContext::insert_destination_id output_destination_index =
+      query_context_proto.insert_destinations_size();
+  serialization::InsertDestination *insert_destination_proto = query_context_proto.add_insert_destinations();
+
+  insert_destination_proto->set_insert_destination_type(serialization::InsertDestinationType::BLOCK_POOL);
+  insert_destination_proto->set_relation_id(output_relation_id);
+  insert_destination_proto->set_relational_op_index(kOpIndex);
+
+  unique_ptr<HashJoinOperator> prober(
+      new HashJoinOperator(kQueryId, *dim_table_, *fact_table_, true /* is_stored */, { kPartitionAttribute },
+                           long_type.isNullable(), kMultiplePartitions, *result_table, output_destination_index,
+                           join_hash_table_index, QueryContext::kInvalidPredicateId /* residual_predicate_index */,
+                           selection_index));
+
+  // Set up the QueryContext.
+  query_context_ =
+      make_unique<QueryContext>(query_context_proto, *db_, storage_manager_.get(), foreman_client_id_, &bus_);
+
+  // Execute the operators.
+  fetchAndExecuteWorkOrders(builder.get());
+
+  prober->informAllBlockingDependenciesMet();
+  fetchAndExecuteWorkOrders(prober.get());
+
+  // Check result values.
+  // Note that the results might be in a different order.
+  std::size_t num_result_tuples = 0;
+  // counts[v] is the number of result tuples whose "long" value is v (value-initialized to 0).
+  std::vector<std::size_t> counts(kNumDimTuples);
+
+  ASSERT_TRUE(query_context_ != nullptr);
+  InsertDestination *insert_destination = query_context_->getInsertDestination(prober->getInsertDestinationID());
+  ASSERT_TRUE(insert_destination != nullptr);
+
+  const std::vector<block_id> &result_blocks = insert_destination->getTouchedBlocks();
+  for (std::size_t bid = 0; bid < result_blocks.size(); ++bid) {
+    BlockReference result_block = storage_manager_->getBlock(result_blocks[bid],
+                                                             insert_destination->getRelation());
+    const TupleStorageSubBlock &result_tuple_sub_block = result_block->getTupleStorageSubBlock();
+    num_result_tuples += result_tuple_sub_block.numTuples();
+    for (tuple_id i = 0; i <= result_tuple_sub_block.getMaxTupleID(); ++i) {
+      if (result_tuple_sub_block.hasTupleWithID(i)) {
+        TypedValue typed_value = result_tuple_sub_block.getAttributeValueTyped(
+            i, result_table->getAttributeByName("long")->getID());
+        std::int64_t value = typed_value.getLiteral<std::int64_t>();
+        ASSERT_GE(value, 0);
+        ASSERT_LT(value, static_cast<std::int64_t>(kNumDimTuples));
+        ++counts[value];
+      }
+    }
+
+    // Drop the block.
+    result_block.release();
+    storage_manager_->deleteBlockOrBlobFile(result_blocks[bid]);
+  }
+  EXPECT_EQ(static_cast<std::size_t>(kNumDimTuples), num_result_tuples);
+
+  for (tuple_id i = 0; i < kNumDimTuples; ++i) {
+    EXPECT_EQ(1u, counts[i]);
+  }
+
+  // Create cleaner operator.
+  auto cleaner = make_unique<DestroyHashOperator>(kQueryId, kMultiplePartitions, join_hash_table_index);
+  cleaner->informAllBlockingDependenciesMet();
+  fetchAndExecuteWorkOrders(cleaner.get());
+
+  db_->dropRelationById(output_relation_id);
+}
+
 }  // namespace quickstep
 
 int main(int argc, char* argv[]) {