You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by hb...@apache.org on 2016/06/09 16:13:05 UTC
[2/4] incubator-quickstep git commit: Created QueryManager class and
tests.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a690455e/relational_operators/tests/SortMergeRunOperator_unittest.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/tests/SortMergeRunOperator_unittest.cpp b/relational_operators/tests/SortMergeRunOperator_unittest.cpp
index fc10671..46fb7ae 100644
--- a/relational_operators/tests/SortMergeRunOperator_unittest.cpp
+++ b/relational_operators/tests/SortMergeRunOperator_unittest.cpp
@@ -189,6 +189,7 @@ class RunTest : public ::testing::Test {
nullptr,
storage_manager_.get(),
kOpIndex,
+ 0, // dummy query ID.
foreman_client_id_,
&bus_));
}
@@ -433,6 +434,7 @@ class RunMergerTest : public ::testing::Test {
nullptr,
storage_manager_.get(),
kOpIndex,
+ 0, // dummy query ID.
foreman_client_id_,
&bus_));
}
@@ -1269,9 +1271,12 @@ class SortMergeRunOperatorTest : public ::testing::Test {
ASSERT_EQ(null_col3_, result_table_->getAttributeByName("null-col-3")->getID());
ASSERT_EQ(tid_col_, result_table_->getAttributeByName("tid")->getID());
+ query_context_proto_.set_query_id(0); // dummy query ID.
+
// Setup the InsertDestination proto in the query context proto.
insert_destination_index_ = query_context_proto_.insert_destinations_size();
serialization::InsertDestination *insert_destination_proto = query_context_proto_.add_insert_destinations();
+ insert_destination_proto->set_query_id(query_context_proto_.query_id());
insert_destination_proto->set_insert_destination_type(serialization::InsertDestinationType::BLOCK_POOL);
insert_destination_proto->set_relation_id(result_table_id);
@@ -1291,6 +1296,7 @@ class SortMergeRunOperatorTest : public ::testing::Test {
run_destination_index_ = query_context_proto_.insert_destinations_size();
insert_destination_proto = query_context_proto_.add_insert_destinations();
+ insert_destination_proto->set_query_id(query_context_proto_.query_id());
insert_destination_proto->set_insert_destination_type(serialization::InsertDestinationType::BLOCK_POOL);
insert_destination_proto->set_relation_id(run_table_id);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a690455e/relational_operators/tests/SortRunGenerationOperator_unittest.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/tests/SortRunGenerationOperator_unittest.cpp b/relational_operators/tests/SortRunGenerationOperator_unittest.cpp
index 71a80e4..3eeb7e9 100644
--- a/relational_operators/tests/SortRunGenerationOperator_unittest.cpp
+++ b/relational_operators/tests/SortRunGenerationOperator_unittest.cpp
@@ -328,10 +328,12 @@ class SortRunGenerationOperatorTest : public ::testing::Test {
const std::vector<bool> &null_ordering) {
// Setup the InsertDestination proto in the query context proto.
serialization::QueryContext query_context_proto;
+ query_context_proto.set_query_id(0); // dummy query ID.
const QueryContext::insert_destination_id insert_destination_index =
query_context_proto.insert_destinations_size();
serialization::InsertDestination *insert_destination_proto = query_context_proto.add_insert_destinations();
+ insert_destination_proto->set_query_id(query_context_proto.query_id());
insert_destination_proto->set_insert_destination_type(serialization::InsertDestinationType::BLOCK_POOL);
insert_destination_proto->set_relation_id(result_table_->getID());
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a690455e/relational_operators/tests/TextScanOperator_unittest.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/tests/TextScanOperator_unittest.cpp b/relational_operators/tests/TextScanOperator_unittest.cpp
index ef6fc2d..a972a37 100644
--- a/relational_operators/tests/TextScanOperator_unittest.cpp
+++ b/relational_operators/tests/TextScanOperator_unittest.cpp
@@ -180,9 +180,11 @@ TEST_F(TextScanOperatorTest, ScanTest) {
// Setup the InsertDestination proto in the query context proto.
serialization::QueryContext query_context_proto;
+ query_context_proto.set_query_id(0); // dummy query ID.
QueryContext::insert_destination_id output_destination_index = query_context_proto.insert_destinations_size();
serialization::InsertDestination *output_destination_proto = query_context_proto.add_insert_destinations();
+ output_destination_proto->set_query_id(query_context_proto.query_id());
output_destination_proto->set_insert_destination_type(serialization::InsertDestinationType::BLOCK_POOL);
output_destination_proto->set_relation_id(relation_->getID());
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a690455e/storage/InsertDestination.cpp
----------------------------------------------------------------------
diff --git a/storage/InsertDestination.cpp b/storage/InsertDestination.cpp
index 354bed4..5e4dd28 100644
--- a/storage/InsertDestination.cpp
+++ b/storage/InsertDestination.cpp
@@ -60,6 +60,7 @@ InsertDestination::InsertDestination(const CatalogRelationSchema &relation,
const StorageBlockLayout *layout,
StorageManager *storage_manager,
const std::size_t relational_op_index,
+ const std::size_t query_id,
const tmb::client_id scheduler_client_id,
tmb::MessageBus *bus)
: thread_id_map_(*ClientIDMap::Instance()),
@@ -67,6 +68,7 @@ InsertDestination::InsertDestination(const CatalogRelationSchema &relation,
relation_(relation),
layout_(layout),
relational_op_index_(relational_op_index),
+ query_id_(query_id),
scheduler_client_id_(scheduler_client_id),
bus_(DCHECK_NOTNULL(bus)) {
if (layout_ == nullptr) {
@@ -74,11 +76,12 @@ InsertDestination::InsertDestination(const CatalogRelationSchema &relation,
}
}
-InsertDestination* InsertDestination::ReconstructFromProto(const serialization::InsertDestination &proto,
- const CatalogRelationSchema &relation,
- StorageManager *storage_manager,
- const tmb::client_id scheduler_client_id,
- tmb::MessageBus *bus) {
+InsertDestination* InsertDestination::ReconstructFromProto(
+ const serialization::InsertDestination &proto,
+ const CatalogRelationSchema &relation,
+ StorageManager *storage_manager,
+ const tmb::client_id scheduler_client_id,
+ tmb::MessageBus *bus) {
DCHECK(ProtoIsValid(proto, relation));
StorageBlockLayout *layout = nullptr;
@@ -93,6 +96,7 @@ InsertDestination* InsertDestination::ReconstructFromProto(const serialization::
layout,
storage_manager,
proto.relational_op_index(),
+ proto.query_id(),
scheduler_client_id,
bus);
}
@@ -107,6 +111,7 @@ InsertDestination* InsertDestination::ReconstructFromProto(const serialization::
storage_manager,
move(blocks),
proto.relational_op_index(),
+ proto.query_id(),
scheduler_client_id,
bus);
}
@@ -134,6 +139,7 @@ InsertDestination* InsertDestination::ReconstructFromProto(const serialization::
storage_manager,
move(partitions),
proto.relational_op_index(),
+ proto.query_id(),
scheduler_client_id,
bus);
}
@@ -262,6 +268,7 @@ MutableBlockReference AlwaysCreateBlockInsertDestination::createNewBlock() {
serialization::CatalogRelationNewBlockMessage proto;
proto.set_relation_id(relation_.getID());
proto.set_block_id(new_id);
+ proto.set_query_id(getQueryID());
const size_t proto_length = proto.ByteSize();
char *proto_bytes = static_cast<char*>(malloc(proto_length));
@@ -309,6 +316,7 @@ MutableBlockReference BlockPoolInsertDestination::createNewBlock() {
serialization::CatalogRelationNewBlockMessage proto;
proto.set_relation_id(relation_.getID());
proto.set_block_id(new_id);
+ proto.set_query_id(getQueryID());
const size_t proto_length = proto.ByteSize();
char *proto_bytes = static_cast<char*>(malloc(proto_length));
@@ -385,21 +393,29 @@ const std::vector<block_id>& BlockPoolInsertDestination::getTouchedBlocksInterna
return done_block_ids_;
}
-PartitionAwareInsertDestination::PartitionAwareInsertDestination(PartitionSchemeHeader *partition_scheme_header,
- const CatalogRelationSchema &relation,
- const StorageBlockLayout *layout,
- StorageManager *storage_manager,
- vector<vector<block_id>> &&partitions,
- const std::size_t relational_op_index,
- const tmb::client_id scheduler_client_id,
- tmb::MessageBus *bus)
- : InsertDestination(relation, layout, storage_manager, relational_op_index, scheduler_client_id, bus),
+PartitionAwareInsertDestination::PartitionAwareInsertDestination(
+ PartitionSchemeHeader *partition_scheme_header,
+ const CatalogRelationSchema &relation,
+ const StorageBlockLayout *layout,
+ StorageManager *storage_manager,
+ vector<vector<block_id>> &&partitions,
+ const std::size_t relational_op_index,
+ const std::size_t query_id,
+ const tmb::client_id scheduler_client_id,
+ tmb::MessageBus *bus)
+ : InsertDestination(relation,
+ layout,
+ storage_manager,
+ relational_op_index,
+ query_id,
+ scheduler_client_id,
+ bus),
partition_scheme_header_(DCHECK_NOTNULL(partition_scheme_header)),
available_block_refs_(partition_scheme_header_->getNumPartitions()),
available_block_ids_(move(partitions)),
done_block_ids_(partition_scheme_header_->getNumPartitions()),
- mutexes_for_partition_(new SpinMutex[partition_scheme_header_->getNumPartitions()]) {
-}
+ mutexes_for_partition_(
+ new SpinMutex[partition_scheme_header_->getNumPartitions()]) {}
MutableBlockReference PartitionAwareInsertDestination::createNewBlock() {
FATAL_ERROR("PartitionAwareInsertDestination::createNewBlock needs a partition id as an argument.");
@@ -415,6 +431,7 @@ MutableBlockReference PartitionAwareInsertDestination::createNewBlockInPartition
proto.set_relation_id(relation_.getID());
proto.set_block_id(new_id);
proto.set_partition_id(part_id);
+ proto.set_query_id(getQueryID());
const size_t proto_length = proto.ByteSize();
char *proto_bytes = static_cast<char*>(malloc(proto_length));
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a690455e/storage/InsertDestination.hpp
----------------------------------------------------------------------
diff --git a/storage/InsertDestination.hpp b/storage/InsertDestination.hpp
index 670cd6c..6968149 100644
--- a/storage/InsertDestination.hpp
+++ b/storage/InsertDestination.hpp
@@ -78,6 +78,7 @@ class InsertDestination : public InsertDestinationInterface {
* @param storage_manager The StorageManager to use.
* @param relational_op_index The index of the relational operator in the
* QueryPlan DAG that has outputs.
+ * @param query_id The ID of this query.
* @param scheduler_client_id The TMB client ID of the scheduler thread.
* @param bus A pointer to the TMB.
**/
@@ -85,6 +86,7 @@ class InsertDestination : public InsertDestinationInterface {
const StorageBlockLayout *layout,
StorageManager *storage_manager,
const std::size_t relational_op_index,
+ const std::size_t query_id,
const tmb::client_id scheduler_client_id,
tmb::MessageBus *bus);
@@ -211,6 +213,7 @@ class InsertDestination : public InsertDestinationInterface {
proto.set_operator_index(relational_op_index_);
proto.set_block_id(id);
proto.set_relation_id(relation_.getID());
+ proto.set_query_id(query_id_);
// NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
const std::size_t proto_length = proto.ByteSize();
@@ -253,6 +256,10 @@ class InsertDestination : public InsertDestinationInterface {
" ID " << scheduler_client_id_;
}
+ inline const std::size_t getQueryID() const {
+ return query_id_;
+ }
+
const ClientIDMap &thread_id_map_;
StorageManager *storage_manager_;
@@ -260,6 +267,7 @@ class InsertDestination : public InsertDestinationInterface {
std::unique_ptr<const StorageBlockLayout> layout_;
const std::size_t relational_op_index_;
+ const std::size_t query_id_;
tmb::client_id scheduler_client_id_;
tmb::MessageBus *bus_;
@@ -288,10 +296,16 @@ class AlwaysCreateBlockInsertDestination : public InsertDestination {
const StorageBlockLayout *layout,
StorageManager *storage_manager,
const std::size_t relational_op_index,
+ const std::size_t query_id,
const tmb::client_id scheduler_client_id,
tmb::MessageBus *bus)
- : InsertDestination(relation, layout, storage_manager, relational_op_index, scheduler_client_id, bus) {
- }
+ : InsertDestination(relation,
+ layout,
+ storage_manager,
+ relational_op_index,
+ query_id,
+ scheduler_client_id,
+ bus) {}
~AlwaysCreateBlockInsertDestination() override {
}
@@ -334,16 +348,23 @@ class BlockPoolInsertDestination : public InsertDestination {
* @param relational_op_index The index of the relational operator in the
* QueryPlan DAG that has outputs.
+ * @param query_id The ID of the query.
* @param scheduler_client_id The TMB client ID of the scheduler thread.
* @param bus A pointer to the TMB.
**/
BlockPoolInsertDestination(const CatalogRelationSchema &relation,
const StorageBlockLayout *layout,
StorageManager *storage_manager,
const std::size_t relational_op_index,
+ const std::size_t query_id,
const tmb::client_id scheduler_client_id,
tmb::MessageBus *bus)
- : InsertDestination(relation, layout, storage_manager, relational_op_index, scheduler_client_id, bus) {
- }
+ : InsertDestination(relation,
+ layout,
+ storage_manager,
+ relational_op_index,
+ query_id,
+ scheduler_client_id,
+ bus) {}
/**
* @brief Constructor.
@@ -363,9 +384,16 @@ class BlockPoolInsertDestination : public InsertDestination {
StorageManager *storage_manager,
std::vector<block_id> &&blocks,
const std::size_t relational_op_index,
+ const std::size_t query_id,
const tmb::client_id scheduler_client_id,
tmb::MessageBus *bus)
- : InsertDestination(relation, layout, storage_manager, relational_op_index, scheduler_client_id, bus),
+ : InsertDestination(relation,
+ layout,
+ storage_manager,
+ relational_op_index,
+ query_id,
+ scheduler_client_id,
+ bus),
available_block_ids_(std::move(blocks)) {
// TODO(chasseur): Once block fill statistics are available, replace this
// with something smarter.
@@ -386,7 +414,6 @@ class BlockPoolInsertDestination : public InsertDestination {
MutableBlockReference createNewBlock() override;
private:
- FRIEND_TEST(ForemanTest, TwoNodesDAGPartiallyFilledBlocksTest);
FRIEND_TEST(QueryManagerTest, TwoNodesDAGPartiallyFilledBlocksTest);
// A vector of references to blocks which are loaded in memory.
@@ -416,17 +443,20 @@ class PartitionAwareInsertDestination : public InsertDestination {
* @param partitions The blocks in partitions.
* @param relational_op_index The index of the relational operator in the
* QueryPlan DAG that has outputs.
+ * @param query_id The ID of the query.
* @param scheduler_client_id The TMB client ID of the scheduler thread.
* @param bus A pointer to the TMB.
**/
- PartitionAwareInsertDestination(PartitionSchemeHeader *partition_scheme_header,
- const CatalogRelationSchema &relation,
- const StorageBlockLayout *layout,
- StorageManager *storage_manager,
- std::vector<std::vector<block_id>> &&partitions,
- const std::size_t relational_op_index,
- const tmb::client_id scheduler_client_id,
- tmb::MessageBus *bus);
+ PartitionAwareInsertDestination(
+ PartitionSchemeHeader *partition_scheme_header,
+ const CatalogRelationSchema &relation,
+ const StorageBlockLayout *layout,
+ StorageManager *storage_manager,
+ std::vector<std::vector<block_id>> &&partitions,
+ const std::size_t relational_op_index,
+ const std::size_t query_id,
+ const tmb::client_id scheduler_client_id,
+ tmb::MessageBus *bus);
~PartitionAwareInsertDestination() override {
delete[] mutexes_for_partition_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a690455e/storage/InsertDestination.proto
----------------------------------------------------------------------
diff --git a/storage/InsertDestination.proto b/storage/InsertDestination.proto
index 6083539..a3d8acf 100644
--- a/storage/InsertDestination.proto
+++ b/storage/InsertDestination.proto
@@ -34,6 +34,7 @@ message InsertDestination {
optional StorageBlockLayoutDescription layout = 3;
required uint64 relational_op_index = 4;
+ required uint64 query_id = 5;
// The convention for extension numbering is that extensions for a particular
// InsertDestinationType should begin from (insert_destination_type + 1) * 16.