You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2016/12/05 01:04:17 UTC

incubator-quickstep git commit: Scheduling based on data locality info.

Repository: incubator-quickstep
Updated Branches:
  refs/heads/data-locality [created] 982cc0653


Scheduling based on data locality info.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/982cc065
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/982cc065
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/982cc065

Branch: refs/heads/data-locality
Commit: 982cc065378535f50fc57a6ded0ddb868aaab02d
Parents: 0859a17
Author: Zuyu Zhang <zu...@apache.org>
Authored: Sun Dec 4 14:29:32 2016 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Sun Dec 4 15:48:30 2016 -0800

----------------------------------------------------------------------
 query_execution/BlockLocator.cpp                | 11 +++-
 query_execution/BlockLocator.hpp                | 34 +++++++++-
 query_execution/BlockLocatorUtil.cpp            |  2 +
 query_execution/BlockLocatorUtil.hpp            |  2 +
 query_execution/CMakeLists.txt                  |  2 +
 query_execution/ForemanDistributed.cpp          | 65 +++++++++++++++++++-
 query_execution/ForemanDistributed.hpp          | 27 +++++++-
 query_execution/QueryExecutionMessages.proto    |  1 +
 query_execution/QueryExecutionTypedefs.hpp      |  2 +
 query_execution/tests/BlockLocator_unittest.cpp |  3 +-
 .../DistributedExecutionGeneratorTestRunner.cpp |  9 ++-
 storage/tests/DataExchange_unittest.cpp         |  5 +-
 12 files changed, 149 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/982cc065/query_execution/BlockLocator.cpp
----------------------------------------------------------------------
diff --git a/query_execution/BlockLocator.cpp b/query_execution/BlockLocator.cpp
index 5de6a54..8d675b1 100644
--- a/query_execution/BlockLocator.cpp
+++ b/query_execution/BlockLocator.cpp
@@ -19,6 +19,7 @@
 
 #include "query_execution/BlockLocator.hpp"
 
+#include <cstddef>
 #include <cstdlib>
 #include <string>
 #include <utility>
@@ -38,6 +39,7 @@
 using std::free;
 using std::malloc;
 using std::move;
+using std::size_t;
 
 using tmb::TaggedMessage;
 using tmb::client_id;
