You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by hb...@apache.org on 2016/07/13 02:04:30 UTC

[1/2] incubator-quickstep git commit: Introduced QueryManagerBase, and renamed QueryManagerSingleNode.

Repository: incubator-quickstep
Updated Branches:
  refs/heads/master 7671a5893 -> b4e25edce


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b4e25edc/query_execution/tests/QueryManagerSingleNode_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/QueryManagerSingleNode_unittest.cpp b/query_execution/tests/QueryManagerSingleNode_unittest.cpp
new file mode 100644
index 0000000..52cee20
--- /dev/null
+++ b/query_execution/tests/QueryManagerSingleNode_unittest.cpp
@@ -0,0 +1,942 @@
+/**
+ *   Copyright 2011-2015 Quickstep Technologies LLC.
+ *   Copyright 2015-2016 Pivotal Software, Inc.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#include <climits>
+#include <cstdlib>
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include "catalog/CatalogDatabase.hpp"
+#include "catalog/CatalogRelation.hpp"
+#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/QueryContext.hpp"
+#include "query_execution/QueryContext.pb.h"
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionState.hpp"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryManagerSingleNode.hpp"
+#include "query_execution/WorkOrdersContainer.hpp"
+#include "query_execution/WorkerDirectory.hpp"
+#include "query_execution/WorkerMessage.hpp"
+#include "query_optimizer/QueryHandle.hpp"
+#include "query_optimizer/QueryPlan.hpp"
+#include "relational_operators/RelationalOperator.hpp"
+#include "relational_operators/WorkOrder.hpp"
+#include "storage/InsertDestination.hpp"
+#include "storage/InsertDestination.pb.h"
+#include "storage/StorageBlock.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageManager.hpp"
+#include "utility/DAG.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+#include "gtest/gtest.h"
+
+#include "tmb/id_typedefs.h"
+#include "tmb/tagged_message.h"
+
+namespace tmb { class MessageBus; }
+
+using std::move;
+using std::unique_ptr;
+using std::vector;
+
+using tmb::client_id;
+
+namespace quickstep {
+
+class WorkOrderProtosContainer;
+
+/**
+ * @brief A WorkOrder stand-in for the tests: it remembers the index of the
+ *        operator it was created for and only logs when executed.
+ **/
+class MockWorkOrder : public WorkOrder {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param op_index The index of the operator this work order is attributed to.
+   **/
+  explicit MockWorkOrder(const int op_index)
+      : WorkOrder(0),
+        op_index_(op_index) {
+  }
+
+  // Executing the mock performs no work beyond a verbose log line.
+  void execute() override {
+    VLOG(3) << "WorkOrder[" << op_index_ << "] executing.";
+  }
+
+  /**
+   * @brief Get the operator index supplied at construction time.
+   **/
+  inline QueryPlan::DAGNodeIndex getOpIndex() const { return op_index_; }
+
+ private:
+  const QueryPlan::DAGNodeIndex op_index_;
+
+  DISALLOW_COPY_AND_ASSIGN(MockWorkOrder);
+};
+
+/**
+ * @brief A RelationalOperator stand-in that counts how often each of its
+ *        entry points is invoked, so that tests can verify how the
+ *        QueryManager drives an operator.
+ **/
+class MockOperator: public RelationalOperator {
+ public:
+  // Identifies the counter that getNumCalls() should report.
+  enum function_name {
+    kFeedInputBlock = 0,
+    kFeedInputBlocks,
+    kDoneFeedingInputBlocks,
+    kGetAllWorkOrders
+  };
+
+  /**
+   * @brief Constructor.
+   *
+   * @param produce_workorders Whether getAllWorkOrders() may add work orders.
+   * @param has_streaming_input If true, a work order is only produced after
+   *        feedInputBlock() or feedInputBlocks() has been called at least
+   *        once; otherwise production depends on blocking_dependencies_met_.
+   * @param max_getworkorder_iters getAllWorkOrders() returns true on exactly
+   *        the call whose ordinal equals this value.
+   * @param max_workorders Upper bound on the number of work orders produced.
+   **/
+  MockOperator(const bool produce_workorders,
+               const bool has_streaming_input,
+               const int max_getworkorder_iters = 1,
+               const int max_workorders = INT_MAX)
+      : RelationalOperator(0 /* Query Id */),
+        produce_workorders_(produce_workorders),
+        has_streaming_input_(has_streaming_input),
+        max_workorders_(max_workorders),
+        max_getworkorder_iters_(max_getworkorder_iters) {
+  }
+
+#define MOCK_OP_LOG(x) VLOG(x) << "Op[" << op_index_ << "]: " << __func__ << ": "
+
+  // Accessors used by the tests to inspect how the QueryManager interacted
+  // with this operator.
+
+  // Number of work orders this operator has produced so far.
+  inline int getNumWorkOrders() const {
+    return num_workorders_generated_;
+  }
+
+  // Number of calls recorded for the given entry point, or -1 if 'fname' is
+  // not one of the known enumerators.
+  inline int getNumCalls(const function_name fname) const {
+    if (fname == kFeedInputBlock) {
+      return num_calls_feedblock_;
+    }
+    if (fname == kFeedInputBlocks) {
+      return num_calls_feedblocks_;
+    }
+    if (fname == kDoneFeedingInputBlocks) {
+      return num_calls_donefeedingblocks_;
+    }
+    if (fname == kGetAllWorkOrders) {
+      return num_calls_get_workorders_;
+    }
+    return -1;
+  }
+
+  inline bool getBlockingDependenciesMet() const {
+    MOCK_OP_LOG(3) << "met.";
+    return blocking_dependencies_met_;
+  }
+
+  void setInsertDestinationID(const QueryContext::insert_destination_id insert_destination_index) {
+    insert_destination_index_ = insert_destination_index;
+  }
+
+  // Mock to trigger doneFeedingInputBlocks for the dependent operators
+  // in QueryManager::markOperatorFinished.
+  void setOutputRelationID(const relation_id rel_id) {
+    output_relation_id_ = rel_id;
+  }
+
+  // Overrides of the RelationalOperator interface follow.
+
+  // Counts the call, adds at most one MockWorkOrder to 'container' when the
+  // configured conditions hold, and returns true on the call whose ordinal
+  // equals max_getworkorder_iters_.
+  bool getAllWorkOrders(
+      WorkOrdersContainer *container,
+      QueryContext *query_context,
+      StorageManager *storage_manager,
+      const tmb::client_id foreman_client_id,
+      tmb::MessageBus *bus) override {
+    ++num_calls_get_workorders_;
+
+    // A streaming operator needs at least one fed input; a non-streaming one
+    // needs its blocking dependencies to be met.
+    const bool input_ready = has_streaming_input_
+        ? (num_calls_feedblock_ > 0 || num_calls_feedblocks_ > 0)
+        : blocking_dependencies_met_;
+    const bool below_limit = num_workorders_generated_ < max_workorders_;
+
+    if (produce_workorders_ && input_ready && below_limit) {
+      MOCK_OP_LOG(3) << (has_streaming_input_ ? "[stream] generate WorkOrder"
+                                               : "[static] generate WorkOrder");
+      container->addNormalWorkOrder(new MockWorkOrder(op_index_), op_index_);
+      ++num_workorders_generated_;
+    }
+
+    const bool done_generating = (num_calls_get_workorders_ == max_getworkorder_iters_);
+    MOCK_OP_LOG(3) << "count(" << num_calls_get_workorders_ << ") "
+                   << "return(" << done_generating << ")";
+    return done_generating;
+  }
+
+  // The mock never produces work order protos; it just reports completion.
+  bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override {
+    return true;
+  }
+
+  void feedInputBlock(const block_id input_block_id,
+                      const relation_id input_relation_id) override {
+    ++num_calls_feedblock_;
+    MOCK_OP_LOG(3) << "count(" << num_calls_feedblock_ << ")";
+  }
+
+  void feedInputBlocks(const relation_id rel_id,
+                       std::vector<block_id> *partially_filled_blocks) override {
+    ++num_calls_feedblocks_;
+    MOCK_OP_LOG(3) << "count(" << num_calls_feedblocks_ << ")";
+  }
+
+  void doneFeedingInputBlocks(const relation_id rel_id) override {
+    ++num_calls_donefeedingblocks_;
+    MOCK_OP_LOG(3) << "count(" << num_calls_donefeedingblocks_ << ")";
+  }
+
+  QueryContext::insert_destination_id getInsertDestinationID() const override {
+    return insert_destination_index_;
+  }
+
+  const relation_id getOutputRelationID() const override {
+    return output_relation_id_;
+  }
+
+ private:
+  // Fixed configuration supplied to the constructor.
+  const bool produce_workorders_;
+  const bool has_streaming_input_;
+  const int max_workorders_;
+  const int max_getworkorder_iters_;
+
+  // Call counters, all starting at zero.
+  int num_calls_get_workorders_ = 0;
+  int num_workorders_generated_ = 0;
+  int num_calls_feedblock_ = 0;
+  int num_calls_feedblocks_ = 0;
+  int num_calls_donefeedingblocks_ = 0;
+
+  QueryContext::insert_destination_id insert_destination_index_ = QueryContext::kInvalidInsertDestinationId;
+
+  relation_id output_relation_id_ = -1;
+
+#undef MOCK_OP_LOG
+
+  DISALLOW_COPY_AND_ASSIGN(MockOperator);
+};
+
+
+/**
+ * @brief Test fixture that owns a catalog database, a storage manager, a
+ *        message bus and a query handle, and offers helpers to build a
+ *        QueryManagerSingleNode and to hand it serialized messages.
+ **/
+class QueryManagerTest : public ::testing::Test {
+ protected:
+  void SetUp() override {
+    db_.reset(new CatalogDatabase(nullptr /* catalog */, "database"));
+    storage_manager_.reset(new StorageManager("./"));
+    bus_.Initialize();
+    query_handle_.reset(new QueryHandle(0));  // dummy query ID.
+    query_plan_ = query_handle_->getQueryPlanMutable();
+    query_handle_->getQueryContextProtoMutable()->set_query_id(query_handle_->query_id());
+  }
+
+  // Builds the QueryManager under test. Tests call this only after the query
+  // plan has been populated.
+  inline void constructQueryManager() {
+    query_manager_.reset(new QueryManagerSingleNode(
+        0, 1, query_handle_.get(), db_.get(), storage_manager_.get(), &bus_));
+  }
+
+  // Number of queued work orders the execution state reports for 'index'.
+  inline int getNumWorkOrdersInExecution(const QueryPlan::DAGNodeIndex index) const {
+    return query_manager_->getQueryExecutionState().getNumQueuedWorkOrders(index);
+  }
+
+  inline int getNumOperatorsFinished() const {
+    return query_manager_->getQueryExecutionState().getNumOperatorsFinished();
+  }
+
+  inline bool getOperatorFinishedStatus(const QueryPlan::DAGNodeIndex index) const {
+    return query_manager_->getQueryExecutionState().hasExecutionFinished(index);
+  }
+
+  /**
+   * @brief Serialize 'proto', wrap the bytes in a TaggedMessage of the given
+   *        type and pass it to the QueryManager.
+   *
+   * @param proto The protobuf message to deliver.
+   * @param message_type The message type tag to attach.
+   * @return Whether the query execution has finished after processing.
+   **/
+  template <typename ProtoT>
+  bool deliverProto(const ProtoT &proto, const tmb::message_type_id message_type) {
+    const std::size_t proto_length = proto.ByteSize();
+    // The vector owns the scratch buffer, so it is released on every path and
+    // no unchecked malloc() result is ever written through.
+    vector<char> proto_bytes(proto_length);
+    CHECK(proto.SerializeToArray(proto_bytes.data(), proto_length));
+
+    tmb::TaggedMessage tagged_message(static_cast<const void *>(proto_bytes.data()),
+                                      proto_length,
+                                      message_type);
+    query_manager_->processMessage(tagged_message);
+    return query_manager_->getQueryExecutionState().hasQueryExecutionFinished();
+  }
+
+  // Delivers a DataPipelineMessage attributed to 'source_operator_index'.
+  // Returns whether the query has finished.
+  inline bool placeDataPipelineMessage(const QueryPlan::DAGNodeIndex source_operator_index) {
+    VLOG(3) << "Place DataPipeline message for Op[" << source_operator_index << "]";
+    return deliverProto(makeDataPipelineProto(source_operator_index), kDataPipelineMessage);
+  }
+
+  // Delivers a NormalWorkOrderCompletionMessage for operator 'index'.
+  // Returns whether the query has finished.
+  inline bool placeWorkOrderCompleteMessage(const QueryPlan::DAGNodeIndex index) {
+    VLOG(3) << "Place WorkOrderComplete message for Op[" << index << "]";
+    serialization::NormalWorkOrderCompletionMessage proto;
+    proto.set_operator_index(index);
+    proto.set_worker_thread_index(1);  // dummy worker ID.
+    proto.set_query_id(0);  // dummy query ID.
+
+    return deliverProto(proto, kWorkOrderCompleteMessage);
+  }
+
+  // Delivers a RebuildWorkOrderCompletionMessage for operator 'index'.
+  // Returns whether the query has finished.
+  inline bool placeRebuildWorkOrderCompleteMessage(const QueryPlan::DAGNodeIndex index) {
+    VLOG(3) << "Place RebuildWorkOrderComplete message for Op[" << index << "]";
+    serialization::RebuildWorkOrderCompletionMessage proto;
+    proto.set_operator_index(index);
+    proto.set_worker_thread_index(1);  // dummy worker thread ID.
+    proto.set_query_id(0);  // dummy query ID.
+
+    return deliverProto(proto, kRebuildWorkOrderCompleteMessage);
+  }
+
+  // Delivers the same DataPipelineMessage as placeDataPipelineMessage(); only
+  // the log line differs.
+  inline bool placeOutputBlockMessage(const QueryPlan::DAGNodeIndex index) {
+    VLOG(3) << "Place OutputBlock message for Op[" << index << "]";
+    return deliverProto(makeDataPipelineProto(index), kDataPipelineMessage);
+  }
+
+  // Builds a DataPipelineMessage proto for 'operator_index' with dummy block,
+  // relation and query IDs.
+  static serialization::DataPipelineMessage makeDataPipelineProto(
+      const QueryPlan::DAGNodeIndex operator_index) {
+    serialization::DataPipelineMessage proto;
+    proto.set_operator_index(operator_index);
+
+    proto.set_block_id(0);  // dummy block ID
+    proto.set_relation_id(0);  // dummy relation ID.
+    proto.set_query_id(0);  // dummy query ID.
+    return proto;
+  }
+
+  unique_ptr<CatalogDatabase> db_;
+  unique_ptr<StorageManager> storage_manager_;
+
+  // Points into query_handle_; assigned in SetUp().
+  QueryPlan *query_plan_ = nullptr;
+  unique_ptr<QueryHandle> query_handle_;
+  unique_ptr<QueryManagerSingleNode> query_manager_;
+
+  MessageBusImpl bus_;
+
+  client_id worker_client_id_;
+
+  unique_ptr<WorkerDirectory> workers_;
+};
+
+TEST_F(QueryManagerTest, SingleNodeDAGNoWorkOrdersTest) {
+  // A plan with one operator that is configured to produce no workorders.
+  query_plan_->addRelationalOperator(
+      new MockOperator(false /* produce_workorders */, false /* has_streaming_input */));
+
+  const MockOperator &mock_op = static_cast<const MockOperator &>(
+      query_plan_->getQueryPlanDAG().getNodePayload(0));
+
+  constructQueryManager();
+
+  // The only operator has no dependencies at all.
+  EXPECT_TRUE(mock_op.getBlockingDependenciesMet());
+
+  // No input was fed to the operator, and getAllWorkOrders() ran exactly once.
+  EXPECT_EQ(0, mock_op.getNumCalls(MockOperator::kFeedInputBlocks));
+  EXPECT_EQ(0, mock_op.getNumCalls(MockOperator::kFeedInputBlock));
+  EXPECT_EQ(1, mock_op.getNumCalls(MockOperator::kGetAllWorkOrders));
+}
+
+TEST_F(QueryManagerTest, SingleNodeDAGStaticWorkOrdersTest) {
+  // This test creates a DAG of a single node. Static workorders are generated.
+  // The single workorder is fetched, its completion is reported, and the
+  // query is then expected to be finished.
+  const QueryPlan::DAGNodeIndex id =
+      query_plan_->addRelationalOperator(new MockOperator(true, false, 1));
+
+  const MockOperator &op = static_cast<const MockOperator &>(
+      query_plan_->getQueryPlanDAG().getNodePayload(id));
+
+  constructQueryManager();
+
+  // op doesn't have any dependencies.
+  EXPECT_TRUE(op.getBlockingDependenciesMet());
+
+  // We expect one call for op's getAllWorkOrders().
+  EXPECT_EQ(1, op.getNumCalls(MockOperator::kGetAllWorkOrders));
+  EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlock));
+  EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlocks));
+
+  // One workorder is generated.
+  EXPECT_EQ(1, op.getNumWorkOrders());
+
+  unique_ptr<WorkerMessage> worker_message;
+  worker_message.reset(query_manager_->getNextWorkerMessage(id, -1));
+  // Fatal assertion: the message is dereferenced right below.
+  ASSERT_TRUE(worker_message != nullptr);
+
+  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
+            worker_message->getType());
+  EXPECT_EQ(id, worker_message->getRelationalOpIndex());
+
+  delete worker_message->getWorkOrder();
+
+  EXPECT_EQ(1, getNumWorkOrdersInExecution(id));
+  EXPECT_EQ(0, getNumOperatorsFinished());
+
+  // Send a message to QueryManager upon workorder completion.
+  // Last event processed by QueryManager.
+  EXPECT_TRUE(placeWorkOrderCompleteMessage(id));
+
+  EXPECT_EQ(0, getNumWorkOrdersInExecution(id));
+  EXPECT_EQ(1, getNumOperatorsFinished());
+  EXPECT_TRUE(getOperatorFinishedStatus(id));
+}
+
+TEST_F(QueryManagerTest, SingleNodeDAGDynamicWorkOrdersTest) {
+  // This test creates a DAG of a single node. WorkOrders are generated
+  // dynamically as pending work orders complete execution, i.e.,
+  // getAllWorkOrders() is called multiple times. The mock is configured with
+  // max_getworkorder_iters = 4 and max_workorders = 3, i.e., the 1st 3 calls
+  // to getAllWorkOrders() insert 1 WorkOrder each and return false, and the
+  // 4th call inserts no WorkOrder and returns true.
+
+  // TODO(shoban): This test can not be more robust than this because of fixed
+  // scaffolding of mocking. If we use gMock, we can do much better.
+  const QueryPlan::DAGNodeIndex id =
+      query_plan_->addRelationalOperator(new MockOperator(true, false, 4, 3));
+
+  const MockOperator &op = static_cast<const MockOperator &>(
+      query_plan_->getQueryPlanDAG().getNodePayload(id));
+
+  constructQueryManager();
+
+  // op doesn't have any dependencies.
+  EXPECT_TRUE(op.getBlockingDependenciesMet());
+
+  for (int i = 0; i < 3; ++i) {
+    // One more call to op's getAllWorkOrders() per iteration.
+    EXPECT_EQ(i + 1, op.getNumCalls(MockOperator::kGetAllWorkOrders));
+
+    // One more workorder is generated per iteration.
+    EXPECT_EQ(i + 1, op.getNumWorkOrders());
+
+    unique_ptr<WorkerMessage> worker_message;
+    worker_message.reset(query_manager_->getNextWorkerMessage(id, -1));
+
+    // Fatal assertion: the message is dereferenced right below.
+    ASSERT_TRUE(worker_message != nullptr);
+    EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
+              worker_message->getType());
+    EXPECT_EQ(id, worker_message->getRelationalOpIndex());
+
+    delete worker_message->getWorkOrder();
+
+    EXPECT_EQ(1, getNumWorkOrdersInExecution(id));
+    EXPECT_EQ(0, getNumOperatorsFinished());
+
+    if (i < 2) {
+      // Send a message to QueryManager upon workorder completion.
+      EXPECT_FALSE(placeWorkOrderCompleteMessage(id));
+    } else {
+      // Send a message to QueryManager upon workorder completion.
+      // Last event.
+      EXPECT_TRUE(placeWorkOrderCompleteMessage(id));
+    }
+  }
+
+  EXPECT_EQ(0, getNumWorkOrdersInExecution(id));
+
+  EXPECT_EQ(1, getNumOperatorsFinished());
+  EXPECT_TRUE(getOperatorFinishedStatus(id));
+
+  // We place this check in the end, since it's true throughout the test.
+  EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlock));
+  EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlocks));
+}
+
+TEST_F(QueryManagerTest, TwoNodesDAGBlockingLinkTest) {
+  // We use two nodes in the DAG with a blocking link between them.
+  // There is no streaming of data involved in this test.
+  const QueryPlan::DAGNodeIndex id1 =
+      query_plan_->addRelationalOperator(new MockOperator(true, false));
+  const QueryPlan::DAGNodeIndex id2 =
+      query_plan_->addRelationalOperator(new MockOperator(true, false));
+
+  // Create a blocking link.
+  query_plan_->addDirectDependency(id2, id1, true);
+
+  static_cast<MockOperator *>(
+      query_plan_->getQueryPlanDAGMutable()->getNodePayloadMutable(id1))
+          ->setOutputRelationID(0xdead);
+
+  const MockOperator &op1 = static_cast<const MockOperator &>(
+      query_plan_->getQueryPlanDAG().getNodePayload(id1));
+  const MockOperator &op2 = static_cast<const MockOperator &>(
+      query_plan_->getQueryPlanDAG().getNodePayload(id2));
+
+  constructQueryManager();
+
+  // op1 doesn't have any dependencies
+  EXPECT_TRUE(op1.getBlockingDependenciesMet());
+
+  // Only op1 should receive a call to getAllWorkOrders initially.
+  EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders));
+  EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlock));
+  EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlocks));
+
+  EXPECT_EQ(0, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
+  EXPECT_EQ(0, op2.getNumCalls(MockOperator::kFeedInputBlock));
+  EXPECT_EQ(0, op2.getNumCalls(MockOperator::kFeedInputBlocks));
+
+  // Only op1 should produce a workorder.
+  EXPECT_EQ(1, op1.getNumWorkOrders());
+  EXPECT_EQ(0, op2.getNumWorkOrders());
+
+  // Foreman hasn't yet got workorder completion response for the workorder.
+  unique_ptr<WorkerMessage> worker_message;
+  worker_message.reset(query_manager_->getNextWorkerMessage(id1, -1));
+
+  // Fatal assertion: the message is dereferenced right below.
+  ASSERT_TRUE(worker_message != nullptr);
+  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
+            worker_message->getType());
+  EXPECT_EQ(id1, worker_message->getRelationalOpIndex());
+
+  delete worker_message->getWorkOrder();
+
+  EXPECT_EQ(1, getNumWorkOrdersInExecution(id1));
+  EXPECT_EQ(0, getNumWorkOrdersInExecution(id2));
+  EXPECT_EQ(0, getNumOperatorsFinished());
+
+  // Send a message to Foreman upon workorder (generated by op1) completion.
+  EXPECT_FALSE(placeWorkOrderCompleteMessage(id1));
+
+  EXPECT_EQ(0, getNumWorkOrdersInExecution(id1));
+  // op1 is over now, op2 still to go.
+  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks));
+  EXPECT_EQ(1, getNumOperatorsFinished());
+
+  EXPECT_TRUE(getOperatorFinishedStatus(id1));
+  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks));
+  EXPECT_FALSE(getOperatorFinishedStatus(id2));
+
+  worker_message.reset(query_manager_->getNextWorkerMessage(id2, -1));
+  // Fatal assertion: the message is dereferenced right below.
+  ASSERT_TRUE(worker_message != nullptr);
+  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
+            worker_message->getType());
+  EXPECT_EQ(id2, worker_message->getRelationalOpIndex());
+
+  delete worker_message->getWorkOrder();
+
+  EXPECT_EQ(1, getNumWorkOrdersInExecution(id2));
+
+  // op1 is op2's blocking dependency.
+  EXPECT_TRUE(op2.getBlockingDependenciesMet());
+
+  EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders));
+  // op2 should get first call of getAllWorkOrders() when op1 is over.
+  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
+
+  EXPECT_EQ(1, op2.getNumWorkOrders());
+
+  // Send a message to QueryManager upon workorder (generated by op2) completion.
+  // Note that the worker hasn't yet popped the workorder. Usually this won't
+  // happen as workers pop workorders first, execute and then send the response.
+  EXPECT_TRUE(placeWorkOrderCompleteMessage(id2));
+
+  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
+
+  EXPECT_EQ(0, getNumWorkOrdersInExecution(id1));
+  EXPECT_EQ(0, getNumWorkOrdersInExecution(id2));
+
+  EXPECT_EQ(2, getNumOperatorsFinished());
+  EXPECT_TRUE(getOperatorFinishedStatus(id1));
+  EXPECT_TRUE(getOperatorFinishedStatus(id2));
+
+  // Expect no additional calls to getAllWorkOrders.
+  EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders));
+  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
+}
+
+TEST_F(QueryManagerTest, TwoNodesDAGPipeLinkTest) {
+  // We use two nodes in the DAG with a non-blocking link between them.
+  // We stream output of op1 to op2. Sequence of events is as follows:
+  // 1. op1 creates a workorder.
+  // 2. We send a "block full" (from op1) to QueryManager.
+  // 3. op2 creates a workorder because of step 2.
+  const QueryPlan::DAGNodeIndex id1 =
+      query_plan_->addRelationalOperator(new MockOperator(true, false, 1));
+  const QueryPlan::DAGNodeIndex id2 =
+      query_plan_->addRelationalOperator(new MockOperator(true, true, 3));
+
+  // Create a non-blocking link.
+  query_plan_->addDirectDependency(id2, id1, false);
+
+  static_cast<MockOperator *>(
+      query_plan_->getQueryPlanDAGMutable()->getNodePayloadMutable(id1))
+      ->setOutputRelationID(0xdead);
+
+  const MockOperator &op1 = static_cast<const MockOperator &>(
+      query_plan_->getQueryPlanDAG().getNodePayload(id1));
+  const MockOperator &op2 = static_cast<const MockOperator &>(
+      query_plan_->getQueryPlanDAG().getNodePayload(id2));
+
+  constructQueryManager();
+
+  // As none of the operators have a blocking link, blocking dependencies should
+  // be met.
+  EXPECT_TRUE(op1.getBlockingDependenciesMet());
+  EXPECT_TRUE(op2.getBlockingDependenciesMet());
+
+  EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders));
+  EXPECT_EQ(1, op1.getNumWorkOrders());
+  EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlock));
+  EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlocks));
+
+  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
+  // op2 will generate workorder only after receiving a streaming input.
+  EXPECT_EQ(0, op2.getNumWorkOrders());
+  EXPECT_EQ(0, op2.getNumCalls(MockOperator::kFeedInputBlock));
+  EXPECT_EQ(0, op2.getNumCalls(MockOperator::kFeedInputBlocks));
+
+  unique_ptr<WorkerMessage> worker_message;
+  worker_message.reset(query_manager_->getNextWorkerMessage(id1, -1));
+
+  // Fatal assertion: the message is dereferenced right below.
+  ASSERT_TRUE(worker_message != nullptr);
+  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
+            worker_message->getType());
+  EXPECT_EQ(id1, worker_message->getRelationalOpIndex());
+
+  delete worker_message->getWorkOrder();
+
+  // Send a message to QueryManager upon block getting full (output of op1).
+  EXPECT_FALSE(placeOutputBlockMessage(id1));
+
+  // op1 is not finished yet because the response of workorder completion hasn't
+  // been received yet by the QueryManager.
+  EXPECT_FALSE(getOperatorFinishedStatus(id1));
+  EXPECT_FALSE(getOperatorFinishedStatus(id2));
+
+  // No additional call to op1's getAllWorkOrders.
+  EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders));
+  EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlock));
+  EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlocks));
+
+  // Output from op1 should be fed to op2.
+  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kFeedInputBlock));
+  EXPECT_EQ(0, op2.getNumCalls(MockOperator::kFeedInputBlocks));
+
+  // A call to op2's getAllWorkOrders because of the streamed input.
+  EXPECT_EQ(2, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
+  EXPECT_EQ(1, op2.getNumWorkOrders());
+
+  // Place a message of a workorder completion of op1 on Foreman's input queue.
+  EXPECT_FALSE(placeWorkOrderCompleteMessage(id1));
+
+  EXPECT_EQ(0, getNumWorkOrdersInExecution(id1));
+  EXPECT_TRUE(getOperatorFinishedStatus(id1));
+  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks));
+
+  // An additional call to op2's getAllWorkOrders because of completion of op1.
+  EXPECT_EQ(3, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
+  EXPECT_EQ(2, op2.getNumWorkOrders());
+
+  worker_message.reset(query_manager_->getNextWorkerMessage(id2, -1));
+
+  // Fatal assertion: the message is dereferenced right below.
+  ASSERT_TRUE(worker_message != nullptr);
+  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
+            worker_message->getType());
+  EXPECT_EQ(id2, worker_message->getRelationalOpIndex());
+
+  delete worker_message->getWorkOrder();
+
+  // Place a message of a workorder completion of op2 on Foreman's input queue.
+  EXPECT_FALSE(placeWorkOrderCompleteMessage(id2));
+
+  EXPECT_TRUE(getOperatorFinishedStatus(id1));
+
+  worker_message.reset(query_manager_->getNextWorkerMessage(id2, -1));
+
+  // Fatal assertion: the message is dereferenced right below.
+  ASSERT_TRUE(worker_message != nullptr);
+  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
+            worker_message->getType());
+  EXPECT_EQ(id2, worker_message->getRelationalOpIndex());
+
+  delete worker_message->getWorkOrder();
+
+  EXPECT_EQ(1, getNumWorkOrdersInExecution(id2));
+  EXPECT_FALSE(getOperatorFinishedStatus(id2));
+
+  // Send a message to Foreman upon workorder (generated by op2) completion.
+  EXPECT_TRUE(placeWorkOrderCompleteMessage(id2));
+
+  EXPECT_TRUE(getOperatorFinishedStatus(id1));
+  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks));
+
+  EXPECT_EQ(0, getNumWorkOrdersInExecution(id2));
+  EXPECT_TRUE(getOperatorFinishedStatus(id2));
+}
+
+TEST_F(QueryManagerTest, TwoNodesDAGPartiallyFilledBlocksTest) {
+  // In this test, we create a 2-node DAG with a non-blocking link between them.
+  // There is no streaming of data from op1 to op2 during the execution of op1.
+  // op1 produces a partially filled block at the end of its execution which is
+  // rebuilt and then fed to op2.
+  const QueryPlan::DAGNodeIndex id1 =
+      query_plan_->addRelationalOperator(new MockOperator(true, false, 1));
+  const QueryPlan::DAGNodeIndex id2 =
+      query_plan_->addRelationalOperator(new MockOperator(true, true, 3, 1));
+
+  // Create a non-blocking link.
+  query_plan_->addDirectDependency(id2, id1, false);
+
+  // Create a relation, owned by db_.
+  CatalogRelation *relation =
+      new CatalogRelation(nullptr /* catalog_database */, "test_relation");
+  const relation_id output_relation_id = db_->addRelation(relation);
+
+  // Setup the InsertDestination proto in the query context proto.
+  serialization::QueryContext *query_context_proto =
+      query_handle_->getQueryContextProtoMutable();
+
+  const QueryContext::insert_destination_id insert_destination_index =
+      query_context_proto->insert_destinations_size();
+  serialization::InsertDestination *insert_destination_proto =
+      query_context_proto->add_insert_destinations();
+
+  insert_destination_proto->set_insert_destination_type(
+      serialization::InsertDestinationType::BLOCK_POOL);
+  insert_destination_proto->set_relation_id(output_relation_id);
+  insert_destination_proto->set_relational_op_index(id1);
+
+  MockOperator *op1_mutable = static_cast<MockOperator *>(
+      query_plan_->getQueryPlanDAGMutable()->getNodePayloadMutable(id1));
+  op1_mutable->setInsertDestinationID(insert_destination_index);
+  op1_mutable->setOutputRelationID(output_relation_id);
+
+  const MockOperator &op1 = static_cast<const MockOperator &>(
+      query_plan_->getQueryPlanDAG().getNodePayload(id1));
+  const MockOperator &op2 = static_cast<const MockOperator &>(
+      query_plan_->getQueryPlanDAG().getNodePayload(id2));
+
+  constructQueryManager();
+
+  // NOTE(zuyu): An operator generally has no ideas about partially filled
+  // blocks, but InsertDestination in QueryContext does.
+  // Mock to add partially filled blocks in the InsertDestination.
+  InsertDestination *insert_destination =
+      query_manager_->getQueryContextMutable()->getInsertDestination(
+          insert_destination_index);
+  // Fatal assertion (rather than DCHECK, which is compiled out in release
+  // builds): the pointer is dereferenced right below.
+  ASSERT_TRUE(insert_destination != nullptr);
+  MutableBlockReference block_ref;
+  static_cast<BlockPoolInsertDestination *>(insert_destination)
+      ->available_block_refs_.push_back(move(block_ref));
+
+  // There's no blocking dependency in the DAG.
+  EXPECT_TRUE(op1.getBlockingDependenciesMet());
+  EXPECT_TRUE(op2.getBlockingDependenciesMet());
+
+  EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders));
+  EXPECT_EQ(1, op1.getNumWorkOrders());
+
+  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
+  EXPECT_EQ(0, op2.getNumWorkOrders());
+
+  unique_ptr<WorkerMessage> worker_message;
+  worker_message.reset(query_manager_->getNextWorkerMessage(id1, -1));
+
+  // Fatal assertion: the message is dereferenced right below.
+  ASSERT_TRUE(worker_message != nullptr);
+  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
+            worker_message->getType());
+  EXPECT_EQ(id1, worker_message->getRelationalOpIndex());
+
+  delete worker_message->getWorkOrder();
+
+  // Send a message to QueryManager upon workorder (generated by op1) completion.
+  EXPECT_FALSE(placeWorkOrderCompleteMessage(id1));
+
+  EXPECT_EQ(0, getNumWorkOrdersInExecution(id1));
+
+  worker_message.reset(query_manager_->getNextWorkerMessage(id1, -1));
+  // Fatal assertion: the message is dereferenced right below.
+  ASSERT_TRUE(worker_message != nullptr);
+  EXPECT_EQ(WorkerMessage::WorkerMessageType::kRebuildWorkOrder,
+            worker_message->getType());
+
+  EXPECT_EQ(id1, worker_message->getRelationalOpIndex());
+
+  delete worker_message->getWorkOrder();
+
+  // op1 generates a rebuild workorder. The block is rebuilt and streamed
+  // to Foreman.
+  EXPECT_FALSE(placeDataPipelineMessage(id1));
+
+  EXPECT_FALSE(placeRebuildWorkOrderCompleteMessage(id1));
+  // Based on the streamed input, op2's getAllWorkOrders should produce a
+  // workorder.
+  EXPECT_EQ(3, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
+  EXPECT_EQ(1, op2.getNumWorkOrders());
+
+  worker_message.reset(query_manager_->getNextWorkerMessage(id2, -1));
+
+  // Fatal assertion: the message is dereferenced right below.
+  ASSERT_TRUE(worker_message != nullptr);
+  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
+            worker_message->getType());
+
+  EXPECT_EQ(id2, worker_message->getRelationalOpIndex());
+
+  delete worker_message->getWorkOrder();
+
+  EXPECT_TRUE(getOperatorFinishedStatus(id1));
+  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks));
+  EXPECT_FALSE(getOperatorFinishedStatus(id2));
+  EXPECT_EQ(1, getNumWorkOrdersInExecution(id2));
+
+  // Send a message to QueryManager upon workorder (generated by op2) completion.
+  EXPECT_TRUE(placeWorkOrderCompleteMessage(id2));
+
+  EXPECT_EQ(0, getNumWorkOrdersInExecution(id2));
+
+  EXPECT_TRUE(getOperatorFinishedStatus(id2));
+}
+
+TEST_F(QueryManagerTest, MultipleNodesNoOutputTest) {
+  // When an operator produces workorders but no output, the QueryManager should
+  // check the dependents of this operator to make progress.
+  const QueryPlan::DAGNodeIndex kNumNodes = 5;
+
+  // Collect the DAG node indices with push_back(): reserve() only allocates
+  // capacity, so assigning through ids[i] while the vector is still empty
+  // would be undefined behavior.
+  std::vector<QueryPlan::DAGNodeIndex> ids;
+  ids.reserve(kNumNodes);
+
+  for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes; ++i) {
+    // Only the first operator is constructed with a false second argument;
+    // every other operator in the chain gets (true, true).
+    const bool has_streaming_input = (i != 0);
+    ids.push_back(query_plan_->addRelationalOperator(
+        new MockOperator(true, has_streaming_input)));
+    VLOG(3) << ids.back();
+  }
+
+  /**
+   * The DAG looks like this:
+   *
+   * op1 -> op2 -> op3 -> op4 -> op5
+   *
+   **/
+  for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes - 1; ++i) {
+    // Non-blocking link: ids[i + 1] depends on ids[i].
+    query_plan_->addDirectDependency(ids[i + 1], ids[i], false);
+    static_cast<MockOperator*>(query_plan_->getQueryPlanDAGMutable()->getNodePayloadMutable(ids[i]))
+        ->setOutputRelationID(0xdead);
+  }
+
+  // operators[i] is the payload of DAG node ids[i], i.e. this vector is indexed
+  // by position in the chain, not by DAG node index.
+  std::vector<const MockOperator*> operators;
+  operators.reserve(kNumNodes);
+  for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes; ++i) {
+    operators.push_back(static_cast<const MockOperator*>(&query_plan_->getQueryPlanDAG().getNodePayload(ids[i])));
+  }
+
+  constructQueryManager();
+
+  // operators[0] should have produced a workorder by now.
+  EXPECT_EQ(1, operators[0]->getNumWorkOrders());
+
+  unique_ptr<WorkerMessage> worker_message;
+  worker_message.reset(query_manager_->getNextWorkerMessage(ids[0], -1));
+
+  // Fatal assertion: the message is dereferenced right below, so a null
+  // message must stop the test instead of crashing the test binary.
+  ASSERT_TRUE(worker_message != nullptr);
+  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
+            worker_message->getType());
+
+  EXPECT_EQ(ids[0], worker_message->getRelationalOpIndex());
+
+  // The workorder is never executed in this test; discard it.
+  delete worker_message->getWorkOrder();
+
+  EXPECT_EQ(1, getNumWorkOrdersInExecution(ids[0]));
+  EXPECT_FALSE(getOperatorFinishedStatus(ids[0]));
+
+  // Every operator should have received exactly one getAllWorkOrders() call.
+  for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes; ++i) {
+    EXPECT_EQ(1, operators[i]->getNumCalls(MockOperator::kGetAllWorkOrders));
+  }
+
+  // Send a message to QueryManager upon workorder (generated by operators[0])
+  // completion.
+  EXPECT_TRUE(placeWorkOrderCompleteMessage(ids[0]));
+
+  // The whole chain should now be finished, and each dependent should have
+  // been told once that its input is complete.
+  for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes; ++i) {
+    EXPECT_EQ(0, getNumWorkOrdersInExecution(ids[i]));
+    EXPECT_TRUE(getOperatorFinishedStatus(ids[i]));
+    if (i < kNumNodes - 1) {
+      EXPECT_EQ(1, operators[i + 1]->getNumCalls(MockOperator::kDoneFeedingInputBlocks));
+    }
+  }
+}
+
+TEST_F(QueryManagerTest, OutOfOrderWorkOrderCompletionTest) {
+  // Scenario: two operators, each generating one workorder, where the
+  // dependent's (op2) completion is reported before its dependency's (op1).
+  const QueryPlan::DAGNodeIndex id1 = query_plan_->addRelationalOperator(new MockOperator(true, false, 1));
+  const QueryPlan::DAGNodeIndex id2 = query_plan_->addRelationalOperator(new MockOperator(true, true, 2, 1));
+
+  // Create a non-blocking link: op2 (id2) depends on op1 (id1).
+  query_plan_->addDirectDependency(id2, id1, false);
+
+  constructQueryManager();
+
+  unique_ptr<WorkerMessage> worker_message;
+  worker_message.reset(query_manager_->getNextWorkerMessage(id1, -1));  // Fetch op1's workorder.
+
+  EXPECT_TRUE(worker_message != nullptr);
+  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
+            worker_message->getType());
+
+  EXPECT_EQ(id1, worker_message->getRelationalOpIndex());
+
+  delete worker_message->getWorkOrder();  // Never executed in this test; discard it.
+
+  // Send a message to QueryManager upon a block (output of op1) getting full.
+  EXPECT_FALSE(placeOutputBlockMessage(id1));
+
+  // op1 is not finished yet because its workorder completion message hasn't
+  // been delivered to the QueryManager yet.
+  EXPECT_FALSE(getOperatorFinishedStatus(id1));
+  EXPECT_FALSE(getOperatorFinishedStatus(id2));
+
+  worker_message.reset(query_manager_->getNextWorkerMessage(id2, -1));  // Fetch op2's workorder.
+  EXPECT_TRUE(worker_message != nullptr);
+  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
+            worker_message->getType());
+
+  EXPECT_EQ(id2, worker_message->getRelationalOpIndex());
+
+  delete worker_message->getWorkOrder();  // Never executed in this test; discard it.
+
+  // As mentioned earlier, op2's workorder completes before op1's.
+  EXPECT_FALSE(placeWorkOrderCompleteMessage(id2));
+
+  // op1's workorder execution is over; this is the last event of the test.
+  EXPECT_TRUE(placeWorkOrderCompleteMessage(id1));
+
+  EXPECT_TRUE(getOperatorFinishedStatus(id1));
+  EXPECT_TRUE(getOperatorFinishedStatus(id2));
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b4e25edc/query_execution/tests/QueryManager_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/QueryManager_unittest.cpp b/query_execution/tests/QueryManager_unittest.cpp
deleted file mode 100644
index 37e2cdd..0000000
--- a/query_execution/tests/QueryManager_unittest.cpp
+++ /dev/null
@@ -1,940 +0,0 @@
-/**
- *   Copyright 2011-2015 Quickstep Technologies LLC.
- *   Copyright 2015-2016 Pivotal Software, Inc.
- *
- *   Licensed under the Apache License, Version 2.0 (the "License");
- *   you may not use this file except in compliance with the License.
- *   You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- **/
-
-#include <climits>
-#include <memory>
-#include <utility>
-#include <vector>
-
-#include "catalog/CatalogDatabase.hpp"
-#include "catalog/CatalogRelation.hpp"
-#include "catalog/CatalogTypedefs.hpp"
-#include "query_execution/QueryContext.hpp"
-#include "query_execution/QueryContext.pb.h"
-#include "query_execution/QueryExecutionMessages.pb.h"
-#include "query_execution/QueryExecutionState.hpp"
-#include "query_execution/QueryExecutionTypedefs.hpp"
-#include "query_execution/QueryManager.hpp"
-#include "query_execution/WorkOrdersContainer.hpp"
-#include "query_execution/WorkerDirectory.hpp"
-#include "query_execution/WorkerMessage.hpp"
-#include "query_optimizer/QueryHandle.hpp"
-#include "query_optimizer/QueryPlan.hpp"
-#include "relational_operators/RelationalOperator.hpp"
-#include "relational_operators/WorkOrder.hpp"
-#include "storage/InsertDestination.hpp"
-#include "storage/InsertDestination.pb.h"
-#include "storage/StorageBlock.hpp"
-#include "storage/StorageBlockInfo.hpp"
-#include "storage/StorageManager.hpp"
-#include "utility/DAG.hpp"
-#include "utility/Macros.hpp"
-
-#include "glog/logging.h"
-#include "gtest/gtest.h"
-
-#include "tmb/id_typedefs.h"
-#include "tmb/message_bus.h"
-#include "tmb/tagged_message.h"
-
-using std::move;
-using std::unique_ptr;
-using std::vector;
-
-using tmb::client_id;
-
-namespace quickstep {
-
-class WorkOrderProtosContainer;
-
-class MockWorkOrder : public WorkOrder {  // WorkOrder stub whose execute() only logs.
- public:
-  explicit MockWorkOrder(const int op_index)
-      : WorkOrder(0), op_index_(op_index) {}  // 0: presumably a dummy query ID -- confirm.
-
-  void execute() override {
-    VLOG(3) << "WorkOrder[" << op_index_ << "] executing.";  // No real work is performed.
-  }
-
-  inline QueryPlan::DAGNodeIndex getOpIndex() const {  // Operator index given at construction.
-    return op_index_;
-  }
-
- private:
-  const QueryPlan::DAGNodeIndex op_index_;  // Initialized from the int constructor argument.
-
-  DISALLOW_COPY_AND_ASSIGN(MockWorkOrder);
-};
-
-class MockOperator: public RelationalOperator {  // Operator stub that counts the calls it receives.
- public:
-  enum function_name {  // Selects which call counter getNumCalls() reports.
-    kFeedInputBlock = 0,
-    kFeedInputBlocks,
-    kDoneFeedingInputBlocks,
-    kGetAllWorkOrders
-  };
-
-  MockOperator(const bool produce_workorders,  // If false, getAllWorkOrders() never adds a workorder.
-               const bool has_streaming_input,  // If true, workorders need a prior feedInputBlock(s) call.
-               const int max_getworkorder_iters = 1,  // getAllWorkOrders() returns true on this call number.
-               const int max_workorders = INT_MAX)  // Upper bound on workorders generated.
-      : RelationalOperator(0 /* Query Id */),
-        produce_workorders_(produce_workorders),
-        has_streaming_input_(has_streaming_input),
-        max_workorders_(max_workorders),
-        max_getworkorder_iters_(max_getworkorder_iters),
-        num_calls_get_workorders_(0),
-        num_workorders_generated_(0),
-        num_calls_feedblock_(0),
-        num_calls_feedblocks_(0),
-        num_calls_donefeedingblocks_(0) {
-  }
-  // Log prefix helper. op_index_ is not declared in this class (presumably inherited -- confirm).
-#define MOCK_OP_LOG(x) VLOG(x) << "Op[" << op_index_ << "]: " << __func__ << ": "
-
-  // The accessors below let tests check whether QueryManager called this
-  // operator, and how many times each particular method was invoked.
-  inline int getNumWorkOrders() const {  // Total workorders generated so far.
-    return num_workorders_generated_;
-  }
-
-  inline int getNumCalls(const function_name fname) const {  // Call count for the method named by fname.
-    switch (fname) {
-      case kFeedInputBlock:
-        return num_calls_feedblock_;
-      case kFeedInputBlocks:
-        return num_calls_feedblocks_;
-      case kDoneFeedingInputBlocks:
-        return num_calls_donefeedingblocks_;
-      case kGetAllWorkOrders:
-        return num_calls_get_workorders_;
-      default:
-        return -1;  // Value outside the enum.
-    }
-  }
-
-  inline bool getBlockingDependenciesMet() const {
-    MOCK_OP_LOG(3) << "met.";
-    return blocking_dependencies_met_;  // Not declared in this class (presumably inherited -- confirm).
-  }
-
-  void setInsertDestinationID(const QueryContext::insert_destination_id insert_destination_index) {
-    insert_destination_index_ = insert_destination_index;  // Reported by getInsertDestinationID().
-  }
-
-  // Mock to trigger doneFeedingInputBlocks for the dependent operators
-  // in QueryManager::markOperatorFinished.
-  void setOutputRelationID(const relation_id rel_id) {
-    output_relation_id_ = rel_id;  // Reported by getOutputRelationID().
-  }
-
-  // Override methods from the base class.
-  bool getAllWorkOrders(
-      WorkOrdersContainer *container,
-      QueryContext *query_context,
-      StorageManager *storage_manager,
-      const tmb::client_id foreman_client_id,
-      tmb::MessageBus *bus) override {
-    ++num_calls_get_workorders_;
-    if (produce_workorders_) {
-      if (has_streaming_input_) {  // Streaming case: requires at least one fed block.
-        if ((num_calls_feedblock_ > 0 || num_calls_feedblocks_ > 0) && (num_workorders_generated_ < max_workorders_)) {
-          MOCK_OP_LOG(3) << "[stream] generate WorkOrder";
-          container->addNormalWorkOrder(new MockWorkOrder(op_index_), op_index_);
-          ++num_workorders_generated_;
-        }
-      } else {  // Static case: requires blocking dependencies to be met.
-        if (blocking_dependencies_met_ && (num_workorders_generated_ < max_workorders_)) {
-          MOCK_OP_LOG(3) << "[static] generate WorkOrder";
-          container->addNormalWorkOrder(new MockWorkOrder(op_index_), op_index_);
-          ++num_workorders_generated_;
-        }
-      }
-    }
-    MOCK_OP_LOG(3) << "count(" << num_calls_get_workorders_ << ") "
-                   << "return(" << (num_calls_get_workorders_ == max_getworkorder_iters_) << ")";
-    return num_calls_get_workorders_ == max_getworkorder_iters_;  // Equality: true only on that exact call.
-  }
-
-  bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override {
-    return true;  // Adds nothing to the container.
-  }
-
-  void feedInputBlock(const block_id input_block_id,
-                      const relation_id input_relation_id) override {
-    ++num_calls_feedblock_;  // Only counts the call; arguments are ignored.
-    MOCK_OP_LOG(3) << "count(" << num_calls_feedblock_ << ")";
-  }
-
-  void feedInputBlocks(const relation_id rel_id,
-                       std::vector<block_id> *partially_filled_blocks) override {
-    ++num_calls_feedblocks_;  // Only counts the call; arguments are ignored.
-    MOCK_OP_LOG(3) << "count(" << num_calls_feedblocks_ << ")";
-  }
-
-  void doneFeedingInputBlocks(const relation_id rel_id) override {
-    ++num_calls_donefeedingblocks_;  // Only counts the call; argument is ignored.
-    MOCK_OP_LOG(3) << "count(" << num_calls_donefeedingblocks_ << ")";
-  }
-
-  QueryContext::insert_destination_id getInsertDestinationID() const override {
-    return insert_destination_index_;
-  }
-
-  const relation_id getOutputRelationID() const override {
-    return output_relation_id_;
-  }
-
- private:
-  const bool produce_workorders_;
-  const bool has_streaming_input_;
-  const int max_workorders_;
-  const int max_getworkorder_iters_;
-
-  int num_calls_get_workorders_;  // Calls to getAllWorkOrders().
-  int num_workorders_generated_;  // Workorders added to the container.
-  int num_calls_feedblock_;  // Calls to feedInputBlock().
-  int num_calls_feedblocks_;  // Calls to feedInputBlocks().
-  int num_calls_donefeedingblocks_;  // Calls to doneFeedingInputBlocks().
-
-  QueryContext::insert_destination_id insert_destination_index_ = QueryContext::kInvalidInsertDestinationId;
-
-  relation_id output_relation_id_ = -1;  // -1 until setOutputRelationID() is called.
-
-#undef MOCK_OP_LOG
-
-  DISALLOW_COPY_AND_ASSIGN(MockOperator);
-};
-
-
-class QueryManagerTest : public ::testing::Test {  // Fixture that drives a QueryManager with hand-built messages.
- protected:
-  virtual void SetUp() {
-    db_.reset(new CatalogDatabase(nullptr /* catalog */, "database"));
-    storage_manager_.reset(new StorageManager("./"));
-    bus_.Initialize();
-    query_handle_.reset(new QueryHandle(0));  // dummy query ID.
-    query_plan_ = query_handle_->getQueryPlanMutable();  // Tests add operators through this pointer.
-    query_handle_->getQueryContextProtoMutable()->set_query_id(query_handle_->query_id());
-  }
-
-  inline void constructQueryManager() {  // Call after the query plan has been fully built.
-    query_manager_.reset(new QueryManager(
-        0, 1, query_handle_.get(), db_.get(), storage_manager_.get(), &bus_));
-  }
-
-  inline const int getNumWorkOrdersInExecution(const QueryPlan::DAGNodeIndex index) const {
-    return query_manager_->getQueryExecutionState().getNumQueuedWorkOrders(index);
-  }
-
-  inline const int getNumOperatorsFinished() const {
-    return query_manager_->getQueryExecutionState().getNumOperatorsFinished();
-  }
-
-  inline bool getOperatorFinishedStatus(const QueryPlan::DAGNodeIndex index) const {
-    return query_manager_->getQueryExecutionState().hasExecutionFinished(index);
-  }
-  // Each place*Message() helper below returns whether the whole query has finished afterwards.
-  inline bool placeDataPipelineMessage(const QueryPlan::DAGNodeIndex source_operator_index) {
-    VLOG(3) << "Place DataPipeline message for Op[" << source_operator_index << "]";
-    serialization::DataPipelineMessage proto;
-    proto.set_operator_index(source_operator_index);
-
-    proto.set_block_id(0);  // dummy block ID
-    proto.set_relation_id(0);  // dummy relation ID.
-    proto.set_query_id(0);  // dummy query ID.
-
-    // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
-    const std::size_t proto_length = proto.ByteSize();
-    char *proto_bytes = static_cast<char*>(std::malloc(proto_length));
-    CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
-    tmb::TaggedMessage tagged_message(static_cast<const void *>(proto_bytes),
-                                      proto_length,
-                                      kDataPipelineMessage);
-    std::free(proto_bytes);  // Freed before use below; presumably TaggedMessage copied the bytes.
-    query_manager_->processMessage(tagged_message);  // Delivered directly, not through bus_.
-    return query_manager_->getQueryExecutionState().hasQueryExecutionFinished();
-  }
-
-  inline bool placeWorkOrderCompleteMessage(const QueryPlan::DAGNodeIndex index) {
-    VLOG(3) << "Place WorkOrderComplete message for Op[" << index << "]";
-    TaggedMessage tagged_message;  // NOTE(review): unused local; only `message` below is sent.
-    serialization::NormalWorkOrderCompletionMessage proto;
-    proto.set_operator_index(index);
-    proto.set_worker_thread_index(1);  // dummy worker ID.
-    proto.set_query_id(0);  // dummy query ID.
-
-    // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
-    const size_t proto_length = proto.ByteSize();
-    char *proto_bytes = static_cast<char*>(std::malloc(proto_length));
-    CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
-    TaggedMessage message(static_cast<const void*>(proto_bytes),
-                          proto_length,
-                          kWorkOrderCompleteMessage);
-    std::free(proto_bytes);  // Freed before use below; presumably TaggedMessage copied the bytes.
-    query_manager_->processMessage(message);
-
-    return query_manager_->getQueryExecutionState().hasQueryExecutionFinished();
-  }
-
-  inline bool placeRebuildWorkOrderCompleteMessage(const QueryPlan::DAGNodeIndex index) {
-    VLOG(3) << "Place RebuildWorkOrderComplete message for Op[" << index << "]";
-    serialization::RebuildWorkOrderCompletionMessage proto;
-    proto.set_operator_index(index);
-    proto.set_worker_thread_index(1);  // dummy worker thread ID.
-    proto.set_query_id(0);  // dummy query ID.
-
-    // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
-    const size_t proto_length = proto.ByteSize();
-    char *proto_bytes = static_cast<char*>(std::malloc(proto_length));
-    CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
-    TaggedMessage message(static_cast<const void*>(proto_bytes),
-                          proto_length,
-                          kRebuildWorkOrderCompleteMessage);
-
-    std::free(proto_bytes);  // Freed before use below; presumably TaggedMessage copied the bytes.
-    query_manager_->processMessage(message);
-
-    return query_manager_->getQueryExecutionState().hasQueryExecutionFinished();
-  }
-  // NOTE(review): apart from the log text, this body matches placeDataPipelineMessage().
-  inline bool placeOutputBlockMessage(const QueryPlan::DAGNodeIndex index) {
-    VLOG(3) << "Place OutputBlock message for Op[" << index << "]";
-    serialization::DataPipelineMessage proto;
-    proto.set_operator_index(index);
-
-    proto.set_block_id(0);  // dummy block ID
-    proto.set_relation_id(0);  // dummy relation ID.
-    proto.set_query_id(0);  // dummy query ID.
-
-    // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
-    const std::size_t proto_length = proto.ByteSize();
-    char *proto_bytes = static_cast<char*>(std::malloc(proto_length));
-    CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
-    tmb::TaggedMessage tagged_message(static_cast<const void *>(proto_bytes),
-                                      proto_length,
-                                      kDataPipelineMessage);
-    std::free(proto_bytes);  // Freed before use below; presumably TaggedMessage copied the bytes.
-    query_manager_->processMessage(tagged_message);
-    return query_manager_->getQueryExecutionState().hasQueryExecutionFinished();
-  }
-
-  unique_ptr<CatalogDatabase> db_;
-  unique_ptr<StorageManager> storage_manager_;
-
-  QueryPlan *query_plan_;  // Non-owning; obtained from query_handle_ in SetUp().
-  unique_ptr<QueryHandle> query_handle_;
-  unique_ptr<QueryManager> query_manager_;  // Created by constructQueryManager().
-
-  MessageBusImpl bus_;
-
-  client_id worker_client_id_;  // NOTE(review): never assigned or read in the visible fixture code.
-
-  unique_ptr<WorkerDirectory> workers_;  // NOTE(review): never assigned or read in the visible fixture code.
-};
-
-TEST_F(QueryManagerTest, SingleNodeDAGNoWorkOrdersTest) {
-  // This test creates a DAG of a single node that generates no workorders.
-  query_plan_->addRelationalOperator(new MockOperator(false, false));  // produce_workorders = false.
-
-  const MockOperator &op = static_cast<const MockOperator &>(
-      query_plan_->getQueryPlanDAG().getNodePayload(0));  // Node 0: the only operator added.
-
-  constructQueryManager();
-
-  // op doesn't have any dependencies.
-  EXPECT_TRUE(op.getBlockingDependenciesMet());
-
-  // We expect one call for op's getAllWorkOrders(), and no input to be fed.
-  EXPECT_EQ(1, op.getNumCalls(MockOperator::kGetAllWorkOrders));
-  EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlock));
-  EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlocks));
-}
-
-TEST_F(QueryManagerTest, SingleNodeDAGStaticWorkOrdersTest) {
-  // This test creates a DAG of a single node that generates a static workorder.
-  const QueryPlan::DAGNodeIndex id =
-      query_plan_->addRelationalOperator(new MockOperator(true, false, 1));
-
-  const MockOperator &op = static_cast<const MockOperator &>(
-      query_plan_->getQueryPlanDAG().getNodePayload(id));
-
-  constructQueryManager();
-
-  // op doesn't have any dependencies.
-  EXPECT_TRUE(op.getBlockingDependenciesMet());
-
-  // We expect one call for op's getAllWorkOrders(), and no input to be fed.
-  EXPECT_EQ(1, op.getNumCalls(MockOperator::kGetAllWorkOrders));
-  EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlock));
-  EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlocks));
-
-  // One workorder is generated.
-  EXPECT_EQ(1, op.getNumWorkOrders());
-
-  unique_ptr<WorkerMessage> worker_message;
-  worker_message.reset(query_manager_->getNextWorkerMessage(0, -1));  // Literal 0 here, not `id`.
-  EXPECT_TRUE(worker_message != nullptr);
-
-  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
-            worker_message->getType());
-  EXPECT_EQ(0u, worker_message->getRelationalOpIndex());
-
-  delete worker_message->getWorkOrder();  // Never executed in this test; discard it.
-
-  EXPECT_EQ(1, getNumWorkOrdersInExecution(id));
-  EXPECT_EQ(0, getNumOperatorsFinished());
-
-  // Send a message to QueryManager upon workorder completion.
-  // This is the last event processed by QueryManager, so the query finishes.
-  EXPECT_TRUE(placeWorkOrderCompleteMessage(id));
-
-  EXPECT_EQ(0, getNumWorkOrdersInExecution(id));
-  EXPECT_EQ(1, getNumOperatorsFinished());
-  EXPECT_TRUE(getOperatorFinishedStatus(id));
-}
-
-TEST_F(QueryManagerTest, SingleNodeDAGDynamicWorkOrdersTest) {
-  // This test creates a DAG of a single node. WorkOrders are generated
-  // dynamically as pending work orders complete execution, i.e.,
-  // getAllWorkOrders() is called multiple times. MockOperator(true, false, 4, 3)
-  // generates at most 3 WorkOrders and returns true on its 4th call, i.e., the
-  // 1st 3 calls to getAllWorkOrders() insert 1 WorkOrder each and return false,
-  // and the 4th call inserts no WorkOrder and returns true.
-
-  // TODO(shoban): This test can not be more robust than this because of fixed
-  // scaffolding of mocking. If we use gMock, we can do much better.
-  const QueryPlan::DAGNodeIndex id =
-      query_plan_->addRelationalOperator(new MockOperator(true, false, 4, 3));
-
-  const MockOperator &op = static_cast<const MockOperator &>(
-      query_plan_->getQueryPlanDAG().getNodePayload(id));
-
-  constructQueryManager();
-
-  // op doesn't have any dependencies.
-  EXPECT_TRUE(op.getBlockingDependenciesMet());
-
-  for (int i = 0; i < 3; ++i) {
-    // We expect i + 1 calls so far to op's getAllWorkOrders().
-    EXPECT_EQ(i + 1, op.getNumCalls(MockOperator::kGetAllWorkOrders));
-
-    // One more workorder has been generated, i.e. i + 1 in total.
-    // EXPECT_EQ(1, getWorkerInputQueueSize());
-    EXPECT_EQ(i + 1, op.getNumWorkOrders());
-
-    unique_ptr<WorkerMessage> worker_message;
-    worker_message.reset(query_manager_->getNextWorkerMessage(id, -1));
-
-    EXPECT_TRUE(worker_message != nullptr);
-    EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
-              worker_message->getType());
-    EXPECT_EQ(id, worker_message->getRelationalOpIndex());
-
-    delete worker_message->getWorkOrder();  // Never executed in this test; discard it.
-
-    EXPECT_EQ(1, getNumWorkOrdersInExecution(id));
-    EXPECT_EQ(0, getNumOperatorsFinished());
-
-    if (i < 2) {
-      // Send a message to QueryManager upon workorder completion.
-      EXPECT_FALSE(placeWorkOrderCompleteMessage(id));
-    } else {
-      // Send a message to QueryManager upon workorder completion.
-      // Last event: the query is expected to be finished afterwards.
-      EXPECT_TRUE(placeWorkOrderCompleteMessage(id));
-    }
-  }
-
-  EXPECT_EQ(0, getNumWorkOrdersInExecution(id));
-
-  EXPECT_EQ(1, getNumOperatorsFinished());
-  EXPECT_TRUE(getOperatorFinishedStatus(id));
-
-  // We place this check in the end, since it's true throughout the test.
-  EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlock));
-  EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlocks));
-}
-
-TEST_F(QueryManagerTest, TwoNodesDAGBlockingLinkTest) {
-  // We use two nodes in the DAG with a blocking link between them.
-  // There is no streaming of data involved in this test.
-  const QueryPlan::DAGNodeIndex id1 =
-      query_plan_->addRelationalOperator(new MockOperator(true, false));
-  const QueryPlan::DAGNodeIndex id2 =
-      query_plan_->addRelationalOperator(new MockOperator(true, false));
-
-  // Create a blocking link: op2 (id2) depends on op1 (id1).
-  query_plan_->addDirectDependency(id2, id1, true);
-
-  static_cast<MockOperator *>(
-      query_plan_->getQueryPlanDAGMutable()->getNodePayloadMutable(id1))
-          ->setOutputRelationID(0xdead);  // Dummy output relation ID for op1.
-
-  const MockOperator &op1 = static_cast<const MockOperator &>(
-      query_plan_->getQueryPlanDAG().getNodePayload(id1));
-  const MockOperator &op2 = static_cast<const MockOperator &>(
-      query_plan_->getQueryPlanDAG().getNodePayload(id2));
-
-  constructQueryManager();
-
-  // op1 doesn't have any dependencies.
-  EXPECT_TRUE(op1.getBlockingDependenciesMet());
-
-  // Only op1 should receive a call to getAllWorkOrders initially.
-  EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders));
-  EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlock));
-  EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlocks));
-
-  EXPECT_EQ(0, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
-  EXPECT_EQ(0, op2.getNumCalls(MockOperator::kFeedInputBlock));
-  EXPECT_EQ(0, op2.getNumCalls(MockOperator::kFeedInputBlocks));
-
-  // Only op1 should produce a workorder.
-  EXPECT_EQ(1, op1.getNumWorkOrders());
-  EXPECT_EQ(0, op2.getNumWorkOrders());
-
-  // No workorder completion response has been delivered for op1's workorder yet.
-  unique_ptr<WorkerMessage> worker_message;
-  worker_message.reset(query_manager_->getNextWorkerMessage(id1, -1));
-
-  EXPECT_TRUE(worker_message != nullptr);
-  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
-            worker_message->getType());
-  EXPECT_EQ(id1, worker_message->getRelationalOpIndex());
-
-  delete worker_message->getWorkOrder();  // Never executed in this test; discard it.
-
-  EXPECT_EQ(1, getNumWorkOrdersInExecution(id1));
-  EXPECT_EQ(0, getNumWorkOrdersInExecution(id2));
-  EXPECT_EQ(0, getNumOperatorsFinished());
-
-  // Send a message to QueryManager upon workorder (generated by op1) completion.
-  EXPECT_FALSE(placeWorkOrderCompleteMessage(id1));
-
-  EXPECT_EQ(0, getNumWorkOrdersInExecution(id1));
-  // op1 is over now, op2 still to go.
-  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks));
-  EXPECT_EQ(1, getNumOperatorsFinished());
-
-  EXPECT_TRUE(getOperatorFinishedStatus(id1));
-  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks));
-  EXPECT_FALSE(getOperatorFinishedStatus(id2));
-
-  worker_message.reset(query_manager_->getNextWorkerMessage(id2, -1));  // Fetch op2's workorder.
-  EXPECT_TRUE(worker_message != nullptr);
-  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
-            worker_message->getType());
-  EXPECT_EQ(id2, worker_message->getRelationalOpIndex());
-
-  delete worker_message->getWorkOrder();  // Never executed in this test; discard it.
-
-  EXPECT_EQ(1, getNumWorkOrdersInExecution(id2));
-
-  // op1 is op2's blocking dependency, and op1 has finished.
-  EXPECT_TRUE(op2.getBlockingDependenciesMet());
-
-  EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders));
-  // op2 should get first call of getAllWorkOrders() when op1 is over.
-  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
-
-  EXPECT_EQ(1, op2.getNumWorkOrders());
-
-  // Send a message to QueryManager upon workorder (generated by op2) completion.
-  // Note that the worker hasn't yet popped the workorder. Usually this won't
-  // happen as workers pop workorders first, execute and then send the response.
-  EXPECT_TRUE(placeWorkOrderCompleteMessage(id2));
-
-  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
-
-  EXPECT_EQ(0, getNumWorkOrdersInExecution(id1));
-  EXPECT_EQ(0, getNumWorkOrdersInExecution(id2));
-
-  EXPECT_EQ(2, getNumOperatorsFinished());
-  EXPECT_TRUE(getOperatorFinishedStatus(id1));
-  EXPECT_TRUE(getOperatorFinishedStatus(id2));
-
-  // Expect no additional calls to getAllWorkOrders.
-  EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders));
-  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
-}
-
-TEST_F(QueryManagerTest, TwoNodesDAGPipeLinkTest) {
-  // We use two nodes in the DAG with a non-blocking link between them.
-  // We stream output of op1 to op2. Sequence of events is as follows:
-  // 1. op1 creates a workorder.
-  // 2. We send a "block full" (from op1) to QueryManager.
-  // 3. op2 creates a workorder because of step 2.
-  const QueryPlan::DAGNodeIndex id1 =
-      query_plan_->addRelationalOperator(new MockOperator(true, false, 1));
-  const QueryPlan::DAGNodeIndex id2 =
-      query_plan_->addRelationalOperator(new MockOperator(true, true, 3));
-
-  // Create a non-blocking link: op2 (id2) depends on op1 (id1).
-  query_plan_->addDirectDependency(id2, id1, false);
-
-  static_cast<MockOperator *>(
-      query_plan_->getQueryPlanDAGMutable()->getNodePayloadMutable(id1))
-      ->setOutputRelationID(0xdead);  // Dummy output relation ID for op1.
-
-  const MockOperator &op1 = static_cast<const MockOperator &>(
-      query_plan_->getQueryPlanDAG().getNodePayload(id1));
-  const MockOperator &op2 = static_cast<const MockOperator &>(
-      query_plan_->getQueryPlanDAG().getNodePayload(id2));
-
-  constructQueryManager();
-
-  // As none of the operators have a blocking link, blocking dependencies should
-  // be met.
-  EXPECT_TRUE(op1.getBlockingDependenciesMet());
-  EXPECT_TRUE(op2.getBlockingDependenciesMet());
-
-  EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders));
-  EXPECT_EQ(1, op1.getNumWorkOrders());
-  EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlock));
-  EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlocks));
-
-  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
-  // op2 will generate workorder only after receiving a streaming input.
-  EXPECT_EQ(0, op2.getNumWorkOrders());
-  EXPECT_EQ(0, op2.getNumCalls(MockOperator::kFeedInputBlock));
-  EXPECT_EQ(0, op2.getNumCalls(MockOperator::kFeedInputBlocks));
-
-  unique_ptr<WorkerMessage> worker_message;
-  worker_message.reset(query_manager_->getNextWorkerMessage(id1, -1));  // Fetch op1's workorder.
-
-  EXPECT_TRUE(worker_message != nullptr);
-  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
-            worker_message->getType());
-  EXPECT_EQ(id1, worker_message->getRelationalOpIndex());
-
-  delete worker_message->getWorkOrder();  // Never executed in this test; discard it.
-
-  // Send a message to QueryManager upon block getting full (output of op1).
-  EXPECT_FALSE(placeOutputBlockMessage(id1));
-
-  // op1 is not finished yet because the response of workorder completion hasn't
-  // been received yet by the QueryManager.
-  EXPECT_FALSE(getOperatorFinishedStatus(id1));
-  EXPECT_FALSE(getOperatorFinishedStatus(id2));
-
-  // No additional call to op1's getAllWorkOrders.
-  EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders));
-  EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlock));
-  EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlocks));
-
-  // Output from op1 should be fed to op2.
-  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kFeedInputBlock));
-  EXPECT_EQ(0, op2.getNumCalls(MockOperator::kFeedInputBlocks));
-
-  // A call to op2's getAllWorkOrders because of the streamed input.
-  EXPECT_EQ(2, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
-  EXPECT_EQ(1, op2.getNumWorkOrders());
-
-  // Deliver the completion message of op1's workorder to the QueryManager.
-  EXPECT_FALSE(placeWorkOrderCompleteMessage(id1));
-
-  EXPECT_EQ(0, getNumWorkOrdersInExecution(id1));
-  EXPECT_TRUE(getOperatorFinishedStatus(id1));
-  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks));
-
-  // An additional call to op2's getAllWorkOrders because of completion of op1.
-  EXPECT_EQ(3, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
-  EXPECT_EQ(2, op2.getNumWorkOrders());
-
-  worker_message.reset(query_manager_->getNextWorkerMessage(id2, -1));  // op2's first workorder.
-
-  EXPECT_TRUE(worker_message != nullptr);
-  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
-            worker_message->getType());
-  EXPECT_EQ(id2, worker_message->getRelationalOpIndex());
-
-  delete worker_message->getWorkOrder();  // Never executed in this test; discard it.
-
-  // Deliver the completion message of op2's first workorder to the QueryManager.
-  EXPECT_FALSE(placeWorkOrderCompleteMessage(id2));
-
-  EXPECT_TRUE(getOperatorFinishedStatus(id1));
-
-  worker_message.reset(query_manager_->getNextWorkerMessage(id2, -1));  // op2's second workorder.
-
-  EXPECT_TRUE(worker_message != nullptr);
-  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
-            worker_message->getType());
-  EXPECT_EQ(id2, worker_message->getRelationalOpIndex());
-
-  delete worker_message->getWorkOrder();  // Never executed in this test; discard it.
-
-  EXPECT_EQ(1, getNumWorkOrdersInExecution(id2));
-  EXPECT_FALSE(getOperatorFinishedStatus(id2));
-
-  // Send a message to QueryManager upon workorder (generated by op2) completion.
-  EXPECT_TRUE(placeWorkOrderCompleteMessage(id2));
-
-  EXPECT_TRUE(getOperatorFinishedStatus(id1));
-  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks));
-
-  EXPECT_EQ(0, getNumWorkOrdersInExecution(id2));
-  EXPECT_TRUE(getOperatorFinishedStatus(id2));
-}
-
-TEST_F(QueryManagerTest, TwoNodesDAGPartiallyFilledBlocksTest) {
-  // In this test, we create a 2-node DAG with a non-blocking link between them.
-  // There is no streaming of data from op1 to op2 during the execution of op1.
-  // op1 produces a partially filled block at the end of its execution which is
-  // rebuilt and then fed to op2.
-  const QueryPlan::DAGNodeIndex id1 =
-      query_plan_->addRelationalOperator(new MockOperator(true, false, 1));
-  const QueryPlan::DAGNodeIndex id2 =
-      query_plan_->addRelationalOperator(new MockOperator(true, true, 3, 1));
-
-  // Create a non-blocking link.
-  query_plan_->addDirectDependency(id2, id1, false);
-
-  // Create a relation, owned by db_.*/
-  CatalogRelation *relation =
-      new CatalogRelation(nullptr /* catalog_database */, "test_relation");
-  const relation_id output_relation_id = db_->addRelation(relation);
-
-  // Setup the InsertDestination proto in the query context proto.
-  serialization::QueryContext *query_context_proto =
-      query_handle_->getQueryContextProtoMutable();
-
-  const QueryContext::insert_destination_id insert_destination_index =
-      query_context_proto->insert_destinations_size();
-  serialization::InsertDestination *insert_destination_proto =
-      query_context_proto->add_insert_destinations();
-
-  insert_destination_proto->set_insert_destination_type(
-      serialization::InsertDestinationType::BLOCK_POOL);
-  insert_destination_proto->set_relation_id(output_relation_id);
-  insert_destination_proto->set_relational_op_index(id1);
-
-  MockOperator *op1_mutable = static_cast<MockOperator *>(
-      query_plan_->getQueryPlanDAGMutable()->getNodePayloadMutable(id1));
-  op1_mutable->setInsertDestinationID(insert_destination_index);
-  op1_mutable->setOutputRelationID(output_relation_id);
-
-  const MockOperator &op1 = static_cast<const MockOperator &>(
-      query_plan_->getQueryPlanDAG().getNodePayload(id1));
-  const MockOperator &op2 = static_cast<const MockOperator &>(
-      query_plan_->getQueryPlanDAG().getNodePayload(id2));
-
-  constructQueryManager();
-
-  // NOTE(zuyu): An operator generally has no ideas about partially filled
-  // blocks, but InsertDestination in QueryContext does.
-  // Mock to add partially filled blocks in the InsertDestination.
-  InsertDestination *insert_destination =
-      query_manager_->getQueryContextMutable()->getInsertDestination(
-          insert_destination_index);
-  DCHECK(insert_destination != nullptr);
-  MutableBlockReference block_ref;
-  static_cast<BlockPoolInsertDestination *>(insert_destination)
-      ->available_block_refs_.push_back(move(block_ref));
-
-  // There's no blocking dependency in the DAG.
-  EXPECT_TRUE(op1.getBlockingDependenciesMet());
-  EXPECT_TRUE(op2.getBlockingDependenciesMet());
-
-  EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders));
-  EXPECT_EQ(1, op1.getNumWorkOrders());
-
-  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
-  EXPECT_EQ(0, op2.getNumWorkOrders());
-
-  unique_ptr<WorkerMessage> worker_message;
-  worker_message.reset(query_manager_->getNextWorkerMessage(id1, -1));
-
-  EXPECT_TRUE(worker_message != nullptr);
-  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
-            worker_message->getType());
-  EXPECT_EQ(id1, worker_message->getRelationalOpIndex());
-
-  delete worker_message->getWorkOrder();
-
-  // Send a message to QueryManager upon workorder (generated by op1) completion.
-  EXPECT_FALSE(placeWorkOrderCompleteMessage(id1));
-
-  EXPECT_EQ(0, getNumWorkOrdersInExecution(id1));
-
-  worker_message.reset(query_manager_->getNextWorkerMessage(id1, -1));
-  EXPECT_TRUE(worker_message != nullptr);
-  EXPECT_EQ(WorkerMessage::WorkerMessageType::kRebuildWorkOrder,
-            worker_message->getType());
-
-  EXPECT_EQ(id1, worker_message->getRelationalOpIndex());
-
-  delete worker_message->getWorkOrder();
-
-  // op1 generates a rebuild workorder. The block is rebuilt and streamed
-  // to Foreman.
-  EXPECT_FALSE(placeDataPipelineMessage(id1));
-
-  EXPECT_FALSE(placeRebuildWorkOrderCompleteMessage(id1));
-  // Based on the streamed input, op2's getAllWorkOrders should produce a
-  // workorder.
-  EXPECT_EQ(3, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
-  EXPECT_EQ(1, op2.getNumWorkOrders());
-
-  worker_message.reset(query_manager_->getNextWorkerMessage(id2, -1));
-
-  EXPECT_TRUE(worker_message != nullptr);
-  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
-            worker_message->getType());
-
-  EXPECT_EQ(id2, worker_message->getRelationalOpIndex());
-
-  delete worker_message->getWorkOrder();
-
-  EXPECT_TRUE(getOperatorFinishedStatus(id1));
-  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks));
-  EXPECT_FALSE(getOperatorFinishedStatus(id2));
-  EXPECT_EQ(1, getNumWorkOrdersInExecution(id2));
-
-  // Send a message to QueryManager upon workorder (generated by op2) completion.
-  EXPECT_TRUE(placeWorkOrderCompleteMessage(id2));
-
-  EXPECT_EQ(0, getNumWorkOrdersInExecution(id2));
-
-  EXPECT_TRUE(getOperatorFinishedStatus(id2));
-}
-
-TEST_F(QueryManagerTest, MultipleNodesNoOutputTest) {
-  // When an operator produces workorders but no output, the QueryManager should
-  // check the dependents of this operator to make progress.
-  const QueryPlan::DAGNodeIndex kNumNodes = 5;
-  std::vector<QueryPlan::DAGNodeIndex> ids;
-  ids.reserve(kNumNodes);
-
-  for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes; ++i) {
-    if (i == 0) {
-      ids[i] = query_plan_->addRelationalOperator(new MockOperator(true, false));
-    } else {
-      ids[i] = query_plan_->addRelationalOperator(new MockOperator(true, true));
-    }
-    VLOG(3) << ids[i];
-  }
-
-  /**
-   * The DAG looks like this:
-   *
-   * op1 -> op2 -> op3 -> op4 -> op5
-   *
-   **/
-  for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes - 1; ++i) {
-    query_plan_->addDirectDependency(ids[i + 1], ids[i], false);
-    static_cast<MockOperator*>(query_plan_->getQueryPlanDAGMutable()->getNodePayloadMutable(ids[i]))
-        ->setOutputRelationID(0xdead);
-  }
-
-  std::vector<const MockOperator*> operators;
-  for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes; ++i) {
-    operators.push_back(static_cast<const MockOperator*>(&query_plan_->getQueryPlanDAG().getNodePayload(ids[i])));
-  }
-
-  constructQueryManager();
-
-  // operators[0] should have produced a workorder by now.
-  EXPECT_EQ(1, operators[0]->getNumWorkOrders());
-
-  unique_ptr<WorkerMessage> worker_message;
-  worker_message.reset(query_manager_->getNextWorkerMessage(ids[0], -1));
-
-  EXPECT_TRUE(worker_message != nullptr);
-  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
-            worker_message->getType());
-
-  EXPECT_EQ(ids[0], worker_message->getRelationalOpIndex());
-
-  delete worker_message->getWorkOrder();
-
-  EXPECT_EQ(1, getNumWorkOrdersInExecution(ids[0]));
-  EXPECT_FALSE(getOperatorFinishedStatus(ids[0]));
-
-  for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes; ++i) {
-    EXPECT_EQ(1, operators[ids[i]]->getNumCalls(MockOperator::kGetAllWorkOrders));
-  }
-
-  // Send a message to QueryManager upon workorder (generated by operators[0])
-  // completion.
-  EXPECT_TRUE(placeWorkOrderCompleteMessage(ids[0]));
-
-  for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes; ++i) {
-    EXPECT_EQ(0, getNumWorkOrdersInExecution(ids[i]));
-    EXPECT_TRUE(getOperatorFinishedStatus(ids[i]));
-    if (i < kNumNodes - 1) {
-      EXPECT_EQ(1, operators[i + 1]->getNumCalls(MockOperator::kDoneFeedingInputBlocks));
-    }
-  }
-}
-
-TEST_F(QueryManagerTest, OutOfOrderWorkOrderCompletionTest) {
-  // Consider two operators, both generate one workorder each. The dependent's
-  // workorder finishes before dependency's workorder.
-  const QueryPlan::DAGNodeIndex id1 = query_plan_->addRelationalOperator(new MockOperator(true, false, 1));
-  const QueryPlan::DAGNodeIndex id2 = query_plan_->addRelationalOperator(new MockOperator(true, true, 2, 1));
-
-  // Create a non-blocking link.
-  query_plan_->addDirectDependency(id2, id1, false);
-
-  constructQueryManager();
-
-  unique_ptr<WorkerMessage> worker_message;
-  worker_message.reset(query_manager_->getNextWorkerMessage(id1, -1));
-
-  EXPECT_TRUE(worker_message != nullptr);
-  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
-            worker_message->getType());
-
-  EXPECT_EQ(id1, worker_message->getRelationalOpIndex());
-
-  delete worker_message->getWorkOrder();
-
-  // Send a message to QueryManager upon a block (output of op1) getting full.
-  EXPECT_FALSE(placeOutputBlockMessage(id1));
-
-  // op1 is not finished yet because the response of workorder completion hasn't
-  // been received yet.
-  EXPECT_FALSE(getOperatorFinishedStatus(id1));
-  EXPECT_FALSE(getOperatorFinishedStatus(id2));
-
-  worker_message.reset(query_manager_->getNextWorkerMessage(id2, -1));
-  EXPECT_TRUE(worker_message != nullptr);
-  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
-            worker_message->getType());
-
-  EXPECT_EQ(id2, worker_message->getRelationalOpIndex());
-
-  delete worker_message->getWorkOrder();
-
-  // As mentioned earlier, op2 finishes before op1.
-  EXPECT_FALSE(placeWorkOrderCompleteMessage(id2));
-
-  // op1's workorder execution is over.
-  EXPECT_TRUE(placeWorkOrderCompleteMessage(id1));
-
-  EXPECT_TRUE(getOperatorFinishedStatus(id1));
-  EXPECT_TRUE(getOperatorFinishedStatus(id2));
-}
-
-}  // namespace quickstep


