You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2016/10/11 02:23:58 UTC

[1/3] incubator-quickstep git commit: IWYU fixes for QueryExecutionUtil. [Forced Update!]

Repository: incubator-quickstep
Updated Branches:
  refs/heads/multiple_shiftboss b0c374603 -> ec885c7ec (forced update)


IWYU fixes for QueryExecutionUtil.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/b5dcb6d2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/b5dcb6d2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/b5dcb6d2

Branch: refs/heads/multiple_shiftboss
Commit: b5dcb6d29805dce944c11d5ad0720a3267ad57cf
Parents: 2e02333
Author: Zuyu Zhang <zu...@apache.org>
Authored: Sun Oct 9 15:48:03 2016 -0700
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Sun Oct 9 15:48:03 2016 -0700

----------------------------------------------------------------------
 query_execution/CMakeLists.txt         |  1 -
 query_execution/QueryExecutionUtil.hpp | 34 ++++++++++++++++-------------
 2 files changed, 19 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b5dcb6d2/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 1b27194..6a84be1 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -217,7 +217,6 @@ target_link_libraries(quickstep_queryexecution_QueryExecutionTypedefs
 target_link_libraries(quickstep_queryexecution_QueryExecutionUtil
                       quickstep_queryexecution_AdmitRequestMessage
                       quickstep_queryexecution_QueryExecutionTypedefs
-                      quickstep_queryexecution_WorkerMessage
                       quickstep_utility_Macros
                       tmb)
 target_link_libraries(quickstep_queryexecution_QueryManagerBase

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b5dcb6d2/query_execution/QueryExecutionUtil.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionUtil.hpp b/query_execution/QueryExecutionUtil.hpp
index feb4cc0..7a3a3b3 100644
--- a/query_execution/QueryExecutionUtil.hpp
+++ b/query_execution/QueryExecutionUtil.hpp
@@ -20,26 +20,30 @@
 #ifndef QUICKSTEP_QUERY_EXECUTION_QUERY_EXECUTION_UTIL_HPP_
 #define QUICKSTEP_QUERY_EXECUTION_QUERY_EXECUTION_UTIL_HPP_
 
+#include <cstddef>
 #include <memory>
 #include <utility>
 
 #include "query_execution/AdmitRequestMessage.hpp"
 #include "query_execution/QueryExecutionTypedefs.hpp"
-#include "query_execution/WorkerMessage.hpp"
 #include "utility/Macros.hpp"
 
 #include "glog/logging.h"
 
 #include "tmb/address.h"
 #include "tmb/id_typedefs.h"
-#include "tmb/message_style.h"
 #include "tmb/message_bus.h"
+#include "tmb/message_style.h"
 #include "tmb/tagged_message.h"
 
 namespace quickstep {
 
 class QueryHandle;
 
+/** \addtogroup QueryExecution
+ *  @{
+ */
+
 /**
  * @brief A static class for reusable methods in query_execution module.
  **/
@@ -57,14 +61,14 @@ class QueryExecutionUtil {
    *         The caller should ensure that the status is SendStatus::kOK.
    **/
   static tmb::MessageBus::SendStatus SendTMBMessage(
-      MessageBus *bus,
-      client_id sender_id,
-      client_id receiver_id,
-      TaggedMessage &&tagged_message) {  // NOLINT(whitespace/operators)
-    Address receiver_address;
+      tmb::MessageBus *bus,
+      tmb::client_id sender_id,
+      tmb::client_id receiver_id,
+      tmb::TaggedMessage &&tagged_message) {  // NOLINT(whitespace/operators)
+    tmb::Address receiver_address;
     receiver_address.AddRecipient(receiver_id);
 
-    MessageStyle single_receiver_style;
+    tmb::MessageStyle single_receiver_style;
     return bus->Send(sender_id,
                      receiver_address,
                      single_receiver_style,
@@ -88,11 +92,11 @@ class QueryExecutionUtil {
       const tmb::client_id sender_id,
       const tmb::client_id receiver_id,
       QueryHandle *query_handle,
-      MessageBus *bus) {
+      tmb::MessageBus *bus) {
     std::unique_ptr<AdmitRequestMessage> request_message(
         new AdmitRequestMessage(query_handle));
     const std::size_t size_of_request_msg = sizeof(*request_message);
-    TaggedMessage admit_tagged_message(
+    tmb::TaggedMessage admit_tagged_message(
         request_message.release(), size_of_request_msg, kAdmitRequestMessage);
 
     return QueryExecutionUtil::SendTMBMessage(
@@ -111,9 +115,9 @@ class QueryExecutionUtil {
    **/
   static void ReceiveQueryCompletionMessage(const tmb::client_id receiver_id,
                                             tmb::MessageBus *bus) {
-    const AnnotatedMessage annotated_msg =
+    const tmb::AnnotatedMessage annotated_msg =
         bus->Receive(receiver_id, 0, true);
-    const TaggedMessage &tagged_message = annotated_msg.tagged_message;
+    const tmb::TaggedMessage &tagged_message = annotated_msg.tagged_message;
     DCHECK_EQ(kWorkloadCompletionMessage, tagged_message.message_type());
   }
 
@@ -122,11 +126,11 @@ class QueryExecutionUtil {
     // The sender thread broadcasts poison message to the workers and foreman.
     // Each worker dies after receiving poison message. The order of workers'
     // death is irrelavant.
-    MessageStyle style;
+    tmb::MessageStyle style;
     style.Broadcast(true);
-    Address address;
+    tmb::Address address;
     address.All(true);
-    TaggedMessage poison_tagged_message(kPoisonMessage);
+    tmb::TaggedMessage poison_tagged_message(kPoisonMessage);
 
     DLOG(INFO) << "TMB client ID " << sender_id
                << " broadcast PoisonMessage (typed '" << kPoisonMessage << "') to all";


[2/3] incubator-quickstep git commit: Minor bug fix in AggregationOperationState

Posted by zu...@apache.org.
Minor bug fix in AggregationOperationState

- Replace getHashTable() call with getHashTableFast() call.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/80af2332
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/80af2332
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/80af2332

Branch: refs/heads/multiple_shiftboss
Commit: 80af23327f1f19d2f55457f949917a3cb2e467a5
Parents: b5dcb6d
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Mon Oct 10 14:13:09 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Mon Oct 10 14:13:09 2016 -0500

----------------------------------------------------------------------
 storage/AggregationOperationState.cpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/80af2332/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index 073b813..7908db1 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -537,7 +537,7 @@ void AggregationOperationState::finalizeHashTable(
       // However for aggregateOnDistinctifyHashTableForGroupBy to work
       // correctly, we should create an empty group by hash table.
       AggregationStateHashTableBase *new_hash_table =
-          group_by_hashtable_pool_->getHashTable();
+          group_by_hashtable_pool_->getHashTableFast();
       group_by_hashtable_pool_->returnHashTable(new_hash_table);
       hash_tables = group_by_hashtable_pool_->getAllHashTables();
     }


[3/3] incubator-quickstep git commit: Initiated query execution data structure in all Shiftbosses.

Posted by zu...@apache.org.
Initiated query execution data structure in all Shiftbosses.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/ec885c7e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/ec885c7e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/ec885c7e

Branch: refs/heads/multiple_shiftboss
Commit: ec885c7ec420c9c2fd104607058d771667680f9e
Parents: 80af233
Author: Zuyu Zhang <zu...@apache.org>
Authored: Sun Oct 9 18:44:00 2016 -0700
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Mon Oct 10 19:23:47 2016 -0700

----------------------------------------------------------------------
 query_execution/ForemanDistributed.cpp          | 15 ++++-
 query_execution/ForemanDistributed.hpp          |  5 ++
 query_execution/PolicyEnforcerBase.cpp          |  2 +
 query_execution/PolicyEnforcerBase.hpp          | 13 ++++
 query_execution/PolicyEnforcerDistributed.cpp   | 67 +++++++++-----------
 query_execution/QueryExecutionMessages.proto    |  6 +-
 query_execution/QueryExecutionUtil.hpp          | 13 ++++
 query_execution/Shiftboss.cpp                   |  2 +
 .../DistributedExecutionGeneratorTestRunner.hpp |  2 +-
 9 files changed, 84 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ec885c7e/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
index 9c20465..56b319b 100644
--- a/query_execution/ForemanDistributed.cpp
+++ b/query_execution/ForemanDistributed.cpp
@@ -18,6 +18,8 @@
 #include <cstdio>
 #include <cstdlib>
 #include <memory>
+#include <unordered_map>
+#include <unordered_set>
 #include <utility>
 #include <vector>
 
@@ -163,7 +165,9 @@ void ForemanDistributed::run() {
         break;
       }
       case kQueryInitiateResponseMessage: {
-        // TODO(zuyu): check the query id.
+        S::QueryInitiateResponseMessage proto;
+        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+        CHECK(policy_enforcer_->existQuery(proto.query_id()));
         break;
       }
       case kCatalogRelationNewBlockMessage:  // Fall through
@@ -183,7 +187,14 @@ void ForemanDistributed::run() {
         S::SaveQueryResultResponseMessage proto;
         CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
 
-        processSaveQueryResultResponseMessage(proto.cli_id(), proto.relation_id());
+        const std::size_t query_id = proto.query_id();
+        query_result_saved_shiftbosses_[query_id].insert(proto.shiftboss_index());
+
+        // TODO(quickstep-team): Dynamically scale-up/down Shiftbosses.
+        if (query_result_saved_shiftbosses_[query_id].size() == shiftboss_directory_.size()) {
+          processSaveQueryResultResponseMessage(proto.cli_id(), proto.relation_id());
+          query_result_saved_shiftbosses_.erase(query_id);
+        }
         break;
       }
       case kPoisonMessage: {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ec885c7e/query_execution/ForemanDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.hpp b/query_execution/ForemanDistributed.hpp
index fc1ede5..b42795c 100644
--- a/query_execution/ForemanDistributed.hpp
+++ b/query_execution/ForemanDistributed.hpp
@@ -18,6 +18,8 @@
 #include <cstddef>
 #include <cstdio>
 #include <memory>
+#include <unordered_map>
+#include <unordered_set>
 #include <vector>
 
 #include "catalog/CatalogTypedefs.hpp"
@@ -120,6 +122,9 @@ class ForemanDistributed final : public ForemanBase {
 
   std::unique_ptr<PolicyEnforcerDistributed> policy_enforcer_;
 
+  // From a query id to a set of Shiftbosses that save query result.
+  std::unordered_map<std::size_t, std::unordered_set<std::size_t>> query_result_saved_shiftbosses_;
+
   DISALLOW_COPY_AND_ASSIGN(ForemanDistributed);
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ec885c7e/query_execution/PolicyEnforcerBase.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerBase.cpp b/query_execution/PolicyEnforcerBase.cpp
index 4174bd6..745ded6 100644
--- a/query_execution/PolicyEnforcerBase.cpp
+++ b/query_execution/PolicyEnforcerBase.cpp
@@ -142,6 +142,8 @@ void PolicyEnforcerBase::removeQuery(const std::size_t query_id) {
                  << " that hasn't finished its execution";
   }
   admitted_queries_.erase(query_id);
+
+  removed_query_ids_.insert(query_id);
 }
 
 bool PolicyEnforcerBase::admitQueries(

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ec885c7e/query_execution/PolicyEnforcerBase.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerBase.hpp b/query_execution/PolicyEnforcerBase.hpp
index 62906e9..25da598 100644
--- a/query_execution/PolicyEnforcerBase.hpp
+++ b/query_execution/PolicyEnforcerBase.hpp
@@ -24,6 +24,7 @@
 #include <memory>
 #include <queue>
 #include <unordered_map>
+#include <unordered_set>
 #include <vector>
 
 #include "query_execution/QueryExecutionTypedefs.hpp"
@@ -106,6 +107,16 @@ class PolicyEnforcerBase {
   void processMessage(const TaggedMessage &tagged_message);
 
   /**
+   * @brief Check if the given query id ever exists.
+   *
+   * @return True if the query ever exists, otherwise false.
+   **/
+  inline bool existQuery(const std::size_t query_id) const {
+    return admitted_queries_.find(query_id) != admitted_queries_.end() ||
+           removed_query_ids_.find(query_id) != removed_query_ids_.end();
+  }
+
+  /**
    * @brief Check if there are any queries to be executed.
    *
    * @return True if there is at least one active or waiting query, false if
@@ -163,6 +174,8 @@ class PolicyEnforcerBase {
   // Key = query ID, value = QueryManagerBase* for the key query.
   std::unordered_map<std::size_t, std::unique_ptr<QueryManagerBase>> admitted_queries_;
 
+  std::unordered_set<std::size_t> removed_query_ids_;
+
   // The queries which haven't been admitted yet.
   std::queue<QueryHandle*> waiting_queries_;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ec885c7e/query_execution/PolicyEnforcerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp
index 47491ed..c06fd86 100644
--- a/query_execution/PolicyEnforcerDistributed.cpp
+++ b/query_execution/PolicyEnforcerDistributed.cpp
@@ -37,6 +37,7 @@
 #include "gflags/gflags.h"
 #include "glog/logging.h"
 
+#include "tmb/address.h"
 #include "tmb/id_typedefs.h"
 #include "tmb/message_bus.h"
 #include "tmb/tagged_message.h"
@@ -170,25 +171,18 @@ void PolicyEnforcerDistributed::initiateQueryInShiftboss(QueryHandle *query_hand
                         kQueryInitiateMessage);
   free(proto_bytes);
 
-  // TODO(zuyu): Multiple Shiftbosses support.
+  // TODO(quickstep-team): Dynamically scale-up/down Shiftbosses.
+  tmb::Address shiftboss_addresses;
+  for (std::size_t i = 0; i < shiftboss_directory_->size(); ++i) {
+    shiftboss_addresses.AddRecipient(shiftboss_directory_->getClientId(i));
+  }
+
   DLOG(INFO) << "PolicyEnforcerDistributed sent QueryInitiateMessage (typed '" << kQueryInitiateMessage
-             << "') to Shiftboss with TMB client ID " << shiftboss_directory_->getClientId(0);
-  const tmb::MessageBus::SendStatus send_status =
-      QueryExecutionUtil::SendTMBMessage(bus_,
-                                         foreman_client_id_,
-                                         shiftboss_directory_->getClientId(0),
-                                         move(message));
-  CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
-
-  // Wait Shiftboss for QueryInitiateResponseMessage.
-  const tmb::AnnotatedMessage annotated_message = bus_->Receive(foreman_client_id_, 0, true);
-  const TaggedMessage &tagged_message = annotated_message.tagged_message;
-  DCHECK_EQ(kQueryInitiateResponseMessage, tagged_message.message_type());
-  DLOG(INFO) << "PolicyEnforcerDistributed received typed '" << tagged_message.message_type()
-             << "' message from client " << annotated_message.sender;
-
-  S::QueryInitiateResponseMessage proto_response;
-  CHECK(proto_response.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+             << "') to all Shiftbosses";
+  QueryExecutionUtil::BroadcastMessage(foreman_client_id_,
+                                       shiftboss_addresses,
+                                       move(message),
+                                       bus_);
 }
 
 void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manager) {
@@ -198,8 +192,14 @@ void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manage
   const tmb::client_id cli_id = query_handle->getClientId();
   const std::size_t query_id = query_handle->query_id();
 
+  // TODO(quickstep-team): Dynamically scale-up/down Shiftbosses.
+  tmb::Address shiftboss_addresses;
+  for (std::size_t i = 0; i < shiftboss_directory_->size(); ++i) {
+    shiftboss_addresses.AddRecipient(shiftboss_directory_->getClientId(i));
+  }
+
   if (query_result == nullptr) {
-    // Clean up query execution states, i.e., QueryContext, in Shiftboss.
+    // Clean up query execution states, i.e., QueryContext, in Shiftbosses.
     serialization::QueryTeardownMessage proto;
     proto.set_query_id(query_id);
 
@@ -211,15 +211,12 @@ void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manage
                           proto_length,
                           kQueryTeardownMessage);
 
-    // TODO(zuyu): Support multiple shiftbosses.
     DLOG(INFO) << "PolicyEnforcerDistributed sent QueryTeardownMessage (typed '" << kQueryTeardownMessage
-               << "') to Shiftboss with TMB client ID " << shiftboss_directory_->getClientId(0);
-    tmb::MessageBus::SendStatus send_status =
-        QueryExecutionUtil::SendTMBMessage(bus_,
-                                           foreman_client_id_,
-                                           shiftboss_directory_->getClientId(0),
-                                           move(message));
-    CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
+               << "') to all Shiftbosses";
+    QueryExecutionUtil::BroadcastMessage(foreman_client_id_,
+                                         shiftboss_addresses,
+                                         move(message),
+                                         bus_);
 
     TaggedMessage cli_message(kQueryExecutionSuccessMessage);
 
@@ -227,7 +224,7 @@ void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manage
     DLOG(INFO) << "PolicyEnforcerDistributed sent QueryExecutionSuccessMessage (typed '"
                << kQueryExecutionSuccessMessage
                << "') to CLI with TMB client id " << cli_id;
-    send_status =
+    const tmb::MessageBus::SendStatus send_status =
         QueryExecutionUtil::SendTMBMessage(bus_,
                                            foreman_client_id_,
                                            cli_id,
@@ -257,15 +254,13 @@ void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manage
                         kSaveQueryResultMessage);
   free(proto_bytes);
 
-  // TODO(zuyu): Support multiple shiftbosses.
+  // TODO(quickstep-team): Dynamically scale-up/down Shiftbosses.
   DLOG(INFO) << "PolicyEnforcerDistributed sent SaveQueryResultMessage (typed '" << kSaveQueryResultMessage
-             << "') to Shiftboss with TMB client ID " << shiftboss_directory_->getClientId(0);
-  const tmb::MessageBus::SendStatus send_status =
-      QueryExecutionUtil::SendTMBMessage(bus_,
-                                         foreman_client_id_,
-                                         shiftboss_directory_->getClientId(0),
-                                         move(message));
-  CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
+             << "') to all Shiftbosses";
+  QueryExecutionUtil::BroadcastMessage(foreman_client_id_,
+                                       shiftboss_addresses,
+                                       move(message),
+                                       bus_);
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ec885c7e/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index 060efa1..1a2cb78 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -128,8 +128,10 @@ message SaveQueryResultMessage {
 }
 
 message SaveQueryResultResponseMessage {
-  required int32 relation_id = 1;
-  required uint32 cli_id = 2;  // tmb::client_id.
+  required uint64 query_id = 1;
+  required int32 relation_id = 2;
+  required uint32 cli_id = 3;  // tmb::client_id.
+  required uint64 shiftboss_index = 4;
 }
 
 message QueryExecutionSuccessMessage {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ec885c7e/query_execution/QueryExecutionUtil.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionUtil.hpp b/query_execution/QueryExecutionUtil.hpp
index 7a3a3b3..b41965c 100644
--- a/query_execution/QueryExecutionUtil.hpp
+++ b/query_execution/QueryExecutionUtil.hpp
@@ -121,6 +121,19 @@ class QueryExecutionUtil {
     DCHECK_EQ(kWorkloadCompletionMessage, tagged_message.message_type());
   }
 
+  static void BroadcastMessage(const tmb::client_id sender_id,
+                               const tmb::Address &addresses,
+                               tmb::TaggedMessage &&tagged_message,  // NOLINT(whitespace/operators)
+                               tmb::MessageBus *bus) {
+    // The sender broadcasts the given message to all 'addresses'.
+    tmb::MessageStyle style;
+    style.Broadcast(true);
+
+    const tmb::MessageBus::SendStatus send_status =
+        bus->Send(sender_id, addresses, style, std::move(tagged_message));
+    CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
+  }
+
   static void BroadcastPoisonMessage(const tmb::client_id sender_id, tmb::MessageBus *bus) {
     // Terminate all threads.
     // The sender thread broadcasts poison message to the workers and foreman.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ec885c7e/query_execution/Shiftboss.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp
index 5c2c5e0..a434527 100644
--- a/query_execution/Shiftboss.cpp
+++ b/query_execution/Shiftboss.cpp
@@ -189,8 +189,10 @@ void Shiftboss::run() {
         query_contexts_.erase(proto.query_id());
 
         serialization::SaveQueryResultResponseMessage proto_response;
+        proto_response.set_query_id(proto.query_id());
         proto_response.set_relation_id(proto.relation_id());
         proto_response.set_cli_id(proto.cli_id());
+        proto_response.set_shiftboss_index(shiftboss_index_);
 
         const size_t proto_response_length = proto_response.ByteSize();
         char *proto_response_bytes = static_cast<char*>(malloc(proto_response_length));

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ec885c7e/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
index e4d0765..ab10841 100644
--- a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
+++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
@@ -49,7 +49,7 @@ namespace quickstep {
 namespace optimizer {
 
 namespace {
-constexpr int kNumInstances = 1;
+constexpr int kNumInstances = 3;
 }  // namespace
 
 /**