You are viewing a plain-text version of this content. The canonical link to it was not preserved in this plain-text copy.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/02/23 08:43:56 UTC

[1/6] incubator-quickstep git commit: Adds Marc's PGP key to KEYS [Forced Update!]

Repository: incubator-quickstep
Updated Branches:
  refs/heads/two-level-tmb 0bcf5ffeb -> 0d8b63350 (forced update)


Adds Marc's PGP key to KEYS


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/f6480fb5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/f6480fb5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/f6480fb5

Branch: refs/heads/two-level-tmb
Commit: f6480fb59a2f3be6d37274f2a2376e733ba98d22
Parents: ab46d78
Author: cramja <ma...@gmail.com>
Authored: Tue Feb 14 17:02:20 2017 -0600
Committer: cramja <ma...@gmail.com>
Committed: Tue Feb 14 17:02:20 2017 -0600

----------------------------------------------------------------------
 KEYS | 52 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 52 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f6480fb5/KEYS
----------------------------------------------------------------------
diff --git a/KEYS b/KEYS
index ce3964f..a10f790 100644
--- a/KEYS
+++ b/KEYS
@@ -49,3 +49,55 @@ iYXZdNoVw3RZC2XRQB+as9wYnz/Ziqbrrw58/E5FdmC8U8+Fa/0lTUh6VsPjpu5u
 E7agqOm2ReVbNLPyHa2oGftKu0Cwyghbys5xNxqbNPQnFR9N9Soi+0n4IGCZ/tj5
 =qv6a
 -----END PGP PUBLIC KEY BLOCK-----
+-----BEGIN PGP PUBLIC KEY BLOCK-----
+Version: GnuPG v2
+
+mQINBFijfvUBEACtbFV+BN1MRgP5s1/W1cKHbXsjBz8C+xlblRSjewrdlrHJEiu7
++qP3DkaJse8odnUbuveYuVyQQ+gQq8Qn+w8CzRBqnFn4V2xGKouQhlZ9Q+QhNzI/
+Y2+2XfDSY8GAxV3e5BnLCAnRhQJ4UX3aIU1ZmdVJ8Hag3Pfv5BKf/DpgEfPJ+G0z
+d4oy9gN96Zjlui/QwMaF4wCCE8P5tXm04QXyK+bs4pV9WsjMf5LW/gBZAgof2Kfp
+rEBM2aAjSmsfysNLXB/aakyjdEMtUV7wmtHe+mhbo8JmTNrhXsIhNbzXIeq2lRLL
+dNzUn5WQ03F69sx1tBXG5Ro8xA0TEjzigsy/4RlIgplws8rR3gP3H9RSlXNvifoC
+ZCHNxFdXHJ908q6SFfCCZR8eUd5mbG3mB04HEXccOl3E0456U0Aee2DK4uGp/RVN
+b4pvZgLRkGcymy4hg+QR56ixq+ovRi75m9zcGe3wvYZNoAL5IscqHvQpn2iCTBuJ
+ZbTOENKTfzncZZhApgzLZhdnAkEsyyhY8qOJnS+0TERSXXyoR1nHpVgnLwwCS7Oi
+rXhQR+YqA3gwcuyukwMSy25+VJLAtls0sH7wi6EWtWE/4mytScQV/AFTLqpA7wYY
+X5ceEdIqp0YwBi8HRrd1sDyRWYHTKLLUS9jgzZzgI54UexggXxn/h46DAwARAQAB
+tCpNYXJjIFNwZWhsbWFubiAoYXBhY2hlKSA8c3BlaGxAYXBhY2hlLm9yZz6JAjcE
+EwEKACEFAlijfvUCGwMFCwkIBwMFFQoJCAsFFgIDAQACHgECF4AACgkQ+r4yMlop
+iZqLDw//Q3MQX6xj1kSAxEHWbKziW49oMQylBwlLd9G0Zvo4VUd4Kgkl/sTF1nRn
+a3gp0SFI68z4I53DjBy5gzMQLhUArj6ygoTIFEqmaqnqXdwbYitUuexFmU7QuffU
+eJGrfbnCR+412SuSbs/yUIiqtzZQMMyX9eHRDKa03bRmXiKOvH4yWoQHmhNSI9jx
+cm0NJA7UirmtZ7Od3U+yEJCAje9RockzIyzY9Wgmmanxw14ig6RvIebBiEUeiyM1
+S5YsQ4r3SjZQyK2IlStDQbIrw/npg7SMpi/1QAgNBtyNuIJN4ehUukVwBzH75kK9
+d8R2tfn4ECNdjyR7zxGTu+C6Xy7i5nMyVDlU5KG+3tTwFh4RmxcuJrR6rNjnBtj+
+GUJHselHEcaDRW3x4w1kIIBqdiCo8JP8F1p786lZiCSYWeHHrXNQlwKnmtoyzn7x
+RuHjY6N6bY/HuJO0DVKW1L7ModtqpANe4e1sJbBmkJcrs9Cybiyfo/uGHG+tirKp
+9+dvoe6PFynA2TpwIluJHni0YNA9ipZHATZciFYo7+/XNXHei1YFYWObgdYTyGKk
+txTpcSsnJiDsNc1QEQ5DqxgeM4BV1/3wn64M83jI+6KkL+FWTKPemAg+OvL/Cmf7
+B/NBXi/QNvhwfSfD09naAnosjeG23Ggq1uOmOawoL7D/Y38+/LG5Ag0EWKN+9QEQ
+AJ01B8e5B6A+h+qS/ttGqZr1KKE0/yE6HNvxaxPo7GIJftqFdLnnZ786GUEecBV0
+XD+bejYv4bMEnlzv9UpQRJqY2npsnjf1UqlK2ODYyn0bD3LZ1nooRRGfUyiacPLy
+7KXwgweIuuwgS/tdTh2c+v9wYx4fmOlb4ZV7cmlyx4s3IJW3b0EhejsUBsIoFRRS
+59x+QlLQMlyPhLEvAUo+OsWaa4Wotjbxfv3VrK+0ZTEGHteL/CoQ+xZI1friLbKY
+OZZJiQTBQQaW6ojp6vtxYHNBrWIVuM41skKSwvHx7tKLJu03EY3/xdgCoKZ5+KTX
+NSPlfzYmWAAqVDGiH76WEHUQtp3E3Er/iloZM4dmBsi7So1yahk2dbCnfpjBHPMi
+a0t81RbSdAURzC+6tCNianUy/COed4+FBPDTHaekf1qtrBJNTV6iTsECf9uvp6qD
+QGDpah4/swKLFFqxClEAW+VaeWB+9nP6yZUGh7gp5SAMoAa5vSC8g8FzuTIsg/MI
+a0GhkHVopuILqzDSlhlJeh1cGvx1Hm2j0MtywMctJ0OXAO+oJcH0cx+JtJz3IFXS
+SwipTc9ySSk9bMOVhuQyouziU1UWW/sPSUvtKX4vV4pAaTL0ERzNH8gRsCf8/9Cu
+XGqJMlrVwZ+yFUaImiseAkzpBXlGYjgQBtd6e2/A8qsVABEBAAGJAh4EGAEKAAkF
+AlijfvUCGwwACgkQ+r4yMlopiZoW+g/1F8nTwnf+l3VcIUMQRwNe8X7/RZ5BrQnN
+oWZ9Nw4Fc40L90l/KZSp/k2pYo6KqRYlcHsYyMdjAuv+hSkPckwnS11wsrwG8CxQ
+1JuW5P9TQUpyN+367/AEOwc4qwJMo1RtLpjdqy3ssl1Rv25BJovzbd1ag7LF+RlM
+2o8o05t0gPkMa8bYu05XBTISKCT4qtAn8RrrUjvDNP7hsGfRUsN2FGIcvRu5Rt7t
+Uwt2JBjZ0+xmOt6f7ytzw6vNuItALDtzP1geXAHM+BddOXut9AnjVuEK+v92duPW
+LHepFB9zyNN5AoENGQvzl413lF9EuXOoiEXmtg+vRyKeEmJTtwzL3FTahvLg0pYm
+7h1InRxXtVilPPn3tcjNIvfpcHNUXwj3YstT380zBYAKxovG9rmHR8sKwgSNiK4N
+gbnVVBcIYQ/ogsdxqr6yjxqrWpeuph9BCeroxxDLMaFq260KyMIPsB34F5g3L1v5
+NM0GzLhrr5xa3B7hWvDmqxMIMsRtlf+37NqV0ylc6n4ZHJa1U60jSJx2BN0jnaMe
+LriD7AMhIAI4Ew70dYSHeVtzDO2MD3GTFqdHPuVCRi3322NSSvNjjZleCNE0q3Ca
+N5xnL7HQ8wj2da/uvr6A0oX46nYMBPwDdP5nkvHmNbMPLF5b2IOeoijL48x+EVQS
+U/KHHqP5JA==
+=mW4v
+-----END PGP PUBLIC KEY BLOCK-----


[3/6] incubator-quickstep git commit: Use BitVector as LIPFilter implementation when applicable

Posted by zu...@apache.org.
Use BitVector as LIPFilter implementation when applicable


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/2b2d7ba1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/2b2d7ba1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/2b2d7ba1

Branch: refs/heads/two-level-tmb
Commit: 2b2d7ba1970ade47b1170cd7974cb2fc53f7ba71
Parents: 1572762
Author: Jianqiao Zhu <ji...@cs.wisc.edu>
Authored: Wed Feb 22 14:06:55 2017 -0600
Committer: Jianqiao Zhu <ji...@cs.wisc.edu>
Committed: Wed Feb 22 14:06:55 2017 -0600

----------------------------------------------------------------------
 query_optimizer/rules/AttachLIPFilters.cpp | 74 +++++++++++++++++++++++--
 query_optimizer/rules/AttachLIPFilters.hpp |  9 +++
 query_optimizer/rules/CMakeLists.txt       |  1 +
 3 files changed, 79 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2b2d7ba1/query_optimizer/rules/AttachLIPFilters.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/AttachLIPFilters.cpp b/query_optimizer/rules/AttachLIPFilters.cpp
index 4b6ac59..9a13b48 100644
--- a/query_optimizer/rules/AttachLIPFilters.cpp
+++ b/query_optimizer/rules/AttachLIPFilters.cpp
@@ -20,6 +20,7 @@
 #include "query_optimizer/rules/AttachLIPFilters.hpp"
 
 #include <algorithm>
+#include <cstdint>
 #include <map>
 #include <set>
 #include <unordered_set>
@@ -37,6 +38,7 @@
 #include "query_optimizer/physical/PhysicalType.hpp"
 #include "query_optimizer/physical/Selection.hpp"
 #include "query_optimizer/physical/TopLevelPlan.hpp"
+#include "types/TypeID.hpp"
 #include "types/TypedValue.hpp"
 #include "utility/lip_filter/LIPFilter.hpp"
 
