You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/03/01 22:43:13 UTC

[01/24] incubator-quickstep git commit: Adds marcs ssh key to KEYS [Forced Update!]

Repository: incubator-quickstep
Updated Branches:
  refs/heads/dist-patch ce86f7745 -> 57ca76249 (forced update)


Adds marcs ssh key to KEYS


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/f6480fb5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/f6480fb5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/f6480fb5

Branch: refs/heads/dist-patch
Commit: f6480fb59a2f3be6d37274f2a2376e733ba98d22
Parents: ab46d78
Author: cramja <ma...@gmail.com>
Authored: Tue Feb 14 17:02:20 2017 -0600
Committer: cramja <ma...@gmail.com>
Committed: Tue Feb 14 17:02:20 2017 -0600

----------------------------------------------------------------------
 KEYS | 52 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 52 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f6480fb5/KEYS
----------------------------------------------------------------------
diff --git a/KEYS b/KEYS
index ce3964f..a10f790 100644
--- a/KEYS
+++ b/KEYS
@@ -49,3 +49,55 @@ iYXZdNoVw3RZC2XRQB+as9wYnz/Ziqbrrw58/E5FdmC8U8+Fa/0lTUh6VsPjpu5u
 E7agqOm2ReVbNLPyHa2oGftKu0Cwyghbys5xNxqbNPQnFR9N9Soi+0n4IGCZ/tj5
 =qv6a
 -----END PGP PUBLIC KEY BLOCK-----
+-----BEGIN PGP PUBLIC KEY BLOCK-----
+Version: GnuPG v2
+
+mQINBFijfvUBEACtbFV+BN1MRgP5s1/W1cKHbXsjBz8C+xlblRSjewrdlrHJEiu7
++qP3DkaJse8odnUbuveYuVyQQ+gQq8Qn+w8CzRBqnFn4V2xGKouQhlZ9Q+QhNzI/
+Y2+2XfDSY8GAxV3e5BnLCAnRhQJ4UX3aIU1ZmdVJ8Hag3Pfv5BKf/DpgEfPJ+G0z
+d4oy9gN96Zjlui/QwMaF4wCCE8P5tXm04QXyK+bs4pV9WsjMf5LW/gBZAgof2Kfp
+rEBM2aAjSmsfysNLXB/aakyjdEMtUV7wmtHe+mhbo8JmTNrhXsIhNbzXIeq2lRLL
+dNzUn5WQ03F69sx1tBXG5Ro8xA0TEjzigsy/4RlIgplws8rR3gP3H9RSlXNvifoC
+ZCHNxFdXHJ908q6SFfCCZR8eUd5mbG3mB04HEXccOl3E0456U0Aee2DK4uGp/RVN
+b4pvZgLRkGcymy4hg+QR56ixq+ovRi75m9zcGe3wvYZNoAL5IscqHvQpn2iCTBuJ
+ZbTOENKTfzncZZhApgzLZhdnAkEsyyhY8qOJnS+0TERSXXyoR1nHpVgnLwwCS7Oi
+rXhQR+YqA3gwcuyukwMSy25+VJLAtls0sH7wi6EWtWE/4mytScQV/AFTLqpA7wYY
+X5ceEdIqp0YwBi8HRrd1sDyRWYHTKLLUS9jgzZzgI54UexggXxn/h46DAwARAQAB
+tCpNYXJjIFNwZWhsbWFubiAoYXBhY2hlKSA8c3BlaGxAYXBhY2hlLm9yZz6JAjcE
+EwEKACEFAlijfvUCGwMFCwkIBwMFFQoJCAsFFgIDAQACHgECF4AACgkQ+r4yMlop
+iZqLDw//Q3MQX6xj1kSAxEHWbKziW49oMQylBwlLd9G0Zvo4VUd4Kgkl/sTF1nRn
+a3gp0SFI68z4I53DjBy5gzMQLhUArj6ygoTIFEqmaqnqXdwbYitUuexFmU7QuffU
+eJGrfbnCR+412SuSbs/yUIiqtzZQMMyX9eHRDKa03bRmXiKOvH4yWoQHmhNSI9jx
+cm0NJA7UirmtZ7Od3U+yEJCAje9RockzIyzY9Wgmmanxw14ig6RvIebBiEUeiyM1
+S5YsQ4r3SjZQyK2IlStDQbIrw/npg7SMpi/1QAgNBtyNuIJN4ehUukVwBzH75kK9
+d8R2tfn4ECNdjyR7zxGTu+C6Xy7i5nMyVDlU5KG+3tTwFh4RmxcuJrR6rNjnBtj+
+GUJHselHEcaDRW3x4w1kIIBqdiCo8JP8F1p786lZiCSYWeHHrXNQlwKnmtoyzn7x
+RuHjY6N6bY/HuJO0DVKW1L7ModtqpANe4e1sJbBmkJcrs9Cybiyfo/uGHG+tirKp
+9+dvoe6PFynA2TpwIluJHni0YNA9ipZHATZciFYo7+/XNXHei1YFYWObgdYTyGKk
+txTpcSsnJiDsNc1QEQ5DqxgeM4BV1/3wn64M83jI+6KkL+FWTKPemAg+OvL/Cmf7
+B/NBXi/QNvhwfSfD09naAnosjeG23Ggq1uOmOawoL7D/Y38+/LG5Ag0EWKN+9QEQ
+AJ01B8e5B6A+h+qS/ttGqZr1KKE0/yE6HNvxaxPo7GIJftqFdLnnZ786GUEecBV0
+XD+bejYv4bMEnlzv9UpQRJqY2npsnjf1UqlK2ODYyn0bD3LZ1nooRRGfUyiacPLy
+7KXwgweIuuwgS/tdTh2c+v9wYx4fmOlb4ZV7cmlyx4s3IJW3b0EhejsUBsIoFRRS
+59x+QlLQMlyPhLEvAUo+OsWaa4Wotjbxfv3VrK+0ZTEGHteL/CoQ+xZI1friLbKY
+OZZJiQTBQQaW6ojp6vtxYHNBrWIVuM41skKSwvHx7tKLJu03EY3/xdgCoKZ5+KTX
+NSPlfzYmWAAqVDGiH76WEHUQtp3E3Er/iloZM4dmBsi7So1yahk2dbCnfpjBHPMi
+a0t81RbSdAURzC+6tCNianUy/COed4+FBPDTHaekf1qtrBJNTV6iTsECf9uvp6qD
+QGDpah4/swKLFFqxClEAW+VaeWB+9nP6yZUGh7gp5SAMoAa5vSC8g8FzuTIsg/MI
+a0GhkHVopuILqzDSlhlJeh1cGvx1Hm2j0MtywMctJ0OXAO+oJcH0cx+JtJz3IFXS
+SwipTc9ySSk9bMOVhuQyouziU1UWW/sPSUvtKX4vV4pAaTL0ERzNH8gRsCf8/9Cu
+XGqJMlrVwZ+yFUaImiseAkzpBXlGYjgQBtd6e2/A8qsVABEBAAGJAh4EGAEKAAkF
+AlijfvUCGwwACgkQ+r4yMlopiZoW+g/1F8nTwnf+l3VcIUMQRwNe8X7/RZ5BrQnN
+oWZ9Nw4Fc40L90l/KZSp/k2pYo6KqRYlcHsYyMdjAuv+hSkPckwnS11wsrwG8CxQ
+1JuW5P9TQUpyN+367/AEOwc4qwJMo1RtLpjdqy3ssl1Rv25BJovzbd1ag7LF+RlM
+2o8o05t0gPkMa8bYu05XBTISKCT4qtAn8RrrUjvDNP7hsGfRUsN2FGIcvRu5Rt7t
+Uwt2JBjZ0+xmOt6f7ytzw6vNuItALDtzP1geXAHM+BddOXut9AnjVuEK+v92duPW
+LHepFB9zyNN5AoENGQvzl413lF9EuXOoiEXmtg+vRyKeEmJTtwzL3FTahvLg0pYm
+7h1InRxXtVilPPn3tcjNIvfpcHNUXwj3YstT380zBYAKxovG9rmHR8sKwgSNiK4N
+gbnVVBcIYQ/ogsdxqr6yjxqrWpeuph9BCeroxxDLMaFq260KyMIPsB34F5g3L1v5
+NM0GzLhrr5xa3B7hWvDmqxMIMsRtlf+37NqV0ylc6n4ZHJa1U60jSJx2BN0jnaMe
+LriD7AMhIAI4Ew70dYSHeVtzDO2MD3GTFqdHPuVCRi3322NSSvNjjZleCNE0q3Ca
+N5xnL7HQ8wj2da/uvr6A0oX46nYMBPwDdP5nkvHmNbMPLF5b2IOeoijL48x+EVQS
+U/KHHqP5JA==
+=mW4v
+-----END PGP PUBLIC KEY BLOCK-----


[04/24] incubator-quickstep git commit: Visualized Execution DAG in the distributed version.

Posted by zu...@apache.org.
Visualized Execution DAG in the distributed version.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/f5c063a1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/f5c063a1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/f5c063a1

Branch: refs/heads/dist-patch
Commit: f5c063a19d0b9ff4327041f707a1dc38c343f727
Parents: 2b2d7ba
Author: Zuyu Zhang <zu...@apache.org>
Authored: Fri Feb 10 22:01:48 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Thu Feb 23 00:39:05 2017 -0800

----------------------------------------------------------------------
 query_execution/CMakeLists.txt                |  5 ++++-
 query_execution/PolicyEnforcerBase.cpp        |  7 ++----
 query_execution/PolicyEnforcerBase.hpp        | 16 +++++++++++--
 query_execution/PolicyEnforcerDistributed.cpp | 26 +++++++++++++++++++++-
 query_execution/QueryManagerBase.cpp          | 11 +++++++++
 query_execution/QueryManagerBase.hpp          | 12 ++++++++++
 query_optimizer/QueryHandle.hpp               |  7 ++++++
 7 files changed, 75 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f5c063a1/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 5ad6999..50bf694 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -166,6 +166,7 @@ if (ENABLE_DISTRIBUTED)
                         quickstep_queryexecution_ShiftbossDirectory
                         quickstep_queryoptimizer_QueryHandle
                         quickstep_storage_StorageBlockInfo
+                        quickstep_utility_ExecutionDAGVisualizer
                         quickstep_utility_Macros
                         tmb
                         ${GFLAGS_LIB_NAME})
