You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by hb...@apache.org on 2016/07/13 02:04:30 UTC
[1/2] incubator-quickstep git commit: Introduced QueryManagerBase,
and renamed QueryManagerSingleNode.
Repository: incubator-quickstep
Updated Branches:
refs/heads/master 7671a5893 -> b4e25edce
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b4e25edc/query_execution/tests/QueryManagerSingleNode_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/QueryManagerSingleNode_unittest.cpp b/query_execution/tests/QueryManagerSingleNode_unittest.cpp
new file mode 100644
index 0000000..52cee20
--- /dev/null
+++ b/query_execution/tests/QueryManagerSingleNode_unittest.cpp
@@ -0,0 +1,942 @@
+/**
+ * Copyright 2011-2015 Quickstep Technologies LLC.
+ * Copyright 2015-2016 Pivotal Software, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include <climits>
+#include <cstdlib>
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include "catalog/CatalogDatabase.hpp"
+#include "catalog/CatalogRelation.hpp"
+#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/QueryContext.hpp"
+#include "query_execution/QueryContext.pb.h"
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionState.hpp"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryManagerSingleNode.hpp"
+#include "query_execution/WorkOrdersContainer.hpp"
+#include "query_execution/WorkerDirectory.hpp"
+#include "query_execution/WorkerMessage.hpp"
+#include "query_optimizer/QueryHandle.hpp"
+#include "query_optimizer/QueryPlan.hpp"
+#include "relational_operators/RelationalOperator.hpp"
+#include "relational_operators/WorkOrder.hpp"
+#include "storage/InsertDestination.hpp"
+#include "storage/InsertDestination.pb.h"
+#include "storage/StorageBlock.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageManager.hpp"
+#include "utility/DAG.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+#include "gtest/gtest.h"
+
+#include "tmb/id_typedefs.h"
+#include "tmb/tagged_message.h"
+
+namespace tmb { class MessageBus; }
+
+using std::move;
+using std::unique_ptr;
+using std::vector;
+
+using tmb::client_id;
+
+namespace quickstep {
+
+class WorkOrderProtosContainer;
+
+/**
+ * @brief A WorkOrder stub used by MockOperator. Executing it performs no real
+ *        work; it only emits a verbose log line carrying the index it was
+ *        constructed with.
+ **/
+class MockWorkOrder : public WorkOrder {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param op_index Index of the operator that generated this WorkOrder. The
+   *        parameter uses the same type as the stored member so that no
+   *        narrowing conversion through int takes place.
+   **/
+  explicit MockWorkOrder(const QueryPlan::DAGNodeIndex op_index)
+      : WorkOrder(0), op_index_(op_index) {}
+
+  void execute() override {
+    VLOG(3) << "WorkOrder[" << op_index_ << "] executing.";
+  }
+
+  /**
+   * @brief Get the operator index supplied at construction time.
+   **/
+  inline QueryPlan::DAGNodeIndex getOpIndex() const {
+    return op_index_;
+  }
+
+ private:
+  const QueryPlan::DAGNodeIndex op_index_;
+
+  DISALLOW_COPY_AND_ASSIGN(MockWorkOrder);
+};
+
+/**
+ * @brief A RelationalOperator stub for exercising the QueryManager. It counts
+ *        the calls made to its overridden methods and emits MockWorkOrders
+ *        according to the knobs given to the constructor.
+ **/
+class MockOperator: public RelationalOperator {
+ public:
+  // Selects which call counter getNumCalls() reports.
+  enum function_name {
+    kFeedInputBlock = 0,
+    kFeedInputBlocks,
+    kDoneFeedingInputBlocks,
+    kGetAllWorkOrders
+  };
+
+  /**
+   * @brief Constructor.
+   *
+   * @param produce_workorders Whether getAllWorkOrders() may emit WorkOrders.
+   * @param has_streaming_input If true, a WorkOrder is emitted only after at
+   *        least one feedInputBlock()/feedInputBlocks() call; otherwise one
+   *        is emitted once the blocking dependencies are met.
+   * @param max_getworkorder_iters getAllWorkOrders() returns true on the call
+   *        whose ordinal equals this value.
+   * @param max_workorders Upper bound on the number of WorkOrders emitted.
+   **/
+  MockOperator(const bool produce_workorders,
+               const bool has_streaming_input,
+               const int max_getworkorder_iters = 1,
+               const int max_workorders = INT_MAX)
+      : RelationalOperator(0 /* Query Id */),
+        produce_workorders_(produce_workorders),
+        has_streaming_input_(has_streaming_input),
+        max_workorders_(max_workorders),
+        max_getworkorder_iters_(max_getworkorder_iters) {
+  }
+
+#define MOCK_OP_LOG(x) VLOG(x) << "Op[" << op_index_ << "]: " << __func__ << ": "
+
+  // Accessors used by the tests to observe how the QueryManager drove this
+  // operator.
+
+  // Number of WorkOrders emitted so far.
+  inline int getNumWorkOrders() const {
+    return num_workorders_generated_;
+  }
+
+  // Number of calls received by the method identified by 'fname', or -1 if
+  // 'fname' is not a known function_name.
+  inline int getNumCalls(const function_name fname) const {
+    if (fname == kFeedInputBlock) {
+      return num_calls_feedblock_;
+    }
+    if (fname == kFeedInputBlocks) {
+      return num_calls_feedblocks_;
+    }
+    if (fname == kDoneFeedingInputBlocks) {
+      return num_calls_donefeedingblocks_;
+    }
+    if (fname == kGetAllWorkOrders) {
+      return num_calls_get_workorders_;
+    }
+    return -1;
+  }
+
+  inline bool getBlockingDependenciesMet() const {
+    MOCK_OP_LOG(3) << "met.";
+    return blocking_dependencies_met_;
+  }
+
+  void setInsertDestinationID(const QueryContext::insert_destination_id insert_destination_index) {
+    insert_destination_index_ = insert_destination_index;
+  }
+
+  // Mock to trigger doneFeedingInputBlocks for the dependent operators
+  // in QueryManager::markOperatorFinished.
+  void setOutputRelationID(const relation_id rel_id) {
+    output_relation_id_ = rel_id;
+  }
+
+  // Overrides of the RelationalOperator interface.
+
+  bool getAllWorkOrders(
+      WorkOrdersContainer *container,
+      QueryContext *query_context,
+      StorageManager *storage_manager,
+      const tmb::client_id foreman_client_id,
+      tmb::MessageBus *bus) override {
+    ++num_calls_get_workorders_;
+
+    // A streaming operator needs some fed input; a static one needs its
+    // blocking dependencies to be met.
+    const bool input_ready =
+        has_streaming_input_
+            ? (num_calls_feedblock_ > 0 || num_calls_feedblocks_ > 0)
+            : blocking_dependencies_met_;
+    const bool below_limit = num_workorders_generated_ < max_workorders_;
+
+    if (produce_workorders_ && input_ready && below_limit) {
+      if (has_streaming_input_) {
+        MOCK_OP_LOG(3) << "[stream] generate WorkOrder";
+      } else {
+        MOCK_OP_LOG(3) << "[static] generate WorkOrder";
+      }
+      container->addNormalWorkOrder(new MockWorkOrder(op_index_), op_index_);
+      ++num_workorders_generated_;
+    }
+
+    const bool done_generating = (num_calls_get_workorders_ == max_getworkorder_iters_);
+    MOCK_OP_LOG(3) << "count(" << num_calls_get_workorders_ << ") "
+                   << "return(" << done_generating << ")";
+    return done_generating;
+  }
+
+  bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override {
+    return true;
+  }
+
+  void feedInputBlock(const block_id input_block_id,
+                      const relation_id input_relation_id) override {
+    ++num_calls_feedblock_;
+    MOCK_OP_LOG(3) << "count(" << num_calls_feedblock_ << ")";
+  }
+
+  void feedInputBlocks(const relation_id rel_id,
+                       std::vector<block_id> *partially_filled_blocks) override {
+    ++num_calls_feedblocks_;
+    MOCK_OP_LOG(3) << "count(" << num_calls_feedblocks_ << ")";
+  }
+
+  void doneFeedingInputBlocks(const relation_id rel_id) override {
+    ++num_calls_donefeedingblocks_;
+    MOCK_OP_LOG(3) << "count(" << num_calls_donefeedingblocks_ << ")";
+  }
+
+  QueryContext::insert_destination_id getInsertDestinationID() const override {
+    return insert_destination_index_;
+  }
+
+  const relation_id getOutputRelationID() const override {
+    return output_relation_id_;
+  }
+
+ private:
+  // Fixed configuration.
+  const bool produce_workorders_;
+  const bool has_streaming_input_;
+  const int max_workorders_;
+  const int max_getworkorder_iters_;
+
+  // Call and WorkOrder counters, all starting at zero.
+  int num_calls_get_workorders_ = 0;
+  int num_workorders_generated_ = 0;
+  int num_calls_feedblock_ = 0;
+  int num_calls_feedblocks_ = 0;
+  int num_calls_donefeedingblocks_ = 0;
+
+  QueryContext::insert_destination_id insert_destination_index_ = QueryContext::kInvalidInsertDestinationId;
+
+  relation_id output_relation_id_ = -1;
+
+#undef MOCK_OP_LOG
+
+  DISALLOW_COPY_AND_ASSIGN(MockOperator);
+};
+
+
+/**
+ * @brief Test fixture that owns a QueryHandle/QueryPlan, builds a
+ *        QueryManagerSingleNode over it, and offers helpers that hand
+ *        serialized messages directly to QueryManager::processMessage().
+ **/
+class QueryManagerTest : public ::testing::Test {
+ protected:
+  void SetUp() override {
+    db_.reset(new CatalogDatabase(nullptr /* catalog */, "database"));
+    storage_manager_.reset(new StorageManager("./"));
+    bus_.Initialize();
+    query_handle_.reset(new QueryHandle(0));  // dummy query ID.
+    query_plan_ = query_handle_->getQueryPlanMutable();
+    query_handle_->getQueryContextProtoMutable()->set_query_id(query_handle_->query_id());
+  }
+
+  // Builds the QueryManager under test. Must be called after the test has
+  // finished populating the query plan.
+  // NOTE(review): the leading arguments (0, 1) are presumably the foreman
+  // client ID and the number of NUMA nodes -- confirm against the
+  // QueryManagerSingleNode constructor.
+  inline void constructQueryManager() {
+    query_manager_.reset(new QueryManagerSingleNode(
+        0, 1, query_handle_.get(), db_.get(), storage_manager_.get(), &bus_));
+  }
+
+  // Returns the execution state's count of queued WorkOrders for the operator
+  // at 'index'.
+  inline int getNumWorkOrdersInExecution(const QueryPlan::DAGNodeIndex index) const {
+    return query_manager_->getQueryExecutionState().getNumQueuedWorkOrders(index);
+  }
+
+  // Returns the number of operators the execution state reports as finished.
+  inline int getNumOperatorsFinished() const {
+    return query_manager_->getQueryExecutionState().getNumOperatorsFinished();
+  }
+
+  // Returns whether the operator at 'index' has finished its execution.
+  inline bool getOperatorFinishedStatus(const QueryPlan::DAGNodeIndex index) const {
+    return query_manager_->getQueryExecutionState().hasExecutionFinished(index);
+  }
+
+  // Delivers a DataPipelineMessage originating from 'source_operator_index'.
+  // Returns true iff the query has finished executing afterwards.
+  inline bool placeDataPipelineMessage(const QueryPlan::DAGNodeIndex source_operator_index) {
+    VLOG(3) << "Place DataPipeline message for Op[" << source_operator_index << "]";
+    return deliverMessage(makeDataPipelineProto(source_operator_index),
+                          kDataPipelineMessage);
+  }
+
+  // Delivers a NormalWorkOrderCompletionMessage for the operator at 'index'.
+  // Returns true iff the query has finished executing afterwards.
+  inline bool placeWorkOrderCompleteMessage(const QueryPlan::DAGNodeIndex index) {
+    VLOG(3) << "Place WorkOrderComplete message for Op[" << index << "]";
+    serialization::NormalWorkOrderCompletionMessage proto;
+    proto.set_operator_index(index);
+    proto.set_worker_thread_index(1);  // dummy worker ID.
+    proto.set_query_id(0);  // dummy query ID.
+
+    return deliverMessage(proto, kWorkOrderCompleteMessage);
+  }
+
+  // Delivers a RebuildWorkOrderCompletionMessage for the operator at 'index'.
+  // Returns true iff the query has finished executing afterwards.
+  inline bool placeRebuildWorkOrderCompleteMessage(const QueryPlan::DAGNodeIndex index) {
+    VLOG(3) << "Place RebuildWorkOrderComplete message for Op[" << index << "]";
+    serialization::RebuildWorkOrderCompletionMessage proto;
+    proto.set_operator_index(index);
+    proto.set_worker_thread_index(1);  // dummy worker thread ID.
+    proto.set_query_id(0);  // dummy query ID.
+
+    return deliverMessage(proto, kRebuildWorkOrderCompleteMessage);
+  }
+
+  // Delivers the same kind of message as placeDataPipelineMessage(); only the
+  // log line differs. Returns true iff the query has finished executing
+  // afterwards.
+  inline bool placeOutputBlockMessage(const QueryPlan::DAGNodeIndex index) {
+    VLOG(3) << "Place OutputBlock message for Op[" << index << "]";
+    return deliverMessage(makeDataPipelineProto(index), kDataPipelineMessage);
+  }
+
+  unique_ptr<CatalogDatabase> db_;
+  unique_ptr<StorageManager> storage_manager_;
+
+  // Not owned: points into 'query_handle_'. Set in SetUp().
+  QueryPlan *query_plan_ = nullptr;
+  unique_ptr<QueryHandle> query_handle_;
+  unique_ptr<QueryManagerSingleNode> query_manager_;
+
+  MessageBusImpl bus_;
+
+  client_id worker_client_id_;
+
+  unique_ptr<WorkerDirectory> workers_;
+
+ private:
+  // Builds a DataPipelineMessage for 'operator_index' with dummy payload IDs.
+  static serialization::DataPipelineMessage makeDataPipelineProto(
+      const QueryPlan::DAGNodeIndex operator_index) {
+    serialization::DataPipelineMessage proto;
+    proto.set_operator_index(operator_index);
+
+    proto.set_block_id(0);  // dummy block ID
+    proto.set_relation_id(0);  // dummy relation ID.
+    proto.set_query_id(0);  // dummy query ID.
+    return proto;
+  }
+
+  // Serializes 'proto', wraps the bytes in a TaggedMessage of type
+  // 'message_type', and passes it to the QueryManager. Returns true iff the
+  // query has finished executing afterwards.
+  template <typename ProtoT>
+  bool deliverMessage(const ProtoT &proto,
+                      const tmb::message_type_id message_type) {
+    // The buffer is owned by a vector so that it is released on every path
+    // and there is no unchecked malloc() result.
+    const std::size_t proto_length = proto.ByteSize();
+    std::vector<char> proto_bytes(proto_length);
+    CHECK(proto.SerializeToArray(proto_bytes.data(), proto_length));
+
+    const tmb::TaggedMessage message(static_cast<const void*>(proto_bytes.data()),
+                                     proto_length,
+                                     message_type);
+    query_manager_->processMessage(message);
+
+    return query_manager_->getQueryExecutionState().hasQueryExecutionFinished();
+  }
+};
+
+// A DAG consisting of one operator that never generates a WorkOrder.
+TEST_F(QueryManagerTest, SingleNodeDAGNoWorkOrdersTest) {
+  query_plan_->addRelationalOperator(
+      new MockOperator(false /* produce_workorders */,
+                       false /* has_streaming_input */));
+
+  const auto &plan_dag = query_plan_->getQueryPlanDAG();
+  const MockOperator &mock_op =
+      static_cast<const MockOperator &>(plan_dag.getNodePayload(0));
+
+  constructQueryManager();
+
+  // With no dependencies at all, the blocking dependencies are trivially met.
+  EXPECT_TRUE(mock_op.getBlockingDependenciesMet());
+
+  // Exactly one getAllWorkOrders() call is expected, and no input is fed.
+  EXPECT_EQ(1, mock_op.getNumCalls(MockOperator::kGetAllWorkOrders));
+  EXPECT_EQ(0, mock_op.getNumCalls(MockOperator::kFeedInputBlock));
+  EXPECT_EQ(0, mock_op.getNumCalls(MockOperator::kFeedInputBlocks));
+}
+
+// A DAG consisting of one operator that generates a single WorkOrder up
+// front (a "static" WorkOrder, i.e. one not driven by streamed input).
+TEST_F(QueryManagerTest, SingleNodeDAGStaticWorkOrdersTest) {
+  const QueryPlan::DAGNodeIndex id =
+      query_plan_->addRelationalOperator(new MockOperator(true, false, 1));
+
+  const MockOperator &op = static_cast<const MockOperator &>(
+      query_plan_->getQueryPlanDAG().getNodePayload(id));
+
+  constructQueryManager();
+
+  // op doesn't have any dependencies.
+  EXPECT_TRUE(op.getBlockingDependenciesMet());
+
+  // We expect one call for op's getAllWorkOrders().
+  EXPECT_EQ(1, op.getNumCalls(MockOperator::kGetAllWorkOrders));
+  EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlock));
+  EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlocks));
+
+  // One workorder is generated.
+  EXPECT_EQ(1, op.getNumWorkOrders());
+
+  unique_ptr<WorkerMessage> worker_message;
+  worker_message.reset(query_manager_->getNextWorkerMessage(id, -1));
+  // Fatal assertion: the message is dereferenced right below.
+  ASSERT_TRUE(worker_message != nullptr);
+
+  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
+            worker_message->getType());
+  EXPECT_EQ(id, worker_message->getRelationalOpIndex());
+
+  // No worker executes the WorkOrder in this test, so release it here.
+  delete worker_message->getWorkOrder();
+
+  EXPECT_EQ(1, getNumWorkOrdersInExecution(id));
+  EXPECT_EQ(0, getNumOperatorsFinished());
+
+  // Send a message to QueryManager upon workorder completion.
+  // Last event processed by QueryManager.
+  EXPECT_TRUE(placeWorkOrderCompleteMessage(id));
+
+  EXPECT_EQ(0, getNumWorkOrdersInExecution(id));
+  EXPECT_EQ(1, getNumOperatorsFinished());
+  EXPECT_TRUE(getOperatorFinishedStatus(id));
+}
+
+TEST_F(QueryManagerTest, SingleNodeDAGDynamicWorkOrdersTest) {
+  // This test creates a DAG of a single node. WorkOrders are generated
+  // dynamically as pending work orders complete execution, i.e.,
+  // getAllWorkOrders() is called multiple times. With the arguments used
+  // below (max_getworkorder_iters = 4, max_workorders = 3), each of the first
+  // 3 calls to getAllWorkOrders() inserts 1 WorkOrder and returns false, and
+  // the 4th call inserts no WorkOrder and returns true.
+
+  // TODO(shoban): This test can not be more robust than this because of fixed
+  // scaffolding of mocking. If we use gMock, we can do much better.
+  const QueryPlan::DAGNodeIndex id =
+      query_plan_->addRelationalOperator(new MockOperator(true, false, 4, 3));
+
+  const MockOperator &op = static_cast<const MockOperator &>(
+      query_plan_->getQueryPlanDAG().getNodePayload(id));
+
+  constructQueryManager();
+
+  // op doesn't have any dependencies.
+  EXPECT_TRUE(op.getBlockingDependenciesMet());
+
+  for (int i = 0; i < 3; ++i) {
+    // One more call to op's getAllWorkOrders() per iteration.
+    EXPECT_EQ(i + 1, op.getNumCalls(MockOperator::kGetAllWorkOrders));
+
+    // One more workorder is generated per iteration.
+    EXPECT_EQ(i + 1, op.getNumWorkOrders());
+
+    unique_ptr<WorkerMessage> worker_message;
+    worker_message.reset(query_manager_->getNextWorkerMessage(id, -1));
+
+    // Fatal assertion: the message is dereferenced right below.
+    ASSERT_TRUE(worker_message != nullptr);
+    EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
+              worker_message->getType());
+    EXPECT_EQ(id, worker_message->getRelationalOpIndex());
+
+    // No worker executes the WorkOrder in this test, so release it here.
+    delete worker_message->getWorkOrder();
+
+    EXPECT_EQ(1, getNumWorkOrdersInExecution(id));
+    EXPECT_EQ(0, getNumOperatorsFinished());
+
+    if (i < 2) {
+      // Send a message to QueryManager upon workorder completion.
+      EXPECT_FALSE(placeWorkOrderCompleteMessage(id));
+    } else {
+      // Send a message to QueryManager upon workorder completion.
+      // Last event.
+      EXPECT_TRUE(placeWorkOrderCompleteMessage(id));
+    }
+  }
+
+  EXPECT_EQ(0, getNumWorkOrdersInExecution(id));
+
+  EXPECT_EQ(1, getNumOperatorsFinished());
+  EXPECT_TRUE(getOperatorFinishedStatus(id));
+
+  // We place this check in the end, since it's true throughout the test.
+  EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlock));
+  EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlocks));
+}
+
+TEST_F(QueryManagerTest, TwoNodesDAGBlockingLinkTest) {
+  // We use two nodes in the DAG with a blocking link between them.
+  // There is no streaming of data involved in this test.
+  const QueryPlan::DAGNodeIndex id1 =
+      query_plan_->addRelationalOperator(new MockOperator(true, false));
+  const QueryPlan::DAGNodeIndex id2 =
+      query_plan_->addRelationalOperator(new MockOperator(true, false));
+
+  // Create a blocking link.
+  query_plan_->addDirectDependency(id2, id1, true);
+
+  // Give op1 a (dummy) output relation ID.
+  static_cast<MockOperator *>(
+      query_plan_->getQueryPlanDAGMutable()->getNodePayloadMutable(id1))
+          ->setOutputRelationID(0xdead);
+
+  const MockOperator &op1 = static_cast<const MockOperator &>(
+      query_plan_->getQueryPlanDAG().getNodePayload(id1));
+  const MockOperator &op2 = static_cast<const MockOperator &>(
+      query_plan_->getQueryPlanDAG().getNodePayload(id2));
+
+  constructQueryManager();
+
+  // op1 doesn't have any dependencies
+  EXPECT_TRUE(op1.getBlockingDependenciesMet());
+
+  // Only op1 should receive a call to getAllWorkOrders initially.
+  EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders));
+  EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlock));
+  EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlocks));
+
+  EXPECT_EQ(0, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
+  EXPECT_EQ(0, op2.getNumCalls(MockOperator::kFeedInputBlock));
+  EXPECT_EQ(0, op2.getNumCalls(MockOperator::kFeedInputBlocks));
+
+  // Only op1 should produce a workorder.
+  EXPECT_EQ(1, op1.getNumWorkOrders());
+  EXPECT_EQ(0, op2.getNumWorkOrders());
+
+  // Foreman hasn't yet got workorder completion response for the workorder.
+  unique_ptr<WorkerMessage> worker_message;
+  worker_message.reset(query_manager_->getNextWorkerMessage(id1, -1));
+
+  // Fatal assertion: the message is dereferenced right below.
+  ASSERT_TRUE(worker_message != nullptr);
+  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
+            worker_message->getType());
+  EXPECT_EQ(id1, worker_message->getRelationalOpIndex());
+
+  // No worker executes the WorkOrder in this test, so release it here.
+  delete worker_message->getWorkOrder();
+
+  EXPECT_EQ(1, getNumWorkOrdersInExecution(id1));
+  EXPECT_EQ(0, getNumWorkOrdersInExecution(id2));
+  EXPECT_EQ(0, getNumOperatorsFinished());
+
+  // Send a message to Foreman upon workorder (generated by op1) completion.
+  EXPECT_FALSE(placeWorkOrderCompleteMessage(id1));
+
+  EXPECT_EQ(0, getNumWorkOrdersInExecution(id1));
+  // op1 is over now, op2 still to go.
+  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks));
+  EXPECT_EQ(1, getNumOperatorsFinished());
+
+  EXPECT_TRUE(getOperatorFinishedStatus(id1));
+  EXPECT_FALSE(getOperatorFinishedStatus(id2));
+
+  worker_message.reset(query_manager_->getNextWorkerMessage(id2, -1));
+  // Fatal assertion: the message is dereferenced right below.
+  ASSERT_TRUE(worker_message != nullptr);
+  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
+            worker_message->getType());
+  EXPECT_EQ(id2, worker_message->getRelationalOpIndex());
+
+  delete worker_message->getWorkOrder();
+
+  EXPECT_EQ(1, getNumWorkOrdersInExecution(id2));
+
+  // op1 is op2's blocking dependency.
+  EXPECT_TRUE(op2.getBlockingDependenciesMet());
+
+  EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders));
+  // op2 should get first call of getAllWorkOrders() when op1 is over.
+  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
+
+  EXPECT_EQ(1, op2.getNumWorkOrders());
+
+  // Send a message to QueryManager upon workorder (generated by op2)
+  // completion. op2's workorder has already been fetched above, so this is
+  // the last event of the query.
+  EXPECT_TRUE(placeWorkOrderCompleteMessage(id2));
+
+  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
+
+  EXPECT_EQ(0, getNumWorkOrdersInExecution(id1));
+  EXPECT_EQ(0, getNumWorkOrdersInExecution(id2));
+
+  EXPECT_EQ(2, getNumOperatorsFinished());
+  EXPECT_TRUE(getOperatorFinishedStatus(id1));
+  EXPECT_TRUE(getOperatorFinishedStatus(id2));
+
+  // Expect no additional calls to getAllWorkOrders.
+  EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders));
+  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
+}
+
+TEST_F(QueryManagerTest, TwoNodesDAGPipeLinkTest) {
+  // We use two nodes in the DAG with a non-blocking link between them.
+  // We stream output of op1 to op2. Sequence of events is as follows:
+  // 1. op1 creates a workorder.
+  // 2. We send a "block full" (from op1) to QueryManager.
+  // 3. op2 creates a workorder because of step 2.
+  const QueryPlan::DAGNodeIndex id1 =
+      query_plan_->addRelationalOperator(new MockOperator(true, false, 1));
+  const QueryPlan::DAGNodeIndex id2 =
+      query_plan_->addRelationalOperator(new MockOperator(true, true, 3));
+
+  // Create a non-blocking link.
+  query_plan_->addDirectDependency(id2, id1, false);
+
+  // Give op1 a (dummy) output relation ID.
+  static_cast<MockOperator *>(
+      query_plan_->getQueryPlanDAGMutable()->getNodePayloadMutable(id1))
+      ->setOutputRelationID(0xdead);
+
+  const MockOperator &op1 = static_cast<const MockOperator &>(
+      query_plan_->getQueryPlanDAG().getNodePayload(id1));
+  const MockOperator &op2 = static_cast<const MockOperator &>(
+      query_plan_->getQueryPlanDAG().getNodePayload(id2));
+
+  constructQueryManager();
+
+  // As none of the operators have a blocking link, blocking dependencies should
+  // be met.
+  EXPECT_TRUE(op1.getBlockingDependenciesMet());
+  EXPECT_TRUE(op2.getBlockingDependenciesMet());
+
+  EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders));
+  EXPECT_EQ(1, op1.getNumWorkOrders());
+  EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlock));
+  EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlocks));
+
+  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
+  // op2 will generate workorder only after receiving a streaming input.
+  EXPECT_EQ(0, op2.getNumWorkOrders());
+  EXPECT_EQ(0, op2.getNumCalls(MockOperator::kFeedInputBlock));
+  EXPECT_EQ(0, op2.getNumCalls(MockOperator::kFeedInputBlocks));
+
+  unique_ptr<WorkerMessage> worker_message;
+  worker_message.reset(query_manager_->getNextWorkerMessage(id1, -1));
+
+  // Fatal assertion: the message is dereferenced right below.
+  ASSERT_TRUE(worker_message != nullptr);
+  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
+            worker_message->getType());
+  EXPECT_EQ(id1, worker_message->getRelationalOpIndex());
+
+  // No worker executes the WorkOrder in this test, so release it here.
+  delete worker_message->getWorkOrder();
+
+  // Send a message to QueryManager upon block getting full (output of op1).
+  EXPECT_FALSE(placeOutputBlockMessage(id1));
+
+  // op1 is not finished yet because the response of workorder completion hasn't
+  // been received yet by the QueryManager.
+  EXPECT_FALSE(getOperatorFinishedStatus(id1));
+  EXPECT_FALSE(getOperatorFinishedStatus(id2));
+
+  // No additional call to op1's getAllWorkOrders.
+  EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders));
+  EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlock));
+  EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlocks));
+
+  // Output from op1 should be fed to op2.
+  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kFeedInputBlock));
+  EXPECT_EQ(0, op2.getNumCalls(MockOperator::kFeedInputBlocks));
+
+  // A call to op2's getAllWorkOrders because of the streamed input.
+  EXPECT_EQ(2, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
+  EXPECT_EQ(1, op2.getNumWorkOrders());
+
+  // Place a message of a workorder completion of op1 on Foreman's input queue.
+  EXPECT_FALSE(placeWorkOrderCompleteMessage(id1));
+
+  EXPECT_EQ(0, getNumWorkOrdersInExecution(id1));
+  EXPECT_TRUE(getOperatorFinishedStatus(id1));
+  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks));
+
+  // An additional call to op2's getAllWorkOrders because of completion of op1.
+  EXPECT_EQ(3, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
+  EXPECT_EQ(2, op2.getNumWorkOrders());
+
+  worker_message.reset(query_manager_->getNextWorkerMessage(id2, -1));
+
+  // Fatal assertion: the message is dereferenced right below.
+  ASSERT_TRUE(worker_message != nullptr);
+  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
+            worker_message->getType());
+  EXPECT_EQ(id2, worker_message->getRelationalOpIndex());
+
+  delete worker_message->getWorkOrder();
+
+  // Place a message of a workorder completion of op2 on Foreman's input queue.
+  EXPECT_FALSE(placeWorkOrderCompleteMessage(id2));
+
+  EXPECT_TRUE(getOperatorFinishedStatus(id1));
+
+  worker_message.reset(query_manager_->getNextWorkerMessage(id2, -1));
+
+  // Fatal assertion: the message is dereferenced right below.
+  ASSERT_TRUE(worker_message != nullptr);
+  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
+            worker_message->getType());
+  EXPECT_EQ(id2, worker_message->getRelationalOpIndex());
+
+  delete worker_message->getWorkOrder();
+
+  EXPECT_EQ(1, getNumWorkOrdersInExecution(id2));
+  EXPECT_FALSE(getOperatorFinishedStatus(id2));
+
+  // Send a message to Foreman upon workorder (generated by op2) completion.
+  EXPECT_TRUE(placeWorkOrderCompleteMessage(id2));
+
+  EXPECT_TRUE(getOperatorFinishedStatus(id1));
+  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks));
+
+  EXPECT_EQ(0, getNumWorkOrdersInExecution(id2));
+  EXPECT_TRUE(getOperatorFinishedStatus(id2));
+}
+
+TEST_F(QueryManagerTest, TwoNodesDAGPartiallyFilledBlocksTest) {
+  // In this test, we create a 2-node DAG with a non-blocking link between them.
+  // There is no streaming of data from op1 to op2 during the execution of op1.
+  // op1 produces a partially filled block at the end of its execution which is
+  // rebuilt and then fed to op2.
+  const QueryPlan::DAGNodeIndex id1 =
+      query_plan_->addRelationalOperator(new MockOperator(true, false, 1));
+  const QueryPlan::DAGNodeIndex id2 =
+      query_plan_->addRelationalOperator(new MockOperator(true, true, 3, 1));
+
+  // Create a non-blocking link.
+  query_plan_->addDirectDependency(id2, id1, false);
+
+  // Create a relation, owned by db_.
+  CatalogRelation *relation =
+      new CatalogRelation(nullptr /* catalog_database */, "test_relation");
+  const relation_id output_relation_id = db_->addRelation(relation);
+
+  // Setup the InsertDestination proto in the query context proto.
+  serialization::QueryContext *query_context_proto =
+      query_handle_->getQueryContextProtoMutable();
+
+  // The new destination's ID is its position in the repeated field, i.e. the
+  // field's size before the element is added.
+  const QueryContext::insert_destination_id insert_destination_index =
+      query_context_proto->insert_destinations_size();
+  serialization::InsertDestination *insert_destination_proto =
+      query_context_proto->add_insert_destinations();
+
+  insert_destination_proto->set_insert_destination_type(
+      serialization::InsertDestinationType::BLOCK_POOL);
+  insert_destination_proto->set_relation_id(output_relation_id);
+  insert_destination_proto->set_relational_op_index(id1);
+
+  MockOperator *op1_mutable = static_cast<MockOperator *>(
+      query_plan_->getQueryPlanDAGMutable()->getNodePayloadMutable(id1));
+  op1_mutable->setInsertDestinationID(insert_destination_index);
+  op1_mutable->setOutputRelationID(output_relation_id);
+
+  const MockOperator &op1 = static_cast<const MockOperator &>(
+      query_plan_->getQueryPlanDAG().getNodePayload(id1));
+  const MockOperator &op2 = static_cast<const MockOperator &>(
+      query_plan_->getQueryPlanDAG().getNodePayload(id2));
+
+  constructQueryManager();
+
+  // NOTE(zuyu): An operator generally has no ideas about partially filled
+  // blocks, but InsertDestination in QueryContext does.
+  // Mock to add partially filled blocks in the InsertDestination.
+  InsertDestination *insert_destination =
+      query_manager_->getQueryContextMutable()->getInsertDestination(
+          insert_destination_index);
+  // Fatal assertion (active in every build type): the pointer is dereferenced
+  // right below.
+  ASSERT_TRUE(insert_destination != nullptr);
+  MutableBlockReference block_ref;
+  static_cast<BlockPoolInsertDestination *>(insert_destination)
+      ->available_block_refs_.push_back(move(block_ref));
+
+  // There's no blocking dependency in the DAG.
+  EXPECT_TRUE(op1.getBlockingDependenciesMet());
+  EXPECT_TRUE(op2.getBlockingDependenciesMet());
+
+  EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders));
+  EXPECT_EQ(1, op1.getNumWorkOrders());
+
+  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
+  EXPECT_EQ(0, op2.getNumWorkOrders());
+
+  unique_ptr<WorkerMessage> worker_message;
+  worker_message.reset(query_manager_->getNextWorkerMessage(id1, -1));
+
+  // Fatal assertion: the message is dereferenced right below.
+  ASSERT_TRUE(worker_message != nullptr);
+  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
+            worker_message->getType());
+  EXPECT_EQ(id1, worker_message->getRelationalOpIndex());
+
+  // No worker executes the WorkOrder in this test, so release it here.
+  delete worker_message->getWorkOrder();
+
+  // Send a message to QueryManager upon workorder (generated by op1) completion.
+  EXPECT_FALSE(placeWorkOrderCompleteMessage(id1));
+
+  EXPECT_EQ(0, getNumWorkOrdersInExecution(id1));
+
+  worker_message.reset(query_manager_->getNextWorkerMessage(id1, -1));
+  // Fatal assertion: the message is dereferenced right below.
+  ASSERT_TRUE(worker_message != nullptr);
+  EXPECT_EQ(WorkerMessage::WorkerMessageType::kRebuildWorkOrder,
+            worker_message->getType());
+
+  EXPECT_EQ(id1, worker_message->getRelationalOpIndex());
+
+  delete worker_message->getWorkOrder();
+
+  // op1 generates a rebuild workorder. The block is rebuilt and streamed
+  // to Foreman.
+  EXPECT_FALSE(placeDataPipelineMessage(id1));
+
+  EXPECT_FALSE(placeRebuildWorkOrderCompleteMessage(id1));
+  // Based on the streamed input, op2's getAllWorkOrders should produce a
+  // workorder.
+  EXPECT_EQ(3, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
+  EXPECT_EQ(1, op2.getNumWorkOrders());
+
+  worker_message.reset(query_manager_->getNextWorkerMessage(id2, -1));
+
+  // Fatal assertion: the message is dereferenced right below.
+  ASSERT_TRUE(worker_message != nullptr);
+  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
+            worker_message->getType());
+
+  EXPECT_EQ(id2, worker_message->getRelationalOpIndex());
+
+  delete worker_message->getWorkOrder();
+
+  EXPECT_TRUE(getOperatorFinishedStatus(id1));
+  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks));
+  EXPECT_FALSE(getOperatorFinishedStatus(id2));
+  EXPECT_EQ(1, getNumWorkOrdersInExecution(id2));
+
+  // Send a message to QueryManager upon workorder (generated by op2) completion.
+  EXPECT_TRUE(placeWorkOrderCompleteMessage(id2));
+
+  EXPECT_EQ(0, getNumWorkOrdersInExecution(id2));
+
+  EXPECT_TRUE(getOperatorFinishedStatus(id2));
+}
+
+TEST_F(QueryManagerTest, MultipleNodesNoOutputTest) {
+  // When an operator produces workorders but no output, the QueryManager should
+  // check the dependents of this operator to make progress.
+  const QueryPlan::DAGNodeIndex kNumNodes = 5;
+  std::vector<QueryPlan::DAGNodeIndex> ids;
+  ids.reserve(kNumNodes);
+
+  // reserve() only allocates capacity and leaves the vector empty, so the
+  // elements must be appended rather than assigned through operator[].
+  for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes; ++i) {
+    // Only the first operator has no streaming input.
+    const bool has_streaming_input = (i != 0);
+    ids.push_back(query_plan_->addRelationalOperator(
+        new MockOperator(true, has_streaming_input)));
+    VLOG(3) << ids.back();
+  }
+
+  /**
+   * The DAG looks like this:
+   *
+   * op1 -> op2 -> op3 -> op4 -> op5
+   *
+   **/
+  for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes - 1; ++i) {
+    query_plan_->addDirectDependency(ids[i + 1], ids[i], false);
+    static_cast<MockOperator*>(query_plan_->getQueryPlanDAGMutable()->getNodePayloadMutable(ids[i]))
+        ->setOutputRelationID(0xdead);
+  }
+
+  // operators[i] is the operator whose DAG node index is ids[i].
+  std::vector<const MockOperator*> operators;
+  operators.reserve(kNumNodes);
+  for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes; ++i) {
+    operators.push_back(static_cast<const MockOperator*>(&query_plan_->getQueryPlanDAG().getNodePayload(ids[i])));
+  }
+
+  constructQueryManager();
+
+  // operators[0] should have produced a workorder by now.
+  EXPECT_EQ(1, operators[0]->getNumWorkOrders());
+
+  unique_ptr<WorkerMessage> worker_message;
+  worker_message.reset(query_manager_->getNextWorkerMessage(ids[0], -1));
+
+  // Fatal assertion: the message is dereferenced right below.
+  ASSERT_TRUE(worker_message != nullptr);
+  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
+            worker_message->getType());
+
+  EXPECT_EQ(ids[0], worker_message->getRelationalOpIndex());
+
+  // No worker executes the WorkOrder in this test, so release it here.
+  delete worker_message->getWorkOrder();
+
+  EXPECT_EQ(1, getNumWorkOrdersInExecution(ids[0]));
+  EXPECT_FALSE(getOperatorFinishedStatus(ids[0]));
+
+  for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes; ++i) {
+    EXPECT_EQ(1, operators[i]->getNumCalls(MockOperator::kGetAllWorkOrders));
+  }
+
+  // Send a message to QueryManager upon workorder (generated by operators[0])
+  // completion.
+  EXPECT_TRUE(placeWorkOrderCompleteMessage(ids[0]));
+
+  for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes; ++i) {
+    EXPECT_EQ(0, getNumWorkOrdersInExecution(ids[i]));
+    EXPECT_TRUE(getOperatorFinishedStatus(ids[i]));
+    if (i < kNumNodes - 1) {
+      EXPECT_EQ(1, operators[i + 1]->getNumCalls(MockOperator::kDoneFeedingInputBlocks));
+    }
+  }
+}
+
+TEST_F(QueryManagerTest, OutOfOrderWorkOrderCompletionTest) {
+  // Consider two operators, both generate one workorder each. The dependent's
+  // workorder finishes before dependency's workorder.
+  const QueryPlan::DAGNodeIndex id1 = query_plan_->addRelationalOperator(new MockOperator(true, false, 1));
+  const QueryPlan::DAGNodeIndex id2 = query_plan_->addRelationalOperator(new MockOperator(true, true, 2, 1));
+
+  // Create a non-blocking link.
+  query_plan_->addDirectDependency(id2, id1, false);
+
+  constructQueryManager();
+
+  unique_ptr<WorkerMessage> worker_message;
+  worker_message.reset(query_manager_->getNextWorkerMessage(id1, -1));
+
+  // Fatal assertion: the message is dereferenced right below.
+  ASSERT_TRUE(worker_message != nullptr);
+  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
+            worker_message->getType());
+
+  EXPECT_EQ(id1, worker_message->getRelationalOpIndex());
+
+  // No worker executes the WorkOrder in this test, so release it here.
+  delete worker_message->getWorkOrder();
+
+  // Send a message to QueryManager upon a block (output of op1) getting full.
+  EXPECT_FALSE(placeOutputBlockMessage(id1));
+
+  // op1 is not finished yet because the response of workorder completion hasn't
+  // been received yet.
+  EXPECT_FALSE(getOperatorFinishedStatus(id1));
+  EXPECT_FALSE(getOperatorFinishedStatus(id2));
+
+  worker_message.reset(query_manager_->getNextWorkerMessage(id2, -1));
+  // Fatal assertion: the message is dereferenced right below.
+  ASSERT_TRUE(worker_message != nullptr);
+  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
+            worker_message->getType());
+
+  EXPECT_EQ(id2, worker_message->getRelationalOpIndex());
+
+  delete worker_message->getWorkOrder();
+
+  // As mentioned earlier, op2 finishes before op1.
+  EXPECT_FALSE(placeWorkOrderCompleteMessage(id2));
+
+  // op1's workorder execution is over.
+  EXPECT_TRUE(placeWorkOrderCompleteMessage(id1));
+
+  EXPECT_TRUE(getOperatorFinishedStatus(id1));
+  EXPECT_TRUE(getOperatorFinishedStatus(id2));
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b4e25edc/query_execution/tests/QueryManager_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/QueryManager_unittest.cpp b/query_execution/tests/QueryManager_unittest.cpp
deleted file mode 100644
index 37e2cdd..0000000
--- a/query_execution/tests/QueryManager_unittest.cpp
+++ /dev/null
@@ -1,940 +0,0 @@
-/**
- * Copyright 2011-2015 Quickstep Technologies LLC.
- * Copyright 2015-2016 Pivotal Software, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-#include <climits>
-#include <memory>
-#include <utility>
-#include <vector>
-
-#include "catalog/CatalogDatabase.hpp"
-#include "catalog/CatalogRelation.hpp"
-#include "catalog/CatalogTypedefs.hpp"
-#include "query_execution/QueryContext.hpp"
-#include "query_execution/QueryContext.pb.h"
-#include "query_execution/QueryExecutionMessages.pb.h"
-#include "query_execution/QueryExecutionState.hpp"
-#include "query_execution/QueryExecutionTypedefs.hpp"
-#include "query_execution/QueryManager.hpp"
-#include "query_execution/WorkOrdersContainer.hpp"
-#include "query_execution/WorkerDirectory.hpp"
-#include "query_execution/WorkerMessage.hpp"
-#include "query_optimizer/QueryHandle.hpp"
-#include "query_optimizer/QueryPlan.hpp"
-#include "relational_operators/RelationalOperator.hpp"
-#include "relational_operators/WorkOrder.hpp"
-#include "storage/InsertDestination.hpp"
-#include "storage/InsertDestination.pb.h"
-#include "storage/StorageBlock.hpp"
-#include "storage/StorageBlockInfo.hpp"
-#include "storage/StorageManager.hpp"
-#include "utility/DAG.hpp"
-#include "utility/Macros.hpp"
-
-#include "glog/logging.h"
-#include "gtest/gtest.h"
-
-#include "tmb/id_typedefs.h"
-#include "tmb/message_bus.h"
-#include "tmb/tagged_message.h"
-
-using std::move;
-using std::unique_ptr;
-using std::vector;
-
-using tmb::client_id;
-
-namespace quickstep {
-
-class WorkOrderProtosContainer;
-
-class MockWorkOrder : public WorkOrder {
- public:
- explicit MockWorkOrder(const int op_index)
- : WorkOrder(0), op_index_(op_index) {}
-
- void execute() override {
- VLOG(3) << "WorkOrder[" << op_index_ << "] executing.";
- }
-
- inline QueryPlan::DAGNodeIndex getOpIndex() const {
- return op_index_;
- }
-
- private:
- const QueryPlan::DAGNodeIndex op_index_;
-
- DISALLOW_COPY_AND_ASSIGN(MockWorkOrder);
-};
-
-class MockOperator: public RelationalOperator {
- public:
- enum function_name {
- kFeedInputBlock = 0,
- kFeedInputBlocks,
- kDoneFeedingInputBlocks,
- kGetAllWorkOrders
- };
-
- MockOperator(const bool produce_workorders,
- const bool has_streaming_input,
- const int max_getworkorder_iters = 1,
- const int max_workorders = INT_MAX)
- : RelationalOperator(0 /* Query Id */),
- produce_workorders_(produce_workorders),
- has_streaming_input_(has_streaming_input),
- max_workorders_(max_workorders),
- max_getworkorder_iters_(max_getworkorder_iters),
- num_calls_get_workorders_(0),
- num_workorders_generated_(0),
- num_calls_feedblock_(0),
- num_calls_feedblocks_(0),
- num_calls_donefeedingblocks_(0) {
- }
-
-#define MOCK_OP_LOG(x) VLOG(x) << "Op[" << op_index_ << "]: " << __func__ << ": "
-
- // The methods below are used to check whether QueryManager calls the Relational
- // operator, how many times it calls a particular method etc.
- inline int getNumWorkOrders() const {
- return num_workorders_generated_;
- }
-
- inline int getNumCalls(const function_name fname) const {
- switch (fname) {
- case kFeedInputBlock:
- return num_calls_feedblock_;
- case kFeedInputBlocks:
- return num_calls_feedblocks_;
- case kDoneFeedingInputBlocks:
- return num_calls_donefeedingblocks_;
- case kGetAllWorkOrders:
- return num_calls_get_workorders_;
- default:
- return -1;
- }
- }
-
- inline bool getBlockingDependenciesMet() const {
- MOCK_OP_LOG(3) << "met.";
- return blocking_dependencies_met_;
- }
-
- void setInsertDestinationID(const QueryContext::insert_destination_id insert_destination_index) {
- insert_destination_index_ = insert_destination_index;
- }
-
- // Mock to trigger doneFeedingInputBlocks for the dependent operators
- // in QueryManager::markOperatorFinished.
- void setOutputRelationID(const relation_id rel_id) {
- output_relation_id_ = rel_id;
- }
-
- // Override methods from the base class.
- bool getAllWorkOrders(
- WorkOrdersContainer *container,
- QueryContext *query_context,
- StorageManager *storage_manager,
- const tmb::client_id foreman_client_id,
- tmb::MessageBus *bus) override {
- ++num_calls_get_workorders_;
- if (produce_workorders_) {
- if (has_streaming_input_) {
- if ((num_calls_feedblock_ > 0 || num_calls_feedblocks_ > 0) && (num_workorders_generated_ < max_workorders_)) {
- MOCK_OP_LOG(3) << "[stream] generate WorkOrder";
- container->addNormalWorkOrder(new MockWorkOrder(op_index_), op_index_);
- ++num_workorders_generated_;
- }
- } else {
- if (blocking_dependencies_met_ && (num_workorders_generated_ < max_workorders_)) {
- MOCK_OP_LOG(3) << "[static] generate WorkOrder";
- container->addNormalWorkOrder(new MockWorkOrder(op_index_), op_index_);
- ++num_workorders_generated_;
- }
- }
- }
- MOCK_OP_LOG(3) << "count(" << num_calls_get_workorders_ << ") "
- << "return(" << (num_calls_get_workorders_ == max_getworkorder_iters_) << ")";
- return num_calls_get_workorders_ == max_getworkorder_iters_;
- }
-
- bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override {
- return true;
- }
-
- void feedInputBlock(const block_id input_block_id,
- const relation_id input_relation_id) override {
- ++num_calls_feedblock_;
- MOCK_OP_LOG(3) << "count(" << num_calls_feedblock_ << ")";
- }
-
- void feedInputBlocks(const relation_id rel_id,
- std::vector<block_id> *partially_filled_blocks) override {
- ++num_calls_feedblocks_;
- MOCK_OP_LOG(3) << "count(" << num_calls_feedblocks_ << ")";
- }
-
- void doneFeedingInputBlocks(const relation_id rel_id) override {
- ++num_calls_donefeedingblocks_;
- MOCK_OP_LOG(3) << "count(" << num_calls_donefeedingblocks_ << ")";
- }
-
- QueryContext::insert_destination_id getInsertDestinationID() const override {
- return insert_destination_index_;
- }
-
- const relation_id getOutputRelationID() const override {
- return output_relation_id_;
- }
-
- private:
- const bool produce_workorders_;
- const bool has_streaming_input_;
- const int max_workorders_;
- const int max_getworkorder_iters_;
-
- int num_calls_get_workorders_;
- int num_workorders_generated_;
- int num_calls_feedblock_;
- int num_calls_feedblocks_;
- int num_calls_donefeedingblocks_;
-
- QueryContext::insert_destination_id insert_destination_index_ = QueryContext::kInvalidInsertDestinationId;
-
- relation_id output_relation_id_ = -1;
-
-#undef MOCK_OP_LOG
-
- DISALLOW_COPY_AND_ASSIGN(MockOperator);
-};
-
-
-class QueryManagerTest : public ::testing::Test {
- protected:
- virtual void SetUp() {
- db_.reset(new CatalogDatabase(nullptr /* catalog */, "database"));
- storage_manager_.reset(new StorageManager("./"));
- bus_.Initialize();
- query_handle_.reset(new QueryHandle(0)); // dummy query ID.
- query_plan_ = query_handle_->getQueryPlanMutable();
- query_handle_->getQueryContextProtoMutable()->set_query_id(query_handle_->query_id());
- }
-
- inline void constructQueryManager() {
- query_manager_.reset(new QueryManager(
- 0, 1, query_handle_.get(), db_.get(), storage_manager_.get(), &bus_));
- }
-
- inline const int getNumWorkOrdersInExecution(const QueryPlan::DAGNodeIndex index) const {
- return query_manager_->getQueryExecutionState().getNumQueuedWorkOrders(index);
- }
-
- inline const int getNumOperatorsFinished() const {
- return query_manager_->getQueryExecutionState().getNumOperatorsFinished();
- }
-
- inline bool getOperatorFinishedStatus(const QueryPlan::DAGNodeIndex index) const {
- return query_manager_->getQueryExecutionState().hasExecutionFinished(index);
- }
-
- inline bool placeDataPipelineMessage(const QueryPlan::DAGNodeIndex source_operator_index) {
- VLOG(3) << "Place DataPipeline message for Op[" << source_operator_index << "]";
- serialization::DataPipelineMessage proto;
- proto.set_operator_index(source_operator_index);
-
- proto.set_block_id(0); // dummy block ID
- proto.set_relation_id(0); // dummy relation ID.
- proto.set_query_id(0); // dummy query ID.
-
- // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
- const std::size_t proto_length = proto.ByteSize();
- char *proto_bytes = static_cast<char*>(std::malloc(proto_length));
- CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
- tmb::TaggedMessage tagged_message(static_cast<const void *>(proto_bytes),
- proto_length,
- kDataPipelineMessage);
- std::free(proto_bytes);
- query_manager_->processMessage(tagged_message);
- return query_manager_->getQueryExecutionState().hasQueryExecutionFinished();
- }
-
- inline bool placeWorkOrderCompleteMessage(const QueryPlan::DAGNodeIndex index) {
- VLOG(3) << "Place WorkOrderComplete message for Op[" << index << "]";
- TaggedMessage tagged_message;
- serialization::NormalWorkOrderCompletionMessage proto;
- proto.set_operator_index(index);
- proto.set_worker_thread_index(1); // dummy worker ID.
- proto.set_query_id(0); // dummy query ID.
-
- // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
- const size_t proto_length = proto.ByteSize();
- char *proto_bytes = static_cast<char*>(std::malloc(proto_length));
- CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
- TaggedMessage message(static_cast<const void*>(proto_bytes),
- proto_length,
- kWorkOrderCompleteMessage);
- std::free(proto_bytes);
- query_manager_->processMessage(message);
-
- return query_manager_->getQueryExecutionState().hasQueryExecutionFinished();
- }
-
- inline bool placeRebuildWorkOrderCompleteMessage(const QueryPlan::DAGNodeIndex index) {
- VLOG(3) << "Place RebuildWorkOrderComplete message for Op[" << index << "]";
- serialization::RebuildWorkOrderCompletionMessage proto;
- proto.set_operator_index(index);
- proto.set_worker_thread_index(1); // dummy worker thread ID.
- proto.set_query_id(0); // dummy query ID.
-
- // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
- const size_t proto_length = proto.ByteSize();
- char *proto_bytes = static_cast<char*>(std::malloc(proto_length));
- CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
- TaggedMessage message(static_cast<const void*>(proto_bytes),
- proto_length,
- kRebuildWorkOrderCompleteMessage);
-
- std::free(proto_bytes);
- query_manager_->processMessage(message);
-
- return query_manager_->getQueryExecutionState().hasQueryExecutionFinished();
- }
-
- inline bool placeOutputBlockMessage(const QueryPlan::DAGNodeIndex index) {
- VLOG(3) << "Place OutputBlock message for Op[" << index << "]";
- serialization::DataPipelineMessage proto;
- proto.set_operator_index(index);
-
- proto.set_block_id(0); // dummy block ID
- proto.set_relation_id(0); // dummy relation ID.
- proto.set_query_id(0); // dummy query ID.
-
- // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
- const std::size_t proto_length = proto.ByteSize();
- char *proto_bytes = static_cast<char*>(std::malloc(proto_length));
- CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
- tmb::TaggedMessage tagged_message(static_cast<const void *>(proto_bytes),
- proto_length,
- kDataPipelineMessage);
- std::free(proto_bytes);
- query_manager_->processMessage(tagged_message);
- return query_manager_->getQueryExecutionState().hasQueryExecutionFinished();
- }
-
- unique_ptr<CatalogDatabase> db_;
- unique_ptr<StorageManager> storage_manager_;
-
- QueryPlan *query_plan_;
- unique_ptr<QueryHandle> query_handle_;
- unique_ptr<QueryManager> query_manager_;
-
- MessageBusImpl bus_;
-
- client_id worker_client_id_;
-
- unique_ptr<WorkerDirectory> workers_;
-};
-
-TEST_F(QueryManagerTest, SingleNodeDAGNoWorkOrdersTest) {
- // This test creates a DAG of a single node. No workorders are generated.
- query_plan_->addRelationalOperator(new MockOperator(false, false));
-
- const MockOperator &op = static_cast<const MockOperator &>(
- query_plan_->getQueryPlanDAG().getNodePayload(0));
-
- constructQueryManager();
-
- // op doesn't have any dependencies.
- EXPECT_TRUE(op.getBlockingDependenciesMet());
-
- // We expect one call for op's getAllWorkOrders().
- EXPECT_EQ(1, op.getNumCalls(MockOperator::kGetAllWorkOrders));
- EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlock));
- EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlocks));
-}
-
-TEST_F(QueryManagerTest, SingleNodeDAGStaticWorkOrdersTest) {
- // This test creates a DAG of a single node. Static workorders are generated.
- const QueryPlan::DAGNodeIndex id =
- query_plan_->addRelationalOperator(new MockOperator(true, false, 1));
-
- const MockOperator &op = static_cast<const MockOperator &>(
- query_plan_->getQueryPlanDAG().getNodePayload(id));
-
- constructQueryManager();
-
- // op doesn't have any dependencies.
- EXPECT_TRUE(op.getBlockingDependenciesMet());
-
- // We expect one call for op's getAllWorkOrders().
- EXPECT_EQ(1, op.getNumCalls(MockOperator::kGetAllWorkOrders));
- EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlock));
- EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlocks));
-
- // One workorder is generated.
- EXPECT_EQ(1, op.getNumWorkOrders());
-
- unique_ptr<WorkerMessage> worker_message;
- worker_message.reset(query_manager_->getNextWorkerMessage(0, -1));
- EXPECT_TRUE(worker_message != nullptr);
-
- EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
- worker_message->getType());
- EXPECT_EQ(0u, worker_message->getRelationalOpIndex());
-
- delete worker_message->getWorkOrder();
-
- EXPECT_EQ(1, getNumWorkOrdersInExecution(id));
- EXPECT_EQ(0, getNumOperatorsFinished());
-
- // Send a message to QueryManager upon workorder completion.
- // Last event processed by QueryManager.
- EXPECT_TRUE(placeWorkOrderCompleteMessage(id));
-
- EXPECT_EQ(0, getNumWorkOrdersInExecution(id));
- EXPECT_EQ(1, getNumOperatorsFinished());
- EXPECT_TRUE(getOperatorFinishedStatus(id));
-}
-
-TEST_F(QueryManagerTest, SingleNodeDAGDynamicWorkOrdersTest) {
- // This test creates a DAG of a single node. WorkOrders are generated
- // dynamically as pending work orders complete execution, i.e.,
- // getAllWorkOrders() is called multiple times. getAllWorkOrders() will be
- // called 4 times and 3 work orders will be returned, i.e., 1st 3 calls to
- // getAllWorkOrders() insert 1 WorkOrder and return false, and the next will
- // insert no WorkOrder and return true.
-
- // TODO(shoban): This test can not be more robust than this because of fixed
- // scaffolding of mocking. If we use gMock, we can do much better.
- const QueryPlan::DAGNodeIndex id =
- query_plan_->addRelationalOperator(new MockOperator(true, false, 4, 3));
-
- const MockOperator &op = static_cast<const MockOperator &>(
- query_plan_->getQueryPlanDAG().getNodePayload(id));
-
- constructQueryManager();
-
- // op doesn't have any dependencies.
- EXPECT_TRUE(op.getBlockingDependenciesMet());
-
- for (int i = 0; i < 3; ++i) {
- // We expect one call for op's getAllWorkOrders().
- EXPECT_EQ(i + 1, op.getNumCalls(MockOperator::kGetAllWorkOrders));
-
- // One workorder is generated.
- // EXPECT_EQ(1, getWorkerInputQueueSize());
- EXPECT_EQ(i + 1, op.getNumWorkOrders());
-
- unique_ptr<WorkerMessage> worker_message;
- worker_message.reset(query_manager_->getNextWorkerMessage(id, -1));
-
- EXPECT_TRUE(worker_message != nullptr);
- EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
- worker_message->getType());
- EXPECT_EQ(id, worker_message->getRelationalOpIndex());
-
- delete worker_message->getWorkOrder();
-
- EXPECT_EQ(1, getNumWorkOrdersInExecution(id));
- EXPECT_EQ(0, getNumOperatorsFinished());
-
- if (i < 2) {
- // Send a message to QueryManager upon workorder completion.
- EXPECT_FALSE(placeWorkOrderCompleteMessage(id));
- } else {
- // Send a message to QueryManager upon workorder completion.
- // Last event.
- EXPECT_TRUE(placeWorkOrderCompleteMessage(id));
- }
- }
-
- EXPECT_EQ(0, getNumWorkOrdersInExecution(id));
-
- EXPECT_EQ(1, getNumOperatorsFinished());
- EXPECT_TRUE(getOperatorFinishedStatus(id));
-
- // We place this check in the end, since it's true throughout the test.
- EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlock));
- EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlocks));
-}
-
-TEST_F(QueryManagerTest, TwoNodesDAGBlockingLinkTest) {
- // We use two nodes in the DAG with a blocking link between them.
- // There is no streaming of data involved in this test.
- const QueryPlan::DAGNodeIndex id1 =
- query_plan_->addRelationalOperator(new MockOperator(true, false));
- const QueryPlan::DAGNodeIndex id2 =
- query_plan_->addRelationalOperator(new MockOperator(true, false));
-
- // Create a blocking link.
- query_plan_->addDirectDependency(id2, id1, true);
-
- static_cast<MockOperator *>(
- query_plan_->getQueryPlanDAGMutable()->getNodePayloadMutable(id1))
- ->setOutputRelationID(0xdead);
-
- const MockOperator &op1 = static_cast<const MockOperator &>(
- query_plan_->getQueryPlanDAG().getNodePayload(id1));
- const MockOperator &op2 = static_cast<const MockOperator &>(
- query_plan_->getQueryPlanDAG().getNodePayload(id2));
-
- constructQueryManager();
-
- // op1 doesn't have any dependencies
- EXPECT_TRUE(op1.getBlockingDependenciesMet());
-
- // Only op1 should receive a call to getAllWorkOrders initially.
- EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders));
- EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlock));
- EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlocks));
-
- EXPECT_EQ(0, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
- EXPECT_EQ(0, op2.getNumCalls(MockOperator::kFeedInputBlock));
- EXPECT_EQ(0, op2.getNumCalls(MockOperator::kFeedInputBlocks));
-
- // Only op1 should produce a workorder.
- EXPECT_EQ(1, op1.getNumWorkOrders());
- EXPECT_EQ(0, op2.getNumWorkOrders());
-
- // Foreman hasn't yet got workorder completion response for the workorder.
- unique_ptr<WorkerMessage> worker_message;
- worker_message.reset(query_manager_->getNextWorkerMessage(id1, -1));
-
- EXPECT_TRUE(worker_message != nullptr);
- EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
- worker_message->getType());
- EXPECT_EQ(id1, worker_message->getRelationalOpIndex());
-
- delete worker_message->getWorkOrder();
-
- EXPECT_EQ(1, getNumWorkOrdersInExecution(id1));
- EXPECT_EQ(0, getNumWorkOrdersInExecution(id2));
- EXPECT_EQ(0, getNumOperatorsFinished());
-
- // Send a message to Foreman upon workorder (generated by op1) completion.
- EXPECT_FALSE(placeWorkOrderCompleteMessage(id1));
-
- EXPECT_EQ(0, getNumWorkOrdersInExecution(id1));
- // op1 is over now, op2 still to go.
- EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks));
- EXPECT_EQ(1, getNumOperatorsFinished());
-
- EXPECT_TRUE(getOperatorFinishedStatus(id1));
- EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks));
- EXPECT_FALSE(getOperatorFinishedStatus(id2));
-
- worker_message.reset(query_manager_->getNextWorkerMessage(id2, -1));
- EXPECT_TRUE(worker_message != nullptr);
- EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
- worker_message->getType());
- EXPECT_EQ(id2, worker_message->getRelationalOpIndex());
-
- delete worker_message->getWorkOrder();
-
- EXPECT_EQ(1, getNumWorkOrdersInExecution(id2));
-
- // op1 is op2's blocking dependency.
- EXPECT_TRUE(op2.getBlockingDependenciesMet());
-
- EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders));
- // op2 should get first call of getAllWorkOrders() when op1 is over.
- EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
-
- EXPECT_EQ(1, op2.getNumWorkOrders());
-
- // Send a message to QueryManager upon workorder (generated by op2) completion.
- // Note that the worker hasn't yet popped the workorder. Usually this won't
- // happen as workers pop workorders first, execute and then send the response.
- EXPECT_TRUE(placeWorkOrderCompleteMessage(id2));
-
- EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
-
- EXPECT_EQ(0, getNumWorkOrdersInExecution(id1));
- EXPECT_EQ(0, getNumWorkOrdersInExecution(id2));
-
- EXPECT_EQ(2, getNumOperatorsFinished());
- EXPECT_TRUE(getOperatorFinishedStatus(id1));
- EXPECT_TRUE(getOperatorFinishedStatus(id2));
-
- // Expect no additional calls to getAllWorkOrders.
- EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders));
- EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
-}
-
-TEST_F(QueryManagerTest, TwoNodesDAGPipeLinkTest) {
- // We use two nodes in the DAG with a non-blocking link between them.
- // We stream output of op1 to op2. Sequence of events is as follows:
- // 1. op1 creates a workorder.
- // 2. We send a "block full" (from op1) to QueryManager.
- // 3. op2 creates a workorder because of step 2.
- const QueryPlan::DAGNodeIndex id1 =
- query_plan_->addRelationalOperator(new MockOperator(true, false, 1));
- const QueryPlan::DAGNodeIndex id2 =
- query_plan_->addRelationalOperator(new MockOperator(true, true, 3));
-
- // Create a non-blocking link.
- query_plan_->addDirectDependency(id2, id1, false);
-
- static_cast<MockOperator *>(
- query_plan_->getQueryPlanDAGMutable()->getNodePayloadMutable(id1))
- ->setOutputRelationID(0xdead);
-
- const MockOperator &op1 = static_cast<const MockOperator &>(
- query_plan_->getQueryPlanDAG().getNodePayload(id1));
- const MockOperator &op2 = static_cast<const MockOperator &>(
- query_plan_->getQueryPlanDAG().getNodePayload(id2));
-
- constructQueryManager();
-
- // As none of the operators have a blocking link, blocking dependencies should
- // be met.
- EXPECT_TRUE(op1.getBlockingDependenciesMet());
- EXPECT_TRUE(op2.getBlockingDependenciesMet());
-
- EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders));
- EXPECT_EQ(1, op1.getNumWorkOrders());
- EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlock));
- EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlocks));
-
- EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
- // op2 will generate workorder only after receiving a streaming input.
- EXPECT_EQ(0, op2.getNumWorkOrders());
- EXPECT_EQ(0, op2.getNumCalls(MockOperator::kFeedInputBlock));
- EXPECT_EQ(0, op2.getNumCalls(MockOperator::kFeedInputBlocks));
-
- unique_ptr<WorkerMessage> worker_message;
- worker_message.reset(query_manager_->getNextWorkerMessage(id1, -1));
-
- EXPECT_TRUE(worker_message != nullptr);
- EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
- worker_message->getType());
- EXPECT_EQ(id1, worker_message->getRelationalOpIndex());
-
- delete worker_message->getWorkOrder();
-
- // Send a message to QueryManager upon block getting full (output of op1).
- EXPECT_FALSE(placeOutputBlockMessage(id1));
-
- // op1 is not finished yet because the response of workorder completion hasn't
- // been received yet by the QueryManager.
- EXPECT_FALSE(getOperatorFinishedStatus(id1));
- EXPECT_FALSE(getOperatorFinishedStatus(id2));
-
- // No additional call to op1's getAllWorkOrders.
- EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders));
- EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlock));
- EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlocks));
-
- // Output from op1 should be fed to op2.
- EXPECT_EQ(1, op2.getNumCalls(MockOperator::kFeedInputBlock));
- EXPECT_EQ(0, op2.getNumCalls(MockOperator::kFeedInputBlocks));
-
- // A call to op2's getAllWorkOrders because of the streamed input.
- EXPECT_EQ(2, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
- EXPECT_EQ(1, op2.getNumWorkOrders());
-
- // Place a message of a workorder completion of op1 on Foreman's input queue.
- EXPECT_FALSE(placeWorkOrderCompleteMessage(id1));
-
- EXPECT_EQ(0, getNumWorkOrdersInExecution(id1));
- EXPECT_TRUE(getOperatorFinishedStatus(id1));
- EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks));
-
- // An additional call to op2's getAllWorkOrders because of completion of op1.
- EXPECT_EQ(3, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
- EXPECT_EQ(2, op2.getNumWorkOrders());
-
- worker_message.reset(query_manager_->getNextWorkerMessage(id2, -1));
-
- EXPECT_TRUE(worker_message != nullptr);
- EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
- worker_message->getType());
- EXPECT_EQ(id2, worker_message->getRelationalOpIndex());
-
- delete worker_message->getWorkOrder();
-
- // Place a message of a workorder completion of op2 on Foreman's input queue.
- EXPECT_FALSE(placeWorkOrderCompleteMessage(id2));
-
- EXPECT_TRUE(getOperatorFinishedStatus(id1));
-
- worker_message.reset(query_manager_->getNextWorkerMessage(id2, -1));
-
- EXPECT_TRUE(worker_message != nullptr);
- EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
- worker_message->getType());
- EXPECT_EQ(id2, worker_message->getRelationalOpIndex());
-
- delete worker_message->getWorkOrder();
-
- EXPECT_EQ(1, getNumWorkOrdersInExecution(id2));
- EXPECT_FALSE(getOperatorFinishedStatus(id2));
-
- // Send a message to Foreman upon workorder (generated by op2) completion.
- EXPECT_TRUE(placeWorkOrderCompleteMessage(id2));
-
- EXPECT_TRUE(getOperatorFinishedStatus(id1));
- EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks));
-
- EXPECT_EQ(0, getNumWorkOrdersInExecution(id2));
- EXPECT_TRUE(getOperatorFinishedStatus(id2));
-}
-
-TEST_F(QueryManagerTest, TwoNodesDAGPartiallyFilledBlocksTest) {
- // In this test, we create a 2-node DAG with a non-blocking link between them.
- // There is no streaming of data from op1 to op2 during the execution of op1.
- // op1 produces a partially filled block at the end of its execution which is
- // rebuilt and then fed to op2.
- const QueryPlan::DAGNodeIndex id1 =
- query_plan_->addRelationalOperator(new MockOperator(true, false, 1));
- const QueryPlan::DAGNodeIndex id2 =
- query_plan_->addRelationalOperator(new MockOperator(true, true, 3, 1));
-
- // Create a non-blocking link.
- query_plan_->addDirectDependency(id2, id1, false);
-
- // Create a relation, owned by db_.
- CatalogRelation *relation =
- new CatalogRelation(nullptr /* catalog_database */, "test_relation");
- const relation_id output_relation_id = db_->addRelation(relation);
-
- // Setup the InsertDestination proto in the query context proto.
- serialization::QueryContext *query_context_proto =
- query_handle_->getQueryContextProtoMutable();
-
- const QueryContext::insert_destination_id insert_destination_index =
- query_context_proto->insert_destinations_size();
- serialization::InsertDestination *insert_destination_proto =
- query_context_proto->add_insert_destinations();
-
- insert_destination_proto->set_insert_destination_type(
- serialization::InsertDestinationType::BLOCK_POOL);
- insert_destination_proto->set_relation_id(output_relation_id);
- insert_destination_proto->set_relational_op_index(id1);
-
- MockOperator *op1_mutable = static_cast<MockOperator *>(
- query_plan_->getQueryPlanDAGMutable()->getNodePayloadMutable(id1));
- op1_mutable->setInsertDestinationID(insert_destination_index);
- op1_mutable->setOutputRelationID(output_relation_id);
-
- const MockOperator &op1 = static_cast<const MockOperator &>(
- query_plan_->getQueryPlanDAG().getNodePayload(id1));
- const MockOperator &op2 = static_cast<const MockOperator &>(
- query_plan_->getQueryPlanDAG().getNodePayload(id2));
-
- constructQueryManager();
-
- // NOTE(zuyu): An operator generally has no idea about partially filled
- // blocks, but InsertDestination in QueryContext does.
- // Mock to add partially filled blocks in the InsertDestination.
- InsertDestination *insert_destination =
- query_manager_->getQueryContextMutable()->getInsertDestination(
- insert_destination_index);
- DCHECK(insert_destination != nullptr);
- MutableBlockReference block_ref;
- static_cast<BlockPoolInsertDestination *>(insert_destination)
- ->available_block_refs_.push_back(move(block_ref));
-
- // There's no blocking dependency in the DAG.
- EXPECT_TRUE(op1.getBlockingDependenciesMet());
- EXPECT_TRUE(op2.getBlockingDependenciesMet());
-
- EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders));
- EXPECT_EQ(1, op1.getNumWorkOrders());
-
- EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
- EXPECT_EQ(0, op2.getNumWorkOrders());
-
- unique_ptr<WorkerMessage> worker_message;
- worker_message.reset(query_manager_->getNextWorkerMessage(id1, -1));
-
- EXPECT_TRUE(worker_message != nullptr);
- EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
- worker_message->getType());
- EXPECT_EQ(id1, worker_message->getRelationalOpIndex());
-
- delete worker_message->getWorkOrder();
-
- // Send a message to QueryManager upon workorder (generated by op1) completion.
- EXPECT_FALSE(placeWorkOrderCompleteMessage(id1));
-
- EXPECT_EQ(0, getNumWorkOrdersInExecution(id1));
-
- worker_message.reset(query_manager_->getNextWorkerMessage(id1, -1));
- EXPECT_TRUE(worker_message != nullptr);
- EXPECT_EQ(WorkerMessage::WorkerMessageType::kRebuildWorkOrder,
- worker_message->getType());
-
- EXPECT_EQ(id1, worker_message->getRelationalOpIndex());
-
- delete worker_message->getWorkOrder();
-
- // op1 generates a rebuild workorder. The block is rebuilt and streamed
- // to Foreman.
- EXPECT_FALSE(placeDataPipelineMessage(id1));
-
- EXPECT_FALSE(placeRebuildWorkOrderCompleteMessage(id1));
- // Based on the streamed input, op2's getAllWorkOrders should produce a
- // workorder.
- EXPECT_EQ(3, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
- EXPECT_EQ(1, op2.getNumWorkOrders());
-
- worker_message.reset(query_manager_->getNextWorkerMessage(id2, -1));
-
- EXPECT_TRUE(worker_message != nullptr);
- EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
- worker_message->getType());
-
- EXPECT_EQ(id2, worker_message->getRelationalOpIndex());
-
- delete worker_message->getWorkOrder();
-
- EXPECT_TRUE(getOperatorFinishedStatus(id1));
- EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks));
- EXPECT_FALSE(getOperatorFinishedStatus(id2));
- EXPECT_EQ(1, getNumWorkOrdersInExecution(id2));
-
- // Send a message to QueryManager upon workorder (generated by op2) completion.
- EXPECT_TRUE(placeWorkOrderCompleteMessage(id2));
-
- EXPECT_EQ(0, getNumWorkOrdersInExecution(id2));
-
- EXPECT_TRUE(getOperatorFinishedStatus(id2));
-}
-
-TEST_F(QueryManagerTest, MultipleNodesNoOutputTest) {
- // When an operator produces workorders but no output, the QueryManager should
- // check the dependents of this operator to make progress.
- const QueryPlan::DAGNodeIndex kNumNodes = 5;
- std::vector<QueryPlan::DAGNodeIndex> ids;
- ids.resize(kNumNodes);  // resize, not reserve: ids[i] below needs size() == kNumNodes.
-
- for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes; ++i) {
- if (i == 0) {
- ids[i] = query_plan_->addRelationalOperator(new MockOperator(true, false));
- } else {
- ids[i] = query_plan_->addRelationalOperator(new MockOperator(true, true));
- }
- VLOG(3) << ids[i];
- }
-
- /**
- * The DAG looks like this:
- *
- * op1 -> op2 -> op3 -> op4 -> op5
- *
- **/
- for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes - 1; ++i) {
- query_plan_->addDirectDependency(ids[i + 1], ids[i], false);
- static_cast<MockOperator*>(query_plan_->getQueryPlanDAGMutable()->getNodePayloadMutable(ids[i]))
- ->setOutputRelationID(0xdead);
- }
-
- std::vector<const MockOperator*> operators;
- for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes; ++i) {
- operators.push_back(static_cast<const MockOperator*>(&query_plan_->getQueryPlanDAG().getNodePayload(ids[i])));
- }
-
- constructQueryManager();
-
- // operators[0] should have produced a workorder by now.
- EXPECT_EQ(1, operators[0]->getNumWorkOrders());
-
- unique_ptr<WorkerMessage> worker_message;
- worker_message.reset(query_manager_->getNextWorkerMessage(ids[0], -1));
-
- EXPECT_TRUE(worker_message != nullptr);
- EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
- worker_message->getType());
-
- EXPECT_EQ(ids[0], worker_message->getRelationalOpIndex());
-
- delete worker_message->getWorkOrder();
-
- EXPECT_EQ(1, getNumWorkOrdersInExecution(ids[0]));
- EXPECT_FALSE(getOperatorFinishedStatus(ids[0]));
- // 'operators' is positional (parallel to 'ids'), so index it by i, not by node id.
- for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes; ++i) {
- EXPECT_EQ(1, operators[i]->getNumCalls(MockOperator::kGetAllWorkOrders));
- }
-
- // Send a message to QueryManager upon workorder (generated by operators[0])
- // completion.
- EXPECT_TRUE(placeWorkOrderCompleteMessage(ids[0]));
-
- for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes; ++i) {
- EXPECT_EQ(0, getNumWorkOrdersInExecution(ids[i]));
- EXPECT_TRUE(getOperatorFinishedStatus(ids[i]));
- if (i < kNumNodes - 1) {
- EXPECT_EQ(1, operators[i + 1]->getNumCalls(MockOperator::kDoneFeedingInputBlocks));
- }
- }
-}
-
-TEST_F(QueryManagerTest, OutOfOrderWorkOrderCompletionTest) {
- // Consider two operators, both generate one workorder each. The dependent's
- // workorder finishes before dependency's workorder.
- const QueryPlan::DAGNodeIndex id1 = query_plan_->addRelationalOperator(new MockOperator(true, false, 1));
- const QueryPlan::DAGNodeIndex id2 = query_plan_->addRelationalOperator(new MockOperator(true, true, 2, 1));
-
- // Create a non-blocking link.
- query_plan_->addDirectDependency(id2, id1, false);
-
- constructQueryManager();
-
- unique_ptr<WorkerMessage> worker_message;
- worker_message.reset(query_manager_->getNextWorkerMessage(id1, -1));
-
- EXPECT_TRUE(worker_message != nullptr);
- EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
- worker_message->getType());
-
- EXPECT_EQ(id1, worker_message->getRelationalOpIndex());
-
- delete worker_message->getWorkOrder();
-
- // Send a message to QueryManager upon a block (output of op1) getting full.
- EXPECT_FALSE(placeOutputBlockMessage(id1));
-
- // op1 is not finished yet because the response of workorder completion hasn't
- // been received yet.
- EXPECT_FALSE(getOperatorFinishedStatus(id1));
- EXPECT_FALSE(getOperatorFinishedStatus(id2));
-
- worker_message.reset(query_manager_->getNextWorkerMessage(id2, -1));
- EXPECT_TRUE(worker_message != nullptr);
- EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
- worker_message->getType());
-
- EXPECT_EQ(id2, worker_message->getRelationalOpIndex());
-
- delete worker_message->getWorkOrder();
-
- // As mentioned earlier, op2 finishes before op1.
- EXPECT_FALSE(placeWorkOrderCompleteMessage(id2));
-
- // op1's workorder execution is over.
- EXPECT_TRUE(placeWorkOrderCompleteMessage(id1));
-
- EXPECT_TRUE(getOperatorFinishedStatus(id1));
- EXPECT_TRUE(getOperatorFinishedStatus(id2));
-}
-
-} // namespace quickstep
[2/2] incubator-quickstep git commit: Introduced QueryManagerBase,
and renamed QueryManagerSingleNode.
Posted by hb...@apache.org.
Introduced QueryManagerBase, and renamed QueryManagerSingleNode.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/b4e25edc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/b4e25edc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/b4e25edc
Branch: refs/heads/master
Commit: b4e25edce8bd49b4f53f63886ad02966ae93bf91
Parents: 7671a58
Author: Zuyu Zhang <zu...@apache.org>
Authored: Fri Jul 8 11:49:14 2016 -0500
Committer: Zuyu Zhang <zu...@twitter.com>
Committed: Tue Jul 12 15:51:06 2016 -0700
----------------------------------------------------------------------
query_execution/CMakeLists.txt | 40 +-
query_execution/PolicyEnforcer.cpp | 14 +-
query_execution/PolicyEnforcer.hpp | 8 +-
query_execution/QueryManager.cpp | 470 ---------
query_execution/QueryManager.hpp | 374 --------
query_execution/QueryManagerBase.cpp | 305 ++++++
query_execution/QueryManagerBase.hpp | 312 ++++++
query_execution/QueryManagerSingleNode.cpp | 194 ++++
query_execution/QueryManagerSingleNode.hpp | 140 +++
.../tests/QueryManagerSingleNode_unittest.cpp | 942 +++++++++++++++++++
query_execution/tests/QueryManager_unittest.cpp | 940 ------------------
11 files changed, 1931 insertions(+), 1808 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b4e25edc/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index e1b1183..5a9189c 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -45,7 +45,8 @@ add_library(quickstep_queryexecution_QueryExecutionMessages_proto
add_library(quickstep_queryexecution_QueryExecutionState ../empty_src.cpp QueryExecutionState.hpp)
add_library(quickstep_queryexecution_QueryExecutionTypedefs ../empty_src.cpp QueryExecutionTypedefs.hpp)
add_library(quickstep_queryexecution_QueryExecutionUtil ../empty_src.cpp QueryExecutionUtil.hpp)
-add_library(quickstep_queryexecution_QueryManager QueryManager.cpp QueryManager.hpp)
+add_library(quickstep_queryexecution_QueryManagerBase QueryManagerBase.cpp QueryManagerBase.hpp)
+add_library(quickstep_queryexecution_QueryManagerSingleNode QueryManagerSingleNode.cpp QueryManagerSingleNode.hpp)
add_library(quickstep_queryexecution_WorkOrderProtosContainer ../empty_src.cpp WorkOrderProtosContainer.hpp)
add_library(quickstep_queryexecution_WorkOrdersContainer WorkOrdersContainer.cpp WorkOrdersContainer.hpp)
add_library(quickstep_queryexecution_Worker Worker.cpp Worker.hpp)
@@ -89,18 +90,19 @@ target_link_libraries(quickstep_queryexecution_ForemanSingleNode
tmb
${GFLAGS_LIB_NAME})
target_link_libraries(quickstep_queryexecution_PolicyEnforcer
- ${GFLAGS_LIB_NAME}
glog
quickstep_catalog_CatalogTypedefs
quickstep_queryexecution_QueryExecutionMessages_proto
quickstep_queryexecution_QueryExecutionTypedefs
- quickstep_queryexecution_QueryManager
+ quickstep_queryexecution_QueryManagerBase
+ quickstep_queryexecution_QueryManagerSingleNode
quickstep_queryexecution_WorkerDirectory
quickstep_queryexecution_WorkerMessage
quickstep_queryoptimizer_QueryHandle
quickstep_relationaloperators_WorkOrder
quickstep_utility_Macros
- tmb)
+ tmb
+ ${GFLAGS_LIB_NAME})
target_link_libraries(quickstep_queryexecution_QueryContext
glog
quickstep_catalog_CatalogDatabaseLite
@@ -149,7 +151,7 @@ target_link_libraries(quickstep_queryexecution_QueryExecutionUtil
quickstep_queryexecution_WorkerMessage
quickstep_utility_Macros
tmb)
-target_link_libraries(quickstep_queryexecution_QueryManager
+target_link_libraries(quickstep_queryexecution_QueryManagerBase
quickstep_catalog_CatalogDatabase
quickstep_catalog_CatalogRelation
quickstep_catalog_CatalogTypedefs
@@ -158,14 +160,25 @@ target_link_libraries(quickstep_queryexecution_QueryManager
quickstep_queryexecution_QueryExecutionMessages_proto
quickstep_queryexecution_QueryExecutionState
quickstep_queryexecution_QueryExecutionTypedefs
+ quickstep_queryoptimizer_QueryHandle
+ quickstep_queryoptimizer_QueryPlan
+ quickstep_relationaloperators_RelationalOperator
+ quickstep_relationaloperators_WorkOrder
+ quickstep_storage_StorageBlockInfo
+ quickstep_utility_DAG
+ quickstep_utility_Macros)
+target_link_libraries(quickstep_queryexecution_QueryManagerSingleNode
+ quickstep_catalog_CatalogTypedefs
+ quickstep_queryexecution_QueryContext
+ quickstep_queryexecution_QueryExecutionState
+ quickstep_queryexecution_QueryManagerBase
quickstep_queryexecution_WorkOrdersContainer
quickstep_queryexecution_WorkerMessage
quickstep_queryoptimizer_QueryHandle
quickstep_relationaloperators_RebuildWorkOrder
quickstep_relationaloperators_RelationalOperator
- quickstep_relationaloperators_WorkOrder
+ quickstep_storage_InsertDestination
quickstep_storage_StorageBlock
- quickstep_storage_StorageBlockInfo
quickstep_utility_DAG
quickstep_utility_Macros
tmb)
@@ -210,7 +223,8 @@ target_link_libraries(quickstep_queryexecution
quickstep_queryexecution_QueryExecutionState
quickstep_queryexecution_QueryExecutionTypedefs
quickstep_queryexecution_QueryExecutionUtil
- quickstep_queryexecution_QueryManager
+ quickstep_queryexecution_QueryManagerBase
+ quickstep_queryexecution_QueryManagerSingleNode
quickstep_queryexecution_WorkOrderProtosContainer
quickstep_queryexecution_WorkOrdersContainer
quickstep_queryexecution_Worker
@@ -248,9 +262,9 @@ if (ENABLE_DISTRIBUTED)
add_test(BlockLocator_unittest BlockLocator_unittest)
endif()
-add_executable(QueryManager_unittest
- "${CMAKE_CURRENT_SOURCE_DIR}/tests/QueryManager_unittest.cpp")
-target_link_libraries(QueryManager_unittest
+add_executable(QueryManagerSingleNode_unittest
+ "${CMAKE_CURRENT_SOURCE_DIR}/tests/QueryManagerSingleNode_unittest.cpp")
+target_link_libraries(QueryManagerSingleNode_unittest
glog
gtest
gtest_main
@@ -262,7 +276,7 @@ target_link_libraries(QueryManager_unittest
quickstep_queryexecution_QueryExecutionMessages_proto
quickstep_queryexecution_QueryExecutionState
quickstep_queryexecution_QueryExecutionTypedefs
- quickstep_queryexecution_QueryManager
+ quickstep_queryexecution_QueryManagerSingleNode
quickstep_queryexecution_WorkOrdersContainer
quickstep_queryexecution_WorkerDirectory
quickstep_queryexecution_WorkerMessage
@@ -278,7 +292,7 @@ target_link_libraries(QueryManager_unittest
quickstep_utility_DAG
quickstep_utility_Macros
tmb)
-add_test(QueryManager_unittest QueryManager_unittest)
+add_test(QueryManagerSingleNode_unittest QueryManagerSingleNode_unittest)
add_executable(WorkOrdersContainer_unittest
"${CMAKE_CURRENT_SOURCE_DIR}/tests/WorkOrdersContainer_unittest.cpp")
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b4e25edc/query_execution/PolicyEnforcer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcer.cpp b/query_execution/PolicyEnforcer.cpp
index 84aa86a..f310ee1 100644
--- a/query_execution/PolicyEnforcer.cpp
+++ b/query_execution/PolicyEnforcer.cpp
@@ -26,7 +26,7 @@
#include "catalog/CatalogTypedefs.hpp"
#include "query_execution/QueryExecutionMessages.pb.h"
-#include "query_execution/QueryManager.hpp"
+#include "query_execution/QueryManagerSingleNode.hpp"
#include "query_execution/WorkerDirectory.hpp"
#include "query_optimizer/QueryHandle.hpp"
#include "relational_operators/WorkOrder.hpp"
@@ -47,8 +47,8 @@ bool PolicyEnforcer::admitQuery(QueryHandle *query_handle) {
if (admitted_queries_.find(query_id) == admitted_queries_.end()) {
// Query with the same ID not present, ok to admit.
admitted_queries_[query_id].reset(
- new QueryManager(foreman_client_id_, num_numa_nodes_, query_handle,
- catalog_database_, storage_manager_, bus_));
+ new QueryManagerSingleNode(foreman_client_id_, num_numa_nodes_, query_handle,
+ catalog_database_, storage_manager_, bus_));
return true;
} else {
LOG(ERROR) << "Query with the same ID " << query_id << " exists";
@@ -124,9 +124,9 @@ void PolicyEnforcer::processMessage(const TaggedMessage &tagged_message) {
LOG(FATAL) << "Unknown message type found in PolicyEnforcer";
}
DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
- const QueryManager::QueryStatusCode return_code =
+ const QueryManagerBase::QueryStatusCode return_code =
admitted_queries_[query_id]->processMessage(tagged_message);
- if (return_code == QueryManager::QueryStatusCode::kQueryExecuted) {
+ if (return_code == QueryManagerBase::QueryStatusCode::kQueryExecuted) {
removeQuery(query_id);
if (!waiting_queries_.empty()) {
// Admit the earliest waiting query.
@@ -156,12 +156,12 @@ void PolicyEnforcer::getWorkerMessages(
std::vector<std::size_t> finished_queries_ids;
for (const auto &admitted_query_info : admitted_queries_) {
- QueryManager *curr_query_manager = admitted_query_info.second.get();
+ QueryManagerBase *curr_query_manager = admitted_query_info.second.get();
DCHECK(curr_query_manager != nullptr);
std::size_t messages_collected_curr_query = 0;
while (messages_collected_curr_query < per_query_share) {
WorkerMessage *next_worker_message =
- curr_query_manager->getNextWorkerMessage(0, kAnyNUMANodeID);
+ static_cast<QueryManagerSingleNode*>(curr_query_manager)->getNextWorkerMessage(0, kAnyNUMANodeID);
if (next_worker_message != nullptr) {
++messages_collected_curr_query;
worker_messages->push_back(std::unique_ptr<WorkerMessage>(next_worker_message));
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b4e25edc/query_execution/PolicyEnforcer.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcer.hpp b/query_execution/PolicyEnforcer.hpp
index 470ff2a..79e61d1 100644
--- a/query_execution/PolicyEnforcer.hpp
+++ b/query_execution/PolicyEnforcer.hpp
@@ -26,7 +26,7 @@
#include <vector>
#include "query_execution/QueryExecutionTypedefs.hpp"
-#include "query_execution/QueryManager.hpp"
+#include "query_execution/QueryManagerBase.hpp"
#include "query_execution/WorkerMessage.hpp"
#include "utility/Macros.hpp"
@@ -190,8 +190,8 @@ class PolicyEnforcer {
tmb::MessageBus *bus_;
const bool profile_individual_workorders_;
- // Key = query ID, value = QueryManager* for the key query.
- std::unordered_map<std::size_t, std::unique_ptr<QueryManager>> admitted_queries_;
+ // Key = query ID, value = QueryManagerBase* for the key query.
+ std::unordered_map<std::size_t, std::unique_ptr<QueryManagerBase>> admitted_queries_;
// The queries which haven't been admitted yet.
std::queue<QueryHandle*> waiting_queries_;
@@ -214,4 +214,4 @@ class PolicyEnforcer {
} // namespace quickstep
-#endif // QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_HPP_
+#endif // QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b4e25edc/query_execution/QueryManager.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManager.cpp b/query_execution/QueryManager.cpp
deleted file mode 100644
index d20b592..0000000
--- a/query_execution/QueryManager.cpp
+++ /dev/null
@@ -1,470 +0,0 @@
-/**
- * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
- * University of Wisconsin—Madison.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-#include "query_execution/QueryManager.hpp"
-
-#include <cstddef>
-#include <memory>
-#include <utility>
-#include <vector>
-
-#include "catalog/CatalogDatabase.hpp"
-#include "catalog/CatalogRelation.hpp"
-#include "catalog/CatalogTypedefs.hpp"
-#include "catalog/PartitionScheme.hpp"
-#include "query_execution/QueryExecutionMessages.pb.h"
-#include "query_execution/QueryExecutionTypedefs.hpp"
-#include "query_execution/WorkerMessage.hpp"
-#include "query_optimizer/QueryHandle.hpp"
-#include "relational_operators/RebuildWorkOrder.hpp"
-#include "relational_operators/WorkOrder.hpp"
-#include "storage/StorageBlock.hpp"
-#include "storage/StorageBlockInfo.hpp"
-
-#include "glog/logging.h"
-
-using std::pair;
-
-namespace quickstep {
-class CatalogDatabaseLite;
-class StorageManager;
-}
-
-namespace quickstep {
-
-QueryManager::QueryManager(const tmb::client_id foreman_client_id,
- const std::size_t num_numa_nodes,
- QueryHandle *query_handle,
- CatalogDatabaseLite *catalog_database,
- StorageManager *storage_manager,
- tmb::MessageBus *bus)
- : foreman_client_id_(foreman_client_id),
- query_id_(DCHECK_NOTNULL(query_handle)->query_id()),
- catalog_database_(DCHECK_NOTNULL(catalog_database)),
- storage_manager_(DCHECK_NOTNULL(storage_manager)),
- bus_(DCHECK_NOTNULL(bus)) {
- DCHECK(query_handle->getQueryPlanMutable() != nullptr);
- query_dag_ = query_handle->getQueryPlanMutable()->getQueryPlanDAGMutable();
- DCHECK(query_dag_ != nullptr);
-
- const dag_node_index num_operators_in_dag = query_dag_->size();
-
- output_consumers_.resize(num_operators_in_dag);
- blocking_dependencies_.resize(num_operators_in_dag);
-
- query_exec_state_.reset(new QueryExecutionState(num_operators_in_dag));
- workorders_container_.reset(
- new WorkOrdersContainer(num_operators_in_dag, num_numa_nodes));
-
- query_context_.reset(new QueryContext(query_handle->getQueryContextProto(),
- *catalog_database_,
- storage_manager_,
- foreman_client_id_,
- bus_));
-
- for (dag_node_index node_index = 0;
- node_index < num_operators_in_dag;
- ++node_index) {
- const QueryContext::insert_destination_id insert_destination_index =
- query_dag_->getNodePayload(node_index).getInsertDestinationID();
- if (insert_destination_index != QueryContext::kInvalidInsertDestinationId) {
- // Rebuild is necessary whenever InsertDestination is present.
- query_exec_state_->setRebuildRequired(node_index);
- query_exec_state_->setRebuildStatus(node_index, 0, false);
- }
-
- for (const pair<dag_node_index, bool> &dependent_link :
- query_dag_->getDependents(node_index)) {
- const dag_node_index dependent_op_index = dependent_link.first;
- if (!query_dag_->getLinkMetadata(node_index, dependent_op_index)) {
- // The link is not a pipeline-breaker. Streaming of blocks is possible
- // between these two operators.
- output_consumers_[node_index].push_back(dependent_op_index);
- } else {
- // The link is a pipeline-breaker. Streaming of blocks is not possible
- // between these two operators.
- blocking_dependencies_[dependent_op_index].push_back(node_index);
- }
- }
- }
-
- // Collect all the workorders from all the relational operators in the DAG.
- for (dag_node_index index = 0; index < num_operators_in_dag; ++index) {
- if (checkAllBlockingDependenciesMet(index)) {
- query_dag_->getNodePayloadMutable(index)->informAllBlockingDependenciesMet();
- processOperator(index, false);
- }
- }
-}
-
-WorkerMessage* QueryManager::getNextWorkerMessage(
- const dag_node_index start_operator_index, const int numa_node) {
- // Default policy: Operator with lowest index first.
- WorkOrder *work_order = nullptr;
- size_t num_operators_checked = 0;
- for (dag_node_index index = start_operator_index;
- num_operators_checked < query_dag_->size();
- index = (index + 1) % query_dag_->size(), ++num_operators_checked) {
- if (query_exec_state_->hasExecutionFinished(index)) {
- continue;
- }
- if (numa_node != -1) {
- // First try to get a normal WorkOrder from the specified NUMA node.
- work_order = workorders_container_->getNormalWorkOrderForNUMANode(index, numa_node);
- if (work_order != nullptr) {
- // A WorkOrder found on the given NUMA node.
- query_exec_state_->incrementNumQueuedWorkOrders(index);
- return WorkerMessage::WorkOrderMessage(work_order, index);
- } else {
- // Normal workorder not found on this node. Look for a rebuild workorder
- // on this NUMA node.
- work_order = workorders_container_->getRebuildWorkOrderForNUMANode(index, numa_node);
- if (work_order != nullptr) {
- return WorkerMessage::RebuildWorkOrderMessage(work_order, index);
- }
- }
- }
- // Either no workorder found on the given NUMA node, or numa_node is -1.
- // Try to get a normal WorkOrder from other NUMA nodes.
- work_order = workorders_container_->getNormalWorkOrder(index);
- if (work_order != nullptr) {
- query_exec_state_->incrementNumQueuedWorkOrders(index);
- return WorkerMessage::WorkOrderMessage(work_order, index);
- } else {
- // Normal WorkOrder not found, look for a RebuildWorkOrder.
- work_order = workorders_container_->getRebuildWorkOrder(index);
- if (work_order != nullptr) {
- return WorkerMessage::RebuildWorkOrderMessage(work_order, index);
- }
- }
- }
- // No WorkOrders available right now.
- return nullptr;
-}
-
-QueryManager::QueryStatusCode QueryManager::processMessage(
- const TaggedMessage &tagged_message) {
- dag_node_index op_index;
- switch (tagged_message.message_type()) {
- case kWorkOrderCompleteMessage: {
- serialization::NormalWorkOrderCompletionMessage proto;
- CHECK(proto.ParseFromArray(tagged_message.message(),
- tagged_message.message_bytes()));
-
- op_index = proto.operator_index();
- processWorkOrderCompleteMessage(proto.operator_index());
- break;
- }
- case kRebuildWorkOrderCompleteMessage: {
- serialization::RebuildWorkOrderCompletionMessage proto;
- CHECK(proto.ParseFromArray(tagged_message.message(),
- tagged_message.message_bytes()));
-
- op_index = proto.operator_index();
- processRebuildWorkOrderCompleteMessage(proto.operator_index());
- break;
- }
- case kCatalogRelationNewBlockMessage: {
- serialization::CatalogRelationNewBlockMessage proto;
- CHECK(proto.ParseFromArray(tagged_message.message(),
- tagged_message.message_bytes()));
-
- const block_id block = proto.block_id();
-
- CatalogRelation *relation =
- static_cast<CatalogDatabase*>(catalog_database_)->getRelationByIdMutable(proto.relation_id());
- relation->addBlock(block);
-
- if (proto.has_partition_id()) {
- relation->getPartitionSchemeMutable()->addBlockToPartition(
- proto.partition_id(), block);
- }
- return QueryStatusCode::kNone;
- }
- case kDataPipelineMessage: {
- // Possible message senders include InsertDestinations and some
- // operators which modify existing blocks.
- serialization::DataPipelineMessage proto;
- CHECK(proto.ParseFromArray(tagged_message.message(),
- tagged_message.message_bytes()));
-
- op_index = proto.operator_index();
- processDataPipelineMessage(proto.operator_index(),
- proto.block_id(),
- proto.relation_id());
- break;
- }
- case kWorkOrdersAvailableMessage: {
- serialization::WorkOrdersAvailableMessage proto;
- CHECK(proto.ParseFromArray(tagged_message.message(),
- tagged_message.message_bytes()));
-
- op_index = proto.operator_index();
-
- // Check if new work orders are available.
- fetchNormalWorkOrders(op_index);
-
- // Dispatch the WorkerMessages to the workers. We prefer to start the search
- // for the schedulable WorkOrders beginning from 'op_index'. The first
- // candidate worker to receive the next WorkOrder is the one that sent the
- // response message to Foreman.
- // TODO(zuyu): Improve the data locality for the next WorkOrder.
- break;
- }
- case kWorkOrderFeedbackMessage: {
- WorkOrder::FeedbackMessage msg(
- const_cast<void *>(tagged_message.message()),
- tagged_message.message_bytes());
-
- op_index = msg.header().rel_op_index;
- processFeedbackMessage(msg);
- break;
- }
- default:
- LOG(FATAL) << "Unknown message type found in QueryManager";
- }
-
- if (query_exec_state_->hasExecutionFinished(op_index)) {
- return QueryStatusCode::kOperatorExecuted;
- }
-
- // As kQueryExecuted takes precedence over kOperatorExecuted, we check again.
- if (query_exec_state_->hasQueryExecutionFinished()) {
- return QueryStatusCode::kQueryExecuted;
- }
-
- return QueryStatusCode::kNone;
-}
-
-void QueryManager::processFeedbackMessage(
- const WorkOrder::FeedbackMessage &msg) {
- RelationalOperator *op =
- query_dag_->getNodePayloadMutable(msg.header().rel_op_index);
- op->receiveFeedbackMessage(msg);
-}
-
-void QueryManager::processWorkOrderCompleteMessage(
- const dag_node_index op_index) {
- query_exec_state_->decrementNumQueuedWorkOrders(op_index);
-
- // Check if new work orders are available and fetch them if so.
- fetchNormalWorkOrders(op_index);
-
- if (checkRebuildRequired(op_index)) {
- if (checkNormalExecutionOver(op_index)) {
- if (!checkRebuildInitiated(op_index)) {
- if (initiateRebuild(op_index)) {
- // Rebuild initiated and completed right away.
- markOperatorFinished(op_index);
- } else {
- // Rebuild under progress.
- }
- } else if (checkRebuildOver(op_index)) {
- // Rebuild was under progress and now it is over.
- markOperatorFinished(op_index);
- }
- } else {
- // Normal execution under progress for this operator.
- }
- } else if (checkOperatorExecutionOver(op_index)) {
- // Rebuild not required for this operator and its normal execution is
- // complete.
- markOperatorFinished(op_index);
- }
-
- for (const pair<dag_node_index, bool> &dependent_link :
- query_dag_->getDependents(op_index)) {
- const dag_node_index dependent_op_index = dependent_link.first;
- if (checkAllBlockingDependenciesMet(dependent_op_index)) {
- // Process the dependent operator (of the operator whose WorkOrder
- // was just executed) for which all the dependencies have been met.
- processOperator(dependent_op_index, true);
- }
- }
-}
-
-void QueryManager::processRebuildWorkOrderCompleteMessage(const dag_node_index op_index) {
- query_exec_state_->decrementNumRebuildWorkOrders(op_index);
-
- if (checkRebuildOver(op_index)) {
- markOperatorFinished(op_index);
-
- for (const pair<dag_node_index, bool> &dependent_link :
- query_dag_->getDependents(op_index)) {
- const dag_node_index dependent_op_index = dependent_link.first;
- if (checkAllBlockingDependenciesMet(dependent_op_index)) {
- processOperator(dependent_op_index, true);
- }
- }
- }
-}
-
-void QueryManager::processOperator(const dag_node_index index,
- const bool recursively_check_dependents) {
- if (fetchNormalWorkOrders(index)) {
- // Fetched work orders. Return to wait for the generated work orders to
- // execute, and skip the execution-finished checks.
- return;
- }
-
- if (checkNormalExecutionOver(index)) {
- if (checkRebuildRequired(index)) {
- if (!checkRebuildInitiated(index)) {
- // Rebuild hasn't started, initiate it.
- if (initiateRebuild(index)) {
- // Rebuild initiated and completed right away.
- markOperatorFinished(index);
- } else {
- // Rebuild WorkOrders have been generated.
- return;
- }
- } else if (checkRebuildOver(index)) {
- // Rebuild had been initiated and it is over.
- markOperatorFinished(index);
- }
- } else {
- // Rebuild is not required and normal execution over, mark finished.
- markOperatorFinished(index);
- }
- // If we reach here, that means the operator has been marked as finished.
- if (recursively_check_dependents) {
- for (const pair<dag_node_index, bool> &dependent_link :
- query_dag_->getDependents(index)) {
- const dag_node_index dependent_op_index = dependent_link.first;
- if (checkAllBlockingDependenciesMet(dependent_op_index)) {
- processOperator(dependent_op_index, true);
- }
- }
- }
- }
-}
-
-void QueryManager::processDataPipelineMessage(const dag_node_index op_index,
- const block_id block,
- const relation_id rel_id) {
- for (const dag_node_index consumer_index :
- output_consumers_[op_index]) {
- // Feed the streamed block to the consumer. Note that 'output_consumers_'
- // only contain those dependents of operator with index = op_index which are
- // eligible to receive streamed input.
- query_dag_->getNodePayloadMutable(consumer_index)->feedInputBlock(block, rel_id);
- // Because of the streamed input just fed, check if there are any new
- // WorkOrders available and if so, fetch them.
- fetchNormalWorkOrders(consumer_index);
- }
-}
-
-bool QueryManager::fetchNormalWorkOrders(const dag_node_index index) {
- bool generated_new_workorders = false;
- if (!query_exec_state_->hasDoneGenerationWorkOrders(index)) {
- // Do not fetch any work units until all blocking dependencies are met.
- // The relational operator is not aware of blocking dependencies for
- // uncorrelated scalar queries.
- if (!checkAllBlockingDependenciesMet(index)) {
- return false;
- }
- const size_t num_pending_workorders_before =
- workorders_container_->getNumNormalWorkOrders(index);
- const bool done_generation =
- query_dag_->getNodePayloadMutable(index)->getAllWorkOrders(workorders_container_.get(),
- query_context_.get(),
- storage_manager_,
- foreman_client_id_,
- bus_);
- if (done_generation) {
- query_exec_state_->setDoneGenerationWorkOrders(index);
- }
-
- // TODO(shoban): It would be a good check to see if operator is making
- // useful progress, i.e., the operator either generates work orders to
- // execute or still has pending work orders executing. However, this will not
- // work if Foreman polls operators without feeding data. This check can be
- // enabled, if Foreman is refactored to call getAllWorkOrders() only when
- // pending work orders are completed or new input blocks feed.
-
- generated_new_workorders =
- (num_pending_workorders_before <
- workorders_container_->getNumNormalWorkOrders(index));
- }
- return generated_new_workorders;
-}
-
-void QueryManager::markOperatorFinished(const dag_node_index index) {
- query_exec_state_->setExecutionFinished(index);
-
- RelationalOperator *op = query_dag_->getNodePayloadMutable(index);
- op->updateCatalogOnCompletion();
-
- const relation_id output_rel = op->getOutputRelationID();
- for (const pair<dag_node_index, bool> &dependent_link : query_dag_->getDependents(index)) {
- const dag_node_index dependent_op_index = dependent_link.first;
- RelationalOperator *dependent_op = query_dag_->getNodePayloadMutable(dependent_op_index);
- // Signal dependent operator that current operator is done feeding input blocks.
- if (output_rel >= 0) {
- dependent_op->doneFeedingInputBlocks(output_rel);
- }
- if (checkAllBlockingDependenciesMet(dependent_op_index)) {
- dependent_op->informAllBlockingDependenciesMet();
- }
- }
-}
-
-bool QueryManager::initiateRebuild(const dag_node_index index) {
- DCHECK(!workorders_container_->hasRebuildWorkOrder(index));
- DCHECK(checkRebuildRequired(index));
- DCHECK(!checkRebuildInitiated(index));
-
- getRebuildWorkOrders(index, workorders_container_.get());
-
- query_exec_state_->setRebuildStatus(
- index, workorders_container_->getNumRebuildWorkOrders(index), true);
-
- return (query_exec_state_->getNumRebuildWorkOrders(index) == 0);
-}
-
-void QueryManager::getRebuildWorkOrders(const dag_node_index index,
- WorkOrdersContainer *container) {
- const RelationalOperator &op = query_dag_->getNodePayload(index);
- const QueryContext::insert_destination_id insert_destination_index = op.getInsertDestinationID();
-
- if (insert_destination_index == QueryContext::kInvalidInsertDestinationId) {
- return;
- }
-
- std::vector<MutableBlockReference> partially_filled_block_refs;
-
- DCHECK(query_context_ != nullptr);
- InsertDestination *insert_destination = query_context_->getInsertDestination(insert_destination_index);
- DCHECK(insert_destination != nullptr);
-
- insert_destination->getPartiallyFilledBlocks(&partially_filled_block_refs);
-
- for (std::vector<MutableBlockReference>::size_type i = 0;
- i < partially_filled_block_refs.size();
- ++i) {
- container->addRebuildWorkOrder(
- new RebuildWorkOrder(query_id_,
- std::move(partially_filled_block_refs[i]),
- index,
- op.getOutputRelationID(),
- foreman_client_id_,
- bus_),
- index);
- }
-}
-
-} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b4e25edc/query_execution/QueryManager.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManager.hpp b/query_execution/QueryManager.hpp
deleted file mode 100644
index b52460f..0000000
--- a/query_execution/QueryManager.hpp
+++ /dev/null
@@ -1,374 +0,0 @@
-/**
- * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
- * University of Wisconsin—Madison.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-#ifndef QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_HPP_
-#define QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_HPP_
-
-#include <cstddef>
-#include <memory>
-#include <vector>
-
-#include "catalog/CatalogTypedefs.hpp"
-#include "query_execution/QueryContext.hpp"
-#include "query_execution/QueryExecutionState.hpp"
-#include "query_execution/QueryExecutionTypedefs.hpp"
-#include "query_execution/WorkOrdersContainer.hpp"
-#include "relational_operators/RelationalOperator.hpp"
-#include "relational_operators/WorkOrder.hpp"
-#include "storage/StorageBlockInfo.hpp"
-#include "utility/DAG.hpp"
-#include "utility/Macros.hpp"
-
-#include "tmb/id_typedefs.h"
-#include "tmb/message_bus.h"
-#include "tmb/tagged_message.h"
-
-namespace quickstep {
-
-class CatalogDatabaseLite;
-class QueryHandle;
-class StorageManager;
-class WorkerMessage;
-
-/** \addtogroup QueryExecution
- * @{
- */
-
-/**
- * @brief A class that manages the execution of a query including generation
- * of new work orders, keeping track of the query exection state.
- **/
-class QueryManager {
- public:
- typedef DAG<RelationalOperator, bool>::size_type_nodes dag_node_index;
-
- /**
- * @brief Return codes for processMessage() function.
- *
- * @note When both operator and query get executed, kQueryExecuted takes
- * precedence over kOperatorExecuted.
- **/
- enum class QueryStatusCode {
- kOperatorExecuted = 0, // An operator in the query finished execution.
- kQueryExecuted, // The query got executed.
- kNone // None of the above.
- };
-
- /**
- * @brief Constructor.
- *
- * @param foreman_client_id The TMB client ID of the foreman thread.
- * @param num_numa_nodes The number of NUMA nodes used by the system.
- * @param query_handle The QueryHandle object for this query.
- * @param catalog_database The CatalogDatabse used by the query.
- * @param storage_manager The StorageManager used by the query.
- * @param bus The TMB used for communication.
- **/
- QueryManager(const tmb::client_id foreman_client_id,
- const std::size_t num_numa_nodes,
- QueryHandle *query_handle,
- CatalogDatabaseLite *catalog_database,
- StorageManager *storage_manager,
- tmb::MessageBus *bus);
-
- /**
- * @brief Get the next workorder to be excuted, wrapped in a WorkerMessage.
- *
- * @param start_operator_index Begin the search for the schedulable WorkOrder
- * with the operator at this index.
- * @param numa_node The next WorkOrder should preferably have its input(s)
- * from this numa_node. This is a hint and not a binding requirement.
- *
- * @return A pointer to the WorkerMessage. If there's no WorkOrder to be
- * executed, return NULL.
- **/
- WorkerMessage *getNextWorkerMessage(
- const dag_node_index start_operator_index,
- const numa_node_id node_id = -1);
-
- /**
- * @brief Process a message sent to the QueryManager.
- *
- * @param tagged_message TaggedMessage sent to the QueryManager.
- *
- * @return QueryStatusCode as determined after the message is processed.
- **/
- QueryStatusCode processMessage(const TaggedMessage &tagged_message);
-
- /**
- * @brief Get the QueryExecutionState for this query.
- **/
- inline const QueryExecutionState& getQueryExecutionState() const {
- return *query_exec_state_;
- }
-
- /**
- * @brief Get a pointer to the QueryContext.
- **/
- inline QueryContext* getQueryContextMutable() {
- return query_context_.get();
- }
-
- private:
- /**
- * @brief Process the received WorkOrder complete message.
- *
- * @param node_index The index of the specified operator node in the query DAG
- * for the completed WorkOrder.
- **/
- void processWorkOrderCompleteMessage(const dag_node_index op_index);
-
- /**
- * @brief Process the received RebuildWorkOrder complete message.
- *
- * @param node_index The index of the specified operator node in the query DAG
- * for the completed RebuildWorkOrder.
- **/
- void processRebuildWorkOrderCompleteMessage(const dag_node_index op_index);
-
- /**
- * @brief Process a current relational operator: Get its workorders and store
- * them in the WorkOrdersContainer for this query. If the operator can
- * be marked as done, do so.
- *
- * @param index The index of the relational operator to be processed in the
- * query plan DAG.
- * @param recursively_check_dependents If an operator is done, should we
- * call processOperator on its dependents recursively.
- **/
- void processOperator(const dag_node_index index,
- const bool recursively_check_dependents);
-
- /**
- * @brief Process the received data pipeline message.
- *
- * @param node_index The index of the specified operator node in the query DAG
- * for the pipelining block.
- * @param block The block id.
- * @param rel_id The ID of the relation that produced 'block'.
- **/
- void processDataPipelineMessage(const dag_node_index op_index,
- const block_id block,
- const relation_id rel_id);
-
- /**
- * @brief Process the received work order feedback message and notify
- * relational operator.
- *
- * @param message Feedback message from work order.
- **/
- void processFeedbackMessage(const WorkOrder::FeedbackMessage &message);
-
- /**
- * @brief Fetch all work orders currently available in relational operator and
- * store them internally.
- *
- * @param index The index of the relational operator to be processed in the
- * query plan DAG.
- *
- * @return Whether any work order was generated by op.
- **/
- bool fetchNormalWorkOrders(const dag_node_index index);
-
- /**
- * @brief This function does the following things:
- * 1. Mark the given relational operator as "done".
- * 2. For all the dependents of this operator, check if all of their
- * blocking dependencies are met. If so inform them that the blocking
- * dependencies are met.
- * 3. Check if the given operator is done producing output. If it's
- * done, inform the dependents that they won't receive input anymore
- * from the given operator.
- *
- * @param index The index of the given relational operator in the DAG.
- **/
- void markOperatorFinished(const dag_node_index index);
-
- /**
- * @brief Check if all the dependencies of the node at specified index have
- * finished their execution.
- *
- * @note This function's true return value is a pre-requisite for calling
- * getRebuildWorkOrders()
- *
- * @param node_index The index of the specified node in the query DAG.
- *
- * @return True if all the dependencies have finished their execution. False
- * otherwise.
- **/
- inline bool checkAllDependenciesMet(const dag_node_index node_index) const {
- for (const dag_node_index dependency_index :
- query_dag_->getDependencies(node_index)) {
- // If at least one of the dependencies is not met, return false.
- if (!query_exec_state_->hasExecutionFinished(dependency_index)) {
- return false;
- }
- }
- return true;
- }
-
- /**
- * @brief Check if all the blocking dependencies of the node at specified
- * index have finished their execution.
- *
- * @note A blocking dependency is the one which is pipeline breaker. Output of
- * a dependency can't be streamed to its dependent if the link between
- * them is pipeline breaker.
- *
- * @param node_index The index of the specified node in the query DAG.
- *
- * @return True if all the blocking dependencies have finished their
- * execution. False otherwise.
- **/
- inline bool checkAllBlockingDependenciesMet(
- const dag_node_index node_index) const {
- for (const dag_node_index blocking_dependency_index :
- blocking_dependencies_[node_index]) {
- if (!query_exec_state_->hasExecutionFinished(
- blocking_dependency_index)) {
- return false;
- }
- }
- return true;
- }
-
- /**
- * @brief Check if the execution of the given operator is over.
- *
- * @param index The index of the given operator in the DAG.
- *
- * @return True if the execution of the given operator is over, false
- * otherwise.
- **/
- inline bool checkOperatorExecutionOver(const dag_node_index index) const {
- if (checkRebuildRequired(index)) {
- return (checkNormalExecutionOver(index) && checkRebuildOver(index));
- } else {
- return checkNormalExecutionOver(index);
- }
- }
-
- /**
- * @brief Check if the given operator's normal execution is over.
- *
- * @note The conditions for a given operator's normal execution to get over:
- * 1. All of its normal (i.e. non rebuild) WorkOrders have finished
- * execution.
- * 2. The operator is done generating work orders.
- * 3. All of the dependencies of the given operator have been met.
- *
- * @param index The index of the given operator in the DAG.
- *
- * @return True if the normal execution of the given operator is over, false
- * otherwise.
- **/
- inline bool checkNormalExecutionOver(const dag_node_index index) const {
- return (checkAllDependenciesMet(index) &&
- !workorders_container_->hasNormalWorkOrder(index) &&
- query_exec_state_->getNumQueuedWorkOrders(index) == 0 &&
- query_exec_state_->hasDoneGenerationWorkOrders(index));
- }
-
- /**
- * @brief Check if the rebuild operation is required for a given operator.
- *
- * @param index The index of the given operator in the DAG.
- *
- * @return True if the rebuild operation is required, false otherwise.
- **/
- inline bool checkRebuildRequired(const dag_node_index index) const {
- return query_exec_state_->isRebuildRequired(index);
- }
-
- /**
- * @brief Check if the rebuild operation for a given operator is over.
- *
- * @param index The index of the given operator in the DAG.
- *
- * @return True if the rebuild operation is over, false otherwise.
- **/
- inline bool checkRebuildOver(const dag_node_index index) const {
- return query_exec_state_->hasRebuildInitiated(index) &&
- !workorders_container_->hasRebuildWorkOrder(index) &&
- (query_exec_state_->getNumRebuildWorkOrders(index) == 0);
- }
-
- /**
- * @brief Check if the rebuild operation for a given operator has been
- * initiated.
- *
- * @param index The index of the given operator in the DAG.
- *
- * @return True if the rebuild operation has been initiated, false otherwise.
- **/
- inline bool checkRebuildInitiated(const dag_node_index index) const {
- return query_exec_state_->hasRebuildInitiated(index);
- }
-
- /**
- * @brief Initiate the rebuild process for partially filled blocks generated
- * during the execution of the given operator.
- *
- * @param index The index of the given operator in the DAG.
- *
- * @return True if the rebuild is over immediately, i.e. the operator didn't
- * generate any rebuild WorkOrders, false otherwise.
- **/
- bool initiateRebuild(const dag_node_index index);
-
- /**
- * @brief Get the rebuild WorkOrders for an operator.
- *
- * @note This function should be called only once, when all the normal
- * WorkOrders generated by an operator finish their execution.
- *
- * @param index The index of the operator in the query plan DAG.
- * @param container A pointer to a WorkOrdersContainer to be used to store the
- * generated WorkOrders.
- **/
- void getRebuildWorkOrders(const dag_node_index index,
- WorkOrdersContainer *container);
-
- const tmb::client_id foreman_client_id_;
- const std::size_t query_id_;
-
- CatalogDatabaseLite *catalog_database_;
- StorageManager *storage_manager_;
- tmb::MessageBus *bus_;
-
- DAG<RelationalOperator, bool> *query_dag_;
-
- std::unique_ptr<QueryContext> query_context_;
-
- // For all nodes, store their receiving dependents.
- std::vector<std::vector<dag_node_index>> output_consumers_;
-
- // For all nodes, store their pipeline breaking dependencies (if any).
- std::vector<std::vector<dag_node_index>> blocking_dependencies_;
-
- std::unique_ptr<QueryExecutionState> query_exec_state_;
-
- std::unique_ptr<WorkOrdersContainer> workorders_container_;
-
- DISALLOW_COPY_AND_ASSIGN(QueryManager);
-};
-
-/** @} */
-
-} // namespace quickstep
-
-#endif // QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b4e25edc/query_execution/QueryManagerBase.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerBase.cpp b/query_execution/QueryManagerBase.cpp
new file mode 100644
index 0000000..f7e183f
--- /dev/null
+++ b/query_execution/QueryManagerBase.cpp
@@ -0,0 +1,305 @@
+/**
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin—Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include "query_execution/QueryManagerBase.hpp"
+
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include "catalog/CatalogDatabase.hpp"
+#include "catalog/CatalogRelation.hpp"
+#include "catalog/CatalogTypedefs.hpp"
+#include "catalog/PartitionScheme.hpp"
+#include "query_execution/QueryContext.hpp"
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_optimizer/QueryHandle.hpp"
+#include "query_optimizer/QueryPlan.hpp"
+#include "relational_operators/WorkOrder.hpp"
+#include "storage/StorageBlockInfo.hpp"
+
+#include "glog/logging.h"
+
+using std::pair;
+
+namespace quickstep {
+
+// Builds the per-query bookkeeping from the query plan DAG held by
+// 'query_handle': which operators need a rebuild phase, and for every edge
+// whether it streams blocks (output consumer) or is a pipeline breaker
+// (blocking dependency). Both arguments must be non-null.
+QueryManagerBase::QueryManagerBase(QueryHandle *query_handle,
+                                   CatalogDatabaseLite *catalog_database)
+    : query_id_(DCHECK_NOTNULL(query_handle)->query_id()),
+      catalog_database_(DCHECK_NOTNULL(catalog_database)),
+      query_dag_(DCHECK_NOTNULL(
+          DCHECK_NOTNULL(query_handle->getQueryPlanMutable())->getQueryPlanDAGMutable())),
+      num_operators_in_dag_(query_dag_->size()),
+      output_consumers_(num_operators_in_dag_),
+      blocking_dependencies_(num_operators_in_dag_),
+      query_exec_state_(new QueryExecutionState(num_operators_in_dag_)) {
+  for (dag_node_index producer = 0; producer < num_operators_in_dag_; ++producer) {
+    // An operator that owns an InsertDestination always needs a rebuild.
+    const bool has_insert_destination =
+        query_dag_->getNodePayload(producer).getInsertDestinationID() !=
+        QueryContext::kInvalidInsertDestinationId;
+    if (has_insert_destination) {
+      query_exec_state_->setRebuildRequired(producer);
+      // Start from a clean rebuild status; presumably "no rebuild work orders,
+      // rebuild not initiated" -- confirm against QueryExecutionState.
+      query_exec_state_->setRebuildStatus(producer, 0, false);
+    }
+
+    // Classify every outgoing edge of this operator.
+    for (const pair<dag_node_index, bool> &edge :
+         query_dag_->getDependents(producer)) {
+      const dag_node_index consumer = edge.first;
+      const bool is_pipeline_breaker =
+          query_dag_->getLinkMetadata(producer, consumer);
+      if (is_pipeline_breaker) {
+        // Blocks cannot be streamed across this edge; the consumer has to
+        // wait for the producer.
+        blocking_dependencies_[consumer].push_back(producer);
+      } else {
+        // Blocks can be streamed from producer to consumer.
+        output_consumers_[producer].push_back(consumer);
+      }
+    }
+  }
+}
+
+// Dispatches one TaggedMessage addressed to this query and reports the
+// resulting status.
+//
+// Every message type except kCatalogRelationNewBlockMessage names an operator
+// (op_index). After the type-specific handler has run, the status is derived
+// from the query execution state. kCatalogRelationNewBlockMessage only updates
+// the catalog and always yields kNone. A message that fails to parse, or one
+// of an unknown type, is fatal.
+//
+// Returns kQueryExecuted if the whole query has finished, otherwise
+// kOperatorExecuted if operator op_index has finished, otherwise kNone.
+QueryManagerBase::QueryStatusCode QueryManagerBase::processMessage(
+    const TaggedMessage &tagged_message) {
+  dag_node_index op_index;
+  switch (tagged_message.message_type()) {
+    case kWorkOrderCompleteMessage: {
+      serialization::NormalWorkOrderCompletionMessage proto;
+      CHECK(proto.ParseFromArray(tagged_message.message(),
+                                 tagged_message.message_bytes()));
+
+      op_index = proto.operator_index();
+      processWorkOrderCompleteMessage(op_index);
+      break;
+    }
+    case kRebuildWorkOrderCompleteMessage: {
+      serialization::RebuildWorkOrderCompletionMessage proto;
+      CHECK(proto.ParseFromArray(tagged_message.message(),
+                                 tagged_message.message_bytes()));
+
+      op_index = proto.operator_index();
+      processRebuildWorkOrderCompleteMessage(op_index);
+      break;
+    }
+    case kCatalogRelationNewBlockMessage: {
+      serialization::CatalogRelationNewBlockMessage proto;
+      CHECK(proto.ParseFromArray(tagged_message.message(),
+                                 tagged_message.message_bytes()));
+
+      const block_id block = proto.block_id();
+
+      // Register the new block with its relation (and partition, if any).
+      CatalogRelation *relation =
+          static_cast<CatalogDatabase*>(catalog_database_)->getRelationByIdMutable(proto.relation_id());
+      relation->addBlock(block);
+
+      if (proto.has_partition_id()) {
+        relation->getPartitionSchemeMutable()->addBlockToPartition(
+            proto.partition_id(), block);
+      }
+      // This message is not tied to an operator, so there is no status to
+      // derive from the execution state.
+      return QueryStatusCode::kNone;
+    }
+    case kDataPipelineMessage: {
+      // Possible message senders include InsertDestinations and some
+      // operators which modify existing blocks.
+      serialization::DataPipelineMessage proto;
+      CHECK(proto.ParseFromArray(tagged_message.message(),
+                                 tagged_message.message_bytes()));
+
+      op_index = proto.operator_index();
+      processDataPipelineMessage(op_index,
+                                 proto.block_id(),
+                                 proto.relation_id());
+      break;
+    }
+    case kWorkOrdersAvailableMessage: {
+      serialization::WorkOrdersAvailableMessage proto;
+      CHECK(proto.ParseFromArray(tagged_message.message(),
+                                 tagged_message.message_bytes()));
+
+      op_index = proto.operator_index();
+
+      // Check if new work orders are available.
+      fetchNormalWorkOrders(op_index);
+      break;
+    }
+    case kWorkOrderFeedbackMessage: {
+      WorkOrder::FeedbackMessage msg(
+          const_cast<void *>(tagged_message.message()),
+          tagged_message.message_bytes());
+
+      op_index = msg.header().rel_op_index;
+      processFeedbackMessage(msg);
+      break;
+    }
+    default:
+      LOG(FATAL) << "Unknown message type found in QueryManager";
+  }
+
+  // kQueryExecuted takes precedence over kOperatorExecuted, so the query-level
+  // check must come first. If the operator check ran first, a finished query
+  // (whose operators have presumably all finished, op_index included) would
+  // be reported as kOperatorExecuted and kQueryExecuted would never be
+  // returned.
+  if (query_exec_state_->hasQueryExecutionFinished()) {
+    return QueryStatusCode::kQueryExecuted;
+  }
+
+  if (query_exec_state_->hasExecutionFinished(op_index)) {
+    return QueryStatusCode::kOperatorExecuted;
+  }
+
+  return QueryStatusCode::kNone;
+}
+
+// Forwards a work order feedback message to the relational operator named in
+// the message header.
+void QueryManagerBase::processFeedbackMessage(
+    const WorkOrder::FeedbackMessage &msg) {
+  const auto target_op_index = msg.header().rel_op_index;
+  query_dag_->getNodePayloadMutable(target_op_index)->receiveFeedbackMessage(msg);
+}
+
+// Handles the completion of one normal (non-rebuild) WorkOrder of operator
+// 'op_index': updates the queued-work-order count, fetches any newly available
+// work orders, advances the operator towards "finished" (starting the rebuild
+// phase when one is required), and then gives the operator's dependents a
+// chance to make progress.
+void QueryManagerBase::processWorkOrderCompleteMessage(
+ const dag_node_index op_index) {
+ // One fewer work order of this operator is outstanding.
+ query_exec_state_->decrementNumQueuedWorkOrders(op_index);
+
+ // Check if new work orders are available and fetch them if so.
+ fetchNormalWorkOrders(op_index);
+
+ if (checkRebuildRequired(op_index)) {
+ // The operator needs a rebuild phase, which may only start once its normal
+ // execution is over.
+ if (checkNormalExecutionOver(op_index)) {
+ if (!checkRebuildInitiated(op_index)) {
+ // initiateRebuild() returns true when the rebuild is over immediately.
+ if (initiateRebuild(op_index)) {
+ // Rebuild initiated and completed right away.
+ markOperatorFinished(op_index);
+ } else {
+ // Rebuild in progress; nothing more to do until it completes.
+ }
+ } else if (checkRebuildOver(op_index)) {
+ // Rebuild was in progress and now it is over.
+ markOperatorFinished(op_index);
+ }
+ } else {
+ // Normal execution still in progress for this operator.
+ }
+ } else if (checkOperatorExecutionOver(op_index)) {
+ // Rebuild not required for this operator and its normal execution is
+ // complete.
+ markOperatorFinished(op_index);
+ }
+
+ // This loop runs whether or not 'op_index' was marked finished above; each
+ // dependent is processed only if all of its blocking dependencies have
+ // finished.
+ for (const pair<dag_node_index, bool> &dependent_link :
+ query_dag_->getDependents(op_index)) {
+ const dag_node_index dependent_op_index = dependent_link.first;
+ if (checkAllBlockingDependenciesMet(dependent_op_index)) {
+ // Process the dependent operator (of the operator whose WorkOrder
+ // was just executed) for which all the blocking dependencies have been
+ // met.
+ processOperator(dependent_op_index, true);
+ }
+ }
+}
+
+// Handles the completion of one RebuildWorkOrder of operator 'op_index'. Once
+// the rebuild is over, the operator is marked finished and every dependent
+// whose blocking dependencies are all met is processed.
+void QueryManagerBase::processRebuildWorkOrderCompleteMessage(const dag_node_index op_index) {
+  query_exec_state_->decrementNumRebuildWorkOrders(op_index);
+
+  if (!checkRebuildOver(op_index)) {
+    // The rebuild is not over yet; wait for further completion messages.
+    return;
+  }
+
+  markOperatorFinished(op_index);
+
+  for (const pair<dag_node_index, bool> &edge :
+       query_dag_->getDependents(op_index)) {
+    const dag_node_index consumer_index = edge.first;
+    if (!checkAllBlockingDependenciesMet(consumer_index)) {
+      continue;
+    }
+    processOperator(consumer_index, true);
+  }
+}
+
+// Processes the operator at 'index': fetches its available work orders and, if
+// none were fetched and its normal execution is over, drives it through the
+// rebuild phase (when required) and marks it finished. When
+// 'recursively_check_dependents' is true, dependents whose blocking
+// dependencies are all met are then processed recursively.
+void QueryManagerBase::processOperator(const dag_node_index index,
+ const bool recursively_check_dependents) {
+ if (fetchNormalWorkOrders(index)) {
+ // Fetched work orders. Return to wait for the generated work orders to
+ // execute, and skip the execution-finished checks.
+ return;
+ }
+
+ if (checkNormalExecutionOver(index)) {
+ if (checkRebuildRequired(index)) {
+ if (!checkRebuildInitiated(index)) {
+ // Rebuild hasn't started, initiate it.
+ if (initiateRebuild(index)) {
+ // Rebuild initiated and completed right away.
+ markOperatorFinished(index);
+ } else {
+ // Rebuild WorkOrders have been generated.
+ return;
+ }
+ } else if (checkRebuildOver(index)) {
+ // Rebuild had been initiated and it is over.
+ markOperatorFinished(index);
+ }
+ } else {
+ // Rebuild is not required and normal execution over, mark finished.
+ markOperatorFinished(index);
+ }
+ // If we reach here, that means the operator has been marked as finished.
+ // NOTE(review): the path where a rebuild was already initiated but is not
+ // yet over also falls through to here without marking the operator
+ // finished; confirm that visiting the dependents is intended in that case.
+ if (recursively_check_dependents) {
+ for (const pair<dag_node_index, bool> &dependent_link :
+ query_dag_->getDependents(index)) {
+ const dag_node_index dependent_op_index = dependent_link.first;
+ if (checkAllBlockingDependenciesMet(dependent_op_index)) {
+ // Recurse with the flag set so the whole downstream chain is visited.
+ processOperator(dependent_op_index, true);
+ }
+ }
+ }
+ }
+}
+
+// Streams 'block' of relation 'rel_id', produced by operator 'op_index', to
+// every operator registered as an output consumer of 'op_index'.
+void QueryManagerBase::processDataPipelineMessage(const dag_node_index op_index,
+                                                  const block_id block,
+                                                  const relation_id rel_id) {
+  // 'output_consumers_' holds only those dependents of 'op_index' that are
+  // eligible to receive streamed input.
+  const std::vector<dag_node_index> &streaming_consumers =
+      output_consumers_[op_index];
+  for (const dag_node_index consumer : streaming_consumers) {
+    RelationalOperator *consumer_op = query_dag_->getNodePayloadMutable(consumer);
+    consumer_op->feedInputBlock(block, rel_id);
+    // The block just fed may have made new WorkOrders available; fetch them.
+    fetchNormalWorkOrders(consumer);
+  }
+}
+
+// Marks the operator at 'index' as finished, lets it update the catalog, and
+// notifies each of its dependents: that no more input blocks will arrive from
+// this operator's output relation (if it has one), and that all blocking
+// dependencies are met (if that is now the case).
+void QueryManagerBase::markOperatorFinished(const dag_node_index index) {
+  query_exec_state_->setExecutionFinished(index);
+
+  RelationalOperator *finished_op = query_dag_->getNodePayloadMutable(index);
+  finished_op->updateCatalogOnCompletion();
+
+  const relation_id produced_relation = finished_op->getOutputRelationID();
+  // A negative relation ID is treated as "no output relation".
+  const bool has_output_relation = (produced_relation >= 0);
+
+  for (const pair<dag_node_index, bool> &edge : query_dag_->getDependents(index)) {
+    const dag_node_index consumer_index = edge.first;
+    RelationalOperator *consumer_op = query_dag_->getNodePayloadMutable(consumer_index);
+
+    if (has_output_relation) {
+      // Tell the dependent that this operator is done feeding input blocks.
+      consumer_op->doneFeedingInputBlocks(produced_relation);
+    }
+
+    if (checkAllBlockingDependenciesMet(consumer_index)) {
+      consumer_op->informAllBlockingDependenciesMet();
+    }
+  }
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b4e25edc/query_execution/QueryManagerBase.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerBase.hpp b/query_execution/QueryManagerBase.hpp
new file mode 100644
index 0000000..9e192c8
--- /dev/null
+++ b/query_execution/QueryManagerBase.hpp
@@ -0,0 +1,312 @@
+/**
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin—Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_BASE_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_BASE_HPP_
+
+#include <cstddef>
+#include <memory>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/QueryExecutionState.hpp"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "relational_operators/RelationalOperator.hpp"
+#include "relational_operators/WorkOrder.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "utility/DAG.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+
+class CatalogDatabaseLite;
+class QueryHandle;
+
+/** \addtogroup QueryExecution
+ * @{
+ */
+
+/**
+ * @brief A base class that manages the execution of a query including
+ * generation of new work orders, and keeping track of the query
+ * execution state.
+ *
+ * @note The operations that depend on how work orders are stored and
+ * scheduled (fetchNormalWorkOrders(), checkNormalExecutionOver(),
+ * initiateRebuild() and checkRebuildOver()) are pure virtual and must
+ * be implemented by subclasses.
+ **/
+class QueryManagerBase {
+ public:
+ typedef DAG<RelationalOperator, bool>::size_type_nodes dag_node_index;
+
+ /**
+ * @brief Return codes for processMessage() function.
+ *
+ * @note When both operator and query get executed, kQueryExecuted takes
+ * precedence over kOperatorExecuted.
+ **/
+ enum class QueryStatusCode {
+ kOperatorExecuted = 0, // An operator in the query finished execution.
+ kQueryExecuted, // The query got executed.
+ kNone // None of the above.
+ };
+
+ /**
+ * @brief Constructor.
+ *
+ * @param query_handle The QueryHandle object for this query.
+ * @param catalog_database The CatalogDatabase used by the query.
+ **/
+ QueryManagerBase(QueryHandle *query_handle,
+ CatalogDatabaseLite *catalog_database);
+
+ /**
+ * @brief Virtual destructor.
+ **/
+ virtual ~QueryManagerBase() {}
+
+ /**
+ * @brief Process a message sent to the QueryManager.
+ *
+ * @param tagged_message TaggedMessage sent to the QueryManager.
+ *
+ * @return QueryStatusCode as determined after the message is processed.
+ **/
+ QueryStatusCode processMessage(const TaggedMessage &tagged_message);
+
+ /**
+ * @brief Get the QueryExecutionState for this query.
+ **/
+ inline const QueryExecutionState& getQueryExecutionState() const {
+ return *query_exec_state_;
+ }
+
+ protected:
+ /**
+ * @brief Process the received WorkOrder complete message.
+ *
+ * @param op_index The index of the specified operator node in the query DAG
+ * for the completed WorkOrder.
+ **/
+ void processWorkOrderCompleteMessage(const dag_node_index op_index);
+
+ /**
+ * @brief Process the received RebuildWorkOrder complete message.
+ *
+ * @param op_index The index of the specified operator node in the query DAG
+ * for the completed RebuildWorkOrder.
+ **/
+ void processRebuildWorkOrderCompleteMessage(const dag_node_index op_index);
+
+ /**
+ * @brief Process a current relational operator: Get its workorders and store
+ * them in the WorkOrdersContainer for this query. If the operator can
+ * be marked as done, do so.
+ *
+ * @param index The index of the relational operator to be processed in the
+ * query plan DAG.
+ * @param recursively_check_dependents If an operator is done, should we
+ * call processOperator on its dependents recursively.
+ **/
+ void processOperator(const dag_node_index index,
+ const bool recursively_check_dependents);
+
+ /**
+ * @brief Process the received data pipeline message.
+ *
+ * @param op_index The index of the specified operator node in the query DAG
+ * for the pipelining block.
+ * @param block The block id.
+ * @param rel_id The ID of the relation that produced 'block'.
+ **/
+ void processDataPipelineMessage(const dag_node_index op_index,
+ const block_id block,
+ const relation_id rel_id);
+
+ /**
+ * @brief Process the received work order feedback message and notify
+ * relational operator.
+ *
+ * @param message Feedback message from work order.
+ **/
+ void processFeedbackMessage(const WorkOrder::FeedbackMessage &message);
+
+ /**
+ * @brief This function does the following things:
+ * 1. Mark the given relational operator as "done".
+ * 2. For all the dependents of this operator, check if all of their
+ * blocking dependencies are met. If so inform them that the blocking
+ * dependencies are met.
+ * 3. Check if the given operator is done producing output. If it's
+ * done, inform the dependents that they won't receive input anymore
+ * from the given operator.
+ *
+ * @param index The index of the given relational operator in the DAG.
+ **/
+ void markOperatorFinished(const dag_node_index index);
+
+ /**
+ * @brief Check if all the dependencies of the node at specified index have
+ * finished their execution.
+ *
+ * @note This function's true return value is a pre-requisite for calling
+ * getRebuildWorkOrders()
+ *
+ * @param node_index The index of the specified node in the query DAG.
+ *
+ * @return True if all the dependencies have finished their execution. False
+ * otherwise.
+ **/
+ inline bool checkAllDependenciesMet(const dag_node_index node_index) const {
+ for (const dag_node_index dependency_index :
+ query_dag_->getDependencies(node_index)) {
+ // If at least one of the dependencies is not met, return false.
+ if (!query_exec_state_->hasExecutionFinished(dependency_index)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * @brief Check if all the blocking dependencies of the node at specified
+ * index have finished their execution.
+ *
+ * @note A blocking dependency is one whose link is a pipeline breaker. Output
+ * of a dependency can't be streamed to its dependent if the link
+ * between them is a pipeline breaker.
+ *
+ * @param node_index The index of the specified node in the query DAG.
+ *
+ * @return True if all the blocking dependencies have finished their
+ * execution. False otherwise.
+ **/
+ inline bool checkAllBlockingDependenciesMet(
+ const dag_node_index node_index) const {
+ for (const dag_node_index blocking_dependency_index :
+ blocking_dependencies_[node_index]) {
+ if (!query_exec_state_->hasExecutionFinished(
+ blocking_dependency_index)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * @brief Check if the execution of the given operator is over, i.e. its
+ * normal execution is over and, if a rebuild is required, the
+ * rebuild is over as well.
+ *
+ * @param index The index of the given operator in the DAG.
+ *
+ * @return True if the execution of the given operator is over, false
+ * otherwise.
+ **/
+ inline bool checkOperatorExecutionOver(const dag_node_index index) const {
+ return this->checkNormalExecutionOver(index) &&
+ (!checkRebuildRequired(index) || this->checkRebuildOver(index));
+ }
+
+ /**
+ * @brief Check if the rebuild operation is required for a given operator.
+ *
+ * @param index The index of the given operator in the DAG.
+ *
+ * @return True if the rebuild operation is required, false otherwise.
+ **/
+ inline bool checkRebuildRequired(const dag_node_index index) const {
+ return query_exec_state_->isRebuildRequired(index);
+ }
+
+ /**
+ * @brief Check if the rebuild operation for a given operator has been
+ * initiated.
+ *
+ * @param index The index of the given operator in the DAG.
+ *
+ * @return True if the rebuild operation has been initiated, false otherwise.
+ **/
+ inline bool checkRebuildInitiated(const dag_node_index index) const {
+ return query_exec_state_->hasRebuildInitiated(index);
+ }
+
+ // NOTE: the constructor's initializer list relies on this declaration order
+ // (num_operators_in_dag_ is computed from query_dag_, and the containers
+ // below are sized from num_operators_in_dag_). Do not reorder these members.
+ const std::size_t query_id_;
+
+ CatalogDatabaseLite *catalog_database_;
+
+ DAG<RelationalOperator, bool> *query_dag_;
+ const dag_node_index num_operators_in_dag_;
+
+ // For all nodes, store their receiving dependents.
+ std::vector<std::vector<dag_node_index>> output_consumers_;
+
+ // For all nodes, store their pipeline breaking dependencies (if any).
+ std::vector<std::vector<dag_node_index>> blocking_dependencies_;
+
+ std::unique_ptr<QueryExecutionState> query_exec_state_;
+
+ private:
+ /**
+ * @brief Fetch all work orders currently available in relational operator and
+ * store them internally.
+ *
+ * @note Pure virtual; implemented by subclasses.
+ *
+ * @param index The index of the relational operator to be processed in the
+ * query plan DAG.
+ *
+ * @return Whether any work order was generated by op.
+ **/
+ virtual bool fetchNormalWorkOrders(const dag_node_index index) = 0;
+
+ /**
+ * @brief Check if the given operator's normal execution is over.
+ *
+ * @note The conditions for a given operator's normal execution to get over:
+ * 1. All of its normal (i.e. non rebuild) WorkOrders have finished
+ * execution.
+ * 2. The operator is done generating work orders.
+ * 3. All of the dependencies of the given operator have been met.
+ *
+ * @note Pure virtual; implemented by subclasses.
+ *
+ * @param index The index of the given operator in the DAG.
+ *
+ * @return True if the normal execution of the given operator is over, false
+ * otherwise.
+ **/
+ virtual bool checkNormalExecutionOver(const dag_node_index index) const = 0;
+
+ /**
+ * @brief Initiate the rebuild process for partially filled blocks generated
+ * during the execution of the given operator.
+ *
+ * @note Pure virtual; implemented by subclasses.
+ *
+ * @param index The index of the given operator in the DAG.
+ *
+ * @return True if the rebuild is over immediately, i.e. the operator didn't
+ * generate any rebuild WorkOrders, false otherwise.
+ **/
+ virtual bool initiateRebuild(const dag_node_index index) = 0;
+
+ /**
+ * @brief Check if the rebuild operation for a given operator is over.
+ *
+ * @note Pure virtual; implemented by subclasses.
+ *
+ * @param index The index of the given operator in the DAG.
+ *
+ * @return True if the rebuild operation is over, false otherwise.
+ **/
+ virtual bool checkRebuildOver(const dag_node_index index) const = 0;
+
+ DISALLOW_COPY_AND_ASSIGN(QueryManagerBase);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_BASE_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b4e25edc/query_execution/QueryManagerSingleNode.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerSingleNode.cpp b/query_execution/QueryManagerSingleNode.cpp
new file mode 100644
index 0000000..193b188
--- /dev/null
+++ b/query_execution/QueryManagerSingleNode.cpp
@@ -0,0 +1,194 @@
+/**
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *   University of Wisconsin—Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include "query_execution/QueryManagerSingleNode.hpp"
+
+#include <cstddef>
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/WorkerMessage.hpp"
+#include "query_optimizer/QueryHandle.hpp"
+#include "relational_operators/RebuildWorkOrder.hpp"
+#include "relational_operators/RelationalOperator.hpp"
+#include "storage/InsertDestination.hpp"
+#include "storage/StorageBlock.hpp"
+#include "utility/DAG.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+
+namespace quickstep {
+
+class WorkOrder;
+
+// Builds the per-query state owned by this manager (the QueryContext and the
+// WorkOrdersContainer) and then starts every operator whose blocking
+// dependencies are already satisfied.
+QueryManagerSingleNode::QueryManagerSingleNode(
+    const tmb::client_id foreman_client_id,
+    const std::size_t num_numa_nodes,
+    QueryHandle *query_handle,
+    CatalogDatabaseLite *catalog_database,
+    StorageManager *storage_manager,
+    tmb::MessageBus *bus)
+    : QueryManagerBase(query_handle, catalog_database),
+      foreman_client_id_(foreman_client_id),
+      storage_manager_(DCHECK_NOTNULL(storage_manager)),
+      bus_(DCHECK_NOTNULL(bus)),
+      query_context_(new QueryContext(query_handle->getQueryContextProto(),
+                                      *catalog_database_,
+                                      storage_manager_,
+                                      foreman_client_id_,
+                                      bus_)),
+      workorders_container_(
+          new WorkOrdersContainer(num_operators_in_dag_, num_numa_nodes)) {
+  // Walk the DAG in index order. Operators that still wait on a blocking
+  // dependency are left untouched here; the others are notified and handed
+  // to processOperator().
+  for (dag_node_index op_index = 0; op_index < num_operators_in_dag_; ++op_index) {
+    if (!checkAllBlockingDependenciesMet(op_index)) {
+      continue;
+    }
+    query_dag_->getNodePayloadMutable(op_index)->informAllBlockingDependenciesMet();
+    processOperator(op_index, false);
+  }
+}
+
+// Scans the operators round-robin, beginning at 'start_operator_index', and
+// returns a message wrapping the first WorkOrder found. For each operator the
+// search order is: normal WorkOrder on 'numa_node', rebuild WorkOrder on
+// 'numa_node' (both only when a specific node was requested), then a normal
+// WorkOrder from any node, then a rebuild WorkOrder from any node.
+// Returns nullptr when no operator has a WorkOrder available.
+WorkerMessage* QueryManagerSingleNode::getNextWorkerMessage(
+    const dag_node_index start_operator_index, const numa_node_id numa_node) {
+  // Default policy: Operator with lowest index first.
+  const bool has_numa_preference = (numa_node != kAnyNUMANodeID);
+
+  dag_node_index op_index = start_operator_index;
+  for (std::size_t num_visited = 0;
+       num_visited < num_operators_in_dag_;
+       ++num_visited, op_index = (op_index + 1) % num_operators_in_dag_) {
+    // Operators that have completed execution have nothing left to schedule.
+    if (query_exec_state_->hasExecutionFinished(op_index)) {
+      continue;
+    }
+
+    if (has_numa_preference) {
+      // Prefer a normal WorkOrder located on the requested NUMA node.
+      WorkOrder *local_normal =
+          workorders_container_->getNormalWorkOrderForNUMANode(op_index, numa_node);
+      if (local_normal != nullptr) {
+        query_exec_state_->incrementNumQueuedWorkOrders(op_index);
+        return WorkerMessage::WorkOrderMessage(local_normal, op_index);
+      }
+
+      // Otherwise fall back to a rebuild WorkOrder on the same NUMA node.
+      WorkOrder *local_rebuild =
+          workorders_container_->getRebuildWorkOrderForNUMANode(op_index, numa_node);
+      if (local_rebuild != nullptr) {
+        return WorkerMessage::RebuildWorkOrderMessage(local_rebuild, op_index);
+      }
+    }
+
+    // Either nothing was found on the requested NUMA node, or the caller
+    // passed 'kAnyNUMANodeID'. Take a normal WorkOrder from any node.
+    WorkOrder *any_normal = workorders_container_->getNormalWorkOrder(op_index);
+    if (any_normal != nullptr) {
+      query_exec_state_->incrementNumQueuedWorkOrders(op_index);
+      return WorkerMessage::WorkOrderMessage(any_normal, op_index);
+    }
+
+    // Last resort for this operator: a rebuild WorkOrder from any node.
+    WorkOrder *any_rebuild = workorders_container_->getRebuildWorkOrder(op_index);
+    if (any_rebuild != nullptr) {
+      return WorkerMessage::RebuildWorkOrderMessage(any_rebuild, op_index);
+    }
+  }
+
+  // No WorkOrders available right now.
+  return nullptr;
+}
+
+// Asks the operator at 'index' to emit its normal WorkOrders into the
+// container. Returns true iff the number of normal WorkOrders held for the
+// operator is larger after the call than before it.
+bool QueryManagerSingleNode::fetchNormalWorkOrders(const dag_node_index index) {
+  // Nothing to fetch once the operator has reported that generation is done.
+  if (query_exec_state_->hasDoneGenerationWorkOrders(index)) {
+    return false;
+  }
+
+  // Do not fetch any work units until all blocking dependencies are met.
+  // The relational operator is not aware of blocking dependencies for
+  // uncorrelated scalar queries.
+  if (!checkAllBlockingDependenciesMet(index)) {
+    return false;
+  }
+
+  const std::size_t num_workorders_before =
+      workorders_container_->getNumNormalWorkOrders(index);
+
+  const bool generation_finished =
+      query_dag_->getNodePayloadMutable(index)->getAllWorkOrders(workorders_container_.get(),
+                                                                 query_context_.get(),
+                                                                 storage_manager_,
+                                                                 foreman_client_id_,
+                                                                 bus_);
+  if (generation_finished) {
+    query_exec_state_->setDoneGenerationWorkOrders(index);
+  }
+
+  // TODO(shoban): It would be a good check to see if operator is making
+  // useful progress, i.e., the operator either generates work orders to
+  // execute or still has pending work orders executing. However, this will not
+  // work if Foreman polls operators without feeding data. This check can be
+  // enabled, if Foreman is refactored to call getAllWorkOrders() only when
+  // pending work orders are completed or new input blocks feed.
+
+  return num_workorders_before < workorders_container_->getNumNormalWorkOrders(index);
+}
+
+// Generates the rebuild WorkOrders for the operator at 'index' and records
+// their count in the execution state. Returns true when the recorded count is
+// zero, i.e. the rebuild is over immediately.
+bool QueryManagerSingleNode::initiateRebuild(const dag_node_index index) {
+  // Preconditions: no rebuild WorkOrder is held for this operator yet, a
+  // rebuild is required, and it has not been initiated before.
+  DCHECK(!workorders_container_->hasRebuildWorkOrder(index));
+  DCHECK(checkRebuildRequired(index));
+  DCHECK(!checkRebuildInitiated(index));
+
+  getRebuildWorkOrders(index, workorders_container_.get());
+
+  const auto num_generated = workorders_container_->getNumRebuildWorkOrders(index);
+  query_exec_state_->setRebuildStatus(index, num_generated, true);
+
+  const bool rebuild_already_over =
+      (query_exec_state_->getNumRebuildWorkOrders(index) == 0);
+  return rebuild_already_over;
+}
+
+// Creates one RebuildWorkOrder per partially filled block reported by the
+// InsertDestination of the operator at 'index' and adds each of them to
+// 'container'. Does nothing when the operator has no valid InsertDestination.
+void QueryManagerSingleNode::getRebuildWorkOrders(const dag_node_index index,
+                                                  WorkOrdersContainer *container) {
+  const RelationalOperator &op = query_dag_->getNodePayload(index);
+
+  const QueryContext::insert_destination_id destination_id = op.getInsertDestinationID();
+  if (destination_id == QueryContext::kInvalidInsertDestinationId) {
+    return;
+  }
+
+  DCHECK(query_context_ != nullptr);
+  InsertDestination *destination = query_context_->getInsertDestination(destination_id);
+  DCHECK(destination != nullptr);
+
+  std::vector<MutableBlockReference> partial_blocks;
+  destination->getPartiallyFilledBlocks(&partial_blocks);
+
+  // Each block reference is moved into the RebuildWorkOrder built for it.
+  for (MutableBlockReference &block_ref : partial_blocks) {
+    container->addRebuildWorkOrder(
+        new RebuildWorkOrder(query_id_,
+                             std::move(block_ref),
+                             index,
+                             op.getOutputRelationID(),
+                             foreman_client_id_,
+                             bus_),
+        index);
+  }
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b4e25edc/query_execution/QueryManagerSingleNode.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerSingleNode.hpp b/query_execution/QueryManagerSingleNode.hpp
new file mode 100644
index 0000000..5533f06
--- /dev/null
+++ b/query_execution/QueryManagerSingleNode.hpp
@@ -0,0 +1,140 @@
+/**
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *   University of Wisconsin—Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_SINGLE_NODE_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_SINGLE_NODE_HPP_
+
+#include <cstddef>
+#include <memory>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/QueryContext.hpp"
+#include "query_execution/QueryExecutionState.hpp"
+#include "query_execution/QueryManagerBase.hpp"
+#include "query_execution/WorkOrdersContainer.hpp"
+#include "utility/Macros.hpp"
+
+#include "tmb/id_typedefs.h"
+
+namespace tmb { class MessageBus; }
+
+namespace quickstep {
+
+class CatalogDatabaseLite;
+class QueryHandle;
+class StorageManager;
+class WorkerMessage;
+
+/** \addtogroup QueryExecution
+ * @{
+ */
+
+/**
+ * @brief A class that manages the execution of a query, including the
+ *        generation of new work orders and keeping track of the query
+ *        execution state.
+ **/
+class QueryManagerSingleNode final : public QueryManagerBase {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param foreman_client_id The TMB client ID of the foreman thread.
+   * @param num_numa_nodes The number of NUMA nodes used by the system.
+   * @param query_handle The QueryHandle object for this query.
+   * @param catalog_database The CatalogDatabase used by the query.
+   * @param storage_manager The StorageManager used by the query.
+   * @param bus The TMB used for communication.
+   **/
+  QueryManagerSingleNode(const tmb::client_id foreman_client_id,
+                         const std::size_t num_numa_nodes,
+                         QueryHandle *query_handle,
+                         CatalogDatabaseLite *catalog_database,
+                         StorageManager *storage_manager,
+                         tmb::MessageBus *bus);
+
+  ~QueryManagerSingleNode() override {}
+
+  /**
+   * @brief Get the next WorkOrder to be executed, wrapped in a WorkerMessage.
+   *
+   * @param start_operator_index Begin the search for the schedulable WorkOrder
+   *        with the operator at this index.
+   * @param node_id The next WorkOrder should preferably have its input(s)
+   *        from this NUMA node. This is a hint and not a binding requirement.
+   *
+   * @return A pointer to the WorkerMessage. If there's no WorkOrder to be
+   *         executed, return nullptr.
+   **/
+  WorkerMessage* getNextWorkerMessage(
+      const dag_node_index start_operator_index,
+      const numa_node_id node_id = kAnyNUMANodeID);
+
+  /**
+   * @brief Get a mutable pointer to the QueryContext held by this manager.
+   **/
+  inline QueryContext* getQueryContextMutable() {
+    return query_context_.get();
+  }
+
+ private:
+  bool fetchNormalWorkOrders(const dag_node_index index) override;
+
+  // Normal execution is over when all dependencies are met, no normal
+  // WorkOrder is held in the container, no WorkOrder is queued, and the
+  // operator is done generating WorkOrders. The checks run in that order and
+  // stop at the first one that fails.
+  bool checkNormalExecutionOver(const dag_node_index index) const override {
+    if (!checkAllDependenciesMet(index)) {
+      return false;
+    }
+    if (workorders_container_->hasNormalWorkOrder(index)) {
+      return false;
+    }
+    if (!(query_exec_state_->getNumQueuedWorkOrders(index) == 0)) {
+      return false;
+    }
+    return query_exec_state_->hasDoneGenerationWorkOrders(index);
+  }
+
+  bool initiateRebuild(const dag_node_index index) override;
+
+  // The rebuild is over when it has been initiated, no rebuild WorkOrder is
+  // held in the container, and the execution state reports zero rebuild
+  // WorkOrders for the operator.
+  bool checkRebuildOver(const dag_node_index index) const override {
+    if (!query_exec_state_->hasRebuildInitiated(index)) {
+      return false;
+    }
+    if (workorders_container_->hasRebuildWorkOrder(index)) {
+      return false;
+    }
+    return (query_exec_state_->getNumRebuildWorkOrders(index) == 0);
+  }
+
+  /**
+   * @brief Get the rebuild WorkOrders for an operator.
+   *
+   * @note This function should be called only once, when all the normal
+   *       WorkOrders generated by an operator finish their execution.
+   *
+   * @param index The index of the operator in the query plan DAG.
+   * @param container A pointer to a WorkOrdersContainer to be used to store the
+   *        generated WorkOrders.
+   **/
+  void getRebuildWorkOrders(const dag_node_index index,
+                            WorkOrdersContainer *container);
+
+  // NOTE: the constructor's initializer list relies on the declaration order
+  // of the members below; do not reorder them.
+  const tmb::client_id foreman_client_id_;
+
+  StorageManager *storage_manager_;
+  tmb::MessageBus *bus_;
+
+  std::unique_ptr<QueryContext> query_context_;
+
+  std::unique_ptr<WorkOrdersContainer> workorders_container_;
+
+  DISALLOW_COPY_AND_ASSIGN(QueryManagerSingleNode);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_SINGLE_NODE_HPP_