[2/2] incubator-quickstep git commit: Introduced QueryManagerBase, and renamed QueryManagerSingleNode.

Posted by hb...@apache.org.
Introduced QueryManagerBase, and renamed QueryManagerSingleNode.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/b4e25edc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/b4e25edc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/b4e25edc

Branch: refs/heads/master
Commit: b4e25edce8bd49b4f53f63886ad02966ae93bf91
Parents: 7671a58
Author: Zuyu Zhang <zu...@apache.org>
Authored: Fri Jul 8 11:49:14 2016 -0500
Committer: Zuyu Zhang <zu...@twitter.com>
Committed: Tue Jul 12 15:51:06 2016 -0700

----------------------------------------------------------------------
 query_execution/CMakeLists.txt                  |  40 +-
 query_execution/PolicyEnforcer.cpp              |  14 +-
 query_execution/PolicyEnforcer.hpp              |   8 +-
 query_execution/QueryManager.cpp                | 470 ---------
 query_execution/QueryManager.hpp                | 374 --------
 query_execution/QueryManagerBase.cpp            | 305 ++++++
 query_execution/QueryManagerBase.hpp            | 312 ++++++
 query_execution/QueryManagerSingleNode.cpp      | 194 ++++
 query_execution/QueryManagerSingleNode.hpp      | 140 +++
 .../tests/QueryManagerSingleNode_unittest.cpp   | 942 +++++++++++++++++++
 query_execution/tests/QueryManager_unittest.cpp | 940 ------------------
 11 files changed, 1931 insertions(+), 1808 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b4e25edc/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index e1b1183..5a9189c 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -45,7 +45,8 @@ add_library(quickstep_queryexecution_QueryExecutionMessages_proto
 add_library(quickstep_queryexecution_QueryExecutionState ../empty_src.cpp QueryExecutionState.hpp)
 add_library(quickstep_queryexecution_QueryExecutionTypedefs ../empty_src.cpp QueryExecutionTypedefs.hpp)
 add_library(quickstep_queryexecution_QueryExecutionUtil ../empty_src.cpp QueryExecutionUtil.hpp)
-add_library(quickstep_queryexecution_QueryManager QueryManager.cpp QueryManager.hpp)
+add_library(quickstep_queryexecution_QueryManagerBase QueryManagerBase.cpp QueryManagerBase.hpp)
+add_library(quickstep_queryexecution_QueryManagerSingleNode QueryManagerSingleNode.cpp QueryManagerSingleNode.hpp)
 add_library(quickstep_queryexecution_WorkOrderProtosContainer ../empty_src.cpp WorkOrderProtosContainer.hpp)
 add_library(quickstep_queryexecution_WorkOrdersContainer WorkOrdersContainer.cpp WorkOrdersContainer.hpp)
 add_library(quickstep_queryexecution_Worker Worker.cpp Worker.hpp)
@@ -89,18 +90,19 @@ target_link_libraries(quickstep_queryexecution_ForemanSingleNode
                       tmb
                       ${GFLAGS_LIB_NAME})
 target_link_libraries(quickstep_queryexecution_PolicyEnforcer
-                      ${GFLAGS_LIB_NAME}
                       glog
                       quickstep_catalog_CatalogTypedefs
                       quickstep_queryexecution_QueryExecutionMessages_proto
                       quickstep_queryexecution_QueryExecutionTypedefs
-                      quickstep_queryexecution_QueryManager
+                      quickstep_queryexecution_QueryManagerBase
+                      quickstep_queryexecution_QueryManagerSingleNode
                       quickstep_queryexecution_WorkerDirectory
                       quickstep_queryexecution_WorkerMessage
                       quickstep_queryoptimizer_QueryHandle
                       quickstep_relationaloperators_WorkOrder
                       quickstep_utility_Macros
-                      tmb)
+                      tmb
+                      ${GFLAGS_LIB_NAME})
 target_link_libraries(quickstep_queryexecution_QueryContext
                       glog
                       quickstep_catalog_CatalogDatabaseLite
@@ -149,7 +151,7 @@ target_link_libraries(quickstep_queryexecution_QueryExecutionUtil
                       quickstep_queryexecution_WorkerMessage
                       quickstep_utility_Macros
                       tmb)
-target_link_libraries(quickstep_queryexecution_QueryManager
+target_link_libraries(quickstep_queryexecution_QueryManagerBase
                       quickstep_catalog_CatalogDatabase
                       quickstep_catalog_CatalogRelation
                       quickstep_catalog_CatalogTypedefs
@@ -158,14 +160,25 @@ target_link_libraries(quickstep_queryexecution_QueryManager
                       quickstep_queryexecution_QueryExecutionMessages_proto
                       quickstep_queryexecution_QueryExecutionState
                       quickstep_queryexecution_QueryExecutionTypedefs
+                      quickstep_queryoptimizer_QueryHandle
+                      quickstep_queryoptimizer_QueryPlan
+                      quickstep_relationaloperators_RelationalOperator
+                      quickstep_relationaloperators_WorkOrder
+                      quickstep_storage_StorageBlockInfo
+                      quickstep_utility_DAG
+                      quickstep_utility_Macros)
+target_link_libraries(quickstep_queryexecution_QueryManagerSingleNode
+                      quickstep_catalog_CatalogTypedefs
+                      quickstep_queryexecution_QueryContext
+                      quickstep_queryexecution_QueryExecutionState
+                      quickstep_queryexecution_QueryManagerBase
                       quickstep_queryexecution_WorkOrdersContainer
                       quickstep_queryexecution_WorkerMessage
                       quickstep_queryoptimizer_QueryHandle
                       quickstep_relationaloperators_RebuildWorkOrder
                       quickstep_relationaloperators_RelationalOperator
-                      quickstep_relationaloperators_WorkOrder
+                      quickstep_storage_InsertDestination
                       quickstep_storage_StorageBlock
-                      quickstep_storage_StorageBlockInfo
                       quickstep_utility_DAG
                       quickstep_utility_Macros
                       tmb)
@@ -210,7 +223,8 @@ target_link_libraries(quickstep_queryexecution
                       quickstep_queryexecution_QueryExecutionState
                       quickstep_queryexecution_QueryExecutionTypedefs
                       quickstep_queryexecution_QueryExecutionUtil
-                      quickstep_queryexecution_QueryManager
+                      quickstep_queryexecution_QueryManagerBase
+                      quickstep_queryexecution_QueryManagerSingleNode
                       quickstep_queryexecution_WorkOrderProtosContainer
                       quickstep_queryexecution_WorkOrdersContainer
                       quickstep_queryexecution_Worker
@@ -248,9 +262,9 @@ if (ENABLE_DISTRIBUTED)
   add_test(BlockLocator_unittest BlockLocator_unittest)
 endif()
 
-add_executable(QueryManager_unittest
-  "${CMAKE_CURRENT_SOURCE_DIR}/tests/QueryManager_unittest.cpp")
-target_link_libraries(QueryManager_unittest
+add_executable(QueryManagerSingleNode_unittest
+  "${CMAKE_CURRENT_SOURCE_DIR}/tests/QueryManagerSingleNode_unittest.cpp")
+target_link_libraries(QueryManagerSingleNode_unittest
                       glog
                       gtest
                       gtest_main
@@ -262,7 +276,7 @@ target_link_libraries(QueryManager_unittest
                       quickstep_queryexecution_QueryExecutionMessages_proto
                       quickstep_queryexecution_QueryExecutionState
                       quickstep_queryexecution_QueryExecutionTypedefs
-                      quickstep_queryexecution_QueryManager
+                      quickstep_queryexecution_QueryManagerSingleNode
                       quickstep_queryexecution_WorkOrdersContainer
                       quickstep_queryexecution_WorkerDirectory
                       quickstep_queryexecution_WorkerMessage
@@ -278,7 +292,7 @@ target_link_libraries(QueryManager_unittest
                       quickstep_utility_DAG
                       quickstep_utility_Macros
                       tmb)
-add_test(QueryManager_unittest QueryManager_unittest)
+add_test(QueryManagerSingleNode_unittest QueryManagerSingleNode_unittest)
 
 add_executable(WorkOrdersContainer_unittest
                "${CMAKE_CURRENT_SOURCE_DIR}/tests/WorkOrdersContainer_unittest.cpp")

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b4e25edc/query_execution/PolicyEnforcer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcer.cpp b/query_execution/PolicyEnforcer.cpp
index 84aa86a..f310ee1 100644
--- a/query_execution/PolicyEnforcer.cpp
+++ b/query_execution/PolicyEnforcer.cpp
@@ -26,7 +26,7 @@
 
 #include "catalog/CatalogTypedefs.hpp"
 #include "query_execution/QueryExecutionMessages.pb.h"
-#include "query_execution/QueryManager.hpp"
+#include "query_execution/QueryManagerSingleNode.hpp"
 #include "query_execution/WorkerDirectory.hpp"
 #include "query_optimizer/QueryHandle.hpp"
 #include "relational_operators/WorkOrder.hpp"
@@ -47,8 +47,8 @@ bool PolicyEnforcer::admitQuery(QueryHandle *query_handle) {
     if (admitted_queries_.find(query_id) == admitted_queries_.end()) {
       // Query with the same ID not present, ok to admit.
       admitted_queries_[query_id].reset(
-          new QueryManager(foreman_client_id_, num_numa_nodes_, query_handle,
-                           catalog_database_, storage_manager_, bus_));
+          new QueryManagerSingleNode(foreman_client_id_, num_numa_nodes_, query_handle,
+                                     catalog_database_, storage_manager_, bus_));
       return true;
     } else {
       LOG(ERROR) << "Query with the same ID " << query_id << " exists";
@@ -124,9 +124,9 @@ void PolicyEnforcer::processMessage(const TaggedMessage &tagged_message) {
       LOG(FATAL) << "Unknown message type found in PolicyEnforcer";
   }
   DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
-  const QueryManager::QueryStatusCode return_code =
+  const QueryManagerBase::QueryStatusCode return_code =
       admitted_queries_[query_id]->processMessage(tagged_message);
-  if (return_code == QueryManager::QueryStatusCode::kQueryExecuted) {
+  if (return_code == QueryManagerBase::QueryStatusCode::kQueryExecuted) {
     removeQuery(query_id);
     if (!waiting_queries_.empty()) {
       // Admit the earliest waiting query.
@@ -156,12 +156,12 @@ void PolicyEnforcer::getWorkerMessages(
   std::vector<std::size_t> finished_queries_ids;
 
   for (const auto &admitted_query_info : admitted_queries_) {
-    QueryManager *curr_query_manager = admitted_query_info.second.get();
+    QueryManagerBase *curr_query_manager = admitted_query_info.second.get();
     DCHECK(curr_query_manager != nullptr);
     std::size_t messages_collected_curr_query = 0;
     while (messages_collected_curr_query < per_query_share) {
       WorkerMessage *next_worker_message =
-          curr_query_manager->getNextWorkerMessage(0, kAnyNUMANodeID);
+          static_cast<QueryManagerSingleNode*>(curr_query_manager)->getNextWorkerMessage(0, kAnyNUMANodeID);
       if (next_worker_message != nullptr) {
         ++messages_collected_curr_query;
         worker_messages->push_back(std::unique_ptr<WorkerMessage>(next_worker_message));

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b4e25edc/query_execution/PolicyEnforcer.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcer.hpp b/query_execution/PolicyEnforcer.hpp
index 470ff2a..79e61d1 100644
--- a/query_execution/PolicyEnforcer.hpp
+++ b/query_execution/PolicyEnforcer.hpp
@@ -26,7 +26,7 @@
 #include <vector>
 
 #include "query_execution/QueryExecutionTypedefs.hpp"
-#include "query_execution/QueryManager.hpp"
+#include "query_execution/QueryManagerBase.hpp"
 #include "query_execution/WorkerMessage.hpp"
 #include "utility/Macros.hpp"
 
@@ -190,8 +190,8 @@ class PolicyEnforcer {
   tmb::MessageBus *bus_;
   const bool profile_individual_workorders_;
 
-  // Key = query ID, value = QueryManager* for the key query.
-  std::unordered_map<std::size_t, std::unique_ptr<QueryManager>> admitted_queries_;
+  // Key = query ID, value = QueryManagerBase* for the key query.
+  std::unordered_map<std::size_t, std::unique_ptr<QueryManagerBase>> admitted_queries_;
 
   // The queries which haven't been admitted yet.
   std::queue<QueryHandle*> waiting_queries_;
@@ -214,4 +214,4 @@ class PolicyEnforcer {
 
 }  // namespace quickstep
 
-#endif  // QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_HPP_
+#endif  // QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b4e25edc/query_execution/QueryManager.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManager.cpp b/query_execution/QueryManager.cpp
deleted file mode 100644
index d20b592..0000000
--- a/query_execution/QueryManager.cpp
+++ /dev/null
@@ -1,470 +0,0 @@
-/**
- *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
- *     University of Wisconsin—Madison.
- *
- *   Licensed under the Apache License, Version 2.0 (the "License");
- *   you may not use this file except in compliance with the License.
- *   You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- **/
-
-#include "query_execution/QueryManager.hpp"
-
-#include <cstddef>
-#include <memory>
-#include <utility>
-#include <vector>
-
-#include "catalog/CatalogDatabase.hpp"
-#include "catalog/CatalogRelation.hpp"
-#include "catalog/CatalogTypedefs.hpp"
-#include "catalog/PartitionScheme.hpp"
-#include "query_execution/QueryExecutionMessages.pb.h"
-#include "query_execution/QueryExecutionTypedefs.hpp"
-#include "query_execution/WorkerMessage.hpp"
-#include "query_optimizer/QueryHandle.hpp"
-#include "relational_operators/RebuildWorkOrder.hpp"
-#include "relational_operators/WorkOrder.hpp"
-#include "storage/StorageBlock.hpp"
-#include "storage/StorageBlockInfo.hpp"
-
-#include "glog/logging.h"
-
-using std::pair;
-
-namespace quickstep {
-class CatalogDatabaseLite;
-class StorageManager;
-}
-
-namespace quickstep {
-
-QueryManager::QueryManager(const tmb::client_id foreman_client_id,
-                           const std::size_t num_numa_nodes,
-                           QueryHandle *query_handle,
-                           CatalogDatabaseLite *catalog_database,
-                           StorageManager *storage_manager,
-                           tmb::MessageBus *bus)
-      : foreman_client_id_(foreman_client_id),
-        query_id_(DCHECK_NOTNULL(query_handle)->query_id()),
-        catalog_database_(DCHECK_NOTNULL(catalog_database)),
-        storage_manager_(DCHECK_NOTNULL(storage_manager)),
-        bus_(DCHECK_NOTNULL(bus)) {
-  DCHECK(query_handle->getQueryPlanMutable() != nullptr);
-  query_dag_ = query_handle->getQueryPlanMutable()->getQueryPlanDAGMutable();
-  DCHECK(query_dag_ != nullptr);
-
-  const dag_node_index num_operators_in_dag = query_dag_->size();
-
-  output_consumers_.resize(num_operators_in_dag);
-  blocking_dependencies_.resize(num_operators_in_dag);
-
-  query_exec_state_.reset(new QueryExecutionState(num_operators_in_dag));
-  workorders_container_.reset(
-      new WorkOrdersContainer(num_operators_in_dag, num_numa_nodes));
-
-  query_context_.reset(new QueryContext(query_handle->getQueryContextProto(),
-                                        *catalog_database_,
-                                        storage_manager_,
-                                        foreman_client_id_,
-                                        bus_));
-
-  for (dag_node_index node_index = 0;
-       node_index < num_operators_in_dag;
-       ++node_index) {
-    const QueryContext::insert_destination_id insert_destination_index =
-        query_dag_->getNodePayload(node_index).getInsertDestinationID();
-    if (insert_destination_index != QueryContext::kInvalidInsertDestinationId) {
-      // Rebuild is necessary whenever InsertDestination is present.
-      query_exec_state_->setRebuildRequired(node_index);
-      query_exec_state_->setRebuildStatus(node_index, 0, false);
-    }
-
-    for (const pair<dag_node_index, bool> &dependent_link :
-         query_dag_->getDependents(node_index)) {
-      const dag_node_index dependent_op_index = dependent_link.first;
-      if (!query_dag_->getLinkMetadata(node_index, dependent_op_index)) {
-        // The link is not a pipeline-breaker. Streaming of blocks is possible
-        // between these two operators.
-        output_consumers_[node_index].push_back(dependent_op_index);
-      } else {
-        // The link is a pipeline-breaker. Streaming of blocks is not possible
-        // between these two operators.
-        blocking_dependencies_[dependent_op_index].push_back(node_index);
-      }
-    }
-  }
-
-  // Collect all the workorders from all the relational operators in the DAG.
-  for (dag_node_index index = 0; index < num_operators_in_dag; ++index) {
-    if (checkAllBlockingDependenciesMet(index)) {
-      query_dag_->getNodePayloadMutable(index)->informAllBlockingDependenciesMet();
-      processOperator(index, false);
-    }
-  }
-}
-
-WorkerMessage* QueryManager::getNextWorkerMessage(
-    const dag_node_index start_operator_index, const int numa_node) {
-  // Default policy: Operator with lowest index first.
-  WorkOrder *work_order = nullptr;
-  size_t num_operators_checked = 0;
-  for (dag_node_index index = start_operator_index;
-       num_operators_checked < query_dag_->size();
-       index = (index + 1) % query_dag_->size(), ++num_operators_checked) {
-    if (query_exec_state_->hasExecutionFinished(index)) {
-      continue;
-    }
-    if (numa_node != -1) {
-      // First try to get a normal WorkOrder from the specified NUMA node.
-      work_order = workorders_container_->getNormalWorkOrderForNUMANode(index, numa_node);
-      if (work_order != nullptr) {
-        // A WorkOrder found on the given NUMA node.
-        query_exec_state_->incrementNumQueuedWorkOrders(index);
-        return WorkerMessage::WorkOrderMessage(work_order, index);
-      } else {
-        // Normal workorder not found on this node. Look for a rebuild workorder
-        // on this NUMA node.
-        work_order = workorders_container_->getRebuildWorkOrderForNUMANode(index, numa_node);
-        if (work_order != nullptr) {
-          return WorkerMessage::RebuildWorkOrderMessage(work_order, index);
-        }
-      }
-    }
-    // Either no workorder found on the given NUMA node, or numa_node is -1.
-    // Try to get a normal WorkOrder from other NUMA nodes.
-    work_order = workorders_container_->getNormalWorkOrder(index);
-    if (work_order != nullptr) {
-      query_exec_state_->incrementNumQueuedWorkOrders(index);
-      return WorkerMessage::WorkOrderMessage(work_order, index);
-    } else {
-      // Normal WorkOrder not found, look for a RebuildWorkOrder.
-      work_order = workorders_container_->getRebuildWorkOrder(index);
-      if (work_order != nullptr) {
-        return WorkerMessage::RebuildWorkOrderMessage(work_order, index);
-      }
-    }
-  }
-  // No WorkOrders available right now.
-  return nullptr;
-}
-
-QueryManager::QueryStatusCode QueryManager::processMessage(
-    const TaggedMessage &tagged_message) {
-  dag_node_index op_index;
-  switch (tagged_message.message_type()) {
-    case kWorkOrderCompleteMessage: {
-      serialization::NormalWorkOrderCompletionMessage proto;
-      CHECK(proto.ParseFromArray(tagged_message.message(),
-                                 tagged_message.message_bytes()));
-
-      op_index = proto.operator_index();
-      processWorkOrderCompleteMessage(proto.operator_index());
-      break;
-    }
-    case kRebuildWorkOrderCompleteMessage: {
-      serialization::RebuildWorkOrderCompletionMessage proto;
-      CHECK(proto.ParseFromArray(tagged_message.message(),
-                                 tagged_message.message_bytes()));
-
-      op_index = proto.operator_index();
-      processRebuildWorkOrderCompleteMessage(proto.operator_index());
-      break;
-    }
-    case kCatalogRelationNewBlockMessage: {
-      serialization::CatalogRelationNewBlockMessage proto;
-      CHECK(proto.ParseFromArray(tagged_message.message(),
-                                 tagged_message.message_bytes()));
-
-      const block_id block = proto.block_id();
-
-      CatalogRelation *relation =
-          static_cast<CatalogDatabase*>(catalog_database_)->getRelationByIdMutable(proto.relation_id());
-      relation->addBlock(block);
-
-      if (proto.has_partition_id()) {
-        relation->getPartitionSchemeMutable()->addBlockToPartition(
-            proto.partition_id(), block);
-      }
-      return QueryStatusCode::kNone;
-    }
-    case kDataPipelineMessage: {
-      // Possible message senders include InsertDestinations and some
-      // operators which modify existing blocks.
-      serialization::DataPipelineMessage proto;
-      CHECK(proto.ParseFromArray(tagged_message.message(),
-                                 tagged_message.message_bytes()));
-
-      op_index = proto.operator_index();
-      processDataPipelineMessage(proto.operator_index(),
-                                 proto.block_id(),
-                                 proto.relation_id());
-      break;
-    }
-    case kWorkOrdersAvailableMessage: {
-      serialization::WorkOrdersAvailableMessage proto;
-      CHECK(proto.ParseFromArray(tagged_message.message(),
-                                 tagged_message.message_bytes()));
-
-      op_index = proto.operator_index();
-
-      // Check if new work orders are available.
-      fetchNormalWorkOrders(op_index);
-
-      // Dispatch the WorkerMessages to the workers. We prefer to start the search
-      // for the schedulable WorkOrders beginning from 'op_index'. The first
-      // candidate worker to receive the next WorkOrder is the one that sent the
-      // response message to Foreman.
-      // TODO(zuyu): Improve the data locality for the next WorkOrder.
-      break;
-    }
-    case kWorkOrderFeedbackMessage: {
-      WorkOrder::FeedbackMessage msg(
-          const_cast<void *>(tagged_message.message()),
-          tagged_message.message_bytes());
-
-      op_index = msg.header().rel_op_index;
-      processFeedbackMessage(msg);
-      break;
-    }
-    default:
-      LOG(FATAL) << "Unknown message type found in QueryManager";
-  }
-
-  if (query_exec_state_->hasExecutionFinished(op_index)) {
-    return QueryStatusCode::kOperatorExecuted;
-  }
-
-  // As kQueryExecuted takes precedence over kOperatorExecuted, we check again.
-  if (query_exec_state_->hasQueryExecutionFinished()) {
-    return QueryStatusCode::kQueryExecuted;
-  }
-
-  return QueryStatusCode::kNone;
-}
-
-void QueryManager::processFeedbackMessage(
-    const WorkOrder::FeedbackMessage &msg) {
-  RelationalOperator *op =
-      query_dag_->getNodePayloadMutable(msg.header().rel_op_index);
-  op->receiveFeedbackMessage(msg);
-}
-
-void QueryManager::processWorkOrderCompleteMessage(
-    const dag_node_index op_index) {
-  query_exec_state_->decrementNumQueuedWorkOrders(op_index);
-
-  // Check if new work orders are available and fetch them if so.
-  fetchNormalWorkOrders(op_index);
-
-  if (checkRebuildRequired(op_index)) {
-    if (checkNormalExecutionOver(op_index)) {
-      if (!checkRebuildInitiated(op_index)) {
-        if (initiateRebuild(op_index)) {
-          // Rebuild initiated and completed right away.
-          markOperatorFinished(op_index);
-        } else {
-          // Rebuild under progress.
-        }
-      } else if (checkRebuildOver(op_index)) {
-        // Rebuild was under progress and now it is over.
-        markOperatorFinished(op_index);
-      }
-    } else {
-      // Normal execution under progress for this operator.
-    }
-  } else if (checkOperatorExecutionOver(op_index)) {
-    // Rebuild not required for this operator and its normal execution is
-    // complete.
-    markOperatorFinished(op_index);
-  }
-
-  for (const pair<dag_node_index, bool> &dependent_link :
-       query_dag_->getDependents(op_index)) {
-    const dag_node_index dependent_op_index = dependent_link.first;
-    if (checkAllBlockingDependenciesMet(dependent_op_index)) {
-      // Process the dependent operator (of the operator whose WorkOrder
-      // was just executed) for which all the dependencies have been met.
-      processOperator(dependent_op_index, true);
-    }
-  }
-}
-
-void QueryManager::processRebuildWorkOrderCompleteMessage(const dag_node_index op_index) {
-  query_exec_state_->decrementNumRebuildWorkOrders(op_index);
-
-  if (checkRebuildOver(op_index)) {
-    markOperatorFinished(op_index);
-
-    for (const pair<dag_node_index, bool> &dependent_link :
-         query_dag_->getDependents(op_index)) {
-      const dag_node_index dependent_op_index = dependent_link.first;
-      if (checkAllBlockingDependenciesMet(dependent_op_index)) {
-        processOperator(dependent_op_index, true);
-      }
-    }
-  }
-}
-
-void QueryManager::processOperator(const dag_node_index index,
-                                   const bool recursively_check_dependents) {
-  if (fetchNormalWorkOrders(index)) {
-    // Fetched work orders. Return to wait for the generated work orders to
-    // execute, and skip the execution-finished checks.
-    return;
-  }
-
-  if (checkNormalExecutionOver(index)) {
-    if (checkRebuildRequired(index)) {
-      if (!checkRebuildInitiated(index)) {
-        // Rebuild hasn't started, initiate it.
-        if (initiateRebuild(index)) {
-          // Rebuild initiated and completed right away.
-          markOperatorFinished(index);
-        } else {
-          // Rebuild WorkOrders have been generated.
-          return;
-        }
-      } else if (checkRebuildOver(index)) {
-        // Rebuild had been initiated and it is over.
-        markOperatorFinished(index);
-      }
-    } else {
-      // Rebuild is not required and normal execution over, mark finished.
-      markOperatorFinished(index);
-    }
-    // If we reach here, that means the operator has been marked as finished.
-    if (recursively_check_dependents) {
-      for (const pair<dag_node_index, bool> &dependent_link :
-           query_dag_->getDependents(index)) {
-        const dag_node_index dependent_op_index = dependent_link.first;
-        if (checkAllBlockingDependenciesMet(dependent_op_index)) {
-          processOperator(dependent_op_index, true);
-        }
-      }
-    }
-  }
-}
-
-void QueryManager::processDataPipelineMessage(const dag_node_index op_index,
-                                              const block_id block,
-                                              const relation_id rel_id) {
-  for (const dag_node_index consumer_index :
-       output_consumers_[op_index]) {
-    // Feed the streamed block to the consumer. Note that 'output_consumers_'
-    // only contain those dependents of operator with index = op_index which are
-    // eligible to receive streamed input.
-    query_dag_->getNodePayloadMutable(consumer_index)->feedInputBlock(block, rel_id);
-    // Because of the streamed input just fed, check if there are any new
-    // WorkOrders available and if so, fetch them.
-    fetchNormalWorkOrders(consumer_index);
-  }
-}
-
-bool QueryManager::fetchNormalWorkOrders(const dag_node_index index) {
-  bool generated_new_workorders = false;
-  if (!query_exec_state_->hasDoneGenerationWorkOrders(index)) {
-    // Do not fetch any work units until all blocking dependencies are met.
-    // The releational operator is not aware of blocking dependencies for
-    // uncorrelated scalar queries.
-    if (!checkAllBlockingDependenciesMet(index)) {
-      return false;
-    }
-    const size_t num_pending_workorders_before =
-        workorders_container_->getNumNormalWorkOrders(index);
-    const bool done_generation =
-        query_dag_->getNodePayloadMutable(index)->getAllWorkOrders(workorders_container_.get(),
-                                                                   query_context_.get(),
-                                                                   storage_manager_,
-                                                                   foreman_client_id_,
-                                                                   bus_);
-    if (done_generation) {
-      query_exec_state_->setDoneGenerationWorkOrders(index);
-    }
-
-    // TODO(shoban): It would be a good check to see if operator is making
-    // useful progress, i.e., the operator either generates work orders to
-    // execute or still has pending work orders executing. However, this will not
-    // work if Foreman polls operators without feeding data. This check can be
-    // enabled, if Foreman is refactored to call getAllWorkOrders() only when
-    // pending work orders are completed or new input blocks feed.
-
-    generated_new_workorders =
-        (num_pending_workorders_before <
-         workorders_container_->getNumNormalWorkOrders(index));
-  }
-  return generated_new_workorders;
-}
-
-void QueryManager::markOperatorFinished(const dag_node_index index) {
-  query_exec_state_->setExecutionFinished(index);
-
-  RelationalOperator *op = query_dag_->getNodePayloadMutable(index);
-  op->updateCatalogOnCompletion();
-
-  const relation_id output_rel = op->getOutputRelationID();
-  for (const pair<dag_node_index, bool> &dependent_link : query_dag_->getDependents(index)) {
-    const dag_node_index dependent_op_index = dependent_link.first;
-    RelationalOperator *dependent_op = query_dag_->getNodePayloadMutable(dependent_op_index);
-    // Signal dependent operator that current operator is done feeding input blocks.
-    if (output_rel >= 0) {
-      dependent_op->doneFeedingInputBlocks(output_rel);
-    }
-    if (checkAllBlockingDependenciesMet(dependent_op_index)) {
-      dependent_op->informAllBlockingDependenciesMet();
-    }
-  }
-}
-
-bool QueryManager::initiateRebuild(const dag_node_index index) {
-  DCHECK(!workorders_container_->hasRebuildWorkOrder(index));
-  DCHECK(checkRebuildRequired(index));
-  DCHECK(!checkRebuildInitiated(index));
-
-  getRebuildWorkOrders(index, workorders_container_.get());
-
-  query_exec_state_->setRebuildStatus(
-      index, workorders_container_->getNumRebuildWorkOrders(index), true);
-
-  return (query_exec_state_->getNumRebuildWorkOrders(index) == 0);
-}
-
-void QueryManager::getRebuildWorkOrders(const dag_node_index index,
-                          WorkOrdersContainer *container) {
-  const RelationalOperator &op = query_dag_->getNodePayload(index);
-  const QueryContext::insert_destination_id insert_destination_index = op.getInsertDestinationID();
-
-  if (insert_destination_index == QueryContext::kInvalidInsertDestinationId) {
-    return;
-  }
-
-  std::vector<MutableBlockReference> partially_filled_block_refs;
-
-  DCHECK(query_context_ != nullptr);
-  InsertDestination *insert_destination = query_context_->getInsertDestination(insert_destination_index);
-  DCHECK(insert_destination != nullptr);
-
-  insert_destination->getPartiallyFilledBlocks(&partially_filled_block_refs);
-
-  for (std::vector<MutableBlockReference>::size_type i = 0;
-       i < partially_filled_block_refs.size();
-       ++i) {
-    container->addRebuildWorkOrder(
-        new RebuildWorkOrder(query_id_,
-                             std::move(partially_filled_block_refs[i]),
-                             index,
-                             op.getOutputRelationID(),
-                             foreman_client_id_,
-                             bus_),
-        index);
-  }
-}
-
-}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b4e25edc/query_execution/QueryManager.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManager.hpp b/query_execution/QueryManager.hpp
deleted file mode 100644
index b52460f..0000000
--- a/query_execution/QueryManager.hpp
+++ /dev/null
@@ -1,374 +0,0 @@
-/**
- *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
- *     University of Wisconsin—Madison.
- *
- *   Licensed under the Apache License, Version 2.0 (the "License");
- *   you may not use this file except in compliance with the License.
- *   You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- **/
-
-#ifndef QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_HPP_
-#define QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_HPP_
-
-#include <cstddef>
-#include <memory>
-#include <vector>
-
-#include "catalog/CatalogTypedefs.hpp"
-#include "query_execution/QueryContext.hpp"
-#include "query_execution/QueryExecutionState.hpp"
-#include "query_execution/QueryExecutionTypedefs.hpp"
-#include "query_execution/WorkOrdersContainer.hpp"
-#include "relational_operators/RelationalOperator.hpp"
-#include "relational_operators/WorkOrder.hpp"
-#include "storage/StorageBlockInfo.hpp"
-#include "utility/DAG.hpp"
-#include "utility/Macros.hpp"
-
-#include "tmb/id_typedefs.h"
-#include "tmb/message_bus.h"
-#include "tmb/tagged_message.h"
-
-namespace quickstep {
-
-class CatalogDatabaseLite;
-class QueryHandle;
-class StorageManager;
-class WorkerMessage;
-
-/** \addtogroup QueryExecution
- *  @{
- */
-
-/**
- * @brief A class that manages the execution of a query including generation
- *        of new work orders, keeping track of the query exection state.
- **/
-class QueryManager {
- public:
-  typedef DAG<RelationalOperator, bool>::size_type_nodes dag_node_index;
-
-  /**
-   * @brief Return codes for processMessage() function.
-   *
-   * @note When both operator and query get executed, kQueryExecuted takes
-   *       precedence over kOperatorExecuted.
-   **/
-  enum class QueryStatusCode {
-    kOperatorExecuted = 0,  // An operator in the query finished execution.
-    kQueryExecuted,         // The query got executed.
-    kNone                   // None of the above.
-  };
-
-  /**
-   * @brief Constructor.
-   *
-   * @param foreman_client_id The TMB client ID of the foreman thread.
-   * @param num_numa_nodes The number of NUMA nodes used by the system.
-   * @param query_handle The QueryHandle object for this query.
-   * @param catalog_database The CatalogDatabse used by the query.
-   * @param storage_manager The StorageManager used by the query.
-   * @param bus The TMB used for communication.
-   **/
-  QueryManager(const tmb::client_id foreman_client_id,
-               const std::size_t num_numa_nodes,
-               QueryHandle *query_handle,
-               CatalogDatabaseLite *catalog_database,
-               StorageManager *storage_manager,
-               tmb::MessageBus *bus);
-
- /**
-   * @brief Get the next workorder to be excuted, wrapped in a WorkerMessage.
-   *
-   * @param start_operator_index Begin the search for the schedulable WorkOrder
-   *        with the operator at this index.
-   * @param numa_node The next WorkOrder should preferably have its input(s)
-   *        from this numa_node. This is a hint and not a binding requirement.
-   *
-   * @return A pointer to the WorkerMessage. If there's no WorkOrder to be
-   *         executed, return NULL.
-   **/
-  WorkerMessage *getNextWorkerMessage(
-      const dag_node_index start_operator_index,
-      const numa_node_id node_id = -1);
-
-  /**
-   * @brief Process a message sent to the QueryManager.
-   *
-   * @param tagged_message TaggedMessage sent to the QueryManager.
-   *
-   * @return QueryStatusCode as determined after the message is processed.
-   **/
-  QueryStatusCode processMessage(const TaggedMessage &tagged_message);
-
-  /**
-   * @brief Get the QueryExecutionState for this query.
-   **/
-  inline const QueryExecutionState& getQueryExecutionState() const {
-    return *query_exec_state_;
-  }
-
-  /**
-   * @brief Get a pointer to the QueryContext.
-   **/
-  inline QueryContext* getQueryContextMutable() {
-    return query_context_.get();
-  }
-
- private:
-  /**
-   * @brief Process the received WorkOrder complete message.
-   *
-   * @param node_index The index of the specified operator node in the query DAG
-   *        for the completed WorkOrder.
-   **/
-  void processWorkOrderCompleteMessage(const dag_node_index op_index);
-
-  /**
-   * @brief Process the received RebuildWorkOrder complete message.
-   *
-   * @param node_index The index of the specified operator node in the query DAG
-   *        for the completed RebuildWorkOrder.
-   **/
-  void processRebuildWorkOrderCompleteMessage(const dag_node_index op_index);
-
-  /**
-   * @brief Process a current relational operator: Get its workorders and store
-   *        them in the WorkOrdersContainer for this query. If the operator can
-   *        be marked as done, do so.
-   *
-   * @param index The index of the relational operator to be processed in the
-   *        query plan DAG.
-   * @param recursively_check_dependents If an operator is done, should we
-   *        call processOperator on its dependents recursively.
-   **/
-  void processOperator(const dag_node_index index,
-                       const bool recursively_check_dependents);
-
-  /**
-   * @brief Process the received data pipeline message.
-   *
-   * @param node_index The index of the specified operator node in the query DAG
-   *        for the pipelining block.
-   * @param block The block id.
-   * @param rel_id The ID of the relation that produced 'block'.
-   **/
-  void processDataPipelineMessage(const dag_node_index op_index,
-                                  const block_id block,
-                                  const relation_id rel_id);
-
-  /**
-   * @brief Process the received work order feedback message and notify
-   *        relational operator.
-   *
-   * @param message Feedback message from work order.
-   **/
-  void processFeedbackMessage(const WorkOrder::FeedbackMessage &message);
-
-  /**
-   * @brief Fetch all work orders currently available in relational operator and
-   *        store them internally.
-   *
-   * @param index The index of the relational operator to be processed in the
-   *        query plan DAG.
-   *
-   * @return Whether any work order was generated by op.
-   **/
-  bool fetchNormalWorkOrders(const dag_node_index index);
-
-  /**
-   * @brief This function does the following things:
-   *        1. Mark the given relational operator as "done".
-   *        2. For all the dependents of this operator, check if all of their
-   *        blocking dependencies are met. If so inform them that the blocking
-   *        dependencies are met.
-   *        3. Check if the given operator is done producing output. If it's
-   *        done, inform the dependents that they won't receive input anymore
-   *        from the given operator.
-   *
-   * @param index The index of the given relational operator in the DAG.
-   **/
-  void markOperatorFinished(const dag_node_index index);
-
-  /**
-   * @brief Check if all the dependencies of the node at specified index have
-   *        finished their execution.
-   *
-   * @note This function's true return value is a pre-requisite for calling
-   *       getRebuildWorkOrders()
-   *
-   * @param node_index The index of the specified node in the query DAG.
-   *
-   * @return True if all the dependencies have finished their execution. False
-   *         otherwise.
-   **/
-  inline bool checkAllDependenciesMet(const dag_node_index node_index) const {
-    for (const dag_node_index dependency_index :
-         query_dag_->getDependencies(node_index)) {
-      // If at least one of the dependencies is not met, return false.
-      if (!query_exec_state_->hasExecutionFinished(dependency_index)) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  /**
-   * @brief Check if all the blocking dependencies of the node at specified
-   *        index have finished their execution.
-   *
-   * @note A blocking dependency is the one which is pipeline breaker. Output of
-   *       a dependency can't be streamed to its dependent if the link between
-   *       them is pipeline breaker.
-   *
-   * @param node_index The index of the specified node in the query DAG.
-   *
-   * @return True if all the blocking dependencies have finished their
-   *         execution. False otherwise.
-   **/
-  inline bool checkAllBlockingDependenciesMet(
-      const dag_node_index node_index) const {
-    for (const dag_node_index blocking_dependency_index :
-         blocking_dependencies_[node_index]) {
-      if (!query_exec_state_->hasExecutionFinished(
-              blocking_dependency_index)) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  /**
-   * @brief Check if the execution of the given operator is over.
-   *
-   * @param index The index of the given operator in the DAG.
-   *
-   * @return True if the execution of the given operator is over, false
-   *         otherwise.
-   **/
-  inline bool checkOperatorExecutionOver(const dag_node_index index) const {
-    if (checkRebuildRequired(index)) {
-      return (checkNormalExecutionOver(index) && checkRebuildOver(index));
-    } else {
-      return checkNormalExecutionOver(index);
-    }
-  }
-
-  /**
-   * @brief Check if the given operator's normal execution is over.
-   *
-   * @note The conditions for a given operator's normal execution to get over:
-   *       1. All of its  normal (i.e. non rebuild) WorkOrders have finished
-   *       execution.
-   *       2. The operator is done generating work orders.
-   *       3. All of the dependencies of the given operator have been met.
-   *
-   * @param index The index of the given operator in the DAG.
-   *
-   * @return True if the normal execution of the given operator is over, false
-   *         otherwise.
-   **/
-  inline bool checkNormalExecutionOver(const dag_node_index index) const {
-    return (checkAllDependenciesMet(index) &&
-            !workorders_container_->hasNormalWorkOrder(index) &&
-            query_exec_state_->getNumQueuedWorkOrders(index) == 0 &&
-            query_exec_state_->hasDoneGenerationWorkOrders(index));
-  }
-
-  /**
-   * @brief Check if the rebuild operation is required for a given operator.
-   *
-   * @param index The index of the given operator in the DAG.
-   *
-   * @return True if the rebuild operation is required, false otherwise.
-   **/
-  inline bool checkRebuildRequired(const dag_node_index index) const {
-    return query_exec_state_->isRebuildRequired(index);
-  }
-
-  /**
-   * @brief Check if the rebuild operation for a given operator is over.
-   *
-   * @param index The index of the given operator in the DAG.
-   *
-   * @return True if the rebuild operation is over, false otherwise.
-   **/
-  inline bool checkRebuildOver(const dag_node_index index) const {
-    return query_exec_state_->hasRebuildInitiated(index) &&
-           !workorders_container_->hasRebuildWorkOrder(index) &&
-           (query_exec_state_->getNumRebuildWorkOrders(index) == 0);
-  }
-
-  /**
-   * @brief Check if the rebuild operation for a given operator has been
-   *        initiated.
-   *
-   * @param index The index of the given operator in the DAG.
-   *
-   * @return True if the rebuild operation has been initiated, false otherwise.
-   **/
-  inline bool checkRebuildInitiated(const dag_node_index index) const {
-    return query_exec_state_->hasRebuildInitiated(index);
-  }
-
-  /**
-   * @brief Initiate the rebuild process for partially filled blocks generated
-   *        during the execution of the given operator.
-   *
-   * @param index The index of the given operator in the DAG.
-   *
-   * @return True if the rebuild is over immediately, i.e. the operator didn't
-   *         generate any rebuild WorkOrders, false otherwise.
-   **/
-  bool initiateRebuild(const dag_node_index index);
-
-  /**
-   * @brief Get the rebuild WorkOrders for an operator.
-   *
-   * @note This function should be called only once, when all the normal
-   *       WorkOrders generated by an operator finish their execution.
-   *
-   * @param index The index of the operator in the query plan DAG.
-   * @param container A pointer to a WorkOrdersContainer to be used to store the
-   *        generated WorkOrders.
-   **/
-  void getRebuildWorkOrders(const dag_node_index index,
-                            WorkOrdersContainer *container);
-
-  const tmb::client_id foreman_client_id_;
-  const std::size_t query_id_;
-
-  CatalogDatabaseLite *catalog_database_;
-  StorageManager *storage_manager_;
-  tmb::MessageBus *bus_;
-
-  DAG<RelationalOperator, bool> *query_dag_;
-
-  std::unique_ptr<QueryContext> query_context_;
-
-  // For all nodes, store their receiving dependents.
-  std::vector<std::vector<dag_node_index>> output_consumers_;
-
-  // For all nodes, store their pipeline breaking dependencies (if any).
-  std::vector<std::vector<dag_node_index>> blocking_dependencies_;
-
-  std::unique_ptr<QueryExecutionState> query_exec_state_;
-
-  std::unique_ptr<WorkOrdersContainer> workorders_container_;
-
-  DISALLOW_COPY_AND_ASSIGN(QueryManager);
-};
-
-/** @} */
-
-}  // namespace quickstep
-
-#endif  // QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b4e25edc/query_execution/QueryManagerBase.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerBase.cpp b/query_execution/QueryManagerBase.cpp
new file mode 100644
index 0000000..f7e183f
--- /dev/null
+++ b/query_execution/QueryManagerBase.cpp
@@ -0,0 +1,305 @@
+/**
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#include "query_execution/QueryManagerBase.hpp"
+
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include "catalog/CatalogDatabase.hpp"
+#include "catalog/CatalogRelation.hpp"
+#include "catalog/CatalogTypedefs.hpp"
+#include "catalog/PartitionScheme.hpp"
+#include "query_execution/QueryContext.hpp"
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_optimizer/QueryHandle.hpp"
+#include "query_optimizer/QueryPlan.hpp"
+#include "relational_operators/WorkOrder.hpp"
+#include "storage/StorageBlockInfo.hpp"
+
+#include "glog/logging.h"
+
+using std::pair;
+
+namespace quickstep {
+
+// Caches the query ID and plan DAG of 'query_handle', then walks the DAG once
+// to derive the per-operator bookkeeping: rebuild flags, streaming consumers
+// and blocking dependencies.
+QueryManagerBase::QueryManagerBase(QueryHandle *query_handle,
+                                   CatalogDatabaseLite *catalog_database)
+    : query_id_(DCHECK_NOTNULL(query_handle)->query_id()),
+      catalog_database_(DCHECK_NOTNULL(catalog_database)),
+      query_dag_(DCHECK_NOTNULL(
+          DCHECK_NOTNULL(query_handle->getQueryPlanMutable())->getQueryPlanDAGMutable())),
+      num_operators_in_dag_(query_dag_->size()),
+      output_consumers_(num_operators_in_dag_),
+      blocking_dependencies_(num_operators_in_dag_),
+      query_exec_state_(new QueryExecutionState(num_operators_in_dag_)) {
+  for (dag_node_index op = 0; op < num_operators_in_dag_; ++op) {
+    // An operator with a valid InsertDestination always needs a rebuild;
+    // its rebuild status starts out as (0, false).
+    if (query_dag_->getNodePayload(op).getInsertDestinationID() !=
+        QueryContext::kInvalidInsertDestinationId) {
+      query_exec_state_->setRebuildRequired(op);
+      query_exec_state_->setRebuildStatus(op, 0, false);
+    }
+
+    // Classify every outgoing link of 'op' by its pipeline-breaker flag.
+    for (const pair<dag_node_index, bool> &link : query_dag_->getDependents(op)) {
+      const dag_node_index consumer = link.first;
+      if (query_dag_->getLinkMetadata(op, consumer)) {
+        // Pipeline breaker: blocks cannot be streamed from 'op' to
+        // 'consumer', so 'op' is one of its blocking dependencies.
+        blocking_dependencies_[consumer].push_back(op);
+      } else {
+        // Blocks produced by 'op' may be streamed to 'consumer'.
+        output_consumers_[op].push_back(consumer);
+      }
+    }
+  }
+}
+
+// Dispatches 'tagged_message' to the handler for its message type and then
+// reports what finished as a result. Catalog new-block messages only update
+// the catalog and return kNone right away; an unknown message type is fatal
+// (LOG(FATAL)).
+QueryManagerBase::QueryStatusCode QueryManagerBase::processMessage(
+    const TaggedMessage &tagged_message) {
+  dag_node_index op_index;
+  switch (tagged_message.message_type()) {
+    case kWorkOrderCompleteMessage: {
+      serialization::NormalWorkOrderCompletionMessage proto;
+      CHECK(proto.ParseFromArray(tagged_message.message(),
+                                 tagged_message.message_bytes()));
+      op_index = proto.operator_index();
+      processWorkOrderCompleteMessage(op_index);
+      break;
+    }
+    case kRebuildWorkOrderCompleteMessage: {
+      serialization::RebuildWorkOrderCompletionMessage proto;
+      CHECK(proto.ParseFromArray(tagged_message.message(),
+                                 tagged_message.message_bytes()));
+      op_index = proto.operator_index();
+      processRebuildWorkOrderCompleteMessage(op_index);
+      break;
+    }
+    case kCatalogRelationNewBlockMessage: {
+      serialization::CatalogRelationNewBlockMessage proto;
+      CHECK(proto.ParseFromArray(tagged_message.message(),
+                                 tagged_message.message_bytes()));
+      const block_id block = proto.block_id();
+
+      CatalogRelation *relation =
+          static_cast<CatalogDatabase*>(catalog_database_)->getRelationByIdMutable(proto.relation_id());
+      relation->addBlock(block);
+
+      if (proto.has_partition_id()) {
+        relation->getPartitionSchemeMutable()->addBlockToPartition(
+            proto.partition_id(), block);
+      }
+      return QueryStatusCode::kNone;
+    }
+    case kDataPipelineMessage: {
+      // Possible message senders include InsertDestinations and some
+      // operators which modify existing blocks.
+      serialization::DataPipelineMessage proto;
+      CHECK(proto.ParseFromArray(tagged_message.message(),
+                                 tagged_message.message_bytes()));
+      op_index = proto.operator_index();
+      processDataPipelineMessage(op_index,
+                                 proto.block_id(),
+                                 proto.relation_id());
+      break;
+    }
+    case kWorkOrdersAvailableMessage: {
+      serialization::WorkOrdersAvailableMessage proto;
+      CHECK(proto.ParseFromArray(tagged_message.message(),
+                                 tagged_message.message_bytes()));
+      op_index = proto.operator_index();
+
+      // Check if new work orders are available.
+      fetchNormalWorkOrders(op_index);
+      break;
+    }
+    case kWorkOrderFeedbackMessage: {
+      WorkOrder::FeedbackMessage msg(
+          const_cast<void *>(tagged_message.message()),
+          tagged_message.message_bytes());
+
+      op_index = msg.header().rel_op_index;
+      processFeedbackMessage(msg);
+      break;
+    }
+    default:
+      LOG(FATAL) << "Unknown message type found in QueryManager";
+  }
+
+  // kQueryExecuted takes precedence over kOperatorExecuted, so the whole-query
+  // check has to come first; a finished operator would otherwise mask it.
+  if (query_exec_state_->hasQueryExecutionFinished()) {
+    return QueryStatusCode::kQueryExecuted;
+  }
+
+  if (query_exec_state_->hasExecutionFinished(op_index)) {
+    return QueryStatusCode::kOperatorExecuted;
+  }
+
+  return QueryStatusCode::kNone;
+}
+
+// Hands a WorkOrder feedback message to the operator named in its header.
+void QueryManagerBase::processFeedbackMessage(
+    const WorkOrder::FeedbackMessage &msg) {
+  const dag_node_index target_op = msg.header().rel_op_index;
+  query_dag_->getNodePayloadMutable(target_op)->receiveFeedbackMessage(msg);
+}
+
+// Handles the completion of one normal WorkOrder of operator 'op_index':
+// updates the queued count, fetches any newly available WorkOrders, marks the
+// operator finished when possible and then processes eligible dependents.
+void QueryManagerBase::processWorkOrderCompleteMessage(
+    const dag_node_index op_index) {
+  query_exec_state_->decrementNumQueuedWorkOrders(op_index);
+
+  // The operator may have more WorkOrders ready by now; fetch them if so.
+  fetchNormalWorkOrders(op_index);
+
+  if (!checkRebuildRequired(op_index)) {
+    // No rebuild phase: the operator is finished once its execution is over.
+    if (checkOperatorExecutionOver(op_index)) {
+      markOperatorFinished(op_index);
+    }
+  } else if (checkNormalExecutionOver(op_index)) {
+    // Normal execution is over, so only the rebuild can still be pending.
+    // Start it if that has not happened yet (it may complete right away),
+    // otherwise check whether the rebuild already in progress is over.
+    const bool rebuild_done = checkRebuildInitiated(op_index)
+                                  ? checkRebuildOver(op_index)
+                                  : initiateRebuild(op_index);
+    if (rebuild_done) {
+      markOperatorFinished(op_index);
+    }
+  } else {
+    // Rebuild required, but normal execution is still in progress.
+  }
+
+  // Process every dependent of the operator whose WorkOrder just completed,
+  // provided all blocking dependencies of that dependent have been met.
+  for (const pair<dag_node_index, bool> &link :
+       query_dag_->getDependents(op_index)) {
+    const dag_node_index dependent = link.first;
+    if (checkAllBlockingDependenciesMet(dependent)) {
+      processOperator(dependent, true);
+    }
+  }
+}
+
+// Handles the completion of one rebuild WorkOrder that belonged to the
+// operator at 'op_index'. Once the rebuild is over, the operator is marked
+// finished and its dependents (those whose blocking dependencies are all met)
+// are processed.
+void QueryManagerBase::processRebuildWorkOrderCompleteMessage(const dag_node_index op_index) {
+  query_exec_state_->decrementNumRebuildWorkOrders(op_index);
+
+  if (!checkRebuildOver(op_index)) {
+    // The rebuild phase is not over yet; nothing more to do for now.
+    return;
+  }
+
+  markOperatorFinished(op_index);
+
+  for (const auto &dependent_entry : query_dag_->getDependents(op_index)) {
+    const dag_node_index dependent_index = dependent_entry.first;
+    if (!checkAllBlockingDependenciesMet(dependent_index)) {
+      continue;
+    }
+    processOperator(dependent_index, true);
+  }
+}
+
+// Processes the operator at 'index': fetches its available WorkOrders and, if
+// none were fetched and its normal execution is over, drives it through the
+// rebuild phase (when required) and marks it finished. When
+// 'recursively_check_dependents' is true, dependents whose blocking
+// dependencies are all met are processed as well.
+void QueryManagerBase::processOperator(const dag_node_index index,
+                                       const bool recursively_check_dependents) {
+  if (fetchNormalWorkOrders(index)) {
+    // New WorkOrders were fetched. Wait for them to execute and skip the
+    // execution-finished checks below.
+    return;
+  }
+
+  if (!checkNormalExecutionOver(index)) {
+    // Normal execution is still under progress.
+    return;
+  }
+
+  if (!checkRebuildRequired(index)) {
+    // Normal execution is over and no rebuild is needed.
+    markOperatorFinished(index);
+  } else if (checkRebuildInitiated(index)) {
+    // A rebuild was initiated earlier; the operator is finished only if that
+    // rebuild is over by now.
+    if (checkRebuildOver(index)) {
+      markOperatorFinished(index);
+    }
+  } else if (initiateRebuild(index)) {
+    // The rebuild was initiated just now and completed right away.
+    markOperatorFinished(index);
+  } else {
+    // Rebuild WorkOrders have been generated; wait for them to execute.
+    return;
+  }
+
+  // NOTE: this point is also reached when a previously initiated rebuild is
+  // not over yet, i.e. the operator is not necessarily marked finished here.
+  if (!recursively_check_dependents) {
+    return;
+  }
+
+  for (const auto &dependent_entry : query_dag_->getDependents(index)) {
+    const dag_node_index dependent_index = dependent_entry.first;
+    if (checkAllBlockingDependenciesMet(dependent_index)) {
+      processOperator(dependent_index, true);
+    }
+  }
+}
+
+// Streams 'block' of relation 'rel_id', produced by the operator at
+// 'op_index', to every consumer registered for that operator in
+// 'output_consumers_', and fetches any WorkOrders that become available in a
+// consumer as a result.
+void QueryManagerBase::processDataPipelineMessage(const dag_node_index op_index,
+                                                  const block_id block,
+                                                  const relation_id rel_id) {
+  // 'output_consumers_' only lists those dependents of the producer that are
+  // eligible to receive streamed input.
+  const auto &consumers = output_consumers_[op_index];
+  for (const dag_node_index consumer_index : consumers) {
+    RelationalOperator *consumer_op =
+        query_dag_->getNodePayloadMutable(consumer_index);
+    consumer_op->feedInputBlock(block, rel_id);
+
+    // The block just fed may have enabled new WorkOrders in the consumer.
+    fetchNormalWorkOrders(consumer_index);
+  }
+}
+
+// Records the operator at 'index' as finished, lets it update the catalog, and
+// notifies each of its dependents: that no more input blocks will arrive from
+// this operator's output relation, and (when applicable) that all of the
+// dependent's blocking dependencies are now met.
+void QueryManagerBase::markOperatorFinished(const dag_node_index index) {
+  query_exec_state_->setExecutionFinished(index);
+
+  RelationalOperator *finished_op = query_dag_->getNodePayloadMutable(index);
+  finished_op->updateCatalogOnCompletion();
+
+  const relation_id output_rel = finished_op->getOutputRelationID();
+  // Presumably a negative relation ID means the operator has no output
+  // relation; dependents are only signalled for a non-negative ID.
+  const bool has_output_relation = (output_rel >= 0);
+
+  for (const auto &dependent_entry : query_dag_->getDependents(index)) {
+    const dag_node_index dependent_index = dependent_entry.first;
+    RelationalOperator *dependent_op =
+        query_dag_->getNodePayloadMutable(dependent_index);
+
+    if (has_output_relation) {
+      // The finished operator is done feeding input blocks to this dependent.
+      dependent_op->doneFeedingInputBlocks(output_rel);
+    }
+    if (checkAllBlockingDependenciesMet(dependent_index)) {
+      dependent_op->informAllBlockingDependenciesMet();
+    }
+  }
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b4e25edc/query_execution/QueryManagerBase.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerBase.hpp b/query_execution/QueryManagerBase.hpp
new file mode 100644
index 0000000..9e192c8
--- /dev/null
+++ b/query_execution/QueryManagerBase.hpp
@@ -0,0 +1,312 @@
+/**
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_BASE_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_BASE_HPP_
+
+#include <cstddef>
+#include <memory>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/QueryExecutionState.hpp"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "relational_operators/RelationalOperator.hpp"
+#include "relational_operators/WorkOrder.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "utility/DAG.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+
+class CatalogDatabaseLite;
+class QueryHandle;
+
+/** \addtogroup QueryExecution
+ *  @{
+ */
+
+/**
+ * @brief A base class that manages the execution of a query including
+ *        generation of new work orders, and keeping track of the query
+ *        execution state.
+ **/
+class QueryManagerBase {
+ public:
+  typedef DAG<RelationalOperator, bool>::size_type_nodes dag_node_index;
+
+  /**
+   * @brief Return codes for processMessage() function.
+   *
+   * @note When both operator and query get executed, kQueryExecuted takes
+   *       precedence over kOperatorExecuted.
+   **/
+  enum class QueryStatusCode {
+    kOperatorExecuted = 0,  // An operator in the query finished execution.
+    kQueryExecuted,         // The query got executed.
+    kNone                   // None of the above.
+  };
+
+  /**
+   * @brief Constructor.
+   *
+   * @param query_handle The QueryHandle object for this query.
+   * @param catalog_database The CatalogDatabase used by the query.
+   **/
+  QueryManagerBase(QueryHandle *query_handle,
+                   CatalogDatabaseLite *catalog_database);
+
+  /**
+   * @brief Virtual destructor.
+   **/
+  virtual ~QueryManagerBase() {}
+
+  /**
+   * @brief Process a message sent to the QueryManager.
+   *
+   * @param tagged_message TaggedMessage sent to the QueryManager.
+   *
+   * @return QueryStatusCode as determined after the message is processed.
+   **/
+  QueryStatusCode processMessage(const TaggedMessage &tagged_message);
+
+  /**
+   * @brief Get the QueryExecutionState for this query.
+   **/
+  inline const QueryExecutionState& getQueryExecutionState() const {
+    return *query_exec_state_;
+  }
+
+ protected:
+  /**
+   * @brief Process the received WorkOrder complete message.
+   *
+   * @param op_index The index of the specified operator node in the query DAG
+   *        for the completed WorkOrder.
+   **/
+  void processWorkOrderCompleteMessage(const dag_node_index op_index);
+
+  /**
+   * @brief Process the received RebuildWorkOrder complete message.
+   *
+   * @param op_index The index of the specified operator node in the query DAG
+   *        for the completed RebuildWorkOrder.
+   **/
+  void processRebuildWorkOrderCompleteMessage(const dag_node_index op_index);
+
+  /**
+   * @brief Process a current relational operator: Get its workorders and store
+   *        them in the WorkOrdersContainer for this query. If the operator can
+   *        be marked as done, do so.
+   *
+   * @param index The index of the relational operator to be processed in the
+   *        query plan DAG.
+   * @param recursively_check_dependents If an operator is done, should we
+   *        call processOperator on its dependents recursively.
+   **/
+  void processOperator(const dag_node_index index,
+                       const bool recursively_check_dependents);
+
+  /**
+   * @brief Process the received data pipeline message.
+   *
+   * @param op_index The index of the specified operator node in the query DAG
+   *        for the pipelining block.
+   * @param block The block id.
+   * @param rel_id The ID of the relation that produced 'block'.
+   **/
+  void processDataPipelineMessage(const dag_node_index op_index,
+                                  const block_id block,
+                                  const relation_id rel_id);
+
+  /**
+   * @brief Process the received work order feedback message and notify
+   *        relational operator.
+   *
+   * @param message Feedback message from work order.
+   **/
+  void processFeedbackMessage(const WorkOrder::FeedbackMessage &message);
+
+  /**
+   * @brief This function does the following things:
+   *        1. Mark the given relational operator as "done".
+   *        2. For all the dependents of this operator, check if all of their
+   *        blocking dependencies are met. If so inform them that the blocking
+   *        dependencies are met.
+   *        3. Check if the given operator is done producing output. If it's
+   *        done, inform the dependents that they won't receive input anymore
+   *        from the given operator.
+   *
+   * @param index The index of the given relational operator in the DAG.
+   **/
+  void markOperatorFinished(const dag_node_index index);
+
+  /**
+   * @brief Check if all the dependencies of the node at specified index have
+   *        finished their execution.
+   *
+   * @note This function's true return value is a pre-requisite for calling
+   *       getRebuildWorkOrders()
+   *
+   * @param node_index The index of the specified node in the query DAG.
+   *
+   * @return True if all the dependencies have finished their execution. False
+   *         otherwise.
+   **/
+  inline bool checkAllDependenciesMet(const dag_node_index node_index) const {
+    for (const dag_node_index dependency_index :
+         query_dag_->getDependencies(node_index)) {
+      // If at least one of the dependencies is not met, return false.
+      if (!query_exec_state_->hasExecutionFinished(dependency_index)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * @brief Check if all the blocking dependencies of the node at specified
+   *        index have finished their execution.
+   *
+   * @note A blocking dependency is the one which is pipeline breaker. Output of
+   *       a dependency can't be streamed to its dependent if the link between
+   *       them is pipeline breaker.
+   *
+   * @param node_index The index of the specified node in the query DAG.
+   *
+   * @return True if all the blocking dependencies have finished their
+   *         execution. False otherwise.
+   **/
+  inline bool checkAllBlockingDependenciesMet(
+      const dag_node_index node_index) const {
+    for (const dag_node_index blocking_dependency_index :
+         blocking_dependencies_[node_index]) {
+      // A single unfinished blocking dependency is enough to return false.
+      if (!query_exec_state_->hasExecutionFinished(
+              blocking_dependency_index)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * @brief Check if the execution of the given operator is over.
+   *
+   * @note Execution is over when the normal execution is over and the rebuild
+   *       is either not required or is over as well.
+   *
+   * @param index The index of the given operator in the DAG.
+   *
+   * @return True if the execution of the given operator is over, false
+   *         otherwise.
+   **/
+  inline bool checkOperatorExecutionOver(const dag_node_index index) const {
+    return this->checkNormalExecutionOver(index) &&
+           (!checkRebuildRequired(index) || this->checkRebuildOver(index));
+  }
+
+  /**
+   * @brief Check if the rebuild operation is required for a given operator.
+   *
+   * @param index The index of the given operator in the DAG.
+   *
+   * @return True if the rebuild operation is required, false otherwise.
+   **/
+  inline bool checkRebuildRequired(const dag_node_index index) const {
+    return query_exec_state_->isRebuildRequired(index);
+  }
+
+  /**
+   * @brief Check if the rebuild operation for a given operator has been
+   *        initiated.
+   *
+   * @param index The index of the given operator in the DAG.
+   *
+   * @return True if the rebuild operation has been initiated, false otherwise.
+   **/
+  inline bool checkRebuildInitiated(const dag_node_index index) const {
+    return query_exec_state_->hasRebuildInitiated(index);
+  }
+
+  // Identifier of the query managed by this object (presumably obtained from
+  // the QueryHandle; the constructor body is not part of this header).
+  const std::size_t query_id_;
+
+  CatalogDatabaseLite *catalog_database_;
+
+  // The DAG of relational operators for this query, and its node count.
+  DAG<RelationalOperator, bool> *query_dag_;
+  const dag_node_index num_operators_in_dag_;
+
+  // For all nodes, store their receiving dependents.
+  std::vector<std::vector<dag_node_index>> output_consumers_;
+
+  // For all nodes, store their pipeline breaking dependencies (if any).
+  std::vector<std::vector<dag_node_index>> blocking_dependencies_;
+
+  // Per-operator execution bookkeeping, queried by the check*() helpers above.
+  std::unique_ptr<QueryExecutionState> query_exec_state_;
+
+ private:
+  // The pure virtual functions below are the hooks that a concrete query
+  // manager must implement.
+
+  /**
+   * @brief Fetch all work orders currently available in relational operator and
+   *        store them internally.
+   *
+   * @param index The index of the relational operator to be processed in the
+   *        query plan DAG.
+   *
+   * @return Whether any work order was generated by op.
+   **/
+  virtual bool fetchNormalWorkOrders(const dag_node_index index) = 0;
+
+  /**
+   * @brief Check if the given operator's normal execution is over.
+   *
+   * @note The conditions for a given operator's normal execution to get over:
+   *       1. All of its normal (i.e. non rebuild) WorkOrders have finished
+   *       execution.
+   *       2. The operator is done generating work orders.
+   *       3. All of the dependencies of the given operator have been met.
+   *
+   * @param index The index of the given operator in the DAG.
+   *
+   * @return True if the normal execution of the given operator is over, false
+   *         otherwise.
+   **/
+  virtual bool checkNormalExecutionOver(const dag_node_index index) const = 0;
+
+  /**
+   * @brief Initiate the rebuild process for partially filled blocks generated
+   *        during the execution of the given operator.
+   *
+   * @param index The index of the given operator in the DAG.
+   *
+   * @return True if the rebuild is over immediately, i.e. the operator didn't
+   *         generate any rebuild WorkOrders, false otherwise.
+   **/
+  virtual bool initiateRebuild(const dag_node_index index) = 0;
+
+  /**
+   * @brief Check if the rebuild operation for a given operator is over.
+   *
+   * @param index The index of the given operator in the DAG.
+   *
+   * @return True if the rebuild operation is over, false otherwise.
+   **/
+  virtual bool checkRebuildOver(const dag_node_index index) const = 0;
+
+  DISALLOW_COPY_AND_ASSIGN(QueryManagerBase);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_BASE_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b4e25edc/query_execution/QueryManagerSingleNode.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerSingleNode.cpp b/query_execution/QueryManagerSingleNode.cpp
new file mode 100644
index 0000000..193b188
--- /dev/null
+++ b/query_execution/QueryManagerSingleNode.cpp
@@ -0,0 +1,194 @@
+/**
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#include "query_execution/QueryManagerSingleNode.hpp"
+
+#include <cstddef>
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/WorkerMessage.hpp"
+#include "query_optimizer/QueryHandle.hpp"
+#include "relational_operators/RebuildWorkOrder.hpp"
+#include "relational_operators/RelationalOperator.hpp"
+#include "storage/InsertDestination.hpp"
+#include "storage/StorageBlock.hpp"
+#include "utility/DAG.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+
+namespace quickstep {
+
+class WorkOrder;
+
+// Sets up the single-node query manager: creates the QueryContext and the
+// WorkOrdersContainer, then does an initial pass over every operator in the
+// DAG to collect the WorkOrders of those whose blocking dependencies are
+// already met.
+QueryManagerSingleNode::QueryManagerSingleNode(
+    const tmb::client_id foreman_client_id,
+    const std::size_t num_numa_nodes,
+    QueryHandle *query_handle,
+    CatalogDatabaseLite *catalog_database,
+    StorageManager *storage_manager,
+    tmb::MessageBus *bus)
+    : QueryManagerBase(query_handle, catalog_database),
+      foreman_client_id_(foreman_client_id),
+      storage_manager_(DCHECK_NOTNULL(storage_manager)),
+      bus_(DCHECK_NOTNULL(bus)),
+      query_context_(new QueryContext(query_handle->getQueryContextProto(),
+                                      *catalog_database_,
+                                      storage_manager_,
+                                      foreman_client_id_,
+                                      bus_)),
+      workorders_container_(
+          new WorkOrdersContainer(num_operators_in_dag_, num_numa_nodes)) {
+  for (dag_node_index op_index = 0;
+       op_index < num_operators_in_dag_;
+       ++op_index) {
+    if (!checkAllBlockingDependenciesMet(op_index)) {
+      // This operator has to wait for its blocking dependencies.
+      continue;
+    }
+
+    RelationalOperator *ready_op = query_dag_->getNodePayloadMutable(op_index);
+    ready_op->informAllBlockingDependenciesMet();
+
+    // Dependents are not visited recursively here; the loop reaches every
+    // operator by itself.
+    processOperator(op_index, false);
+  }
+}
+
+// Finds the next schedulable WorkOrder, visiting operators in increasing index
+// order (with wrap-around) starting at 'start_operator_index', and wraps it in
+// a WorkerMessage. For each unfinished operator the lookup order is: normal
+// WorkOrder on 'numa_node', rebuild WorkOrder on 'numa_node' (both skipped
+// when 'numa_node' is kAnyNUMANodeID), then a normal WorkOrder from any node,
+// then a rebuild WorkOrder from any node. Returns nullptr if nothing is
+// available right now.
+WorkerMessage* QueryManagerSingleNode::getNextWorkerMessage(
+    const dag_node_index start_operator_index, const numa_node_id numa_node) {
+  const bool has_numa_preference = (numa_node != kAnyNUMANodeID);
+
+  dag_node_index op_index = start_operator_index;
+  for (size_t num_visited = 0;
+       num_visited < num_operators_in_dag_;
+       ++num_visited, op_index = (op_index + 1) % num_operators_in_dag_) {
+    if (query_exec_state_->hasExecutionFinished(op_index)) {
+      continue;
+    }
+
+    if (has_numa_preference) {
+      // Prefer work located on the requested NUMA node: normal first, then
+      // rebuild.
+      WorkOrder *local_normal =
+          workorders_container_->getNormalWorkOrderForNUMANode(op_index, numa_node);
+      if (local_normal != nullptr) {
+        query_exec_state_->incrementNumQueuedWorkOrders(op_index);
+        return WorkerMessage::WorkOrderMessage(local_normal, op_index);
+      }
+
+      WorkOrder *local_rebuild =
+          workorders_container_->getRebuildWorkOrderForNUMANode(op_index, numa_node);
+      if (local_rebuild != nullptr) {
+        return WorkerMessage::RebuildWorkOrderMessage(local_rebuild, op_index);
+      }
+    }
+
+    // Nothing on the preferred NUMA node (or no preference at all): accept a
+    // normal WorkOrder from any node.
+    WorkOrder *any_normal = workorders_container_->getNormalWorkOrder(op_index);
+    if (any_normal != nullptr) {
+      query_exec_state_->incrementNumQueuedWorkOrders(op_index);
+      return WorkerMessage::WorkOrderMessage(any_normal, op_index);
+    }
+
+    // Last resort for this operator: a rebuild WorkOrder from any node.
+    WorkOrder *any_rebuild = workorders_container_->getRebuildWorkOrder(op_index);
+    if (any_rebuild != nullptr) {
+      return WorkerMessage::RebuildWorkOrderMessage(any_rebuild, op_index);
+    }
+  }
+
+  // No WorkOrders available right now.
+  return nullptr;
+}
+
+// Asks the operator at 'index' for all WorkOrders it can currently generate
+// and stores them in the WorkOrdersContainer. Returns true only if the number
+// of pending normal WorkOrders for the operator grew as a result.
+bool QueryManagerSingleNode::fetchNormalWorkOrders(const dag_node_index index) {
+  if (query_exec_state_->hasDoneGenerationWorkOrders(index)) {
+    // The operator already reported that it has no more WorkOrders.
+    return false;
+  }
+
+  // Do not fetch any work until all blocking dependencies are met. The
+  // relational operator is not aware of blocking dependencies for
+  // uncorrelated scalar queries.
+  if (!checkAllBlockingDependenciesMet(index)) {
+    return false;
+  }
+
+  const std::size_t num_pending_before =
+      workorders_container_->getNumNormalWorkOrders(index);
+
+  RelationalOperator *op = query_dag_->getNodePayloadMutable(index);
+  const bool generation_finished =
+      op->getAllWorkOrders(workorders_container_.get(),
+                           query_context_.get(),
+                           storage_manager_,
+                           foreman_client_id_,
+                           bus_);
+  if (generation_finished) {
+    query_exec_state_->setDoneGenerationWorkOrders(index);
+  }
+
+  // TODO(shoban): It would be a good check to see if operator is making
+  // useful progress, i.e., the operator either generates work orders to
+  // execute or still has pending work orders executing. However, this will not
+  // work if Foreman polls operators without feeding data. This check can be
+  // enabled, if Foreman is refactored to call getAllWorkOrders() only when
+  // pending work orders are completed or new input blocks feed.
+
+  return num_pending_before <
+         workorders_container_->getNumNormalWorkOrders(index);
+}
+
+// Starts the rebuild phase for the operator at 'index': generates its rebuild
+// WorkOrders and records their count in the query execution state with the
+// rebuild flagged as initiated. Returns true when no rebuild WorkOrders were
+// recorded, i.e. the rebuild is over immediately.
+bool QueryManagerSingleNode::initiateRebuild(const dag_node_index index) {
+  // Preconditions: a rebuild is required, has not been initiated yet, and no
+  // rebuild WorkOrder exists for this operator.
+  DCHECK(!workorders_container_->hasRebuildWorkOrder(index));
+  DCHECK(checkRebuildRequired(index));
+  DCHECK(!checkRebuildInitiated(index));
+
+  getRebuildWorkOrders(index, workorders_container_.get());
+
+  const auto num_generated_rebuild_workorders =
+      workorders_container_->getNumRebuildWorkOrders(index);
+  query_exec_state_->setRebuildStatus(
+      index, num_generated_rebuild_workorders, true);
+
+  const bool rebuild_over_immediately =
+      (query_exec_state_->getNumRebuildWorkOrders(index) == 0);
+  return rebuild_over_immediately;
+}
+
+// Creates one RebuildWorkOrder for every partially filled block of the insert
+// destination used by the operator at 'index' and adds it to 'container'.
+// Does nothing when the operator has no valid insert destination.
+void QueryManagerSingleNode::getRebuildWorkOrders(const dag_node_index index,
+                                                  WorkOrdersContainer *container) {
+  const RelationalOperator &op = query_dag_->getNodePayload(index);
+  const QueryContext::insert_destination_id destination_id =
+      op.getInsertDestinationID();
+  if (destination_id == QueryContext::kInvalidInsertDestinationId) {
+    return;
+  }
+
+  DCHECK(query_context_ != nullptr);
+  InsertDestination *destination =
+      query_context_->getInsertDestination(destination_id);
+  DCHECK(destination != nullptr);
+
+  std::vector<MutableBlockReference> partial_blocks;
+  destination->getPartiallyFilledBlocks(&partial_blocks);
+
+  // Each block reference is moved into the RebuildWorkOrder created for it.
+  for (MutableBlockReference &partial_block : partial_blocks) {
+    container->addRebuildWorkOrder(
+        new RebuildWorkOrder(query_id_,
+                             std::move(partial_block),
+                             index,
+                             op.getOutputRelationID(),
+                             foreman_client_id_,
+                             bus_),
+        index);
+  }
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b4e25edc/query_execution/QueryManagerSingleNode.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerSingleNode.hpp b/query_execution/QueryManagerSingleNode.hpp
new file mode 100644
index 0000000..5533f06
--- /dev/null
+++ b/query_execution/QueryManagerSingleNode.hpp
@@ -0,0 +1,140 @@
+/**
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_SINGLE_NODE_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_SINGLE_NODE_HPP_
+
+#include <cstddef>
+#include <memory>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/QueryContext.hpp"
+#include "query_execution/QueryExecutionState.hpp"
+#include "query_execution/QueryManagerBase.hpp"
+#include "query_execution/WorkOrdersContainer.hpp"
+#include "utility/Macros.hpp"
+
+#include "tmb/id_typedefs.h"
+
+namespace tmb { class MessageBus; }
+
+namespace quickstep {
+
+class CatalogDatabaseLite;
+class QueryHandle;
+class StorageManager;
+class WorkerMessage;
+
+/** \addtogroup QueryExecution
+ *  @{
+ */
+
+/**
+ * @brief A class that manages the execution of a query including generation
+ *        of new work orders, keeping track of the query execution state.
+ **/
+class QueryManagerSingleNode final : public QueryManagerBase {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param foreman_client_id The TMB client ID of the foreman thread.
+   * @param num_numa_nodes The number of NUMA nodes used by the system.
+   * @param query_handle The QueryHandle object for this query.
+   * @param catalog_database The CatalogDatabase used by the query.
+   * @param storage_manager The StorageManager used by the query.
+   * @param bus The TMB used for communication.
+   **/
+  QueryManagerSingleNode(const tmb::client_id foreman_client_id,
+                         const std::size_t num_numa_nodes,
+                         QueryHandle *query_handle,
+                         CatalogDatabaseLite *catalog_database,
+                         StorageManager *storage_manager,
+                         tmb::MessageBus *bus);
+
+  ~QueryManagerSingleNode() override {}
+
+ /**
+   * @brief Get the next workorder to be executed, wrapped in a WorkerMessage.
+   *
+   * @param start_operator_index Begin the search for the schedulable WorkOrder
+   *        with the operator at this index.
+   * @param node_id The next WorkOrder should preferably have its input(s)
+   *        from this NUMA node. This is a hint and not a binding requirement.
+   *
+   * @return A pointer to the WorkerMessage. If there's no WorkOrder to be
+   *         executed, return NULL.
+   **/
+  WorkerMessage* getNextWorkerMessage(
+      const dag_node_index start_operator_index,
+      const numa_node_id node_id = kAnyNUMANodeID);
+
+  /**
+   * @brief Get a pointer to the QueryContext.
+   **/
+  inline QueryContext* getQueryContextMutable() {
+    return query_context_.get();
+  }
+
+ private:
+  bool fetchNormalWorkOrders(const dag_node_index index) override;
+
+  // Normal execution of an operator is over when all of its dependencies have
+  // finished, no normal WorkOrder is pending in the container, no WorkOrder is
+  // counted as queued in the execution state, and the operator is done
+  // generating WorkOrders.
+  bool checkNormalExecutionOver(const dag_node_index index) const override {
+    return (checkAllDependenciesMet(index) &&
+            !workorders_container_->hasNormalWorkOrder(index) &&
+            query_exec_state_->getNumQueuedWorkOrders(index) == 0 &&
+            query_exec_state_->hasDoneGenerationWorkOrders(index));
+  }
+
+  bool initiateRebuild(const dag_node_index index) override;
+
+  // The rebuild of an operator is over when it has been initiated, no rebuild
+  // WorkOrder is pending in the container, and the rebuild WorkOrder count
+  // tracked by the execution state has dropped to zero.
+  bool checkRebuildOver(const dag_node_index index) const override {
+    return query_exec_state_->hasRebuildInitiated(index) &&
+           !workorders_container_->hasRebuildWorkOrder(index) &&
+           (query_exec_state_->getNumRebuildWorkOrders(index) == 0);
+  }
+
+  /**
+   * @brief Get the rebuild WorkOrders for an operator.
+   *
+   * @note This function should be called only once, when all the normal
+   *       WorkOrders generated by an operator finish their execution.
+   *
+   * @param index The index of the operator in the query plan DAG.
+   * @param container A pointer to a WorkOrdersContainer to be used to store the
+   *        generated WorkOrders.
+   **/
+  void getRebuildWorkOrders(const dag_node_index index,
+                            WorkOrdersContainer *container);
+
+  // TMB client ID of the foreman thread.
+  const tmb::client_id foreman_client_id_;
+
+  // Neither pointer is created by this class; both are supplied by the caller
+  // of the constructor.
+  StorageManager *storage_manager_;
+  tmb::MessageBus *bus_;
+
+  // The QueryContext of the query, exposed via getQueryContextMutable().
+  std::unique_ptr<QueryContext> query_context_;
+
+  // Holds the normal and rebuild WorkOrders of the query's operators.
+  std::unique_ptr<WorkOrdersContainer> workorders_container_;
+
+  DISALLOW_COPY_AND_ASSIGN(QueryManagerSingleNode);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_SINGLE_NODE_HPP_