You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2016/06/16 04:20:08 UTC
[09/20] incubator-quickstep git commit: Long lived Foreman thread
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/query_optimizer/tests/ExecutionGeneratorTestRunner.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/ExecutionGeneratorTestRunner.hpp b/query_optimizer/tests/ExecutionGeneratorTestRunner.hpp
index 8352d55..9204073 100644
--- a/query_optimizer/tests/ExecutionGeneratorTestRunner.hpp
+++ b/query_optimizer/tests/ExecutionGeneratorTestRunner.hpp
@@ -61,9 +61,11 @@ class ExecutionGeneratorTestRunner : public TextBasedTestRunner {
bus_.Initialize();
- foreman_.reset(new Foreman(&bus_,
- test_database_loader_.catalog_database(),
- test_database_loader_.storage_manager()));
+ main_thread_client_id_ = bus_.Connect();
+ bus_.RegisterClientAsSender(main_thread_client_id_, kAdmitRequestMessage);
+ bus_.RegisterClientAsSender(main_thread_client_id_, kPoisonMessage);
+ bus_.RegisterClientAsReceiver(main_thread_client_id_, kWorkloadCompletionMessage);
+
worker_.reset(new Worker(0, &bus_));
std::vector<client_id> worker_client_ids;
@@ -75,27 +77,20 @@ class ExecutionGeneratorTestRunner : public TextBasedTestRunner {
workers_.reset(new WorkerDirectory(1 /* number of workers */,
worker_client_ids, numa_nodes));
- foreman_->setWorkerDirectory(workers_.get());
+ foreman_.reset(new Foreman(main_thread_client_id_,
+ workers_.get(),
+ &bus_,
+ test_database_loader_.catalog_database(),
+ test_database_loader_.storage_manager()));
+ foreman_->start();
worker_->start();
}
~ExecutionGeneratorTestRunner() {
- std::unique_ptr<WorkerMessage> poison_message(WorkerMessage::PoisonMessage());
- TaggedMessage poison_tagged_message(poison_message.get(),
- sizeof(*poison_message),
- quickstep::kPoisonMessage);
-
- Address worker_address;
- MessageStyle single_receiver_style;
-
- worker_address.AddRecipient(worker_->getBusClientID());
- bus_.Send(foreman_->getBusClientID(),
- worker_address,
- single_receiver_style,
- std::move(poison_tagged_message));
-
+ QueryExecutionUtil::BroadcastPoisonMessage(main_thread_client_id_, &bus_);
worker_->join();
+ foreman_->join();
}
void runTestCase(const std::string &input,
@@ -112,6 +107,8 @@ class ExecutionGeneratorTestRunner : public TextBasedTestRunner {
std::unique_ptr<WorkerDirectory> workers_;
+ tmb::client_id main_thread_client_id_;
+
// This map is needed for InsertDestination and some operators that send
// messages to Foreman directly. To know the reason behind the design of this
// map, see the note in InsertDestination.hpp.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/query_optimizer/tests/TestDatabaseLoader.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/TestDatabaseLoader.cpp b/query_optimizer/tests/TestDatabaseLoader.cpp
index 2de69b6..764ff2f 100644
--- a/query_optimizer/tests/TestDatabaseLoader.cpp
+++ b/query_optimizer/tests/TestDatabaseLoader.cpp
@@ -122,6 +122,7 @@ void TestDatabaseLoader::loadTestRelation() {
nullptr,
&storage_manager_,
0 /* dummy op index */,
+ 0, // dummy query ID.
scheduler_client_id_,
&bus_);
int sign = 1;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/relational_operators/DeleteOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/DeleteOperator.cpp b/relational_operators/DeleteOperator.cpp
index 47e36e9..933918b 100644
--- a/relational_operators/DeleteOperator.cpp
+++ b/relational_operators/DeleteOperator.cpp
@@ -132,6 +132,7 @@ void DeleteWorkOrder::execute() {
proto.set_operator_index(delete_operator_index_);
proto.set_block_id(input_block_id_);
proto.set_relation_id(input_relation_.getID());
+ proto.set_query_id(query_id_);
// NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
const std::size_t proto_length = proto.ByteSize();
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/relational_operators/DeleteOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/DeleteOperator.hpp b/relational_operators/DeleteOperator.hpp
index aa8a688..74da8c1 100644
--- a/relational_operators/DeleteOperator.hpp
+++ b/relational_operators/DeleteOperator.hpp
@@ -174,6 +174,7 @@ class DeleteWorkOrder : public WorkOrder {
StorageManager *storage_manager_;
const std::size_t delete_operator_index_;
+
const tmb::client_id scheduler_client_id_;
MessageBus *bus_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/relational_operators/HashJoinOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.hpp b/relational_operators/HashJoinOperator.hpp
index 6f4271d..9762f04 100644
--- a/relational_operators/HashJoinOperator.hpp
+++ b/relational_operators/HashJoinOperator.hpp
@@ -644,14 +644,14 @@ class HashOuterJoinWorkOrder : public WorkOrder {
* @param join_key_attributes The IDs of equijoin attributes in \c
* probe_relation.
* @param any_join_key_attributes_nullable If any attribute is nullable.
- * @param hash_table The JoinHashTable to use.
+ * @param lookup_block_id The block id of the probe_relation.
* @param selection A list of Scalars corresponding to the relation attributes
* in \c output_destination. Each Scalar is evaluated for the joined
* tuples, and the resulting value is inserted into the join result.
* @param is_selection_on_build Whether each Scalar in the \p selection vector
* is using attributes from the build relation as input. Note that the
* length of this vector should equal the length of \p selection.
- * @param lookup_block_id The block id of the probe_relation.
+ * @param hash_table The JoinHashTable to use.
* @param output_destination The InsertDestination to insert the join results.
* @param storage_manager The StorageManager to use.
**/
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/relational_operators/RebuildWorkOrder.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/RebuildWorkOrder.hpp b/relational_operators/RebuildWorkOrder.hpp
index 86f8eaf..3125447 100644
--- a/relational_operators/RebuildWorkOrder.hpp
+++ b/relational_operators/RebuildWorkOrder.hpp
@@ -85,6 +85,7 @@ class RebuildWorkOrder : public WorkOrder {
proto.set_operator_index(input_operator_index_);
proto.set_block_id(block_ref_->getID());
proto.set_relation_id(input_relation_id_);
+ proto.set_query_id(query_id_);
// NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
const std::size_t proto_length = proto.ByteSize();
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/relational_operators/SortMergeRunOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/SortMergeRunOperator.cpp b/relational_operators/SortMergeRunOperator.cpp
index e398d62..1603b78 100644
--- a/relational_operators/SortMergeRunOperator.cpp
+++ b/relational_operators/SortMergeRunOperator.cpp
@@ -327,6 +327,7 @@ void SortMergeRunWorkOrder::execute() {
// Send completion message to operator.
FeedbackMessage msg(SortMergeRunOperator::kRunOutputMessage,
+ getQueryID(),
operator_index_,
serialized_output.first,
serialized_output.second);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/relational_operators/UpdateOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/UpdateOperator.cpp b/relational_operators/UpdateOperator.cpp
index 1b2979e..f103b0e 100644
--- a/relational_operators/UpdateOperator.cpp
+++ b/relational_operators/UpdateOperator.cpp
@@ -114,6 +114,7 @@ void UpdateWorkOrder::execute() {
proto.set_operator_index(update_operator_index_);
proto.set_block_id(input_block_id_);
proto.set_relation_id(relation_.getID());
+ proto.set_query_id(query_id_);
// NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
const std::size_t proto_length = proto.ByteSize();
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/relational_operators/UpdateOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/UpdateOperator.hpp b/relational_operators/UpdateOperator.hpp
index f6c5053..4471a17 100644
--- a/relational_operators/UpdateOperator.hpp
+++ b/relational_operators/UpdateOperator.hpp
@@ -184,6 +184,7 @@ class UpdateWorkOrder : public WorkOrder {
StorageManager *storage_manager_;
const std::size_t update_operator_index_;
+
const tmb::client_id scheduler_client_id_;
MessageBus *bus_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/relational_operators/WorkOrder.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrder.hpp b/relational_operators/WorkOrder.hpp
index 059865d..df195cc 100644
--- a/relational_operators/WorkOrder.hpp
+++ b/relational_operators/WorkOrder.hpp
@@ -65,20 +65,25 @@ class WorkOrder {
* relational operator.
*/
struct FeedbackMessageHeader {
+ std::size_t query_id;
std::size_t rel_op_index;
std::size_t payload_size;
FeedbackMessageType payload_type;
/**
* @brief Header constructor.
+ *
+ * @param query_id The ID of the query.
* @param relational_op_index Index of the relation operator.
* @param payload_size Size of the payload of the message.
* @param payload_type Type of payload.
*/
- FeedbackMessageHeader(const std::size_t relational_op_index,
+ FeedbackMessageHeader(const std::size_t query_id,
+ const std::size_t relational_op_index,
const std::size_t payload_size,
const FeedbackMessageType payload_type)
- : rel_op_index(relational_op_index),
+ : query_id(query_id),
+ rel_op_index(relational_op_index),
payload_size(payload_size),
payload_type(payload_type) {}
};
@@ -93,17 +98,19 @@ class WorkOrder {
* @brief Feedback message constructor.
*
* @param type Type of the message.
+ * @param query_id The ID of the query.
* @param rel_op_index Relational operator index.
* @param payload Blob of payload.
* @param payload_size Size of the payload blob.
* @param ownership Whether to take ownership of the payload blob.
*/
FeedbackMessage(const FeedbackMessageType type,
+ const std::size_t query_id,
const std::size_t rel_op_index,
void *payload,
const std::size_t payload_size,
const bool ownership = true)
- : header_(rel_op_index, payload_size, type),
+ : header_(query_id, rel_op_index, payload_size, type),
payload_(payload),
ownership_(ownership) {}
@@ -285,6 +292,13 @@ class WorkOrder {
" receiver thread with TMB client ID " << receiver_id;
}
+ /**
+ * @brief Get the ID of the query which this WorkOrder belongs to.
+ **/
+ inline const std::size_t getQueryID() const {
+ return query_id_;
+ }
+
protected:
/**
* @brief Constructor.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/relational_operators/tests/AggregationOperator_unittest.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/tests/AggregationOperator_unittest.cpp b/relational_operators/tests/AggregationOperator_unittest.cpp
index fdcc54f..fd4692a 100644
--- a/relational_operators/tests/AggregationOperator_unittest.cpp
+++ b/relational_operators/tests/AggregationOperator_unittest.cpp
@@ -228,6 +228,8 @@ class AggregationOperatorTest : public ::testing::Test {
// Setup the aggregation state proto in the query context proto.
serialization::QueryContext query_context_proto;
+ query_context_proto.set_query_id(0); // dummy query ID.
+
const QueryContext::aggregation_state_id aggr_state_index = query_context_proto.aggregation_states_size();
serialization::AggregationOperationState *aggr_state_proto = query_context_proto.add_aggregation_states();
aggr_state_proto->set_relation_id(table_->getID());
@@ -319,6 +321,8 @@ class AggregationOperatorTest : public ::testing::Test {
// Setup the aggregation state proto in the query context proto.
serialization::QueryContext query_context_proto;
+ query_context_proto.set_query_id(0); // dummy query ID.
+
const QueryContext::aggregation_state_id aggr_state_index = query_context_proto.aggregation_states_size();
serialization::AggregationOperationState *aggr_state_proto = query_context_proto.add_aggregation_states();
aggr_state_proto->set_relation_id(table_->getID());
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/relational_operators/tests/HashJoinOperator_unittest.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/tests/HashJoinOperator_unittest.cpp b/relational_operators/tests/HashJoinOperator_unittest.cpp
index 074b603..9c34170 100644
--- a/relational_operators/tests/HashJoinOperator_unittest.cpp
+++ b/relational_operators/tests/HashJoinOperator_unittest.cpp
@@ -294,6 +294,7 @@ class HashJoinOperatorTest : public ::testing::TestWithParam<HashTableImplType>
TEST_P(HashJoinOperatorTest, LongKeyHashJoinTest) {
// Setup the hash table proto in the query context proto.
serialization::QueryContext query_context_proto;
+ query_context_proto.set_query_id(0); // dummy query ID.
const QueryContext::join_hash_table_id join_hash_table_index =
query_context_proto.join_hash_tables_size();
@@ -434,6 +435,7 @@ TEST_P(HashJoinOperatorTest, LongKeyHashJoinTest) {
TEST_P(HashJoinOperatorTest, IntDuplicateKeyHashJoinTest) {
// Setup the hash table proto in the query context proto.
serialization::QueryContext query_context_proto;
+ query_context_proto.set_query_id(0); // dummy query ID.
const QueryContext::join_hash_table_id join_hash_table_index =
query_context_proto.join_hash_tables_size();
@@ -604,6 +606,7 @@ TEST_P(HashJoinOperatorTest, IntDuplicateKeyHashJoinTest) {
TEST_P(HashJoinOperatorTest, CharKeyCartesianProductHashJoinTest) {
// Setup the hash table proto in the query context proto.
serialization::QueryContext query_context_proto;
+ query_context_proto.set_query_id(0); // dummy query ID.
const QueryContext::join_hash_table_id join_hash_table_index =
query_context_proto.join_hash_tables_size();
@@ -739,6 +742,7 @@ TEST_P(HashJoinOperatorTest, CharKeyCartesianProductHashJoinTest) {
TEST_P(HashJoinOperatorTest, VarCharDuplicateKeyHashJoinTest) {
// Setup the hash table proto in the query context proto.
serialization::QueryContext query_context_proto;
+ query_context_proto.set_query_id(0); // dummy query ID.
const QueryContext::join_hash_table_id join_hash_table_index =
query_context_proto.join_hash_tables_size();
@@ -906,6 +910,7 @@ TEST_P(HashJoinOperatorTest, VarCharDuplicateKeyHashJoinTest) {
TEST_P(HashJoinOperatorTest, CompositeKeyHashJoinTest) {
// Setup the hash table proto in the query context proto.
serialization::QueryContext query_context_proto;
+ query_context_proto.set_query_id(0); // dummy query ID.
const QueryContext::join_hash_table_id join_hash_table_index =
query_context_proto.join_hash_tables_size();
@@ -1083,6 +1088,7 @@ TEST_P(HashJoinOperatorTest, CompositeKeyHashJoinTest) {
TEST_P(HashJoinOperatorTest, CompositeKeyHashJoinWithResidualPredicateTest) {
// Setup the hash table proto in the query context proto.
serialization::QueryContext query_context_proto;
+ query_context_proto.set_query_id(0); // dummy query ID.
const QueryContext::join_hash_table_id join_hash_table_index =
query_context_proto.join_hash_tables_size();
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/relational_operators/tests/SortMergeRunOperator_unittest.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/tests/SortMergeRunOperator_unittest.cpp b/relational_operators/tests/SortMergeRunOperator_unittest.cpp
index fc10671..46bce5b 100644
--- a/relational_operators/tests/SortMergeRunOperator_unittest.cpp
+++ b/relational_operators/tests/SortMergeRunOperator_unittest.cpp
@@ -189,6 +189,7 @@ class RunTest : public ::testing::Test {
nullptr,
storage_manager_.get(),
kOpIndex,
+ 0, // dummy query ID.
foreman_client_id_,
&bus_));
}
@@ -433,6 +434,7 @@ class RunMergerTest : public ::testing::Test {
nullptr,
storage_manager_.get(),
kOpIndex,
+ 0, // dummy query ID.
foreman_client_id_,
&bus_));
}
@@ -1269,6 +1271,8 @@ class SortMergeRunOperatorTest : public ::testing::Test {
ASSERT_EQ(null_col3_, result_table_->getAttributeByName("null-col-3")->getID());
ASSERT_EQ(tid_col_, result_table_->getAttributeByName("tid")->getID());
+ query_context_proto_.set_query_id(0); // dummy query ID.
+
// Setup the InsertDestination proto in the query context proto.
insert_destination_index_ = query_context_proto_.insert_destinations_size();
serialization::InsertDestination *insert_destination_proto = query_context_proto_.add_insert_destinations();
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/relational_operators/tests/SortRunGenerationOperator_unittest.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/tests/SortRunGenerationOperator_unittest.cpp b/relational_operators/tests/SortRunGenerationOperator_unittest.cpp
index 71a80e4..bd682c2 100644
--- a/relational_operators/tests/SortRunGenerationOperator_unittest.cpp
+++ b/relational_operators/tests/SortRunGenerationOperator_unittest.cpp
@@ -328,6 +328,7 @@ class SortRunGenerationOperatorTest : public ::testing::Test {
const std::vector<bool> &null_ordering) {
// Setup the InsertDestination proto in the query context proto.
serialization::QueryContext query_context_proto;
+ query_context_proto.set_query_id(0); // dummy query ID.
const QueryContext::insert_destination_id insert_destination_index =
query_context_proto.insert_destinations_size();
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/relational_operators/tests/TextScanOperator_unittest.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/tests/TextScanOperator_unittest.cpp b/relational_operators/tests/TextScanOperator_unittest.cpp
index 5860745..5bcbee5 100644
--- a/relational_operators/tests/TextScanOperator_unittest.cpp
+++ b/relational_operators/tests/TextScanOperator_unittest.cpp
@@ -180,6 +180,7 @@ TEST_F(TextScanOperatorTest, ScanTest) {
// Setup the InsertDestination proto in the query context proto.
serialization::QueryContext query_context_proto;
+ query_context_proto.set_query_id(0); // dummy query ID.
QueryContext::insert_destination_id output_destination_index = query_context_proto.insert_destinations_size();
serialization::InsertDestination *output_destination_proto = query_context_proto.add_insert_destinations();
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/storage/InsertDestination.cpp
----------------------------------------------------------------------
diff --git a/storage/InsertDestination.cpp b/storage/InsertDestination.cpp
index 354bed4..2866c5f 100644
--- a/storage/InsertDestination.cpp
+++ b/storage/InsertDestination.cpp
@@ -60,6 +60,7 @@ InsertDestination::InsertDestination(const CatalogRelationSchema &relation,
const StorageBlockLayout *layout,
StorageManager *storage_manager,
const std::size_t relational_op_index,
+ const std::size_t query_id,
const tmb::client_id scheduler_client_id,
tmb::MessageBus *bus)
: thread_id_map_(*ClientIDMap::Instance()),
@@ -67,6 +68,7 @@ InsertDestination::InsertDestination(const CatalogRelationSchema &relation,
relation_(relation),
layout_(layout),
relational_op_index_(relational_op_index),
+ query_id_(query_id),
scheduler_client_id_(scheduler_client_id),
bus_(DCHECK_NOTNULL(bus)) {
if (layout_ == nullptr) {
@@ -74,11 +76,13 @@ InsertDestination::InsertDestination(const CatalogRelationSchema &relation,
}
}
-InsertDestination* InsertDestination::ReconstructFromProto(const serialization::InsertDestination &proto,
- const CatalogRelationSchema &relation,
- StorageManager *storage_manager,
- const tmb::client_id scheduler_client_id,
- tmb::MessageBus *bus) {
+InsertDestination* InsertDestination::ReconstructFromProto(
+ const std::size_t query_id,
+ const serialization::InsertDestination &proto,
+ const CatalogRelationSchema &relation,
+ StorageManager *storage_manager,
+ const tmb::client_id scheduler_client_id,
+ tmb::MessageBus *bus) {
DCHECK(ProtoIsValid(proto, relation));
StorageBlockLayout *layout = nullptr;
@@ -93,6 +97,7 @@ InsertDestination* InsertDestination::ReconstructFromProto(const serialization::
layout,
storage_manager,
proto.relational_op_index(),
+ query_id,
scheduler_client_id,
bus);
}
@@ -107,6 +112,7 @@ InsertDestination* InsertDestination::ReconstructFromProto(const serialization::
storage_manager,
move(blocks),
proto.relational_op_index(),
+ query_id,
scheduler_client_id,
bus);
}
@@ -134,6 +140,7 @@ InsertDestination* InsertDestination::ReconstructFromProto(const serialization::
storage_manager,
move(partitions),
proto.relational_op_index(),
+ query_id,
scheduler_client_id,
bus);
}
@@ -262,6 +269,7 @@ MutableBlockReference AlwaysCreateBlockInsertDestination::createNewBlock() {
serialization::CatalogRelationNewBlockMessage proto;
proto.set_relation_id(relation_.getID());
proto.set_block_id(new_id);
+ proto.set_query_id(getQueryID());
const size_t proto_length = proto.ByteSize();
char *proto_bytes = static_cast<char*>(malloc(proto_length));
@@ -309,6 +317,7 @@ MutableBlockReference BlockPoolInsertDestination::createNewBlock() {
serialization::CatalogRelationNewBlockMessage proto;
proto.set_relation_id(relation_.getID());
proto.set_block_id(new_id);
+ proto.set_query_id(getQueryID());
const size_t proto_length = proto.ByteSize();
char *proto_bytes = static_cast<char*>(malloc(proto_length));
@@ -385,21 +394,29 @@ const std::vector<block_id>& BlockPoolInsertDestination::getTouchedBlocksInterna
return done_block_ids_;
}
-PartitionAwareInsertDestination::PartitionAwareInsertDestination(PartitionSchemeHeader *partition_scheme_header,
- const CatalogRelationSchema &relation,
- const StorageBlockLayout *layout,
- StorageManager *storage_manager,
- vector<vector<block_id>> &&partitions,
- const std::size_t relational_op_index,
- const tmb::client_id scheduler_client_id,
- tmb::MessageBus *bus)
- : InsertDestination(relation, layout, storage_manager, relational_op_index, scheduler_client_id, bus),
+PartitionAwareInsertDestination::PartitionAwareInsertDestination(
+ PartitionSchemeHeader *partition_scheme_header,
+ const CatalogRelationSchema &relation,
+ const StorageBlockLayout *layout,
+ StorageManager *storage_manager,
+ vector<vector<block_id>> &&partitions,
+ const std::size_t relational_op_index,
+ const std::size_t query_id,
+ const tmb::client_id scheduler_client_id,
+ tmb::MessageBus *bus)
+ : InsertDestination(relation,
+ layout,
+ storage_manager,
+ relational_op_index,
+ query_id,
+ scheduler_client_id,
+ bus),
partition_scheme_header_(DCHECK_NOTNULL(partition_scheme_header)),
available_block_refs_(partition_scheme_header_->getNumPartitions()),
available_block_ids_(move(partitions)),
done_block_ids_(partition_scheme_header_->getNumPartitions()),
- mutexes_for_partition_(new SpinMutex[partition_scheme_header_->getNumPartitions()]) {
-}
+ mutexes_for_partition_(
+ new SpinMutex[partition_scheme_header_->getNumPartitions()]) {}
MutableBlockReference PartitionAwareInsertDestination::createNewBlock() {
FATAL_ERROR("PartitionAwareInsertDestination::createNewBlock needs a partition id as an argument.");
@@ -415,6 +432,7 @@ MutableBlockReference PartitionAwareInsertDestination::createNewBlockInPartition
proto.set_relation_id(relation_.getID());
proto.set_block_id(new_id);
proto.set_partition_id(part_id);
+ proto.set_query_id(getQueryID());
const size_t proto_length = proto.ByteSize();
char *proto_bytes = static_cast<char*>(malloc(proto_length));
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/storage/InsertDestination.hpp
----------------------------------------------------------------------
diff --git a/storage/InsertDestination.hpp b/storage/InsertDestination.hpp
index 670cd6c..5ff33f5 100644
--- a/storage/InsertDestination.hpp
+++ b/storage/InsertDestination.hpp
@@ -78,6 +78,7 @@ class InsertDestination : public InsertDestinationInterface {
* @param storage_manager The StorageManager to use.
* @param relational_op_index The index of the relational operator in the
* QueryPlan DAG that has outputs.
+ * @param query_id The ID of this query.
* @param scheduler_client_id The TMB client ID of the scheduler thread.
* @param bus A pointer to the TMB.
**/
@@ -85,6 +86,7 @@ class InsertDestination : public InsertDestinationInterface {
const StorageBlockLayout *layout,
StorageManager *storage_manager,
const std::size_t relational_op_index,
+ const std::size_t query_id,
const tmb::client_id scheduler_client_id,
tmb::MessageBus *bus);
@@ -98,6 +100,7 @@ class InsertDestination : public InsertDestinationInterface {
* @brief A factory method to generate the InsertDestination from the
* serialized Protocol Buffer representation.
*
+ * @param query_id The ID of this query.
* @param proto A serialized Protocol Buffer representation of an
* InsertDestination, originally generated by the optimizer.
* @param relation The relation to insert tuples into.
@@ -107,11 +110,13 @@ class InsertDestination : public InsertDestinationInterface {
*
* @return The constructed InsertDestination.
*/
- static InsertDestination* ReconstructFromProto(const serialization::InsertDestination &proto,
- const CatalogRelationSchema &relation,
- StorageManager *storage_manager,
- const tmb::client_id scheduler_client_id,
- tmb::MessageBus *bus);
+ static InsertDestination* ReconstructFromProto(
+ const std::size_t query_id,
+ const serialization::InsertDestination &proto,
+ const CatalogRelationSchema &relation,
+ StorageManager *storage_manager,
+ const tmb::client_id scheduler_client_id,
+ tmb::MessageBus *bus);
/**
* @brief Check whether a serialized InsertDestination is fully-formed and
@@ -211,6 +216,7 @@ class InsertDestination : public InsertDestinationInterface {
proto.set_operator_index(relational_op_index_);
proto.set_block_id(id);
proto.set_relation_id(relation_.getID());
+ proto.set_query_id(query_id_);
// NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
const std::size_t proto_length = proto.ByteSize();
@@ -253,6 +259,10 @@ class InsertDestination : public InsertDestinationInterface {
" ID " << scheduler_client_id_;
}
+ inline const std::size_t getQueryID() const {
+ return query_id_;
+ }
+
const ClientIDMap &thread_id_map_;
StorageManager *storage_manager_;
@@ -260,6 +270,7 @@ class InsertDestination : public InsertDestinationInterface {
std::unique_ptr<const StorageBlockLayout> layout_;
const std::size_t relational_op_index_;
+ const std::size_t query_id_;
tmb::client_id scheduler_client_id_;
tmb::MessageBus *bus_;
@@ -288,10 +299,16 @@ class AlwaysCreateBlockInsertDestination : public InsertDestination {
const StorageBlockLayout *layout,
StorageManager *storage_manager,
const std::size_t relational_op_index,
+ const std::size_t query_id,
const tmb::client_id scheduler_client_id,
tmb::MessageBus *bus)
- : InsertDestination(relation, layout, storage_manager, relational_op_index, scheduler_client_id, bus) {
- }
+ : InsertDestination(relation,
+ layout,
+ storage_manager,
+ relational_op_index,
+ query_id,
+ scheduler_client_id,
+ bus) {}
~AlwaysCreateBlockInsertDestination() override {
}
@@ -334,16 +351,23 @@ class BlockPoolInsertDestination : public InsertDestination {
* @param relational_op_index The index of the relational operator in the
* QueryPlan DAG that has outputs.
+ * @param query_id The ID of the query.
* @param scheduler_client_id The TMB client ID of the scheduler thread.
* @param bus A pointer to the TMB.
**/
BlockPoolInsertDestination(const CatalogRelationSchema &relation,
const StorageBlockLayout *layout,
StorageManager *storage_manager,
const std::size_t relational_op_index,
+ const std::size_t query_id,
const tmb::client_id scheduler_client_id,
tmb::MessageBus *bus)
- : InsertDestination(relation, layout, storage_manager, relational_op_index, scheduler_client_id, bus) {
- }
+ : InsertDestination(relation,
+ layout,
+ storage_manager,
+ relational_op_index,
+ query_id,
+ scheduler_client_id,
+ bus) {}
/**
* @brief Constructor.
@@ -363,9 +387,16 @@ class BlockPoolInsertDestination : public InsertDestination {
StorageManager *storage_manager,
std::vector<block_id> &&blocks,
const std::size_t relational_op_index,
+ const std::size_t query_id,
const tmb::client_id scheduler_client_id,
tmb::MessageBus *bus)
- : InsertDestination(relation, layout, storage_manager, relational_op_index, scheduler_client_id, bus),
+ : InsertDestination(relation,
+ layout,
+ storage_manager,
+ relational_op_index,
+ query_id,
+ scheduler_client_id,
+ bus),
available_block_ids_(std::move(blocks)) {
// TODO(chasseur): Once block fill statistics are available, replace this
// with something smarter.
@@ -386,7 +417,6 @@ class BlockPoolInsertDestination : public InsertDestination {
MutableBlockReference createNewBlock() override;
private:
- FRIEND_TEST(ForemanTest, TwoNodesDAGPartiallyFilledBlocksTest);
FRIEND_TEST(QueryManagerTest, TwoNodesDAGPartiallyFilledBlocksTest);
// A vector of references to blocks which are loaded in memory.
@@ -416,17 +446,20 @@ class PartitionAwareInsertDestination : public InsertDestination {
* @param partitions The blocks in partitions.
* @param relational_op_index The index of the relational operator in the
* QueryPlan DAG that has outputs.
+ * @param query_id The ID of the query.
* @param scheduler_client_id The TMB client ID of the scheduler thread.
* @param bus A pointer to the TMB.
**/
- PartitionAwareInsertDestination(PartitionSchemeHeader *partition_scheme_header,
- const CatalogRelationSchema &relation,
- const StorageBlockLayout *layout,
- StorageManager *storage_manager,
- std::vector<std::vector<block_id>> &&partitions,
- const std::size_t relational_op_index,
- const tmb::client_id scheduler_client_id,
- tmb::MessageBus *bus);
+ PartitionAwareInsertDestination(
+ PartitionSchemeHeader *partition_scheme_header,
+ const CatalogRelationSchema &relation,
+ const StorageBlockLayout *layout,
+ StorageManager *storage_manager,
+ std::vector<std::vector<block_id>> &&partitions,
+ const std::size_t relational_op_index,
+ const std::size_t query_id,
+ const tmb::client_id scheduler_client_id,
+ tmb::MessageBus *bus);
~PartitionAwareInsertDestination() override {
delete[] mutexes_for_partition_;