@@ -62,7 +64,7 @@ void BlockLocator::run() {
         serialization::BlockDomainRegistrationMessage proto;
         CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
 
-        processBlockDomainRegistrationMessage(sender, proto.domain_network_address());
+        processBlockDomainRegistrationMessage(sender, proto.domain_network_address(), proto.shiftboss_index());
         break;
       }
       case kAddBlockLocationMessage: {
@@ -139,12 +141,17 @@ void BlockLocator::run() {
 }
 
 void BlockLocator::processBlockDomainRegistrationMessage(const client_id receiver,
-                                                         const std::string &network_address) {
+                                                         const std::string &network_address,
+                                                         const size_t shiftboss_index) {
   DCHECK_LT(block_domain_, kMaxDomain);
 
   domain_network_addresses_.emplace(++block_domain_, network_address);
   domain_blocks_[block_domain_];
 
+  if (shiftboss_index != kInvalidShiftbossIndex) {
+    block_domain_to_shiftboss_index_.emplace(block_domain_, shiftboss_index);
+  }
+
   serialization::BlockDomainMessage proto;
   proto.set_block_domain(block_domain_);
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/982cc065/query_execution/BlockLocator.hpp
----------------------------------------------------------------------
diff --git a/query_execution/BlockLocator.hpp b/query_execution/BlockLocator.hpp
index a83a394..0c17703 100644
--- a/query_execution/BlockLocator.hpp
+++ b/query_execution/BlockLocator.hpp
@@ -21,6 +21,7 @@
 #define QUICKSTEP_QUERY_EXECUTION_BLOCK_LOCATOR_HPP_
 
 #include <atomic>
+#include <cstddef>
 #include <string>
 #include <unordered_map>
 #include <unordered_set>
@@ -48,6 +49,9 @@ namespace quickstep {
  **/
 class BlockLocator : public Thread {
  public:
+  typedef std::unordered_map<block_id_domain, std::size_t> BlockDomainToShiftbossIndex;
+  typedef std::unordered_map<block_id, std::unordered_set<block_id_domain>> BlockLocation;
+
   /**
    * @brief Constructor.
    *
@@ -91,11 +95,30 @@ class BlockLocator : public Thread {
     return locator_client_id_;
   }
 
+  /**
+   * @brief Get the mapping from a block domain to its Shiftboss index.
+   *
+   * @return The mapping from a block domain to its Shiftboss index.
+   **/
+  const BlockDomainToShiftbossIndex& block_domain_to_shiftboss_index() const {
+    return block_domain_to_shiftboss_index_;
+  }
+
+  /**
+   * @brief Get the mapping from a block id to the block domains that loaded it.
+   *
+   * @return The mapping from a block id to the block domains that loaded it.
+   **/
+  const BlockLocation& block_locations() const {
+    return block_locations_;
+  }
+
  protected:
   void run() override;
 
  private:
-  void processBlockDomainRegistrationMessage(const tmb::client_id receiver, const std::string &network_address);
+  void processBlockDomainRegistrationMessage(const tmb::client_id receiver, const std::string &network_address,
+                                             const std::size_t shiftboss_index);
   void processLocateBlockMessage(const tmb::client_id receiver, const block_id block);
   void processGetPeerDomainNetworkAddressesMessage(const tmb::client_id receiver, const block_id block);
 
@@ -110,8 +133,15 @@ class BlockLocator : public Thread {
   // "0.0.0.0:0".
   std::unordered_map<block_id_domain, const std::string> domain_network_addresses_;
 
+  // From a block domain to its Shiftboss index, used by ForemanDistributed
+  // to schedule based on the data-locality info.
+  // Note that not every 'block_id_domain' has a Shiftboss index. For example,
+  // DistributedCli has a StorageManager with a 'block_id_domain', but that
+  // StorageManager is not a part of any Shiftboss.
+  BlockDomainToShiftbossIndex block_domain_to_shiftboss_index_;
+
   // From a block to its domains.
-  std::unordered_map<block_id, std::unordered_set<block_id_domain>> block_locations_;
+  BlockLocation block_locations_;
 
   // From a block domain to all blocks loaded in its buffer pool.
   std::unordered_map<block_id_domain, std::unordered_set<block_id>> domain_blocks_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/982cc065/query_execution/BlockLocatorUtil.cpp
----------------------------------------------------------------------
diff --git a/query_execution/BlockLocatorUtil.cpp b/query_execution/BlockLocatorUtil.cpp
index d2d1e96..4104155 100644
--- a/query_execution/BlockLocatorUtil.cpp
+++ b/query_execution/BlockLocatorUtil.cpp
@@ -45,6 +45,7 @@ namespace block_locator {
 namespace S = ::quickstep::serialization;
 
 block_id_domain getBlockDomain(const std::string &network_address,
+                               const std::size_t shiftboss_index,
                                const client_id cli_id,
                                client_id *locator_client_id,
                                MessageBus *bus) {
@@ -55,6 +56,7 @@ block_id_domain getBlockDomain(const std::string &network_address,
 
   S::BlockDomainRegistrationMessage proto;
   proto.set_domain_network_address(network_address);
+  proto.set_shiftboss_index(shiftboss_index);
 
   const int proto_length = proto.ByteSize();
   char *proto_bytes = static_cast<char*>(std::malloc(proto_length));

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/982cc065/query_execution/BlockLocatorUtil.hpp
----------------------------------------------------------------------
diff --git a/query_execution/BlockLocatorUtil.hpp b/query_execution/BlockLocatorUtil.hpp
index 74f65e4..80f04ed 100644
--- a/query_execution/BlockLocatorUtil.hpp
+++ b/query_execution/BlockLocatorUtil.hpp
@@ -40,6 +40,7 @@ namespace block_locator {
  * StorageManager with the given network address.
  *
  * @param network_address The network address of the StorageManager.
+ * @param shiftboss_index Index of the Shiftboss managing the StorageManager, or kInvalidShiftbossIndex if none.
  * @param cli_id The client ID of the block domain requester.
  * @param locator_client_id The client ID of BlockLocator to set.
  * @param bus A pointer to the TMB.
@@ -47,6 +48,7 @@ namespace block_locator {
  * @return The requested block domain.
  **/
 block_id_domain getBlockDomain(const std::string &network_address,
+                               const std::size_t shiftboss_index,
                                const tmb::client_id cli_id,
                                tmb::client_id *locator_client_id,
                                tmb::MessageBus *bus);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/982cc065/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 1f7add8..4181f4a 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -105,6 +105,7 @@ if (ENABLE_DISTRIBUTED)
                         quickstep_catalog_CatalogTypedefs
                         quickstep_catalog_Catalog_proto
                         quickstep_queryexecution_AdmitRequestMessage
+                        quickstep_queryexecution_BlockLocator
                         quickstep_queryexecution_ForemanBase
                         quickstep_queryexecution_PolicyEnforcerBase
                         quickstep_queryexecution_PolicyEnforcerDistributed
@@ -114,6 +115,7 @@ if (ENABLE_DISTRIBUTED)
                         quickstep_queryexecution_QueryExecutionUtil
                         quickstep_queryexecution_ShiftbossDirectory
                         quickstep_relationaloperators_WorkOrder_proto
+                        quickstep_storage_StorageBlockInfo
                         quickstep_threading_ThreadUtil
                         quickstep_utility_EqualsAnyConstant
                         quickstep_utility_Macros

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/982cc065/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
index 0dad8b0..db82908 100644
--- a/query_execution/ForemanDistributed.cpp
+++ b/query_execution/ForemanDistributed.cpp
@@ -28,6 +28,7 @@
 #include "catalog/CatalogRelation.hpp"
 #include "catalog/CatalogTypedefs.hpp"
 #include "query_execution/AdmitRequestMessage.hpp"
+#include "query_execution/BlockLocator.hpp"
 #include "query_execution/PolicyEnforcerBase.hpp"
 #include "query_execution/PolicyEnforcerDistributed.hpp"
 #include "query_execution/QueryContext.hpp"
@@ -36,6 +37,7 @@
 #include "query_execution/QueryExecutionUtil.hpp"
 #include "query_execution/ShiftbossDirectory.hpp"
 #include "relational_operators/WorkOrder.pb.h"
+#include "storage/StorageBlockInfo.hpp"
 #include "threading/ThreadUtil.hpp"
 #include "utility/EqualsAnyConstant.hpp"
 
@@ -64,10 +66,14 @@ namespace S = serialization;
 class QueryHandle;
 
 ForemanDistributed::ForemanDistributed(
+    const BlockLocator::BlockDomainToShiftbossIndex &block_domain_to_shiftboss_index,
+    const BlockLocator::BlockLocation &block_locations,
     MessageBus *bus,
     CatalogDatabaseLite *catalog_database,
     const int cpu_id)
     : ForemanBase(bus, cpu_id),
+      block_domain_to_shiftboss_index_(block_domain_to_shiftboss_index),
+      block_locations_(block_locations),
       catalog_database_(DCHECK_NOTNULL(catalog_database)) {
   const std::vector<QueryExecutionMessageType> sender_message_types{
       kShiftbossRegistrationResponseMessage,
@@ -295,6 +301,62 @@ bool ForemanDistributed::isHashJoinRelatedWorkOrder(const S::WorkOrderMessage &p
   return true;
 }
 
+bool ForemanDistributed::isNestedLoopsJoinWorkOrder(const serialization::WorkOrder &work_order_proto,
+                                                    std::size_t *shiftboss_index_for_join) {
+  if (work_order_proto.work_order_type() != S::NESTED_LOOP_JOIN) {
+    return false;
+  }
+
+  const block_id left_block = work_order_proto.GetExtension(S::NestedLoopsJoinWorkOrder::left_block_id);
+  if (hasBlockLocalityInfoHelper(left_block, shiftboss_index_for_join)) {
+    return true;
+  }
+
+  const block_id right_block = work_order_proto.GetExtension(S::NestedLoopsJoinWorkOrder::right_block_id);
+  return hasBlockLocalityInfoHelper(right_block, shiftboss_index_for_join);
+}
+
+bool ForemanDistributed::hasBlockLocalityInfo(const S::WorkOrder &work_order_proto,
+                                              size_t *shiftboss_index_for_block) {
+  block_id block = kInvalidBlockId;
+  switch (work_order_proto.work_order_type()) {
+    case S::SAVE_BLOCKS: {
+      block = work_order_proto.GetExtension(S::SaveBlocksWorkOrder::block_id);
+      break;
+    }
+    case S::SELECT: {
+      block = work_order_proto.GetExtension(S::SelectWorkOrder::block_id);
+      break;
+    }
+    case S::SORT_RUN_GENERATION: {
+      block = work_order_proto.GetExtension(S::SortRunGenerationWorkOrder::block_id);
+      break;
+    }
+    default:
+      return false;
+  }
+
+  DCHECK_NE(block, kInvalidBlockId);
+  return hasBlockLocalityInfoHelper(block, shiftboss_index_for_block);
+}
+
+bool ForemanDistributed::hasBlockLocalityInfoHelper(const block_id block,
+                                                    size_t *shiftboss_index_for_block) {
+  const auto cit = block_locations_.find(block);
+  if (cit != block_locations_.end()) {
+    for (const block_id_domain block_domain : cit->second) {
+      // TODO(quickstep-team): choose the best node, instead of the first.
+      const auto cit_shiftboss_index = block_domain_to_shiftboss_index_.find(block_domain);
+      if (cit_shiftboss_index != block_domain_to_shiftboss_index_.end()) {
+        *shiftboss_index_for_block = cit_shiftboss_index->second;
+        return true;
+      }
+    }
+  }
+
+  return false;
+}
+
 namespace {
 constexpr size_t kDefaultShiftbossIndex = 0u;
 }  // namespace
@@ -312,8 +374,9 @@ void ForemanDistributed::dispatchWorkOrderMessages(const vector<unique_ptr<S::Wo
       shiftboss_index_for_particular_work_order_type = kDefaultShiftbossIndex;
     } else if (isAggregationRelatedWorkOrder(proto, shiftboss_index, &shiftboss_index_for_particular_work_order_type)) {
     } else if (isHashJoinRelatedWorkOrder(proto, shiftboss_index, &shiftboss_index_for_particular_work_order_type)) {
+    } else if (hasBlockLocalityInfo(proto.work_order(), &shiftboss_index_for_particular_work_order_type)) {
+    } else if (isNestedLoopsJoinWorkOrder(proto.work_order(), &shiftboss_index_for_particular_work_order_type)) {
     } else {
-      // TODO(zuyu): Take data-locality into account for scheduling.
       shiftboss_index_for_particular_work_order_type = shiftboss_index;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/982cc065/query_execution/ForemanDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.hpp b/query_execution/ForemanDistributed.hpp
index 34bac07..92d1e98 100644
--- a/query_execution/ForemanDistributed.hpp
+++ b/query_execution/ForemanDistributed.hpp
@@ -23,8 +23,10 @@
 #include <vector>
 
 #include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/BlockLocator.hpp"
 #include "query_execution/ForemanBase.hpp"
 #include "query_execution/ShiftbossDirectory.hpp"
+#include "storage/StorageBlockInfo.hpp"
 #include "utility/Macros.hpp"
 
 #include "tmb/id_typedefs.h"
@@ -51,6 +53,9 @@ class ForemanDistributed final : public ForemanBase {
   /**
    * @brief Constructor.
    *
+   * @param block_domain_to_shiftboss_index A mapping from a block domain to
+   *        its Shiftboss index in BlockLocator.
+   * @param block_locations Block locality info in BlockLocator.
    * @param bus A pointer to the TMB.
    * @param catalog_database The catalog database where this query is executed.
    * @param cpu_id The ID of the CPU to which the Foreman thread can be pinned.
@@ -58,9 +63,12 @@ class ForemanDistributed final : public ForemanBase {
    * @note If cpu_id is not specified, Foreman thread can be possibly moved
    *       around on different CPUs by the OS.
   **/
-  ForemanDistributed(tmb::MessageBus *bus,
-                     CatalogDatabaseLite *catalog_database,
-                     const int cpu_id = -1);
+  ForemanDistributed(
+      const BlockLocator::BlockDomainToShiftbossIndex &block_domain_to_shiftboss_index,
+      const BlockLocator::BlockLocation &block_locations,
+      tmb::MessageBus *bus,
+      CatalogDatabaseLite *catalog_database,
+      const int cpu_id = -1);
 
   ~ForemanDistributed() override {}
 
@@ -79,6 +87,15 @@ class ForemanDistributed final : public ForemanBase {
                                   const std::size_t next_shiftboss_index_to_schedule,
                                   std::size_t *shiftboss_index_for_hash_join);
 
+  bool isNestedLoopsJoinWorkOrder(const serialization::WorkOrder &work_order_proto,
+                                  std::size_t *shiftboss_index_for_join);
+
+  bool hasBlockLocalityInfo(const serialization::WorkOrder &work_order_proto,
+                            std::size_t *shiftboss_index_for_block);
+
+  bool hasBlockLocalityInfoHelper(const block_id block,
+                                  std::size_t *shiftboss_index_for_block);
+
   /**
    * @brief Dispatch schedulable WorkOrders, wrapped in WorkOrderMessages to the
    *        worker threads.
@@ -111,6 +128,10 @@ class ForemanDistributed final : public ForemanBase {
    **/
   bool canCollectNewMessages(const tmb::message_type_id message_type);
 
+  // Block locality info for scheduling.
+  const BlockLocator::BlockDomainToShiftbossIndex &block_domain_to_shiftboss_index_;
+  const BlockLocator::BlockLocation &block_locations_;
+
   ShiftbossDirectory shiftboss_directory_;
 
   CatalogDatabaseLite *catalog_database_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/982cc065/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index e6d741a..d6dbd8d 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -135,6 +135,7 @@ message QueryExecutionSuccessMessage {
 message BlockDomainRegistrationMessage {
   // Format IP:Port, i.e., "0.0.0.0:0".
   required string domain_network_address = 1;
+  required uint64 shiftboss_index = 2;
 }
 
 // Used for RegistrationResponse, Unregistration, and FailureReport.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/982cc065/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index fb9a9d6..cbfa4bd 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -39,6 +39,8 @@ namespace quickstep {
  *  @{
  */
 
+constexpr std::size_t kInvalidShiftbossIndex = static_cast<std::size_t>(-1);
+
 typedef tmb::Address Address;
 typedef tmb::AnnotatedMessage AnnotatedMessage;
 typedef tmb::MessageBus MessageBus;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/982cc065/query_execution/tests/BlockLocator_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/BlockLocator_unittest.cpp b/query_execution/tests/BlockLocator_unittest.cpp
index 32437c3..c85cdd6 100644
--- a/query_execution/tests/BlockLocator_unittest.cpp
+++ b/query_execution/tests/BlockLocator_unittest.cpp
@@ -85,7 +85,8 @@ class BlockLocatorTest : public ::testing::Test {
     bus_.RegisterClientAsSender(worker_client_id_, kPoisonMessage);
 
     block_domain_ =
-        block_locator::getBlockDomain(kDomainNetworkAddress, worker_client_id_, &locator_client_id_, &bus_);
+        block_locator::getBlockDomain(kDomainNetworkAddress, kInvalidShiftbossIndex, worker_client_id_,
+                                      &locator_client_id_, &bus_);
     DCHECK_EQ(locator_->getBusClientID(), locator_client_id_);
 
     storage_manager_.reset(

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/982cc065/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
index 5100651..ca8ae66 100644
--- a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
+++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
@@ -86,7 +86,8 @@ DistributedExecutionGeneratorTestRunner::DistributedExecutionGeneratorTestRunner
   test_database_loader_ = make_unique<TestDatabaseLoader>(
       storage_path,
       block_locator::getBlockDomain(
-          test_database_loader_data_exchanger_.network_address(), cli_id_, &locator_client_id_, &bus_),
+          test_database_loader_data_exchanger_.network_address(), kInvalidShiftbossIndex, cli_id_,
+          &locator_client_id_, &bus_),
       locator_client_id_,
       &bus_);
   DCHECK_EQ(block_locator_->getBusClientID(), locator_client_id_);
@@ -98,7 +99,9 @@ DistributedExecutionGeneratorTestRunner::DistributedExecutionGeneratorTestRunner
 
   // NOTE(zuyu): Foreman should initialize before Shiftboss so that the former
   // could receive a registration message from the latter.
-  foreman_ = make_unique<ForemanDistributed>(&bus_, test_database_loader_->catalog_database());
+  foreman_ = make_unique<ForemanDistributed>(block_locator_->block_domain_to_shiftboss_index(),
+                                             block_locator_->block_locations(),
+                                             &bus_, test_database_loader_->catalog_database());
 
   // We don't use the NUMA aware version of worker code.
   const vector<numa_node_id> numa_nodes(1 /* Number of worker threads per instance */,
@@ -114,7 +117,7 @@ DistributedExecutionGeneratorTestRunner::DistributedExecutionGeneratorTestRunner
     auto storage_manager = make_unique<StorageManager>(
         storage_path,
         block_locator::getBlockDomain(
-            data_exchangers_[i].network_address(), cli_id_, &locator_client_id_, &bus_),
+            data_exchangers_[i].network_address(), i, cli_id_, &locator_client_id_, &bus_),
         locator_client_id_, &bus_);
     DCHECK_EQ(block_locator_->getBusClientID(), locator_client_id_);
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/982cc065/storage/tests/DataExchange_unittest.cpp
----------------------------------------------------------------------
diff --git a/storage/tests/DataExchange_unittest.cpp b/storage/tests/DataExchange_unittest.cpp
index ac39728..03f68a0 100644
--- a/storage/tests/DataExchange_unittest.cpp
+++ b/storage/tests/DataExchange_unittest.cpp
@@ -88,7 +88,8 @@ class DataExchangeTest : public ::testing::Test {
     storage_manager_expected_.reset(new StorageManager(
         kStoragePath,
         block_locator::getBlockDomain(
-            data_exchanger_expected_.network_address(), worker_client_id_, &locator_client_id_, &bus_),
+            data_exchanger_expected_.network_address(), kInvalidShiftbossIndex, worker_client_id_,
+            &locator_client_id_, &bus_),
         locator_client_id_,
         &bus_));
     DCHECK_EQ(locator_->getBusClientID(), locator_client_id_);
@@ -99,7 +100,7 @@ class DataExchangeTest : public ::testing::Test {
     storage_manager_checked_.reset(new StorageManager(
         kStoragePath,
         block_locator::getBlockDomain(
-            kCheckedDomainNetworkAddress, worker_client_id_, &locator_client_id_, &bus_),
+            kCheckedDomainNetworkAddress, kInvalidShiftbossIndex, worker_client_id_, &locator_client_id_, &bus_),
         locator_client_id_,
         &bus_));
     DCHECK_EQ(locator_->getBusClientID(), locator_client_id_);