@@ -126,11 +128,40 @@ void AttachLIPFilters::attachLIPFilters(
         const E::ExprId source_attr_id = pair.second->source_attribute->id();
         if (already_filtered_attributes->find(source_attr_id)
                 == already_filtered_attributes->end()) {
-          lip_filter_configuration_->addBuildInfo(
-              P::SingleIdentityHashFilterBuildInfo::Create(
-                  pair.second->source_attribute,
-                  std::max(64uL, pair.second->estimated_cardinality * 8u)),
-              pair.second->source);
+          bool use_exact_filter = false;
+          std::int64_t min_cpp_value;
+          std::int64_t max_cpp_value;
+          const bool has_exact_min_max_stats =
+              findExactMinMaxValuesForAttributeHelper(pair.second->source,
+                                                      pair.second->source_attribute,
+                                                      &min_cpp_value,
+                                                      &max_cpp_value);
+          if (has_exact_min_max_stats) {
+            const std::int64_t value_range = max_cpp_value - min_cpp_value;
+            DCHECK_GE(value_range, 0);
+            // TODO(jianqiao): Add this threshold as a gflag (together with
+            // InjectJoinFilters::kMaxFilterSize).
+            if (value_range <= 1000000000L) {
+              use_exact_filter = true;
+            }
+          }
+
+          if (use_exact_filter) {
+            lip_filter_configuration_->addBuildInfo(
+                P::BitVectorExactFilterBuildInfo::Create(
+                    pair.second->source_attribute,
+                    min_cpp_value,
+                    max_cpp_value,
+                    false),
+                pair.second->source);
+          } else {
+            lip_filter_configuration_->addBuildInfo(
+                P::SingleIdentityHashFilterBuildInfo::Create(
+                    pair.second->source_attribute,
+                    std::max(64uL, pair.second->estimated_cardinality * 8u)),
+                pair.second->source);
+          }
+
           lip_filter_configuration_->addProbeInfo(
               P::LIPFilterProbeInfo::Create(
                   pair.first,
@@ -258,5 +289,38 @@ const std::vector<AttachLIPFilters::LIPFilterInfoPtr>& AttachLIPFilters
   return probe_side_info_.at(node);
 }
 
+bool AttachLIPFilters::findExactMinMaxValuesForAttributeHelper(
+    const physical::PhysicalPtr &physical_plan,
+    const expressions::AttributeReferencePtr &attribute,
+    std::int64_t *min_cpp_value,
+    std::int64_t *max_cpp_value) const {
+  bool min_value_is_exact;
+  bool max_value_is_exact;
+
+  const TypedValue min_value =
+      cost_model_->findMinValueStat(physical_plan, attribute, &min_value_is_exact);
+  const TypedValue max_value =
+      cost_model_->findMaxValueStat(physical_plan, attribute, &max_value_is_exact);
+  if (min_value.isNull() || max_value.isNull() ||
+      (!min_value_is_exact) || (!max_value_is_exact)) {
+    return false;
+  }
+
+  switch (attribute->getValueType().getTypeID()) {
+    case TypeID::kInt: {
+      *min_cpp_value = min_value.getLiteral<int>();
+      *max_cpp_value = max_value.getLiteral<int>();
+      return true;
+    }
+    case TypeID::kLong: {
+      *min_cpp_value = min_value.getLiteral<std::int64_t>();
+      *max_cpp_value = max_value.getLiteral<std::int64_t>();
+      return true;
+    }
+    default:
+      return false;
+  }
+}
+
 }  // namespace optimizer
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2b2d7ba1/query_optimizer/rules/AttachLIPFilters.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/AttachLIPFilters.hpp b/query_optimizer/rules/AttachLIPFilters.hpp
index b8cfc39..36cb010 100644
--- a/query_optimizer/rules/AttachLIPFilters.hpp
+++ b/query_optimizer/rules/AttachLIPFilters.hpp
@@ -21,6 +21,7 @@
 #define QUICKSTEP_QUERY_OPTIMIZER_RULES_ATTACH_LIP_FILTERS_HPP_
 
 #include <cstddef>
+#include <cstdint>
 #include <map>
 #include <memory>
 #include <set>
@@ -135,6 +136,14 @@ class AttachLIPFilters : public Rule<physical::Physical> {
 
   const std::vector<LIPFilterInfoPtr>& getProbeSideInfo(const NodeList &path);
 
+  // TODO(jianqiao): refactor this method as it is a duplication of
+  // InjectJoinFilters::findExactMinMaxValuesForAttributeHelper().
+  bool findExactMinMaxValuesForAttributeHelper(
+      const physical::PhysicalPtr &physical_plan,
+      const expressions::AttributeReferencePtr &attribute,
+      std::int64_t *min_cpp_value,
+      std::int64_t *max_cpp_value) const;
+
   std::unique_ptr<cost::StarSchemaSimpleCostModel> cost_model_;
   std::map<physical::PhysicalPtr, std::vector<LIPFilterInfoPtr>> build_side_info_;
   std::map<physical::PhysicalPtr, std::vector<LIPFilterInfoPtr>> probe_side_info_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2b2d7ba1/query_optimizer/rules/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/CMakeLists.txt b/query_optimizer/rules/CMakeLists.txt
index 427500d..6847951 100644
--- a/query_optimizer/rules/CMakeLists.txt
+++ b/query_optimizer/rules/CMakeLists.txt
@@ -59,6 +59,7 @@ target_link_libraries(quickstep_queryoptimizer_rules_AttachLIPFilters
                       quickstep_queryoptimizer_physical_Selection
                       quickstep_queryoptimizer_physical_TopLevelPlan
                       quickstep_queryoptimizer_rules_Rule
+                      quickstep_types_TypeID
                       quickstep_types_TypedValue
                       quickstep_utility_Macros
                       quickstep_utility_lipfilter_LIPFilter)


[6/6] incubator-quickstep git commit: Used two TMB implementations in Shiftboss.

Posted by zu...@apache.org.
Used two TMB implementations in Shiftboss.

  - Global TMB between Foreman and Shiftboss.
  - Local TMB between Workers and Shiftboss.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/0d8b6335
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/0d8b6335
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/0d8b6335

Branch: refs/heads/two-level-tmb
Commit: 0d8b6335065c65eaa76613ea3b13a7c2108160db
Parents: b21c85b
Author: Zuyu Zhang <zu...@apache.org>
Authored: Wed Feb 8 12:48:31 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Thu Feb 23 00:43:05 2017 -0800

----------------------------------------------------------------------
 cli/distributed/Executor.cpp                    |   7 +-
 cli/distributed/Executor.hpp                    |   4 +
 query_execution/Shiftboss.cpp                   | 419 +++++++++++--------
 query_execution/Shiftboss.hpp                   |  92 +---
 .../DistributedExecutionGeneratorTestRunner.cpp |   8 +-
 .../DistributedExecutionGeneratorTestRunner.hpp |   1 +
 6 files changed, 279 insertions(+), 252 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0d8b6335/cli/distributed/Executor.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Executor.cpp b/cli/distributed/Executor.cpp
index 3485298..e248fef 100644
--- a/cli/distributed/Executor.cpp
+++ b/cli/distributed/Executor.cpp
@@ -35,6 +35,7 @@
 
 #include "tmb/id_typedefs.h"
 #include "tmb/native_net_client_message_bus.h"
+#include "tmb/pure_memory_message_bus.h"
 
 #include "glog/logging.h"
 
@@ -47,6 +48,8 @@ using tmb::client_id;
 namespace quickstep {
 
 void Executor::init() {
+  bus_local_.Initialize();
+
   executor_client_id_ = bus_.Connect();
   DLOG(INFO) << "Executor TMB Client ID: " << executor_client_id_;
 
@@ -59,7 +62,7 @@ void Executor::init() {
   for (std::size_t worker_thread_index = 0;
        worker_thread_index < FLAGS_num_workers;
        ++worker_thread_index) {
-    workers_.push_back(make_unique<Worker>(worker_thread_index, &bus_));
+    workers_.push_back(make_unique<Worker>(worker_thread_index, &bus_local_));
     worker_client_ids.push_back(workers_.back()->getBusClientID());
   }
 
@@ -76,7 +79,7 @@ void Executor::init() {
   data_exchanger_.start();
 
   shiftboss_ =
-      make_unique<Shiftboss>(&bus_, storage_manager_.get(), worker_directory_.get(), storage_manager_->hdfs());
+      make_unique<Shiftboss>(&bus_, &bus_local_, storage_manager_.get(), worker_directory_.get(), storage_manager_->hdfs());
   shiftboss_->start();
 
   for (const auto &worker : workers_) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0d8b6335/cli/distributed/Executor.hpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Executor.hpp b/cli/distributed/Executor.hpp
index 6ffa756..aafeeae 100644
--- a/cli/distributed/Executor.hpp
+++ b/cli/distributed/Executor.hpp
@@ -24,6 +24,7 @@
 #include <vector>
 
 #include "cli/distributed/Role.hpp"
+#include "query_execution/QueryExecutionTypedefs.hpp"
 #include "query_execution/Shiftboss.hpp"
 #include "query_execution/Worker.hpp"
 #include "query_execution/WorkerDirectory.hpp"
@@ -65,6 +66,9 @@ class Executor final : public Role {
   void run() override {}
 
  private:
+  // Used between Shiftboss and Workers.
+  MessageBusImpl bus_local_;
+
   tmb::client_id executor_client_id_;
 
   std::vector<std::unique_ptr<Worker>> workers_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0d8b6335/query_execution/Shiftboss.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp
index bae5205..2f7dc3c 100644
--- a/query_execution/Shiftboss.cpp
+++ b/query_execution/Shiftboss.cpp
@@ -64,6 +64,91 @@ namespace quickstep {
 
 class WorkOrder;
 
+Shiftboss::Shiftboss(tmb::MessageBus *bus_global,
+                     tmb::MessageBus *bus_local,
+                     StorageManager *storage_manager,
+                     WorkerDirectory *workers,
+                     void *hdfs,
+                     const int cpu_id)
+    : bus_global_(DCHECK_NOTNULL(bus_global)),
+      bus_local_(DCHECK_NOTNULL(bus_local)),
+      storage_manager_(DCHECK_NOTNULL(storage_manager)),
+      workers_(DCHECK_NOTNULL(workers)),
+      hdfs_(hdfs),
+      cpu_id_(cpu_id),
+      shiftboss_client_id_global_(tmb::kClientIdNone),
+      shiftboss_client_id_local_(tmb::kClientIdNone),
+      foreman_client_id_(tmb::kClientIdNone),
+      max_msgs_per_worker_(1),
+      start_worker_index_(0u) {
+  // Check to have at least one Worker.
+  DCHECK_GT(workers->getNumWorkers(), 0u);
+
+#ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+  if (FLAGS_use_hdfs) {
+    CHECK(hdfs_);
+  }
+#endif  // QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+
+  shiftboss_client_id_global_ = bus_global_->Connect();
+  LOG(INFO) << "Shiftboss TMB client ID: " << shiftboss_client_id_global_;
+  DCHECK_NE(shiftboss_client_id_global_, tmb::kClientIdNone);
+
+  shiftboss_client_id_local_ = bus_local_->Connect();
+  DCHECK_NE(shiftboss_client_id_local_, tmb::kClientIdNone);
+
+  // Messages between Foreman and Shiftboss.
+  bus_global_->RegisterClientAsSender(shiftboss_client_id_global_, kShiftbossRegistrationMessage);
+  bus_global_->RegisterClientAsReceiver(shiftboss_client_id_global_, kShiftbossRegistrationResponseMessage);
+
+  bus_global_->RegisterClientAsReceiver(shiftboss_client_id_global_, kQueryInitiateMessage);
+  bus_global_->RegisterClientAsSender(shiftboss_client_id_global_, kQueryInitiateResponseMessage);
+
+  bus_global_->RegisterClientAsReceiver(shiftboss_client_id_global_, kInitiateRebuildMessage);
+  bus_global_->RegisterClientAsSender(shiftboss_client_id_global_, kInitiateRebuildResponseMessage);
+
+  bus_global_->RegisterClientAsReceiver(shiftboss_client_id_global_, kSaveQueryResultMessage);
+  bus_global_->RegisterClientAsSender(shiftboss_client_id_global_, kSaveQueryResultResponseMessage);
+
+  // Message sent to Worker.
+  bus_local_->RegisterClientAsSender(shiftboss_client_id_local_, kShiftbossRegistrationResponseMessage);
+  bus_local_->RegisterClientAsSender(shiftboss_client_id_local_, kRebuildWorkOrderMessage);
+
+  // Forward the following message types from Foreman to Workers.
+  bus_global_->RegisterClientAsReceiver(shiftboss_client_id_global_, kWorkOrderMessage);
+  bus_local_->RegisterClientAsSender(shiftboss_client_id_local_, kWorkOrderMessage);
+
+  // Forward the following message types from Workers to Foreman.
+  bus_local_->RegisterClientAsReceiver(shiftboss_client_id_local_, kCatalogRelationNewBlockMessage);
+  bus_global_->RegisterClientAsSender(shiftboss_client_id_global_, kCatalogRelationNewBlockMessage);
+
+  bus_local_->RegisterClientAsReceiver(shiftboss_client_id_local_, kDataPipelineMessage);
+  bus_global_->RegisterClientAsSender(shiftboss_client_id_global_, kDataPipelineMessage);
+
+  bus_local_->RegisterClientAsReceiver(shiftboss_client_id_local_, kWorkOrderFeedbackMessage);
+  bus_global_->RegisterClientAsSender(shiftboss_client_id_global_, kWorkOrderFeedbackMessage);
+
+  bus_local_->RegisterClientAsReceiver(shiftboss_client_id_local_, kWorkOrderCompleteMessage);
+  bus_global_->RegisterClientAsSender(shiftboss_client_id_global_, kWorkOrderCompleteMessage);
+
+  bus_local_->RegisterClientAsReceiver(shiftboss_client_id_local_, kRebuildWorkOrderCompleteMessage);
+  bus_global_->RegisterClientAsSender(shiftboss_client_id_global_, kRebuildWorkOrderCompleteMessage);
+
+  // Clean up query execution states, i.e., QueryContext.
+  bus_global_->RegisterClientAsReceiver(shiftboss_client_id_global_, kQueryTeardownMessage);
+
+  // Stop itself.
+  bus_global_->RegisterClientAsReceiver(shiftboss_client_id_global_, kPoisonMessage);
+  // Stop all workers.
+  bus_local_->RegisterClientAsSender(shiftboss_client_id_local_, kPoisonMessage);
+
+  for (std::size_t i = 0; i < workers_->getNumWorkers(); ++i) {
+    worker_addresses_.AddRecipient(workers_->getClientID(i));
+  }
+
+  registerWithForeman();
+}
+
 void Shiftboss::run() {
   if (cpu_id_ >= 0) {
     // We can pin the shiftboss thread to a CPU if specified.
@@ -73,159 +158,161 @@ void Shiftboss::run() {
   processShiftbossRegistrationResponseMessage();
 
   for (;;) {
-    // Receive() is a blocking call, causing this thread to sleep until next
-    // message is received.
-    AnnotatedMessage annotated_message(bus_->Receive(shiftboss_client_id_, 0, true));
-    DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
-               << "') received the typed '" << annotated_message.tagged_message.message_type()
-               << "' message from client " << annotated_message.sender;
-    switch (annotated_message.tagged_message.message_type()) {
-      case kQueryInitiateMessage: {
-        const TaggedMessage &tagged_message = annotated_message.tagged_message;
-
-        serialization::QueryInitiateMessage proto;
-        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
-        processQueryInitiateMessage(proto.query_id(), proto.catalog_database_cache(), proto.query_context());
-        break;
-      }
-      case kWorkOrderMessage: {
-        const TaggedMessage &tagged_message = annotated_message.tagged_message;
-
-        serialization::WorkOrderMessage proto;
-        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
-        const std::size_t query_id = proto.query_id();
-        DCHECK_EQ(1u, query_contexts_.count(query_id));
-
-        WorkOrder *work_order = WorkOrderFactory::ReconstructFromProto(proto.work_order(),
-                                                                       shiftboss_index_,
-                                                                       &database_cache_,
-                                                                       query_contexts_[query_id].get(),
-                                                                       storage_manager_,
-                                                                       shiftboss_client_id_,
-                                                                       bus_,
-                                                                       hdfs_);
-
-        unique_ptr<WorkerMessage> worker_message(
-            WorkerMessage::WorkOrderMessage(work_order, proto.operator_index()));
-
-        TaggedMessage worker_tagged_message(worker_message.get(),
-                                            sizeof(*worker_message),
-                                            kWorkOrderMessage);
-
-        const size_t worker_index = getSchedulableWorker();
-        DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
-                   << "') forwarded WorkOrderMessage (typed '" << kWorkOrderMessage
-                   << "') from Foreman to worker " << worker_index;
-
-        const MessageBus::SendStatus send_status =
-            QueryExecutionUtil::SendTMBMessage(bus_,
-                                               shiftboss_client_id_,
-                                               workers_->getClientID(worker_index),
-                                               move(worker_tagged_message));
-        CHECK(send_status == MessageBus::SendStatus::kOK);
-        break;
-      }
-      case kInitiateRebuildMessage: {
-        // Construct rebuild work orders, and send back their number to
-        // 'ForemanDistributed'.
-        const TaggedMessage &tagged_message = annotated_message.tagged_message;
-
-        serialization::InitiateRebuildMessage proto;
-        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
-        processInitiateRebuildMessage(proto.query_id(),
-                                      proto.operator_index(),
-                                      proto.insert_destination_index(),
-                                      proto.relation_id());
-        break;
-      }
-      case kCatalogRelationNewBlockMessage:  // Fall through.
-      case kDataPipelineMessage:
-      case kWorkOrderFeedbackMessage:
-      case kWorkOrderCompleteMessage:
-      case kRebuildWorkOrderCompleteMessage: {
-        DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
-                   << "') forwarded typed '" << annotated_message.tagged_message.message_type()
-                   << "' message from Worker with TMB client ID '" << annotated_message.sender
-                   << "' to Foreman with TMB client ID " << foreman_client_id_;
-
-        DCHECK_NE(foreman_client_id_, tmb::kClientIdNone);
-        const MessageBus::SendStatus send_status =
-            QueryExecutionUtil::SendTMBMessage(bus_,
-                                               shiftboss_client_id_,
-                                               foreman_client_id_,
-                                               move(annotated_message.tagged_message));
-        CHECK(send_status == MessageBus::SendStatus::kOK);
-        break;
-      }
-      case kQueryTeardownMessage: {
-        const TaggedMessage &tagged_message = annotated_message.tagged_message;
+    AnnotatedMessage annotated_message;
+    if (bus_global_->ReceiveIfAvailable(shiftboss_client_id_global_, &annotated_message, 0, true)) {
+      DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_global_
+                 << "') received the typed '" << annotated_message.tagged_message.message_type()
+                 << "' message from Foreman " << annotated_message.sender;
+      switch (annotated_message.tagged_message.message_type()) {
+        case kQueryInitiateMessage: {
+          const TaggedMessage &tagged_message = annotated_message.tagged_message;
+
+          serialization::QueryInitiateMessage proto;
+          CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+          processQueryInitiateMessage(proto.query_id(), proto.catalog_database_cache(), proto.query_context());
+          break;
+        }
+        case kWorkOrderMessage: {
+          const TaggedMessage &tagged_message = annotated_message.tagged_message;
+
+          serialization::WorkOrderMessage proto;
+          CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+          const std::size_t query_id = proto.query_id();
+          DCHECK_EQ(1u, query_contexts_.count(query_id));
+
+          unique_ptr<WorkOrder> work_order(
+              WorkOrderFactory::ReconstructFromProto(proto.work_order(), shiftboss_index_, &database_cache_,
+                                                     query_contexts_[query_id].get(), storage_manager_,
+                                                     shiftboss_client_id_local_, bus_local_, hdfs_));
+
+          unique_ptr<WorkerMessage> worker_message(
+              WorkerMessage::WorkOrderMessage(work_order.release(), proto.operator_index()));
+
+          TaggedMessage worker_tagged_message(worker_message.get(),
+                                              sizeof(*worker_message),
+                                              kWorkOrderMessage);
+
+          const size_t worker_index = getSchedulableWorker();
+          DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_local_
+                     << "') forwarded WorkOrderMessage (typed '" << kWorkOrderMessage
+                     << "') from Foreman to worker " << worker_index;
+
+          const MessageBus::SendStatus send_status =
+              QueryExecutionUtil::SendTMBMessage(bus_local_,
+                                                 shiftboss_client_id_local_,
+                                                 workers_->getClientID(worker_index),
+                                                 move(worker_tagged_message));
+          CHECK(send_status == MessageBus::SendStatus::kOK);
+          break;
+        }
+        case kInitiateRebuildMessage: {
+          // Construct rebuild work orders, and send back their number to
+          // 'ForemanDistributed'.
+          const TaggedMessage &tagged_message = annotated_message.tagged_message;
+
+          serialization::InitiateRebuildMessage proto;
+          CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+          processInitiateRebuildMessage(proto.query_id(),
+                                        proto.operator_index(),
+                                        proto.insert_destination_index(),
+                                        proto.relation_id());
+          break;
+        }
+        case kQueryTeardownMessage: {
+          const TaggedMessage &tagged_message = annotated_message.tagged_message;
 
-        serialization::QueryTeardownMessage proto;
-        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+          serialization::QueryTeardownMessage proto;
+          CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
 
-        query_contexts_.erase(proto.query_id());
-        break;
+          query_contexts_.erase(proto.query_id());
+          break;
+        }
+        case kSaveQueryResultMessage: {
+          const TaggedMessage &tagged_message = annotated_message.tagged_message;
+
+          serialization::SaveQueryResultMessage proto;
+          CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+          for (int i = 0; i < proto.blocks_size(); ++i) {
+            storage_manager_->saveBlockOrBlob(proto.blocks(i));
+          }
+
+          // Clean up query execution states, i.e., QueryContext.
+          query_contexts_.erase(proto.query_id());
+
+          serialization::SaveQueryResultResponseMessage proto_response;
+          proto_response.set_query_id(proto.query_id());
+          proto_response.set_relation_id(proto.relation_id());
+          proto_response.set_cli_id(proto.cli_id());
+          proto_response.set_shiftboss_index(shiftboss_index_);
+
+          const size_t proto_response_length = proto_response.ByteSize();
+          char *proto_response_bytes = static_cast<char*>(malloc(proto_response_length));
+          CHECK(proto_response.SerializeToArray(proto_response_bytes, proto_response_length));
+
+          TaggedMessage message_response(static_cast<const void*>(proto_response_bytes),
+                                         proto_response_length,
+                                         kSaveQueryResultResponseMessage);
+          free(proto_response_bytes);
+
+          DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_global_
+                     << "') sent SaveQueryResultResponseMessage (typed '" << kSaveQueryResultResponseMessage
+                     << "') to Foreman with TMB client ID " << foreman_client_id_;
+          const MessageBus::SendStatus send_status =
+              QueryExecutionUtil::SendTMBMessage(bus_global_,
+                                                 shiftboss_client_id_global_,
+                                                 foreman_client_id_,
+                                                 move(message_response));
+          CHECK(send_status == MessageBus::SendStatus::kOK);
+          break;
+        }
+        case kPoisonMessage: {
+          DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_global_
+                     << "') forwarded PoisonMessage (typed '" << kPoisonMessage
+                     << "') from Foreman to all workers";
+
+          tmb::MessageStyle broadcast_style;
+          broadcast_style.Broadcast(true);
+
+          const MessageBus::SendStatus send_status =
+              bus_local_->Send(shiftboss_client_id_local_, worker_addresses_, broadcast_style,
+                               move(annotated_message.tagged_message));
+          CHECK(send_status == MessageBus::SendStatus::kOK);
+          return;
+        }
+        default: {
+          LOG(FATAL) << "Unknown TMB message type";
+        }
       }
-      case kSaveQueryResultMessage: {
-        const TaggedMessage &tagged_message = annotated_message.tagged_message;
-
-        serialization::SaveQueryResultMessage proto;
-        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+    }
 
-        for (int i = 0; i < proto.blocks_size(); ++i) {
-          storage_manager_->saveBlockOrBlob(proto.blocks(i));
+    while (bus_local_->ReceiveIfAvailable(shiftboss_client_id_local_, &annotated_message, 0, true)) {
+      switch (annotated_message.tagged_message.message_type()) {
+        case kCatalogRelationNewBlockMessage:
+        case kDataPipelineMessage:
+        case kWorkOrderFeedbackMessage:
+        case kWorkOrderCompleteMessage:
+        case kRebuildWorkOrderCompleteMessage: {
+          DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_global_
+                     << "') forwarded typed '" << annotated_message.tagged_message.message_type()
+                     << "' message from Worker with TMB client ID '" << annotated_message.sender
+                     << "' to Foreman with TMB client ID " << foreman_client_id_;
+
+          DCHECK_NE(foreman_client_id_, tmb::kClientIdNone);
+          const MessageBus::SendStatus send_status =
+              QueryExecutionUtil::SendTMBMessage(bus_global_,
+                                                 shiftboss_client_id_global_,
+                                                 foreman_client_id_,
+                                                 move(annotated_message.tagged_message));
+          CHECK(send_status == MessageBus::SendStatus::kOK);
+          break;
+        }
+        default: {
+          LOG(FATAL) << "Unknown TMB message type";
         }
-
-        // Clean up query execution states, i.e., QueryContext.
-        query_contexts_.erase(proto.query_id());
-
-        serialization::SaveQueryResultResponseMessage proto_response;
-        proto_response.set_query_id(proto.query_id());
-        proto_response.set_relation_id(proto.relation_id());
-        proto_response.set_cli_id(proto.cli_id());
-        proto_response.set_shiftboss_index(shiftboss_index_);
-
-        const size_t proto_response_length = proto_response.ByteSize();
-        char *proto_response_bytes = static_cast<char*>(malloc(proto_response_length));
-        CHECK(proto_response.SerializeToArray(proto_response_bytes, proto_response_length));
-
-        TaggedMessage message_response(static_cast<const void*>(proto_response_bytes),
-                                       proto_response_length,
-                                       kSaveQueryResultResponseMessage);
-        free(proto_response_bytes);
-
-        DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
-                   << "') sent SaveQueryResultResponseMessage (typed '" << kSaveQueryResultResponseMessage
-                   << "') to Foreman with TMB client ID " << foreman_client_id_;
-        const MessageBus::SendStatus send_status =
-            QueryExecutionUtil::SendTMBMessage(bus_,
-                                               shiftboss_client_id_,
-                                               foreman_client_id_,
-                                               move(message_response));
-        CHECK(send_status == MessageBus::SendStatus::kOK);
-        break;
-      }
-      case kPoisonMessage: {
-        DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
-                   << "') forwarded PoisonMessage (typed '" << kPoisonMessage
-                   << "') from Foreman to all workers";
-
-        tmb::MessageStyle broadcast_style;
-        broadcast_style.Broadcast(true);
-
-        const MessageBus::SendStatus send_status =
-            bus_->Send(shiftboss_client_id_,
-                       worker_addresses_,
-                       broadcast_style,
-                       move(annotated_message.tagged_message));
-        CHECK(send_status == MessageBus::SendStatus::kOK);
-        return;
-      }
-      default: {
-        LOG(FATAL) << "Unknown TMB message type";
       }
     }
   }
@@ -265,21 +352,21 @@ void Shiftboss::registerWithForeman() {
                         kShiftbossRegistrationMessage);
   free(proto_bytes);
 
-  DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
+  DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_global_
              << "') sent ShiftbossRegistrationMessage (typed '" << kShiftbossRegistrationMessage
              << "') to all";
   tmb::MessageBus::SendStatus send_status =
-      bus_->Send(shiftboss_client_id_, all_addresses, style, move(message));
+      bus_global_->Send(shiftboss_client_id_global_, all_addresses, style, move(message));
   DCHECK(send_status == tmb::MessageBus::SendStatus::kOK);
 }
 
 void Shiftboss::processShiftbossRegistrationResponseMessage() {
-  AnnotatedMessage annotated_message(bus_->Receive(shiftboss_client_id_, 0, true));
+  AnnotatedMessage annotated_message(bus_global_->Receive(shiftboss_client_id_global_, 0, true));
   const TaggedMessage &tagged_message = annotated_message.tagged_message;
   DCHECK_EQ(kShiftbossRegistrationResponseMessage, tagged_message.message_type());
 
   foreman_client_id_ = annotated_message.sender;
-  DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+  DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_local_
              << "') received the typed '" << kShiftbossRegistrationResponseMessage
              << "' message from ForemanDistributed with client " << foreman_client_id_;
 
@@ -290,10 +377,10 @@ void Shiftboss::processShiftbossRegistrationResponseMessage() {
   storage_manager_->sendBlockDomainToShiftbossIndexMessage(shiftboss_index_);
 
   // Forward this message to Workers regarding <shiftboss_index_>.
-  QueryExecutionUtil::BroadcastMessage(shiftboss_client_id_,
+  QueryExecutionUtil::BroadcastMessage(shiftboss_client_id_local_,
                                        worker_addresses_,
                                        move(annotated_message.tagged_message),
-                                       bus_);
+                                       bus_local_);
 }
 
 void Shiftboss::processQueryInitiateMessage(
@@ -303,7 +390,7 @@ void Shiftboss::processQueryInitiateMessage(
   database_cache_.update(catalog_database_cache_proto);
 
   auto query_context = std::make_unique<QueryContext>(
-      query_context_proto, database_cache_, storage_manager_, shiftboss_client_id_, bus_);
+      query_context_proto, database_cache_, storage_manager_, shiftboss_client_id_local_, bus_local_);
   query_contexts_.emplace(query_id, move(query_context));
 
   serialization::QueryInitiateResponseMessage proto;
@@ -318,12 +405,12 @@ void Shiftboss::processQueryInitiateMessage(
                                  kQueryInitiateResponseMessage);
   free(proto_bytes);
 
-  DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
+  DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_global_
              << "') sent QueryInitiateResponseMessage (typed '" << kQueryInitiateResponseMessage
              << "') to Foreman with TMB client ID " << foreman_client_id_;
   const MessageBus::SendStatus send_status =
-      QueryExecutionUtil::SendTMBMessage(bus_,
-                                         shiftboss_client_id_,
+      QueryExecutionUtil::SendTMBMessage(bus_global_,
+                                         shiftboss_client_id_global_,
                                          foreman_client_id_,
                                          move(message_response));
   CHECK(send_status == MessageBus::SendStatus::kOK);
@@ -357,12 +444,12 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
                                  kInitiateRebuildResponseMessage);
   free(proto_bytes);
 
-  DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
+  DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_global_
              << "') sent InitiateRebuildResponseMessage (typed '" << kInitiateRebuildResponseMessage
              << "') to Foreman with TMB client ID " << foreman_client_id_;
   const MessageBus::SendStatus send_status =
-      QueryExecutionUtil::SendTMBMessage(bus_,
-                                         shiftboss_client_id_,
+      QueryExecutionUtil::SendTMBMessage(bus_global_,
+                                         shiftboss_client_id_global_,
                                          foreman_client_id_,
                                          move(message_response));
   CHECK(send_status == MessageBus::SendStatus::kOK);
@@ -375,8 +462,8 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
                              move(partially_filled_block_refs[i]),
                              op_index,
                              rel_id,
-                             shiftboss_client_id_,
-                             bus_);
+                             shiftboss_client_id_local_,
+                             bus_local_);
 
     unique_ptr<WorkerMessage> worker_message(
         WorkerMessage::RebuildWorkOrderMessage(rebuild_work_order, op_index));
@@ -386,13 +473,13 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
                                         kRebuildWorkOrderMessage);
 
     const size_t worker_index = getSchedulableWorker();
-    DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
+    DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_local_
                << "') sent RebuildWorkOrderMessage (typed '" << kRebuildWorkOrderMessage
                << "') to worker " << worker_index;
 
     const MessageBus::SendStatus send_status =
-        QueryExecutionUtil::SendTMBMessage(bus_,
-                                           shiftboss_client_id_,
+        QueryExecutionUtil::SendTMBMessage(bus_local_,
+                                           shiftboss_client_id_local_,
                                            workers_->getClientID(worker_index),
                                            move(worker_tagged_message));
     CHECK(send_status == MessageBus::SendStatus::kOK);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0d8b6335/query_execution/Shiftboss.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.hpp b/query_execution/Shiftboss.hpp
index e0b4312..05457bd 100644
--- a/query_execution/Shiftboss.hpp
+++ b/query_execution/Shiftboss.hpp
@@ -39,7 +39,8 @@
 
 #include "tmb/address.h"
 #include "tmb/id_typedefs.h"
-#include "tmb/message_bus.h"
+
+namespace tmb { class MessageBus; };
 
 namespace quickstep {
 
@@ -63,7 +64,8 @@ class Shiftboss : public Thread {
   /**
    * @brief Constructor.
    *
-   * @param bus A pointer to the TMB.
+   * @param bus_global A pointer to the TMB for Foreman.
+   * @param bus_local A pointer to the TMB for Workers.
    * @param storage_manager The StorageManager to use.
    * @param workers A pointer to the WorkerDirectory.
    * @param hdfs The HDFS connector via libhdfs3.
@@ -72,84 +74,12 @@ class Shiftboss : public Thread {
    * @note If cpu_id is not specified, Shiftboss thread can be possibly moved
    *       around on different CPUs by the OS.
   **/
-  Shiftboss(tmb::MessageBus *bus,
+  Shiftboss(tmb::MessageBus *bus_global,
+            tmb::MessageBus *bus_local,
             StorageManager *storage_manager,
             WorkerDirectory *workers,
             void *hdfs,
-            const int cpu_id = -1)
-      : bus_(DCHECK_NOTNULL(bus)),
-        storage_manager_(DCHECK_NOTNULL(storage_manager)),
-        workers_(DCHECK_NOTNULL(workers)),
-        hdfs_(hdfs),
-        cpu_id_(cpu_id),
-        shiftboss_client_id_(tmb::kClientIdNone),
-        foreman_client_id_(tmb::kClientIdNone),
-        max_msgs_per_worker_(1),
-        start_worker_index_(0u) {
-    // Check to have at least one Worker.
-    DCHECK_GT(workers->getNumWorkers(), 0u);
-
-#ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS
-    if (FLAGS_use_hdfs) {
-      CHECK(hdfs_);
-    }
-#endif  // QUICKSTEP_HAVE_FILE_MANAGER_HDFS
-
-    shiftboss_client_id_ = bus_->Connect();
-    LOG(INFO) << "Shiftboss TMB client ID: " << shiftboss_client_id_;
-    DCHECK_NE(shiftboss_client_id_, tmb::kClientIdNone);
-
-    // Messages between Foreman and Shiftboss.
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kShiftbossRegistrationMessage);
-    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kShiftbossRegistrationResponseMessage);
-
-    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kQueryInitiateMessage);
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kQueryInitiateResponseMessage);
-
-    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kInitiateRebuildMessage);
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kInitiateRebuildResponseMessage);
-
-    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kSaveQueryResultMessage);
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kSaveQueryResultResponseMessage);
-
-    // Message sent to Worker.
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kShiftbossRegistrationResponseMessage);
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kRebuildWorkOrderMessage);
-
-    // Forward the following message types from Foreman to Workers.
-    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kWorkOrderMessage);
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderMessage);
-
-    // Forward the following message types from Workers to Foreman.
-    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kCatalogRelationNewBlockMessage);
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kCatalogRelationNewBlockMessage);
-
-    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kDataPipelineMessage);
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kDataPipelineMessage);
-
-    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kWorkOrderFeedbackMessage);
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderFeedbackMessage);
-
-    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kWorkOrderCompleteMessage);
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderCompleteMessage);
-
-    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kRebuildWorkOrderCompleteMessage);
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kRebuildWorkOrderCompleteMessage);
-
-    // Clean up query execution states, i.e., QueryContext.
-    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kQueryTeardownMessage);
-
-    // Stop itself.
-    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kPoisonMessage);
-    // Stop all workers.
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kPoisonMessage);
-
-    for (std::size_t i = 0; i < workers_->getNumWorkers(); ++i) {
-      worker_addresses_.AddRecipient(workers_->getClientID(i));
-    }
-
-    registerWithForeman();
-  }
+            const int cpu_id = -1);
 
   ~Shiftboss() override {
   }