@@ -246,7 +247,9 @@ target_link_libraries(quickstep_queryexecution_QueryManagerBase
                       quickstep_relationaloperators_WorkOrder
                       quickstep_storage_StorageBlockInfo
                       quickstep_utility_DAG
-                      quickstep_utility_Macros)
+                      quickstep_utility_ExecutionDAGVisualizer
+                      quickstep_utility_Macros
+                      ${GFLAGS_LIB_NAME})
 if (ENABLE_DISTRIBUTED)
   target_link_libraries(quickstep_queryexecution_QueryManagerDistributed
                         quickstep_catalog_CatalogTypedefs

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f5c063a1/query_execution/PolicyEnforcerBase.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerBase.cpp b/query_execution/PolicyEnforcerBase.cpp
index 082f6e9..1ffde4d 100644
--- a/query_execution/PolicyEnforcerBase.cpp
+++ b/query_execution/PolicyEnforcerBase.cpp
@@ -40,15 +40,12 @@
 
 namespace quickstep {
 
+DECLARE_bool(visualize_execution_dag);
+
 DEFINE_bool(profile_and_report_workorder_perf, false,
     "If true, Quickstep will record the exceution time of all the individual "
     "normal work orders and report it at the end of query execution.");
 
-DEFINE_bool(visualize_execution_dag, false,
-            "If true, visualize the execution plan DAG into a graph in DOT "
-            "format (DOT is a plain text graph description language) which is "
-            "then printed via stderr.");
-
 PolicyEnforcerBase::PolicyEnforcerBase(CatalogDatabaseLite *catalog_database)
     : catalog_database_(catalog_database),
       profile_individual_workorders_(FLAGS_profile_and_report_workorder_perf || FLAGS_visualize_execution_dag) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f5c063a1/query_execution/PolicyEnforcerBase.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerBase.hpp b/query_execution/PolicyEnforcerBase.hpp
index 4107817..f66134b 100644
--- a/query_execution/PolicyEnforcerBase.hpp
+++ b/query_execution/PolicyEnforcerBase.hpp
@@ -113,6 +113,19 @@ class PolicyEnforcerBase {
   }
 
   /**
+   * @brief Check if the given query has profiling results.
+   *
+   * @note Even with profiling enabled, not every query has profiling results.
+   *       For example, CreateTable and CreateIndex do not produce work orders,
+   *       so they do not have profiling results.
+   *
+   * @return True if it has profiling results, otherwise false.
+   **/
+  bool hasProfilingResults(const std::size_t query_id) const {
+    return workorder_time_recorder_.find(query_id) != workorder_time_recorder_.end();
+  }
+
+  /**
    * @brief Get the profiling results for individual work order execution for a
    *        given query.
    *
@@ -127,8 +140,7 @@ class PolicyEnforcerBase {
   inline const std::vector<WorkOrderTimeEntry>& getProfilingResults(
       const std::size_t query_id) const {
     DCHECK(profile_individual_workorders_);
-    DCHECK(workorder_time_recorder_.find(query_id) !=
-           workorder_time_recorder_.end());
+    DCHECK(hasProfilingResults(query_id));
     return workorder_time_recorder_.at(query_id);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f5c063a1/query_execution/PolicyEnforcerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp
index 8f0332d..6ee58a8 100644
--- a/query_execution/PolicyEnforcerDistributed.cpp
+++ b/query_execution/PolicyEnforcerDistributed.cpp
@@ -15,11 +15,14 @@
 #include "query_execution/PolicyEnforcerDistributed.hpp"
 
 #include <cstddef>
+#include <cstdio>
 #include <cstdlib>
 #include <memory>
 #include <queue>
-#include <utility>
+#include <sstream>
+#include <string>
 #include <unordered_map>
+#include <utility>
 #include <vector>
 
 #include "catalog/Catalog.pb.h"
@@ -33,6 +36,7 @@
 #include "query_execution/QueryManagerDistributed.hpp"
 #include "query_optimizer/QueryHandle.hpp"
 #include "storage/StorageBlockInfo.hpp"
+#include "utility/ExecutionDAGVisualizer.hpp"
 
 #include "gflags/gflags.h"
 #include "glog/logging.h"
@@ -53,6 +57,8 @@ using tmb::TaggedMessage;
 
 namespace quickstep {
 
+DECLARE_bool(visualize_execution_dag);
+
 namespace S = serialization;
 
 DEFINE_uint64(max_msgs_per_dispatch_round, 20, "Maximum number of messages that"
@@ -228,6 +234,24 @@ void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manage
   const tmb::client_id cli_id = query_handle->getClientId();
   const std::size_t query_id = query_handle->query_id();
 
+  if (FLAGS_visualize_execution_dag && hasProfilingResults(query_id)) {
+    ExecutionDAGVisualizer* dag_visualizer = query_manager->dag_visualizer();
+    dag_visualizer->bindProfilingStats(getProfilingResults(query_id));
+
+    std::ostringstream dot_filename;
+    dot_filename << query_id << ".dot";
+
+    FILE *fp = std::fopen(dot_filename.str().c_str(), "w");
+    CHECK_NOTNULL(fp);
+    const std::string dot_file_content(dag_visualizer->toDOT());
+    const std::size_t dot_file_length = dot_file_content.length();
+
+    CHECK_EQ(dot_file_length,
+             std::fwrite(dot_file_content.c_str(), sizeof(char), dot_file_length, fp));
+
+    std::fclose(fp);
+  }
+
   // TODO(quickstep-team): Dynamically scale-up/down Shiftbosses.
   tmb::Address shiftboss_addresses;
   for (std::size_t i = 0; i < shiftboss_directory_->size(); ++i) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f5c063a1/query_execution/QueryManagerBase.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerBase.cpp b/query_execution/QueryManagerBase.cpp
index 5f8c6a3..f84ad4e 100644
--- a/query_execution/QueryManagerBase.cpp
+++ b/query_execution/QueryManagerBase.cpp
@@ -30,12 +30,18 @@
 #include "relational_operators/WorkOrder.hpp"
 #include "storage/StorageBlockInfo.hpp"
 
+#include "gflags/gflags.h"
 #include "glog/logging.h"
 
 using std::pair;
 
 namespace quickstep {
 
+DEFINE_bool(visualize_execution_dag, false,
+            "If true, visualize the execution plan DAG into a graph in DOT "
+            "format (DOT is a plain text graph description language) which is "
+            "then printed via stderr.");
+
 QueryManagerBase::QueryManagerBase(QueryHandle *query_handle)
     : query_handle_(DCHECK_NOTNULL(query_handle)),
       query_id_(query_handle->query_id()),
@@ -45,6 +51,11 @@ QueryManagerBase::QueryManagerBase(QueryHandle *query_handle)
       output_consumers_(num_operators_in_dag_),
       blocking_dependencies_(num_operators_in_dag_),
       query_exec_state_(new QueryExecutionState(num_operators_in_dag_)) {
+  if (FLAGS_visualize_execution_dag) {
+    dag_visualizer_ =
+        std::make_unique<quickstep::ExecutionDAGVisualizer>(query_handle_->getQueryPlan());
+  }
+
   for (dag_node_index node_index = 0;
        node_index < num_operators_in_dag_;
        ++node_index) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f5c063a1/query_execution/QueryManagerBase.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerBase.hpp b/query_execution/QueryManagerBase.hpp
index d0bb0ea..27fa6dc 100644
--- a/query_execution/QueryManagerBase.hpp
+++ b/query_execution/QueryManagerBase.hpp
@@ -31,6 +31,7 @@
 #include "relational_operators/WorkOrder.hpp"
 #include "storage/StorageBlockInfo.hpp"
 #include "utility/DAG.hpp"
+#include "utility/ExecutionDAGVisualizer.hpp"
 #include "utility/Macros.hpp"
 
 namespace quickstep {
@@ -149,6 +150,15 @@ class QueryManagerBase {
    **/
   QueryStatusCode queryStatus(const dag_node_index op_index);
 
+  /**
+   * @brief Get the execution DAG visualizer.
+   *
+   * @return the execution DAG visualizer.
+   **/
+  ExecutionDAGVisualizer* dag_visualizer() {
+    return dag_visualizer_.get();
+  }
+
  protected:
   /**
    * @brief Process a current relational operator: Get its workorders and store
@@ -276,6 +286,8 @@ class QueryManagerBase {
 
   std::unique_ptr<QueryExecutionState> query_exec_state_;
 
+  std::unique_ptr<ExecutionDAGVisualizer> dag_visualizer_;
+
  private:
   /**
    * @brief Check if the given operator's normal execution is over.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f5c063a1/query_optimizer/QueryHandle.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/QueryHandle.hpp b/query_optimizer/QueryHandle.hpp
index cbd1cd9..7cb4f68 100644
--- a/query_optimizer/QueryHandle.hpp
+++ b/query_optimizer/QueryHandle.hpp
@@ -87,6 +87,13 @@ class QueryHandle {
   }
 
   /**
+   * @return The const query plan.
+   */
+  const QueryPlan& getQueryPlan() const {
+    return *query_plan_;
+  }
+
+  /**
    * @return The mutable query plan.
    */
   QueryPlan* getQueryPlanMutable() {


[09/24] incubator-quickstep git commit: Fix the problem with CrossReferenceCoalesceAggregate.

Posted by zu...@apache.org.
Fix the problem with CrossReferenceCoalesceAggregate.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/f4f5ca08
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/f4f5ca08
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/f4f5ca08

Branch: refs/heads/dist-patch
Commit: f4f5ca08600001e92502a3ad9e597e93e994d068
Parents: b88625d
Author: Jianqiao Zhu <ji...@cs.wisc.edu>
Authored: Fri Feb 24 10:46:06 2017 -0600
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Fri Feb 24 12:51:51 2017 -0800

----------------------------------------------------------------------
 relational_operators/BuildAggregationExistenceMapOperator.cpp | 2 +-
 relational_operators/BuildAggregationExistenceMapOperator.hpp | 6 +++---
 relational_operators/WorkOrderFactory.cpp                     | 2 +-
 3 files changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f4f5ca08/relational_operators/BuildAggregationExistenceMapOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/BuildAggregationExistenceMapOperator.cpp b/relational_operators/BuildAggregationExistenceMapOperator.cpp
index 648e291..ff65265 100644
--- a/relational_operators/BuildAggregationExistenceMapOperator.cpp
+++ b/relational_operators/BuildAggregationExistenceMapOperator.cpp
@@ -144,7 +144,7 @@ bool BuildAggregationExistenceMapOperator
 serialization::WorkOrder* BuildAggregationExistenceMapOperator
     ::createWorkOrderProto(const block_id block) {
   serialization::WorkOrder *proto = new serialization::WorkOrder;
-  proto->set_work_order_type(serialization::BUILD_LIP_FILTER);
+  proto->set_work_order_type(serialization::BUILD_AGGREGATION_EXISTENCE_MAP);
   proto->set_query_id(query_id_);
 
   proto->SetExtension(serialization::BuildAggregationExistenceMapWorkOrder::relation_id,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f4f5ca08/relational_operators/BuildAggregationExistenceMapOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/BuildAggregationExistenceMapOperator.hpp b/relational_operators/BuildAggregationExistenceMapOperator.hpp
index e2928a8..dde39d8 100644
--- a/relational_operators/BuildAggregationExistenceMapOperator.hpp
+++ b/relational_operators/BuildAggregationExistenceMapOperator.hpp
@@ -80,9 +80,9 @@ class BuildAggregationExistenceMapOperator : public RelationalOperator {
         input_relation_(input_relation),
         build_attribute_(build_attribute),
         input_relation_is_stored_(input_relation_is_stored),
+        aggr_state_index_(aggr_state_index),
         input_relation_block_ids_(input_relation_is_stored ? input_relation.getBlocksSnapshot()
                                                            : std::vector<block_id>()),
-        aggr_state_index_(aggr_state_index),
         num_workorders_generated_(0),
         started_(false) {}
 
@@ -118,9 +118,9 @@ class BuildAggregationExistenceMapOperator : public RelationalOperator {
   const CatalogRelation &input_relation_;
   const attribute_id build_attribute_;
   const bool input_relation_is_stored_;
-  std::vector<block_id> input_relation_block_ids_;
   const QueryContext::aggregation_state_id aggr_state_index_;
 
+  std::vector<block_id> input_relation_block_ids_;
   std::vector<block_id>::size_type num_workorders_generated_;
   bool started_;
 
@@ -163,8 +163,8 @@ class BuildAggregationExistenceMapWorkOrder : public WorkOrder {
   const CatalogRelationSchema &input_relation_;
   const block_id build_block_id_;
   const attribute_id build_attribute_;
-  AggregationOperationState *state_;
 
+  AggregationOperationState *state_;
   StorageManager *storage_manager_;
 
   DISALLOW_COPY_AND_ASSIGN(BuildAggregationExistenceMapWorkOrder);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f4f5ca08/relational_operators/WorkOrderFactory.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp
index cf0ee74..ae57e6f 100644
--- a/relational_operators/WorkOrderFactory.cpp
+++ b/relational_operators/WorkOrderFactory.cpp
@@ -103,7 +103,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
           proto.GetExtension(serialization::BuildAggregationExistenceMapWorkOrder::build_block_id),
           proto.GetExtension(serialization::BuildAggregationExistenceMapWorkOrder::build_attribute),
           query_context->getAggregationState(
-              proto.GetExtension(serialization::AggregationWorkOrder::aggr_state_index)),
+              proto.GetExtension(serialization::BuildAggregationExistenceMapWorkOrder::aggr_state_index)),
           storage_manager);
     }
     case serialization::BUILD_LIP_FILTER: {


[23/24] incubator-quickstep git commit: Disabled LIP in the distributed version.

Posted by zu...@apache.org.
Disabled LIP in the distributed version.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/95be4690
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/95be4690
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/95be4690

Branch: refs/heads/dist-patch
Commit: 95be46902a365ba963f64e2a2aade794378b5875
Parents: 88bc03b
Author: Zuyu Zhang <zu...@apache.org>
Authored: Sun Feb 12 17:48:53 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Wed Mar 1 14:43:02 2017 -0800

----------------------------------------------------------------------
 query_optimizer/PhysicalGenerator.cpp | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/95be4690/query_optimizer/PhysicalGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/PhysicalGenerator.cpp b/query_optimizer/PhysicalGenerator.cpp
index ac51c31..ca6db3f 100644
--- a/query_optimizer/PhysicalGenerator.cpp
+++ b/query_optimizer/PhysicalGenerator.cpp
@@ -59,7 +59,7 @@ DEFINE_bool(reorder_hash_joins, true,
             "cardinality and selective tables to be joined first, which is suitable "
             "for queries on star-schema tables.");
 
-DEFINE_bool(use_filter_joins, true,
+DEFINE_bool(use_filter_joins, false,
             "If true, apply an optimization that strength-reduces HashJoins to "
             "FilterJoins (implemented as LIPFilters attached to some anchoring "
             "operators. Briefly speaking, in the case that the join attribute has "
@@ -67,7 +67,7 @@ DEFINE_bool(use_filter_joins, true,
             "build a BitVector on the build-side attribute and use the BitVector "
             "to filter the probe side table.");
 
-DEFINE_bool(use_lip_filters, true,
+DEFINE_bool(use_lip_filters, false,
             "If true, use LIP (Lookahead Information Passing) filters to accelerate "
             "query processing. LIP filters are effective for queries on star schema "
             "tables (e.g. the SSB benchmark) and snowflake schema tables (e.g. the "


[17/24] incubator-quickstep git commit: Fix a bug with SelectOperator

Posted by zu...@apache.org.
Fix a bug with SelectOperator


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/e896b61d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/e896b61d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/e896b61d

Branch: refs/heads/dist-patch
Commit: e896b61d1932d4593cb18033fe4da23a5e238f77
Parents: e41a6aa
Author: jianqiao <ji...@cs.wisc.edu>
Authored: Wed Mar 1 13:16:47 2017 -0600
Committer: jianqiao <ji...@cs.wisc.edu>
Committed: Wed Mar 1 13:16:47 2017 -0600

----------------------------------------------------------------------
 relational_operators/SelectOperator.hpp                       | 7 +++++--
 .../tests/SortRunGenerationOperator_unittest.cpp              | 2 +-
 2 files changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e896b61d/relational_operators/SelectOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/SelectOperator.hpp b/relational_operators/SelectOperator.hpp
index b9a4d49..df61c06 100644
--- a/relational_operators/SelectOperator.hpp
+++ b/relational_operators/SelectOperator.hpp
@@ -202,9 +202,12 @@ class SelectOperator : public RelationalOperator {
 
   bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
 
-  void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id,
+  void feedInputBlock(const block_id input_block_id,
+                      const relation_id input_relation_id,
                       const partition_id part_id) override {
-    input_relation_block_ids_[part_id].push_back(input_block_id);
+    if (input_relation_id == input_relation_.getID()) {
+      input_relation_block_ids_[part_id].push_back(input_block_id);
+    }
   }
 
   QueryContext::insert_destination_id getInsertDestinationID() const override {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e896b61d/relational_operators/tests/SortRunGenerationOperator_unittest.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/tests/SortRunGenerationOperator_unittest.cpp b/relational_operators/tests/SortRunGenerationOperator_unittest.cpp
index 99fafa8..acdd422 100644
--- a/relational_operators/tests/SortRunGenerationOperator_unittest.cpp
+++ b/relational_operators/tests/SortRunGenerationOperator_unittest.cpp
@@ -622,7 +622,7 @@ TEST_F(SortRunGenerationOperatorTest, 3Column_NullLast_Asc) {
 
   // Comparator for null-col-1 ASC NULLS LAST, null-col-2 ASC NULLS LAST,
   // null-col-3 ASC NULLS LAST.
-  auto comparator = [this](const Tuple &left, const Tuple &right) -> bool {
+  auto comparator = [](const Tuple &left, const Tuple &right) -> bool {
     TestTupleAttrs l = TupleToTupleAttr(left);
     TestTupleAttrs r = TupleToTupleAttr(right);
     l.c1 = l.null_c4 ? std::numeric_limits<int>::max() : l.c1;


[24/24] incubator-quickstep git commit: Refactored block loading order in StorageManager::loadBlockOrBlob.

Posted by zu...@apache.org.
Refactored block loading order in StorageManager::loadBlockOrBlob.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/57ca7624
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/57ca7624
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/57ca7624

Branch: refs/heads/dist-patch
Commit: 57ca76249965c8161f07101817e7c4eeafc0ae53
Parents: 95be469
Author: Zuyu Zhang <zu...@apache.org>
Authored: Sat Feb 11 11:24:54 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Wed Mar 1 14:43:02 2017 -0800

----------------------------------------------------------------------
 storage/StorageManager.cpp | 60 ++++++++++++++++++++---------------------
 1 file changed, 29 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/57ca7624/storage/StorageManager.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.cpp b/storage/StorageManager.cpp
index 94e1b67..b920c17 100644
--- a/storage/StorageManager.cpp
+++ b/storage/StorageManager.cpp
@@ -666,45 +666,43 @@ StorageManager::BlockHandle StorageManager::loadBlockOrBlob(
   // already loaded before this function gets called.
   BlockHandle loaded_handle;
 
-#ifdef QUICKSTEP_DISTRIBUTED
-  // TODO(quickstep-team): Use a cost model to determine whether to load from
-  // a remote peer or the disk.
-  if (BlockIdUtil::Domain(block) != block_domain_) {
-    DLOG(INFO) << "Pulling Block " << BlockIdUtil::ToString(block) << " from a remote peer";
-    const vector<string> peer_domain_network_addresses = getPeerDomainNetworkAddresses(block);
-    for (const string &peer_domain_network_address : peer_domain_network_addresses) {
-      DataExchangerClientAsync client(
-          grpc::CreateChannel(peer_domain_network_address, grpc::InsecureChannelCredentials()),
-          this);
-
-      if (client.Pull(block, numa_node, &loaded_handle)) {
-        sendBlockLocationMessage(block, kAddBlockLocationMessage);
-        return loaded_handle;
-      }
-    }
+  const size_t num_slots = file_manager_->numSlots(block);
+  if (num_slots != 0) {
+    void *block_buffer = allocateSlots(num_slots, numa_node);
 
-    DLOG(INFO) << "Failed to pull Block " << BlockIdUtil::ToString(block)
-               << " from remote peers, so try to load from disk.";
-  }
-#endif
+    const bool status = file_manager_->readBlockOrBlob(block, block_buffer, kSlotSizeBytes * num_slots);
+    CHECK(status) << "Failed to read block from persistent storage: " << block;
 
-  const size_t num_slots = file_manager_->numSlots(block);
-  DEBUG_ASSERT(num_slots != 0);
-  void *block_buffer = allocateSlots(num_slots, numa_node);
+    loaded_handle.block_memory = block_buffer;
+    loaded_handle.block_memory_size = num_slots;
 
-  const bool status = file_manager_->readBlockOrBlob(block, block_buffer, kSlotSizeBytes * num_slots);
-  CHECK(status) << "Failed to read block from persistent storage: " << block;
+#ifdef QUICKSTEP_DISTRIBUTED
+    if (bus_) {
+      sendBlockLocationMessage(block, kAddBlockLocationMessage);
+    }
+#endif
 
-  loaded_handle.block_memory = block_buffer;
-  loaded_handle.block_memory_size = num_slots;
+    return loaded_handle;
+  }
 
 #ifdef QUICKSTEP_DISTRIBUTED
-  if (bus_) {
-    sendBlockLocationMessage(block, kAddBlockLocationMessage);
+  // TODO(quickstep-team): Use a cost model to determine whether to load from
+  // a remote peer or the disk.
+  DLOG(INFO) << "Pulling Block " << BlockIdUtil::ToString(block) << " from a remote peer";
+  const vector<string> peer_domain_network_addresses = getPeerDomainNetworkAddresses(block);
+  for (const string &peer_domain_network_address : peer_domain_network_addresses) {
+    DataExchangerClientAsync client(
+        grpc::CreateChannel(peer_domain_network_address, grpc::InsecureChannelCredentials()),
+        this);
+
+    if (client.Pull(block, numa_node, &loaded_handle)) {
+      sendBlockLocationMessage(block, kAddBlockLocationMessage);
+      return loaded_handle;
+    }
   }
+  LOG(FATAL) << "Failed to pull Block " << BlockIdUtil::ToString(block)
+             << " from remote peers.";
 #endif
-
-  return loaded_handle;
 }
 
 void StorageManager::insertBlockHandleAfterLoad(const block_id block,


[16/24] incubator-quickstep git commit: Fixed the bug in CommandExecutorTest.

Posted by zu...@apache.org.
Fixed the bug in CommandExecutorTest.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/e41a6aa0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/e41a6aa0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/e41a6aa0

Branch: refs/heads/dist-patch
Commit: e41a6aa0f54f9ae9ecd929e3a9b9ef487971ca25
Parents: 5f5073f
Author: Zuyu Zhang <zu...@apache.org>
Authored: Tue Feb 28 17:43:39 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Tue Feb 28 17:43:39 2017 -0800

----------------------------------------------------------------------
 cli/tests/CMakeLists.txt                  | 1 -
 cli/tests/command_executor/CMakeLists.txt | 2 +-
 2 files changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e41a6aa0/cli/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cli/tests/CMakeLists.txt b/cli/tests/CMakeLists.txt
index 99fa3a3..48f27bb 100644
--- a/cli/tests/CMakeLists.txt
+++ b/cli/tests/CMakeLists.txt
@@ -27,7 +27,6 @@ add_executable(quickstep_cli_tests_CommandExecutorTest
 target_link_libraries(quickstep_cli_tests_CommandExecutorTest
                       glog
                       gtest
-                      gtest_main
                       quickstep_catalog_CatalogDatabase
                       quickstep_cli_CommandExecutor
                       quickstep_cli_DropRelation

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e41a6aa0/cli/tests/command_executor/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cli/tests/command_executor/CMakeLists.txt b/cli/tests/command_executor/CMakeLists.txt
index 2cbf1bf..9cf1869 100644
--- a/cli/tests/command_executor/CMakeLists.txt
+++ b/cli/tests/command_executor/CMakeLists.txt
@@ -18,7 +18,7 @@
 add_test(quickstep_cli_tests_commandexecutor_d
          "../quickstep_cli_tests_CommandExecutorTest"
          "${CMAKE_CURRENT_SOURCE_DIR}/D.test"
-         "${CMAKE_CURRENT_BINARY_DIR}/Dt.test"
+         "${CMAKE_CURRENT_BINARY_DIR}/D.test"
          "${CMAKE_CURRENT_BINARY_DIR}/D/")
 add_test(quickstep_cli_tests_commandexecutor_dt
          "../quickstep_cli_tests_CommandExecutorTest"


[12/24] incubator-quickstep git commit: Minor refactoring of CommandExecutor.

Posted by zu...@apache.org.
Minor refactoring of CommandExecutor.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/4437b9d4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/4437b9d4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/4437b9d4

Branch: refs/heads/dist-patch
Commit: 4437b9d44e888229a3b6b2884ccdd0033c8e5fcb
Parents: b24349c
Author: Zuyu Zhang <zu...@apache.org>
Authored: Sun Feb 26 23:17:26 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Mon Feb 27 00:24:31 2017 -0800

----------------------------------------------------------------------
 catalog/CatalogDatabase.hpp | 10 +++++
 cli/CMakeLists.txt          |  6 ++-
 cli/CommandExecutor.cpp     | 96 +++++++++++++++++-----------------------
 cli/CommandExecutor.hpp     |  9 ----
 cli/Constants.hpp           | 44 ++++++++++++++++++
 utility/StringUtil.cpp      | 11 +++++
 utility/StringUtil.hpp      |  6 +++
 7 files changed, 117 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4437b9d4/catalog/CatalogDatabase.hpp
----------------------------------------------------------------------
diff --git a/catalog/CatalogDatabase.hpp b/catalog/CatalogDatabase.hpp
index a0804a2..b071d09 100644
--- a/catalog/CatalogDatabase.hpp
+++ b/catalog/CatalogDatabase.hpp
@@ -343,6 +343,16 @@ class CatalogDatabase : public CatalogDatabaseLite {
   serialization::CatalogDatabase getProto() const;
 
   /**
+   * @brief Check whether this CatalogDatabase is empty.
+   *
+   * @return true if empty, false otherwise.
+   **/
+  bool empty() const {
+    SpinSharedMutexSharedLock<false> lock(relations_mutex_);
+    return rel_map_.empty();
+  }
+
+  /**
    * @brief Get the number of child relations.
    *
    * @return The number of child relations.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4437b9d4/cli/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cli/CMakeLists.txt b/cli/CMakeLists.txt
index c5f3915..7b4319a 100644
--- a/cli/CMakeLists.txt
+++ b/cli/CMakeLists.txt
@@ -44,6 +44,7 @@ configure_file (
   "${CMAKE_CURRENT_BINARY_DIR}/CliConfig.h"
 )
 add_library(quickstep_cli_CommandExecutor CommandExecutor.cpp CommandExecutor.hpp)
+add_library(quickstep_cli_Constants ../empty_src.cpp Constants.hpp)
 
 # Declare micro-libs and link dependencies:
 add_library(quickstep_cli_DropRelation DropRelation.cpp DropRelation.hpp)
@@ -86,6 +87,7 @@ target_link_libraries(quickstep_cli_CommandExecutor
                       quickstep_catalog_CatalogRelation
                       quickstep_catalog_CatalogRelationSchema
                       quickstep_catalog_CatalogRelationStatistics
+                      quickstep_cli_Constants
                       quickstep_cli_DropRelation
                       quickstep_cli_PrintToScreen
                       quickstep_expressions_aggregation_AggregateFunctionMax
@@ -105,7 +107,8 @@ target_link_libraries(quickstep_cli_CommandExecutor
                       quickstep_types_TypeID
                       quickstep_types_TypedValue
                       quickstep_utility_PtrVector
-                      quickstep_utility_SqlError)
+                      quickstep_utility_SqlError
+                      quickstep_utility_StringUtil)
 
 target_link_libraries(quickstep_cli_DefaultsConfigurator
                       glog
@@ -148,6 +151,7 @@ add_library(quickstep_cli ../empty_src.cpp CliModule.hpp)
 
 target_link_libraries(quickstep_cli
                       quickstep_cli_CommandExecutor
+                      quickstep_cli_Constants
                       quickstep_cli_DefaultsConfigurator
                       quickstep_cli_DropRelation
                       quickstep_cli_Flags

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4437b9d4/cli/CommandExecutor.cpp
----------------------------------------------------------------------
diff --git a/cli/CommandExecutor.cpp b/cli/CommandExecutor.cpp
index 78bff98..7f63469 100644
--- a/cli/CommandExecutor.cpp
+++ b/cli/CommandExecutor.cpp
@@ -32,6 +32,7 @@
 #include "catalog/CatalogRelation.hpp"
 #include "catalog/CatalogRelationSchema.hpp"
 #include "catalog/CatalogRelationStatistics.hpp"
+#include "cli/Constants.hpp"
 #include "cli/DropRelation.hpp"
 #include "cli/PrintToScreen.hpp"
 #include "expressions/aggregation/AggregateFunctionMax.hpp"
@@ -52,6 +53,7 @@
 #include "types/TypedValue.hpp"
 #include "utility/PtrVector.hpp"
 #include "utility/SqlError.hpp"
+#include "utility/StringUtil.hpp"
 
 #include "glog/logging.h"
 
@@ -70,63 +72,60 @@ namespace quickstep {
 namespace cli {
 namespace {
 
-namespace C = ::quickstep::cli;
-
 void ExecuteDescribeDatabase(
-    const PtrVector<ParseString> *arguments,
+    const PtrVector<ParseString> &arguments,
     const CatalogDatabase &catalog_database,
-    StorageManager *storage_manager,
     FILE *out) {
   // Column width initialized to 6 to take into account the header name
   // and the column value table
-  int max_column_width = C::kInitMaxColumnWidth;
+  int max_column_width = kInitMaxColumnWidth;
   vector<std::size_t> num_blocks;
   const CatalogRelation *relation = nullptr;
-  if (arguments->size() == 0) {
+  if (arguments.empty()) {
     for (const CatalogRelation &rel : catalog_database) {
       max_column_width =
           std::max(static_cast<int>(rel.getName().length()), max_column_width);
       num_blocks.push_back(rel.size_blocks());
     }
   } else {
-    const ParseString &table_name = arguments->front();
+    const ParseString &table_name = arguments.front();
     const std::string &table_name_val = table_name.value();
     relation = catalog_database.getRelationByName(table_name_val);
 
     if (relation == nullptr) {
-      THROW_SQL_ERROR_AT(&(arguments->front())) << " Unrecognized relation " << table_name_val;
+      THROW_SQL_ERROR_AT(&(arguments.front())) << " Unrecognized relation " << table_name_val;
     }
     max_column_width = std::max(static_cast<int>(relation->getName().length()),
                                     max_column_width);
     num_blocks.push_back(relation->size_blocks());
   }
   // Only if we have relations work on the printing logic.
-  if (catalog_database.size() > 0) {
+  if (!catalog_database.empty()) {
     const std::size_t max_num_blocks = *std::max_element(num_blocks.begin(), num_blocks.end());
     const int max_num_blocks_digits = std::max(PrintToScreen::GetNumberOfDigits(max_num_blocks),
-                                      C::kInitMaxColumnWidth+2);
+                                      kInitMaxColumnWidth + 2);
     vector<int> column_widths;
-    column_widths.push_back(max_column_width +1);
-    column_widths.push_back(C::kInitMaxColumnWidth + 1);
+    column_widths.push_back(max_column_width + 1);
+    column_widths.push_back(kInitMaxColumnWidth + 1);
     column_widths.push_back(max_num_blocks_digits + 1);
     fputs("       List of relations\n\n", out);
-    fprintf(out, "%-*s |", max_column_width+1, " Name");
-    fprintf(out, "%-*s |", C::kInitMaxColumnWidth, " Type");
+    fprintf(out, "%-*s |", max_column_width + 1, " Name");
+    fprintf(out, "%-*s |", kInitMaxColumnWidth, " Type");
     fprintf(out, "%-*s\n", max_num_blocks_digits, " Blocks");
     PrintToScreen::printHBar(column_widths, out);
     //  If there are no argument print the entire list of tables
     //  else print the particular table only.
     vector<std::size_t>::const_iterator num_blocks_it = num_blocks.begin();
-    if (arguments->size() == 0) {
+    if (arguments.empty()) {
       for (const CatalogRelation &rel : catalog_database) {
         fprintf(out, " %-*s |", max_column_width, rel.getName().c_str());
-        fprintf(out, " %-*s |", C::kInitMaxColumnWidth - 1, "table");
+        fprintf(out, " %-*s |", kInitMaxColumnWidth - 1, "table");
         fprintf(out, " %-*lu\n", max_num_blocks_digits - 1, *num_blocks_it);
         ++num_blocks_it;
       }
     } else {
       fprintf(out, " %-*s |", max_column_width, relation->getName().c_str());
-      fprintf(out, " %-*s |", C::kInitMaxColumnWidth -1, "table");
+      fprintf(out, " %-*s |", kInitMaxColumnWidth - 1, "table");
       fprintf(out, " %-*lu\n", max_num_blocks_digits - 1, *num_blocks_it);
       ++num_blocks_it;
     }
@@ -135,18 +134,18 @@ void ExecuteDescribeDatabase(
 }
 
 void ExecuteDescribeTable(
-    const PtrVector<ParseString> *arguments,
+    const PtrVector<ParseString> &arguments,
     const CatalogDatabase &catalog_database, FILE *out) {
-  const ParseString &table_name = arguments->front();
+  const ParseString &table_name = arguments.front();
   const std::string &table_name_val = table_name.value();
   const CatalogRelation *relation =
       catalog_database.getRelationByName(table_name_val);
   if (relation == nullptr) {
-    THROW_SQL_ERROR_AT(&(arguments->front())) << " Unrecognized relation "  << table_name_val;
+    THROW_SQL_ERROR_AT(&(arguments.front())) << " Unrecognized relation "  << table_name_val;
   }
   vector<int> column_widths;
-  int max_attr_column_width = C::kInitMaxColumnWidth;
-  int max_type_column_width = C::kInitMaxColumnWidth;
+  int max_attr_column_width = kInitMaxColumnWidth;
+  int max_type_column_width = kInitMaxColumnWidth;
 
   for (const CatalogAttribute &attr : *relation) {
     // Printed column needs to be wide enough to print:
@@ -160,12 +159,12 @@ void ExecuteDescribeTable(
             static_cast<int>(attr.getType().getName().length()));
   }
   // Add room for one extra character to allow spacing between the column ending and the vertical bar
-  column_widths.push_back(max_attr_column_width+1);
-  column_widths.push_back(max_type_column_width+1);
+  column_widths.push_back(max_attr_column_width + 1);
+  column_widths.push_back(max_type_column_width + 1);
 
-  fprintf(out, "%*s \"%s\"\n", C::kInitMaxColumnWidth, "Table", table_name_val.c_str());
-  fprintf(out, "%-*s |", max_attr_column_width+1, " Column");
-  fprintf(out, "%-*s\n", max_type_column_width+1, " Type");
+  fprintf(out, "%*s \"%s\"\n", kInitMaxColumnWidth, "Table", table_name_val.c_str());
+  fprintf(out, "%-*s |", max_attr_column_width + 1, " Column");
+  fprintf(out, "%-*s\n", max_type_column_width + 1, " Type");
   PrintToScreen::printHBar(column_widths, out);
   for (const CatalogAttribute &attr : *relation) {
     fprintf(out, " %-*s |", max_attr_column_width,
@@ -175,7 +174,7 @@ void ExecuteDescribeTable(
   }
   // TODO(rogers): Add handlers for partitioning information.
   if (relation->hasIndexScheme()) {
-    fprintf(out, "%*s\n", C::kInitMaxColumnWidth+2, " Indexes");
+    fprintf(out, "%*s\n", kInitMaxColumnWidth + 2, " Indexes");
     const quickstep::IndexScheme &index_scheme = relation->getIndexScheme();
     for (auto index_it = index_scheme.begin(); index_it != index_scheme.end();
          ++index_it) {
@@ -213,7 +212,7 @@ inline std::vector<TypedValue> ExecuteQueryForSingleRow(
   parser_wrapper->feedNextBuffer(new std::string(query_string));
 
   ParseResult result = parser_wrapper->getNextStatement();
-  DCHECK(result.condition == ParseResult::kSuccess);
+  DCHECK_EQ(ParseResult::kSuccess, result.condition);
 
   const ParseStatement &statement = *result.parsed_statement;
   const CatalogRelation *query_result_relation = nullptr;
@@ -293,21 +292,7 @@ inline TypedValue ExecuteQueryForSingleResult(
   return results[0];
 }
 
-/**
- * @brief A helper function for escaping quotes (i.e. ' or ").
- */
-std::string EscapeQuotes(const std::string &str, const char quote) {
-  std::string ret;
-  for (const char c : str) {
-    ret.push_back(c);
-    if (c == quote) {
-      ret.push_back(c);
-    }
-  }
-  return ret;
-}
-
-void ExecuteAnalyze(const PtrVector<ParseString> *arguments,
+void ExecuteAnalyze(const PtrVector<ParseString> &arguments,
                     const tmb::client_id main_thread_client_id,
                     const tmb::client_id foreman_client_id,
                     MessageBus *bus,
@@ -318,10 +303,10 @@ void ExecuteAnalyze(const PtrVector<ParseString> *arguments,
 
   std::unique_ptr<SqlParserWrapper> parser_wrapper(new SqlParserWrapper());
   std::vector<std::reference_wrapper<const CatalogRelation>> relations;
-  if (arguments->size() == 0) {
+  if (arguments.empty()) {
     relations.insert(relations.begin(), database.begin(), database.end());
   } else {
-    for (const auto &rel_name : *arguments) {
+    for (const auto &rel_name : arguments) {
       const CatalogRelation *rel = database.getRelationByName(rel_name.value());
       if (rel == nullptr) {
         THROW_SQL_ERROR_AT(&rel_name) << "Table does not exist";
@@ -382,7 +367,7 @@ void ExecuteAnalyze(const PtrVector<ParseString> *arguments,
                                    parser_wrapper.get());
 
       auto results_it = results.begin();
-      DCHECK(results_it->getTypeID() == TypeID::kLong);
+      DCHECK_EQ(TypeID::kLong, results_it->getTypeID());
 
       const attribute_id attr_id = attribute.getID();
       mutable_stat->setNumDistinctValues(attr_id,
@@ -411,7 +396,7 @@ void ExecuteAnalyze(const PtrVector<ParseString> *arguments,
                                     query_processor,
                                     parser_wrapper.get());
 
-    DCHECK(num_tuples.getTypeID() == TypeID::kLong);
+    DCHECK_EQ(TypeID::kLong, num_tuples.getTypeID());
     mutable_stat->setNumTuples(num_tuples.getLiteral<std::int64_t>());
 
     mutable_stat->setExactness(true);
@@ -434,17 +419,17 @@ void executeCommand(const ParseStatement &statement,
                     QueryProcessor *query_processor,
                     FILE *out) {
   const ParseCommand &command = static_cast<const ParseCommand &>(statement);
-  const PtrVector<ParseString> *arguments = command.arguments();
+  const PtrVector<ParseString> &arguments = *(command.arguments());
   const std::string &command_str = command.command()->value();
-  if (command_str == C::kDescribeDatabaseCommand) {
-    ExecuteDescribeDatabase(arguments, catalog_database, storage_manager, out);
-  } else if (command_str == C::kDescribeTableCommand) {
-    if (arguments->size() == 0) {
-      ExecuteDescribeDatabase(arguments, catalog_database, storage_manager, out);
+  if (command_str == kDescribeDatabaseCommand) {
+    ExecuteDescribeDatabase(arguments, catalog_database, out);
+  } else if (command_str == kDescribeTableCommand) {
+    if (arguments.empty()) {
+      ExecuteDescribeDatabase(arguments, catalog_database, out);
     } else {
       ExecuteDescribeTable(arguments, catalog_database, out);
     }
-  } else if (command_str == C::kAnalyzeCommand) {
+  } else if (command_str == kAnalyzeCommand) {
     ExecuteAnalyze(arguments,
                    main_thread_client_id,
                    foreman_client_id,
@@ -455,5 +440,6 @@ void executeCommand(const ParseStatement &statement,
     THROW_SQL_ERROR_AT(command.command()) << "Invalid Command";
   }
 }
+
 }  // namespace cli
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4437b9d4/cli/CommandExecutor.hpp
----------------------------------------------------------------------
diff --git a/cli/CommandExecutor.hpp b/cli/CommandExecutor.hpp
index a1d9af9..b214884 100644
--- a/cli/CommandExecutor.hpp
+++ b/cli/CommandExecutor.hpp
@@ -39,15 +39,6 @@ namespace cli {
  *  @{
  */
 
-// Adding the max column width as 6  as the default initializer
-// as the length of the word Column is 6 characters.
-// This is used while describing the table.
-constexpr int kInitMaxColumnWidth = 6;
-
-constexpr char kDescribeDatabaseCommand[] = "\\dt";
-constexpr char kDescribeTableCommand[] = "\\d";
-constexpr char kAnalyzeCommand[] = "\\analyze";
-
 /**
   * @brief Executes the command by calling the command handler.
   *

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4437b9d4/cli/Constants.hpp
----------------------------------------------------------------------
diff --git a/cli/Constants.hpp b/cli/Constants.hpp
new file mode 100644
index 0000000..1aaa5be
--- /dev/null
+++ b/cli/Constants.hpp
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_CLI_CONSTANTS_HPP_
+#define QUICKSTEP_CLI_CONSTANTS_HPP_
+
+namespace quickstep {
+namespace cli {
+
+/** \addtogroup CLI
+ *  @{
+ */
+
+// The initial maximum column width is 6 because the header word "Column"
+// is 6 characters long.
+// This is used while describing the table.
+constexpr int kInitMaxColumnWidth = 6;
+
+constexpr char kDescribeDatabaseCommand[] = "\\dt";
+constexpr char kDescribeTableCommand[] = "\\d";
+constexpr char kAnalyzeCommand[] = "\\analyze";
+
+/** @} */
+
+}  // namespace cli
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_CLI_CONSTANTS_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4437b9d4/utility/StringUtil.cpp
----------------------------------------------------------------------
diff --git a/utility/StringUtil.cpp b/utility/StringUtil.cpp
index 49ef337..2745457 100644
--- a/utility/StringUtil.cpp
+++ b/utility/StringUtil.cpp
@@ -93,6 +93,17 @@ std::string EscapeSpecialChars(const std::string& text) {
   return new_text;
 }
 
+std::string EscapeQuotes(const std::string &str, const char quote) {
+  std::string ret;
+  for (const char c : str) {
+    ret.push_back(c);
+    if (c == quote) {
+      ret.push_back(c);
+    }
+  }
+  return ret;
+}
+
 bool ParseIntString(const std::string &int_string,
                     const char delimiter,
                     std::vector<int> *parsed_output) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4437b9d4/utility/StringUtil.hpp
----------------------------------------------------------------------
diff --git a/utility/StringUtil.hpp b/utility/StringUtil.hpp
index 9aa57ab..abda8f3 100644
--- a/utility/StringUtil.hpp
+++ b/utility/StringUtil.hpp
@@ -48,6 +48,12 @@ extern std::string ToLower(const std::string &str);
 extern std::string EscapeSpecialChars(const std::string &text);
 
 /**
+ * @brief Escape quotes (i.e. ' or ") in 'str'.
+ * @return Escaped string.
+ */
+extern std::string EscapeQuotes(const std::string &str, const char quote);
+
+/**
  * @brief Join all objects in a iterable container into a single string. The object
  *        must have implemented the operator<< overloading with std::stringstream.
  *


[02/24] incubator-quickstep git commit: patches for missed linenoise changes

Posted by zu...@apache.org.
patches for missed linenoise changes


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/1572762a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/1572762a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/1572762a

Branch: refs/heads/dist-patch
Commit: 1572762a666c1b61b1172beba6d67d3fef5a3a6b
Parents: f6480fb
Author: cramja <ma...@gmail.com>
Authored: Tue Feb 21 10:22:14 2017 -0600
Committer: cramja <ma...@gmail.com>
Committed: Tue Feb 21 10:22:14 2017 -0600

----------------------------------------------------------------------
 third_party/download_and_patch_prerequisites.sh |  4 +
 third_party/patches/linenoise/linenoise.c.patch | 89 ++++++++++++++++++++
 third_party/patches/linenoise/linenoise.h.patch | 29 +++++++
 3 files changed, 122 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1572762a/third_party/download_and_patch_prerequisites.sh
----------------------------------------------------------------------
diff --git a/third_party/download_and_patch_prerequisites.sh b/third_party/download_and_patch_prerequisites.sh
index b5f5cac..fd6106c 100755
--- a/third_party/download_and_patch_prerequisites.sh
+++ b/third_party/download_and_patch_prerequisites.sh
@@ -89,7 +89,11 @@ do
 done
 
 # Apply patches now.
+
+# Apply linenoise patch
 cp ${PATCH_DIR}/linenoise/CMakeLists.txt ${THIRD_PARTY_SRC_DIR}/linenoise
+patch ${THIRD_PARTY_SRC_DIR}/linenoise/linenoise.h ${PATCH_DIR}/linenoise/linenoise.h.patch
+patch ${THIRD_PARTY_SRC_DIR}/linenoise/linenoise.c ${PATCH_DIR}/linenoise/linenoise.c.patch
 
 # Apply gflags patch.
 echo "Patching for gflags:"

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1572762a/third_party/patches/linenoise/linenoise.c.patch
----------------------------------------------------------------------
diff --git a/third_party/patches/linenoise/linenoise.c.patch b/third_party/patches/linenoise/linenoise.c.patch
new file mode 100644
index 0000000..cea6162
--- /dev/null
+++ b/third_party/patches/linenoise/linenoise.c.patch
@@ -0,0 +1,89 @@
+--- linenoise.c.new	2015-04-13 02:38:43.000000000 -0500
++++ linenoise.c	2017-02-21 09:47:42.000000000 -0600
+@@ -1,7 +1,5 @@
+-/* linenoise.c -- VERSION 1.0
+- *
+- * Guerrilla line editing library against the idea that a line editing lib
+- * needs to be 20,000 lines of C code.
++/* linenoise.c -- guerrilla line editing library against the idea that a
++ * line editing lib needs to be 20,000 lines of C code.
+  *
+  * You can find the latest source code at:
+  *
+@@ -120,6 +118,7 @@
+ 
+ #define LINENOISE_DEFAULT_HISTORY_MAX_LEN 100
+ #define LINENOISE_MAX_LINE 4096
++#define LINENOISE_TRIM_NEWLINE 0
+ static char *unsupported_term[] = {"dumb","cons25","emacs",NULL};
+ static linenoiseCompletionCallback *completionCallback = NULL;
+ 
+@@ -774,6 +773,10 @@
+             history_len--;
+             free(history[history_len]);
+             if (mlmode) linenoiseEditMoveEnd(&l);
++#if !LINENOISE_TRIM_NEWLINE
++            l.buf[l.len++] = '\n';
++            l.buf[l.len] = '\0';
++#endif
+             return (int)l.len;
+         case CTRL_C:     /* ctrl-c */
+             errno = EAGAIN;
+@@ -940,10 +943,12 @@
+         /* Not a tty: read from file / pipe. */
+         if (fgets(buf, buflen, stdin) == NULL) return -1;
+         count = strlen(buf);
++#if LINENOISE_TRIM_NEWLINE
+         if (count && buf[count-1] == '\n') {
+             count--;
+             buf[count] = '\0';
+         }
++#endif
+     } else {
+         /* Interactive editing. */
+         if (enableRawMode(STDIN_FILENO) == -1) return -1;
+@@ -970,10 +975,12 @@
+         fflush(stdout);
+         if (fgets(buf,LINENOISE_MAX_LINE,stdin) == NULL) return NULL;
+         len = strlen(buf);
++#if LINENOISE_TRIM_NEWLINE
+         while(len && (buf[len-1] == '\n' || buf[len-1] == '\r')) {
+             len--;
+             buf[len] = '\0';
+         }
++#endif
+         return strdup(buf);
+     } else {
+         count = linenoiseRaw(buf,LINENOISE_MAX_LINE,prompt);
+@@ -1021,12 +1028,29 @@
+         memset(history,0,(sizeof(char*)*history_max_len));
+     }
+ 
++#if LINENOISE_TRIM_NEWLINE
+     /* Don't add duplicated lines. */
+     if (history_len && !strcmp(history[history_len-1], line)) return 0;
+ 
+-    /* Add an heap allocated copy of the line in the history.
+-     * If we reached the max length, remove the older line. */
+     linecopy = strdup(line);
++#else
++    /* Remove trailing newlines so that editing from history doesn't get all wonky. */
++    size_t line_len = strlen(line);
++    while ((line_len > 0) && (line[line_len - 1] == '\n')) {
++      --line_len;
++    }
++    linecopy = (char*) malloc(line_len + 1);
++    memcpy(linecopy, line, line_len);
++    linecopy[line_len] = '\0';
++
++    /* Don't add duplicated lines. */
++    if (history_len && !strcmp(history[history_len-1], linecopy)) {
++        free(linecopy);
++        return 0;
++    }
++#endif
++
++    /* If we reached the max length, remove the older line. */
+     if (!linecopy) return 0;
+     if (history_len == history_max_len) {
+         free(history[0]);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1572762a/third_party/patches/linenoise/linenoise.h.patch
----------------------------------------------------------------------
diff --git a/third_party/patches/linenoise/linenoise.h.patch b/third_party/patches/linenoise/linenoise.h.patch
new file mode 100644
index 0000000..feb597a
--- /dev/null
+++ b/third_party/patches/linenoise/linenoise.h.patch
@@ -0,0 +1,29 @@
+--- linenoise.h.new	2015-04-13 02:38:43.000000000 -0500
++++ linenoise.h	2017-02-21 09:44:05.000000000 -0600
+@@ -1,7 +1,5 @@
+-/* linenoise.h -- VERSION 1.0
+- *
+- * Guerrilla line editing library against the idea that a line editing lib
+- * needs to be 20,000 lines of C code.
++/* linenoise.h -- guerrilla line editing library against the idea that a
++ * line editing lib needs to be 20,000 lines of C code.
+  *
+  * See linenoise.c for more information.
+  *
+@@ -36,9 +34,16 @@
+  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+  */
+ 
++/* Modified from original by Craig Chasseur as follows:
++ *     - include stddef.h in header for size_t
++ *     - do not trim newlines from end of input
++ */
++
+ #ifndef __LINENOISE_H
+ #define __LINENOISE_H
+ 
++#include <stddef.h>
++
+ #ifdef __cplusplus
+ extern "C" {
+ #endif


[07/24] incubator-quickstep git commit: Added HDFS Support For TextScanWorkOrder.

Posted by zu...@apache.org.
Added HDFS Support For TextScanWorkOrder.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/c9d1f22e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/c9d1f22e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/c9d1f22e

Branch: refs/heads/dist-patch
Commit: c9d1f22e75a016f47d3f241a1b78efcbce0e5d52
Parents: 17477f5
Author: Zuyu Zhang <zu...@apache.org>
Authored: Mon Feb 6 14:42:42 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Fri Feb 24 11:10:40 2017 -0800

----------------------------------------------------------------------
 cli/distributed/Executor.cpp                    |   2 +-
 query_execution/CMakeLists.txt                  |   1 +
 query_execution/Shiftboss.cpp                   |   3 +-
 query_execution/Shiftboss.hpp                   |  14 +++
 .../DistributedExecutionGeneratorTestRunner.cpp |   3 +-
 relational_operators/CMakeLists.txt             |   5 +
 relational_operators/TextScanOperator.cpp       | 107 ++++++++++++++++---
 relational_operators/TextScanOperator.hpp       |  10 +-
 relational_operators/WorkOrderFactory.cpp       |   6 +-
 relational_operators/WorkOrderFactory.hpp       |   4 +-
 storage/FileManagerHdfs.hpp                     |   9 ++
 storage/StorageManager.cpp                      |   9 ++
 storage/StorageManager.hpp                      |   8 +-
 13 files changed, 160 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9d1f22e/cli/distributed/Executor.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Executor.cpp b/cli/distributed/Executor.cpp
index 1d03579..3485298 100644
--- a/cli/distributed/Executor.cpp
+++ b/cli/distributed/Executor.cpp
@@ -76,7 +76,7 @@ void Executor::init() {
   data_exchanger_.start();
 
   shiftboss_ =
-      make_unique<Shiftboss>(&bus_, storage_manager_.get(), worker_directory_.get());
+      make_unique<Shiftboss>(&bus_, storage_manager_.get(), worker_directory_.get(), storage_manager_->hdfs());
   shiftboss_->start();
 
   for (const auto &worker : workers_) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9d1f22e/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 50bf694..12d6be0 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -295,6 +295,7 @@ if (ENABLE_DISTRIBUTED)
                         quickstep_queryexecution_WorkerMessage
                         quickstep_relationaloperators_RebuildWorkOrder
                         quickstep_relationaloperators_WorkOrderFactory
+                        quickstep_storage_Flags
                         quickstep_storage_InsertDestination
                         quickstep_storage_StorageBlock
                         quickstep_storage_StorageManager

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9d1f22e/query_execution/Shiftboss.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp
index 2ed42d0..bae5205 100644
--- a/query_execution/Shiftboss.cpp
+++ b/query_execution/Shiftboss.cpp
@@ -104,7 +104,8 @@ void Shiftboss::run() {
                                                                        query_contexts_[query_id].get(),
                                                                        storage_manager_,
                                                                        shiftboss_client_id_,
-                                                                       bus_);
+                                                                       bus_,
+                                                                       hdfs_);
 
         unique_ptr<WorkerMessage> worker_message(
             WorkerMessage::WorkOrderMessage(work_order, proto.operator_index()));

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9d1f22e/query_execution/Shiftboss.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.hpp b/query_execution/Shiftboss.hpp
index 6538d48..e0b4312 100644
--- a/query_execution/Shiftboss.hpp
+++ b/query_execution/Shiftboss.hpp
@@ -30,6 +30,8 @@
 #include "query_execution/QueryContext.hpp"
 #include "query_execution/QueryExecutionTypedefs.hpp"
 #include "query_execution/WorkerDirectory.hpp"
+#include "storage/Flags.hpp"
+#include "storage/StorageConfig.h"  // For QUICKSTEP_HAVE_FILE_MANAGER_HDFS.
 #include "threading/Thread.hpp"
 #include "utility/Macros.hpp"
 
@@ -64,6 +66,7 @@ class Shiftboss : public Thread {
    * @param bus A pointer to the TMB.
    * @param storage_manager The StorageManager to use.
    * @param workers A pointer to the WorkerDirectory.
+   * @param hdfs The HDFS connector via libhdfs3.
    * @param cpu_id The ID of the CPU to which the Shiftboss thread can be pinned.
    *
    * @note If cpu_id is not specified, Shiftboss thread can be possibly moved
@@ -72,10 +75,12 @@ class Shiftboss : public Thread {
   Shiftboss(tmb::MessageBus *bus,
             StorageManager *storage_manager,
             WorkerDirectory *workers,
+            void *hdfs,
             const int cpu_id = -1)
       : bus_(DCHECK_NOTNULL(bus)),
         storage_manager_(DCHECK_NOTNULL(storage_manager)),
         workers_(DCHECK_NOTNULL(workers)),
+        hdfs_(hdfs),
         cpu_id_(cpu_id),
         shiftboss_client_id_(tmb::kClientIdNone),
         foreman_client_id_(tmb::kClientIdNone),
@@ -84,6 +89,12 @@ class Shiftboss : public Thread {
     // Check to have at least one Worker.
     DCHECK_GT(workers->getNumWorkers(), 0u);
 
+#ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+    if (FLAGS_use_hdfs) {
+      CHECK(hdfs_);
+    }
+#endif  // QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+
     shiftboss_client_id_ = bus_->Connect();
     LOG(INFO) << "Shiftboss TMB client ID: " << shiftboss_client_id_;
     DCHECK_NE(shiftboss_client_id_, tmb::kClientIdNone);
@@ -228,6 +239,9 @@ class Shiftboss : public Thread {
   StorageManager *storage_manager_;
   WorkerDirectory *workers_;
 
+  // Not owned.
+  void *hdfs_;
+
   // The ID of the CPU that the Shiftboss thread can optionally be pinned to.
   const int cpu_id_;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9d1f22e/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
index 2e18467..c9f5a10 100644
--- a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
+++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
@@ -128,7 +128,8 @@ DistributedExecutionGeneratorTestRunner::DistributedExecutionGeneratorTestRunner
 
     data_exchangers_[i].set_storage_manager(storage_manager.get());
     shiftbosses_.push_back(
-        make_unique<Shiftboss>(&bus_, storage_manager.get(), worker_directories_.back().get()));
+        make_unique<Shiftboss>(&bus_, storage_manager.get(), worker_directories_.back().get(),
+                               storage_manager->hdfs()));
 
     storage_managers_.push_back(move(storage_manager));
   }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9d1f22e/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index 457d58a..1693ec2 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -491,6 +491,7 @@ target_link_libraries(quickstep_relationaloperators_TextScanOperator
                       quickstep_relationaloperators_RelationalOperator
                       quickstep_relationaloperators_WorkOrder
                       quickstep_relationaloperators_WorkOrder_proto
+                      quickstep_storage_Flags
                       quickstep_storage_InsertDestination
                       quickstep_types_Type
                       quickstep_types_TypedValue
@@ -500,6 +501,10 @@ target_link_libraries(quickstep_relationaloperators_TextScanOperator
                       quickstep_utility_Glob
                       quickstep_utility_Macros
                       tmb)
+if (QUICKSTEP_HAVE_FILE_MANAGER_HDFS)
+  target_link_libraries(quickstep_relationaloperators_TextScanOperator
+                        ${LIBHDFS3_LIBRARIES})
+endif(QUICKSTEP_HAVE_FILE_MANAGER_HDFS)
 target_link_libraries(quickstep_relationaloperators_UpdateOperator
                       glog
                       quickstep_catalog_CatalogRelation

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9d1f22e/relational_operators/TextScanOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/TextScanOperator.cpp b/relational_operators/TextScanOperator.cpp
index 0a83a85..6333813 100644
--- a/relational_operators/TextScanOperator.cpp
+++ b/relational_operators/TextScanOperator.cpp
@@ -41,7 +41,14 @@
 #include "query_execution/WorkOrderProtosContainer.hpp"
 #include "query_execution/WorkOrdersContainer.hpp"
 #include "relational_operators/WorkOrder.pb.h"
+#include "storage/Flags.hpp"
 #include "storage/InsertDestination.hpp"
+#include "storage/StorageConfig.h"  // For QUICKSTEP_HAVE_FILE_MANAGER_HDFS.
+
+#ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+#include <hdfs/hdfs.h>
+#endif  // QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+
 #include "types/Type.hpp"
 #include "types/TypedValue.hpp"
 #include "types/containers/ColumnVector.hpp"
@@ -205,14 +212,56 @@ void TextScanWorkOrder::execute() {
 
   std::vector<TypedValue> vector_tuple_returned;
   constexpr std::size_t kSmallBufferSize = 0x4000;
-  char *buffer = reinterpret_cast<char *>(malloc(std::max(text_segment_size_, kSmallBufferSize)));
-
-  // Read text segment into buffer.
-  FILE *file = std::fopen(filename_.c_str(), "rb");
-  std::fseek(file, text_offset_, SEEK_SET);
-  std::size_t bytes_read = std::fread(buffer, 1, text_segment_size_, file);
-  if (bytes_read != text_segment_size_) {
-    throw TextScanReadError(filename_);
+  const size_t buffer_size = std::max(text_segment_size_, kSmallBufferSize);
+  char *buffer = reinterpret_cast<char *>(malloc(buffer_size));
+
+  bool use_hdfs = false;
+  std::size_t bytes_read;
+
+#ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+  hdfsFS hdfs = nullptr;
+  hdfsFile file_handle = nullptr;
+
+  if (FLAGS_use_hdfs) {
+    use_hdfs = true;
+    hdfs = static_cast<hdfsFS>(hdfs_);
+
+    file_handle = hdfsOpenFile(hdfs, filename_.c_str(), O_RDONLY, buffer_size,
+                               0 /* default replication */, 0 /* default block size */);
+    if (file_handle == nullptr) {
+      LOG(ERROR) << "Failed to open file " << filename_ << " with error: " << strerror(errno);
+      return;
+    }
+
+    if (hdfsSeek(hdfs, file_handle, text_offset_)) {
+      LOG(ERROR) << "Failed to seek in file " << filename_ << " with error: " << strerror(errno);
+
+      hdfsCloseFile(hdfs, file_handle);
+      return;
+    }
+
+    bytes_read = hdfsRead(hdfs, file_handle, buffer, text_segment_size_);
+    if (bytes_read != text_segment_size_) {
+      hdfsCloseFile(hdfs, file_handle);
+      throw TextScanReadError(filename_);
+    }
+  }
+#endif  // QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+
+  FILE *file = nullptr;
+  if (!use_hdfs) {
+    // Avoid unused-private-field warning.
+    (void) hdfs_;
+
+    // Read text segment into buffer.
+    file = std::fopen(filename_.c_str(), "rb");
+    std::fseek(file, text_offset_, SEEK_SET);
+    bytes_read = std::fread(buffer, 1, text_segment_size_, file);
+
+    if (bytes_read != text_segment_size_) {
+      std::fclose(file);
+      throw TextScanReadError(filename_);
+    }
   }
 
   // Locate the first newline character.
@@ -266,10 +315,36 @@ void TextScanWorkOrder::execute() {
   // that the last tuple is very small / very large.
   std::size_t dynamic_read_size = 1024;
   std::string row_string;
-  std::fseek(file, text_offset_ + (end_ptr - buffer), SEEK_SET);
+
+  const size_t dynamic_read_offset = text_offset_ + (end_ptr - buffer);
+  if (use_hdfs) {
+#ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+    if (hdfsSeek(hdfs, file_handle, dynamic_read_offset)) {
+      LOG(ERROR) << "Failed to seek in file " << filename_ << " with error: " << strerror(errno);
+
+      hdfsCloseFile(hdfs, file_handle);
+      return;
+    }
+#endif  // QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+  } else {
+    std::fseek(file, dynamic_read_offset, SEEK_SET);
+  }
+
   bool has_reached_end = false;
   do {
-    bytes_read = std::fread(buffer, 1, dynamic_read_size, file);
+    if (use_hdfs) {
+#ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+      bytes_read = hdfsRead(hdfs, file_handle, buffer, dynamic_read_size);
+
+      // Read again when crossing the HDFS block boundary.
+      if (bytes_read != dynamic_read_size) {
+        bytes_read += hdfsRead(hdfs, file_handle, buffer + bytes_read, dynamic_read_size - bytes_read);
+      }
+#endif  // QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+    } else {
+      bytes_read = std::fread(buffer, 1, dynamic_read_size, file);
+    }
+
     std::size_t bytes_to_copy = bytes_read;
 
     for (std::size_t i = 0; i < bytes_read; ++i) {
@@ -303,7 +378,14 @@ void TextScanWorkOrder::execute() {
     }
   }
 
-  std::fclose(file);
+  if (use_hdfs) {
+#ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+    hdfsCloseFile(hdfs, file_handle);
+#endif  // QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+  } else {
+    std::fclose(file);
+  }
+
   free(buffer);
 
   // Store the tuples in a ColumnVectorsValueAccessor for bulk insert.
@@ -334,7 +416,8 @@ void TextScanWorkOrder::execute() {
 }
 
 std::vector<TypedValue> TextScanWorkOrder::parseRow(const char **row_ptr,
-                                  const CatalogRelationSchema &relation, bool *is_faulty) const {
+                                                    const CatalogRelationSchema &relation,
+                                                    bool *is_faulty) const {
   std::vector<TypedValue> attribute_values;
   // Always assume current row is not faulty initially.
   *is_faulty = false;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9d1f22e/relational_operators/TextScanOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/TextScanOperator.hpp b/relational_operators/TextScanOperator.hpp
index eada190..59821fc 100644
--- a/relational_operators/TextScanOperator.hpp
+++ b/relational_operators/TextScanOperator.hpp
@@ -189,6 +189,7 @@ class TextScanWorkOrder : public WorkOrder {
    * @param process_escape_sequences Whether to decode escape sequences in the
    *        text file.
    * @param output_destination The InsertDestination to insert tuples.
+   * @param hdfs The HDFS connector via libhdfs3.
    **/
   TextScanWorkOrder(
       const std::size_t query_id,
@@ -197,14 +198,16 @@ class TextScanWorkOrder : public WorkOrder {
       const std::size_t text_segment_size,
       const char field_terminator,
       const bool process_escape_sequences,
-      InsertDestination *output_destination)
+      InsertDestination *output_destination,
+      void *hdfs = nullptr)
       : WorkOrder(query_id),
         filename_(filename),
         text_offset_(text_offset),
         text_segment_size_(text_segment_size),
         field_terminator_(field_terminator),
         process_escape_sequences_(process_escape_sequences),
-        output_destination_(DCHECK_NOTNULL(output_destination)) {}
+        output_destination_(DCHECK_NOTNULL(output_destination)),
+        hdfs_(hdfs) {}
 
   ~TextScanWorkOrder() override {}
 
@@ -332,6 +335,9 @@ class TextScanWorkOrder : public WorkOrder {
 
   InsertDestination *output_destination_;
 
+  // Not owned.
+  void *hdfs_;
+
   DISALLOW_COPY_AND_ASSIGN(TextScanWorkOrder);
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9d1f22e/relational_operators/WorkOrderFactory.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp
index d2c8251..cf0ee74 100644
--- a/relational_operators/WorkOrderFactory.cpp
+++ b/relational_operators/WorkOrderFactory.cpp
@@ -75,7 +75,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
                                                   QueryContext *query_context,
                                                   StorageManager *storage_manager,
                                                   const tmb::client_id shiftboss_client_id,
-                                                  tmb::MessageBus *bus) {
+                                                  tmb::MessageBus *bus,
+                                                  void *hdfs) {
   DCHECK(query_context != nullptr);
   DCHECK(ProtoIsValid(proto, *catalog_database, *query_context))
       << "Attempted to create WorkOrder from an invalid proto description:\n"
@@ -473,7 +474,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
           proto.GetExtension(serialization::TextScanWorkOrder::field_terminator),
           proto.GetExtension(serialization::TextScanWorkOrder::process_escape_sequences),
           query_context->getInsertDestination(
-              proto.GetExtension(serialization::TextScanWorkOrder::insert_destination_index)));
+              proto.GetExtension(serialization::TextScanWorkOrder::insert_destination_index)),
+          hdfs);
     }
     case serialization::UPDATE: {
       LOG(INFO) << "Creating UpdateWorkOrder in Shiftboss " << shiftboss_index;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9d1f22e/relational_operators/WorkOrderFactory.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrderFactory.hpp b/relational_operators/WorkOrderFactory.hpp
index acf3855..ece687b 100644
--- a/relational_operators/WorkOrderFactory.hpp
+++ b/relational_operators/WorkOrderFactory.hpp
@@ -59,6 +59,7 @@ class WorkOrderFactory {
    * @param storage_manager The StorageManager to use.
    * @param shiftboss_client_id The TMB client id of Shiftboss.
    * @param bus A pointer to the TMB.
+   * @param hdfs The HDFS connector via libhdfs3.
    *
    * @return A new WorkOrder reconstructed from the supplied Protocol Buffer.
    **/
@@ -68,7 +69,8 @@ class WorkOrderFactory {
                                          QueryContext *query_context,
                                          StorageManager *storage_manager,
                                          const tmb::client_id shiftboss_client_id,
-                                         tmb::MessageBus *bus);
+                                         tmb::MessageBus *bus,
+                                         void *hdfs);
 
   /**
    * @brief Check whether a serialization::WorkOrder is fully-formed and

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9d1f22e/storage/FileManagerHdfs.hpp
----------------------------------------------------------------------
diff --git a/storage/FileManagerHdfs.hpp b/storage/FileManagerHdfs.hpp
index f47e4a8..a8feb50 100644
--- a/storage/FileManagerHdfs.hpp
+++ b/storage/FileManagerHdfs.hpp
@@ -55,6 +55,15 @@ class FileManagerHdfs : public FileManager {
 
   block_id_counter getMaxUsedBlockCounter(const block_id_domain block_domain) const override;
 
+  /**
+   * @brief Get the HDFS connector via libhdfs3.
+   *
+   * @return The HDFS connector.
+   **/
+  void* hdfs() {
+    return static_cast<void*>(hdfs_);
+  }
+
  private:
   // libhdfs3 has an API to release this pointer.
   hdfsFS hdfs_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9d1f22e/storage/StorageManager.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.cpp b/storage/StorageManager.cpp
index 783ccfe..94e1b67 100644
--- a/storage/StorageManager.cpp
+++ b/storage/StorageManager.cpp
@@ -570,6 +570,15 @@ bool StorageManager::DataExchangerClientAsync::Pull(const block_id block,
   return true;
 }
 
+void* StorageManager::hdfs() {
+#ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+  if (FLAGS_use_hdfs) {
+    return static_cast<FileManagerHdfs*>(file_manager_.get())->hdfs();
+  }
+#endif  // QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+  return nullptr;
+}
+
 vector<string> StorageManager::getPeerDomainNetworkAddresses(const block_id block) {
   serialization::BlockMessage proto;
   proto.set_block_id(block);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9d1f22e/storage/StorageManager.hpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.hpp b/storage/StorageManager.hpp
index 42176ee..dc4b7e8 100644
--- a/storage/StorageManager.hpp
+++ b/storage/StorageManager.hpp
@@ -41,7 +41,6 @@
 #include "storage/StorageBlob.hpp"
 #include "storage/StorageBlock.hpp"
 #include "storage/StorageBlockInfo.hpp"
-#include "storage/StorageConfig.h"
 #include "storage/StorageConstants.hpp"
 #include "threading/SpinSharedMutex.hpp"
 #include "utility/Macros.hpp"
@@ -395,6 +394,13 @@ class StorageManager {
   void pullBlockOrBlob(const block_id block, PullResponse *response) const;
 #endif
 
+  /**
+   * @brief Get the HDFS connector via libhdfs3.
+   *
+   * @return The HDFS connector.
+   **/
+  void* hdfs();
+
  private:
   struct BlockHandle {
     void *block_memory;


[21/24] incubator-quickstep git commit: Set the default tmb_implementation value to purememory.

Posted by zu...@apache.org.
Set the default tmb_implementation value to purememory.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/3842c6e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/3842c6e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/3842c6e1

Branch: refs/heads/dist-patch
Commit: 3842c6e15611331417c11313b54b299240f05b14
Parents: 87bbb26
Author: Zuyu Zhang <zu...@apache.org>
Authored: Sat Feb 11 11:17:55 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Wed Mar 1 14:43:01 2017 -0800

----------------------------------------------------------------------
 third_party/src/tmb/src/tmb_net_server.cc | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3842c6e1/third_party/src/tmb/src/tmb_net_server.cc
----------------------------------------------------------------------
diff --git a/third_party/src/tmb/src/tmb_net_server.cc b/third_party/src/tmb/src/tmb_net_server.cc
index c2be7ef..abe0efe 100644
--- a/third_party/src/tmb/src/tmb_net_server.cc
+++ b/third_party/src/tmb/src/tmb_net_server.cc
@@ -256,7 +256,7 @@ static bool ValidateTmbImplementation(const char *flagname,
                "Invalid value for --%s: %s\n", flagname, value.c_str());
   return false;
 }
-DEFINE_string(tmb_implementation, "nativelog",
+DEFINE_string(tmb_implementation, "purememory",
               "Which underlying TMB implementation to use. Valid choices are "
               "leveldb, nativelog, purememory, sqlite, voltdb, and zookeeper. "
               "Depending on how the TMB library was built, some of these may "


[08/24] incubator-quickstep git commit: Used two TMB implementations in Shiftboss.

Posted by zu...@apache.org.
Used two TMB implementations in Shiftboss.

  - Global TMB between Foreman and Shiftboss.
  - Local TMB between Workers and Shiftboss.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/b88625d8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/b88625d8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/b88625d8

Branch: refs/heads/dist-patch
Commit: b88625d80dbd7641e159a7c6bf959021fa2cac86
Parents: c9d1f22
Author: Zuyu Zhang <zu...@apache.org>
Authored: Wed Feb 8 12:48:31 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Fri Feb 24 11:15:41 2017 -0800

----------------------------------------------------------------------
 cli/distributed/Executor.cpp                    |   7 +-
 cli/distributed/Executor.hpp                    |   4 +
 query_execution/Shiftboss.cpp                   | 419 +++++++++++--------
 query_execution/Shiftboss.hpp                   |  92 +---
 .../DistributedExecutionGeneratorTestRunner.cpp |   8 +-
 .../DistributedExecutionGeneratorTestRunner.hpp |   1 +
 6 files changed, 279 insertions(+), 252 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b88625d8/cli/distributed/Executor.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Executor.cpp b/cli/distributed/Executor.cpp
index 3485298..e248fef 100644
--- a/cli/distributed/Executor.cpp
+++ b/cli/distributed/Executor.cpp
@@ -35,6 +35,7 @@
 
 #include "tmb/id_typedefs.h"
 #include "tmb/native_net_client_message_bus.h"
+#include "tmb/pure_memory_message_bus.h"
 
 #include "glog/logging.h"
 
@@ -47,6 +48,8 @@ using tmb::client_id;
 namespace quickstep {
 
 void Executor::init() {
+  bus_local_.Initialize();
+
   executor_client_id_ = bus_.Connect();
   DLOG(INFO) << "Executor TMB Client ID: " << executor_client_id_;
 
@@ -59,7 +62,7 @@ void Executor::init() {
   for (std::size_t worker_thread_index = 0;
        worker_thread_index < FLAGS_num_workers;
        ++worker_thread_index) {
-    workers_.push_back(make_unique<Worker>(worker_thread_index, &bus_));
+    workers_.push_back(make_unique<Worker>(worker_thread_index, &bus_local_));
     worker_client_ids.push_back(workers_.back()->getBusClientID());
   }
 
@@ -76,7 +79,7 @@ void Executor::init() {
   data_exchanger_.start();
 
   shiftboss_ =
-      make_unique<Shiftboss>(&bus_, storage_manager_.get(), worker_directory_.get(), storage_manager_->hdfs());
+      make_unique<Shiftboss>(&bus_, &bus_local_, storage_manager_.get(), worker_directory_.get(), storage_manager_->hdfs());
   shiftboss_->start();
 
   for (const auto &worker : workers_) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b88625d8/cli/distributed/Executor.hpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Executor.hpp b/cli/distributed/Executor.hpp
index 6ffa756..aafeeae 100644
--- a/cli/distributed/Executor.hpp
+++ b/cli/distributed/Executor.hpp
@@ -24,6 +24,7 @@
 #include <vector>
 
 #include "cli/distributed/Role.hpp"
+#include "query_execution/QueryExecutionTypedefs.hpp"
 #include "query_execution/Shiftboss.hpp"
 #include "query_execution/Worker.hpp"
 #include "query_execution/WorkerDirectory.hpp"
@@ -65,6 +66,9 @@ class Executor final : public Role {
   void run() override {}
 
  private:
+  // Used between Shiftboss and Workers.
+  MessageBusImpl bus_local_;
+
   tmb::client_id executor_client_id_;
 
   std::vector<std::unique_ptr<Worker>> workers_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b88625d8/query_execution/Shiftboss.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp
index bae5205..2f7dc3c 100644
--- a/query_execution/Shiftboss.cpp
+++ b/query_execution/Shiftboss.cpp
@@ -64,6 +64,91 @@ namespace quickstep {
 
 class WorkOrder;
 
+Shiftboss::Shiftboss(tmb::MessageBus *bus_global,
+                     tmb::MessageBus *bus_local,
+                     StorageManager *storage_manager,
+                     WorkerDirectory *workers,
+                     void *hdfs,
+                     const int cpu_id)
+    : bus_global_(DCHECK_NOTNULL(bus_global)),
+      bus_local_(DCHECK_NOTNULL(bus_local)),
+      storage_manager_(DCHECK_NOTNULL(storage_manager)),
+      workers_(DCHECK_NOTNULL(workers)),
+      hdfs_(hdfs),
+      cpu_id_(cpu_id),
+      shiftboss_client_id_global_(tmb::kClientIdNone),
+      shiftboss_client_id_local_(tmb::kClientIdNone),
+      foreman_client_id_(tmb::kClientIdNone),
+      max_msgs_per_worker_(1),
+      start_worker_index_(0u) {
+  // Check to have at least one Worker.
+  DCHECK_GT(workers->getNumWorkers(), 0u);
+
+#ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+  if (FLAGS_use_hdfs) {
+    CHECK(hdfs_);
+  }
+#endif  // QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+
+  shiftboss_client_id_global_ = bus_global_->Connect();
+  LOG(INFO) << "Shiftboss TMB client ID: " << shiftboss_client_id_global_;
+  DCHECK_NE(shiftboss_client_id_global_, tmb::kClientIdNone);
+
+  shiftboss_client_id_local_ = bus_local_->Connect();
+  DCHECK_NE(shiftboss_client_id_local_, tmb::kClientIdNone);
+
+  // Messages between Foreman and Shiftboss.
+  bus_global_->RegisterClientAsSender(shiftboss_client_id_global_, kShiftbossRegistrationMessage);
+  bus_global_->RegisterClientAsReceiver(shiftboss_client_id_global_, kShiftbossRegistrationResponseMessage);
+
+  bus_global_->RegisterClientAsReceiver(shiftboss_client_id_global_, kQueryInitiateMessage);
+  bus_global_->RegisterClientAsSender(shiftboss_client_id_global_, kQueryInitiateResponseMessage);
+
+  bus_global_->RegisterClientAsReceiver(shiftboss_client_id_global_, kInitiateRebuildMessage);
+  bus_global_->RegisterClientAsSender(shiftboss_client_id_global_, kInitiateRebuildResponseMessage);
+
+  bus_global_->RegisterClientAsReceiver(shiftboss_client_id_global_, kSaveQueryResultMessage);
+  bus_global_->RegisterClientAsSender(shiftboss_client_id_global_, kSaveQueryResultResponseMessage);
+
+  // Message sent to Worker.
+  bus_local_->RegisterClientAsSender(shiftboss_client_id_local_, kShiftbossRegistrationResponseMessage);
+  bus_local_->RegisterClientAsSender(shiftboss_client_id_local_, kRebuildWorkOrderMessage);
+
+  // Forward the following message types from Foreman to Workers.
+  bus_global_->RegisterClientAsReceiver(shiftboss_client_id_global_, kWorkOrderMessage);
+  bus_local_->RegisterClientAsSender(shiftboss_client_id_local_, kWorkOrderMessage);
+
+  // Forward the following message types from Workers to Foreman.
+  bus_local_->RegisterClientAsReceiver(shiftboss_client_id_local_, kCatalogRelationNewBlockMessage);
+  bus_global_->RegisterClientAsSender(shiftboss_client_id_global_, kCatalogRelationNewBlockMessage);
+
+  bus_local_->RegisterClientAsReceiver(shiftboss_client_id_local_, kDataPipelineMessage);
+  bus_global_->RegisterClientAsSender(shiftboss_client_id_global_, kDataPipelineMessage);
+
+  bus_local_->RegisterClientAsReceiver(shiftboss_client_id_local_, kWorkOrderFeedbackMessage);
+  bus_global_->RegisterClientAsSender(shiftboss_client_id_global_, kWorkOrderFeedbackMessage);
+
+  bus_local_->RegisterClientAsReceiver(shiftboss_client_id_local_, kWorkOrderCompleteMessage);
+  bus_global_->RegisterClientAsSender(shiftboss_client_id_global_, kWorkOrderCompleteMessage);
+
+  bus_local_->RegisterClientAsReceiver(shiftboss_client_id_local_, kRebuildWorkOrderCompleteMessage);
+  bus_global_->RegisterClientAsSender(shiftboss_client_id_global_, kRebuildWorkOrderCompleteMessage);
+
+  // Clean up query execution states, i.e., QueryContext.
+  bus_global_->RegisterClientAsReceiver(shiftboss_client_id_global_, kQueryTeardownMessage);
+
+  // Stop itself.
+  bus_global_->RegisterClientAsReceiver(shiftboss_client_id_global_, kPoisonMessage);
+  // Stop all workers.
+  bus_local_->RegisterClientAsSender(shiftboss_client_id_local_, kPoisonMessage);
+
+  for (std::size_t i = 0; i < workers_->getNumWorkers(); ++i) {
+    worker_addresses_.AddRecipient(workers_->getClientID(i));
+  }
+
+  registerWithForeman();
+}
+
 void Shiftboss::run() {
   if (cpu_id_ >= 0) {
     // We can pin the shiftboss thread to a CPU if specified.
@@ -73,159 +158,161 @@ void Shiftboss::run() {
   processShiftbossRegistrationResponseMessage();
 
   for (;;) {
-    // Receive() is a blocking call, causing this thread to sleep until next
-    // message is received.
-    AnnotatedMessage annotated_message(bus_->Receive(shiftboss_client_id_, 0, true));
-    DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
-               << "') received the typed '" << annotated_message.tagged_message.message_type()
-               << "' message from client " << annotated_message.sender;
-    switch (annotated_message.tagged_message.message_type()) {
-      case kQueryInitiateMessage: {
-        const TaggedMessage &tagged_message = annotated_message.tagged_message;
-
-        serialization::QueryInitiateMessage proto;
-        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
-        processQueryInitiateMessage(proto.query_id(), proto.catalog_database_cache(), proto.query_context());
-        break;
-      }
-      case kWorkOrderMessage: {
-        const TaggedMessage &tagged_message = annotated_message.tagged_message;
-
-        serialization::WorkOrderMessage proto;
-        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
-        const std::size_t query_id = proto.query_id();
-        DCHECK_EQ(1u, query_contexts_.count(query_id));
-
-        WorkOrder *work_order = WorkOrderFactory::ReconstructFromProto(proto.work_order(),
-                                                                       shiftboss_index_,
-                                                                       &database_cache_,
-                                                                       query_contexts_[query_id].get(),
-                                                                       storage_manager_,
-                                                                       shiftboss_client_id_,
-                                                                       bus_,
-                                                                       hdfs_);
-
-        unique_ptr<WorkerMessage> worker_message(
-            WorkerMessage::WorkOrderMessage(work_order, proto.operator_index()));
-
-        TaggedMessage worker_tagged_message(worker_message.get(),
-                                            sizeof(*worker_message),
-                                            kWorkOrderMessage);
-
-        const size_t worker_index = getSchedulableWorker();
-        DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
-                   << "') forwarded WorkOrderMessage (typed '" << kWorkOrderMessage
-                   << "') from Foreman to worker " << worker_index;
-
-        const MessageBus::SendStatus send_status =
-            QueryExecutionUtil::SendTMBMessage(bus_,
-                                               shiftboss_client_id_,
-                                               workers_->getClientID(worker_index),
-                                               move(worker_tagged_message));
-        CHECK(send_status == MessageBus::SendStatus::kOK);
-        break;
-      }
-      case kInitiateRebuildMessage: {
-        // Construct rebuild work orders, and send back their number to
-        // 'ForemanDistributed'.
-        const TaggedMessage &tagged_message = annotated_message.tagged_message;
-
-        serialization::InitiateRebuildMessage proto;
-        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
-        processInitiateRebuildMessage(proto.query_id(),
-                                      proto.operator_index(),
-                                      proto.insert_destination_index(),
-                                      proto.relation_id());
-        break;
-      }
-      case kCatalogRelationNewBlockMessage:  // Fall through.
-      case kDataPipelineMessage:
-      case kWorkOrderFeedbackMessage:
-      case kWorkOrderCompleteMessage:
-      case kRebuildWorkOrderCompleteMessage: {
-        DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
-                   << "') forwarded typed '" << annotated_message.tagged_message.message_type()
-                   << "' message from Worker with TMB client ID '" << annotated_message.sender
-                   << "' to Foreman with TMB client ID " << foreman_client_id_;
-
-        DCHECK_NE(foreman_client_id_, tmb::kClientIdNone);
-        const MessageBus::SendStatus send_status =
-            QueryExecutionUtil::SendTMBMessage(bus_,
-                                               shiftboss_client_id_,
-                                               foreman_client_id_,
-                                               move(annotated_message.tagged_message));
-        CHECK(send_status == MessageBus::SendStatus::kOK);
-        break;
-      }
-      case kQueryTeardownMessage: {
-        const TaggedMessage &tagged_message = annotated_message.tagged_message;
+    AnnotatedMessage annotated_message;
+    if (bus_global_->ReceiveIfAvailable(shiftboss_client_id_global_, &annotated_message, 0, true)) {
+      DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_global_
+                 << "') received the typed '" << annotated_message.tagged_message.message_type()
+                 << "' message from Foreman " << annotated_message.sender;
+      switch (annotated_message.tagged_message.message_type()) {
+        case kQueryInitiateMessage: {
+          const TaggedMessage &tagged_message = annotated_message.tagged_message;
+
+          serialization::QueryInitiateMessage proto;
+          CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+          processQueryInitiateMessage(proto.query_id(), proto.catalog_database_cache(), proto.query_context());
+          break;
+        }
+        case kWorkOrderMessage: {
+          const TaggedMessage &tagged_message = annotated_message.tagged_message;
+
+          serialization::WorkOrderMessage proto;
+          CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+          const std::size_t query_id = proto.query_id();
+          DCHECK_EQ(1u, query_contexts_.count(query_id));
+
+          unique_ptr<WorkOrder> work_order(
+              WorkOrderFactory::ReconstructFromProto(proto.work_order(), shiftboss_index_, &database_cache_,
+                                                     query_contexts_[query_id].get(), storage_manager_,
+                                                     shiftboss_client_id_local_, bus_local_, hdfs_));
+
+          unique_ptr<WorkerMessage> worker_message(
+              WorkerMessage::WorkOrderMessage(work_order.release(), proto.operator_index()));
+
+          TaggedMessage worker_tagged_message(worker_message.get(),
+                                              sizeof(*worker_message),
+                                              kWorkOrderMessage);
+
+          const size_t worker_index = getSchedulableWorker();
+          DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_local_
+                     << "') forwarded WorkOrderMessage (typed '" << kWorkOrderMessage
+                     << "') from Foreman to worker " << worker_index;
+
+          const MessageBus::SendStatus send_status =
+              QueryExecutionUtil::SendTMBMessage(bus_local_,
+                                                 shiftboss_client_id_local_,
+                                                 workers_->getClientID(worker_index),
+                                                 move(worker_tagged_message));
+          CHECK(send_status == MessageBus::SendStatus::kOK);
+          break;
+        }
+        case kInitiateRebuildMessage: {
+          // Construct rebuild work orders, and send back their number to
+          // 'ForemanDistributed'.
+          const TaggedMessage &tagged_message = annotated_message.tagged_message;
+
+          serialization::InitiateRebuildMessage proto;
+          CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+          processInitiateRebuildMessage(proto.query_id(),
+                                        proto.operator_index(),
+                                        proto.insert_destination_index(),
+                                        proto.relation_id());
+          break;
+        }
+        case kQueryTeardownMessage: {
+          const TaggedMessage &tagged_message = annotated_message.tagged_message;
 
-        serialization::QueryTeardownMessage proto;
-        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+          serialization::QueryTeardownMessage proto;
+          CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
 
-        query_contexts_.erase(proto.query_id());
-        break;
+          query_contexts_.erase(proto.query_id());
+          break;
+        }
+        case kSaveQueryResultMessage: {
+          const TaggedMessage &tagged_message = annotated_message.tagged_message;
+
+          serialization::SaveQueryResultMessage proto;
+          CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+          for (int i = 0; i < proto.blocks_size(); ++i) {
+            storage_manager_->saveBlockOrBlob(proto.blocks(i));
+          }
+
+          // Clean up query execution states, i.e., QueryContext.
+          query_contexts_.erase(proto.query_id());
+
+          serialization::SaveQueryResultResponseMessage proto_response;
+          proto_response.set_query_id(proto.query_id());
+          proto_response.set_relation_id(proto.relation_id());
+          proto_response.set_cli_id(proto.cli_id());
+          proto_response.set_shiftboss_index(shiftboss_index_);
+
+          const size_t proto_response_length = proto_response.ByteSize();
+          char *proto_response_bytes = static_cast<char*>(malloc(proto_response_length));
+          CHECK(proto_response.SerializeToArray(proto_response_bytes, proto_response_length));
+
+          TaggedMessage message_response(static_cast<const void*>(proto_response_bytes),
+                                         proto_response_length,
+                                         kSaveQueryResultResponseMessage);
+          free(proto_response_bytes);
+
+          DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_global_
+                     << "') sent SaveQueryResultResponseMessage (typed '" << kSaveQueryResultResponseMessage
+                     << "') to Foreman with TMB client ID " << foreman_client_id_;
+          const MessageBus::SendStatus send_status =
+              QueryExecutionUtil::SendTMBMessage(bus_global_,
+                                                 shiftboss_client_id_global_,
+                                                 foreman_client_id_,
+                                                 move(message_response));
+          CHECK(send_status == MessageBus::SendStatus::kOK);
+          break;
+        }
+        case kPoisonMessage: {
+          DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_global_
+                     << "') forwarded PoisonMessage (typed '" << kPoisonMessage
+                     << "') from Foreman to all workers";
+
+          tmb::MessageStyle broadcast_style;
+          broadcast_style.Broadcast(true);
+
+          const MessageBus::SendStatus send_status =
+              bus_local_->Send(shiftboss_client_id_local_, worker_addresses_, broadcast_style,
+                               move(annotated_message.tagged_message));
+          CHECK(send_status == MessageBus::SendStatus::kOK);
+          return;
+        }
+        default: {
+          LOG(FATAL) << "Unknown TMB message type";
+        }
       }
-      case kSaveQueryResultMessage: {
-        const TaggedMessage &tagged_message = annotated_message.tagged_message;
-
-        serialization::SaveQueryResultMessage proto;
-        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+    }
 
-        for (int i = 0; i < proto.blocks_size(); ++i) {
-          storage_manager_->saveBlockOrBlob(proto.blocks(i));
+    while (bus_local_->ReceiveIfAvailable(shiftboss_client_id_local_, &annotated_message, 0, true)) {
+      switch (annotated_message.tagged_message.message_type()) {
+        case kCatalogRelationNewBlockMessage:
+        case kDataPipelineMessage:
+        case kWorkOrderFeedbackMessage:
+        case kWorkOrderCompleteMessage:
+        case kRebuildWorkOrderCompleteMessage: {
+          DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_global_
+                     << "') forwarded typed '" << annotated_message.tagged_message.message_type()
+                     << "' message from Worker with TMB client ID '" << annotated_message.sender
+                     << "' to Foreman with TMB client ID " << foreman_client_id_;
+
+          DCHECK_NE(foreman_client_id_, tmb::kClientIdNone);
+          const MessageBus::SendStatus send_status =
+              QueryExecutionUtil::SendTMBMessage(bus_global_,
+                                                 shiftboss_client_id_global_,
+                                                 foreman_client_id_,
+                                                 move(annotated_message.tagged_message));
+          CHECK(send_status == MessageBus::SendStatus::kOK);
+          break;
+        }
+        default: {
+          LOG(FATAL) << "Unknown TMB message type";
         }
-
-        // Clean up query execution states, i.e., QueryContext.
-        query_contexts_.erase(proto.query_id());
-
-        serialization::SaveQueryResultResponseMessage proto_response;
-        proto_response.set_query_id(proto.query_id());
-        proto_response.set_relation_id(proto.relation_id());
-        proto_response.set_cli_id(proto.cli_id());
-        proto_response.set_shiftboss_index(shiftboss_index_);
-
-        const size_t proto_response_length = proto_response.ByteSize();
-        char *proto_response_bytes = static_cast<char*>(malloc(proto_response_length));
-        CHECK(proto_response.SerializeToArray(proto_response_bytes, proto_response_length));
-
-        TaggedMessage message_response(static_cast<const void*>(proto_response_bytes),
-                                       proto_response_length,
-                                       kSaveQueryResultResponseMessage);
-        free(proto_response_bytes);
-
-        DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
-                   << "') sent SaveQueryResultResponseMessage (typed '" << kSaveQueryResultResponseMessage
-                   << "') to Foreman with TMB client ID " << foreman_client_id_;
-        const MessageBus::SendStatus send_status =
-            QueryExecutionUtil::SendTMBMessage(bus_,
-                                               shiftboss_client_id_,
-                                               foreman_client_id_,
-                                               move(message_response));
-        CHECK(send_status == MessageBus::SendStatus::kOK);
-        break;
-      }
-      case kPoisonMessage: {
-        DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
-                   << "') forwarded PoisonMessage (typed '" << kPoisonMessage
-                   << "') from Foreman to all workers";
-
-        tmb::MessageStyle broadcast_style;
-        broadcast_style.Broadcast(true);
-
-        const MessageBus::SendStatus send_status =
-            bus_->Send(shiftboss_client_id_,
-                       worker_addresses_,
-                       broadcast_style,
-                       move(annotated_message.tagged_message));
-        CHECK(send_status == MessageBus::SendStatus::kOK);
-        return;
-      }
-      default: {
-        LOG(FATAL) << "Unknown TMB message type";
       }
     }
   }
@@ -265,21 +352,21 @@ void Shiftboss::registerWithForeman() {
                         kShiftbossRegistrationMessage);
   free(proto_bytes);
 
-  DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
+  DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_global_
              << "') sent ShiftbossRegistrationMessage (typed '" << kShiftbossRegistrationMessage
              << "') to all";
   tmb::MessageBus::SendStatus send_status =
-      bus_->Send(shiftboss_client_id_, all_addresses, style, move(message));
+      bus_global_->Send(shiftboss_client_id_global_, all_addresses, style, move(message));
   DCHECK(send_status == tmb::MessageBus::SendStatus::kOK);
 }
 
 void Shiftboss::processShiftbossRegistrationResponseMessage() {
-  AnnotatedMessage annotated_message(bus_->Receive(shiftboss_client_id_, 0, true));
+  AnnotatedMessage annotated_message(bus_global_->Receive(shiftboss_client_id_global_, 0, true));
   const TaggedMessage &tagged_message = annotated_message.tagged_message;
   DCHECK_EQ(kShiftbossRegistrationResponseMessage, tagged_message.message_type());
 
   foreman_client_id_ = annotated_message.sender;
-  DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+  DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_local_
              << "') received the typed '" << kShiftbossRegistrationResponseMessage
              << "' message from ForemanDistributed with client " << foreman_client_id_;
 
@@ -290,10 +377,10 @@ void Shiftboss::processShiftbossRegistrationResponseMessage() {
   storage_manager_->sendBlockDomainToShiftbossIndexMessage(shiftboss_index_);
 
   // Forward this message to Workers regarding <shiftboss_index_>.
-  QueryExecutionUtil::BroadcastMessage(shiftboss_client_id_,
+  QueryExecutionUtil::BroadcastMessage(shiftboss_client_id_local_,
                                        worker_addresses_,
                                        move(annotated_message.tagged_message),
-                                       bus_);
+                                       bus_local_);
 }
 
 void Shiftboss::processQueryInitiateMessage(
@@ -303,7 +390,7 @@ void Shiftboss::processQueryInitiateMessage(
   database_cache_.update(catalog_database_cache_proto);
 
   auto query_context = std::make_unique<QueryContext>(
-      query_context_proto, database_cache_, storage_manager_, shiftboss_client_id_, bus_);
+      query_context_proto, database_cache_, storage_manager_, shiftboss_client_id_local_, bus_local_);
   query_contexts_.emplace(query_id, move(query_context));
 
   serialization::QueryInitiateResponseMessage proto;
@@ -318,12 +405,12 @@ void Shiftboss::processQueryInitiateMessage(
                                  kQueryInitiateResponseMessage);
   free(proto_bytes);
 
-  DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
+  DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_global_
              << "') sent QueryInitiateResponseMessage (typed '" << kQueryInitiateResponseMessage
              << "') to Foreman with TMB client ID " << foreman_client_id_;
   const MessageBus::SendStatus send_status =
-      QueryExecutionUtil::SendTMBMessage(bus_,
-                                         shiftboss_client_id_,
+      QueryExecutionUtil::SendTMBMessage(bus_global_,
+                                         shiftboss_client_id_global_,
                                          foreman_client_id_,
                                          move(message_response));
   CHECK(send_status == MessageBus::SendStatus::kOK);
@@ -357,12 +444,12 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
                                  kInitiateRebuildResponseMessage);
   free(proto_bytes);
 
-  DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
+  DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_global_
              << "') sent InitiateRebuildResponseMessage (typed '" << kInitiateRebuildResponseMessage
              << "') to Foreman with TMB client ID " << foreman_client_id_;
   const MessageBus::SendStatus send_status =
-      QueryExecutionUtil::SendTMBMessage(bus_,
-                                         shiftboss_client_id_,
+      QueryExecutionUtil::SendTMBMessage(bus_global_,
+                                         shiftboss_client_id_global_,
                                          foreman_client_id_,
                                          move(message_response));
   CHECK(send_status == MessageBus::SendStatus::kOK);
@@ -375,8 +462,8 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
                              move(partially_filled_block_refs[i]),
                              op_index,
                              rel_id,
-                             shiftboss_client_id_,
-                             bus_);
+                             shiftboss_client_id_local_,
+                             bus_local_);
 
     unique_ptr<WorkerMessage> worker_message(
         WorkerMessage::RebuildWorkOrderMessage(rebuild_work_order, op_index));
@@ -386,13 +473,13 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
                                         kRebuildWorkOrderMessage);
 
     const size_t worker_index = getSchedulableWorker();
-    DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
+    DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_local_
                << "') sent RebuildWorkOrderMessage (typed '" << kRebuildWorkOrderMessage
                << "') to worker " << worker_index;
 
     const MessageBus::SendStatus send_status =
-        QueryExecutionUtil::SendTMBMessage(bus_,
-                                           shiftboss_client_id_,
+        QueryExecutionUtil::SendTMBMessage(bus_local_,
+                                           shiftboss_client_id_local_,
                                            workers_->getClientID(worker_index),
                                            move(worker_tagged_message));
     CHECK(send_status == MessageBus::SendStatus::kOK);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b88625d8/query_execution/Shiftboss.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.hpp b/query_execution/Shiftboss.hpp
index e0b4312..05457bd 100644
--- a/query_execution/Shiftboss.hpp
+++ b/query_execution/Shiftboss.hpp
@@ -39,7 +39,8 @@
 
 #include "tmb/address.h"
 #include "tmb/id_typedefs.h"
-#include "tmb/message_bus.h"
+
+namespace tmb { class MessageBus; };
 
 namespace quickstep {
 
@@ -63,7 +64,8 @@ class Shiftboss : public Thread {
   /**
    * @brief Constructor.
    *
-   * @param bus A pointer to the TMB.
+   * @param bus_global A pointer to the TMB for Foreman.
+   * @param bus_local A pointer to the TMB for Workers.
    * @param storage_manager The StorageManager to use.
    * @param workers A pointer to the WorkerDirectory.
    * @param hdfs The HDFS connector via libhdfs3.
@@ -72,84 +74,12 @@ class Shiftboss : public Thread {
    * @note If cpu_id is not specified, Shiftboss thread can be possibly moved
    *       around on different CPUs by the OS.
   **/
-  Shiftboss(tmb::MessageBus *bus,
+  Shiftboss(tmb::MessageBus *bus_global,
+            tmb::MessageBus *bus_local,
             StorageManager *storage_manager,
             WorkerDirectory *workers,
             void *hdfs,
-            const int cpu_id = -1)
-      : bus_(DCHECK_NOTNULL(bus)),
-        storage_manager_(DCHECK_NOTNULL(storage_manager)),
-        workers_(DCHECK_NOTNULL(workers)),
-        hdfs_(hdfs),
-        cpu_id_(cpu_id),
-        shiftboss_client_id_(tmb::kClientIdNone),
-        foreman_client_id_(tmb::kClientIdNone),
-        max_msgs_per_worker_(1),
-        start_worker_index_(0u) {
-    // Check to have at least one Worker.
-    DCHECK_GT(workers->getNumWorkers(), 0u);
-
-#ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS
-    if (FLAGS_use_hdfs) {
-      CHECK(hdfs_);
-    }
-#endif  // QUICKSTEP_HAVE_FILE_MANAGER_HDFS
-
-    shiftboss_client_id_ = bus_->Connect();
-    LOG(INFO) << "Shiftboss TMB client ID: " << shiftboss_client_id_;
-    DCHECK_NE(shiftboss_client_id_, tmb::kClientIdNone);
-
-    // Messages between Foreman and Shiftboss.
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kShiftbossRegistrationMessage);
-    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kShiftbossRegistrationResponseMessage);
-
-    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kQueryInitiateMessage);
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kQueryInitiateResponseMessage);
-
-    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kInitiateRebuildMessage);
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kInitiateRebuildResponseMessage);
-
-    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kSaveQueryResultMessage);
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kSaveQueryResultResponseMessage);
-
-    // Message sent to Worker.
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kShiftbossRegistrationResponseMessage);
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kRebuildWorkOrderMessage);
-
-    // Forward the following message types from Foreman to Workers.
-    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kWorkOrderMessage);
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderMessage);
-
-    // Forward the following message types from Workers to Foreman.
-    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kCatalogRelationNewBlockMessage);
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kCatalogRelationNewBlockMessage);
-
-    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kDataPipelineMessage);
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kDataPipelineMessage);
-
-    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kWorkOrderFeedbackMessage);
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderFeedbackMessage);
-
-    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kWorkOrderCompleteMessage);
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderCompleteMessage);
-
-    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kRebuildWorkOrderCompleteMessage);
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kRebuildWorkOrderCompleteMessage);
-
-    // Clean up query execution states, i.e., QueryContext.
-    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kQueryTeardownMessage);
-
-    // Stop itself.
-    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kPoisonMessage);
-    // Stop all workers.
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kPoisonMessage);
-
-    for (std::size_t i = 0; i < workers_->getNumWorkers(); ++i) {
-      worker_addresses_.AddRecipient(workers_->getClientID(i));
-    }
-
-    registerWithForeman();
-  }
+            const int cpu_id = -1);
 
   ~Shiftboss() override {
   }
@@ -160,7 +90,7 @@ class Shiftboss : public Thread {
    * @return TMB client ID of shiftboss thread.
    **/
   inline tmb::client_id getBusClientID() const {
-    return shiftboss_client_id_;
+    return shiftboss_client_id_global_;
   }
 
   /**
@@ -231,9 +161,7 @@ class Shiftboss : public Thread {
                                      const QueryContext::insert_destination_id dest_index,
                                      const relation_id rel_id);
 
-  // TODO(zuyu): Use two buses for the message communication between Foreman and Shiftboss,
-  // and Shiftboss and Worker thread pool.
-  tmb::MessageBus *bus_;
+  tmb::MessageBus *bus_global_, *bus_local_;
 
   CatalogDatabaseCache database_cache_;
   StorageManager *storage_manager_;
@@ -245,7 +173,7 @@ class Shiftboss : public Thread {
   // The ID of the CPU that the Shiftboss thread can optionally be pinned to.
   const int cpu_id_;
 
-  tmb::client_id shiftboss_client_id_, foreman_client_id_;
+  tmb::client_id shiftboss_client_id_global_, shiftboss_client_id_local_, foreman_client_id_;
 
   // Unique per Shiftboss instance.
   std::uint64_t shiftboss_index_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b88625d8/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
index c9f5a10..6bd7a1f 100644
--- a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
+++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
@@ -76,6 +76,7 @@ const char *DistributedExecutionGeneratorTestRunner::kResetOption =
 
 DistributedExecutionGeneratorTestRunner::DistributedExecutionGeneratorTestRunner(const string &storage_path)
     : query_id_(0),
+      bus_locals_(kNumInstances),
       data_exchangers_(kNumInstances) {
   bus_.Initialize();
 
@@ -113,7 +114,10 @@ DistributedExecutionGeneratorTestRunner::DistributedExecutionGeneratorTestRunner
                                         kAnyNUMANodeID);
 
   for (int i = 0; i < kNumInstances; ++i) {
-    workers_.push_back(make_unique<Worker>(0 /* worker_thread_index */, &bus_));
+    tmb::MessageBus *bus_local = &bus_locals_[i];
+    bus_local->Initialize();
+
+    workers_.push_back(make_unique<Worker>(0 /* worker_thread_index */, bus_local));
 
     const vector<tmb::client_id> worker_client_ids(1, workers_.back()->getBusClientID());
     worker_directories_.push_back(
@@ -128,7 +132,7 @@ DistributedExecutionGeneratorTestRunner::DistributedExecutionGeneratorTestRunner
 
     data_exchangers_[i].set_storage_manager(storage_manager.get());
     shiftbosses_.push_back(
-        make_unique<Shiftboss>(&bus_, storage_manager.get(), worker_directories_.back().get(),
+        make_unique<Shiftboss>(&bus_, bus_local, storage_manager.get(), worker_directories_.back().get(),
                                storage_manager->hdfs()));
 
     storage_managers_.push_back(move(storage_manager));

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b88625d8/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
index 63e320d..2cd2427 100644
--- a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
+++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
@@ -129,6 +129,7 @@ class DistributedExecutionGeneratorTestRunner : public TextBasedTestRunner {
 
   std::unique_ptr<ForemanDistributed> foreman_;
 
+  std::vector<MessageBusImpl> bus_locals_;
   std::vector<std::unique_ptr<Worker>> workers_;
   std::vector<std::unique_ptr<WorkerDirectory>> worker_directories_;
   std::vector<DataExchangerAsync> data_exchangers_;



[18/24] incubator-quickstep git commit: Init release scripts Includes: Script to do common release tasks - subcommand to create artifacts - subcommand to publish release candidates - subcommand to test a release candidate

Posted by zu...@apache.org.
Init release scripts
Includes:
Script to do common release tasks
- subcommand to create artifacts
- subcommand to publish release candidates
- subcommand to test a release candidate

All commands are parameterized by the .profile file


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/6a240fcd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/6a240fcd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/6a240fcd

Branch: refs/heads/dist-patch
Commit: 6a240fcd1452abb7870a9e8ec82ed8f60f75e9c4
Parents: e896b61
Author: cramja <ma...@gmail.com>
Authored: Tue Feb 28 11:13:30 2017 -0600
Committer: cramja <ma...@gmail.com>
Committed: Wed Mar 1 16:07:58 2017 -0600

----------------------------------------------------------------------
 .gitattributes                  |   1 +
 release/.gitignore              |   3 +
 release/README.md               |  34 +++++++
 release/release_cmds.sh         | 172 +++++++++++++++++++++++++++++++++++
 release/release_manager.profile |  29 ++++++
 5 files changed, 239 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6a240fcd/.gitattributes
----------------------------------------------------------------------
diff --git a/.gitattributes b/.gitattributes
new file mode 100644
index 0000000..c0aaef0
--- /dev/null
+++ b/.gitattributes
@@ -0,0 +1 @@
+release export-ignore

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6a240fcd/release/.gitignore
----------------------------------------------------------------------
diff --git a/release/.gitignore b/release/.gitignore
new file mode 100644
index 0000000..0716d9f
--- /dev/null
+++ b/release/.gitignore
@@ -0,0 +1,3 @@
+.release_manager.profile
+svn-*
+apache-*

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6a240fcd/release/README.md
----------------------------------------------------------------------
diff --git a/release/README.md b/release/README.md
new file mode 100644
index 0000000..5776e09
--- /dev/null
+++ b/release/README.md
@@ -0,0 +1,34 @@
+## Scripts to help release Quickstep
+
+### Prereqs
+
+You must fill out `release_manager.profile` first.
+
+You'll need:
+  * md5sum
+  * sha1sum
+  * svn
+  * gpg
+  * tar
+  * gzip
+
+This is of course in addition to the regular build tools.
+
+### Usage Overview
+
+```
+# Test a candidate
+./release_cmds.sh test
+
+# Create candidate artifacts
+./release_cmds.sh create
+
+# Publish candidate artifacts (after you've created them)
+./release_cmds.sh publish
+```
+
+* A full guide to releases can be found on [confluence][cwiki-howto].
+* PGP key generation is discussed [here][pgp-keys]
+
+[cwiki-howto]: https://cwiki.apache.org/confluence/display/QUICKSTEP/How+To+Release
+[pgp-keys]: http://quickstep.apache.org/release-signing/
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6a240fcd/release/release_cmds.sh
----------------------------------------------------------------------
diff --git a/release/release_cmds.sh b/release/release_cmds.sh
new file mode 100755
index 0000000..d23b4ef
--- /dev/null
+++ b/release/release_cmds.sh
@@ -0,0 +1,172 @@
+#!/usr/bin/env bash
+
+# Functions for Releasing Quickstep
+# Usage: Fill out your details in release_manager.profile.
+#        ./release_cmds.sh [create|publish|test]
+#
+# Note: This script is a scratch pad with most of the relevant commands. Modify
+#       it as you see fit. We'll finalize it when we understand the process better.
+#       
+#       Everything in this script assumes that you are running in the release/ 
+#       folder of your quickstep directory.
+#
+# PREREQS:
+#  - full details on how to do a release are in a confluence article: How to Release
+#  - You must have previously created a pgp key using your apache username
+#    and uploaded it to a keyserver [http://quickstep.apache.org/release-signing/]
+#
+
+### Helper functions
+
+create_artifacts() {
+  # Builds the release candidate artifacts: updates submodules, tags and
+  # archives the source tree (submodules included), signs the tarball with your
+  # key (requires user input), writes and verifies the checksums, and moves
+  # everything into the release/ folder. Must be started from release/.
+
+  export RELEASE_DIR="$(pwd)"
+  cd ..
+
+  # need the submodules to be included for the compile to work.
+  # likely, this will be a no-op
+  git submodule init
+  git submodule update
+
+  # you'll need to push this if you want the tag to be visible to committers
+  # the tag is necessary for the archiving to work correctly
+  # (double quotes, so that $VERSION is expanded in the tag message)
+  git tag -a "rc-$VERSION" -m "release candidate $VERSION"
+  git archive --format "tar" --prefix="$PROJECT_NAME-$VERSION/" -o "$PROJECT_NAME-$VERSION.tar" "rc-$VERSION"
+
+  # Single quotes on purpose: $path and $sha1 are set by `git submodule foreach`
+  # for each submodule; the other variables are exported, so the subshell
+  # expands them too. The per-submodule tars are written to $RELEASE_DIR.
+  git submodule foreach --recursive 'git archive --verbose --prefix="$PROJECT_NAME-$VERSION/$path/" --format tar master --output "$RELEASE_DIR/submodule-$sha1.tar"'
+  if compgen -G "$RELEASE_DIR/submodule-*.tar" > /dev/null; then
+    # combine all archives into one tar
+    tar --concatenate --file "$PROJECT_NAME-$VERSION.tar" "$RELEASE_DIR"/submodule-*.tar
+    # remove sub tars
+    rm -f "$RELEASE_DIR"/submodule-*.tar
+  fi
+
+  # gzip final tar
+  gzip --force --verbose "$PROJECT_NAME-$VERSION.tar"
+
+  # Make the signature. This requires human input
+  gpg -u "$APACHE_USERNAME@apache.org" --armor --output "$PROJECT_NAME-$VERSION.tar.gz.asc" --detach-sign "$PROJECT_NAME-$VERSION.tar.gz"
+  # Make hashes
+  md5sum "$PROJECT_NAME-$VERSION.tar.gz" > "$PROJECT_NAME-$VERSION.tar.gz.md5"
+  sha1sum "$PROJECT_NAME-$VERSION.tar.gz" > "$PROJECT_NAME-$VERSION.tar.gz.sha"
+
+  # Make sure these three artifacts are good
+  gpg --verify "$PROJECT_NAME-$VERSION.tar.gz.asc"
+  md5sum --check "$PROJECT_NAME-$VERSION.tar.gz.md5"
+  sha1sum --check "$PROJECT_NAME-$VERSION.tar.gz.sha"
+
+  # tarball, signature and both checksum files
+  mv "$PROJECT_NAME-$VERSION".tar.gz* "$RELEASE_DIR"
+
+  cd "$RELEASE_DIR"
+}
+
+publish_candidate() {
+  # push a RC to subversion. We use SVN sparse directories so as not to pull
+  # every artifact ever. Assumes that you have already created the artifacts
+  # and they reside in the release/ folder.
+  # directory layout is x.y.z/RCw, where w is the release candidate number
+  #
+
+  if [ ! -d "$SVN_DEV" ]; then
+    svn checkout --depth immediates "$SVN_DEV_URL" "$SVN_DEV"
+  fi
+
+  BASE_DIR="$(pwd)"
+  cd "$SVN_DEV"
+
+  if [ ! -d "$VERSION" ]; then
+    mkdir "$VERSION"
+    cd "$VERSION"
+  else
+    cd "$VERSION"
+    svn update --set-depth immediates .
+  fi
+
+  # now in $VERSION folder
+
+  RCFOLDER="RC$CANDIDATE"
+  mkdir "$RCFOLDER"
+  cd "$RCFOLDER"
+  cp "$BASE_DIR/$PROJECT_NAME-$VERSION".tar.gz* ./
+  # back to the working-copy root, so that $VERSION/$RCFOLDER resolves below
+  cd ../..
+
+  # --parents also schedules $VERSION for addition when it was just created
+  svn add --parents "$VERSION/$RCFOLDER"
+  svn commit --username="$APACHE_USERNAME" -m "Quickstep-$VERSION RC$CANDIDATE"
+
+  cd "$BASE_DIR"
+}
+
+test_candidate() {
+  # Checks out a release candidate from the dev SVN repo, verifies its
+  # signature and checksums, then unpacks and builds it.
+  # This is best run in /tmp
+  #
+
+  svn checkout --depth immediates "$SVN_DEV_URL" "$SVN_DEV"
+  cd "$SVN_DEV"
+
+  # add the KEYS file from quickstep to your gpg key ring
+  gpg --import KEYS
+
+  cd "$VERSION"
+  svn update --set-depth infinity "RC$CANDIDATE"
+  cd "RC$CANDIDATE"
+
+  # verify artifacts before unpacking anything from the tarball
+  gpg --verify "$PROJECT_NAME-$VERSION.tar.gz.asc"
+  md5sum --check "$PROJECT_NAME-$VERSION.tar.gz.md5"
+  sha1sum --check "$PROJECT_NAME-$VERSION.tar.gz.sha"
+
+  tar -xzf "$PROJECT_NAME-$VERSION.tar.gz"
+
+  # now build the release artifacts
+  cd "$PROJECT_NAME-$VERSION"
+
+  # first download third_party, then do your normal build
+  cd third_party/
+  ./download_and_patch_prerequisites.sh
+  cd ..
+  cd build
+  cmake ..
+  make
+}
+
+usage() {
+  # Prints the accepted subcommands and exits the script with status $1.
+  echo "usage: $0 [create|publish|test]"
+  exit $1
+}
+
+### main
+
+source release_manager.profile
+if [ -z $APACHE_USERNAME ] || [ -z $VERSION ] || [ -z $CANDIDATE ]; then 
+  echo "please set release_manager.profile"
+  exit 1
+fi
+
+alias svn="svn --non-interactive"
+
+set -e
+
+if [ -z $1 ] ; then
+  usage 0
+fi
+
+echo "user: $APACHE_USERNAME"
+echo "version: $VERSION"
+echo "candidate: $CANDIDATE"
+
+if [ "create" == "$1" ] ; then
+  echo "creating artifacts ..."
+  create_artifacts
+elif [ "publish" == "$1" ] ; then
+  echo "publishing candidate artifacts ..."
+  publish_candidate 
+elif [ "test" == "$1" ] ; then
+  echo "testing candidate artifacts ..."
+  test_candidate
+else
+  usage 0
+fi

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6a240fcd/release/release_manager.profile
----------------------------------------------------------------------
diff --git a/release/release_manager.profile b/release/release_manager.profile
new file mode 100644
index 0000000..6b51003
--- /dev/null
+++ b/release/release_manager.profile
@@ -0,0 +1,29 @@
+# Profile for Releasing Quickstep
+#
+# This script is for release managers. It is to be used in conjunction with 
+# the release_cmds script.
+#
+# Usage: Fill out this file with your apache details. Then source this file.
+#        Sourcing the release_cmds file will automatically source this.
+#
+
+# you must be registered with apache
+export APACHE_USERNAME=
+
+# quickstep version, Major.Minor.Increment
+export VERSION=
+
+# release candidate number. For a new version, this starts at 0
+export CANDIDATE=
+
+# folders corresponding to SVN repos where we keep our release artifacts
+export SVN_DEV=svn-quickstep-dev
+export SVN_REL=svn-quickstep-rel
+
+# you probably will not need to change things below this line
+export PROJECT_NAME=apache-quickstep-incubating
+
+# path to apache repo with quickstep releases
+
+export SVN_DEV_URL=https://dist.apache.org/repos/dist/dev/incubator/quickstep
+export SVN_REL_URL=https://dist.apache.org/repos/dist/rel/incubator/quickstep
\ No newline at end of file


[11/24] incubator-quickstep git commit: Fixed the lint issue.

Posted by zu...@apache.org.
Fixed the lint issue.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/b24349cc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/b24349cc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/b24349cc

Branch: refs/heads/dist-patch
Commit: b24349cc3ef7b88b2e7b1aa8cce1dd65f0d10c28
Parents: def08ce
Author: Zuyu Zhang <zu...@apache.org>
Authored: Sun Feb 26 21:38:27 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Sun Feb 26 21:38:27 2017 -0800

----------------------------------------------------------------------
 cli/distributed/Executor.cpp | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b24349cc/cli/distributed/Executor.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Executor.cpp b/cli/distributed/Executor.cpp
index e248fef..a95ed41 100644
--- a/cli/distributed/Executor.cpp
+++ b/cli/distributed/Executor.cpp
@@ -79,7 +79,8 @@ void Executor::init() {
   data_exchanger_.start();
 
   shiftboss_ =
-      make_unique<Shiftboss>(&bus_, &bus_local_, storage_manager_.get(), worker_directory_.get(), storage_manager_->hdfs());
+      make_unique<Shiftboss>(&bus_, &bus_local_, storage_manager_.get(), worker_directory_.get(),
+                             storage_manager_->hdfs());
   shiftboss_->start();
 
   for (const auto &worker : workers_) {


[14/24] incubator-quickstep git commit: Fix PackedPayloadHashTable for gcc build

Posted by zu...@apache.org.
Fix PackedPayloadHashTable for gcc build


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/132fed63
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/132fed63
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/132fed63

Branch: refs/heads/dist-patch
Commit: 132fed63d14a26917a4f3aacc2337c17b260f708
Parents: c008b7a
Author: Jianqiao Zhu <ji...@cs.wisc.edu>
Authored: Mon Feb 27 14:45:50 2017 -0600
Committer: cramja <ma...@gmail.com>
Committed: Tue Feb 28 10:13:53 2017 -0600

----------------------------------------------------------------------
 storage/PackedPayloadHashTable.cpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/132fed63/storage/PackedPayloadHashTable.cpp
----------------------------------------------------------------------
diff --git a/storage/PackedPayloadHashTable.cpp b/storage/PackedPayloadHashTable.cpp
index 3d672f2..8c4a9fc 100644
--- a/storage/PackedPayloadHashTable.cpp
+++ b/storage/PackedPayloadHashTable.cpp
@@ -251,7 +251,7 @@ bool PackedPayloadHashTable::upsertValueAccessorCompositeKey(
       [&](auto use_two_accessors,  // NOLINT(build/c++11)
           auto key_only,
           auto has_variable_size) -> bool {
-    return upsertValueAccessorCompositeKeyInternal<
+    return this->upsertValueAccessorCompositeKeyInternal<
         decltype(use_two_accessors)::value,
         decltype(key_only)::value,
         decltype(has_variable_size)::value>(


[22/24] incubator-quickstep git commit: Enabled dag viz by default.

Posted by zu...@apache.org.
Enabled dag viz by default.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/88bc03b8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/88bc03b8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/88bc03b8

Branch: refs/heads/dist-patch
Commit: 88bc03b8a52f96990e172bcbe7dbb0c49c2b1f02
Parents: 3842c6e
Author: Zuyu Zhang <zu...@apache.org>
Authored: Sat Feb 11 11:40:33 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Wed Mar 1 14:43:01 2017 -0800

----------------------------------------------------------------------
 query_execution/QueryManagerBase.cpp                        | 2 +-
 query_optimizer/tests/DistributedExecutionGeneratorTest.cpp | 3 +++
 2 files changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/88bc03b8/query_execution/QueryManagerBase.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerBase.cpp b/query_execution/QueryManagerBase.cpp
index f84ad4e..2dd5467 100644
--- a/query_execution/QueryManagerBase.cpp
+++ b/query_execution/QueryManagerBase.cpp
@@ -37,7 +37,7 @@ using std::pair;
 
 namespace quickstep {
 
-DEFINE_bool(visualize_execution_dag, false,
+DEFINE_bool(visualize_execution_dag, true,
             "If true, visualize the execution plan DAG into a graph in DOT "
             "format (DOT is a plain text graph description language) which is "
             "then printed via stderr.");

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/88bc03b8/query_optimizer/tests/DistributedExecutionGeneratorTest.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTest.cpp b/query_optimizer/tests/DistributedExecutionGeneratorTest.cpp
index b18b5ec..9b96e12 100644
--- a/query_optimizer/tests/DistributedExecutionGeneratorTest.cpp
+++ b/query_optimizer/tests/DistributedExecutionGeneratorTest.cpp
@@ -43,6 +43,8 @@ using std::make_unique;
 
 QUICKSTEP_GENERATE_TEXT_TEST(DISTRIBUTED_EXECUTION_GENERATOR_TEST);
 
+namespace quickstep { DECLARE_bool(visualize_execution_dag); }
+
 int main(int argc, char** argv) {
   google::InitGoogleLogging(argv[0]);
 
@@ -52,6 +54,7 @@ int main(int argc, char** argv) {
   quickstep::optimizer::FLAGS_use_filter_joins = false;
 
   // Honor FLAGS_buffer_pool_slots in StorageManager.
+  quickstep::FLAGS_visualize_execution_dag = false;
   gflags::ParseCommandLineFlags(&argc, &argv, true);
 
   if (argc < 4) {


[13/24] incubator-quickstep git commit: patch to fix gcc compile error gflags

Posted by zu...@apache.org.
patch to fix gcc compile error gflags


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/c008b7ad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/c008b7ad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/c008b7ad

Branch: refs/heads/dist-patch
Commit: c008b7ad3b2333f7ea2bf4eae1967ce4ef31726d
Parents: 4437b9d
Author: cramja <ma...@gmail.com>
Authored: Tue Feb 28 09:49:57 2017 -0600
Committer: cramja <ma...@gmail.com>
Committed: Tue Feb 28 09:49:57 2017 -0600

----------------------------------------------------------------------
 third_party/download_and_patch_prerequisites.sh      | 1 +
 third_party/patches/gflags/gflags_reporting.cc.patch | 4 ++++
 2 files changed, 5 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c008b7ad/third_party/download_and_patch_prerequisites.sh
----------------------------------------------------------------------
diff --git a/third_party/download_and_patch_prerequisites.sh b/third_party/download_and_patch_prerequisites.sh
index fd6106c..d16d4de 100755
--- a/third_party/download_and_patch_prerequisites.sh
+++ b/third_party/download_and_patch_prerequisites.sh
@@ -99,6 +99,7 @@ patch ${THIRD_PARTY_SRC_DIR}/linenoise/linenoise.c ${PATCH_DIR}/linenoise/lineno
 echo "Patching for gflags:"
 cd ${THIRD_PARTY_SRC_DIR}/gflags
 patch -p0 < ${PATCH_DIR}/gflags/CMakeLists.patch
+patch src/gflags_reporting.cc ${PATCH_DIR}/gflags/gflags_reporting.cc.patch
 cd ${THIRD_PARTY_SRC_DIR}
 
 # Apply re2 patch.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c008b7ad/third_party/patches/gflags/gflags_reporting.cc.patch
----------------------------------------------------------------------
diff --git a/third_party/patches/gflags/gflags_reporting.cc.patch b/third_party/patches/gflags/gflags_reporting.cc.patch
new file mode 100644
index 0000000..3e33e37
--- /dev/null
+++ b/third_party/patches/gflags/gflags_reporting.cc.patch
@@ -0,0 +1,4 @@
+129c129
+<     assert(chars_left == strlen(c_string));  // Unless there's a \0 in there?
+---
+>     assert(static_cast<size_t>(chars_left) == strlen(c_string));  // Unless there's a \0 in there?


[10/24] incubator-quickstep git commit: Disabled LIP optimization in the distributed version due to QUICKSTEP-76.

Posted by zu...@apache.org.
Disabled LIP optimization in the distributed version due to QUICKSTEP-76.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/def08ced
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/def08ced
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/def08ced

Branch: refs/heads/dist-patch
Commit: def08ced76d9ac197e9745280bd019dd500d8746
Parents: f4f5ca0
Author: Zuyu Zhang <zu...@apache.org>
Authored: Fri Feb 24 13:34:49 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Fri Feb 24 13:34:49 2017 -0800

----------------------------------------------------------------------
 .../tests/DistributedExecutionGeneratorTest.cpp      | 15 +++++++++++++++
 1 file changed, 15 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/def08ced/query_optimizer/tests/DistributedExecutionGeneratorTest.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTest.cpp b/query_optimizer/tests/DistributedExecutionGeneratorTest.cpp
index af310bc..1e2120e 100644
--- a/query_optimizer/tests/DistributedExecutionGeneratorTest.cpp
+++ b/query_optimizer/tests/DistributedExecutionGeneratorTest.cpp
@@ -28,12 +28,27 @@
 #include "glog/logging.h"
 #include "gtest/gtest.h"
 
+namespace quickstep {
+namespace optimizer {
+
+DECLARE_bool(use_lip_filters);
+DECLARE_bool(use_filter_joins);
+
+}  // namespace optimizer
+}  // namespace quickstep
+
 using quickstep::TextBasedTest;
 
 QUICKSTEP_GENERATE_TEXT_TEST(DISTRIBUTED_EXECUTION_GENERATOR_TEST);
 
 int main(int argc, char** argv) {
   google::InitGoogleLogging(argv[0]);
+
+  // TODO(quickstep-team): Fix JIRA QUICKSTEP-76 for adding LIP filter support
+  // in the distributed version.
+  quickstep::optimizer::FLAGS_use_lip_filters = false;
+  quickstep::optimizer::FLAGS_use_filter_joins = false;
+
   // Honor FLAGS_buffer_pool_slots in StorageManager.
   gflags::ParseCommandLineFlags(&argc, &argv, true);
 


[19/24] incubator-quickstep git commit: Style fixes in the distributed version.

Posted by zu...@apache.org.
Style fixes in the distributed version.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/5e0c32ac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/5e0c32ac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/5e0c32ac

Branch: refs/heads/dist-patch
Commit: 5e0c32acdf4080f4461e178aeea1af7d6eb80420
Parents: 6a240fc
Author: Zuyu Zhang <zu...@apache.org>
Authored: Tue Feb 28 14:19:17 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Wed Mar 1 14:32:11 2017 -0800

----------------------------------------------------------------------
 cli/distributed/Cli.cpp                         |   4 +-
 cli/distributed/Conductor.cpp                   |   4 +-
 .../tests/DistributedExecutionGeneratorTest.cpp |  10 +-
 .../DistributedExecutionGeneratorTestRunner.cpp |  49 ++++--
 .../DistributedExecutionGeneratorTestRunner.hpp |  45 +-----
 .../tests/execution_generator/CMakeLists.txt    | 154 ++++++++++---------
 6 files changed, 127 insertions(+), 139 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5e0c32ac/cli/distributed/Cli.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Cli.cpp b/cli/distributed/Cli.cpp
index 60b9c8d..6228898 100644
--- a/cli/distributed/Cli.cpp
+++ b/cli/distributed/Cli.cpp
@@ -48,8 +48,8 @@ typedef quickstep::LineReaderDumb LineReaderImpl;
 #include "query_execution/QueryExecutionTypedefs.hpp"
 #include "query_execution/QueryExecutionUtil.hpp"
 #include "storage/DataExchangerAsync.hpp"
-#include "utility/StringUtil.hpp"
 #include "storage/StorageBlockInfo.hpp"
+#include "utility/StringUtil.hpp"
 
 #include "tmb/address.h"
 #include "tmb/id_typedefs.h"
@@ -76,7 +76,7 @@ using tmb::client_id;
 
 namespace quickstep {
 
-namespace S = ::quickstep::serialization;
+namespace S = serialization;
 
 void Cli::init() {
   cli_id_ = bus_.Connect();

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5e0c32ac/cli/distributed/Conductor.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Conductor.cpp b/cli/distributed/Conductor.cpp
index cf2eb4b..3c68bfb 100644
--- a/cli/distributed/Conductor.cpp
+++ b/cli/distributed/Conductor.cpp
@@ -63,13 +63,13 @@ using tmb::client_id;
 
 namespace quickstep {
 
-namespace S = ::quickstep::serialization;
+namespace S = serialization;
 
 void Conductor::init() {
   try {
     string catalog_path = FLAGS_storage_path + kCatalogFilename;
 
-    if (quickstep::FLAGS_initialize_db) {  // Initialize the database
+    if (FLAGS_initialize_db) {  // Initialize the database
       DefaultsConfigurator::InitializeDefaultDatabase(FLAGS_storage_path, catalog_path);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5e0c32ac/query_optimizer/tests/DistributedExecutionGeneratorTest.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTest.cpp b/query_optimizer/tests/DistributedExecutionGeneratorTest.cpp
index 1e2120e..b18b5ec 100644
--- a/query_optimizer/tests/DistributedExecutionGeneratorTest.cpp
+++ b/query_optimizer/tests/DistributedExecutionGeneratorTest.cpp
@@ -39,6 +39,8 @@ DECLARE_bool(use_filter_joins);
 
 using quickstep::TextBasedTest;
 
+using std::make_unique;
+
 QUICKSTEP_GENERATE_TEXT_TEST(DISTRIBUTED_EXECUTION_GENERATOR_TEST);
 
 int main(int argc, char** argv) {
@@ -59,11 +61,9 @@ int main(int argc, char** argv) {
 
   std::ifstream input_file(argv[1]);
   CHECK(input_file.is_open()) << argv[1];
-  std::unique_ptr<quickstep::optimizer::DistributedExecutionGeneratorTestRunner>
-      test_runner(
-          new quickstep::optimizer::DistributedExecutionGeneratorTestRunner(argv[3]));
-  test_driver.reset(
-      new quickstep::TextBasedTestDriver(&input_file, test_runner.get()));
+
+  auto test_runner = make_unique<quickstep::optimizer::DistributedExecutionGeneratorTestRunner>(argv[3]);
+  test_driver = make_unique<quickstep::TextBasedTestDriver>(&input_file, test_runner.get());
   test_driver->registerOption(
       quickstep::optimizer::DistributedExecutionGeneratorTestRunner::kResetOption);
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5e0c32ac/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
index 6bd7a1f..3b1259a 100644
--- a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
+++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
@@ -37,8 +37,10 @@
 #include "query_execution/ForemanDistributed.hpp"
 #include "query_execution/QueryExecutionTypedefs.hpp"
 #include "query_execution/QueryExecutionUtil.hpp"
+#include "query_optimizer/Optimizer.hpp"
 #include "query_optimizer/OptimizerContext.hpp"
 #include "query_optimizer/QueryHandle.hpp"
+#include "query_optimizer/tests/TestDatabaseLoader.hpp"
 #include "storage/DataExchangerAsync.hpp"
 #include "storage/StorageManager.hpp"
 #include "utility/MemStream.hpp"
@@ -67,6 +69,8 @@ namespace optimizer {
 
 namespace {
 
+constexpr int kNumInstances = 3;
+
 void nop() {}
 
 }  // namespace
@@ -147,6 +151,35 @@ DistributedExecutionGeneratorTestRunner::DistributedExecutionGeneratorTestRunner
   }
 }
 
+/**
+ * @brief Tears the test cluster down: sends a poison message to the foreman,
+ *        joins the workers, shiftbosses and foreman, shuts down the data
+ *        exchangers and releases the storage managers, then sends a poison
+ *        message to the block locator and joins the remaining threads.
+ *
+ * @note NOTE(review): the statement order looks deliberate (presumably each
+ *       poison message makes its receiver exit so the following join()
+ *       returns) -- confirm before reordering anything here.
+ **/
+DistributedExecutionGeneratorTestRunner::~DistributedExecutionGeneratorTestRunner() {
+  // Send the poison message from the CLI client to the foreman.
+  const tmb::MessageBus::SendStatus send_status =
+      QueryExecutionUtil::SendTMBMessage(&bus_, cli_id_, foreman_->getBusClientID(), TaggedMessage(kPoisonMessage));
+  CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
+
+  // Join the worker and the shiftboss of each of the kNumInstances instances.
+  for (int i = 0; i < kNumInstances; ++i) {
+    workers_[i]->join();
+    shiftbosses_[i]->join();
+  }
+
+  foreman_->join();
+
+  // Shut down every data exchanger and release the database loader and the
+  // per-instance storage managers.
+  test_database_loader_data_exchanger_.shutdown();
+  test_database_loader_.reset();
+  for (int i = 0; i < kNumInstances; ++i) {
+    data_exchangers_[i].shutdown();
+    storage_managers_[i].reset();
+  }
+
+  // Send the poison message to the block locator as well.
+  CHECK(MessageBus::SendStatus::kOK ==
+      QueryExecutionUtil::SendTMBMessage(&bus_, cli_id_, locator_client_id_, TaggedMessage(kPoisonMessage)));
+
+  // Join the data exchangers that were shut down above, then the block locator.
+  test_database_loader_data_exchanger_.join();
+  for (int i = 0; i < kNumInstances; ++i) {
+    data_exchangers_[i].join();
+  }
+  block_locator_->join();
+}
+
 void DistributedExecutionGeneratorTestRunner::runTestCase(
     const string &input, const std::set<string> &options, string *output) {
   // TODO(qzeng): Test multi-threaded query execution when we have a Sort operator.
@@ -174,27 +207,23 @@ void DistributedExecutionGeneratorTestRunner::runTestCase(
     const ParseStatement &parse_statement = *result.parsed_statement;
     std::printf("%s\n", parse_statement.toString().c_str());
 
-    const CatalogRelation *query_result_relation = nullptr;
+    auto query_handle = std::make_unique<QueryHandle>(query_id_++, cli_id_);
     try {
       OptimizerContext optimizer_context;
-      auto query_handle = std::make_unique<QueryHandle>(query_id_++, cli_id_);
-
       optimizer_.generateQueryHandle(parse_statement,
                                      test_database_loader_->catalog_database(),
                                      &optimizer_context,
                                      query_handle.get());
-      query_result_relation = query_handle->getQueryResultRelation();
-
-      QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
-          cli_id_,
-          foreman_->getBusClientID(),
-          query_handle.release(),
-          &bus_);
     } catch (const SqlError &error) {
       *output = error.formatMessage(input);
       break;
     }
 
+    const CatalogRelation *query_result_relation = query_handle->getQueryResultRelation();
+
+    QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
+        cli_id_, foreman_->getBusClientID(), query_handle.release(), &bus_);
+
     const tmb::AnnotatedMessage annotated_message = bus_.Receive(cli_id_, 0, true);
     DCHECK_EQ(kQueryExecutionSuccessMessage, annotated_message.tagged_message.message_type());
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5e0c32ac/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
index 2cd2427..2c0381b 100644
--- a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
+++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
@@ -31,7 +31,6 @@
 #include "query_execution/BlockLocator.hpp"
 #include "query_execution/ForemanDistributed.hpp"
 #include "query_execution/QueryExecutionTypedefs.hpp"
-#include "query_execution/QueryExecutionUtil.hpp"
 #include "query_execution/Shiftboss.hpp"
 #include "query_execution/Worker.hpp"
 #include "query_execution/WorkerDirectory.hpp"
@@ -45,16 +44,10 @@
 #include "glog/logging.h"
 
 #include "tmb/id_typedefs.h"
-#include "tmb/message_bus.h"
-#include "tmb/tagged_message.h"
 
 namespace quickstep {
 namespace optimizer {
 
-namespace {
-constexpr int kNumInstances = 3;
-}  // namespace
-
 /**
  * @brief TextBasedTestRunner for testing the ExecutionGenerator in the
  *        distributed version.
@@ -72,43 +65,7 @@ class DistributedExecutionGeneratorTestRunner : public TextBasedTestRunner {
    */
   explicit DistributedExecutionGeneratorTestRunner(const std::string &storage_path);
 
-  ~DistributedExecutionGeneratorTestRunner() {
-    tmb::TaggedMessage poison_tagged_message(quickstep::kPoisonMessage);
-
-    const tmb::MessageBus::SendStatus send_status =
-        QueryExecutionUtil::SendTMBMessage(
-            &bus_,
-            cli_id_,
-            foreman_->getBusClientID(),
-            std::move(poison_tagged_message));
-    CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
-
-    for (int i = 0; i < kNumInstances; ++i) {
-      workers_[i]->join();
-      shiftbosses_[i]->join();
-    }
-
-    foreman_->join();
-
-    test_database_loader_data_exchanger_.shutdown();
-    test_database_loader_.reset();
-    for (int i = 0; i < kNumInstances; ++i) {
-      data_exchangers_[i].shutdown();
-      storage_managers_[i].reset();
-    }
-
-    CHECK(MessageBus::SendStatus::kOK ==
-        QueryExecutionUtil::SendTMBMessage(&bus_,
-                                           cli_id_,
-                                           locator_client_id_,
-                                           tmb::TaggedMessage(quickstep::kPoisonMessage)));
-
-    test_database_loader_data_exchanger_.join();
-    for (int i = 0; i < kNumInstances; ++i) {
-      data_exchangers_[i].join();
-    }
-    block_locator_->join();
-  }
+  ~DistributedExecutionGeneratorTestRunner();
 
   void runTestCase(const std::string &input,
                    const std::set<std::string> &options,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5e0c32ac/query_optimizer/tests/execution_generator/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/execution_generator/CMakeLists.txt b/query_optimizer/tests/execution_generator/CMakeLists.txt
index d38f4aa..40629ee 100644
--- a/query_optimizer/tests/execution_generator/CMakeLists.txt
+++ b/query_optimizer/tests/execution_generator/CMakeLists.txt
@@ -15,68 +15,6 @@
 # specific language governing permissions and limitations
 # under the License.
 
-if (ENABLE_DISTRIBUTED)
-  add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_create
-           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
-           "${CMAKE_CURRENT_SOURCE_DIR}/Create.test"
-           "${CMAKE_CURRENT_BINARY_DIR}/DistributedCreate.test"
-           "${CMAKE_CURRENT_BINARY_DIR}/DistributedCreate/")
-  add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_delete
-           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
-           "${CMAKE_CURRENT_SOURCE_DIR}/Delete.test"
-           "${CMAKE_CURRENT_BINARY_DIR}/DistributedDelete.test"
-           "${CMAKE_CURRENT_BINARY_DIR}/DistributedDelete/")
-  add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_distinct
-           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
-           "${CMAKE_CURRENT_SOURCE_DIR}/Distinct.test"
-           "${CMAKE_CURRENT_BINARY_DIR}/DistributedDistinct.test"
-           "${CMAKE_CURRENT_BINARY_DIR}/DistributedDistinct/")
-  add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_drop
-           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
-           "${CMAKE_CURRENT_SOURCE_DIR}/Drop.test"
-           "${CMAKE_CURRENT_BINARY_DIR}/DistributedDrop.test"
-           "${CMAKE_CURRENT_BINARY_DIR}/DistributedDrop/")
-  add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_index
-           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
-           "${CMAKE_CURRENT_SOURCE_DIR}/Index.test"
-           "${CMAKE_CURRENT_BINARY_DIR}/DistributedIndex.test"
-           "${CMAKE_CURRENT_BINARY_DIR}/DistributedIndex/")
-  add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_insert
-           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
-           "${CMAKE_CURRENT_SOURCE_DIR}/Insert.test"
-           "${CMAKE_CURRENT_BINARY_DIR}/DistributedInsert.test"
-           "${CMAKE_CURRENT_BINARY_DIR}/DistributedInsert/")
-  add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_join
-           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
-           "${CMAKE_CURRENT_SOURCE_DIR}/Join.test"
-           "${CMAKE_CURRENT_BINARY_DIR}/DistributedJoin.test"
-           "${CMAKE_CURRENT_BINARY_DIR}/DistributedJoin/")
-  add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_partition
-           "../quickstep_queryoptimizer_tests_ExecutionGeneratorTest"
-           "${CMAKE_CURRENT_SOURCE_DIR}/Partition.test"
-           "${CMAKE_CURRENT_BINARY_DIR}/DistributedPartition.test"
-           "${CMAKE_CURRENT_BINARY_DIR}/DistributedPartition/")
-  add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_select
-           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
-           "${CMAKE_CURRENT_SOURCE_DIR}/Select.test"
-           "${CMAKE_CURRENT_BINARY_DIR}/DistributedSelect.test"
-           "${CMAKE_CURRENT_BINARY_DIR}/DistributedSelect/")
-  add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_stringpatternmatching
-           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
-           "${CMAKE_CURRENT_SOURCE_DIR}/StringPatternMatching.test"
-           "${CMAKE_CURRENT_BINARY_DIR}/DistributedStringPatternMatching.test"
-           "${CMAKE_CURRENT_BINARY_DIR}/DistributedStringPatternMatching/")
-  add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_tablegenerator
-           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
-           "${CMAKE_CURRENT_SOURCE_DIR}/TableGenerator.test"
-           "${CMAKE_CURRENT_BINARY_DIR}/DistributedTableGenerator.test"
-           "${CMAKE_CURRENT_BINARY_DIR}/DistributedTableGenerator/")
-  add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_update
-           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
-           "${CMAKE_CURRENT_SOURCE_DIR}/Update.test"
-           "${CMAKE_CURRENT_BINARY_DIR}/DistributedUpdate.test"
-           "${CMAKE_CURRENT_BINARY_DIR}/DistributedUpdate/")
-endif(ENABLE_DISTRIBUTED)
 add_test(quickstep_queryoptimizer_tests_executiongenerator_create
          "../quickstep_queryoptimizer_tests_ExecutionGeneratorTest"
          "${CMAKE_CURRENT_SOURCE_DIR}/Create.test"
@@ -138,25 +76,74 @@ add_test(quickstep_queryoptimizer_tests_executiongenerator_update
          "${CMAKE_CURRENT_BINARY_DIR}/Update.test"
          "${CMAKE_CURRENT_BINARY_DIR}/Update/")
 
+if (ENABLE_DISTRIBUTED)
+  add_test(quickstep_queryoptimizer_tests_executiongenerator_create_distributed
+           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+           "${CMAKE_CURRENT_SOURCE_DIR}/Create.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/CreateDistributed.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/CreateDistributed/")
+  add_test(quickstep_queryoptimizer_tests_executiongenerator_delete_distributed
+           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+           "${CMAKE_CURRENT_SOURCE_DIR}/Delete.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DeleteDistributed.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DeleteDistributed/")
+  add_test(quickstep_queryoptimizer_tests_executiongenerator_distinct_distributed
+           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+           "${CMAKE_CURRENT_SOURCE_DIR}/Distinct.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistinctDistributed.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistinctDistributed/")
+  add_test(quickstep_queryoptimizer_tests_executiongenerator_drop_distributed
+           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+           "${CMAKE_CURRENT_SOURCE_DIR}/Drop.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DropDistributed.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DropDistributed/")
+  add_test(quickstep_queryoptimizer_tests_executiongenerator_index_distributed
+           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+           "${CMAKE_CURRENT_SOURCE_DIR}/Index.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/IndexDistributed.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/IndexDistributed/")
+  add_test(quickstep_queryoptimizer_tests_executiongenerator_insert_distributed
+           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+           "${CMAKE_CURRENT_SOURCE_DIR}/Insert.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/InsertDistributed.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/InsertDistributed/")
+  add_test(quickstep_queryoptimizer_tests_executiongenerator_join_distributed
+           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+           "${CMAKE_CURRENT_SOURCE_DIR}/Join.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/JoinDistributed.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/JoinDistributed/")
+  add_test(quickstep_queryoptimizer_tests_executiongenerator_partition_distributed
+           "../quickstep_queryoptimizer_tests_ExecutionGeneratorTest"
+           "${CMAKE_CURRENT_SOURCE_DIR}/Partition.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/PartitionDistributed.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/PartitionDistributed/")
+  add_test(quickstep_queryoptimizer_tests_executiongenerator_select_distributed
+           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+           "${CMAKE_CURRENT_SOURCE_DIR}/Select.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/SelectDistributed.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/SelectDistributed/")
+  add_test(quickstep_queryoptimizer_tests_executiongenerator_stringpatternmatching_distributed
+           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+           "${CMAKE_CURRENT_SOURCE_DIR}/StringPatternMatching.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/StringPatternMatchingDistributed.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/StringPatternMatchingDistributed/")
+  add_test(quickstep_queryoptimizer_tests_executiongenerator_tablegenerator_distributed
+           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+           "${CMAKE_CURRENT_SOURCE_DIR}/TableGenerator.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/TableGeneratorDistributed.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/TableGeneratorDistributed/")
+  add_test(quickstep_queryoptimizer_tests_executiongenerator_update_distributed
+           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+           "${CMAKE_CURRENT_SOURCE_DIR}/Update.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/UpdateDistributed.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/UpdateDistributed/")
+endif(ENABLE_DISTRIBUTED)
+
 # Create the folders where the unit tests will store their data blocks for the
 # duration of their test.
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Create)
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Delete)
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Distinct)
-if (ENABLE_DISTRIBUTED)
-  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedCreate)
-  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedDelete)
-  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedDistinct)
-  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedDrop)
-  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedIndex)
-  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedInsert)
-  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedJoin)
-  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedPartition)
-  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedSelect)
-  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedStringPatternMatching)
-  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedTableGenerator)
-  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedUpdate)
-endif(ENABLE_DISTRIBUTED)
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Drop)
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Index)
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Insert)
@@ -166,3 +153,18 @@ file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Select)
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/StringPatternMatching)
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/TableGenerator)
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Update)
+
+if (ENABLE_DISTRIBUTED)
+  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/CreateDistributed)
+  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DeleteDistributed)
+  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistinctDistributed)
+  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DropDistributed)
+  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/IndexDistributed)
+  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/InsertDistributed)
+  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/JoinDistributed)
+  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/PartitionDistributed)
+  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/SelectDistributed)
+  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/StringPatternMatchingDistributed)
+  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/TableGeneratorDistributed)
+  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/UpdateDistributed)
+endif(ENABLE_DISTRIBUTED)


[20/24] incubator-quickstep git commit: Added command support in the distributed version.

Posted by zu...@apache.org.
Added command support in the distributed version.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/87bbb262
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/87bbb262
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/87bbb262

Branch: refs/heads/dist-patch
Commit: 87bbb2629ddc8f09c997612fca2cd700ce95d040
Parents: 5e0c32a
Author: Zuyu Zhang <zu...@apache.org>
Authored: Mon Feb 27 00:30:43 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Wed Mar 1 14:41:27 2017 -0800

----------------------------------------------------------------------
 CMakeLists.txt                                  |   2 +
 cli/distributed/CMakeLists.txt                  |   3 +
 cli/distributed/Cli.cpp                         |  72 ++++--
 cli/distributed/Conductor.cpp                   |  81 +++++-
 cli/distributed/Conductor.hpp                   |   3 +
 cli/tests/CMakeLists.txt                        |  41 ++++
 cli/tests/DistributedCommandExecutorTest.cpp    |  62 +++++
 .../DistributedCommandExecutorTestRunner.cpp    | 246 +++++++++++++++++++
 .../DistributedCommandExecutorTestRunner.hpp    |  99 ++++++++
 cli/tests/command_executor/CMakeLists.txt       |  18 ++
 query_execution/QueryExecutionMessages.proto    |   8 +
 query_execution/QueryExecutionTypedefs.hpp      |  10 +-
 12 files changed, 624 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/87bbb262/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 918069c..9cd02be 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -817,6 +817,7 @@ if (ENABLE_DISTRIBUTED)
   target_link_libraries(quickstep_distributed_cli_shell
                         glog
                         quickstep_catalog_CatalogRelation
+                        quickstep_cli_Constants
                         quickstep_cli_Flags
                         quickstep_cli_LineReader
                         quickstep_cli_PrintToScreen
@@ -833,6 +834,7 @@ if (ENABLE_DISTRIBUTED)
                         quickstep_storage_StorageBlockInfo
                         quickstep_storage_StorageManager
                         quickstep_utility_Macros
+                        quickstep_utility_SqlError
                         quickstep_utility_StringUtil
                         tmb
                         ${GFLAGS_LIB_NAME}

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/87bbb262/cli/distributed/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cli/distributed/CMakeLists.txt b/cli/distributed/CMakeLists.txt
index 5804321..1f7dee0 100644
--- a/cli/distributed/CMakeLists.txt
+++ b/cli/distributed/CMakeLists.txt
@@ -26,6 +26,8 @@ add_library(quickstep_cli_distributed_Role Role.cpp Role.hpp)
 target_link_libraries(quickstep_cli_distributed_Conductor
                       glog
                       quickstep_catalog_CatalogDatabase
+                      quickstep_cli_CommandExecutorUtil
+                      quickstep_cli_Constants
                       quickstep_cli_DefaultsConfigurator
                       quickstep_cli_Flags
                       quickstep_cli_distributed_Role
@@ -41,6 +43,7 @@ target_link_libraries(quickstep_cli_distributed_Conductor
                       quickstep_storage_StorageConstants
                       quickstep_utility_Macros
                       quickstep_utility_SqlError
+                      quickstep_utility_StringUtil
                       tmb)
 target_link_libraries(quickstep_cli_distributed_Executor
                       glog

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/87bbb262/cli/distributed/Cli.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Cli.cpp b/cli/distributed/Cli.cpp
index 6228898..49b7dc1 100644
--- a/cli/distributed/Cli.cpp
+++ b/cli/distributed/Cli.cpp
@@ -30,6 +30,7 @@
 
 #include "catalog/CatalogRelation.hpp"
 #include "cli/CliConfig.h"  // For QUICKSTEP_USE_LINENOISE.
+#include "cli/Constants.hpp"
 #include "cli/Flags.hpp"
 
 #ifdef QUICKSTEP_USE_LINENOISE
@@ -49,6 +50,7 @@ typedef quickstep::LineReaderDumb LineReaderImpl;
 #include "query_execution/QueryExecutionUtil.hpp"
 #include "storage/DataExchangerAsync.hpp"
 #include "storage/StorageBlockInfo.hpp"
+#include "utility/SqlError.hpp"
 #include "utility/StringUtil.hpp"
 
 #include "tmb/address.h"
@@ -76,6 +78,7 @@ using tmb::client_id;
 
 namespace quickstep {
 
+namespace C = cli;
 namespace S = serialization;
 
 void Cli::init() {
@@ -127,6 +130,10 @@ void Cli::init() {
   bus_.RegisterClientAsSender(cli_id_, kQueryResultTeardownMessage);
 
   bus_.RegisterClientAsReceiver(cli_id_, kQueryExecutionErrorMessage);
+
+  // Prepare for submitting a command.
+  bus_.RegisterClientAsSender(cli_id_, kCommandMessage);
+  bus_.RegisterClientAsReceiver(cli_id_, kCommandResponseMessage);
 }
 
 void Cli::run() {
@@ -158,27 +165,51 @@ void Cli::run() {
           break;
         }
 
-        CHECK_NE(statement.getStatementType(), ParseStatement::kCommand)
-            << "TODO(quickstep-team)";
+        if (statement.getStatementType() == ParseStatement::kCommand) {
+          const ParseCommand &command = static_cast<const ParseCommand &>(statement);
+          const std::string &command_str = command.command()->value();
+          try {
+            if (command_str == C::kAnalyzeCommand) {
+              // TODO(zuyu): support '\analyze'.
+              THROW_SQL_ERROR_AT(command.command()) << "Unsupported Command";
+            } else if (command_str != C::kDescribeDatabaseCommand &&
+                       command_str != C::kDescribeTableCommand) {
+              THROW_SQL_ERROR_AT(command.command()) << "Invalid Command";
+            }
+          } catch (const SqlError &error) {
+            fprintf(stderr, "%s", error.formatMessage(*command_string).c_str());
+            reset_parser = true;
+            break;
+          }
+
+          DLOG(INFO) << "DistributedCli sent CommandMessage (typed '" << kCommandMessage
+                     << "') to Conductor";
+          S::CommandMessage proto;
+          proto.set_command(*command_string);
+
+          const size_t proto_length = proto.ByteSize();
+          char *proto_bytes = static_cast<char*>(malloc(proto_length));
+          CHECK(proto.SerializeToArray(proto_bytes, proto_length));
 
-        DLOG(INFO) << "DistributedCli sent SqlQueryMessage (typed '" << kSqlQueryMessage
-                   << "') to Conductor";
-        S::SqlQueryMessage proto;
-        proto.set_sql_query(*command_string);
+          TaggedMessage command_message(static_cast<const void*>(proto_bytes), proto_length, kCommandMessage);
+          free(proto_bytes);
 
-        const size_t proto_length = proto.ByteSize();
-        char *proto_bytes = static_cast<char*>(malloc(proto_length));
-        CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+          QueryExecutionUtil::SendTMBMessage(&bus_, cli_id_, conductor_client_id_, move(command_message));
+        } else {
+          DLOG(INFO) << "DistributedCli sent SqlQueryMessage (typed '" << kSqlQueryMessage
+                     << "') to Conductor";
+          S::SqlQueryMessage proto;
+          proto.set_sql_query(*command_string);
 
-        TaggedMessage sql_query_message(static_cast<const void*>(proto_bytes),
-                                        proto_length,
-                                        kSqlQueryMessage);
-        free(proto_bytes);
+          const size_t proto_length = proto.ByteSize();
+          char *proto_bytes = static_cast<char*>(malloc(proto_length));
+          CHECK(proto.SerializeToArray(proto_bytes, proto_length));
 
-        QueryExecutionUtil::SendTMBMessage(&bus_,
-                                           cli_id_,
-                                           conductor_client_id_,
-                                           move(sql_query_message));
+          TaggedMessage sql_query_message(static_cast<const void*>(proto_bytes), proto_length, kSqlQueryMessage);
+          free(proto_bytes);
+
+          QueryExecutionUtil::SendTMBMessage(&bus_, cli_id_, conductor_client_id_, move(sql_query_message));
+        }
 
         start = std::chrono::steady_clock::now();
 
@@ -187,6 +218,13 @@ void Cli::run() {
         DLOG(INFO) << "DistributedCli received typed '" << tagged_message.message_type()
                    << "' message from client " << annotated_message.sender;
         switch (tagged_message.message_type()) {
+          case kCommandResponseMessage: {
+            S::CommandResponseMessage proto;
+            CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+            printf("%s", proto.command_response().c_str());
+            break;
+          }
           case kQueryExecutionSuccessMessage: {
             end = std::chrono::steady_clock::now();
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/87bbb262/cli/distributed/Conductor.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Conductor.cpp b/cli/distributed/Conductor.cpp
index 3c68bfb..b877b04 100644
--- a/cli/distributed/Conductor.cpp
+++ b/cli/distributed/Conductor.cpp
@@ -29,6 +29,8 @@
 #include <utility>
 
 #include "catalog/CatalogDatabase.hpp"
+#include "cli/CommandExecutorUtil.hpp"
+#include "cli/Constants.hpp"
 #include "cli/DefaultsConfigurator.hpp"
 #include "cli/Flags.hpp"
 #include "parser/ParseStatement.hpp"
@@ -42,6 +44,7 @@
 #include "query_optimizer/QueryProcessor.hpp"
 #include "storage/StorageConstants.hpp"
 #include "utility/SqlError.hpp"
+#include "utility/StringUtil.hpp"
 
 #include "tmb/id_typedefs.h"
 #include "tmb/native_net_client_message_bus.h"
@@ -63,6 +66,7 @@ using tmb::client_id;
 
 namespace quickstep {
 
+namespace C = cli;
 namespace S = serialization;
 
 void Conductor::init() {
@@ -91,6 +95,9 @@ void Conductor::init() {
   bus_.RegisterClientAsReceiver(conductor_client_id_, kDistributedCliRegistrationMessage);
   bus_.RegisterClientAsSender(conductor_client_id_, kDistributedCliRegistrationResponseMessage);
 
+  bus_.RegisterClientAsReceiver(conductor_client_id_, kCommandMessage);
+  bus_.RegisterClientAsSender(conductor_client_id_, kCommandResponseMessage);
+
   bus_.RegisterClientAsReceiver(conductor_client_id_, kSqlQueryMessage);
   bus_.RegisterClientAsSender(conductor_client_id_, kQueryExecutionErrorMessage);
   bus_.RegisterClientAsSender(conductor_client_id_, kAdmitRequestMessage);
@@ -125,6 +132,14 @@ void Conductor::run() {
             QueryExecutionUtil::SendTMBMessage(&bus_, conductor_client_id_, sender, move(message)));
         break;
       }
+      case kCommandMessage: {
+        S::CommandMessage proto;
+        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+        DLOG(INFO) << "Conductor received the following command: " << proto.command();
+
+        processCommandMessage(sender, new string(move(proto.command())));
+        break;
+      }
       case kSqlQueryMessage: {
         S::SqlQueryMessage proto;
         CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
@@ -146,6 +161,69 @@ void Conductor::run() {
   }
 }
 
+void Conductor::processCommandMessage(const tmb::client_id sender, string *command_string) {
+  parser_wrapper_.feedNextBuffer(command_string);
+  ParseResult parse_result = parser_wrapper_.getNextStatement();
+
+  CHECK(parse_result.condition == ParseResult::kSuccess)
+      << "Any syntax error should be addressed in the DistributedCli.";
+
+  const ParseStatement &statement = *parse_result.parsed_statement;
+  DCHECK_EQ(ParseStatement::kCommand, statement.getStatementType());
+
+  const ParseCommand &command = static_cast<const ParseCommand &>(statement);
+  const PtrVector<ParseString> &arguments = *(command.arguments());
+  const string &command_str = command.command()->value();
+
+  string command_response;
+
+  try {
+    if (command_str == C::kDescribeDatabaseCommand) {
+      command_response = C::ExecuteDescribeDatabase(arguments, *catalog_database_);
+    } else if (command_str == C::kDescribeTableCommand) {
+      if (arguments.empty()) {
+        command_response = C::ExecuteDescribeDatabase(arguments, *catalog_database_);
+      } else {
+        command_response = C::ExecuteDescribeTable(arguments, *catalog_database_);
+      }
+    }
+  } catch (const SqlError &command_error) {
+    // Set the query execution status along with the error message.
+    S::QueryExecutionErrorMessage proto;
+    proto.set_error_message(command_error.formatMessage(*command_string));
+
+    const size_t proto_length = proto.ByteSize();
+    char *proto_bytes = static_cast<char*>(malloc(proto_length));
+    CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+    TaggedMessage message(static_cast<const void*>(proto_bytes),
+                          proto_length,
+                          kQueryExecutionErrorMessage);
+    free(proto_bytes);
+
+    DLOG(INFO) << "Conductor sent QueryExecutionErrorMessage (typed '"
+               << kQueryExecutionErrorMessage
+               << "') to Distributed CLI " << sender;
+    CHECK(MessageBus::SendStatus::kOK ==
+        QueryExecutionUtil::SendTMBMessage(&bus_, conductor_client_id_, sender, move(message)));
+  }
+
+  S::CommandResponseMessage proto;
+  proto.set_command_response(command_response);
+
+  const size_t proto_length = proto.ByteSize();
+  char *proto_bytes = static_cast<char*>(malloc(proto_length));
+  CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+  TaggedMessage message(static_cast<const void*>(proto_bytes), proto_length, kCommandResponseMessage);
+  free(proto_bytes);
+
+  DLOG(INFO) << "Conductor sent CommandResponseMessage (typed '" << kCommandResponseMessage
+             << "') to Distributed CLI " << sender;
+  CHECK(MessageBus::SendStatus::kOK ==
+      QueryExecutionUtil::SendTMBMessage(&bus_, conductor_client_id_, sender, move(message)));
+}
+
 void Conductor::processSqlQueryMessage(const tmb::client_id sender, string *command_string) {
   parser_wrapper_.feedNextBuffer(command_string);
   ParseResult parse_result = parser_wrapper_.getNextStatement();
@@ -154,8 +232,7 @@ void Conductor::processSqlQueryMessage(const tmb::client_id sender, string *comm
       << "Any SQL syntax error should be addressed in the DistributedCli.";
 
   const ParseStatement &statement = *parse_result.parsed_statement;
-  CHECK(statement.getStatementType() != ParseStatement::kCommand)
-     << "TODO(quickstep-team)";
+  DCHECK_NE(ParseStatement::kCommand, statement.getStatementType());
 
   try {
     auto query_handle = make_unique<QueryHandle>(query_processor_->query_id(),

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/87bbb262/cli/distributed/Conductor.hpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Conductor.hpp b/cli/distributed/Conductor.hpp
index 09bf2b9..e7e003f 100644
--- a/cli/distributed/Conductor.hpp
+++ b/cli/distributed/Conductor.hpp
@@ -35,6 +35,7 @@
 namespace quickstep {
 
 class CatalogDatabase;
+class ParseCommand;
 
 /** \addtogroup CliDistributed
  *  @{
@@ -60,6 +61,8 @@ class Conductor final : public Role {
   void run() override;
 
  private:
+  void processCommandMessage(const tmb::client_id sender, std::string *command_string);
+
   void processSqlQueryMessage(const tmb::client_id sender, std::string *command_string);
 
   SqlParserWrapper parser_wrapper_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/87bbb262/cli/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cli/tests/CMakeLists.txt b/cli/tests/CMakeLists.txt
index 48f27bb..7f8150f 100644
--- a/cli/tests/CMakeLists.txt
+++ b/cli/tests/CMakeLists.txt
@@ -23,6 +23,14 @@ add_executable(quickstep_cli_tests_CommandExecutorTest
                CommandExecutorTestRunner.hpp
                "${PROJECT_SOURCE_DIR}/utility/textbased_test/TextBasedTest.cpp"
                "${PROJECT_SOURCE_DIR}/utility/textbased_test/TextBasedTest.hpp")
+if (ENABLE_DISTRIBUTED)
+  add_executable(quickstep_cli_tests_DistributedCommandExecutorTest
+                 DistributedCommandExecutorTest.cpp
+                 DistributedCommandExecutorTestRunner.cpp
+                 DistributedCommandExecutorTestRunner.hpp
+                 "${PROJECT_SOURCE_DIR}/utility/textbased_test/TextBasedTest.cpp"
+                 "${PROJECT_SOURCE_DIR}/utility/textbased_test/TextBasedTest.hpp")
+endif(ENABLE_DISTRIBUTED)
 
 target_link_libraries(quickstep_cli_tests_CommandExecutorTest
                       glog
@@ -49,3 +57,36 @@ target_link_libraries(quickstep_cli_tests_CommandExecutorTest
                       quickstep_utility_TextBasedTestDriver
                       tmb
                       ${LIBS})
+if (ENABLE_DISTRIBUTED)
+  target_link_libraries(quickstep_cli_tests_DistributedCommandExecutorTest
+                        glog
+                        gtest
+                        quickstep_catalog_CatalogTypedefs
+                        quickstep_cli_CommandExecutorUtil
+                        quickstep_cli_Constants
+                        quickstep_cli_DropRelation
+                        quickstep_cli_PrintToScreen
+                        quickstep_parser_ParseStatement
+                        quickstep_parser_SqlParserWrapper
+                        quickstep_queryexecution_BlockLocator
+                        quickstep_queryexecution_BlockLocatorUtil
+                        quickstep_queryexecution_ForemanDistributed
+                        quickstep_queryexecution_QueryExecutionTypedefs
+                        quickstep_queryexecution_QueryExecutionUtil
+                        quickstep_queryexecution_Shiftboss
+                        quickstep_queryexecution_Worker
+                        quickstep_queryexecution_WorkerDirectory
+                        quickstep_queryoptimizer_Optimizer
+                        quickstep_queryoptimizer_OptimizerContext
+                        quickstep_queryoptimizer_QueryHandle
+                        quickstep_queryoptimizer_tests_TestDatabaseLoader
+                        quickstep_storage_DataExchangerAsync
+                        quickstep_storage_StorageManager
+                        quickstep_utility_Macros
+                        quickstep_utility_MemStream
+                        quickstep_utility_SqlError
+                        quickstep_utility_TextBasedTestDriver
+                        tmb
+                        ${GFLAGS_LIB_NAME}
+                        ${LIBS})
+endif(ENABLE_DISTRIBUTED)

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/87bbb262/cli/tests/DistributedCommandExecutorTest.cpp
----------------------------------------------------------------------
diff --git a/cli/tests/DistributedCommandExecutorTest.cpp b/cli/tests/DistributedCommandExecutorTest.cpp
new file mode 100644
index 0000000..b41a70f
--- /dev/null
+++ b/cli/tests/DistributedCommandExecutorTest.cpp
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include <iostream>
+#include <fstream>
+#include <memory>
+
+#include "cli/tests/DistributedCommandExecutorTestRunner.hpp"
+#include "utility/textbased_test/TextBasedTestDriver.hpp"
+
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+#include "gtest/gtest.h"
+
+using quickstep::TextBasedTest;
+
+using std::make_unique;
+
+QUICKSTEP_GENERATE_TEXT_TEST(DISTRIBUTED_COMMAND_EXECUTOR_TEST);
+
+int main(int argc, char** argv) {
+  google::InitGoogleLogging(argv[0]);
+  // Honor FLAGS_buffer_pool_slots in StorageManager.
+  gflags::ParseCommandLineFlags(&argc, &argv, true);
+  // Usage: <test input file> <actual output file> <storage path>. All are used below, so abort if any is missing.
+  if (argc < 4) {
+    LOG(FATAL) << "Must have at least 3 arguments, but " << argc - 1
+               << " are provided";
+  }
+
+  std::ifstream input_file(argv[1]);
+  CHECK(input_file.is_open()) << argv[1];
+
+  auto test_runner = make_unique<quickstep::DistributedCommandExecutorTestRunner>(argv[3]);
+  test_driver = make_unique<quickstep::TextBasedTestDriver>(&input_file, test_runner.get());
+  test_driver->registerOption(
+      quickstep::DistributedCommandExecutorTestRunner::kResetOption);
+  // RUN_ALL_TESTS() returns 0 when every test passes; on failure the actual output is written to argv[2].
+  ::testing::InitGoogleTest(&argc, argv);
+  const int success = RUN_ALL_TESTS();
+  if (success != 0) {
+    test_driver->writeActualOutputToFile(argv[2]);
+  }
+
+  return success;
+}

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/87bbb262/cli/tests/DistributedCommandExecutorTestRunner.cpp
----------------------------------------------------------------------
diff --git a/cli/tests/DistributedCommandExecutorTestRunner.cpp b/cli/tests/DistributedCommandExecutorTestRunner.cpp
new file mode 100644
index 0000000..66d0767
--- /dev/null
+++ b/cli/tests/DistributedCommandExecutorTestRunner.cpp
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "cli/tests/DistributedCommandExecutorTestRunner.hpp"
+
+#include <cstdio>
+#include <functional>
+#include <memory>
+#include <set>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "cli/CommandExecutorUtil.hpp"
+#include "cli/Constants.hpp"
+#include "cli/DropRelation.hpp"
+#include "cli/PrintToScreen.hpp"
+#include "parser/ParseStatement.hpp"
+#include "query_execution/BlockLocator.hpp"
+#include "query_execution/BlockLocatorUtil.hpp"
+#include "query_execution/ForemanDistributed.hpp"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
+#include "query_optimizer/Optimizer.hpp"
+#include "query_optimizer/OptimizerContext.hpp"
+#include "query_optimizer/QueryHandle.hpp"
+#include "query_optimizer/tests/TestDatabaseLoader.hpp"
+#include "storage/DataExchangerAsync.hpp"
+#include "storage/StorageManager.hpp"
+#include "utility/MemStream.hpp"
+#include "utility/SqlError.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
+#include "tmb/tagged_message.h"
+
+using std::make_unique;
+using std::string;
+using std::vector;
+
+using tmb::TaggedMessage;
+
+namespace quickstep {
+
+class CatalogRelation;
+
+namespace {
+// No-op function; the constructor below binds it as a ForemanDistributed constructor argument.
+void nop() {}
+
+}  // namespace
+
+namespace C = cli;
+
+const char *DistributedCommandExecutorTestRunner::kResetOption =
+    "reset_before_execution";
+
+DistributedCommandExecutorTestRunner::DistributedCommandExecutorTestRunner(const string &storage_path)
+    : query_id_(0) {
+  bus_.Initialize();
+  // Connect this runner to bus_ as the CLI client and register the message types it sends and receives.
+  cli_id_ = bus_.Connect();
+  bus_.RegisterClientAsSender(cli_id_, kAdmitRequestMessage);
+  bus_.RegisterClientAsSender(cli_id_, kPoisonMessage);
+  bus_.RegisterClientAsReceiver(cli_id_, kQueryExecutionSuccessMessage);
+
+  bus_.RegisterClientAsSender(cli_id_, kBlockDomainRegistrationMessage);
+  bus_.RegisterClientAsReceiver(cli_id_, kBlockDomainRegistrationResponseMessage);
+  // The block locator is started before the getBlockDomain() calls below.
+  block_locator_ = make_unique<BlockLocator>(&bus_);
+  block_locator_->start();
+  // getBlockDomain() is handed &locator_client_id_; the DCHECK_EQ below expects it to equal the locator's bus id.
+  test_database_loader_ = make_unique<optimizer::TestDatabaseLoader>(
+      storage_path,
+      block_locator::getBlockDomain(
+          test_database_loader_data_exchanger_.network_address(), cli_id_, &locator_client_id_, &bus_),
+      locator_client_id_,
+      &bus_);
+  DCHECK_EQ(block_locator_->getBusClientID(), locator_client_id_);
+  test_database_loader_data_exchanger_.set_storage_manager(test_database_loader_->storage_manager());
+  test_database_loader_data_exchanger_.start();
+  // Create and load the test relation through the loader.
+  test_database_loader_->createTestRelation(false /* allow_vchar */);
+  test_database_loader_->loadTestRelation();
+
+  // NOTE(zuyu): Foreman should initialize before Shiftboss so that the former
+  // could receive a registration message from the latter.
+  foreman_ = make_unique<ForemanDistributed>(*block_locator_, std::bind(&nop), &bus_,
+                                             test_database_loader_->catalog_database());
+
+  // We don't use the NUMA aware version of worker code.
+  const vector<numa_node_id> numa_nodes(1 /* Number of worker threads per instance */,
+                                        kAnyNUMANodeID);
+  // bus_local_ is a second bus, passed to the Worker and the Shiftboss created below.
+  bus_local_.Initialize();
+
+  worker_ = make_unique<Worker>(0 /* worker_thread_index */, &bus_local_);
+
+  const vector<tmb::client_id> worker_client_ids(1, worker_->getBusClientID());
+  worker_directory_ = make_unique<WorkerDirectory>(worker_client_ids.size(), worker_client_ids, numa_nodes);
+  // A second StorageManager on the same storage_path, with a block domain from its own getBlockDomain() call.
+  storage_manager_ = make_unique<StorageManager>(
+      storage_path,
+      block_locator::getBlockDomain(
+          data_exchanger_.network_address(), cli_id_, &locator_client_id_, &bus_),
+      locator_client_id_, &bus_);
+  DCHECK_EQ(block_locator_->getBusClientID(), locator_client_id_);
+
+  data_exchanger_.set_storage_manager(storage_manager_.get());
+  shiftboss_ =
+      make_unique<Shiftboss>(&bus_, &bus_local_, storage_manager_.get(), worker_directory_.get(),
+                             storage_manager_->hdfs());
+  // Start the foreman, data exchanger, shiftboss and worker only after all of them are constructed.
+  foreman_->start();
+
+  data_exchanger_.start();
+  shiftboss_->start();
+  worker_->start();
+}
+
+DistributedCommandExecutorTestRunner::~DistributedCommandExecutorTestRunner() {
+  const tmb::MessageBus::SendStatus send_status =
+      QueryExecutionUtil::SendTMBMessage(&bus_, cli_id_, foreman_->getBusClientID(), TaggedMessage(kPoisonMessage));
+  CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
+  // With the poison message sent to the foreman, join the worker, the shiftboss and the foreman.
+  worker_->join();
+  shiftboss_->join();
+
+  foreman_->join();
+  // Shut down each data exchanger before resetting the object whose storage manager it was given.
+  test_database_loader_data_exchanger_.shutdown();
+  test_database_loader_.reset();
+  data_exchanger_.shutdown();
+  storage_manager_.reset();
+  // The block locator is poisoned last, after the loader and the storage manager have been reset.
+  CHECK(tmb::MessageBus::SendStatus::kOK ==
+      QueryExecutionUtil::SendTMBMessage(&bus_, cli_id_, locator_client_id_, TaggedMessage(kPoisonMessage)));
+
+  test_database_loader_data_exchanger_.join();
+  data_exchanger_.join();
+  block_locator_->join();
+}
+
+void DistributedCommandExecutorTestRunner::runTestCase(
+    const string &input, const std::set<string> &options, string *output) {
+  // Runs each statement in 'input'; '*output' gets the first error message, or else the collected output.
+  // TODO(qzeng): Test multi-threaded query execution when we have a Sort operator.
+  VLOG(4) << "Test SQL(s): " << input;
+  // With kResetOption set, clear the loader, then re-create and re-load the test relation.
+  if (options.find(kResetOption) != options.end()) {
+    test_database_loader_->clear();
+    test_database_loader_->createTestRelation(false /* allow_vchar */);
+    test_database_loader_->loadTestRelation();
+  }
+
+  MemStream output_stream;
+  sql_parser_.feedNextBuffer(new string(input));
+  // Process statements until the parser stops reporting kSuccess.
+  while (true) {
+    ParseResult result = sql_parser_.getNextStatement();
+    if (result.condition != ParseResult::kSuccess) {
+      if (result.condition == ParseResult::kError) {
+        *output = result.error_message;
+      }
+      break;
+    }
+
+    const ParseStatement &parse_statement = *result.parsed_statement;
+    std::printf("%s\n", parse_statement.toString().c_str());
+
+    try {
+      if (parse_statement.getStatementType() == ParseStatement::kCommand) {
+        const ParseCommand &command = static_cast<const ParseCommand &>(parse_statement);
+        const PtrVector<ParseString> &arguments = *(command.arguments());
+        const string &command_str = command.command()->value();
+        // Supported: describe-database and describe-table (which, given no arguments, describes the database).
+        string command_response;
+        if (command_str == C::kDescribeDatabaseCommand) {
+          command_response = C::ExecuteDescribeDatabase(arguments, *test_database_loader_->catalog_database());
+        } else if (command_str == C::kDescribeTableCommand) {
+          if (arguments.empty()) {
+            command_response = C::ExecuteDescribeDatabase(arguments, *test_database_loader_->catalog_database());
+          } else {
+            command_response = C::ExecuteDescribeTable(arguments, *test_database_loader_->catalog_database());
+          }
+        } else {
+          THROW_SQL_ERROR_AT(command.command()) << "Unsupported command";
+        }
+
+        std::fprintf(output_stream.file(), "%s", command_response.c_str());
+      } else {
+        optimizer::OptimizerContext optimizer_context;
+        auto query_handle = std::make_unique<QueryHandle>(query_id_++, cli_id_);
+
+        optimizer_.generateQueryHandle(parse_statement,
+                                       test_database_loader_->catalog_database(),
+                                       &optimizer_context,
+                                       query_handle.get());
+        const CatalogRelation *query_result_relation = query_handle->getQueryResultRelation();
+        // The result relation pointer is read above because query_handle is release()d into the admit request.
+        QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
+            cli_id_, foreman_->getBusClientID(), query_handle.release(), &bus_);
+        // Receive the next message for the CLI client; it is expected to be kQueryExecutionSuccessMessage.
+        const tmb::AnnotatedMessage annotated_message = bus_.Receive(cli_id_, 0, true);
+        DCHECK_EQ(kQueryExecutionSuccessMessage, annotated_message.tagged_message.message_type());
+        // If the query has a result relation, print it to output_stream and then drop it.
+        if (query_result_relation) {
+          PrintToScreen::PrintRelation(*query_result_relation,
+                                       test_database_loader_->storage_manager(),
+                                       output_stream.file());
+          DropRelation::Drop(*query_result_relation,
+                             test_database_loader_->catalog_database(),
+                             test_database_loader_->storage_manager());
+        }
+      }
+    } catch (const SqlError &error) {
+      *output = error.formatMessage(input);
+      break;
+    }
+  }
+  // If no error message was stored in *output, return what was written to output_stream.
+  if (output->empty()) {
+    *output = output_stream.str();
+  }
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/87bbb262/cli/tests/DistributedCommandExecutorTestRunner.hpp
----------------------------------------------------------------------
diff --git a/cli/tests/DistributedCommandExecutorTestRunner.hpp b/cli/tests/DistributedCommandExecutorTestRunner.hpp
new file mode 100644
index 0000000..0427a85
--- /dev/null
+++ b/cli/tests/DistributedCommandExecutorTestRunner.hpp
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_CLI_TESTS_DISTRIBUTED_COMMAND_EXECUTOR_TEST_RUNNER_HPP_
+#define QUICKSTEP_CLI_TESTS_DISTRIBUTED_COMMAND_EXECUTOR_TEST_RUNNER_HPP_
+
+#include <cstddef>
+#include <memory>
+#include <set>
+#include <string>
+#include <utility>
+
+#include "parser/SqlParserWrapper.hpp"
+#include "query_execution/BlockLocator.hpp"
+#include "query_execution/ForemanDistributed.hpp"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/Shiftboss.hpp"
+#include "query_execution/Worker.hpp"
+#include "query_execution/WorkerDirectory.hpp"
+#include "query_optimizer/Optimizer.hpp"
+#include "query_optimizer/tests/TestDatabaseLoader.hpp"
+#include "storage/DataExchangerAsync.hpp"
+#include "storage/StorageManager.hpp"
+#include "utility/Macros.hpp"
+#include "utility/textbased_test/TextBasedTestRunner.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+
+namespace quickstep {
+
+/**
+ * @brief TextBasedTestRunner for testing the CommandExecutor in the
+ *        distributed version.
+ */
+class DistributedCommandExecutorTestRunner : public TextBasedTestRunner {
+ public:
+  /**
+   * @brief If this option is enabled, recreate the entire database and
+   *        repopulate the data before every test.
+   */
+  static const char *kResetOption;
+  /**
+   * @brief Constructor.
+   * @param storage_path Storage path passed to the test database loader and the StorageManager.
+   */
+  explicit DistributedCommandExecutorTestRunner(const std::string &storage_path);
+  // Sends poison messages and joins the components started by the constructor.
+  ~DistributedCommandExecutorTestRunner();
+  // Runs the statements in 'input' and stores the printed output (or an error message) in '*output'.
+  void runTestCase(const std::string &input,
+                   const std::set<std::string> &options,
+                   std::string *output) override;
+
+ private:
+  std::size_t query_id_;  // Id given to the next QueryHandle created in runTestCase().
+
+  SqlParserWrapper sql_parser_;
+  std::unique_ptr<optimizer::TestDatabaseLoader> test_database_loader_;
+  DataExchangerAsync test_database_loader_data_exchanger_;
+  optimizer::Optimizer optimizer_;
+  // Bus used by the CLI client, the block locator, the foreman and the shiftboss.
+  MessageBusImpl bus_;
+  tmb::client_id cli_id_, locator_client_id_;
+
+  std::unique_ptr<BlockLocator> block_locator_;
+
+  std::unique_ptr<ForemanDistributed> foreman_;
+  // Second bus, passed to the Worker and the Shiftboss.
+  MessageBusImpl bus_local_;
+  std::unique_ptr<Worker> worker_;
+  std::unique_ptr<WorkerDirectory> worker_directory_;
+  DataExchangerAsync data_exchanger_;
+  std::unique_ptr<StorageManager> storage_manager_;
+  std::unique_ptr<Shiftboss> shiftboss_;
+
+  DISALLOW_COPY_AND_ASSIGN(DistributedCommandExecutorTestRunner);
+};
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_CLI_TESTS_DISTRIBUTED_COMMAND_EXECUTOR_TEST_RUNNER_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/87bbb262/cli/tests/command_executor/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cli/tests/command_executor/CMakeLists.txt b/cli/tests/command_executor/CMakeLists.txt
index 9cf1869..e62d954 100644
--- a/cli/tests/command_executor/CMakeLists.txt
+++ b/cli/tests/command_executor/CMakeLists.txt
@@ -26,7 +26,25 @@ add_test(quickstep_cli_tests_commandexecutor_dt
          "${CMAKE_CURRENT_BINARY_DIR}/Dt.test"
          "${CMAKE_CURRENT_BINARY_DIR}/Dt/")
 
+if (ENABLE_DISTRIBUTED)
+  add_test(quickstep_cli_tests_commandexecutor_d_distributed
+           "../quickstep_cli_tests_DistributedCommandExecutorTest"
+           "${CMAKE_CURRENT_SOURCE_DIR}/D.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DDistributed.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DDistributed/")
+  add_test(quickstep_cli_tests_commandexecutor_dt_distributed
+           "../quickstep_cli_tests_DistributedCommandExecutorTest"
+           "${CMAKE_CURRENT_SOURCE_DIR}/Dt.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DtDistributed.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DtDistributed/")
+endif(ENABLE_DISTRIBUTED)
+
 # Create the folders where the unit tests will store their data blocks for the
 # duration of their test.
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/D)
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Dt)
+
+if (ENABLE_DISTRIBUTED)
+  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DDistributed)
+  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DtDistributed)
+endif(ENABLE_DISTRIBUTED)

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/87bbb262/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index 68f286d..47246d8 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -81,6 +81,10 @@ message ShiftbossRegistrationResponseMessage {
   required uint64 shiftboss_index = 1;
 }
 
+message CommandMessage {
+  required string command = 1;
+}
+
 message SqlQueryMessage {
   required string sql_query = 1;
 }
@@ -134,6 +138,10 @@ message SaveQueryResultResponseMessage {
   required uint64 shiftboss_index = 4;
 }
 
+message CommandResponseMessage {
+  required string command_response = 1;
+}
+
 message QueryExecutionSuccessMessage {
   optional CatalogRelationSchema result_relation = 1;
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/87bbb262/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index 994bd60..0fd0bdf 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -89,7 +89,11 @@ enum QueryExecutionMessageType : message_type_id {
                                           // Shiftboss to Worker.
   kDistributedCliRegistrationMessage,  // From CLI to Conductor.
   kDistributedCliRegistrationResponseMessage,  // From Conductor to CLI.
-  kSqlQueryMessage,  // From CLI to Conductor.
+
+  // From CLI to Conductor.
+  kCommandMessage,
+  kSqlQueryMessage,
+
   kQueryInitiateMessage,  // From Foreman to Shiftboss.
   kQueryInitiateResponseMessage,  // From Shiftboss to Foreman.
 
@@ -101,8 +105,10 @@ enum QueryExecutionMessageType : message_type_id {
   kSaveQueryResultMessage,  // From Foreman to Shiftboss.
   kSaveQueryResultResponseMessage,  // From Shiftboss to Foreman.
 
+  kQueryExecutionSuccessMessage,  // From Foreman to CLI.
+
   // From Foreman / Conductor to CLI.
-  kQueryExecutionSuccessMessage,
+  kCommandResponseMessage,
   kQueryExecutionErrorMessage,
 
   kQueryResultTeardownMessage,  // From CLI to Conductor.



[06/24] incubator-quickstep git commit: Use partitioned aggregation for single-function DISTINCT aggregation.

Posted by zu...@apache.org.
Use partitioned aggregation for single-function DISTINCT aggregation.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/17477f57
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/17477f57
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/17477f57

Branch: refs/heads/dist-patch
Commit: 17477f5756e599b4276d6d366c3144cad0be536f
Parents: 4be8e91
Author: Jianqiao Zhu <ji...@cs.wisc.edu>
Authored: Mon Feb 20 20:05:08 2017 -0600
Committer: Jianqiao Zhu <ji...@cs.wisc.edu>
Committed: Fri Feb 24 11:10:56 2017 -0600

----------------------------------------------------------------------
 storage/AggregationOperationState.cpp | 158 ++++++++++++++++++++---------
 storage/AggregationOperationState.hpp |   3 +
 storage/CMakeLists.txt                |   3 +-
 storage/PackedPayloadHashTable.cpp    |  33 +++---
 storage/PackedPayloadHashTable.hpp    |  32 ++++--
 utility/TemplateUtil.hpp              |  74 +++++++++++++-
 6 files changed, 228 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/17477f57/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index 0f39b41..eef2c9d 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -19,8 +19,10 @@
 
 #include "storage/AggregationOperationState.hpp"
 
+#include <algorithm>
 #include <cstddef>
 #include <cstdint>
+#include <functional>
 #include <memory>
 #include <string>
 #include <utility>
@@ -87,6 +89,8 @@ AggregationOperationState::AggregationOperationState(
       is_aggregate_partitioned_(false),
       predicate_(predicate),
       is_distinct_(std::move(is_distinct)),
+      all_distinct_(std::accumulate(is_distinct_.begin(), is_distinct_.end(),
+                                    !is_distinct_.empty(), std::logical_and<bool>())),
       storage_manager_(storage_manager) {
   if (!group_by.empty()) {
     if (hash_table_impl_type == HashTableImplType::kCollisionFreeVector) {
@@ -163,11 +167,6 @@ AggregationOperationState::AggregationOperationState(
     handles_.emplace_back((*agg_func_it)->createHandle(argument_types));
 
     if (!group_by_key_ids_.empty()) {
-      // Aggregation with GROUP BY: combined payload is partially updated in
-      // the presence of DISTINCT.
-      if (*is_distinct_it) {
-        handles_.back()->blockUpdate();
-      }
       group_by_handles.emplace_back(handles_.back().get());
     } else {
       // Aggregation without GROUP BY: create a single global state.
@@ -180,17 +179,32 @@ AggregationOperationState::AggregationOperationState(
       std::vector<const Type *> key_types(group_by_types_);
       key_types.insert(
           key_types.end(), argument_types.begin(), argument_types.end());
+
       // TODO(jianqiao): estimated_num_entries is quite inaccurate for
       // estimating the number of entries in the distinctify hash table.
       // We need to estimate for each distinct aggregation an
       // estimated_num_distinct_keys value during query optimization.
-      distinctify_hashtables_.emplace_back(
-          AggregationStateHashTableFactory::CreateResizable(
-              *distinctify_hash_table_impl_types_it,
-              key_types,
-              estimated_num_entries,
-              {} /* handles */,
-              storage_manager));
+      if (is_aggregate_partitioned_) {
+        DCHECK(partitioned_group_by_hashtable_pool_ == nullptr);
+        partitioned_group_by_hashtable_pool_.reset(
+            new PartitionedHashTablePool(estimated_num_entries,
+                                         FLAGS_num_aggregation_partitions,
+                                         *distinctify_hash_table_impl_types_it,
+                                         key_types,
+                                         {},
+                                         storage_manager));
+      } else {
+        distinctify_hashtables_.emplace_back(
+            AggregationStateHashTableFactory::CreateResizable(
+                *distinctify_hash_table_impl_types_it,
+                key_types,
+                estimated_num_entries,
+                {} /* handles */,
+                storage_manager));
+
+        // Combined payload is partially updated in the presence of DISTINCT.
+        handles_.back()->blockUpdate();
+      }
       ++distinctify_hash_table_impl_types_it;
     } else {
       distinctify_hashtables_.emplace_back(nullptr);
@@ -208,13 +222,24 @@ AggregationOperationState::AggregationOperationState(
               group_by_handles,
               storage_manager));
     } else if (is_aggregate_partitioned_) {
-      partitioned_group_by_hashtable_pool_.reset(
-          new PartitionedHashTablePool(estimated_num_entries,
-                                       FLAGS_num_aggregation_partitions,
-                                       hash_table_impl_type,
-                                       group_by_types_,
-                                       group_by_handles,
-                                       storage_manager));
+      if (all_distinct_) {
+        DCHECK_EQ(1u, group_by_handles.size());
+        DCHECK(partitioned_group_by_hashtable_pool_ != nullptr);
+        group_by_hashtable_pool_.reset(
+            new HashTablePool(estimated_num_entries,
+                              hash_table_impl_type,
+                              group_by_types_,
+                              group_by_handles,
+                              storage_manager));
+      } else {
+        partitioned_group_by_hashtable_pool_.reset(
+            new PartitionedHashTablePool(estimated_num_entries,
+                                         FLAGS_num_aggregation_partitions,
+                                         hash_table_impl_type,
+                                         group_by_types_,
+                                         group_by_handles,
+                                         storage_manager));
+      }
     } else {
       group_by_hashtable_pool_.reset(
           new HashTablePool(estimated_num_entries,
@@ -362,11 +387,13 @@ bool AggregationOperationState::checkAggregatePartitioned(
   if (aggregate_functions.empty()) {
     return false;
   }
-  // Check if there's a distinct operation involved in any aggregate, if so
-  // the aggregate can't be partitioned.
-  for (auto distinct : is_distinct) {
-    if (distinct) {
-      return false;
+  // If there is only one aggregate function, we allow distinct aggregation.
+  // Otherwise it can't be partitioned with distinct aggregation.
+  if (aggregate_functions.size() > 1) {
+    for (auto distinct : is_distinct) {
+      if (distinct) {
+        return false;
+      }
     }
   }
   // There's no distinct aggregation involved, Check if there's at least one
@@ -384,12 +411,17 @@ bool AggregationOperationState::checkAggregatePartitioned(
     }
   }
 
+  // Currently we always use partitioned aggregation to parallelize distinct
+  // aggregation.
+  if (all_distinct_) {
+    return true;
+  }
+
   // There are GROUP BYs without DISTINCT. Check if the estimated number of
   // groups is large enough to warrant a partitioned aggregation.
   return estimated_num_groups >
          static_cast<std::size_t>(
              FLAGS_partition_aggregation_num_groups_threshold);
-  return false;
 }
 
 std::size_t AggregationOperationState::getNumInitializationPartitions() const {
@@ -599,10 +631,19 @@ void AggregationOperationState::aggregateBlockHashTableImplPartitioned(
       }
 
       ValueAccessorMultiplexer local_mux(base_adapter.get(), derived_adapter.get());
-      partitioned_group_by_hashtable_pool_->getHashTable(partition)
-          ->upsertValueAccessorCompositeKey(argument_ids_,
-                                            group_by_key_ids_,
-                                            local_mux);
+      if (all_distinct_) {
+        DCHECK_EQ(1u, handles_.size());
+        handles_.front()->insertValueAccessorIntoDistinctifyHashTable(
+            argument_ids_.front(),
+            group_by_key_ids_,
+            local_mux,
+            partitioned_group_by_hashtable_pool_->getHashTable(partition));
+      } else {
+        partitioned_group_by_hashtable_pool_->getHashTable(partition)
+            ->upsertValueAccessorCompositeKey(argument_ids_,
+                                              group_by_key_ids_,
+                                              local_mux);
+      }
     }
   });
 }
@@ -621,13 +662,15 @@ void AggregationOperationState::aggregateBlockHashTableImplThreadPrivate(
     }
   }
 
-  AggregationStateHashTableBase *agg_hash_table =
-      group_by_hashtable_pool_->getHashTable();
+  if (!all_distinct_) {
+    AggregationStateHashTableBase *agg_hash_table =
+        group_by_hashtable_pool_->getHashTable();
 
-  agg_hash_table->upsertValueAccessorCompositeKey(argument_ids_,
-                                                  group_by_key_ids_,
-                                                  accessor_mux);
-  group_by_hashtable_pool_->returnHashTable(agg_hash_table);
+    agg_hash_table->upsertValueAccessorCompositeKey(argument_ids_,
+                                                    group_by_key_ids_,
+                                                    accessor_mux);
+    group_by_hashtable_pool_->returnHashTable(agg_hash_table);
+  }
 }
 
 void AggregationOperationState::finalizeAggregate(
@@ -711,10 +754,24 @@ void AggregationOperationState::finalizeHashTableImplCollisionFree(
 void AggregationOperationState::finalizeHashTableImplPartitioned(
     const std::size_t partition_id,
     InsertDestination *output_destination) {
-  PackedPayloadHashTable *hash_table =
+  PackedPayloadHashTable *partitioned_hash_table =
       static_cast<PackedPayloadHashTable *>(
           partitioned_group_by_hashtable_pool_->getHashTable(partition_id));
 
+  PackedPayloadHashTable *hash_table;
+  if (all_distinct_) {
+    DCHECK_EQ(1u, handles_.size());
+    DCHECK(group_by_hashtable_pool_ != nullptr);
+
+    hash_table = static_cast<PackedPayloadHashTable *>(
+        group_by_hashtable_pool_->getHashTable());
+    handles_.front()->aggregateOnDistinctifyHashTableForGroupBy(
+        *partitioned_hash_table, 0, hash_table);
+    partitioned_hash_table->destroyPayload();
+  } else {
+    hash_table = partitioned_hash_table;
+  }
+
   // Each element of 'group_by_keys' is a vector of values for a particular
   // group (which is also the prefix of the finalized Tuple for that group).
   std::vector<std::vector<TypedValue>> group_by_keys;
@@ -790,19 +847,24 @@ void AggregationOperationState::finalizeHashTableImplThreadPrivate(
   // TODO(harshad) - Find heuristics for faster merge, even in a single thread.
   // e.g. Keep merging entries from smaller hash tables to larger.
 
-  auto *hash_tables = group_by_hashtable_pool_->getAllHashTables();
-  DCHECK(hash_tables != nullptr);
-  if (hash_tables->empty()) {
-    return;
-  }
+  std::unique_ptr<AggregationStateHashTableBase> final_hash_table_ptr;
 
-  std::unique_ptr<AggregationStateHashTableBase> final_hash_table_ptr(
-      hash_tables->back().release());
-  for (std::size_t i = 0; i < hash_tables->size() - 1; ++i) {
-    std::unique_ptr<AggregationStateHashTableBase> hash_table(
-        hash_tables->at(i).release());
-    mergeGroupByHashTables(hash_table.get(), final_hash_table_ptr.get());
-    hash_table->destroyPayload();
+  if (all_distinct_) {
+    final_hash_table_ptr.reset(group_by_hashtable_pool_->getHashTable());
+  } else {
+    auto *hash_tables = group_by_hashtable_pool_->getAllHashTables();
+    DCHECK(hash_tables != nullptr);
+    if (hash_tables->empty()) {
+      return;
+    }
+
+    final_hash_table_ptr.reset(hash_tables->back().release());
+    for (std::size_t i = 0; i < hash_tables->size() - 1; ++i) {
+      std::unique_ptr<AggregationStateHashTableBase> hash_table(
+          hash_tables->at(i).release());
+      mergeGroupByHashTables(hash_table.get(), final_hash_table_ptr.get());
+      hash_table->destroyPayload();
+    }
   }
 
   PackedPayloadHashTable *final_hash_table =

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/17477f57/storage/AggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index c8930ee..6c9690a 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -273,6 +273,9 @@ class AggregationOperationState {
   // arguments.
   std::vector<bool> is_distinct_;
 
+  // A flag indicating whether all aggregate functions are DISTINCT aggregations.
+  const bool all_distinct_;
+
   // Non-trivial group-by/argument expressions that need to be evaluated.
   std::vector<std::unique_ptr<const Scalar>> non_trivial_expressions_;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/17477f57/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index 293be17..8b68150 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -817,7 +817,8 @@ target_link_libraries(quickstep_storage_PackedPayloadHashTable
                       quickstep_utility_Alignment
                       quickstep_utility_HashPair
                       quickstep_utility_Macros
-                      quickstep_utility_PrimeNumber)
+                      quickstep_utility_PrimeNumber
+                      quickstep_utility_TemplateUtil)
 target_link_libraries(quickstep_storage_PartitionedHashTablePool
                       glog
                       quickstep_storage_HashTableBase

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/17477f57/storage/PackedPayloadHashTable.cpp
----------------------------------------------------------------------
diff --git a/storage/PackedPayloadHashTable.cpp b/storage/PackedPayloadHashTable.cpp
index bf5eaee..3d672f2 100644
--- a/storage/PackedPayloadHashTable.cpp
+++ b/storage/PackedPayloadHashTable.cpp
@@ -40,6 +40,7 @@
 #include "utility/Alignment.hpp"
 #include "utility/Macros.hpp"
 #include "utility/PrimeNumber.hpp"
+#include "utility/TemplateUtil.hpp"
 
 #include "glog/logging.h"
 
@@ -234,23 +235,31 @@ bool PackedPayloadHashTable::upsertValueAccessorCompositeKey(
   ValueAccessor *base_accessor = accessor_mux.getBaseAccessor();
   ValueAccessor *derived_accessor = accessor_mux.getDerivedAccessor();
 
+  const bool has_derived_accessor = (derived_accessor != nullptr);
+
   base_accessor->beginIterationVirtual();
-  if (derived_accessor == nullptr) {
-    return upsertValueAccessorCompositeKeyInternal<false>(
-        argument_ids,
-        key_attr_ids,
-        base_accessor,
-        nullptr);
-  } else {
+  if (has_derived_accessor) {
     DCHECK(derived_accessor->getImplementationType()
                == ValueAccessor::Implementation::kColumnVectors);
     derived_accessor->beginIterationVirtual();
-    return upsertValueAccessorCompositeKeyInternal<true>(
-        argument_ids,
-        key_attr_ids,
-        base_accessor,
-        static_cast<ColumnVectorsValueAccessor *>(derived_accessor));
   }
+
+  return InvokeOnBools(
+      has_derived_accessor,
+      handles_.empty(),
+      !all_keys_inline_,
+      [&](auto use_two_accessors,  // NOLINT(build/c++11)
+          auto key_only,
+          auto has_variable_size) -> bool {
+    return upsertValueAccessorCompositeKeyInternal<
+        decltype(use_two_accessors)::value,
+        decltype(key_only)::value,
+        decltype(has_variable_size)::value>(
+            argument_ids,
+            key_attr_ids,
+            base_accessor,
+            static_cast<ColumnVectorsValueAccessor *>(derived_accessor));
+  });
 }
 
 void PackedPayloadHashTable::resize(const std::size_t extra_buckets,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/17477f57/storage/PackedPayloadHashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/PackedPayloadHashTable.hpp b/storage/PackedPayloadHashTable.hpp
index f87a1de..c49bdb4 100644
--- a/storage/PackedPayloadHashTable.hpp
+++ b/storage/PackedPayloadHashTable.hpp
@@ -20,10 +20,12 @@
 #ifndef QUICKSTEP_STORAGE_PACKED_PAYLOAD_HASH_TABLE_HPP_
 #define QUICKSTEP_STORAGE_PACKED_PAYLOAD_HASH_TABLE_HPP_
 
+#include <algorithm>
 #include <atomic>
 #include <cstddef>
 #include <cstdint>
 #include <cstring>
+#include <functional>
 #include <limits>
 #include <vector>
 
@@ -336,11 +338,12 @@ class PackedPayloadHashTable : public AggregationStateHashTableBase {
                                        const std::uint8_t **value,
                                        std::size_t *entry_num) const;
 
+  template <bool key_only = false>
   inline std::uint8_t* upsertCompositeKeyInternal(
       const std::vector<TypedValue> &key,
       const std::size_t variable_key_size);
 
-  template <bool use_two_accessors>
+  template <bool use_two_accessors, bool key_only, bool has_variable_size>
   inline bool upsertValueAccessorCompositeKeyInternal(
       const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
       const std::vector<MultiSourceAttributeId> &key_ids,
@@ -355,8 +358,9 @@ class PackedPayloadHashTable : public AggregationStateHashTableBase {
   // comes from a HashTableKeyManager, and is set by the constructor of a
   // subclass of HashTable.
   inline void setKeyInline(const std::vector<bool> *key_inline) {
-    scalar_key_inline_ = key_inline->front();
     key_inline_ = key_inline;
+    all_keys_inline_ = std::accumulate(key_inline_->begin(), key_inline_->end(),
+                                       true, std::logical_and<bool>());
   }
 
   inline static std::size_t ComputeTotalPayloadSize(
@@ -407,7 +411,7 @@ class PackedPayloadHashTable : public AggregationStateHashTableBase {
   // Information about whether key components are stored inline or in a
   // separate variable-length storage region. This is usually determined by a
   // HashTableKeyManager and set by calling setKeyInline().
-  bool scalar_key_inline_;
+  bool all_keys_inline_;
   const std::vector<bool> *key_inline_;
 
   const std::size_t num_handles_;
@@ -763,7 +767,7 @@ inline bool PackedPayloadHashTable::upsertCompositeKey(
   }
 }
 
-
+template <bool key_only>
 inline std::uint8_t* PackedPayloadHashTable::upsertCompositeKeyInternal(
     const std::vector<TypedValue> &key,
     const std::size_t variable_key_size) {
@@ -809,7 +813,9 @@ inline std::uint8_t* PackedPayloadHashTable::upsertCompositeKeyInternal(
   writeCompositeKeyToBucket(key, hash_code, bucket);
 
   std::uint8_t *value = static_cast<unsigned char *>(bucket) + kValueOffset;
-  std::memcpy(value, init_payload_, this->total_payload_size_);
+  if (!key_only) {
+    std::memcpy(value, init_payload_, this->total_payload_size_);
+  }
 
   // Update the previous chaing pointer to point to the new bucket.
   pending_chain_ptr->store(pending_chain_ptr_finish_value,
@@ -819,13 +825,13 @@ inline std::uint8_t* PackedPayloadHashTable::upsertCompositeKeyInternal(
   return value;
 }
 
-template <bool use_two_accessors>
+template <bool use_two_accessors, bool key_only, bool has_variable_size>
 inline bool PackedPayloadHashTable::upsertValueAccessorCompositeKeyInternal(
     const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
     const std::vector<MultiSourceAttributeId> &key_ids,
     ValueAccessor *base_accessor,
     ColumnVectorsValueAccessor *derived_accessor) {
-  std::size_t variable_size;
+  std::size_t variable_size = 0;
   std::vector<TypedValue> key_vector;
   key_vector.resize(key_ids.size());
 
@@ -848,13 +854,17 @@ inline bool PackedPayloadHashTable::upsertValueAccessorCompositeKeyInternal(
                   &key_vector)) {
             continue;
           }
-          variable_size = this->calculateVariableLengthCompositeKeyCopySize(key_vector);
-          std::uint8_t *value = this->upsertCompositeKeyInternal(
-              key_vector, variable_size);
+          if (has_variable_size) {
+            variable_size =
+                this->calculateVariableLengthCompositeKeyCopySize(key_vector);
+          }
+          std::uint8_t *value =
+              this->template upsertCompositeKeyInternal<key_only>(
+                  key_vector, variable_size);
           if (value == nullptr) {
             continuing = true;
             break;
-          } else {
+          } else if (!key_only) {
             SpinMutexLock lock(*(reinterpret_cast<SpinMutex *>(value)));
             for (unsigned int k = 0; k < num_handles_; ++k) {
               const auto &ids = argument_ids[k];

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/17477f57/utility/TemplateUtil.hpp
----------------------------------------------------------------------
diff --git a/utility/TemplateUtil.hpp b/utility/TemplateUtil.hpp
index 33e4f42..dfae8e4 100644
--- a/utility/TemplateUtil.hpp
+++ b/utility/TemplateUtil.hpp
@@ -30,6 +30,8 @@ namespace quickstep {
  *  @{
  */
 
+namespace template_util_inner {
+
 /**
  * @brief Represents a compile-time sequence of integers.
  *
@@ -58,7 +60,6 @@ struct MakeSequence<0, S...> {
   typedef Sequence<S...> type;
 };
 
-
 /**
  * @brief Final step of CreateBoolInstantiatedInstance. Now all bool_values are
  *        ready. Instantiate the template and create (i.e. new) an instance.
@@ -72,6 +73,42 @@ inline ReturnT* CreateBoolInstantiatedInstanceInner(Tuple &&args,
 }
 
 /**
+ * @brief Base case of the dispatch: every bool is now a template argument, so
+ *        call functor with one std::integral_constant<bool, v> object per value.
+ */
+template <typename FunctorT, bool ...bool_values>
+inline auto InvokeOnBoolsInner(const FunctorT &functor) {
+  return functor(std::integral_constant<bool, bool_values>()...);
+}
+
+/**
+ * @brief Recursive step: branch on the runtime bool tparam, append it to bool_values as a constant, and recurse.
+ */
+template <typename FunctorT, bool ...bool_values, typename ...Bools>
+inline auto InvokeOnBoolsInner(const FunctorT &functor,
+                               const bool tparam,
+                               const Bools ...rest_params) {
+  if (tparam) {  // Both returns must deduce to the same type for the auto return type.
+    return InvokeOnBoolsInner<FunctorT, bool_values..., true>(
+        functor, rest_params...);
+  } else {
+    return InvokeOnBoolsInner<FunctorT, bool_values..., false>(
+        functor, rest_params...);
+  }
+}
+
+/**
+ * @brief Unpack the args tuple: element `last` (the functor) goes first, followed by elements i... in order.
+ */
+template <std::size_t last, std::size_t ...i, typename TupleT>
+inline auto InvokeOnBoolsInner(TupleT &&args, Sequence<i...> &&indices) {  // indices only carries i...
+  return InvokeOnBoolsInner(std::get<last>(std::forward<TupleT>(args)),
+                            std::get<i>(std::forward<TupleT>(args))...);
+}
+
+}  // namespace template_util_inner
+
+/**
  * @brief Edge case of the recursive CreateBoolInstantiatedInstance function
  *        when all bool variables have been branched and replaced with compile-time
  *        bool constants.
@@ -85,8 +122,10 @@ inline ReturnT* CreateBoolInstantiatedInstance(Tuple &&args) {
   // for the tuple, so that the tuple can be unpacked as a sequence of constructor
   // parameters in CreateBoolInstantiatedInstanceInner.
   constexpr std::size_t n_args = std::tuple_size<Tuple>::value;
-  return CreateBoolInstantiatedInstanceInner<T, ReturnT, bool_values...>(
-      std::forward<Tuple>(args), typename MakeSequence<n_args>::type());
+  return template_util_inner::CreateBoolInstantiatedInstanceInner<
+      T, ReturnT, bool_values...>(
+          std::forward<Tuple>(args),
+          typename template_util_inner::MakeSequence<n_args>::type());
 }
 
 /**
@@ -160,6 +199,35 @@ inline ReturnT* CreateBoolInstantiatedInstance(Tuple &&args,
   }
 }
 
+/**
+ * @brief Invokes the last argument (a functor) with the preceding bools as std::integral_constant values.
+ *
+ * Usage example:
+ * --
+ * bool c1 = true, c2 = false;
+ *
+ * InvokeOnBools(
+ *     c1, c2,
+ *     [&](auto c1, auto c2) -> SomeBaseClass* {
+ *   using T1 = decltype(c1);  // T1 == std::true_type
+ *   using T2 = decltype(c2);  // T2 == std::false_type
+ *
+ *   constexpr bool cv1 = T1::value;  // cv1 == true
+ *   constexpr bool cv2 = T2::value;  // cv2 == false
+ *
+ *   SomeFunction<cv1, cv2>(...);
+ *   return new SomeClass<cv1, cv2>(...);
+ * });
+ * --
+ */
+template <typename ...ArgTypes>
+inline auto InvokeOnBools(ArgTypes ...args) {
+  constexpr std::size_t last = sizeof...(args) - 1;  // Index of the functor.
+  return template_util_inner::InvokeOnBoolsInner<last>(
+      std::forward_as_tuple(args...),
+      typename template_util_inner::MakeSequence<last>::type());
+}
+
 /** @} */
 
 }  // namespace quickstep


[15/24] incubator-quickstep git commit: Moved Describe command to an util file.

Posted by zu...@apache.org.
Moved Describe command to an util file.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/5f5073f3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/5f5073f3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/5f5073f3

Branch: refs/heads/dist-patch
Commit: 5f5073f34f3fd8b06907d65e7d503c77067d336e
Parents: 132fed6
Author: Zuyu Zhang <zu...@apache.org>
Authored: Tue Feb 28 01:34:10 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Tue Feb 28 14:22:37 2017 -0800

----------------------------------------------------------------------
 cli/CMakeLists.txt                        |  56 +++++---
 cli/CommandExecutor.cpp                   | 136 +------------------
 cli/CommandExecutorUtil.cpp               | 180 +++++++++++++++++++++++++
 cli/CommandExecutorUtil.hpp               |  63 +++++++++
 cli/Constants.hpp                         |   5 -
 cli/PrintToScreen.cpp                     |  29 ++--
 cli/PrintToScreen.hpp                     |   4 +-
 cli/tests/command_executor/CMakeLists.txt |   1 -
 8 files changed, 307 insertions(+), 167 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5f5073f3/cli/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cli/CMakeLists.txt b/cli/CMakeLists.txt
index 7b4319a..ffeed2f 100644
--- a/cli/CMakeLists.txt
+++ b/cli/CMakeLists.txt
@@ -43,18 +43,15 @@ configure_file (
   "${CMAKE_CURRENT_SOURCE_DIR}/CliConfig.h.in"
   "${CMAKE_CURRENT_BINARY_DIR}/CliConfig.h"
 )
-add_library(quickstep_cli_CommandExecutor CommandExecutor.cpp CommandExecutor.hpp)
-add_library(quickstep_cli_Constants ../empty_src.cpp Constants.hpp)
 
 # Declare micro-libs and link dependencies:
+add_library(quickstep_cli_CommandExecutor CommandExecutor.cpp CommandExecutor.hpp)
+add_library(quickstep_cli_CommandExecutorUtil CommandExecutorUtil.cpp CommandExecutorUtil.hpp)
+add_library(quickstep_cli_Constants ../empty_src.cpp Constants.hpp)
+add_library(quickstep_cli_DefaultsConfigurator DefaultsConfigurator.cpp DefaultsConfigurator.hpp)
 add_library(quickstep_cli_DropRelation DropRelation.cpp DropRelation.hpp)
-target_link_libraries(quickstep_cli_DropRelation
-                      quickstep_catalog_CatalogDatabase
-                      quickstep_catalog_CatalogRelation
-                      quickstep_storage_StorageBlockInfo
-                      quickstep_storage_StorageManager
-                      quickstep_utility_Macros)
 add_library(quickstep_cli_Flags Flags.cpp Flags.hpp)
+add_library(quickstep_cli_InputParserUtil InputParserUtil.cpp InputParserUtil.hpp)
 
 if(USE_LINENOISE)
   add_library(quickstep_cli_LineReader
@@ -62,21 +59,14 @@ if(USE_LINENOISE)
               LineReaderLineNoise.cpp
               LineReader.hpp
               LineReaderLineNoise.hpp)
-  target_link_libraries(quickstep_cli_LineReader
-                        linenoise
-                        quickstep_utility_Macros)
 else()
   add_library(quickstep_cli_LineReader
               LineReader.cpp
               LineReaderDumb.cpp
               LineReader.hpp
               LineReaderDumb.hpp)
-  target_link_libraries(quickstep_cli_LineReader
-                        quickstep_utility_Macros)
 endif()
 
-add_library(quickstep_cli_DefaultsConfigurator DefaultsConfigurator.cpp DefaultsConfigurator.hpp)
-add_library(quickstep_cli_InputParserUtil InputParserUtil.cpp InputParserUtil.hpp)
 add_library(quickstep_cli_PrintToScreen PrintToScreen.cpp PrintToScreen.hpp)
 
 # Link dependencies:
@@ -87,6 +77,7 @@ target_link_libraries(quickstep_cli_CommandExecutor
                       quickstep_catalog_CatalogRelation
                       quickstep_catalog_CatalogRelationSchema
                       quickstep_catalog_CatalogRelationStatistics
+                      quickstep_cli_CommandExecutorUtil
                       quickstep_cli_Constants
                       quickstep_cli_DropRelation
                       quickstep_cli_PrintToScreen
@@ -108,8 +99,22 @@ target_link_libraries(quickstep_cli_CommandExecutor
                       quickstep_types_TypedValue
                       quickstep_utility_PtrVector
                       quickstep_utility_SqlError
+                      quickstep_utility_StringUtil
+                      tmb)
+target_link_libraries(quickstep_cli_CommandExecutorUtil
+                      quickstep_catalog_CatalogAttribute
+                      quickstep_catalog_CatalogDatabase
+                      quickstep_catalog_CatalogRelation
+                      quickstep_catalog_IndexScheme
+                      quickstep_cli_PrintToScreen
+                      quickstep_parser_ParseString
+                      quickstep_storage_StorageBlockLayout_proto
+                      quickstep_types_Type
+                      quickstep_types_TypeID
+                      quickstep_types_TypedValue
+                      quickstep_utility_PtrVector
+                      quickstep_utility_SqlError
                       quickstep_utility_StringUtil)
-
 target_link_libraries(quickstep_cli_DefaultsConfigurator
                       glog
                       quickstep_catalog_Catalog
@@ -120,6 +125,12 @@ if(QUICKSTEP_HAVE_LIBNUMA)
   target_link_libraries(quickstep_cli_DefaultsConfigurator
                         ${LIBNUMA_LIBRARY})
 endif()
+target_link_libraries(quickstep_cli_DropRelation
+                      quickstep_catalog_CatalogDatabase
+                      quickstep_catalog_CatalogRelation
+                      quickstep_storage_StorageBlockInfo
+                      quickstep_storage_StorageManager
+                      quickstep_utility_Macros)
 target_link_libraries(quickstep_cli_Flags
                       quickstep_cli_DefaultsConfigurator
                       quickstep_storage_StorageConstants
@@ -129,8 +140,16 @@ target_link_libraries(quickstep_cli_InputParserUtil
                       quickstep_utility_Macros
                       quickstep_utility_StringUtil)
 if(QUICKSTEP_HAVE_LIBNUMA)
-target_link_libraries(quickstep_cli_InputParserUtil
-                      ${LIBNUMA_LIBRARY})
+  target_link_libraries(quickstep_cli_InputParserUtil
+                        ${LIBNUMA_LIBRARY})
+endif()
+if(USE_LINENOISE)
+  target_link_libraries(quickstep_cli_LineReader
+                        linenoise
+                        quickstep_utility_Macros)
+else()
+  target_link_libraries(quickstep_cli_LineReader
+                        quickstep_utility_Macros)
 endif()
 target_link_libraries(quickstep_cli_PrintToScreen
                       ${GFLAGS_LIB_NAME}
@@ -151,6 +170,7 @@ add_library(quickstep_cli ../empty_src.cpp CliModule.hpp)
 
 target_link_libraries(quickstep_cli
                       quickstep_cli_CommandExecutor
+                      quickstep_cli_CommandExecutorUtil
                       quickstep_cli_Constants
                       quickstep_cli_DefaultsConfigurator
                       quickstep_cli_DropRelation

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5f5073f3/cli/CommandExecutor.cpp
----------------------------------------------------------------------
diff --git a/cli/CommandExecutor.cpp b/cli/CommandExecutor.cpp
index 7f63469..6a84672 100644
--- a/cli/CommandExecutor.cpp
+++ b/cli/CommandExecutor.cpp
@@ -32,6 +32,7 @@
 #include "catalog/CatalogRelation.hpp"
 #include "catalog/CatalogRelationSchema.hpp"
 #include "catalog/CatalogRelationStatistics.hpp"
+#include "cli/CommandExecutorUtil.hpp"
 #include "cli/Constants.hpp"
 #include "cli/DropRelation.hpp"
 #include "cli/PrintToScreen.hpp"
@@ -72,132 +73,6 @@ namespace quickstep {
 namespace cli {
 namespace {
 
-void ExecuteDescribeDatabase(
-    const PtrVector<ParseString> &arguments,
-    const CatalogDatabase &catalog_database,
-    FILE *out) {
-  // Column width initialized to 6 to take into account the header name
-  // and the column value table
-  int max_column_width = kInitMaxColumnWidth;
-  vector<std::size_t> num_blocks;
-  const CatalogRelation *relation = nullptr;
-  if (arguments.empty()) {
-    for (const CatalogRelation &rel : catalog_database) {
-      max_column_width =
-          std::max(static_cast<int>(rel.getName().length()), max_column_width);
-      num_blocks.push_back(rel.size_blocks());
-    }
-  } else {
-    const ParseString &table_name = arguments.front();
-    const std::string &table_name_val = table_name.value();
-    relation = catalog_database.getRelationByName(table_name_val);
-
-    if (relation == nullptr) {
-      THROW_SQL_ERROR_AT(&(arguments.front())) << " Unrecognized relation " << table_name_val;
-    }
-    max_column_width = std::max(static_cast<int>(relation->getName().length()),
-                                    max_column_width);
-    num_blocks.push_back(relation->size_blocks());
-  }
-  // Only if we have relations work on the printing logic.
-  if (!catalog_database.empty()) {
-    const std::size_t max_num_blocks = *std::max_element(num_blocks.begin(), num_blocks.end());
-    const int max_num_blocks_digits = std::max(PrintToScreen::GetNumberOfDigits(max_num_blocks),
-                                      kInitMaxColumnWidth + 2);
-    vector<int> column_widths;
-    column_widths.push_back(max_column_width + 1);
-    column_widths.push_back(kInitMaxColumnWidth + 1);
-    column_widths.push_back(max_num_blocks_digits + 1);
-    fputs("       List of relations\n\n", out);
-    fprintf(out, "%-*s |", max_column_width + 1, " Name");
-    fprintf(out, "%-*s |", kInitMaxColumnWidth, " Type");
-    fprintf(out, "%-*s\n", max_num_blocks_digits, " Blocks");
-    PrintToScreen::printHBar(column_widths, out);
-    //  If there are no argument print the entire list of tables
-    //  else print the particular table only.
-    vector<std::size_t>::const_iterator num_blocks_it = num_blocks.begin();
-    if (arguments.empty()) {
-      for (const CatalogRelation &rel : catalog_database) {
-        fprintf(out, " %-*s |", max_column_width, rel.getName().c_str());
-        fprintf(out, " %-*s |", kInitMaxColumnWidth - 1, "table");
-        fprintf(out, " %-*lu\n", max_num_blocks_digits - 1, *num_blocks_it);
-        ++num_blocks_it;
-      }
-    } else {
-      fprintf(out, " %-*s |", max_column_width, relation->getName().c_str());
-      fprintf(out, " %-*s |", kInitMaxColumnWidth - 1, "table");
-      fprintf(out, " %-*lu\n", max_num_blocks_digits - 1, *num_blocks_it);
-      ++num_blocks_it;
-    }
-    fputc('\n', out);
-  }
-}
-
-void ExecuteDescribeTable(
-    const PtrVector<ParseString> &arguments,
-    const CatalogDatabase &catalog_database, FILE *out) {
-  const ParseString &table_name = arguments.front();
-  const std::string &table_name_val = table_name.value();
-  const CatalogRelation *relation =
-      catalog_database.getRelationByName(table_name_val);
-  if (relation == nullptr) {
-    THROW_SQL_ERROR_AT(&(arguments.front())) << " Unrecognized relation "  << table_name_val;
-  }
-  vector<int> column_widths;
-  int max_attr_column_width = kInitMaxColumnWidth;
-  int max_type_column_width = kInitMaxColumnWidth;
-
-  for (const CatalogAttribute &attr : *relation) {
-    // Printed column needs to be wide enough to print:
-    //   1. The attribute name (in the printed "header").
-    //   2. Any value of the attribute's Type.
-    max_attr_column_width =
-        std::max(max_attr_column_width,
-            static_cast<int>(attr.getDisplayName().length()));
-    max_type_column_width =
-        std::max(max_type_column_width,
-            static_cast<int>(attr.getType().getName().length()));
-  }
-  // Add room for one extra character to allow spacing between the column ending and the vertical bar
-  column_widths.push_back(max_attr_column_width + 1);
-  column_widths.push_back(max_type_column_width + 1);
-
-  fprintf(out, "%*s \"%s\"\n", kInitMaxColumnWidth, "Table", table_name_val.c_str());
-  fprintf(out, "%-*s |", max_attr_column_width + 1, " Column");
-  fprintf(out, "%-*s\n", max_type_column_width + 1, " Type");
-  PrintToScreen::printHBar(column_widths, out);
-  for (const CatalogAttribute &attr : *relation) {
-    fprintf(out, " %-*s |", max_attr_column_width,
-            attr.getDisplayName().c_str());
-    fprintf(out, " %-*s\n", max_type_column_width,
-            attr.getType().getName().c_str());
-  }
-  // TODO(rogers): Add handlers for partitioning information.
-  if (relation->hasIndexScheme()) {
-    fprintf(out, "%*s\n", kInitMaxColumnWidth + 2, " Indexes");
-    const quickstep::IndexScheme &index_scheme = relation->getIndexScheme();
-    for (auto index_it = index_scheme.begin(); index_it != index_scheme.end();
-         ++index_it) {
-      fprintf(out, "  \"%-*s\" %s", static_cast<int>(index_it->first.length()),
-              index_it->first.c_str(),
-              index_it->second.IndexSubBlockType_Name(
-                  index_it->second.sub_block_type()).c_str());
-      fputc(' ', out);
-      fputc('(', out);
-      fprintf(out, "%s", relation->getAttributeById(index_it->second.indexed_attribute_ids(0))
-                             ->getDisplayName().c_str());
-      for (std::size_t i = 1; i < static_cast<std::size_t>(index_it->second.indexed_attribute_ids_size()); ++i) {
-        const char *attribute_display_name = relation->getAttributeById(
-                                                 index_it->second.indexed_attribute_ids(i))
-                                                     ->getDisplayName().c_str();
-        fprintf(out, ", %s", attribute_display_name);
-      }
-      fputc(')', out);
-      fputc('\n', out);
-    }
-  }
-}
-
 /**
  * @brief A helper function that executes a SQL query to obtain a row of results.
  */
@@ -422,12 +297,15 @@ void executeCommand(const ParseStatement &statement,
   const PtrVector<ParseString> &arguments = *(command.arguments());
   const std::string &command_str = command.command()->value();
   if (command_str == kDescribeDatabaseCommand) {
-    ExecuteDescribeDatabase(arguments, catalog_database, out);
+    const string database_description = ExecuteDescribeDatabase(arguments, catalog_database);
+    fprintf(out, "%s", database_description.c_str());
   } else if (command_str == kDescribeTableCommand) {
     if (arguments.empty()) {
-      ExecuteDescribeDatabase(arguments, catalog_database, out);
+      const string database_description = ExecuteDescribeDatabase(arguments, catalog_database);
+      fprintf(out, "%s", database_description.c_str());
     } else {
-      ExecuteDescribeTable(arguments, catalog_database, out);
+      const string table_description = ExecuteDescribeTable(arguments, catalog_database);
+      fprintf(out, "%s", table_description.c_str());
     }
   } else if (command_str == kAnalyzeCommand) {
     ExecuteAnalyze(arguments,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5f5073f3/cli/CommandExecutorUtil.cpp
----------------------------------------------------------------------
diff --git a/cli/CommandExecutorUtil.cpp b/cli/CommandExecutorUtil.cpp
new file mode 100644
index 0000000..d17617f
--- /dev/null
+++ b/cli/CommandExecutorUtil.cpp
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "cli/CommandExecutorUtil.hpp"
+
+#include <algorithm>
+#include <cstddef>
+#include <iomanip>
+#include <string>
+#include <vector>
+
+#include "catalog/CatalogAttribute.hpp"
+#include "catalog/CatalogDatabase.hpp"
+#include "catalog/CatalogRelation.hpp"
+#include "catalog/IndexScheme.hpp"
+#include "cli/PrintToScreen.hpp"
+#include "parser/ParseString.hpp"
+#include "storage/StorageBlockLayout.pb.h"
+#include "types/Type.hpp"
+#include "types/TypeID.hpp"
+#include "types/TypedValue.hpp"
+#include "utility/PtrVector.hpp"
+#include "utility/SqlError.hpp"
+#include "utility/StringUtil.hpp"
+
+using std::ostringstream;
+using std::setw;
+using std::size_t;
+using std::string;
+using std::vector;
+
+namespace quickstep {
+namespace cli {
+
+namespace {
+
+// Minimum width of a printed column: 6, the length of the header word
+// "Column". Column widths start from this value and only grow (via std::max)
+// to fit longer names when describing a database or a table.
+
+constexpr int kInitMaxColumnWidth = 6;
+
+}  // namespace
+
+string ExecuteDescribeDatabase(
+    const PtrVector<ParseString> &arguments,
+    const CatalogDatabase &catalog_database) {
+  // The name column is at least kInitMaxColumnWidth wide; it is widened below
+  // to the longest relation name that will be listed.
+  int max_column_width = kInitMaxColumnWidth;
+  vector<std::size_t> num_blocks;  // Block count per listed relation, in listing order.
+  const CatalogRelation *relation = nullptr;
+  if (arguments.empty()) {
+    for (const CatalogRelation &rel : catalog_database) {
+      max_column_width =
+          std::max(static_cast<int>(rel.getName().length()), max_column_width);
+      num_blocks.push_back(rel.size_blocks());
+    }
+  } else {
+    const ParseString &table_name = arguments.front();
+    const std::string &table_name_val = table_name.value();
+    relation = catalog_database.getRelationByName(table_name_val);
+
+    if (relation == nullptr) {
+      THROW_SQL_ERROR_AT(&(arguments.front())) << " Unrecognized relation " << table_name_val;
+    }
+    max_column_width = std::max(static_cast<int>(relation->getName().length()),
+                                    max_column_width);
+    num_blocks.push_back(relation->size_blocks());
+  }
+
+  ostringstream oss;
+  // Emit output only if the database has relations; otherwise return "".
+  if (!catalog_database.empty()) {
+    const std::size_t max_num_blocks = *std::max_element(num_blocks.begin(), num_blocks.end());
+    const int max_num_blocks_digits = std::max(PrintToScreen::GetNumberOfDigits(max_num_blocks),
+                                               kInitMaxColumnWidth + 2);
+
+    oss << "       List of relations\n\n"
+        << std::left << setw(max_column_width + 1) << " Name" << " |"
+        << setw(kInitMaxColumnWidth) << " Type" << " |"
+        << setw(max_num_blocks_digits) << " Blocks" << '\n'
+        << PrintToScreen::GenerateHBar({ max_column_width + 1, kInitMaxColumnWidth + 1, max_num_blocks_digits + 1 });
+
+    //  With no arguments, print one row per relation in the database;
+    //  otherwise print only the row of the requested relation.
+    vector<std::size_t>::const_iterator num_blocks_it = num_blocks.begin();
+    if (arguments.empty()) {
+      for (const CatalogRelation &rel : catalog_database) {
+        oss << ' ' << setw(max_column_width) << rel.getName() << " |"
+            << setw(kInitMaxColumnWidth) << " table" << " | "
+            << setw(max_num_blocks_digits - 1) << *num_blocks_it << '\n';
+        ++num_blocks_it;
+      }
+    } else {
+      oss << ' ' << setw(max_column_width) << relation->getName() << " |"
+          << setw(kInitMaxColumnWidth) << " table" << " | "
+          << setw(max_num_blocks_digits - 1) << *num_blocks_it << '\n';
+      ++num_blocks_it;
+    }
+    oss << '\n';
+  }
+
+  return oss.str();
+}
+
+string ExecuteDescribeTable(
+    const PtrVector<ParseString> &arguments,
+    const CatalogDatabase &catalog_database) {
+  const ParseString &table_name = arguments.front();
+  const std::string &table_name_val = table_name.value();
+  const CatalogRelation *relation =
+      catalog_database.getRelationByName(table_name_val);
+  if (relation == nullptr) {
+    THROW_SQL_ERROR_AT(&(arguments.front())) << " Unrecognized relation "  << table_name_val;
+  }
+
+  int max_attr_column_width = kInitMaxColumnWidth;
+  int max_type_column_width = kInitMaxColumnWidth;
+  for (const CatalogAttribute &attr : *relation) {
+    // Each printed column needs to be wide enough for its longest cell:
+    //   1. The attribute's display name (the "Column" column).
+    //   2. The name of the attribute's Type (the "Type" column).
+    max_attr_column_width =
+        std::max(max_attr_column_width,
+            static_cast<int>(attr.getDisplayName().length()));
+    max_type_column_width =
+        std::max(max_type_column_width,
+            static_cast<int>(attr.getType().getName().length()));
+  }
+
+  ostringstream oss;
+  oss << setw(kInitMaxColumnWidth) << "Table" << " \"" << table_name_val << "\"\n";
+  oss << std::left << setw(max_attr_column_width + 1) << " Column" << " |";
+  oss << setw(max_type_column_width + 1) << " Type" << '\n';
+
+  // Widths are +1 for the space between the end of a column and the vertical bar.
+  oss << PrintToScreen::GenerateHBar({ max_attr_column_width + 1, max_type_column_width + 1 });
+
+  for (const CatalogAttribute &attr : *relation) {
+    oss << ' ' << setw(max_attr_column_width) << attr.getDisplayName() << " | "
+        << setw(max_type_column_width) << attr.getType().getName() << '\n';
+  }
+  // TODO(rogers): Add handlers for partitioning information.
+  if (relation->hasIndexScheme()) {
+    oss << setw(kInitMaxColumnWidth + 2) << " Indexes" << '\n';
+    for (const auto &index : relation->getIndexScheme()) {
+      const IndexSubBlockDescription &index_description = index.second;
+      oss << "  \"" << index.first << "\" "
+          << index_description.IndexSubBlockType_Name(index_description.sub_block_type())
+          << " ("  // First indexed attribute, then the rest separated by ", ".
+          << relation->getAttributeById(index_description.indexed_attribute_ids(0))->getDisplayName();
+      for (int i = 1; i < index_description.indexed_attribute_ids_size(); ++i) {
+        oss << ", " << relation->getAttributeById(index_description.indexed_attribute_ids(i))->getDisplayName();
+      }
+      oss << ")\n";
+    }
+  }
+
+  return oss.str();
+}
+
+}  // namespace cli
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5f5073f3/cli/CommandExecutorUtil.hpp
----------------------------------------------------------------------
diff --git a/cli/CommandExecutorUtil.hpp b/cli/CommandExecutorUtil.hpp
new file mode 100644
index 0000000..7f8c4df
--- /dev/null
+++ b/cli/CommandExecutorUtil.hpp
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_CLI_COMMAND_COMMAND_EXECUTOR_UTIL_HPP_
+#define QUICKSTEP_CLI_COMMAND_COMMAND_EXECUTOR_UTIL_HPP_
+
+#include <string>
+
+#include "utility/PtrVector.hpp"
+
+namespace quickstep {
+
+class CatalogDatabase;
+class ParseString;
+
+namespace cli {
+
+/** \addtogroup CLI
+ *  @{
+ */
+
+/**
+  * @brief Build the textual output of the CLI describe commands.
+  *
+  * Both ExecuteDescribeDatabase() and ExecuteDescribeTable() take the same
+  * two inputs and return the formatted text as a std::string; neither
+  * takes an output stream.
+  *
+  * @param arguments The parsed arguments supplied with the command.
+  * @param catalog_database The catalog information about the current database.
+  *
+  * @return The formatted description text.
+*/
+extern std::string ExecuteDescribeDatabase(
+    const PtrVector<ParseString> &arguments,
+    const CatalogDatabase &catalog_database);
+
+extern std::string ExecuteDescribeTable(
+    const PtrVector<ParseString> &arguments,
+    const CatalogDatabase &catalog_database);
+
+/** @} */
+
+}  // namespace cli
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_CLI_COMMAND_COMMAND_EXECUTOR_UTIL_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5f5073f3/cli/Constants.hpp
----------------------------------------------------------------------
diff --git a/cli/Constants.hpp b/cli/Constants.hpp
index 1aaa5be..0b4a37b 100644
--- a/cli/Constants.hpp
+++ b/cli/Constants.hpp
@@ -27,11 +27,6 @@ namespace cli {
  *  @{
  */
 
-// Adding the max column width as 6 as the default initializer
-// as the length of the word Column is 6 characters.
-// This is used while describing the table.
-constexpr int kInitMaxColumnWidth = 6;
-
 constexpr char kDescribeDatabaseCommand[] = "\\dt";
 constexpr char kDescribeTableCommand[] = "\\d";
 constexpr char kAnalyzeCommand[] = "\\analyze";

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5f5073f3/cli/PrintToScreen.cpp
----------------------------------------------------------------------
diff --git a/cli/PrintToScreen.cpp b/cli/PrintToScreen.cpp
index 7d06474..bb64c93 100644
--- a/cli/PrintToScreen.cpp
+++ b/cli/PrintToScreen.cpp
@@ -19,10 +19,13 @@
 
 #include "cli/PrintToScreen.hpp"
 
+#include <cmath>
 #include <cstddef>
 #include <cstdio>
-#include <cmath>
+#include <iomanip>
 #include <memory>
+#include <sstream>
+#include <string>
 #include <vector>
 
 #include "catalog/CatalogAttribute.hpp"
@@ -42,6 +45,7 @@
 using std::fprintf;
 using std::fputc;
 using std::size_t;
+using std::string;
 using std::vector;
 
 namespace quickstep {
@@ -88,7 +92,8 @@ void PrintToScreen::PrintRelation(const CatalogRelation &relation,
     column_widths.push_back(column_width);
   }
 
-  printHBar(column_widths, out);
+  const string hbar = GenerateHBar(column_widths);
+  fprintf(out, "%s", hbar.c_str());
 
   fputc('|', out);
   vector<int>::const_iterator width_it = column_widths.begin();
@@ -101,7 +106,7 @@ void PrintToScreen::PrintRelation(const CatalogRelation &relation,
   }
   fputc('\n', out);
 
-  printHBar(column_widths, out);
+  fprintf(out, "%s", hbar.c_str());
 
   std::vector<block_id> blocks = relation.getBlocksSnapshot();
   for (const block_id current_block_id : blocks) {
@@ -120,19 +125,19 @@ void PrintToScreen::PrintRelation(const CatalogRelation &relation,
     }
   }
 
-  printHBar(column_widths, out);
+  fprintf(out, "%s", hbar.c_str());
 }
 
-void PrintToScreen::printHBar(const vector<int> &column_widths,
-                              FILE *out) {
-  fputc('+', out);
+string PrintToScreen::GenerateHBar(const vector<int> &column_widths) {
+  string hbar("+");
+
   for (const int width : column_widths) {
-    for (int i = 0; i < width; ++i) {
-      fputc('-', out);
-    }
-    fputc('+', out);
+    hbar.append(width, '-');
+    hbar.push_back('+');
   }
-  fputc('\n', out);
+  hbar.push_back('\n');
+
+  return hbar;
 }
 
 void PrintToScreen::printTuple(const TupleStorageSubBlock &tuple_store,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5f5073f3/cli/PrintToScreen.hpp
----------------------------------------------------------------------
diff --git a/cli/PrintToScreen.hpp b/cli/PrintToScreen.hpp
index 2b5fd7e..f444857 100644
--- a/cli/PrintToScreen.hpp
+++ b/cli/PrintToScreen.hpp
@@ -21,6 +21,7 @@
 #define QUICKSTEP_CLI_PRINT_TO_SCREEN_HPP_
 
 #include <cstdio>
+#include <string>
 #include <vector>
 
 #include "storage/StorageBlockInfo.hpp"
@@ -46,8 +47,7 @@ class PrintToScreen {
                             StorageManager *storage_manager,
                             FILE *out);
 
-  static void printHBar(const std::vector<int> &column_widths,
-                        FILE *out);
+  static std::string GenerateHBar(const std::vector<int> &column_widths);
 
   /**
    * @brief Get the total number of tuples in the given relation.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5f5073f3/cli/tests/command_executor/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cli/tests/command_executor/CMakeLists.txt b/cli/tests/command_executor/CMakeLists.txt
index 0bdf865..2cbf1bf 100644
--- a/cli/tests/command_executor/CMakeLists.txt
+++ b/cli/tests/command_executor/CMakeLists.txt
@@ -30,4 +30,3 @@ add_test(quickstep_cli_tests_commandexecutor_dt
 # duration of their test.
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/D)
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Dt)
-


[03/24] incubator-quickstep git commit: Use BitVector as LIPFilter implementation when applicable

Posted by zu...@apache.org.
Use BitVector as LIPFilter implementation when applicable


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/2b2d7ba1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/2b2d7ba1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/2b2d7ba1

Branch: refs/heads/dist-patch
Commit: 2b2d7ba1970ade47b1170cd7974cb2fc53f7ba71
Parents: 1572762
Author: Jianqiao Zhu <ji...@cs.wisc.edu>
Authored: Wed Feb 22 14:06:55 2017 -0600
Committer: Jianqiao Zhu <ji...@cs.wisc.edu>
Committed: Wed Feb 22 14:06:55 2017 -0600

----------------------------------------------------------------------
 query_optimizer/rules/AttachLIPFilters.cpp | 74 +++++++++++++++++++++++--
 query_optimizer/rules/AttachLIPFilters.hpp |  9 +++
 query_optimizer/rules/CMakeLists.txt       |  1 +
 3 files changed, 79 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2b2d7ba1/query_optimizer/rules/AttachLIPFilters.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/AttachLIPFilters.cpp b/query_optimizer/rules/AttachLIPFilters.cpp
index 4b6ac59..9a13b48 100644
--- a/query_optimizer/rules/AttachLIPFilters.cpp
+++ b/query_optimizer/rules/AttachLIPFilters.cpp
@@ -20,6 +20,7 @@
 #include "query_optimizer/rules/AttachLIPFilters.hpp"
 
 #include <algorithm>
+#include <cstdint>
 #include <map>
 #include <set>
 #include <unordered_set>
@@ -37,6 +38,7 @@
 #include "query_optimizer/physical/PhysicalType.hpp"
 #include "query_optimizer/physical/Selection.hpp"
 #include "query_optimizer/physical/TopLevelPlan.hpp"
+#include "types/TypeID.hpp"
 #include "types/TypedValue.hpp"
 #include "utility/lip_filter/LIPFilter.hpp"
 
@@ -126,11 +128,40 @@ void AttachLIPFilters::attachLIPFilters(
         const E::ExprId source_attr_id = pair.second->source_attribute->id();
         if (already_filtered_attributes->find(source_attr_id)
                 == already_filtered_attributes->end()) {
-          lip_filter_configuration_->addBuildInfo(
-              P::SingleIdentityHashFilterBuildInfo::Create(
-                  pair.second->source_attribute,
-                  std::max(64uL, pair.second->estimated_cardinality * 8u)),
-              pair.second->source);
+          bool use_exact_filter = false;
+          std::int64_t min_cpp_value;
+          std::int64_t max_cpp_value;
+          const bool has_exact_min_max_stats =
+              findExactMinMaxValuesForAttributeHelper(pair.second->source,
+                                                      pair.second->source_attribute,
+                                                      &min_cpp_value,
+                                                      &max_cpp_value);
+          if (has_exact_min_max_stats) {
+            const std::int64_t value_range = max_cpp_value - min_cpp_value;
+            DCHECK_GE(value_range, 0);
+            // TODO(jianqiao): Add this threshold as a gflag (together with
+            // InjectJoinFilters::kMaxFilterSize).
+            if (value_range <= 1000000000L) {
+              use_exact_filter = true;
+            }
+          }
+
+          if (use_exact_filter) {
+            lip_filter_configuration_->addBuildInfo(
+                P::BitVectorExactFilterBuildInfo::Create(
+                    pair.second->source_attribute,
+                    min_cpp_value,
+                    max_cpp_value,
+                    false),
+                pair.second->source);
+          } else {
+            lip_filter_configuration_->addBuildInfo(
+                P::SingleIdentityHashFilterBuildInfo::Create(
+                    pair.second->source_attribute,
+                    std::max(64uL, pair.second->estimated_cardinality * 8u)),
+                pair.second->source);
+          }
+
           lip_filter_configuration_->addProbeInfo(
               P::LIPFilterProbeInfo::Create(
                   pair.first,
@@ -258,5 +289,38 @@ const std::vector<AttachLIPFilters::LIPFilterInfoPtr>& AttachLIPFilters
   return probe_side_info_.at(node);
 }
 
+bool AttachLIPFilters::findExactMinMaxValuesForAttributeHelper(
+    const physical::PhysicalPtr &physical_plan,
+    const expressions::AttributeReferencePtr &attribute,
+    std::int64_t *min_cpp_value,  // Out: minimum value; written only on a true return.
+    std::int64_t *max_cpp_value) const {  // Out: maximum value; written only on a true return.
+  bool min_value_is_exact;  // Passed to findMinValueStat() as an out-argument.
+  bool max_value_is_exact;  // Passed to findMaxValueStat() as an out-argument.
+  // Fetch both statistics from the cost model along with their exactness flags.
+  const TypedValue min_value =
+      cost_model_->findMinValueStat(physical_plan, attribute, &min_value_is_exact);
+  const TypedValue max_value =
+      cost_model_->findMaxValueStat(physical_plan, attribute, &max_value_is_exact);
+  if (min_value.isNull() || max_value.isNull() ||
+      (!min_value_is_exact) || (!max_value_is_exact)) {
+    return false;  // A statistic is null or not exact, so no exact range is reported.
+  }
+  // Only kInt and kLong values are widened to std::int64_t; other types return false.
+  switch (attribute->getValueType().getTypeID()) {
+    case TypeID::kInt: {
+      *min_cpp_value = min_value.getLiteral<int>();
+      *max_cpp_value = max_value.getLiteral<int>();
+      return true;
+    }
+    case TypeID::kLong: {
+      *min_cpp_value = min_value.getLiteral<std::int64_t>();
+      *max_cpp_value = max_value.getLiteral<std::int64_t>();
+      return true;
+    }
+    default:
+      return false;
+  }
+}
+
 }  // namespace optimizer
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2b2d7ba1/query_optimizer/rules/AttachLIPFilters.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/AttachLIPFilters.hpp b/query_optimizer/rules/AttachLIPFilters.hpp
index b8cfc39..36cb010 100644
--- a/query_optimizer/rules/AttachLIPFilters.hpp
+++ b/query_optimizer/rules/AttachLIPFilters.hpp
@@ -21,6 +21,7 @@
 #define QUICKSTEP_QUERY_OPTIMIZER_RULES_ATTACH_LIP_FILTERS_HPP_
 
 #include <cstddef>
+#include <cstdint>
 #include <map>
 #include <memory>
 #include <set>
@@ -135,6 +136,14 @@ class AttachLIPFilters : public Rule<physical::Physical> {
 
   const std::vector<LIPFilterInfoPtr>& getProbeSideInfo(const NodeList &path);
 
+  // TODO(jianqiao): refactor this method as it is a duplication of
+  // InjectJoinFilters::findExactMinMaxValuesForAttributeHelper().
+  bool findExactMinMaxValuesForAttributeHelper(
+      const physical::PhysicalPtr &physical_plan,
+      const expressions::AttributeReferencePtr &attribute,
+      std::int64_t *min_cpp_value,
+      std::int64_t *max_cpp_value) const;
+
   std::unique_ptr<cost::StarSchemaSimpleCostModel> cost_model_;
   std::map<physical::PhysicalPtr, std::vector<LIPFilterInfoPtr>> build_side_info_;
   std::map<physical::PhysicalPtr, std::vector<LIPFilterInfoPtr>> probe_side_info_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2b2d7ba1/query_optimizer/rules/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/CMakeLists.txt b/query_optimizer/rules/CMakeLists.txt
index 427500d..6847951 100644
--- a/query_optimizer/rules/CMakeLists.txt
+++ b/query_optimizer/rules/CMakeLists.txt
@@ -59,6 +59,7 @@ target_link_libraries(quickstep_queryoptimizer_rules_AttachLIPFilters
                       quickstep_queryoptimizer_physical_Selection
                       quickstep_queryoptimizer_physical_TopLevelPlan
                       quickstep_queryoptimizer_rules_Rule
+                      quickstep_types_TypeID
                       quickstep_types_TypedValue
                       quickstep_utility_Macros
                       quickstep_utility_lipfilter_LIPFilter)


[05/24] incubator-quickstep git commit: Provided more info for the OutOfMemory exception.

Posted by zu...@apache.org.
Provided more info for the OutOfMemory exception.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/4be8e91a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/4be8e91a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/4be8e91a

Branch: refs/heads/dist-patch
Commit: 4be8e91a4acddab22f4d71f62b4247863ddc766f
Parents: f5c063a
Author: Zuyu Zhang <zu...@apache.org>
Authored: Thu Feb 23 01:06:37 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Thu Feb 23 01:06:37 2017 -0800

----------------------------------------------------------------------
 storage/StorageErrors.cpp  |  6 ++++++
 storage/StorageErrors.hpp  | 12 +++++++++++-
 storage/StorageManager.cpp |  2 +-
 3 files changed, 18 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4be8e91a/storage/StorageErrors.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageErrors.cpp b/storage/StorageErrors.cpp
index 310ee4c..81bd997 100644
--- a/storage/StorageErrors.cpp
+++ b/storage/StorageErrors.cpp
@@ -41,6 +41,12 @@ FileWriteError::FileWriteError(const std::string &filename)
   message_.append(filename);
 }
 
+OutOfMemory::OutOfMemory(const std::size_t num_slots)  // Builds the text returned by what().
+    : message_("OutOfMemory: The system has run out of memory when allocating ") {
+  message_.append(std::to_string(num_slots));  // NOTE(review): allocates; confirm safe under memory exhaustion.
+  message_.append(" slots");  // Message now ends with "<num_slots> slots".
+}
+
 TupleTooLargeForBlock::TupleTooLargeForBlock(const std::size_t tuple_size)
     : tuple_size_(tuple_size),
       message_("TupleTooLargeForBlock: ") {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4be8e91a/storage/StorageErrors.hpp
----------------------------------------------------------------------
diff --git a/storage/StorageErrors.hpp b/storage/StorageErrors.hpp
index 67a3e42..b87f04e 100644
--- a/storage/StorageErrors.hpp
+++ b/storage/StorageErrors.hpp
@@ -141,9 +141,19 @@ class MalformedBlock : public std::exception {
  **/
 class OutOfMemory : public std::exception {
  public:
+  /**
+   * @brief Constructor.
+   *
+   * @param num_slots The number of slots whose allocation failed.
+   **/
+  explicit OutOfMemory(const std::size_t num_slots);
+
   virtual const char* what() const throw() {
-    return "OutOfMemory: The system has run out of memory";
+    return message_.c_str();
   }
+
+ private:
+  std::string message_;
 };
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4be8e91a/storage/StorageManager.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.cpp b/storage/StorageManager.cpp
index 6f7d38b..783ccfe 100644
--- a/storage/StorageManager.cpp
+++ b/storage/StorageManager.cpp
@@ -757,7 +757,7 @@ void* StorageManager::allocateSlots(const std::size_t num_slots,
 #endif
 
   if (slots == nullptr) {
-    throw OutOfMemory();
+    throw OutOfMemory(num_slots);
   }
 
 #if defined(QUICKSTEP_HAVE_LIBNUMA)