You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2016/08/03 10:56:12 UTC
[4/7] incubator-quickstep git commit: Cleaned up the messages w/ a
dummy payload.
Cleaned up the messages w/ a dummy payload.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/ccea2ff8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/ccea2ff8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/ccea2ff8
Branch: refs/heads/execution-dag-visualizer
Commit: ccea2ff83ea73e950d52c152cc422a9e93cf6aad
Parents: 52a32a3
Author: Zuyu Zhang <zu...@twitter.com>
Authored: Fri Jul 29 23:52:33 2016 -0700
Committer: Zuyu Zhang <zu...@twitter.com>
Committed: Mon Aug 1 10:11:19 2016 -0700
----------------------------------------------------------------------
query_execution/ForemanSingleNode.cpp | 4 +-
query_execution/QueryExecutionMessages.proto | 4 -
query_execution/QueryExecutionUtil.hpp | 5 +-
query_execution/README.md | 110 +++++++++----------
query_execution/WorkerMessage.hpp | 13 +--
query_execution/tests/BlockLocator_unittest.cpp | 11 +-
storage/tests/DataExchange_unittest.cpp | 11 +-
7 files changed, 60 insertions(+), 98 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccea2ff8/query_execution/ForemanSingleNode.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanSingleNode.cpp b/query_execution/ForemanSingleNode.cpp
index cda02a7..d2b56ae 100644
--- a/query_execution/ForemanSingleNode.cpp
+++ b/query_execution/ForemanSingleNode.cpp
@@ -167,9 +167,7 @@ void ForemanSingleNode::run() {
if (!policy_enforcer_->hasQueries()) {
// Signal the main thread that there are no queries to be executed.
// Currently the message doesn't have any real content.
- const int dummy_payload = 0;
- TaggedMessage completion_tagged_message(
- &dummy_payload, sizeof(dummy_payload), kWorkloadCompletionMessage);
+ TaggedMessage completion_tagged_message(kWorkloadCompletionMessage);
const tmb::MessageBus::SendStatus send_status =
QueryExecutionUtil::SendTMBMessage(
bus_,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccea2ff8/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index 308d736..f2219f6 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -20,10 +20,6 @@ import "catalog/Catalog.proto";
import "query_execution/QueryContext.proto";
import "relational_operators/WorkOrder.proto";
-// Used for any messages that do not carry payloads.
-message EmptyMessage {
-}
-
// Note: There are different types of completion messages for normal work orders
// rebuild work orders. This can be potentially helpful when we want to collect
// different statistics for executing different types of work orders.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccea2ff8/query_execution/QueryExecutionUtil.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionUtil.hpp b/query_execution/QueryExecutionUtil.hpp
index 6ea4a29..5994f22 100644
--- a/query_execution/QueryExecutionUtil.hpp
+++ b/query_execution/QueryExecutionUtil.hpp
@@ -123,10 +123,7 @@ class QueryExecutionUtil {
style.Broadcast(true);
Address address;
address.All(true);
- std::unique_ptr<WorkerMessage> poison_message(WorkerMessage::PoisonMessage());
- TaggedMessage poison_tagged_message(poison_message.get(),
- sizeof(*poison_message),
- kPoisonMessage);
+ TaggedMessage poison_tagged_message(kPoisonMessage);
const tmb::MessageBus::SendStatus send_status = bus->Send(
sender_id, address, style, std::move(poison_tagged_message));
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccea2ff8/query_execution/README.md
----------------------------------------------------------------------
diff --git a/query_execution/README.md b/query_execution/README.md
index 22ad91d..12e0f57 100644
--- a/query_execution/README.md
+++ b/query_execution/README.md
@@ -3,19 +3,19 @@
## Types of threads
There are two kinds of threads in Quickstep - Foreman and Worker. The foreman
thread controls the query execution progress, finds schedulable work (called as
-WorkOrder) and assigns (or schedules) it for execution to the Worker threads.
+WorkOrder) and assigns (or schedules) it for execution to the Worker threads.
The Worker threads receive the WorkOrders and execute them. After execution they
-send a completion message (or response message) back to Foreman.
+send a completion message (or response message) back to Foreman.
## High level functionality of Foreman
-Foreman requests all the RelationalOperators in the physical query plan
+Foreman requests all the RelationalOperators in the physical query plan
represented as a DAG to give any schedulable work (in the form of WorkOrders).
While doing so, Foreman has to respect dependencies between operators. There are
-two kinds of dependencies between operators - pipeline breaking (or blocking)
+two kinds of dependencies between operators - pipeline breaking (or blocking)
and pipeline non-breaking (or non-blocking). In the first case, the output of
the producer operator can't be pipelined to the consumer operator. In the second
case, the Foreman will facilitate the pipelining of the intermediate output
-produced by the producer operator to the consumer operator.
+produced by the producer operator to the consumer operator.
## Messages in execution engine
@@ -26,110 +26,110 @@ of the message.
Foreman -> Worker : WorkerMessage which consists of the following things
- A pointer to the WorkOrder to be executed. The WorkOrder could be a normal
WorkOrder or a rebuild WorkOrder. A normal WorkOrder involves the invocation of
-WorkOrder::execute() method which is overriden by all of the RelationalOperator
-classes. A rebuild WorkOrder has one StorageBlock as input and calls a
+WorkOrder::execute() method which is overridden by all of the RelationalOperator
+classes. A rebuild WorkOrder has one StorageBlock as input and calls a
rebuild() method on the block. More details about rebuild() can be found in the
-storage module.
+storage module.
- The index of the relational operator in the query plan DAG that produced the
-WorkOrder.
-
-Main thread -> Worker : WorkerMessage of type PoisonMessage. This message is
-used to terminate the Worker thread, typically when shutting down the Quickstep
-process.
+WorkOrder.
### ForemanMessage
-Multiple senders are possible for this message. There are multiple types of
+Multiple senders are possible for this message. There are multiple types of
ForemanMessages, each of which indicates the purpose of the message.
-Worker -> Foreman : ForemanMessage of types WorkOrderCompletion and
-RebuildCompletion are sent after a Worker finishes executing a respective type
-of WorkOrder. This message helps the Foreman track the progress of individual
-operators as well as the whole query.
+Worker -> Foreman : ForemanMessages of types WorkOrderCompletion and
+RebuildCompletion are sent after a Worker finishes executing the respective type
+of WorkOrder. This message helps the Foreman track the progress of individual
+operators as well as the whole query.
Some relational operators and InsertDestination -> Foreman : ForemanMessage of
-types DataPipeline and WorkOrdersAvailable. InsertDestination first determines
+types DataPipeline and WorkOrdersAvailable. InsertDestination first determines
when an output block of a relational operator gets full. Once a block is full,
-it streams the unique block ID of the filled block along with the index of the
-relational operator that produced the block to Foreman with the message type
+it streams the unique block ID of the filled block along with the index of the
+relational operator that produced the block to Foreman with the message type
DataPipeline. Some operators which modify the block in place also send similar
-messages to Foreman.
+messages to Foreman.
### FeedbackMessage
This message is sent from Workers to the Foreman during a WorkOrder execution.
In certain operators, e.g. TextScan (used for bulk loading data from text files)
-and Sort, there is a communication between the relational operator and its
-WorkOrders. In such cases, when a WorkOrder is under execution on a Worker
+and Sort, there is communication between the relational operator and its
+WorkOrders. In such cases, when a WorkOrder is under execution on a Worker
thread, a FeedbackMessage is sent from the WorkOrder via the Worker to Foreman.
Foreman relays this message to the relational operator that produced the sender
-WorkOrder. The relational operator uses this message to update its internal
-state to potentially generate newer WorkOrders.
+WorkOrder. The relational operator uses this message to update its internal
+state to potentially generate newer WorkOrders.
+
+### PoisonMessage
+This message is used to terminate a thread (i.e., the Foreman or a Worker), typically
+when shutting down the Quickstep process.
## How does the Foreman react after receiving various messages?
### WorkOrder completion message
* Update the book-keeping of pending WorkOrders per Worker and per operator.
* Fetch new WorkOrders if available for the operator of whose WorkOrder was
-just executed.
+just executed.
* Update the state of an operator - the possible options are:
- Normal WorkOrders are still under execution
- All normal WorkOrders have finished execution and rebuild WorkOrders are yet
- to be generated.
+ to be generated.
- All normal WorkOrders have finished execution, rebuild WorkOrders have been
- generated and issued to Workers.
+ generated and issued to Workers.
- All normal and rebuild WorkOrders have been executed AND all the dependency
- operators for the given operator have finished execution, therefore the given
- operator has finished its execution.
-* Fetch the WorkOrders from the dependents of the given operator.
+ operators for the given operator have finished execution, therefore the given
+ operator has finished its execution.
+* Fetch the WorkOrders from the dependents of the given operator.
### Rebuild WorkOrder completion message
* Update the book-keeping of pending WorkOrders per Worker and per operator.
* If all the rebuild WorkOrders have finished their execution, try to fetch the
WorkOrders of the dependent operators of the operator whose rebuild WorkOrder
-was just executed.
+was just executed.
### Data pipeline message
-* Find the consumer operators (i.e. operators which have a non
-pipeline-breaking link) of the producer operator.
-* Stream the block ID to the eligible consumer operators.
-* Fetch new WorkOrders from these consumer operators which may have become
-available because of the streaming of data.
+* Find the consumer operators (i.e. operators which have a
+non-pipeline-breaking link) of the producer operator.
+* Stream the block ID to the eligible consumer operators.
+* Fetch new WorkOrders from these consumer operators which may have become
+available because of the streaming of data.
### WorkOrder available message
* Fetch new WorkOrders that may have become available.
### Feedback message
-* Relay the feedback message to a specified relational operator. The recipient
-operator is specified in the header of the message.
+* Relay the feedback message to a specified relational operator. The recipient
+operator is specified in the header of the message.
## Example
-We look at a sample query to better describe the flow of messages -
+We look at a sample query to better describe the flow of messages -
SELECT R.a, S.b from R, S where R.a = S.a and R.c < 20;
-This is an equi-join query which can be implemented using a hash join. We assume
-that S is a larger relation and the build relation is the output of the
+This is an equi-join query which can be implemented using a hash join. We assume
+that S is a larger relation and the build relation is the output of the
selection on R.
The query execution plan involves the following operators:
-* SelectOperator to filter R based on predicate R.c < 20 (We call the output as
-R')
+* SelectOperator to filter R based on predicate R.c < 20 (We call the output as
+R')
* BuildHashOperator to construct a hash table on R'
* HashJoinOperator to probe the hash table, where the probe relation is S
* DestroyHashTableOperator to destroy the hash table after the join is done
-* Multiple DropTableOperators to destroy the temporaray relations produced as
-output.
+* Multiple DropTableOperators to destroy the temporary relations produced as
+output.
R has two blocks with IDs as 1 and 2. S has two blocks with IDs as 3 and 4.
-We assume that the SelectOperator produces one filled block and one partially
-filled block as output. Note that in the query plan DAG, the link between
-SelectOperator and BuildHashOperator allows streaming of data. The
-HashJoinOperator's WorkOrder can't be generated unless all of the
+We assume that the SelectOperator produces one filled block and one partially
+filled block as output. Note that in the query plan DAG, the link between
+SelectOperator and BuildHashOperator allows streaming of data. The
+HashJoinOperator's WorkOrder can't be generated unless all of the
BuildHashOperator's WorkOrders have finished their execution. The execution is
-assumed to be performed by a single Worker thread.
+assumed to be performed by a single Worker thread.
-The following table describes the message exchange that happens during the
-query excution. We primarily focus on three operators - Select, BuildHash and
-HashJoin (probe).
+The following table describes the message exchange that happens during the
+query execution. We primarily focus on three operators - Select, BuildHash and
+HashJoin (probe).
| Sender | Receiver | Message | Message Description |
|:-----------------:|----------|---------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccea2ff8/query_execution/WorkerMessage.hpp
----------------------------------------------------------------------
diff --git a/query_execution/WorkerMessage.hpp b/query_execution/WorkerMessage.hpp
index 560c1ba..a0434de 100644
--- a/query_execution/WorkerMessage.hpp
+++ b/query_execution/WorkerMessage.hpp
@@ -35,7 +35,6 @@ class WorkerMessage {
enum class WorkerMessageType {
kRebuildWorkOrder = 0,
kWorkOrder,
- kPoison
};
/**
@@ -70,15 +69,6 @@ class WorkerMessage {
}
/**
- * @brief A static factory method for generating a poison message.
- *
- * @return The constructed PoisonMessage.
- **/
- static WorkerMessage* PoisonMessage() {
- return new WorkerMessage(nullptr, 0, WorkerMessageType::kPoison);
- }
-
- /**
* @brief Destructor.
**/
~WorkerMessage() {
@@ -128,8 +118,7 @@ class WorkerMessage {
/**
* @brief Constructor.
*
- * @param work_unit The work order to be executed by the worker. A NULL
- * workorder indicates a poison message.
+ * @param work_unit The work order to be executed by the worker.
* @param relational_op_index The index of the relational operator in the
* query plan DAG that generated the given WorkOrder.
* @param type Type of the WorkerMessage.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccea2ff8/query_execution/tests/BlockLocator_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/BlockLocator_unittest.cpp b/query_execution/tests/BlockLocator_unittest.cpp
index fe7b86b..fd25e9e 100644
--- a/query_execution/tests/BlockLocator_unittest.cpp
+++ b/query_execution/tests/BlockLocator_unittest.cpp
@@ -90,16 +90,7 @@ class BlockLocatorTest : public ::testing::Test {
virtual void TearDown() {
storage_manager_.reset();
- serialization::EmptyMessage proto;
-
- const int proto_length = proto.ByteSize();
- char *proto_bytes = static_cast<char*>(malloc(proto_length));
- CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
- TaggedMessage message(static_cast<const void*>(proto_bytes),
- proto_length,
- kPoisonMessage);
- free(proto_bytes);
+ TaggedMessage message(kPoisonMessage);
LOG(INFO) << "Worker (id '" << worker_client_id_
<< "') sent PoisonMessage (typed '" << kPoisonMessage
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccea2ff8/storage/tests/DataExchange_unittest.cpp
----------------------------------------------------------------------
diff --git a/storage/tests/DataExchange_unittest.cpp b/storage/tests/DataExchange_unittest.cpp
index 38d12f6..4bad17b 100644
--- a/storage/tests/DataExchange_unittest.cpp
+++ b/storage/tests/DataExchange_unittest.cpp
@@ -105,16 +105,7 @@ class DataExchangeTest : public ::testing::Test {
data_exchanger_expected_.shutdown();
storage_manager_expected_.reset();
- serialization::EmptyMessage proto;
-
- const int proto_length = proto.ByteSize();
- char *proto_bytes = static_cast<char*>(malloc(proto_length));
- CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
- TaggedMessage message(static_cast<const void*>(proto_bytes),
- proto_length,
- kPoisonMessage);
- free(proto_bytes);
+ TaggedMessage message(kPoisonMessage);
LOG(INFO) << "Worker (id '" << worker_client_id_
<< "') sent PoisonMessage (typed '" << kPoisonMessage