@@ -160,7 +90,7 @@ class Shiftboss : public Thread {
    * @return TMB client ID of shiftboss thread.
    **/
   inline tmb::client_id getBusClientID() const {
-    return shiftboss_client_id_;
+    return shiftboss_client_id_global_;
   }
 
   /**
@@ -231,9 +161,7 @@ class Shiftboss : public Thread {
                                      const QueryContext::insert_destination_id dest_index,
                                      const relation_id rel_id);
 
-  // TODO(zuyu): Use two buses for the message communication between Foreman and Shiftboss,
-  // and Shiftboss and Worker thread pool.
-  tmb::MessageBus *bus_;
+  tmb::MessageBus *bus_global_, *bus_local_;
 
   CatalogDatabaseCache database_cache_;
   StorageManager *storage_manager_;
@@ -245,7 +173,7 @@ class Shiftboss : public Thread {
   // The ID of the CPU that the Shiftboss thread can optionally be pinned to.
   const int cpu_id_;
 
-  tmb::client_id shiftboss_client_id_, foreman_client_id_;
+  tmb::client_id shiftboss_client_id_global_, shiftboss_client_id_local_, foreman_client_id_;
 
   // Unique per Shiftboss instance.
   std::uint64_t shiftboss_index_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0d8b6335/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
index c9f5a10..6bd7a1f 100644
--- a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
+++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
@@ -76,6 +76,7 @@ const char *DistributedExecutionGeneratorTestRunner::kResetOption =
 
 DistributedExecutionGeneratorTestRunner::DistributedExecutionGeneratorTestRunner(const string &storage_path)
     : query_id_(0),
+      bus_locals_(kNumInstances),
       data_exchangers_(kNumInstances) {
   bus_.Initialize();
 
@@ -113,7 +114,10 @@ DistributedExecutionGeneratorTestRunner::DistributedExecutionGeneratorTestRunner
                                         kAnyNUMANodeID);
 
   for (int i = 0; i < kNumInstances; ++i) {
-    workers_.push_back(make_unique<Worker>(0 /* worker_thread_index */, &bus_));
+    tmb::MessageBus *bus_local = &bus_locals_[i];
+    bus_local->Initialize();
+
+    workers_.push_back(make_unique<Worker>(0 /* worker_thread_index */, bus_local));
 
     const vector<tmb::client_id> worker_client_ids(1, workers_.back()->getBusClientID());
     worker_directories_.push_back(
@@ -128,7 +132,7 @@ DistributedExecutionGeneratorTestRunner::DistributedExecutionGeneratorTestRunner
 
     data_exchangers_[i].set_storage_manager(storage_manager.get());
     shiftbosses_.push_back(
-        make_unique<Shiftboss>(&bus_, storage_manager.get(), worker_directories_.back().get(),
+        make_unique<Shiftboss>(&bus_, bus_local, storage_manager.get(), worker_directories_.back().get(),
                                storage_manager->hdfs()));
 
     storage_managers_.push_back(move(storage_manager));

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0d8b6335/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
index 63e320d..2cd2427 100644
--- a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
+++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
@@ -129,6 +129,7 @@ class DistributedExecutionGeneratorTestRunner : public TextBasedTestRunner {
 
   std::unique_ptr<ForemanDistributed> foreman_;
 
+  std::vector<MessageBusImpl> bus_locals_;
   std::vector<std::unique_ptr<Worker>> workers_;
   std::vector<std::unique_ptr<WorkerDirectory>> worker_directories_;
   std::vector<DataExchangerAsync> data_exchangers_;



[2/6] incubator-quickstep git commit: patches for missed linenoise changes

Posted by zu...@apache.org.
patches for missed linenoise changes


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/1572762a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/1572762a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/1572762a

Branch: refs/heads/two-level-tmb
Commit: 1572762a666c1b61b1172beba6d67d3fef5a3a6b
Parents: f6480fb
Author: cramja <ma...@gmail.com>
Authored: Tue Feb 21 10:22:14 2017 -0600
Committer: cramja <ma...@gmail.com>
Committed: Tue Feb 21 10:22:14 2017 -0600

----------------------------------------------------------------------
 third_party/download_and_patch_prerequisites.sh |  4 +
 third_party/patches/linenoise/linenoise.c.patch | 89 ++++++++++++++++++++
 third_party/patches/linenoise/linenoise.h.patch | 29 +++++++
 3 files changed, 122 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1572762a/third_party/download_and_patch_prerequisites.sh
----------------------------------------------------------------------
diff --git a/third_party/download_and_patch_prerequisites.sh b/third_party/download_and_patch_prerequisites.sh
index b5f5cac..fd6106c 100755
--- a/third_party/download_and_patch_prerequisites.sh
+++ b/third_party/download_and_patch_prerequisites.sh
@@ -89,7 +89,11 @@ do
 done
 
 # Apply patches now.
+
+# Apply linenoise patch
 cp ${PATCH_DIR}/linenoise/CMakeLists.txt ${THIRD_PARTY_SRC_DIR}/linenoise
+patch ${THIRD_PARTY_SRC_DIR}/linenoise/linenoise.h ${PATCH_DIR}/linenoise/linenoise.h.patch
+patch ${THIRD_PARTY_SRC_DIR}/linenoise/linenoise.c ${PATCH_DIR}/linenoise/linenoise.c.patch
 
 # Apply gflags patch.
 echo "Patching for gflags:"

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1572762a/third_party/patches/linenoise/linenoise.c.patch
----------------------------------------------------------------------
diff --git a/third_party/patches/linenoise/linenoise.c.patch b/third_party/patches/linenoise/linenoise.c.patch
new file mode 100644
index 0000000..cea6162
--- /dev/null
+++ b/third_party/patches/linenoise/linenoise.c.patch
@@ -0,0 +1,89 @@
+--- linenoise.c.new	2015-04-13 02:38:43.000000000 -0500
++++ linenoise.c	2017-02-21 09:47:42.000000000 -0600
+@@ -1,7 +1,5 @@
+-/* linenoise.c -- VERSION 1.0
+- *
+- * Guerrilla line editing library against the idea that a line editing lib
+- * needs to be 20,000 lines of C code.
++/* linenoise.c -- guerrilla line editing library against the idea that a
++ * line editing lib needs to be 20,000 lines of C code.
+  *
+  * You can find the latest source code at:
+  *
+@@ -120,6 +118,7 @@
+ 
+ #define LINENOISE_DEFAULT_HISTORY_MAX_LEN 100
+ #define LINENOISE_MAX_LINE 4096
++#define LINENOISE_TRIM_NEWLINE 0
+ static char *unsupported_term[] = {"dumb","cons25","emacs",NULL};
+ static linenoiseCompletionCallback *completionCallback = NULL;
+ 
+@@ -774,6 +773,10 @@
+             history_len--;
+             free(history[history_len]);
+             if (mlmode) linenoiseEditMoveEnd(&l);
++#if !LINENOISE_TRIM_NEWLINE
++            l.buf[l.len++] = '\n';
++            l.buf[l.len] = '\0';
++#endif
+             return (int)l.len;
+         case CTRL_C:     /* ctrl-c */
+             errno = EAGAIN;
+@@ -940,10 +943,12 @@
+         /* Not a tty: read from file / pipe. */
+         if (fgets(buf, buflen, stdin) == NULL) return -1;
+         count = strlen(buf);
++#if LINENOISE_TRIM_NEWLINE
+         if (count && buf[count-1] == '\n') {
+             count--;
+             buf[count] = '\0';
+         }
++#endif
+     } else {
+         /* Interactive editing. */
+         if (enableRawMode(STDIN_FILENO) == -1) return -1;
+@@ -970,10 +975,12 @@
+         fflush(stdout);
+         if (fgets(buf,LINENOISE_MAX_LINE,stdin) == NULL) return NULL;
+         len = strlen(buf);
++#if LINENOISE_TRIM_NEWLINE
+         while(len && (buf[len-1] == '\n' || buf[len-1] == '\r')) {
+             len--;
+             buf[len] = '\0';
+         }
++#endif
+         return strdup(buf);
+     } else {
+         count = linenoiseRaw(buf,LINENOISE_MAX_LINE,prompt);
+@@ -1021,12 +1028,29 @@
+         memset(history,0,(sizeof(char*)*history_max_len));
+     }
+ 
++#if LINENOISE_TRIM_NEWLINE
+     /* Don't add duplicated lines. */
+     if (history_len && !strcmp(history[history_len-1], line)) return 0;
+ 
+-    /* Add an heap allocated copy of the line in the history.
+-     * If we reached the max length, remove the older line. */
+     linecopy = strdup(line);
++#else
++    /* Remove trailing newlines so that editing from history doesn't get all wonky. */
++    size_t line_len = strlen(line);
++    while ((line_len > 0) && (line[line_len - 1] == '\n')) {
++      --line_len;
++    }
++    linecopy = (char*) malloc(line_len + 1);
++    memcpy(linecopy, line, line_len);
++    linecopy[line_len] = '\0';
++
++    /* Don't add duplicated lines. */
++    if (history_len && !strcmp(history[history_len-1], linecopy)) {
++        free(linecopy);
++        return 0;
++    }
++#endif
++
++    /* If we reached the max length, remove the older line. */
+     if (!linecopy) return 0;
+     if (history_len == history_max_len) {
+         free(history[0]);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1572762a/third_party/patches/linenoise/linenoise.h.patch
----------------------------------------------------------------------
diff --git a/third_party/patches/linenoise/linenoise.h.patch b/third_party/patches/linenoise/linenoise.h.patch
new file mode 100644
index 0000000..feb597a
--- /dev/null
+++ b/third_party/patches/linenoise/linenoise.h.patch
@@ -0,0 +1,29 @@
+--- linenoise.h.new	2015-04-13 02:38:43.000000000 -0500
++++ linenoise.h	2017-02-21 09:44:05.000000000 -0600
+@@ -1,7 +1,5 @@
+-/* linenoise.h -- VERSION 1.0
+- *
+- * Guerrilla line editing library against the idea that a line editing lib
+- * needs to be 20,000 lines of C code.
++/* linenoise.h -- guerrilla line editing library against the idea that a
++ * line editing lib needs to be 20,000 lines of C code.
+  *
+  * See linenoise.c for more information.
+  *
+@@ -36,9 +34,16 @@
+  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+  */
+ 
++/* Modified from original by Craig Chasseur as follows:
++ *     - include stddef.h in header for size_t
++ *     - do not trim newlines from end of input
++ */
++
+ #ifndef __LINENOISE_H
+ #define __LINENOISE_H
+ 
++#include <stddef.h>
++
+ #ifdef __cplusplus
+ extern "C" {
+ #endif


[5/6] incubator-quickstep git commit: Added HDFS Support For TextScanWorkOrder.

Posted by zu...@apache.org.
Added HDFS Support For TextScanWorkOrder.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/b21c85b6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/b21c85b6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/b21c85b6

Branch: refs/heads/two-level-tmb
Commit: b21c85b6fc2d1a32337b774244eec577a2568727
Parents: f5c063a
Author: Zuyu Zhang <zu...@apache.org>
Authored: Mon Feb 6 14:42:42 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Thu Feb 23 00:41:23 2017 -0800

----------------------------------------------------------------------
 cli/distributed/Executor.cpp                    |   2 +-
 query_execution/CMakeLists.txt                  |   1 +
 query_execution/Shiftboss.cpp                   |   3 +-
 query_execution/Shiftboss.hpp                   |  14 +++
 .../DistributedExecutionGeneratorTestRunner.cpp |   3 +-
 relational_operators/CMakeLists.txt             |   5 +
 relational_operators/TextScanOperator.cpp       | 107 ++++++++++++++++---
 relational_operators/TextScanOperator.hpp       |  10 +-
 relational_operators/WorkOrderFactory.cpp       |   6 +-
 relational_operators/WorkOrderFactory.hpp       |   4 +-
 storage/FileManagerHdfs.hpp                     |   9 ++
 storage/StorageManager.cpp                      |   9 ++
 storage/StorageManager.hpp                      |   8 +-
 13 files changed, 160 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b21c85b6/cli/distributed/Executor.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Executor.cpp b/cli/distributed/Executor.cpp
index 1d03579..3485298 100644
--- a/cli/distributed/Executor.cpp
+++ b/cli/distributed/Executor.cpp
@@ -76,7 +76,7 @@ void Executor::init() {
   data_exchanger_.start();
 
   shiftboss_ =
-      make_unique<Shiftboss>(&bus_, storage_manager_.get(), worker_directory_.get());
+      make_unique<Shiftboss>(&bus_, storage_manager_.get(), worker_directory_.get(), storage_manager_->hdfs());
   shiftboss_->start();
 
   for (const auto &worker : workers_) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b21c85b6/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 50bf694..12d6be0 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -295,6 +295,7 @@ if (ENABLE_DISTRIBUTED)
                         quickstep_queryexecution_WorkerMessage
                         quickstep_relationaloperators_RebuildWorkOrder
                         quickstep_relationaloperators_WorkOrderFactory
+                        quickstep_storage_Flags
                         quickstep_storage_InsertDestination
                         quickstep_storage_StorageBlock
                         quickstep_storage_StorageManager

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b21c85b6/query_execution/Shiftboss.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp
index 2ed42d0..bae5205 100644
--- a/query_execution/Shiftboss.cpp
+++ b/query_execution/Shiftboss.cpp
@@ -104,7 +104,8 @@ void Shiftboss::run() {
                                                                        query_contexts_[query_id].get(),
                                                                        storage_manager_,
                                                                        shiftboss_client_id_,
-                                                                       bus_);
+                                                                       bus_,
+                                                                       hdfs_);
 
         unique_ptr<WorkerMessage> worker_message(
             WorkerMessage::WorkOrderMessage(work_order, proto.operator_index()));

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b21c85b6/query_execution/Shiftboss.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.hpp b/query_execution/Shiftboss.hpp
index 6538d48..e0b4312 100644
--- a/query_execution/Shiftboss.hpp
+++ b/query_execution/Shiftboss.hpp
@@ -30,6 +30,8 @@
 #include "query_execution/QueryContext.hpp"
 #include "query_execution/QueryExecutionTypedefs.hpp"
 #include "query_execution/WorkerDirectory.hpp"
+#include "storage/Flags.hpp"
+#include "storage/StorageConfig.h"  // For QUICKSTEP_HAVE_FILE_MANAGER_HDFS.
 #include "threading/Thread.hpp"
 #include "utility/Macros.hpp"
 
@@ -64,6 +66,7 @@ class Shiftboss : public Thread {
    * @param bus A pointer to the TMB.
    * @param storage_manager The StorageManager to use.
    * @param workers A pointer to the WorkerDirectory.
+   * @param hdfs The HDFS connector via libhdfs3 (not owned); must be non-null if FLAGS_use_hdfs is set in an HDFS-enabled build.
    * @param cpu_id The ID of the CPU to which the Shiftboss thread can be pinned.
    *
    * @note If cpu_id is not specified, Shiftboss thread can be possibly moved
@@ -72,10 +75,12 @@ class Shiftboss : public Thread {
   Shiftboss(tmb::MessageBus *bus,
             StorageManager *storage_manager,
             WorkerDirectory *workers,
+            void *hdfs,
             const int cpu_id = -1)
       : bus_(DCHECK_NOTNULL(bus)),
         storage_manager_(DCHECK_NOTNULL(storage_manager)),
         workers_(DCHECK_NOTNULL(workers)),
+        hdfs_(hdfs),
         cpu_id_(cpu_id),
         shiftboss_client_id_(tmb::kClientIdNone),
         foreman_client_id_(tmb::kClientIdNone),
@@ -84,6 +89,12 @@ class Shiftboss : public Thread {
     // Check to have at least one Worker.
     DCHECK_GT(workers->getNumWorkers(), 0u);
 
+#ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+    if (FLAGS_use_hdfs) {
+      CHECK(hdfs_);
+    }
+#endif  // QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+
     shiftboss_client_id_ = bus_->Connect();
     LOG(INFO) << "Shiftboss TMB client ID: " << shiftboss_client_id_;
     DCHECK_NE(shiftboss_client_id_, tmb::kClientIdNone);
@@ -228,6 +239,9 @@ class Shiftboss : public Thread {
   StorageManager *storage_manager_;
   WorkerDirectory *workers_;
 
+  // Not owned.
+  void *hdfs_;
+
   // The ID of the CPU that the Shiftboss thread can optionally be pinned to.
   const int cpu_id_;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b21c85b6/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
index 2e18467..c9f5a10 100644
--- a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
+++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
@@ -128,7 +128,8 @@ DistributedExecutionGeneratorTestRunner::DistributedExecutionGeneratorTestRunner
 
     data_exchangers_[i].set_storage_manager(storage_manager.get());
     shiftbosses_.push_back(
-        make_unique<Shiftboss>(&bus_, storage_manager.get(), worker_directories_.back().get()));
+        make_unique<Shiftboss>(&bus_, storage_manager.get(), worker_directories_.back().get(),
+                               storage_manager->hdfs()));
 
     storage_managers_.push_back(move(storage_manager));
   }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b21c85b6/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index 457d58a..1693ec2 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -491,6 +491,7 @@ target_link_libraries(quickstep_relationaloperators_TextScanOperator
                       quickstep_relationaloperators_RelationalOperator
                       quickstep_relationaloperators_WorkOrder
                       quickstep_relationaloperators_WorkOrder_proto
+                      quickstep_storage_Flags
                       quickstep_storage_InsertDestination
                       quickstep_types_Type
                       quickstep_types_TypedValue
@@ -500,6 +501,10 @@ target_link_libraries(quickstep_relationaloperators_TextScanOperator
                       quickstep_utility_Glob
                       quickstep_utility_Macros
                       tmb)
+if (QUICKSTEP_HAVE_FILE_MANAGER_HDFS)
+  target_link_libraries(quickstep_relationaloperators_TextScanOperator
+                        ${LIBHDFS3_LIBRARIES})
+endif(QUICKSTEP_HAVE_FILE_MANAGER_HDFS)
 target_link_libraries(quickstep_relationaloperators_UpdateOperator
                       glog
                       quickstep_catalog_CatalogRelation

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b21c85b6/relational_operators/TextScanOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/TextScanOperator.cpp b/relational_operators/TextScanOperator.cpp
index 0a83a85..6333813 100644
--- a/relational_operators/TextScanOperator.cpp
+++ b/relational_operators/TextScanOperator.cpp
@@ -41,7 +41,14 @@
 #include "query_execution/WorkOrderProtosContainer.hpp"
 #include "query_execution/WorkOrdersContainer.hpp"
 #include "relational_operators/WorkOrder.pb.h"
+#include "storage/Flags.hpp"
 #include "storage/InsertDestination.hpp"
+#include "storage/StorageConfig.h"  // For QUICKSTEP_HAVE_FILE_MANAGER_HDFS.
+
+#ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+#include <hdfs/hdfs.h>
+#endif  // QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+
 #include "types/Type.hpp"
 #include "types/TypedValue.hpp"
 #include "types/containers/ColumnVector.hpp"
@@ -205,14 +212,56 @@ void TextScanWorkOrder::execute() {
 
   std::vector<TypedValue> vector_tuple_returned;
   constexpr std::size_t kSmallBufferSize = 0x4000;
-  char *buffer = reinterpret_cast<char *>(malloc(std::max(text_segment_size_, kSmallBufferSize)));
-
-  // Read text segment into buffer.
-  FILE *file = std::fopen(filename_.c_str(), "rb");
-  std::fseek(file, text_offset_, SEEK_SET);
-  std::size_t bytes_read = std::fread(buffer, 1, text_segment_size_, file);
-  if (bytes_read != text_segment_size_) {
-    throw TextScanReadError(filename_);
+  const size_t buffer_size = std::max(text_segment_size_, kSmallBufferSize);
+  char *buffer = reinterpret_cast<char *>(malloc(buffer_size));
+
+  bool use_hdfs = false;
+  std::size_t bytes_read;
+
+#ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+  hdfsFS hdfs = nullptr;
+  hdfsFile file_handle = nullptr;
+
+  if (FLAGS_use_hdfs) {
+    use_hdfs = true;
+    hdfs = static_cast<hdfsFS>(hdfs_);
+
+    file_handle = hdfsOpenFile(hdfs, filename_.c_str(), O_RDONLY, buffer_size,
+                               0 /* default replication */, 0 /* default block size */);
+    if (file_handle == nullptr) {
+      LOG(ERROR) << "Failed to open file " << filename_ << " with error: " << strerror(errno);
+      return;
+    }
+
+    if (hdfsSeek(hdfs, file_handle, text_offset_)) {
+      LOG(ERROR) << "Failed to seek in file " << filename_ << " with error: " << strerror(errno);
+
+      hdfsCloseFile(hdfs, file_handle);
+      return;
+    }
+
+    bytes_read = hdfsRead(hdfs, file_handle, buffer, text_segment_size_);
+    if (bytes_read != text_segment_size_) {
+      hdfsCloseFile(hdfs, file_handle);
+      throw TextScanReadError(filename_);
+    }
+  }
+#endif  // QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+
+  FILE *file = nullptr;
+  if (!use_hdfs) {
+    // Avoid unused-private-field warning.
+    (void) hdfs_;
+
+    // Read text segment into buffer.
+    file = std::fopen(filename_.c_str(), "rb");
+    std::fseek(file, text_offset_, SEEK_SET);
+    bytes_read = std::fread(buffer, 1, text_segment_size_, file);
+
+    if (bytes_read != text_segment_size_) {
+      std::fclose(file);
+      throw TextScanReadError(filename_);
+    }
   }
 
   // Locate the first newline character.
@@ -266,10 +315,36 @@ void TextScanWorkOrder::execute() {
   // that the last tuple is very small / very large.
   std::size_t dynamic_read_size = 1024;
   std::string row_string;
-  std::fseek(file, text_offset_ + (end_ptr - buffer), SEEK_SET);
+
+  const size_t dynamic_read_offset = text_offset_ + (end_ptr - buffer);
+  if (use_hdfs) {
+#ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+    if (hdfsSeek(hdfs, file_handle, dynamic_read_offset)) {
+      LOG(ERROR) << "Failed to seek in file " << filename_ << " with error: " << strerror(errno);
+
+      hdfsCloseFile(hdfs, file_handle);
+      return;
+    }
+#endif  // QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+  } else {
+    std::fseek(file, dynamic_read_offset, SEEK_SET);
+  }
+
   bool has_reached_end = false;
   do {
-    bytes_read = std::fread(buffer, 1, dynamic_read_size, file);
+    if (use_hdfs) {
+#ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+      bytes_read = hdfsRead(hdfs, file_handle, buffer, dynamic_read_size);
+
+      // Read again if the first read came up short, e.g. when crossing an HDFS block boundary.
+      if (bytes_read != dynamic_read_size) {
+        bytes_read += hdfsRead(hdfs, file_handle, buffer + bytes_read, dynamic_read_size - bytes_read);
+      }
+#endif  // QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+    } else {
+      bytes_read = std::fread(buffer, 1, dynamic_read_size, file);
+    }
+
     std::size_t bytes_to_copy = bytes_read;
 
     for (std::size_t i = 0; i < bytes_read; ++i) {
@@ -303,7 +378,14 @@ void TextScanWorkOrder::execute() {
     }
   }
 
-  std::fclose(file);
+  if (use_hdfs) {
+#ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+    hdfsCloseFile(hdfs, file_handle);
+#endif  // QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+  } else {
+    std::fclose(file);
+  }
+
   free(buffer);
 
   // Store the tuples in a ColumnVectorsValueAccessor for bulk insert.
@@ -334,7 +416,8 @@ void TextScanWorkOrder::execute() {
 }
 
 std::vector<TypedValue> TextScanWorkOrder::parseRow(const char **row_ptr,
-                                  const CatalogRelationSchema &relation, bool *is_faulty) const {
+                                                    const CatalogRelationSchema &relation,
+                                                    bool *is_faulty) const {
   std::vector<TypedValue> attribute_values;
   // Always assume current row is not faulty initially.
   *is_faulty = false;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b21c85b6/relational_operators/TextScanOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/TextScanOperator.hpp b/relational_operators/TextScanOperator.hpp
index eada190..59821fc 100644
--- a/relational_operators/TextScanOperator.hpp
+++ b/relational_operators/TextScanOperator.hpp
@@ -189,6 +189,7 @@ class TextScanWorkOrder : public WorkOrder {
    * @param process_escape_sequences Whether to decode escape sequences in the
    *        text file.
    * @param output_destination The InsertDestination to insert tuples.
+   * @param hdfs The HDFS connector via libhdfs3 (not owned); only used when FLAGS_use_hdfs is set.
    **/
   TextScanWorkOrder(
       const std::size_t query_id,
@@ -197,14 +198,16 @@ class TextScanWorkOrder : public WorkOrder {
       const std::size_t text_segment_size,
       const char field_terminator,
       const bool process_escape_sequences,
-      InsertDestination *output_destination)
+      InsertDestination *output_destination,
+      void *hdfs = nullptr)
       : WorkOrder(query_id),
         filename_(filename),
         text_offset_(text_offset),
         text_segment_size_(text_segment_size),
         field_terminator_(field_terminator),
         process_escape_sequences_(process_escape_sequences),
-        output_destination_(DCHECK_NOTNULL(output_destination)) {}
+        output_destination_(DCHECK_NOTNULL(output_destination)),
+        hdfs_(hdfs) {}
 
   ~TextScanWorkOrder() override {}
 
@@ -332,6 +335,9 @@ class TextScanWorkOrder : public WorkOrder {
 
   InsertDestination *output_destination_;
 
+  // Not owned.
+  void *hdfs_;
+
   DISALLOW_COPY_AND_ASSIGN(TextScanWorkOrder);
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b21c85b6/relational_operators/WorkOrderFactory.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp
index d2c8251..cf0ee74 100644
--- a/relational_operators/WorkOrderFactory.cpp
+++ b/relational_operators/WorkOrderFactory.cpp
@@ -75,7 +75,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
                                                   QueryContext *query_context,
                                                   StorageManager *storage_manager,
                                                   const tmb::client_id shiftboss_client_id,
-                                                  tmb::MessageBus *bus) {
+                                                  tmb::MessageBus *bus,
+                                                  void *hdfs) {
   DCHECK(query_context != nullptr);
   DCHECK(ProtoIsValid(proto, *catalog_database, *query_context))
       << "Attempted to create WorkOrder from an invalid proto description:\n"
@@ -473,7 +474,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
           proto.GetExtension(serialization::TextScanWorkOrder::field_terminator),
           proto.GetExtension(serialization::TextScanWorkOrder::process_escape_sequences),
           query_context->getInsertDestination(
-              proto.GetExtension(serialization::TextScanWorkOrder::insert_destination_index)));
+              proto.GetExtension(serialization::TextScanWorkOrder::insert_destination_index)),
+          hdfs);
     }
     case serialization::UPDATE: {
       LOG(INFO) << "Creating UpdateWorkOrder in Shiftboss " << shiftboss_index;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b21c85b6/relational_operators/WorkOrderFactory.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrderFactory.hpp b/relational_operators/WorkOrderFactory.hpp
index acf3855..ece687b 100644
--- a/relational_operators/WorkOrderFactory.hpp
+++ b/relational_operators/WorkOrderFactory.hpp
@@ -59,6 +59,7 @@ class WorkOrderFactory {
    * @param storage_manager The StorageManager to use.
    * @param shiftboss_client_id The TMB client id of Shiftboss.
    * @param bus A pointer to the TMB.
+   * @param hdfs The HDFS connector via libhdfs3, passed through to TextScanWorkOrder.
    *
    * @return A new WorkOrder reconstructed from the supplied Protocol Buffer.
    **/
@@ -68,7 +69,8 @@ class WorkOrderFactory {
                                          QueryContext *query_context,
                                          StorageManager *storage_manager,
                                          const tmb::client_id shiftboss_client_id,
-                                         tmb::MessageBus *bus);
+                                         tmb::MessageBus *bus,
+                                         void *hdfs);
 
   /**
    * @brief Check whether a serialization::WorkOrder is fully-formed and

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b21c85b6/storage/FileManagerHdfs.hpp
----------------------------------------------------------------------
diff --git a/storage/FileManagerHdfs.hpp b/storage/FileManagerHdfs.hpp
index f47e4a8..a8feb50 100644
--- a/storage/FileManagerHdfs.hpp
+++ b/storage/FileManagerHdfs.hpp
@@ -55,6 +55,15 @@ class FileManagerHdfs : public FileManager {
 
   block_id_counter getMaxUsedBlockCounter(const block_id_domain block_domain) const override;
 
+  /**
+   * @brief Get the HDFS connector via libhdfs3.
+   *
+   * @return The libhdfs3 filesystem handle (hdfsFS), type-erased to void*.
+   **/
+  void* hdfs() {
+    return static_cast<void*>(hdfs_);
+  }
+
  private:
   // libhdfs3 has an API to release this pointer.
   hdfsFS hdfs_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b21c85b6/storage/StorageManager.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.cpp b/storage/StorageManager.cpp
index 6f7d38b..872e8cc 100644
--- a/storage/StorageManager.cpp
+++ b/storage/StorageManager.cpp
@@ -570,6 +570,15 @@ bool StorageManager::DataExchangerClientAsync::Pull(const block_id block,
   return true;
 }
 
+void* StorageManager::hdfs() {
+#ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+  if (FLAGS_use_hdfs) {
+    return static_cast<FileManagerHdfs*>(file_manager_.get())->hdfs();
+  }
+#endif  // QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+  return nullptr;
+}
+
 vector<string> StorageManager::getPeerDomainNetworkAddresses(const block_id block) {
   serialization::BlockMessage proto;
   proto.set_block_id(block);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b21c85b6/storage/StorageManager.hpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.hpp b/storage/StorageManager.hpp
index 42176ee..dc4b7e8 100644
--- a/storage/StorageManager.hpp
+++ b/storage/StorageManager.hpp
@@ -41,7 +41,6 @@
 #include "storage/StorageBlob.hpp"
 #include "storage/StorageBlock.hpp"
 #include "storage/StorageBlockInfo.hpp"
-#include "storage/StorageConfig.h"
 #include "storage/StorageConstants.hpp"
 #include "threading/SpinSharedMutex.hpp"
 #include "utility/Macros.hpp"
@@ -395,6 +394,13 @@ class StorageManager {
   void pullBlockOrBlob(const block_id block, PullResponse *response) const;
 #endif
 
+  /**
+   * @brief Get the HDFS connector via libhdfs3.
+   *
+   * @return The HDFS connector, or nullptr if HDFS is not in use.
+   **/
+  void* hdfs();
+
  private:
   struct BlockHandle {
     void *block_memory;


[4/6] incubator-quickstep git commit: Visualized Execution DAG in the distributed version.

Posted by zu...@apache.org.
Visualized Execution DAG in the distributed version.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/f5c063a1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/f5c063a1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/f5c063a1

Branch: refs/heads/two-level-tmb
Commit: f5c063a19d0b9ff4327041f707a1dc38c343f727
Parents: 2b2d7ba
Author: Zuyu Zhang <zu...@apache.org>
Authored: Fri Feb 10 22:01:48 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Thu Feb 23 00:39:05 2017 -0800

----------------------------------------------------------------------
 query_execution/CMakeLists.txt                |  5 ++++-
 query_execution/PolicyEnforcerBase.cpp        |  7 ++----
 query_execution/PolicyEnforcerBase.hpp        | 16 +++++++++++--
 query_execution/PolicyEnforcerDistributed.cpp | 26 +++++++++++++++++++++-
 query_execution/QueryManagerBase.cpp          | 11 +++++++++
 query_execution/QueryManagerBase.hpp          | 12 ++++++++++
 query_optimizer/QueryHandle.hpp               |  7 ++++++
 7 files changed, 75 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f5c063a1/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 5ad6999..50bf694 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -166,6 +166,7 @@ if (ENABLE_DISTRIBUTED)
                         quickstep_queryexecution_ShiftbossDirectory
                         quickstep_queryoptimizer_QueryHandle
                         quickstep_storage_StorageBlockInfo
+                        quickstep_utility_ExecutionDAGVisualizer
                         quickstep_utility_Macros
                         tmb
                         ${GFLAGS_LIB_NAME})
@@ -246,7 +247,9 @@ target_link_libraries(quickstep_queryexecution_QueryManagerBase
                       quickstep_relationaloperators_WorkOrder
                       quickstep_storage_StorageBlockInfo
                       quickstep_utility_DAG
-                      quickstep_utility_Macros)
+                      quickstep_utility_ExecutionDAGVisualizer
+                      quickstep_utility_Macros
+                      ${GFLAGS_LIB_NAME})
 if (ENABLE_DISTRIBUTED)
   target_link_libraries(quickstep_queryexecution_QueryManagerDistributed
                         quickstep_catalog_CatalogTypedefs

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f5c063a1/query_execution/PolicyEnforcerBase.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerBase.cpp b/query_execution/PolicyEnforcerBase.cpp
index 082f6e9..1ffde4d 100644
--- a/query_execution/PolicyEnforcerBase.cpp
+++ b/query_execution/PolicyEnforcerBase.cpp
@@ -40,15 +40,12 @@
 
 namespace quickstep {
 
+DECLARE_bool(visualize_execution_dag);
+
 DEFINE_bool(profile_and_report_workorder_perf, false,
     "If true, Quickstep will record the exceution time of all the individual "
     "normal work orders and report it at the end of query execution.");
 
-DEFINE_bool(visualize_execution_dag, false,
-            "If true, visualize the execution plan DAG into a graph in DOT "
-            "format (DOT is a plain text graph description language) which is "
-            "then printed via stderr.");
-
 PolicyEnforcerBase::PolicyEnforcerBase(CatalogDatabaseLite *catalog_database)
     : catalog_database_(catalog_database),
       profile_individual_workorders_(FLAGS_profile_and_report_workorder_perf || FLAGS_visualize_execution_dag) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f5c063a1/query_execution/PolicyEnforcerBase.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerBase.hpp b/query_execution/PolicyEnforcerBase.hpp
index 4107817..f66134b 100644
--- a/query_execution/PolicyEnforcerBase.hpp
+++ b/query_execution/PolicyEnforcerBase.hpp
@@ -113,6 +113,19 @@ class PolicyEnforcerBase {
   }
 
   /**
+   * @brief Check if the given query has profiling results.
+   *
+   * @note Even with profiling enabled, not every query has profiling results.
+   *       For example, CreateTable and CreateIndex do not produce work orders,
+   *       so they do not have profiling results.
+   *
+   * @return True if it has profiling results, otherwise false.
+   **/
+  bool hasProfilingResults(const std::size_t query_id) const {
+    return workorder_time_recorder_.find(query_id) != workorder_time_recorder_.end();
+  }
+
+  /**
    * @brief Get the profiling results for individual work order execution for a
    *        given query.
    *
@@ -127,8 +140,7 @@ class PolicyEnforcerBase {
   inline const std::vector<WorkOrderTimeEntry>& getProfilingResults(
       const std::size_t query_id) const {
     DCHECK(profile_individual_workorders_);
-    DCHECK(workorder_time_recorder_.find(query_id) !=
-           workorder_time_recorder_.end());
+    DCHECK(hasProfilingResults(query_id));
     return workorder_time_recorder_.at(query_id);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f5c063a1/query_execution/PolicyEnforcerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp
index 8f0332d..6ee58a8 100644
--- a/query_execution/PolicyEnforcerDistributed.cpp
+++ b/query_execution/PolicyEnforcerDistributed.cpp
@@ -15,11 +15,14 @@
 #include "query_execution/PolicyEnforcerDistributed.hpp"
 
 #include <cstddef>
+#include <cstdio>
 #include <cstdlib>
 #include <memory>
 #include <queue>
-#include <utility>
+#include <sstream>
+#include <string>
 #include <unordered_map>
+#include <utility>
 #include <vector>
 
 #include "catalog/Catalog.pb.h"
@@ -33,6 +36,7 @@
 #include "query_execution/QueryManagerDistributed.hpp"
 #include "query_optimizer/QueryHandle.hpp"
 #include "storage/StorageBlockInfo.hpp"
+#include "utility/ExecutionDAGVisualizer.hpp"
 
 #include "gflags/gflags.h"
 #include "glog/logging.h"
@@ -53,6 +57,8 @@ using tmb::TaggedMessage;
 
 namespace quickstep {
 
+DECLARE_bool(visualize_execution_dag);
+
 namespace S = serialization;
 
 DEFINE_uint64(max_msgs_per_dispatch_round, 20, "Maximum number of messages that"
@@ -228,6 +234,24 @@ void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manage
   const tmb::client_id cli_id = query_handle->getClientId();
   const std::size_t query_id = query_handle->query_id();
 
+  if (FLAGS_visualize_execution_dag && hasProfilingResults(query_id)) {
+    ExecutionDAGVisualizer* dag_visualizer = query_manager->dag_visualizer();
+    dag_visualizer->bindProfilingStats(getProfilingResults(query_id));
+
+    std::ostringstream dot_filename;
+    dot_filename << query_id << ".dot";
+
+    FILE *fp = std::fopen(dot_filename.str().c_str(), "w");
+    CHECK_NOTNULL(fp);
+    const std::string dot_file_content(dag_visualizer->toDOT());
+    const std::size_t dot_file_length = dot_file_content.length();
+
+    CHECK_EQ(dot_file_length,
+             std::fwrite(dot_file_content.c_str(), sizeof(char), dot_file_length, fp));
+
+    std::fclose(fp);
+  }
+
   // TODO(quickstep-team): Dynamically scale-up/down Shiftbosses.
   tmb::Address shiftboss_addresses;
   for (std::size_t i = 0; i < shiftboss_directory_->size(); ++i) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f5c063a1/query_execution/QueryManagerBase.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerBase.cpp b/query_execution/QueryManagerBase.cpp
index 5f8c6a3..f84ad4e 100644
--- a/query_execution/QueryManagerBase.cpp
+++ b/query_execution/QueryManagerBase.cpp
@@ -30,12 +30,18 @@
 #include "relational_operators/WorkOrder.hpp"
 #include "storage/StorageBlockInfo.hpp"
 
+#include "gflags/gflags.h"
 #include "glog/logging.h"
 
 using std::pair;
 
 namespace quickstep {
 
+DEFINE_bool(visualize_execution_dag, false,
+            "If true, visualize the execution plan DAG into a graph in DOT "
+            "format (DOT is a plain text graph description language) which is "
+            "then printed via stderr.");
+
 QueryManagerBase::QueryManagerBase(QueryHandle *query_handle)
     : query_handle_(DCHECK_NOTNULL(query_handle)),
       query_id_(query_handle->query_id()),
@@ -45,6 +51,11 @@ QueryManagerBase::QueryManagerBase(QueryHandle *query_handle)
       output_consumers_(num_operators_in_dag_),
       blocking_dependencies_(num_operators_in_dag_),
       query_exec_state_(new QueryExecutionState(num_operators_in_dag_)) {
+  if (FLAGS_visualize_execution_dag) {
+    dag_visualizer_ =
+        std::make_unique<quickstep::ExecutionDAGVisualizer>(query_handle_->getQueryPlan());
+  }
+
   for (dag_node_index node_index = 0;
        node_index < num_operators_in_dag_;
        ++node_index) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f5c063a1/query_execution/QueryManagerBase.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerBase.hpp b/query_execution/QueryManagerBase.hpp
index d0bb0ea..27fa6dc 100644
--- a/query_execution/QueryManagerBase.hpp
+++ b/query_execution/QueryManagerBase.hpp
@@ -31,6 +31,7 @@
 #include "relational_operators/WorkOrder.hpp"
 #include "storage/StorageBlockInfo.hpp"
 #include "utility/DAG.hpp"
+#include "utility/ExecutionDAGVisualizer.hpp"
 #include "utility/Macros.hpp"
 
 namespace quickstep {
@@ -149,6 +150,15 @@ class QueryManagerBase {
    **/
   QueryStatusCode queryStatus(const dag_node_index op_index);
 
+  /**
+   * @brief Get the execution DAG visualizer.
+   *
+   * @return the execution DAG visualizer.
+   **/
+  ExecutionDAGVisualizer* dag_visualizer() {
+    return dag_visualizer_.get();
+  }
+
  protected:
   /**
    * @brief Process a current relational operator: Get its workorders and store
@@ -276,6 +286,8 @@ class QueryManagerBase {
 
   std::unique_ptr<QueryExecutionState> query_exec_state_;
 
+  std::unique_ptr<ExecutionDAGVisualizer> dag_visualizer_;
+
  private:
   /**
    * @brief Check if the given operator's normal execution is over.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f5c063a1/query_optimizer/QueryHandle.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/QueryHandle.hpp b/query_optimizer/QueryHandle.hpp
index cbd1cd9..7cb4f68 100644
--- a/query_optimizer/QueryHandle.hpp
+++ b/query_optimizer/QueryHandle.hpp
@@ -87,6 +87,13 @@ class QueryHandle {
   }
 
   /**
+   * @return The const query plan.
+   */
+  const QueryPlan& getQueryPlan() const {
+    return *query_plan_;
+  }
+
+  /**
    * @return The mutable query plan.
    */
   QueryPlan* getQueryPlanMutable() {