You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ra...@apache.org on 2016/08/05 22:52:29 UTC

[05/30] incubator-quickstep git commit: Introduced QueryManagerBase, and renamed QueryManagerSingleNode.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b4e25edc/query_execution/tests/QueryManagerSingleNode_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/QueryManagerSingleNode_unittest.cpp b/query_execution/tests/QueryManagerSingleNode_unittest.cpp
new file mode 100644
index 0000000..52cee20
--- /dev/null
+++ b/query_execution/tests/QueryManagerSingleNode_unittest.cpp
@@ -0,0 +1,942 @@
+/**
+ *   Copyright 2011-2015 Quickstep Technologies LLC.
+ *   Copyright 2015-2016 Pivotal Software, Inc.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#include <climits>
+#include <cstdlib>
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include "catalog/CatalogDatabase.hpp"
+#include "catalog/CatalogRelation.hpp"
+#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/QueryContext.hpp"
+#include "query_execution/QueryContext.pb.h"
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionState.hpp"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryManagerSingleNode.hpp"
+#include "query_execution/WorkOrdersContainer.hpp"
+#include "query_execution/WorkerDirectory.hpp"
+#include "query_execution/WorkerMessage.hpp"
+#include "query_optimizer/QueryHandle.hpp"
+#include "query_optimizer/QueryPlan.hpp"
+#include "relational_operators/RelationalOperator.hpp"
+#include "relational_operators/WorkOrder.hpp"
+#include "storage/InsertDestination.hpp"
+#include "storage/InsertDestination.pb.h"
+#include "storage/StorageBlock.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageManager.hpp"
+#include "utility/DAG.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+#include "gtest/gtest.h"
+
+#include "tmb/id_typedefs.h"
+#include "tmb/tagged_message.h"
+
+namespace tmb { class MessageBus; }
+
+using std::move;
+using std::unique_ptr;
+using std::vector;
+
+using tmb::client_id;
+
+namespace quickstep {
+
+class WorkOrderProtosContainer;
+
+class MockWorkOrder : public WorkOrder {  // Test-only WorkOrder whose execute() just logs.
+ public:
+  explicit MockWorkOrder(const int op_index)  // op_index is stored and echoed by getOpIndex().
+      : WorkOrder(0), op_index_(op_index) {}  // 0: presumably a dummy query ID -- confirm against WorkOrder.
+
+  void execute() override {  // No real work is done; only a VLOG(3) trace is emitted.
+    VLOG(3) << "WorkOrder[" << op_index_ << "] executing.";
+  }
+
+  inline QueryPlan::DAGNodeIndex getOpIndex() const {  // Returns the index given at construction.
+    return op_index_;
+  }
+
+ private:
+  const QueryPlan::DAGNodeIndex op_index_;  // Set once in the constructor from an int argument.
+
+  DISALLOW_COPY_AND_ASSIGN(MockWorkOrder);
+};
+
+class MockOperator: public RelationalOperator {  // Test-only operator that counts the calls it receives.
+ public:
+  enum function_name {  // Selects which call counter getNumCalls() reports.
+    kFeedInputBlock = 0,
+    kFeedInputBlocks,
+    kDoneFeedingInputBlocks,
+    kGetAllWorkOrders
+  };
+
+  MockOperator(const bool produce_workorders,  // If false, getAllWorkOrders() never adds a WorkOrder.
+               const bool has_streaming_input,  // If true, WorkOrders are only added after a feed call.
+               const int max_getworkorder_iters = 1,  // Call number on which getAllWorkOrders() returns true.
+               const int max_workorders = INT_MAX)  // Upper bound on WorkOrders generated in total.
+      : RelationalOperator(0 /* Query Id */),
+        produce_workorders_(produce_workorders),
+        has_streaming_input_(has_streaming_input),
+        max_workorders_(max_workorders),
+        max_getworkorder_iters_(max_getworkorder_iters),
+        num_calls_get_workorders_(0),
+        num_workorders_generated_(0),
+        num_calls_feedblock_(0),
+        num_calls_feedblocks_(0),
+        num_calls_donefeedingblocks_(0) {
+  }
+
+#define MOCK_OP_LOG(x) VLOG(x) << "Op[" << op_index_ << "]: " << __func__ << ": "
+
+  // The accessors below let tests check whether QueryManager called this
+  // operator, and how many times each overridden method was invoked.
+  inline int getNumWorkOrders() const {  // Number of WorkOrders added by getAllWorkOrders() so far.
+    return num_workorders_generated_;
+  }
+
+  inline int getNumCalls(const function_name fname) const {  // Call count for the selected method.
+    switch (fname) {
+      case kFeedInputBlock:
+        return num_calls_feedblock_;
+      case kFeedInputBlocks:
+        return num_calls_feedblocks_;
+      case kDoneFeedingInputBlocks:
+        return num_calls_donefeedingblocks_;
+      case kGetAllWorkOrders:
+        return num_calls_get_workorders_;
+      default:  // Value outside the enum.
+        return -1;
+    }
+  }
+
+  inline bool getBlockingDependenciesMet() const {
+    MOCK_OP_LOG(3) << "met.";  // Logged unconditionally, whatever the flag's value.
+    return blocking_dependencies_met_;  // Not declared here; presumably a RelationalOperator member.
+  }
+
+  void setInsertDestinationID(const QueryContext::insert_destination_id insert_destination_index) {
+    insert_destination_index_ = insert_destination_index;  // Later returned by getInsertDestinationID().
+  }
+
+  // Mock to trigger doneFeedingInputBlocks for the dependent operators
+  // in QueryManager::markOperatorFinished.
+  void setOutputRelationID(const relation_id rel_id) {
+    output_relation_id_ = rel_id;  // Later returned by getOutputRelationID().
+  }
+
+  // Override methods from the base class.
+  bool getAllWorkOrders(
+      WorkOrdersContainer *container,
+      QueryContext *query_context,
+      StorageManager *storage_manager,
+      const tmb::client_id foreman_client_id,
+      tmb::MessageBus *bus) override {  // Only 'container' is used; the other arguments are ignored.
+    ++num_calls_get_workorders_;
+    if (produce_workorders_) {  // At most one WorkOrder is added per call.
+      if (has_streaming_input_) {  // Streaming: needs at least one feed call and room under the cap.
+        if ((num_calls_feedblock_ > 0 || num_calls_feedblocks_ > 0) && (num_workorders_generated_ < max_workorders_)) {
+          MOCK_OP_LOG(3) << "[stream] generate WorkOrder";
+          container->addNormalWorkOrder(new MockWorkOrder(op_index_), op_index_);
+          ++num_workorders_generated_;
+        }
+      } else {  // Static: needs blocking dependencies met and room under the cap.
+        if (blocking_dependencies_met_ && (num_workorders_generated_ < max_workorders_)) {
+          MOCK_OP_LOG(3) << "[static] generate WorkOrder";
+          container->addNormalWorkOrder(new MockWorkOrder(op_index_), op_index_);
+          ++num_workorders_generated_;
+        }
+      }
+    }
+    MOCK_OP_LOG(3) << "count(" << num_calls_get_workorders_ << ") "
+                   << "return(" << (num_calls_get_workorders_ == max_getworkorder_iters_) << ")";
+    return num_calls_get_workorders_ == max_getworkorder_iters_;  // '==': true on exactly that call only.
+  }
+
+  bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override {
+    return true;  // Nothing is added to the container.
+  }
+
+  void feedInputBlock(const block_id input_block_id,
+                      const relation_id input_relation_id) override {  // Arguments ignored; counts the call.
+    ++num_calls_feedblock_;
+    MOCK_OP_LOG(3) << "count(" << num_calls_feedblock_ << ")";
+  }
+
+  void feedInputBlocks(const relation_id rel_id,
+                       std::vector<block_id> *partially_filled_blocks) override {  // Arguments ignored.
+    ++num_calls_feedblocks_;
+    MOCK_OP_LOG(3) << "count(" << num_calls_feedblocks_ << ")";
+  }
+
+  void doneFeedingInputBlocks(const relation_id rel_id) override {  // Argument ignored; counts the call.
+    ++num_calls_donefeedingblocks_;
+    MOCK_OP_LOG(3) << "count(" << num_calls_donefeedingblocks_ << ")";
+  }
+
+  QueryContext::insert_destination_id getInsertDestinationID() const override {
+    return insert_destination_index_;  // kInvalidInsertDestinationId unless the setter was called.
+  }
+
+  const relation_id getOutputRelationID() const override {
+    return output_relation_id_;  // -1 unless setOutputRelationID() was called.
+  }
+
+ private:
+  const bool produce_workorders_;  // Master switch for WorkOrder generation.
+  const bool has_streaming_input_;  // Chooses the streaming or the static branch of getAllWorkOrders().
+  const int max_workorders_;  // Cap on num_workorders_generated_.
+  const int max_getworkorder_iters_;  // Compared against num_calls_get_workorders_.
+
+  int num_calls_get_workorders_;  // Incremented on every getAllWorkOrders() call.
+  int num_workorders_generated_;  // Incremented each time a MockWorkOrder is added.
+  int num_calls_feedblock_;  // Incremented by feedInputBlock().
+  int num_calls_feedblocks_;  // Incremented by feedInputBlocks().
+  int num_calls_donefeedingblocks_;  // Incremented by doneFeedingInputBlocks().
+
+  QueryContext::insert_destination_id insert_destination_index_ = QueryContext::kInvalidInsertDestinationId;
+
+  relation_id output_relation_id_ = -1;  // Default until setOutputRelationID() is called.
+
+#undef MOCK_OP_LOG
+
+  DISALLOW_COPY_AND_ASSIGN(MockOperator);
+};
+
+
+class QueryManagerTest : public ::testing::Test {  // Fixture that drives QueryManagerSingleNode directly.
+ protected:
+  virtual void SetUp() {  // Builds a fresh catalog DB, storage manager, bus and query handle.
+    db_.reset(new CatalogDatabase(nullptr /* catalog */, "database"));
+    storage_manager_.reset(new StorageManager("./"));
+    bus_.Initialize();
+    query_handle_.reset(new QueryHandle(0));  // dummy query ID.
+    query_plan_ = query_handle_->getQueryPlanMutable();  // Plan that the tests add operators to.
+    query_handle_->getQueryContextProtoMutable()->set_query_id(query_handle_->query_id());
+  }
+
+  inline void constructQueryManager() {  // The tests here call this only after the plan is built.
+    query_manager_.reset(new QueryManagerSingleNode(
+        0, 1, query_handle_.get(), db_.get(), storage_manager_.get(), &bus_));  // NOTE(review): 0, 1 -- see ctor.
+  }
+
+  inline const int getNumWorkOrdersInExecution(const QueryPlan::DAGNodeIndex index) const {
+    return query_manager_->getQueryExecutionState().getNumQueuedWorkOrders(index);  // Per-operator count.
+  }
+
+  inline const int getNumOperatorsFinished() const {  // Forwards to the query execution state.
+    return query_manager_->getQueryExecutionState().getNumOperatorsFinished();
+  }
+
+  inline bool getOperatorFinishedStatus(const QueryPlan::DAGNodeIndex index) const {
+    return query_manager_->getQueryExecutionState().hasExecutionFinished(index);  // For one operator.
+  }
+
+  inline bool placeDataPipelineMessage(const QueryPlan::DAGNodeIndex source_operator_index) {
+    VLOG(3) << "Place DataPipeline message for Op[" << source_operator_index << "]";
+    serialization::DataPipelineMessage proto;
+    proto.set_operator_index(source_operator_index);
+
+    proto.set_block_id(0);  // dummy block ID
+    proto.set_relation_id(0);  // dummy relation ID.
+    proto.set_query_id(0);  // dummy query ID.
+
+    // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
+    const std::size_t proto_length = proto.ByteSize();
+    char *proto_bytes = static_cast<char*>(std::malloc(proto_length));
+    CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+    tmb::TaggedMessage tagged_message(static_cast<const void *>(proto_bytes),
+                                      proto_length,
+                                      kDataPipelineMessage);
+    std::free(proto_bytes);  // Freed before use below; assumes TaggedMessage copied the bytes -- TODO confirm.
+    query_manager_->processMessage(tagged_message);  // Handed straight to the QueryManager.
+    return query_manager_->getQueryExecutionState().hasQueryExecutionFinished();  // True once the query is done.
+  }
+
+  inline bool placeWorkOrderCompleteMessage(const QueryPlan::DAGNodeIndex index) {
+    VLOG(3) << "Place WorkOrderComplete message for Op[" << index << "]";
+    TaggedMessage tagged_message;  // NOTE(review): unused; 'message' below is what gets processed.
+    serialization::NormalWorkOrderCompletionMessage proto;
+    proto.set_operator_index(index);
+    proto.set_worker_thread_index(1);  // dummy worker ID.
+    proto.set_query_id(0);  // dummy query ID.
+
+    // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
+    const size_t proto_length = proto.ByteSize();
+    char *proto_bytes = static_cast<char*>(std::malloc(proto_length));
+    CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+    TaggedMessage message(static_cast<const void*>(proto_bytes),
+                          proto_length,
+                          kWorkOrderCompleteMessage);
+    std::free(proto_bytes);  // Same free-before-use pattern as placeDataPipelineMessage().
+    query_manager_->processMessage(message);
+
+    return query_manager_->getQueryExecutionState().hasQueryExecutionFinished();  // True once the query is done.
+  }
+
+  inline bool placeRebuildWorkOrderCompleteMessage(const QueryPlan::DAGNodeIndex index) {
+    VLOG(3) << "Place RebuildWorkOrderComplete message for Op[" << index << "]";
+    serialization::RebuildWorkOrderCompletionMessage proto;
+    proto.set_operator_index(index);
+    proto.set_worker_thread_index(1);  // dummy worker thread ID.
+    proto.set_query_id(0);  // dummy query ID.
+
+    // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
+    const size_t proto_length = proto.ByteSize();
+    char *proto_bytes = static_cast<char*>(std::malloc(proto_length));
+    CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+    TaggedMessage message(static_cast<const void*>(proto_bytes),
+                          proto_length,
+                          kRebuildWorkOrderCompleteMessage);
+
+    std::free(proto_bytes);  // Same free-before-use pattern as placeDataPipelineMessage().
+    query_manager_->processMessage(message);
+
+    return query_manager_->getQueryExecutionState().hasQueryExecutionFinished();  // True once the query is done.
+  }
+
+  inline bool placeOutputBlockMessage(const QueryPlan::DAGNodeIndex index) {
+    VLOG(3) << "Place OutputBlock message for Op[" << index << "]";  // Only this log text differs, see below.
+    serialization::DataPipelineMessage proto;  // Same message type and fields as placeDataPipelineMessage().
+    proto.set_operator_index(index);
+
+    proto.set_block_id(0);  // dummy block ID
+    proto.set_relation_id(0);  // dummy relation ID.
+    proto.set_query_id(0);  // dummy query ID.
+
+    // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
+    const std::size_t proto_length = proto.ByteSize();
+    char *proto_bytes = static_cast<char*>(std::malloc(proto_length));
+    CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+    tmb::TaggedMessage tagged_message(static_cast<const void *>(proto_bytes),
+                                      proto_length,
+                                      kDataPipelineMessage);
+    std::free(proto_bytes);  // Same free-before-use pattern as placeDataPipelineMessage().
+    query_manager_->processMessage(tagged_message);
+    return query_manager_->getQueryExecutionState().hasQueryExecutionFinished();  // True once the query is done.
+  }
+
+  unique_ptr<CatalogDatabase> db_;  // Created in SetUp().
+  unique_ptr<StorageManager> storage_manager_;  // Created in SetUp() with path "./".
+
+  QueryPlan *query_plan_;  // Non-owning; obtained from query_handle_ in SetUp().
+  unique_ptr<QueryHandle> query_handle_;  // Created in SetUp() with query ID 0.
+  unique_ptr<QueryManagerSingleNode> query_manager_;  // Null until constructQueryManager() is called.
+
+  MessageBusImpl bus_;  // Initialized in SetUp().
+
+  client_id worker_client_id_;  // NOTE(review): never assigned in SetUp().
+
+  unique_ptr<WorkerDirectory> workers_;  // NOTE(review): never reset in SetUp().
+};
+
+TEST_F(QueryManagerTest, SingleNodeDAGNoWorkOrdersTest) {
+  // This test creates a DAG of a single node. No workorders are generated.
+  query_plan_->addRelationalOperator(new MockOperator(false, false));  // No workorders, no streaming input.
+
+  const MockOperator &op = static_cast<const MockOperator &>(
+      query_plan_->getQueryPlanDAG().getNodePayload(0));  // Assumes the first operator added gets index 0.
+
+  constructQueryManager();  // The getAllWorkOrders() call counted below happens during construction.
+
+  // op doesn't have any dependencies.
+  EXPECT_TRUE(op.getBlockingDependenciesMet());
+
+  // We expect one call for op's getAllWorkOrders().
+  EXPECT_EQ(1, op.getNumCalls(MockOperator::kGetAllWorkOrders));
+  EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlock));
+  EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlocks));
+}
+
+TEST_F(QueryManagerTest, SingleNodeDAGStaticWorkOrdersTest) {
+  // This test creates a DAG of a single node. Static workorders are generated.
+  const QueryPlan::DAGNodeIndex id =
+      query_plan_->addRelationalOperator(new MockOperator(true, false, 1));  // Static input, 1 iter.
+
+  const MockOperator &op = static_cast<const MockOperator &>(
+      query_plan_->getQueryPlanDAG().getNodePayload(id));
+
+  constructQueryManager();  // The getAllWorkOrders() call counted below happens during construction.
+
+  // op doesn't have any dependencies.
+  EXPECT_TRUE(op.getBlockingDependenciesMet());
+
+  // We expect one call for op's getAllWorkOrders().
+  EXPECT_EQ(1, op.getNumCalls(MockOperator::kGetAllWorkOrders));
+  EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlock));
+  EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlocks));
+
+  // One workorder is generated.
+  EXPECT_EQ(1, op.getNumWorkOrders());
+
+  unique_ptr<WorkerMessage> worker_message;
+  worker_message.reset(query_manager_->getNextWorkerMessage(0, -1));  // Other tests pass the operator id first.
+  EXPECT_TRUE(worker_message != nullptr);
+
+  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
+            worker_message->getType());
+  EXPECT_EQ(0u, worker_message->getRelationalOpIndex());
+
+  delete worker_message->getWorkOrder();  // Freed by the test; WorkerMessage presumably does not own it.
+
+  EXPECT_EQ(1, getNumWorkOrdersInExecution(id));
+  EXPECT_EQ(0, getNumOperatorsFinished());
+
+  // Send a message to QueryManager upon workorder completion.
+  // Last event processed by QueryManager.
+  EXPECT_TRUE(placeWorkOrderCompleteMessage(id));
+
+  EXPECT_EQ(0, getNumWorkOrdersInExecution(id));
+  EXPECT_EQ(1, getNumOperatorsFinished());
+  EXPECT_TRUE(getOperatorFinishedStatus(id));
+}
+
+TEST_F(QueryManagerTest, SingleNodeDAGDynamicWorkOrdersTest) {
+  // This test creates a DAG of a single node. WorkOrders are generated
+  // dynamically as pending work orders complete execution, i.e.,
+  // getAllWorkOrders() is called multiple times. The mock is configured with
+  // max_getworkorder_iters = 4 and max_workorders = 3, i.e., the first 3 calls
+  // to getAllWorkOrders() insert 1 WorkOrder and return false, and the 4th
+  // inserts no WorkOrder and returns true.
+
+  // TODO(shoban): This test can not be more robust than this because of fixed
+  // scaffolding of mocking. If we use gMock, we can do much better.
+  const QueryPlan::DAGNodeIndex id =
+      query_plan_->addRelationalOperator(new MockOperator(true, false, 4, 3));  // Static, 4 iters, max 3 WOs.
+
+  const MockOperator &op = static_cast<const MockOperator &>(
+      query_plan_->getQueryPlanDAG().getNodePayload(id));
+
+  constructQueryManager();
+
+  // op doesn't have any dependencies.
+  EXPECT_TRUE(op.getBlockingDependenciesMet());
+
+  for (int i = 0; i < 3; ++i) {
+    // We expect (i + 1) calls to op's getAllWorkOrders() so far.
+    EXPECT_EQ(i + 1, op.getNumCalls(MockOperator::kGetAllWorkOrders));
+
+    // (i + 1) workorders have been generated so far.
+    // EXPECT_EQ(1, getWorkerInputQueueSize());
+    EXPECT_EQ(i + 1, op.getNumWorkOrders());
+
+    unique_ptr<WorkerMessage> worker_message;
+    worker_message.reset(query_manager_->getNextWorkerMessage(id, -1));
+
+    EXPECT_TRUE(worker_message != nullptr);
+    EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
+              worker_message->getType());
+    EXPECT_EQ(id, worker_message->getRelationalOpIndex());
+
+    delete worker_message->getWorkOrder();  // Freed by the test; WorkerMessage presumably does not own it.
+
+    EXPECT_EQ(1, getNumWorkOrdersInExecution(id));
+    EXPECT_EQ(0, getNumOperatorsFinished());
+
+    if (i < 2) {
+      // Send a message to QueryManager upon workorder completion.
+      EXPECT_FALSE(placeWorkOrderCompleteMessage(id));
+    } else {
+      // Send a message to QueryManager upon workorder completion.
+      // Last event.
+      EXPECT_TRUE(placeWorkOrderCompleteMessage(id));
+    }
+  }
+
+  EXPECT_EQ(0, getNumWorkOrdersInExecution(id));
+
+  EXPECT_EQ(1, getNumOperatorsFinished());
+  EXPECT_TRUE(getOperatorFinishedStatus(id));
+
+  // We place this check in the end, since it's true throughout the test.
+  EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlock));
+  EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlocks));
+}
+
+TEST_F(QueryManagerTest, TwoNodesDAGBlockingLinkTest) {
+  // We use two nodes in the DAG with a blocking link between them.
+  // There is no streaming of data involved in this test.
+  const QueryPlan::DAGNodeIndex id1 =
+      query_plan_->addRelationalOperator(new MockOperator(true, false));  // Static input, default 1 iter.
+  const QueryPlan::DAGNodeIndex id2 =
+      query_plan_->addRelationalOperator(new MockOperator(true, false));  // Static input, default 1 iter.
+
+  // Create a blocking link.
+  query_plan_->addDirectDependency(id2, id1, true);  // op2 depends on op1; 'true' marks the link blocking.
+
+  static_cast<MockOperator *>(
+      query_plan_->getQueryPlanDAGMutable()->getNodePayloadMutable(id1))
+          ->setOutputRelationID(0xdead);  // Dummy relation ID; no such relation is created in this test.
+
+  const MockOperator &op1 = static_cast<const MockOperator &>(
+      query_plan_->getQueryPlanDAG().getNodePayload(id1));
+  const MockOperator &op2 = static_cast<const MockOperator &>(
+      query_plan_->getQueryPlanDAG().getNodePayload(id2));
+
+  constructQueryManager();
+
+  // op1 doesn't have any dependencies
+  EXPECT_TRUE(op1.getBlockingDependenciesMet());
+
+  // Only op1 should receive a call to getAllWorkOrders initially.
+  EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders));
+  EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlock));
+  EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlocks));
+
+  EXPECT_EQ(0, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
+  EXPECT_EQ(0, op2.getNumCalls(MockOperator::kFeedInputBlock));
+  EXPECT_EQ(0, op2.getNumCalls(MockOperator::kFeedInputBlocks));
+
+  // Only op1 should produce a workorder.
+  EXPECT_EQ(1, op1.getNumWorkOrders());
+  EXPECT_EQ(0, op2.getNumWorkOrders());
+
+  // Foreman hasn't yet got workorder completion response for the workorder.
+  unique_ptr<WorkerMessage> worker_message;
+  worker_message.reset(query_manager_->getNextWorkerMessage(id1, -1));
+
+  EXPECT_TRUE(worker_message != nullptr);
+  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
+            worker_message->getType());
+  EXPECT_EQ(id1, worker_message->getRelationalOpIndex());
+
+  delete worker_message->getWorkOrder();  // Freed by the test; WorkerMessage presumably does not own it.
+
+  EXPECT_EQ(1, getNumWorkOrdersInExecution(id1));
+  EXPECT_EQ(0, getNumWorkOrdersInExecution(id2));
+  EXPECT_EQ(0, getNumOperatorsFinished());
+
+  // Send a message to Foreman upon workorder (generated by op1) completion.
+  EXPECT_FALSE(placeWorkOrderCompleteMessage(id1));
+
+  EXPECT_EQ(0, getNumWorkOrdersInExecution(id1));
+  // op1 is over now, op2 still to go.
+  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks));
+  EXPECT_EQ(1, getNumOperatorsFinished());
+
+  EXPECT_TRUE(getOperatorFinishedStatus(id1));
+  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks));  // Repeats the check made above.
+  EXPECT_FALSE(getOperatorFinishedStatus(id2));
+
+  worker_message.reset(query_manager_->getNextWorkerMessage(id2, -1));
+  EXPECT_TRUE(worker_message != nullptr);
+  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
+            worker_message->getType());
+  EXPECT_EQ(id2, worker_message->getRelationalOpIndex());
+
+  delete worker_message->getWorkOrder();
+
+  EXPECT_EQ(1, getNumWorkOrdersInExecution(id2));
+
+  // op1 is op2's blocking dependency.
+  EXPECT_TRUE(op2.getBlockingDependenciesMet());
+
+  EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders));
+  // op2 should get first call of getAllWorkOrders() when op1 is over.
+  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
+
+  EXPECT_EQ(1, op2.getNumWorkOrders());
+
+  // Send a message to QueryManager upon workorder (generated by op2) completion.
+  // Note that the worker hasn't yet popped the workorder. Usually this won't
+  // happen as workers pop workorders first, execute and then send the response.
+  EXPECT_TRUE(placeWorkOrderCompleteMessage(id2));
+
+  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
+
+  EXPECT_EQ(0, getNumWorkOrdersInExecution(id1));
+  EXPECT_EQ(0, getNumWorkOrdersInExecution(id2));
+
+  EXPECT_EQ(2, getNumOperatorsFinished());
+  EXPECT_TRUE(getOperatorFinishedStatus(id1));
+  EXPECT_TRUE(getOperatorFinishedStatus(id2));
+
+  // Expect no additional calls to getAllWorkOrders.
+  EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders));
+  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
+}
+
+TEST_F(QueryManagerTest, TwoNodesDAGPipeLinkTest) {
+  // We use two nodes in the DAG with a non-blocking link between them.
+  // We stream output of op1 to op2. Sequence of events is as follows:
+  // 1. op1 creates a workorder.
+  // 2. We send a "block full" (from op1) to QueryManager.
+  // 3. op2 creates a workorder because of step 2.
+  const QueryPlan::DAGNodeIndex id1 =
+      query_plan_->addRelationalOperator(new MockOperator(true, false, 1));  // Static input, 1 iter.
+  const QueryPlan::DAGNodeIndex id2 =
+      query_plan_->addRelationalOperator(new MockOperator(true, true, 3));  // Streaming input, 3 iters.
+
+  // Create a non-blocking link.
+  query_plan_->addDirectDependency(id2, id1, false);  // op2 depends on op1; 'false' marks it non-blocking.
+
+  static_cast<MockOperator *>(
+      query_plan_->getQueryPlanDAGMutable()->getNodePayloadMutable(id1))
+      ->setOutputRelationID(0xdead);  // Dummy relation ID; no such relation is created in this test.
+
+  const MockOperator &op1 = static_cast<const MockOperator &>(
+      query_plan_->getQueryPlanDAG().getNodePayload(id1));
+  const MockOperator &op2 = static_cast<const MockOperator &>(
+      query_plan_->getQueryPlanDAG().getNodePayload(id2));
+
+  constructQueryManager();
+
+  // As none of the operators have a blocking link, blocking dependencies should
+  // be met.
+  EXPECT_TRUE(op1.getBlockingDependenciesMet());
+  EXPECT_TRUE(op2.getBlockingDependenciesMet());
+
+  EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders));
+  EXPECT_EQ(1, op1.getNumWorkOrders());
+  EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlock));
+  EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlocks));
+
+  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
+  // op2 will generate workorder only after receiving a streaming input.
+  EXPECT_EQ(0, op2.getNumWorkOrders());
+  EXPECT_EQ(0, op2.getNumCalls(MockOperator::kFeedInputBlock));
+  EXPECT_EQ(0, op2.getNumCalls(MockOperator::kFeedInputBlocks));
+
+  unique_ptr<WorkerMessage> worker_message;
+  worker_message.reset(query_manager_->getNextWorkerMessage(id1, -1));
+
+  EXPECT_TRUE(worker_message != nullptr);
+  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
+            worker_message->getType());
+  EXPECT_EQ(id1, worker_message->getRelationalOpIndex());
+
+  delete worker_message->getWorkOrder();  // Freed by the test; WorkerMessage presumably does not own it.
+
+  // Send a message to QueryManager upon block getting full (output of op1).
+  EXPECT_FALSE(placeOutputBlockMessage(id1));
+
+  // op1 is not finished yet because the response of workorder completion hasn't
+  // been received yet by the QueryManager.
+  EXPECT_FALSE(getOperatorFinishedStatus(id1));
+  EXPECT_FALSE(getOperatorFinishedStatus(id2));
+
+  // No additional call to op1's getAllWorkOrders.
+  EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders));
+  EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlock));
+  EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlocks));
+
+  // Output from op1 should be fed to op2.
+  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kFeedInputBlock));
+  EXPECT_EQ(0, op2.getNumCalls(MockOperator::kFeedInputBlocks));
+
+  // A call to op2's getAllWorkOrders because of the streamed input.
+  EXPECT_EQ(2, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
+  EXPECT_EQ(1, op2.getNumWorkOrders());
+
+  // Place a message of a workorder completion of op1 on Foreman's input queue.
+  EXPECT_FALSE(placeWorkOrderCompleteMessage(id1));
+
+  EXPECT_EQ(0, getNumWorkOrdersInExecution(id1));
+  EXPECT_TRUE(getOperatorFinishedStatus(id1));
+  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks));
+
+  // An additional call to op2's getAllWorkOrders because of completion of op1.
+  EXPECT_EQ(3, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
+  EXPECT_EQ(2, op2.getNumWorkOrders());
+
+  worker_message.reset(query_manager_->getNextWorkerMessage(id2, -1));  // First of op2's two workorders.
+
+  EXPECT_TRUE(worker_message != nullptr);
+  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
+            worker_message->getType());
+  EXPECT_EQ(id2, worker_message->getRelationalOpIndex());
+
+  delete worker_message->getWorkOrder();
+
+  // Place a message of a workorder completion of op2 on Foreman's input queue.
+  EXPECT_FALSE(placeWorkOrderCompleteMessage(id2));
+
+  EXPECT_TRUE(getOperatorFinishedStatus(id1));
+
+  worker_message.reset(query_manager_->getNextWorkerMessage(id2, -1));  // Second of op2's two workorders.
+
+  EXPECT_TRUE(worker_message != nullptr);
+  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
+            worker_message->getType());
+  EXPECT_EQ(id2, worker_message->getRelationalOpIndex());
+
+  delete worker_message->getWorkOrder();
+
+  EXPECT_EQ(1, getNumWorkOrdersInExecution(id2));
+  EXPECT_FALSE(getOperatorFinishedStatus(id2));
+
+  // Send a message to Foreman upon workorder (generated by op2) completion.
+  EXPECT_TRUE(placeWorkOrderCompleteMessage(id2));
+
+  EXPECT_TRUE(getOperatorFinishedStatus(id1));
+  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks));
+
+  EXPECT_EQ(0, getNumWorkOrdersInExecution(id2));
+  EXPECT_TRUE(getOperatorFinishedStatus(id2));
+}
+
+TEST_F(QueryManagerTest, TwoNodesDAGPartiallyFilledBlocksTest) {
+  // In this test, we create a 2-node DAG with a non-blocking link between them.
+  // There is no streaming of data from op1 to op2 during the execution of op1.
+  // op1 produces a partially filled block at the end of its execution which is
+  // rebuilt and then fed to op2.
+  const QueryPlan::DAGNodeIndex id1 =
+      query_plan_->addRelationalOperator(new MockOperator(true, false, 1));  // Static input, 1 iter.
+  const QueryPlan::DAGNodeIndex id2 =
+      query_plan_->addRelationalOperator(new MockOperator(true, true, 3, 1));  // Streaming, 3 iters, max 1 WO.
+
+  // Create a non-blocking link.
+  query_plan_->addDirectDependency(id2, id1, false);  // op2 depends on op1; 'false' marks it non-blocking.
+
+  // Create a relation, owned by db_.
+  CatalogRelation *relation =
+      new CatalogRelation(nullptr /* catalog_database */, "test_relation");
+  const relation_id output_relation_id = db_->addRelation(relation);
+
+  // Setup the InsertDestination proto in the query context proto.
+  serialization::QueryContext *query_context_proto =
+      query_handle_->getQueryContextProtoMutable();
+
+  const QueryContext::insert_destination_id insert_destination_index =
+      query_context_proto->insert_destinations_size();  // Taken before add_insert_destinations() below.
+  serialization::InsertDestination *insert_destination_proto =
+      query_context_proto->add_insert_destinations();
+
+  insert_destination_proto->set_insert_destination_type(
+      serialization::InsertDestinationType::BLOCK_POOL);
+  insert_destination_proto->set_relation_id(output_relation_id);
+  insert_destination_proto->set_relational_op_index(id1);
+
+  MockOperator *op1_mutable = static_cast<MockOperator *>(
+      query_plan_->getQueryPlanDAGMutable()->getNodePayloadMutable(id1));
+  op1_mutable->setInsertDestinationID(insert_destination_index);
+  op1_mutable->setOutputRelationID(output_relation_id);
+
+  const MockOperator &op1 = static_cast<const MockOperator &>(
+      query_plan_->getQueryPlanDAG().getNodePayload(id1));
+  const MockOperator &op2 = static_cast<const MockOperator &>(
+      query_plan_->getQueryPlanDAG().getNodePayload(id2));
+
+  constructQueryManager();
+
+  // NOTE(zuyu): An operator generally has no idea about partially filled
+  // blocks, but InsertDestination in QueryContext does.
+  // Mock to add partially filled blocks in the InsertDestination.
+  InsertDestination *insert_destination =
+      query_manager_->getQueryContextMutable()->getInsertDestination(
+          insert_destination_index);
+  DCHECK(insert_destination != nullptr);
+  MutableBlockReference block_ref;  // Default-constructed; stands in for a partially filled block.
+  static_cast<BlockPoolInsertDestination *>(insert_destination)
+      ->available_block_refs_.push_back(move(block_ref));
+
+  // There's no blocking dependency in the DAG.
+  EXPECT_TRUE(op1.getBlockingDependenciesMet());
+  EXPECT_TRUE(op2.getBlockingDependenciesMet());
+
+  EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders));
+  EXPECT_EQ(1, op1.getNumWorkOrders());
+
+  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
+  EXPECT_EQ(0, op2.getNumWorkOrders());
+
+  unique_ptr<WorkerMessage> worker_message;
+  worker_message.reset(query_manager_->getNextWorkerMessage(id1, -1));
+
+  EXPECT_TRUE(worker_message != nullptr);
+  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
+            worker_message->getType());
+  EXPECT_EQ(id1, worker_message->getRelationalOpIndex());
+
+  delete worker_message->getWorkOrder();  // Freed by the test; WorkerMessage presumably does not own it.
+
+  // Send a message to QueryManager upon workorder (generated by op1) completion.
+  EXPECT_FALSE(placeWorkOrderCompleteMessage(id1));
+
+  EXPECT_EQ(0, getNumWorkOrdersInExecution(id1));
+
+  worker_message.reset(query_manager_->getNextWorkerMessage(id1, -1));  // Now expected to be a rebuild WO.
+  EXPECT_TRUE(worker_message != nullptr);
+  EXPECT_EQ(WorkerMessage::WorkerMessageType::kRebuildWorkOrder,
+            worker_message->getType());
+
+  EXPECT_EQ(id1, worker_message->getRelationalOpIndex());
+
+  delete worker_message->getWorkOrder();
+
+  // op1 generates a rebuild workorder. The block is rebuilt and streamed
+  // to Foreman.
+  EXPECT_FALSE(placeDataPipelineMessage(id1));
+
+  EXPECT_FALSE(placeRebuildWorkOrderCompleteMessage(id1));
+  // Based on the streamed input, op2's getAllWorkOrders should produce a
+  // workorder.
+  EXPECT_EQ(3, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
+  EXPECT_EQ(1, op2.getNumWorkOrders());
+
+  worker_message.reset(query_manager_->getNextWorkerMessage(id2, -1));
+
+  EXPECT_TRUE(worker_message != nullptr);
+  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
+            worker_message->getType());
+
+  EXPECT_EQ(id2, worker_message->getRelationalOpIndex());
+
+  delete worker_message->getWorkOrder();
+
+  EXPECT_TRUE(getOperatorFinishedStatus(id1));
+  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks));
+  EXPECT_FALSE(getOperatorFinishedStatus(id2));
+  EXPECT_EQ(1, getNumWorkOrdersInExecution(id2));
+
+  // Send a message to QueryManager upon workorder (generated by op2) completion.
+  EXPECT_TRUE(placeWorkOrderCompleteMessage(id2));
+
+  EXPECT_EQ(0, getNumWorkOrdersInExecution(id2));
+
+  EXPECT_TRUE(getOperatorFinishedStatus(id2));
+}
+
+TEST_F(QueryManagerTest, MultipleNodesNoOutputTest) {
+  // When an operator produces workorders but no output, the QueryManager should
+  // check the dependents of this operator to make progress.
+  const QueryPlan::DAGNodeIndex kNumNodes = 5;
+  std::vector<QueryPlan::DAGNodeIndex> ids;
+  ids.reserve(kNumNodes);
+
+  for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes; ++i) {
+    // Only the first operator is created without streaming input.
+    // push_back() grows the vector; reserve() alone leaves its size at zero.
+    ids.push_back(
+        query_plan_->addRelationalOperator(new MockOperator(true, i != 0)));
+
+    VLOG(3) << ids.back();
+  }
+
+  /**
+   * The DAG looks like this:
+   *
+   * op1 -> op2 -> op3 -> op4 -> op5
+   *
+   **/
+  for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes - 1; ++i) {
+    query_plan_->addDirectDependency(ids[i + 1], ids[i], false);
+    static_cast<MockOperator*>(query_plan_->getQueryPlanDAGMutable()->getNodePayloadMutable(ids[i]))
+        ->setOutputRelationID(0xdead);
+  }
+
+  std::vector<const MockOperator*> operators;
+  for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes; ++i) {
+    operators.push_back(static_cast<const MockOperator*>(&query_plan_->getQueryPlanDAG().getNodePayload(ids[i])));
+  }
+
+  constructQueryManager();
+
+  // operators[0] should have produced a workorder by now.
+  EXPECT_EQ(1, operators[0]->getNumWorkOrders());
+
+  unique_ptr<WorkerMessage> worker_message;
+  worker_message.reset(query_manager_->getNextWorkerMessage(ids[0], -1));
+  // Fatal check: the message is dereferenced right below.
+  ASSERT_TRUE(worker_message != nullptr);
+  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
+            worker_message->getType());
+
+  EXPECT_EQ(ids[0], worker_message->getRelationalOpIndex());
+
+  delete worker_message->getWorkOrder();
+
+  EXPECT_EQ(1, getNumWorkOrdersInExecution(ids[0]));
+  EXPECT_FALSE(getOperatorFinishedStatus(ids[0]));
+  // 'operators' is ordered by position, so index it with i, not a node id.
+  for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes; ++i) {
+    EXPECT_EQ(1, operators[i]->getNumCalls(MockOperator::kGetAllWorkOrders));
+  }
+
+  // Send a message to QueryManager upon workorder (generated by operators[0])
+  // completion.
+  EXPECT_TRUE(placeWorkOrderCompleteMessage(ids[0]));
+
+  for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes; ++i) {
+    EXPECT_EQ(0, getNumWorkOrdersInExecution(ids[i]));
+    EXPECT_TRUE(getOperatorFinishedStatus(ids[i]));
+    if (i < kNumNodes - 1) {
+      EXPECT_EQ(1, operators[i + 1]->getNumCalls(MockOperator::kDoneFeedingInputBlocks));
+    }
+  }
+}
+
+TEST_F(QueryManagerTest, OutOfOrderWorkOrderCompletionTest) {
+  // Consider two operators, both generate one workorder each. The dependent's
+  // workorder finishes before dependency's workorder.
+  const QueryPlan::DAGNodeIndex id1 = query_plan_->addRelationalOperator(new MockOperator(true, false, 1));
+  const QueryPlan::DAGNodeIndex id2 = query_plan_->addRelationalOperator(new MockOperator(true, true, 2, 1));
+
+  // Create a non-blocking link.
+  query_plan_->addDirectDependency(id2, id1, false);
+
+  constructQueryManager();
+
+  unique_ptr<WorkerMessage> worker_message;
+  worker_message.reset(query_manager_->getNextWorkerMessage(id1, -1));
+  // Fatal check: the message is dereferenced right below.
+  ASSERT_TRUE(worker_message != nullptr);
+  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
+            worker_message->getType());
+
+  EXPECT_EQ(id1, worker_message->getRelationalOpIndex());
+
+  delete worker_message->getWorkOrder();
+
+  // Send a message to QueryManager upon a block (output of op1) getting full.
+  EXPECT_FALSE(placeOutputBlockMessage(id1));
+
+  // op1 is not finished yet because the response of workorder completion hasn't
+  // been received yet.
+  EXPECT_FALSE(getOperatorFinishedStatus(id1));
+  EXPECT_FALSE(getOperatorFinishedStatus(id2));
+
+  worker_message.reset(query_manager_->getNextWorkerMessage(id2, -1));
+  ASSERT_TRUE(worker_message != nullptr);
+  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
+            worker_message->getType());
+
+  EXPECT_EQ(id2, worker_message->getRelationalOpIndex());
+
+  delete worker_message->getWorkOrder();
+
+  // As mentioned earlier, op2 finishes before op1.
+  EXPECT_FALSE(placeWorkOrderCompleteMessage(id2));
+
+  // op1's workorder execution is over.
+  EXPECT_TRUE(placeWorkOrderCompleteMessage(id1));
+
+  EXPECT_TRUE(getOperatorFinishedStatus(id1));
+  EXPECT_TRUE(getOperatorFinishedStatus(id2));
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b4e25edc/query_execution/tests/QueryManager_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/QueryManager_unittest.cpp b/query_execution/tests/QueryManager_unittest.cpp
deleted file mode 100644
index 37e2cdd..0000000
--- a/query_execution/tests/QueryManager_unittest.cpp
+++ /dev/null
@@ -1,940 +0,0 @@
-/**
- *   Copyright 2011-2015 Quickstep Technologies LLC.
- *   Copyright 2015-2016 Pivotal Software, Inc.
- *
- *   Licensed under the Apache License, Version 2.0 (the "License");
- *   you may not use this file except in compliance with the License.
- *   You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- **/
-
-#include <climits>
-#include <memory>
-#include <utility>
-#include <vector>
-
-#include "catalog/CatalogDatabase.hpp"
-#include "catalog/CatalogRelation.hpp"
-#include "catalog/CatalogTypedefs.hpp"
-#include "query_execution/QueryContext.hpp"
-#include "query_execution/QueryContext.pb.h"
-#include "query_execution/QueryExecutionMessages.pb.h"
-#include "query_execution/QueryExecutionState.hpp"
-#include "query_execution/QueryExecutionTypedefs.hpp"
-#include "query_execution/QueryManager.hpp"
-#include "query_execution/WorkOrdersContainer.hpp"
-#include "query_execution/WorkerDirectory.hpp"
-#include "query_execution/WorkerMessage.hpp"
-#include "query_optimizer/QueryHandle.hpp"
-#include "query_optimizer/QueryPlan.hpp"
-#include "relational_operators/RelationalOperator.hpp"
-#include "relational_operators/WorkOrder.hpp"
-#include "storage/InsertDestination.hpp"
-#include "storage/InsertDestination.pb.h"
-#include "storage/StorageBlock.hpp"
-#include "storage/StorageBlockInfo.hpp"
-#include "storage/StorageManager.hpp"
-#include "utility/DAG.hpp"
-#include "utility/Macros.hpp"
-
-#include "glog/logging.h"
-#include "gtest/gtest.h"
-
-#include "tmb/id_typedefs.h"
-#include "tmb/message_bus.h"
-#include "tmb/tagged_message.h"
-
-using std::move;
-using std::unique_ptr;
-using std::vector;
-
-using tmb::client_id;
-
-namespace quickstep {
-
-class WorkOrderProtosContainer;
-
-class MockWorkOrder : public WorkOrder {
- public:
-  explicit MockWorkOrder(const int op_index)
-      : WorkOrder(0), op_index_(op_index) {}
-
-  void execute() override {
-    VLOG(3) << "WorkOrder[" << op_index_ << "] executing.";
-  }
-
-  inline QueryPlan::DAGNodeIndex getOpIndex() const {
-    return op_index_;
-  }
-
- private:
-  const QueryPlan::DAGNodeIndex op_index_;
-
-  DISALLOW_COPY_AND_ASSIGN(MockWorkOrder);
-};
-
-class MockOperator: public RelationalOperator {
- public:
-  enum function_name {
-    kFeedInputBlock = 0,
-    kFeedInputBlocks,
-    kDoneFeedingInputBlocks,
-    kGetAllWorkOrders
-  };
-
-  MockOperator(const bool produce_workorders,
-               const bool has_streaming_input,
-               const int max_getworkorder_iters = 1,
-               const int max_workorders = INT_MAX)
-      : RelationalOperator(0 /* Query Id */),
-        produce_workorders_(produce_workorders),
-        has_streaming_input_(has_streaming_input),
-        max_workorders_(max_workorders),
-        max_getworkorder_iters_(max_getworkorder_iters),
-        num_calls_get_workorders_(0),
-        num_workorders_generated_(0),
-        num_calls_feedblock_(0),
-        num_calls_feedblocks_(0),
-        num_calls_donefeedingblocks_(0) {
-  }
-
-#define MOCK_OP_LOG(x) VLOG(x) << "Op[" << op_index_ << "]: " << __func__ << ": "
-
-  // The methods below are used to check whether QueryManager calls the Relational
-  // operator, how many times it calls a particular method etc.
-  inline int getNumWorkOrders() const {
-    return num_workorders_generated_;
-  }
-
-  inline int getNumCalls(const function_name fname) const {
-    switch (fname) {
-      case kFeedInputBlock:
-        return num_calls_feedblock_;
-      case kFeedInputBlocks:
-        return num_calls_feedblocks_;
-      case kDoneFeedingInputBlocks:
-        return num_calls_donefeedingblocks_;
-      case kGetAllWorkOrders:
-        return num_calls_get_workorders_;
-      default:
-        return -1;
-    }
-  }
-
-  inline bool getBlockingDependenciesMet() const {
-    MOCK_OP_LOG(3) << "met.";
-    return blocking_dependencies_met_;
-  }
-
-  void setInsertDestinationID(const QueryContext::insert_destination_id insert_destination_index) {
-    insert_destination_index_ = insert_destination_index;
-  }
-
-  // Mock to trigger doneFeedingInputBlocks for the dependent operators
-  // in QueryManager::markOperatorFinished.
-  void setOutputRelationID(const relation_id rel_id) {
-    output_relation_id_ = rel_id;
-  }
-
-  // Override methods from the base class.
-  bool getAllWorkOrders(
-      WorkOrdersContainer *container,
-      QueryContext *query_context,
-      StorageManager *storage_manager,
-      const tmb::client_id foreman_client_id,
-      tmb::MessageBus *bus) override {
-    ++num_calls_get_workorders_;
-    if (produce_workorders_) {
-      if (has_streaming_input_) {
-        if ((num_calls_feedblock_ > 0 || num_calls_feedblocks_ > 0) && (num_workorders_generated_ < max_workorders_)) {
-          MOCK_OP_LOG(3) << "[stream] generate WorkOrder";
-          container->addNormalWorkOrder(new MockWorkOrder(op_index_), op_index_);
-          ++num_workorders_generated_;
-        }
-      } else {
-        if (blocking_dependencies_met_ && (num_workorders_generated_ < max_workorders_)) {
-          MOCK_OP_LOG(3) << "[static] generate WorkOrder";
-          container->addNormalWorkOrder(new MockWorkOrder(op_index_), op_index_);
-          ++num_workorders_generated_;
-        }
-      }
-    }
-    MOCK_OP_LOG(3) << "count(" << num_calls_get_workorders_ << ") "
-                   << "return(" << (num_calls_get_workorders_ == max_getworkorder_iters_) << ")";
-    return num_calls_get_workorders_ == max_getworkorder_iters_;
-  }
-
-  bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override {
-    return true;
-  }
-
-  void feedInputBlock(const block_id input_block_id,
-                      const relation_id input_relation_id) override {
-    ++num_calls_feedblock_;
-    MOCK_OP_LOG(3) << "count(" << num_calls_feedblock_ << ")";
-  }
-
-  void feedInputBlocks(const relation_id rel_id,
-                       std::vector<block_id> *partially_filled_blocks) override {
-    ++num_calls_feedblocks_;
-    MOCK_OP_LOG(3) << "count(" << num_calls_feedblocks_ << ")";
-  }
-
-  void doneFeedingInputBlocks(const relation_id rel_id) override {
-    ++num_calls_donefeedingblocks_;
-    MOCK_OP_LOG(3) << "count(" << num_calls_donefeedingblocks_ << ")";
-  }
-
-  QueryContext::insert_destination_id getInsertDestinationID() const override {
-    return insert_destination_index_;
-  }
-
-  const relation_id getOutputRelationID() const override {
-    return output_relation_id_;
-  }
-
- private:
-  const bool produce_workorders_;
-  const bool has_streaming_input_;
-  const int max_workorders_;
-  const int max_getworkorder_iters_;
-
-  int num_calls_get_workorders_;
-  int num_workorders_generated_;
-  int num_calls_feedblock_;
-  int num_calls_feedblocks_;
-  int num_calls_donefeedingblocks_;
-
-  QueryContext::insert_destination_id insert_destination_index_ = QueryContext::kInvalidInsertDestinationId;
-
-  relation_id output_relation_id_ = -1;
-
-#undef MOCK_OP_LOG
-
-  DISALLOW_COPY_AND_ASSIGN(MockOperator);
-};
-
-
-class QueryManagerTest : public ::testing::Test {
- protected:
-  virtual void SetUp() {
-    db_.reset(new CatalogDatabase(nullptr /* catalog */, "database"));
-    storage_manager_.reset(new StorageManager("./"));
-    bus_.Initialize();
-    query_handle_.reset(new QueryHandle(0));  // dummy query ID.
-    query_plan_ = query_handle_->getQueryPlanMutable();
-    query_handle_->getQueryContextProtoMutable()->set_query_id(query_handle_->query_id());
-  }
-
-  inline void constructQueryManager() {
-    query_manager_.reset(new QueryManager(
-        0, 1, query_handle_.get(), db_.get(), storage_manager_.get(), &bus_));
-  }
-
-  inline const int getNumWorkOrdersInExecution(const QueryPlan::DAGNodeIndex index) const {
-    return query_manager_->getQueryExecutionState().getNumQueuedWorkOrders(index);
-  }
-
-  inline const int getNumOperatorsFinished() const {
-    return query_manager_->getQueryExecutionState().getNumOperatorsFinished();
-  }
-
-  inline bool getOperatorFinishedStatus(const QueryPlan::DAGNodeIndex index) const {
-    return query_manager_->getQueryExecutionState().hasExecutionFinished(index);
-  }
-
-  inline bool placeDataPipelineMessage(const QueryPlan::DAGNodeIndex source_operator_index) {
-    VLOG(3) << "Place DataPipeline message for Op[" << source_operator_index << "]";
-    serialization::DataPipelineMessage proto;
-    proto.set_operator_index(source_operator_index);
-
-    proto.set_block_id(0);  // dummy block ID
-    proto.set_relation_id(0);  // dummy relation ID.
-    proto.set_query_id(0);  // dummy query ID.
-
-    // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
-    const std::size_t proto_length = proto.ByteSize();
-    char *proto_bytes = static_cast<char*>(std::malloc(proto_length));
-    CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
-    tmb::TaggedMessage tagged_message(static_cast<const void *>(proto_bytes),
-                                      proto_length,
-                                      kDataPipelineMessage);
-    std::free(proto_bytes);
-    query_manager_->processMessage(tagged_message);
-    return query_manager_->getQueryExecutionState().hasQueryExecutionFinished();
-  }
-
-  inline bool placeWorkOrderCompleteMessage(const QueryPlan::DAGNodeIndex index) {
-    VLOG(3) << "Place WorkOrderComplete message for Op[" << index << "]";
-    TaggedMessage tagged_message;
-    serialization::NormalWorkOrderCompletionMessage proto;
-    proto.set_operator_index(index);
-    proto.set_worker_thread_index(1);  // dummy worker ID.
-    proto.set_query_id(0);  // dummy query ID.
-
-    // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
-    const size_t proto_length = proto.ByteSize();
-    char *proto_bytes = static_cast<char*>(std::malloc(proto_length));
-    CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
-    TaggedMessage message(static_cast<const void*>(proto_bytes),
-                          proto_length,
-                          kWorkOrderCompleteMessage);
-    std::free(proto_bytes);
-    query_manager_->processMessage(message);
-
-    return query_manager_->getQueryExecutionState().hasQueryExecutionFinished();
-  }
-
-  inline bool placeRebuildWorkOrderCompleteMessage(const QueryPlan::DAGNodeIndex index) {
-    VLOG(3) << "Place RebuildWorkOrderComplete message for Op[" << index << "]";
-    serialization::RebuildWorkOrderCompletionMessage proto;
-    proto.set_operator_index(index);
-    proto.set_worker_thread_index(1);  // dummy worker thread ID.
-    proto.set_query_id(0);  // dummy query ID.
-
-    // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
-    const size_t proto_length = proto.ByteSize();
-    char *proto_bytes = static_cast<char*>(std::malloc(proto_length));
-    CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
-    TaggedMessage message(static_cast<const void*>(proto_bytes),
-                          proto_length,
-                          kRebuildWorkOrderCompleteMessage);
-
-    std::free(proto_bytes);
-    query_manager_->processMessage(message);
-
-    return query_manager_->getQueryExecutionState().hasQueryExecutionFinished();
-  }
-
-  inline bool placeOutputBlockMessage(const QueryPlan::DAGNodeIndex index) {
-    VLOG(3) << "Place OutputBlock message for Op[" << index << "]";
-    serialization::DataPipelineMessage proto;
-    proto.set_operator_index(index);
-
-    proto.set_block_id(0);  // dummy block ID
-    proto.set_relation_id(0);  // dummy relation ID.
-    proto.set_query_id(0);  // dummy query ID.
-
-    // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
-    const std::size_t proto_length = proto.ByteSize();
-    char *proto_bytes = static_cast<char*>(std::malloc(proto_length));
-    CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
-    tmb::TaggedMessage tagged_message(static_cast<const void *>(proto_bytes),
-                                      proto_length,
-                                      kDataPipelineMessage);
-    std::free(proto_bytes);
-    query_manager_->processMessage(tagged_message);
-    return query_manager_->getQueryExecutionState().hasQueryExecutionFinished();
-  }
-
-  unique_ptr<CatalogDatabase> db_;
-  unique_ptr<StorageManager> storage_manager_;
-
-  QueryPlan *query_plan_;
-  unique_ptr<QueryHandle> query_handle_;
-  unique_ptr<QueryManager> query_manager_;
-
-  MessageBusImpl bus_;
-
-  client_id worker_client_id_;
-
-  unique_ptr<WorkerDirectory> workers_;
-};
-
-TEST_F(QueryManagerTest, SingleNodeDAGNoWorkOrdersTest) {
-  // This test creates a DAG of a single node. No workorders are generated.
-  query_plan_->addRelationalOperator(new MockOperator(false, false));
-
-  const MockOperator &op = static_cast<const MockOperator &>(
-      query_plan_->getQueryPlanDAG().getNodePayload(0));
-
-  constructQueryManager();
-
-  // op doesn't have any dependencies.
-  EXPECT_TRUE(op.getBlockingDependenciesMet());
-
-  // We expect one call for op's getAllWorkOrders().
-  EXPECT_EQ(1, op.getNumCalls(MockOperator::kGetAllWorkOrders));
-  EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlock));
-  EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlocks));
-}
-
-TEST_F(QueryManagerTest, SingleNodeDAGStaticWorkOrdersTest) {
-  // This test creates a DAG of a single node. Static workorders are generated.
-  const QueryPlan::DAGNodeIndex id =
-      query_plan_->addRelationalOperator(new MockOperator(true, false, 1));
-
-  const MockOperator &op = static_cast<const MockOperator &>(
-      query_plan_->getQueryPlanDAG().getNodePayload(id));
-
-  constructQueryManager();
-
-  // op doesn't have any dependencies.
-  EXPECT_TRUE(op.getBlockingDependenciesMet());
-
-  // We expect one call for op's getAllWorkOrders().
-  EXPECT_EQ(1, op.getNumCalls(MockOperator::kGetAllWorkOrders));
-  EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlock));
-  EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlocks));
-
-  // One workorder is generated.
-  EXPECT_EQ(1, op.getNumWorkOrders());
-
-  unique_ptr<WorkerMessage> worker_message;
-  worker_message.reset(query_manager_->getNextWorkerMessage(0, -1));
-  EXPECT_TRUE(worker_message != nullptr);
-
-  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
-            worker_message->getType());
-  EXPECT_EQ(0u, worker_message->getRelationalOpIndex());
-
-  delete worker_message->getWorkOrder();
-
-  EXPECT_EQ(1, getNumWorkOrdersInExecution(id));
-  EXPECT_EQ(0, getNumOperatorsFinished());
-
-  // Send a message to QueryManager upon workorder completion.
-  // Last event processed by QueryManager.
-  EXPECT_TRUE(placeWorkOrderCompleteMessage(id));
-
-  EXPECT_EQ(0, getNumWorkOrdersInExecution(id));
-  EXPECT_EQ(1, getNumOperatorsFinished());
-  EXPECT_TRUE(getOperatorFinishedStatus(id));
-}
-
-TEST_F(QueryManagerTest, SingleNodeDAGDynamicWorkOrdersTest) {
-  // This test creates a DAG of a single node. WorkOrders are generated
-  // dynamically as pending work orders complete execution, i.e.,
-  // getAllWorkOrders() is called multiple times.  getAllWorkOrders() will be
-  // called 5 times and 3 work orders will be returned, i.e., 1st 3 calls to
-  // getAllWorkOrders() insert 1 WorkOrder and return false, and the next will
-  // insert no WorkOrder and return true.
-
-  // TODO(shoban): This test can not be more robust than this because of fixed
-  // scaffolding of mocking. If we use gMock, we can do much better.
-  const QueryPlan::DAGNodeIndex id =
-      query_plan_->addRelationalOperator(new MockOperator(true, false, 4, 3));
-
-  const MockOperator &op = static_cast<const MockOperator &>(
-      query_plan_->getQueryPlanDAG().getNodePayload(id));
-
-  constructQueryManager();
-
-  // op doesn't have any dependencies.
-  EXPECT_TRUE(op.getBlockingDependenciesMet());
-
-  for (int i = 0; i < 3; ++i) {
-    // We expect one call for op's getAllWorkOrders().
-    EXPECT_EQ(i + 1, op.getNumCalls(MockOperator::kGetAllWorkOrders));
-
-    // One workorder is generated.
-    // EXPECT_EQ(1, getWorkerInputQueueSize());
-    EXPECT_EQ(i + 1, op.getNumWorkOrders());
-
-    unique_ptr<WorkerMessage> worker_message;
-    worker_message.reset(query_manager_->getNextWorkerMessage(id, -1));
-
-    EXPECT_TRUE(worker_message != nullptr);
-    EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
-              worker_message->getType());
-    EXPECT_EQ(id, worker_message->getRelationalOpIndex());
-
-    delete worker_message->getWorkOrder();
-
-    EXPECT_EQ(1, getNumWorkOrdersInExecution(id));
-    EXPECT_EQ(0, getNumOperatorsFinished());
-
-    if (i < 2) {
-      // Send a message to QueryManager upon workorder completion.
-      EXPECT_FALSE(placeWorkOrderCompleteMessage(id));
-    } else {
-      // Send a message to QueryManager upon workorder completion.
-      // Last event.
-      EXPECT_TRUE(placeWorkOrderCompleteMessage(id));
-    }
-  }
-
-  EXPECT_EQ(0, getNumWorkOrdersInExecution(id));
-
-  EXPECT_EQ(1, getNumOperatorsFinished());
-  EXPECT_TRUE(getOperatorFinishedStatus(id));
-
-  // We place this check in the end, since it's true throughout the test.
-  EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlock));
-  EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlocks));
-}
-
-TEST_F(QueryManagerTest, TwoNodesDAGBlockingLinkTest) {
-  // We use two nodes in the DAG with a blocking link between them.
-  // There is no streaming of data involved in this test.
-  const QueryPlan::DAGNodeIndex id1 =
-      query_plan_->addRelationalOperator(new MockOperator(true, false));
-  const QueryPlan::DAGNodeIndex id2 =
-      query_plan_->addRelationalOperator(new MockOperator(true, false));
-
-  // Create a blocking link.
-  query_plan_->addDirectDependency(id2, id1, true);
-
-  static_cast<MockOperator *>(
-      query_plan_->getQueryPlanDAGMutable()->getNodePayloadMutable(id1))
-          ->setOutputRelationID(0xdead);
-
-  const MockOperator &op1 = static_cast<const MockOperator &>(
-      query_plan_->getQueryPlanDAG().getNodePayload(id1));
-  const MockOperator &op2 = static_cast<const MockOperator &>(
-      query_plan_->getQueryPlanDAG().getNodePayload(id2));
-
-  constructQueryManager();
-
-  // op1 doesn't have any dependencies
-  EXPECT_TRUE(op1.getBlockingDependenciesMet());
-
-  // Only op1 should receive a call to getAllWorkOrders initially.
-  EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders));
-  EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlock));
-  EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlocks));
-
-  EXPECT_EQ(0, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
-  EXPECT_EQ(0, op2.getNumCalls(MockOperator::kFeedInputBlock));
-  EXPECT_EQ(0, op2.getNumCalls(MockOperator::kFeedInputBlocks));
-
-  // Only op1 should produce a workorder.
-  EXPECT_EQ(1, op1.getNumWorkOrders());
-  EXPECT_EQ(0, op2.getNumWorkOrders());
-
-  // Foreman hasn't yet got workorder completion response for the workorder.
-  unique_ptr<WorkerMessage> worker_message;
-  worker_message.reset(query_manager_->getNextWorkerMessage(id1, -1));
-
-  EXPECT_TRUE(worker_message != nullptr);
-  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
-            worker_message->getType());
-  EXPECT_EQ(id1, worker_message->getRelationalOpIndex());
-
-  delete worker_message->getWorkOrder();
-
-  EXPECT_EQ(1, getNumWorkOrdersInExecution(id1));
-  EXPECT_EQ(0, getNumWorkOrdersInExecution(id2));
-  EXPECT_EQ(0, getNumOperatorsFinished());
-
-  // Send a message to Foreman upon workorder (generated by op1) completion.
-  EXPECT_FALSE(placeWorkOrderCompleteMessage(id1));
-
-  EXPECT_EQ(0, getNumWorkOrdersInExecution(id1));
-  // op1 is over now, op2 still to go.
-  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks));
-  EXPECT_EQ(1, getNumOperatorsFinished());
-
-  EXPECT_TRUE(getOperatorFinishedStatus(id1));
-  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks));
-  EXPECT_FALSE(getOperatorFinishedStatus(id2));
-
-  worker_message.reset(query_manager_->getNextWorkerMessage(id2, -1));
-  EXPECT_TRUE(worker_message != nullptr);
-  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
-            worker_message->getType());
-  EXPECT_EQ(id2, worker_message->getRelationalOpIndex());
-
-  delete worker_message->getWorkOrder();
-
-  EXPECT_EQ(1, getNumWorkOrdersInExecution(id2));
-
-  // op1 is op2's blocking dependency.
-  EXPECT_TRUE(op2.getBlockingDependenciesMet());
-
-  EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders));
-  // op2 should get first call of getAllWorkOrders() when op1 is over.
-  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
-
-  EXPECT_EQ(1, op2.getNumWorkOrders());
-
-  // Send a message to QueryManager upon workorder (generated by op2) completion.
-  // Note that the worker hasn't yet popped the workorder. Usually this won't
-  // happen as workers pop workorders first, execute and then send the response.
-  EXPECT_TRUE(placeWorkOrderCompleteMessage(id2));
-
-  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
-
-  EXPECT_EQ(0, getNumWorkOrdersInExecution(id1));
-  EXPECT_EQ(0, getNumWorkOrdersInExecution(id2));
-
-  EXPECT_EQ(2, getNumOperatorsFinished());
-  EXPECT_TRUE(getOperatorFinishedStatus(id1));
-  EXPECT_TRUE(getOperatorFinishedStatus(id2));
-
-  // Expect no additional calls to getAllWorkOrders.
-  EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders));
-  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
-}
-
-TEST_F(QueryManagerTest, TwoNodesDAGPipeLinkTest) {
-  // We use two nodes in the DAG with a non-blocking link between them.
-  // We stream output of op1 to op2. Sequeuce of events is as follows:
-  // 1. op1 creates a workorder.
-  // 2. We send a "block full" (from op1) to QueryManager.
-  // 3. op2 creates a workorder because of step 2.
-  const QueryPlan::DAGNodeIndex id1 =
-      query_plan_->addRelationalOperator(new MockOperator(true, false, 1));
-  const QueryPlan::DAGNodeIndex id2 =
-      query_plan_->addRelationalOperator(new MockOperator(true, true, 3));
-
-  // Create a non-blocking link.
-  query_plan_->addDirectDependency(id2, id1, false);
-
-  static_cast<MockOperator *>(
-      query_plan_->getQueryPlanDAGMutable()->getNodePayloadMutable(id1))
-      ->setOutputRelationID(0xdead);
-
-  const MockOperator &op1 = static_cast<const MockOperator &>(
-      query_plan_->getQueryPlanDAG().getNodePayload(id1));
-  const MockOperator &op2 = static_cast<const MockOperator &>(
-      query_plan_->getQueryPlanDAG().getNodePayload(id2));
-
-  constructQueryManager();
-
-  // As none of the operators have a blocking link, blocking dependencies should
-  // be met.
-  EXPECT_TRUE(op1.getBlockingDependenciesMet());
-  EXPECT_TRUE(op2.getBlockingDependenciesMet());
-
-  EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders));
-  EXPECT_EQ(1, op1.getNumWorkOrders());
-  EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlock));
-  EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlocks));
-
-  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
-  // op2 will generate workorder only after receiving a streaming input.
-  EXPECT_EQ(0, op2.getNumWorkOrders());
-  EXPECT_EQ(0, op2.getNumCalls(MockOperator::kFeedInputBlock));
-  EXPECT_EQ(0, op2.getNumCalls(MockOperator::kFeedInputBlocks));
-
-  unique_ptr<WorkerMessage> worker_message;
-  worker_message.reset(query_manager_->getNextWorkerMessage(id1, -1));
-
-  EXPECT_TRUE(worker_message != nullptr);
-  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
-            worker_message->getType());
-  EXPECT_EQ(id1, worker_message->getRelationalOpIndex());
-
-  delete worker_message->getWorkOrder();
-
-  // Send a message to QueryManager upon block getting full (output of op1).
-  EXPECT_FALSE(placeOutputBlockMessage(id1));
-
-  // op1 is not finished yet because the response of workorder completion hasn't
-  // been received yet by the QueryManager.
-  EXPECT_FALSE(getOperatorFinishedStatus(id1));
-  EXPECT_FALSE(getOperatorFinishedStatus(id2));
-
-  // No additional call to op1's getAllWorkOrders.
-  EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders));
-  EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlock));
-  EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlocks));
-
-  // Output from op1 should be fed to op2.
-  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kFeedInputBlock));
-  EXPECT_EQ(0, op2.getNumCalls(MockOperator::kFeedInputBlocks));
-
-  // A call to op2's getAllWorkOrders because of the streamed input.
-  EXPECT_EQ(2, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
-  EXPECT_EQ(1, op2.getNumWorkOrders());
-
-  // Place a message of a workorder completion of op1 on Foreman's input queue.
-  EXPECT_FALSE(placeWorkOrderCompleteMessage(id1));
-
-  EXPECT_EQ(0, getNumWorkOrdersInExecution(id1));
-  EXPECT_TRUE(getOperatorFinishedStatus(id1));
-  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks));
-
-  // An additional call to op2's getAllWorkOrders because of completion of op1.
-  EXPECT_EQ(3, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
-  EXPECT_EQ(2, op2.getNumWorkOrders());
-
-  worker_message.reset(query_manager_->getNextWorkerMessage(id2, -1));
-
-  EXPECT_TRUE(worker_message != nullptr);
-  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
-            worker_message->getType());
-  EXPECT_EQ(id2, worker_message->getRelationalOpIndex());
-
-  delete worker_message->getWorkOrder();
-
-  // Place a message of a workorder completion of op2 on Foreman's input queue.
-  EXPECT_FALSE(placeWorkOrderCompleteMessage(id2));
-
-  EXPECT_TRUE(getOperatorFinishedStatus(id1));
-
-  worker_message.reset(query_manager_->getNextWorkerMessage(id2, -1));
-
-  EXPECT_TRUE(worker_message != nullptr);
-  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
-            worker_message->getType());
-  EXPECT_EQ(id2, worker_message->getRelationalOpIndex());
-
-  delete worker_message->getWorkOrder();
-
-  EXPECT_EQ(1, getNumWorkOrdersInExecution(id2));
-  EXPECT_FALSE(getOperatorFinishedStatus(id2));
-
-  // Send a message to Foreman upon workorder (generated by op2) completion.
-  EXPECT_TRUE(placeWorkOrderCompleteMessage(id2));
-
-  EXPECT_TRUE(getOperatorFinishedStatus(id1));
-  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks));
-
-  EXPECT_EQ(0, getNumWorkOrdersInExecution(id2));
-  EXPECT_TRUE(getOperatorFinishedStatus(id2));
-}
-
-TEST_F(QueryManagerTest, TwoNodesDAGPartiallyFilledBlocksTest) {
-  // In this test, we create a 2-node DAG with a non-blocking link between them.
-  // There is no streaming of data from op1 to op2 during the execution of op1.
-  // op1 produces a partially filled block at the end of its execution which is
-  // rebuilt and then fed to op2.
-  //
-  // NOTE: Pointers that are dereferenced right after being checked are guarded
-  // with ASSERT_TRUE rather than EXPECT_TRUE, so that an unexpected null fails
-  // this test instead of crashing the test binary.
-  const QueryPlan::DAGNodeIndex id1 =
-      query_plan_->addRelationalOperator(new MockOperator(true, false, 1));
-  const QueryPlan::DAGNodeIndex id2 =
-      query_plan_->addRelationalOperator(new MockOperator(true, true, 3, 1));
-
-  // Create a non-blocking link.
-  query_plan_->addDirectDependency(id2, id1, false);
-
-  // Create a relation, owned by db_.
-  CatalogRelation *relation =
-      new CatalogRelation(nullptr /* catalog_database */, "test_relation");
-  const relation_id output_relation_id = db_->addRelation(relation);
-
-  // Setup the InsertDestination proto in the query context proto.
-  serialization::QueryContext *query_context_proto =
-      query_handle_->getQueryContextProtoMutable();
-
-  const QueryContext::insert_destination_id insert_destination_index =
-      query_context_proto->insert_destinations_size();
-  serialization::InsertDestination *insert_destination_proto =
-      query_context_proto->add_insert_destinations();
-
-  insert_destination_proto->set_insert_destination_type(
-      serialization::InsertDestinationType::BLOCK_POOL);
-  insert_destination_proto->set_relation_id(output_relation_id);
-  insert_destination_proto->set_relational_op_index(id1);
-
-  // Point op1 at the insert destination and the output relation set up above.
-  MockOperator *op1_mutable = static_cast<MockOperator *>(
-      query_plan_->getQueryPlanDAGMutable()->getNodePayloadMutable(id1));
-  op1_mutable->setInsertDestinationID(insert_destination_index);
-  op1_mutable->setOutputRelationID(output_relation_id);
-
-  const MockOperator &op1 = static_cast<const MockOperator &>(
-      query_plan_->getQueryPlanDAG().getNodePayload(id1));
-  const MockOperator &op2 = static_cast<const MockOperator &>(
-      query_plan_->getQueryPlanDAG().getNodePayload(id2));
-
-  constructQueryManager();
-
-  // NOTE(zuyu): An operator generally has no ideas about partially filled
-  // blocks, but InsertDestination in QueryContext does.
-  // Mock to add partially filled blocks in the InsertDestination.
-  InsertDestination *insert_destination =
-      query_manager_->getQueryContextMutable()->getInsertDestination(
-          insert_destination_index);
-  // The destination is dereferenced right below, so a null must stop the test.
-  ASSERT_TRUE(insert_destination != nullptr);
-  MutableBlockReference block_ref;
-  static_cast<BlockPoolInsertDestination *>(insert_destination)
-      ->available_block_refs_.push_back(move(block_ref));
-
-  // There's no blocking dependency in the DAG.
-  EXPECT_TRUE(op1.getBlockingDependenciesMet());
-  EXPECT_TRUE(op2.getBlockingDependenciesMet());
-
-  EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders));
-  EXPECT_EQ(1, op1.getNumWorkOrders());
-
-  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
-  EXPECT_EQ(0, op2.getNumWorkOrders());
-
-  unique_ptr<WorkerMessage> worker_message;
-  worker_message.reset(query_manager_->getNextWorkerMessage(id1, -1));
-
-  ASSERT_TRUE(worker_message != nullptr);
-  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
-            worker_message->getType());
-  EXPECT_EQ(id1, worker_message->getRelationalOpIndex());
-
-  delete worker_message->getWorkOrder();
-
-  // Send a message to QueryManager upon workorder (generated by op1) completion.
-  EXPECT_FALSE(placeWorkOrderCompleteMessage(id1));
-
-  EXPECT_EQ(0, getNumWorkOrdersInExecution(id1));
-
-  // The next message for op1 is expected to be a rebuild workorder.
-  worker_message.reset(query_manager_->getNextWorkerMessage(id1, -1));
-  ASSERT_TRUE(worker_message != nullptr);
-  EXPECT_EQ(WorkerMessage::WorkerMessageType::kRebuildWorkOrder,
-            worker_message->getType());
-
-  EXPECT_EQ(id1, worker_message->getRelationalOpIndex());
-
-  delete worker_message->getWorkOrder();
-
-  // op1 generates a rebuild workorder. Place a data pipeline message for op1,
-  // followed by the rebuild workorder completion message.
-  EXPECT_FALSE(placeDataPipelineMessage(id1));
-
-  EXPECT_FALSE(placeRebuildWorkOrderCompleteMessage(id1));
-  // Based on the streamed input, op2's getAllWorkOrders should produce a
-  // workorder.
-  EXPECT_EQ(3, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
-  EXPECT_EQ(1, op2.getNumWorkOrders());
-
-  worker_message.reset(query_manager_->getNextWorkerMessage(id2, -1));
-
-  ASSERT_TRUE(worker_message != nullptr);
-  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
-            worker_message->getType());
-
-  EXPECT_EQ(id2, worker_message->getRelationalOpIndex());
-
-  delete worker_message->getWorkOrder();
-
-  EXPECT_TRUE(getOperatorFinishedStatus(id1));
-  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks));
-  EXPECT_FALSE(getOperatorFinishedStatus(id2));
-  EXPECT_EQ(1, getNumWorkOrdersInExecution(id2));
-
-  // Send a message to QueryManager upon workorder (generated by op2) completion.
-  EXPECT_TRUE(placeWorkOrderCompleteMessage(id2));
-
-  EXPECT_EQ(0, getNumWorkOrdersInExecution(id2));
-
-  EXPECT_TRUE(getOperatorFinishedStatus(id2));
-}
-
-TEST_F(QueryManagerTest, MultipleNodesNoOutputTest) {
-  // When an operator produces workorders but no output, the QueryManager should
-  // check the dependents of this operator to make progress.
-  const QueryPlan::DAGNodeIndex kNumNodes = 5;
-  std::vector<QueryPlan::DAGNodeIndex> ids;
-  ids.reserve(kNumNodes);
-
-  // NOTE: reserve() only allocates capacity and leaves the vector empty, so the
-  // node ids are appended with push_back() instead of being assigned through
-  // operator[] (which would be out of bounds).
-  for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes; ++i) {
-    // Only the first operator is constructed with 'false' as its second
-    // argument; every other operator gets 'true'.
-    const bool is_first_operator = (i == 0);
-    ids.push_back(query_plan_->addRelationalOperator(
-        new MockOperator(true, !is_first_operator)));
-    VLOG(3) << ids.back();
-  }
-
-  /**
-   * The DAG looks like this:
-   *
-   * op1 -> op2 -> op3 -> op4 -> op5
-   *
-   **/
-  for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes - 1; ++i) {
-    // Link each operator to its successor with a non-blocking dependency.
-    query_plan_->addDirectDependency(ids[i + 1], ids[i], false);
-    static_cast<MockOperator*>(
-        query_plan_->getQueryPlanDAGMutable()->getNodePayloadMutable(ids[i]))
-        ->setOutputRelationID(0xdead);
-  }
-
-  // operators[i] is the operator that was registered under ids[i].
-  std::vector<const MockOperator*> operators;
-  operators.reserve(kNumNodes);
-  for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes; ++i) {
-    operators.push_back(static_cast<const MockOperator*>(
-        &query_plan_->getQueryPlanDAG().getNodePayload(ids[i])));
-  }
-
-  constructQueryManager();
-
-  // operators[0] should have produced a workorder by now.
-  EXPECT_EQ(1, operators[0]->getNumWorkOrders());
-
-  unique_ptr<WorkerMessage> worker_message;
-  worker_message.reset(query_manager_->getNextWorkerMessage(ids[0], -1));
-
-  // The message is dereferenced below, so a null message must stop the test
-  // here rather than crash the test binary.
-  ASSERT_TRUE(worker_message != nullptr);
-  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
-            worker_message->getType());
-
-  EXPECT_EQ(ids[0], worker_message->getRelationalOpIndex());
-
-  delete worker_message->getWorkOrder();
-
-  EXPECT_EQ(1, getNumWorkOrdersInExecution(ids[0]));
-  EXPECT_FALSE(getOperatorFinishedStatus(ids[0]));
-
-  // 'operators' is indexed by position, not by DAG node id.
-  for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes; ++i) {
-    EXPECT_EQ(1, operators[i]->getNumCalls(MockOperator::kGetAllWorkOrders));
-  }
-
-  // Send a message to QueryManager upon workorder (generated by operators[0])
-  // completion.
-  EXPECT_TRUE(placeWorkOrderCompleteMessage(ids[0]));
-
-  for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes; ++i) {
-    EXPECT_EQ(0, getNumWorkOrdersInExecution(ids[i]));
-    EXPECT_TRUE(getOperatorFinishedStatus(ids[i]));
-    if (i < kNumNodes - 1) {
-      EXPECT_EQ(1, operators[i + 1]->getNumCalls(
-                       MockOperator::kDoneFeedingInputBlocks));
-    }
-  }
-}
-
-TEST_F(QueryManagerTest, OutOfOrderWorkOrderCompletionTest) {
-  // Consider two operators, both generate one workorder each. The dependent's
-  // workorder finishes before dependency's workorder.
-  //
-  // NOTE: Each worker message is guarded with ASSERT_TRUE rather than
-  // EXPECT_TRUE because it is dereferenced right after the check; a null
-  // message must fail this test instead of crashing the test binary.
-  const QueryPlan::DAGNodeIndex id1 =
-      query_plan_->addRelationalOperator(new MockOperator(true, false, 1));
-  const QueryPlan::DAGNodeIndex id2 =
-      query_plan_->addRelationalOperator(new MockOperator(true, true, 2, 1));
-
-  // Create a non-blocking link.
-  query_plan_->addDirectDependency(id2, id1, false);
-
-  constructQueryManager();
-
-  unique_ptr<WorkerMessage> worker_message;
-  worker_message.reset(query_manager_->getNextWorkerMessage(id1, -1));
-
-  ASSERT_TRUE(worker_message != nullptr);
-  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
-            worker_message->getType());
-
-  EXPECT_EQ(id1, worker_message->getRelationalOpIndex());
-
-  delete worker_message->getWorkOrder();
-
-  // Send a message to QueryManager upon a block (output of op1) getting full.
-  EXPECT_FALSE(placeOutputBlockMessage(id1));
-
-  // op1 is not finished yet because the response of workorder completion hasn't
-  // been received yet.
-  EXPECT_FALSE(getOperatorFinishedStatus(id1));
-  EXPECT_FALSE(getOperatorFinishedStatus(id2));
-
-  worker_message.reset(query_manager_->getNextWorkerMessage(id2, -1));
-  ASSERT_TRUE(worker_message != nullptr);
-  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
-            worker_message->getType());
-
-  EXPECT_EQ(id2, worker_message->getRelationalOpIndex());
-
-  delete worker_message->getWorkOrder();
-
-  // As mentioned earlier, op2 finishes before op1.
-  EXPECT_FALSE(placeWorkOrderCompleteMessage(id2));
-
-  // op1's workorder execution is over.
-  EXPECT_TRUE(placeWorkOrderCompleteMessage(id1));
-
-  EXPECT_TRUE(getOperatorFinishedStatus(id1));
-  EXPECT_TRUE(getOperatorFinishedStatus(id2));
-}
-
-}  // namespace quickstep