You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/03/01 22:41:45 UTC

[1/4] incubator-quickstep git commit: Fix a bug with SelectOperator [Forced Update!]

Repository: incubator-quickstep
Updated Branches:
  refs/heads/dist-cmd 2a6b5d375 -> 87bbb2629 (forced update)


Fix a bug with SelectOperator


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/e896b61d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/e896b61d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/e896b61d

Branch: refs/heads/dist-cmd
Commit: e896b61d1932d4593cb18033fe4da23a5e238f77
Parents: e41a6aa
Author: jianqiao <ji...@cs.wisc.edu>
Authored: Wed Mar 1 13:16:47 2017 -0600
Committer: jianqiao <ji...@cs.wisc.edu>
Committed: Wed Mar 1 13:16:47 2017 -0600

----------------------------------------------------------------------
 relational_operators/SelectOperator.hpp                       | 7 +++++--
 .../tests/SortRunGenerationOperator_unittest.cpp              | 2 +-
 2 files changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e896b61d/relational_operators/SelectOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/SelectOperator.hpp b/relational_operators/SelectOperator.hpp
index b9a4d49..df61c06 100644
--- a/relational_operators/SelectOperator.hpp
+++ b/relational_operators/SelectOperator.hpp
@@ -202,9 +202,12 @@ class SelectOperator : public RelationalOperator {
 
   bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
 
-  void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id,
+  void feedInputBlock(const block_id input_block_id,
+                      const relation_id input_relation_id,
                       const partition_id part_id) override {
-    input_relation_block_ids_[part_id].push_back(input_block_id);
+    if (input_relation_id == input_relation_.getID()) {
+      input_relation_block_ids_[part_id].push_back(input_block_id);
+    }
   }
 
   QueryContext::insert_destination_id getInsertDestinationID() const override {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e896b61d/relational_operators/tests/SortRunGenerationOperator_unittest.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/tests/SortRunGenerationOperator_unittest.cpp b/relational_operators/tests/SortRunGenerationOperator_unittest.cpp
index 99fafa8..acdd422 100644
--- a/relational_operators/tests/SortRunGenerationOperator_unittest.cpp
+++ b/relational_operators/tests/SortRunGenerationOperator_unittest.cpp
@@ -622,7 +622,7 @@ TEST_F(SortRunGenerationOperatorTest, 3Column_NullLast_Asc) {
 
   // Comparator for null-col-1 ASC NULLS LAST, null-col-2 ASC NULLS LAST,
   // null-col-3 ASC NULLS LAST.
-  auto comparator = [this](const Tuple &left, const Tuple &right) -> bool {
+  auto comparator = [](const Tuple &left, const Tuple &right) -> bool {
     TestTupleAttrs l = TupleToTupleAttr(left);
     TestTupleAttrs r = TupleToTupleAttr(right);
     l.c1 = l.null_c4 ? std::numeric_limits<int>::max() : l.c1;


[3/4] incubator-quickstep git commit: Style fixes in the distributed version.

Posted by zu...@apache.org.
Style fixes in the distributed version.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/5e0c32ac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/5e0c32ac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/5e0c32ac

Branch: refs/heads/dist-cmd
Commit: 5e0c32acdf4080f4461e178aeea1af7d6eb80420
Parents: 6a240fc
Author: Zuyu Zhang <zu...@apache.org>
Authored: Tue Feb 28 14:19:17 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Wed Mar 1 14:32:11 2017 -0800

----------------------------------------------------------------------
 cli/distributed/Cli.cpp                         |   4 +-
 cli/distributed/Conductor.cpp                   |   4 +-
 .../tests/DistributedExecutionGeneratorTest.cpp |  10 +-
 .../DistributedExecutionGeneratorTestRunner.cpp |  49 ++++--
 .../DistributedExecutionGeneratorTestRunner.hpp |  45 +-----
 .../tests/execution_generator/CMakeLists.txt    | 154 ++++++++++---------
 6 files changed, 127 insertions(+), 139 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5e0c32ac/cli/distributed/Cli.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Cli.cpp b/cli/distributed/Cli.cpp
index 60b9c8d..6228898 100644
--- a/cli/distributed/Cli.cpp
+++ b/cli/distributed/Cli.cpp
@@ -48,8 +48,8 @@ typedef quickstep::LineReaderDumb LineReaderImpl;
 #include "query_execution/QueryExecutionTypedefs.hpp"
 #include "query_execution/QueryExecutionUtil.hpp"
 #include "storage/DataExchangerAsync.hpp"
-#include "utility/StringUtil.hpp"
 #include "storage/StorageBlockInfo.hpp"
+#include "utility/StringUtil.hpp"
 
 #include "tmb/address.h"
 #include "tmb/id_typedefs.h"
@@ -76,7 +76,7 @@ using tmb::client_id;
 
 namespace quickstep {
 
-namespace S = ::quickstep::serialization;
+namespace S = serialization;
 
 void Cli::init() {
   cli_id_ = bus_.Connect();

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5e0c32ac/cli/distributed/Conductor.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Conductor.cpp b/cli/distributed/Conductor.cpp
index cf2eb4b..3c68bfb 100644
--- a/cli/distributed/Conductor.cpp
+++ b/cli/distributed/Conductor.cpp
@@ -63,13 +63,13 @@ using tmb::client_id;
 
 namespace quickstep {
 
-namespace S = ::quickstep::serialization;
+namespace S = serialization;
 
 void Conductor::init() {
   try {
     string catalog_path = FLAGS_storage_path + kCatalogFilename;
 
-    if (quickstep::FLAGS_initialize_db) {  // Initialize the database
+    if (FLAGS_initialize_db) {  // Initialize the database
       DefaultsConfigurator::InitializeDefaultDatabase(FLAGS_storage_path, catalog_path);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5e0c32ac/query_optimizer/tests/DistributedExecutionGeneratorTest.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTest.cpp b/query_optimizer/tests/DistributedExecutionGeneratorTest.cpp
index 1e2120e..b18b5ec 100644
--- a/query_optimizer/tests/DistributedExecutionGeneratorTest.cpp
+++ b/query_optimizer/tests/DistributedExecutionGeneratorTest.cpp
@@ -39,6 +39,8 @@ DECLARE_bool(use_filter_joins);
 
 using quickstep::TextBasedTest;
 
+using std::make_unique;
+
 QUICKSTEP_GENERATE_TEXT_TEST(DISTRIBUTED_EXECUTION_GENERATOR_TEST);
 
 int main(int argc, char** argv) {
@@ -59,11 +61,9 @@ int main(int argc, char** argv) {
 
   std::ifstream input_file(argv[1]);
   CHECK(input_file.is_open()) << argv[1];
-  std::unique_ptr<quickstep::optimizer::DistributedExecutionGeneratorTestRunner>
-      test_runner(
-          new quickstep::optimizer::DistributedExecutionGeneratorTestRunner(argv[3]));
-  test_driver.reset(
-      new quickstep::TextBasedTestDriver(&input_file, test_runner.get()));
+
+  auto test_runner = make_unique<quickstep::optimizer::DistributedExecutionGeneratorTestRunner>(argv[3]);
+  test_driver = make_unique<quickstep::TextBasedTestDriver>(&input_file, test_runner.get());
   test_driver->registerOption(
       quickstep::optimizer::DistributedExecutionGeneratorTestRunner::kResetOption);
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5e0c32ac/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
index 6bd7a1f..3b1259a 100644
--- a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
+++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
@@ -37,8 +37,10 @@
 #include "query_execution/ForemanDistributed.hpp"
 #include "query_execution/QueryExecutionTypedefs.hpp"
 #include "query_execution/QueryExecutionUtil.hpp"
+#include "query_optimizer/Optimizer.hpp"
 #include "query_optimizer/OptimizerContext.hpp"
 #include "query_optimizer/QueryHandle.hpp"
+#include "query_optimizer/tests/TestDatabaseLoader.hpp"
 #include "storage/DataExchangerAsync.hpp"
 #include "storage/StorageManager.hpp"
 #include "utility/MemStream.hpp"
@@ -67,6 +69,8 @@ namespace optimizer {
 
 namespace {
 
+constexpr int kNumInstances = 3;
+
 void nop() {}
 
 }  // namespace
@@ -147,6 +151,35 @@ DistributedExecutionGeneratorTestRunner::DistributedExecutionGeneratorTestRunner
   }
 }
 
+DistributedExecutionGeneratorTestRunner::~DistributedExecutionGeneratorTestRunner() {
+  const tmb::MessageBus::SendStatus send_status =
+      QueryExecutionUtil::SendTMBMessage(&bus_, cli_id_, foreman_->getBusClientID(), TaggedMessage(kPoisonMessage));
+  CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
+
+  for (int i = 0; i < kNumInstances; ++i) {
+    workers_[i]->join();
+    shiftbosses_[i]->join();
+  }
+
+  foreman_->join();
+
+  test_database_loader_data_exchanger_.shutdown();
+  test_database_loader_.reset();
+  for (int i = 0; i < kNumInstances; ++i) {
+    data_exchangers_[i].shutdown();
+    storage_managers_[i].reset();
+  }
+
+  CHECK(MessageBus::SendStatus::kOK ==
+      QueryExecutionUtil::SendTMBMessage(&bus_, cli_id_, locator_client_id_, TaggedMessage(kPoisonMessage)));
+
+  test_database_loader_data_exchanger_.join();
+  for (int i = 0; i < kNumInstances; ++i) {
+    data_exchangers_[i].join();
+  }
+  block_locator_->join();
+}
+
 void DistributedExecutionGeneratorTestRunner::runTestCase(
     const string &input, const std::set<string> &options, string *output) {
   // TODO(qzeng): Test multi-threaded query execution when we have a Sort operator.
@@ -174,27 +207,23 @@ void DistributedExecutionGeneratorTestRunner::runTestCase(
     const ParseStatement &parse_statement = *result.parsed_statement;
     std::printf("%s\n", parse_statement.toString().c_str());
 
-    const CatalogRelation *query_result_relation = nullptr;
+    auto query_handle = std::make_unique<QueryHandle>(query_id_++, cli_id_);
     try {
       OptimizerContext optimizer_context;
-      auto query_handle = std::make_unique<QueryHandle>(query_id_++, cli_id_);
-
       optimizer_.generateQueryHandle(parse_statement,
                                      test_database_loader_->catalog_database(),
                                      &optimizer_context,
                                      query_handle.get());
-      query_result_relation = query_handle->getQueryResultRelation();
-
-      QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
-          cli_id_,
-          foreman_->getBusClientID(),
-          query_handle.release(),
-          &bus_);
     } catch (const SqlError &error) {
       *output = error.formatMessage(input);
       break;
     }
 
+    const CatalogRelation *query_result_relation = query_handle->getQueryResultRelation();
+
+    QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
+        cli_id_, foreman_->getBusClientID(), query_handle.release(), &bus_);
+
     const tmb::AnnotatedMessage annotated_message = bus_.Receive(cli_id_, 0, true);
     DCHECK_EQ(kQueryExecutionSuccessMessage, annotated_message.tagged_message.message_type());
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5e0c32ac/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
index 2cd2427..2c0381b 100644
--- a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
+++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
@@ -31,7 +31,6 @@
 #include "query_execution/BlockLocator.hpp"
 #include "query_execution/ForemanDistributed.hpp"
 #include "query_execution/QueryExecutionTypedefs.hpp"
-#include "query_execution/QueryExecutionUtil.hpp"
 #include "query_execution/Shiftboss.hpp"
 #include "query_execution/Worker.hpp"
 #include "query_execution/WorkerDirectory.hpp"
@@ -45,16 +44,10 @@
 #include "glog/logging.h"
 
 #include "tmb/id_typedefs.h"
-#include "tmb/message_bus.h"
-#include "tmb/tagged_message.h"
 
 namespace quickstep {
 namespace optimizer {
 
-namespace {
-constexpr int kNumInstances = 3;
-}  // namespace
-
 /**
  * @brief TextBasedTestRunner for testing the ExecutionGenerator in the
  *        distributed version.
@@ -72,43 +65,7 @@ class DistributedExecutionGeneratorTestRunner : public TextBasedTestRunner {
    */
   explicit DistributedExecutionGeneratorTestRunner(const std::string &storage_path);
 
-  ~DistributedExecutionGeneratorTestRunner() {
-    tmb::TaggedMessage poison_tagged_message(quickstep::kPoisonMessage);
-
-    const tmb::MessageBus::SendStatus send_status =
-        QueryExecutionUtil::SendTMBMessage(
-            &bus_,
-            cli_id_,
-            foreman_->getBusClientID(),
-            std::move(poison_tagged_message));
-    CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
-
-    for (int i = 0; i < kNumInstances; ++i) {
-      workers_[i]->join();
-      shiftbosses_[i]->join();
-    }
-
-    foreman_->join();
-
-    test_database_loader_data_exchanger_.shutdown();
-    test_database_loader_.reset();
-    for (int i = 0; i < kNumInstances; ++i) {
-      data_exchangers_[i].shutdown();
-      storage_managers_[i].reset();
-    }
-
-    CHECK(MessageBus::SendStatus::kOK ==
-        QueryExecutionUtil::SendTMBMessage(&bus_,
-                                           cli_id_,
-                                           locator_client_id_,
-                                           tmb::TaggedMessage(quickstep::kPoisonMessage)));
-
-    test_database_loader_data_exchanger_.join();
-    for (int i = 0; i < kNumInstances; ++i) {
-      data_exchangers_[i].join();
-    }
-    block_locator_->join();
-  }
+  ~DistributedExecutionGeneratorTestRunner();
 
   void runTestCase(const std::string &input,
                    const std::set<std::string> &options,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5e0c32ac/query_optimizer/tests/execution_generator/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/execution_generator/CMakeLists.txt b/query_optimizer/tests/execution_generator/CMakeLists.txt
index d38f4aa..40629ee 100644
--- a/query_optimizer/tests/execution_generator/CMakeLists.txt
+++ b/query_optimizer/tests/execution_generator/CMakeLists.txt
@@ -15,68 +15,6 @@
 # specific language governing permissions and limitations
 # under the License.
 
-if (ENABLE_DISTRIBUTED)
-  add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_create
-           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
-           "${CMAKE_CURRENT_SOURCE_DIR}/Create.test"
-           "${CMAKE_CURRENT_BINARY_DIR}/DistributedCreate.test"
-           "${CMAKE_CURRENT_BINARY_DIR}/DistributedCreate/")
-  add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_delete
-           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
-           "${CMAKE_CURRENT_SOURCE_DIR}/Delete.test"
-           "${CMAKE_CURRENT_BINARY_DIR}/DistributedDelete.test"
-           "${CMAKE_CURRENT_BINARY_DIR}/DistributedDelete/")
-  add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_distinct
-           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
-           "${CMAKE_CURRENT_SOURCE_DIR}/Distinct.test"
-           "${CMAKE_CURRENT_BINARY_DIR}/DistributedDistinct.test"
-           "${CMAKE_CURRENT_BINARY_DIR}/DistributedDistinct/")
-  add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_drop
-           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
-           "${CMAKE_CURRENT_SOURCE_DIR}/Drop.test"
-           "${CMAKE_CURRENT_BINARY_DIR}/DistributedDrop.test"
-           "${CMAKE_CURRENT_BINARY_DIR}/DistributedDrop/")
-  add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_index
-           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
-           "${CMAKE_CURRENT_SOURCE_DIR}/Index.test"
-           "${CMAKE_CURRENT_BINARY_DIR}/DistributedIndex.test"
-           "${CMAKE_CURRENT_BINARY_DIR}/DistributedIndex/")
-  add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_insert
-           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
-           "${CMAKE_CURRENT_SOURCE_DIR}/Insert.test"
-           "${CMAKE_CURRENT_BINARY_DIR}/DistributedInsert.test"
-           "${CMAKE_CURRENT_BINARY_DIR}/DistributedInsert/")
-  add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_join
-           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
-           "${CMAKE_CURRENT_SOURCE_DIR}/Join.test"
-           "${CMAKE_CURRENT_BINARY_DIR}/DistributedJoin.test"
-           "${CMAKE_CURRENT_BINARY_DIR}/DistributedJoin/")
-  add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_partition
-           "../quickstep_queryoptimizer_tests_ExecutionGeneratorTest"
-           "${CMAKE_CURRENT_SOURCE_DIR}/Partition.test"
-           "${CMAKE_CURRENT_BINARY_DIR}/DistributedPartition.test"
-           "${CMAKE_CURRENT_BINARY_DIR}/DistributedPartition/")
-  add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_select
-           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
-           "${CMAKE_CURRENT_SOURCE_DIR}/Select.test"
-           "${CMAKE_CURRENT_BINARY_DIR}/DistributedSelect.test"
-           "${CMAKE_CURRENT_BINARY_DIR}/DistributedSelect/")
-  add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_stringpatternmatching
-           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
-           "${CMAKE_CURRENT_SOURCE_DIR}/StringPatternMatching.test"
-           "${CMAKE_CURRENT_BINARY_DIR}/DistributedStringPatternMatching.test"
-           "${CMAKE_CURRENT_BINARY_DIR}/DistributedStringPatternMatching/")
-  add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_tablegenerator
-           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
-           "${CMAKE_CURRENT_SOURCE_DIR}/TableGenerator.test"
-           "${CMAKE_CURRENT_BINARY_DIR}/DistributedTableGenerator.test"
-           "${CMAKE_CURRENT_BINARY_DIR}/DistributedTableGenerator/")
-  add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_update
-           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
-           "${CMAKE_CURRENT_SOURCE_DIR}/Update.test"
-           "${CMAKE_CURRENT_BINARY_DIR}/DistributedUpdate.test"
-           "${CMAKE_CURRENT_BINARY_DIR}/DistributedUpdate/")
-endif(ENABLE_DISTRIBUTED)
 add_test(quickstep_queryoptimizer_tests_executiongenerator_create
          "../quickstep_queryoptimizer_tests_ExecutionGeneratorTest"
          "${CMAKE_CURRENT_SOURCE_DIR}/Create.test"
@@ -138,25 +76,74 @@ add_test(quickstep_queryoptimizer_tests_executiongenerator_update
          "${CMAKE_CURRENT_BINARY_DIR}/Update.test"
          "${CMAKE_CURRENT_BINARY_DIR}/Update/")
 
+if (ENABLE_DISTRIBUTED)
+  add_test(quickstep_queryoptimizer_tests_executiongenerator_create_distributed
+           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+           "${CMAKE_CURRENT_SOURCE_DIR}/Create.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/CreateDistributed.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/CreateDistributed/")
+  add_test(quickstep_queryoptimizer_tests_executiongenerator_delete_distributed
+           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+           "${CMAKE_CURRENT_SOURCE_DIR}/Delete.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DeleteDistributed.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DeleteDistributed/")
+  add_test(quickstep_queryoptimizer_tests_executiongenerator_distinct_distributed
+           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+           "${CMAKE_CURRENT_SOURCE_DIR}/Distinct.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistinctDistributed.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DistinctDistributed/")
+  add_test(quickstep_queryoptimizer_tests_executiongenerator_drop_distributed
+           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+           "${CMAKE_CURRENT_SOURCE_DIR}/Drop.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DropDistributed.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DropDistributed/")
+  add_test(quickstep_queryoptimizer_tests_executiongenerator_index_distributed
+           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+           "${CMAKE_CURRENT_SOURCE_DIR}/Index.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/IndexDistributed.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/IndexDistributed/")
+  add_test(quickstep_queryoptimizer_tests_executiongenerator_insert_distributed
+           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+           "${CMAKE_CURRENT_SOURCE_DIR}/Insert.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/InsertDistributed.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/InsertDistributed/")
+  add_test(quickstep_queryoptimizer_tests_executiongenerator_join_distributed
+           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+           "${CMAKE_CURRENT_SOURCE_DIR}/Join.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/JoinDistributed.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/JoinDistributed/")
+  add_test(quickstep_queryoptimizer_tests_executiongenerator_partition_distributed
+           "../quickstep_queryoptimizer_tests_ExecutionGeneratorTest"
+           "${CMAKE_CURRENT_SOURCE_DIR}/Partition.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/PartitionDistributed.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/PartitionDistributed/")
+  add_test(quickstep_queryoptimizer_tests_executiongenerator_select_distributed
+           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+           "${CMAKE_CURRENT_SOURCE_DIR}/Select.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/SelectDistributed.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/SelectDistributed/")
+  add_test(quickstep_queryoptimizer_tests_executiongenerator_stringpatternmatching_distributed
+           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+           "${CMAKE_CURRENT_SOURCE_DIR}/StringPatternMatching.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/StringPatternMatchingDistributed.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/StringPatternMatchingDistributed/")
+  add_test(quickstep_queryoptimizer_tests_executiongenerator_tablegenerator_distributed
+           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+           "${CMAKE_CURRENT_SOURCE_DIR}/TableGenerator.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/TableGeneratorDistributed.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/TableGeneratorDistributed/")
+  add_test(quickstep_queryoptimizer_tests_executiongenerator_update_distributed
+           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+           "${CMAKE_CURRENT_SOURCE_DIR}/Update.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/UpdateDistributed.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/UpdateDistributed/")
+endif(ENABLE_DISTRIBUTED)
+
 # Create the folders where the unit tests will store their data blocks for the
 # duration of their test.
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Create)
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Delete)
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Distinct)
-if (ENABLE_DISTRIBUTED)
-  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedCreate)
-  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedDelete)
-  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedDistinct)
-  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedDrop)
-  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedIndex)
-  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedInsert)
-  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedJoin)
-  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedPartition)
-  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedSelect)
-  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedStringPatternMatching)
-  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedTableGenerator)
-  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedUpdate)
-endif(ENABLE_DISTRIBUTED)
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Drop)
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Index)
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Insert)
@@ -166,3 +153,18 @@ file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Select)
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/StringPatternMatching)
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/TableGenerator)
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Update)
+
+if (ENABLE_DISTRIBUTED)
+  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/CreateDistributed)
+  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DeleteDistributed)
+  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistinctDistributed)
+  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DropDistributed)
+  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/IndexDistributed)
+  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/InsertDistributed)
+  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/JoinDistributed)
+  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/PartitionDistributed)
+  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/SelectDistributed)
+  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/StringPatternMatchingDistributed)
+  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/TableGeneratorDistributed)
+  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/UpdateDistributed)
+endif(ENABLE_DISTRIBUTED)


[4/4] incubator-quickstep git commit: Added command support in the distributed version.

Posted by zu...@apache.org.
Added command support in the distributed version.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/87bbb262
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/87bbb262
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/87bbb262

Branch: refs/heads/dist-cmd
Commit: 87bbb2629ddc8f09c997612fca2cd700ce95d040
Parents: 5e0c32a
Author: Zuyu Zhang <zu...@apache.org>
Authored: Mon Feb 27 00:30:43 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Wed Mar 1 14:41:27 2017 -0800

----------------------------------------------------------------------
 CMakeLists.txt                                  |   2 +
 cli/distributed/CMakeLists.txt                  |   3 +
 cli/distributed/Cli.cpp                         |  72 ++++--
 cli/distributed/Conductor.cpp                   |  81 +++++-
 cli/distributed/Conductor.hpp                   |   3 +
 cli/tests/CMakeLists.txt                        |  41 ++++
 cli/tests/DistributedCommandExecutorTest.cpp    |  62 +++++
 .../DistributedCommandExecutorTestRunner.cpp    | 246 +++++++++++++++++++
 .../DistributedCommandExecutorTestRunner.hpp    |  99 ++++++++
 cli/tests/command_executor/CMakeLists.txt       |  18 ++
 query_execution/QueryExecutionMessages.proto    |   8 +
 query_execution/QueryExecutionTypedefs.hpp      |  10 +-
 12 files changed, 624 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/87bbb262/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 918069c..9cd02be 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -817,6 +817,7 @@ if (ENABLE_DISTRIBUTED)
   target_link_libraries(quickstep_distributed_cli_shell
                         glog
                         quickstep_catalog_CatalogRelation
+                        quickstep_cli_Constants
                         quickstep_cli_Flags
                         quickstep_cli_LineReader
                         quickstep_cli_PrintToScreen
@@ -833,6 +834,7 @@ if (ENABLE_DISTRIBUTED)
                         quickstep_storage_StorageBlockInfo
                         quickstep_storage_StorageManager
                         quickstep_utility_Macros
+                        quickstep_utility_SqlError
                         quickstep_utility_StringUtil
                         tmb
                         ${GFLAGS_LIB_NAME}

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/87bbb262/cli/distributed/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cli/distributed/CMakeLists.txt b/cli/distributed/CMakeLists.txt
index 5804321..1f7dee0 100644
--- a/cli/distributed/CMakeLists.txt
+++ b/cli/distributed/CMakeLists.txt
@@ -26,6 +26,8 @@ add_library(quickstep_cli_distributed_Role Role.cpp Role.hpp)
 target_link_libraries(quickstep_cli_distributed_Conductor
                       glog
                       quickstep_catalog_CatalogDatabase
+                      quickstep_cli_CommandExecutorUtil
+                      quickstep_cli_Constants
                       quickstep_cli_DefaultsConfigurator
                       quickstep_cli_Flags
                       quickstep_cli_distributed_Role
@@ -41,6 +43,7 @@ target_link_libraries(quickstep_cli_distributed_Conductor
                       quickstep_storage_StorageConstants
                       quickstep_utility_Macros
                       quickstep_utility_SqlError
+                      quickstep_utility_StringUtil
                       tmb)
 target_link_libraries(quickstep_cli_distributed_Executor
                       glog

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/87bbb262/cli/distributed/Cli.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Cli.cpp b/cli/distributed/Cli.cpp
index 6228898..49b7dc1 100644
--- a/cli/distributed/Cli.cpp
+++ b/cli/distributed/Cli.cpp
@@ -30,6 +30,7 @@
 
 #include "catalog/CatalogRelation.hpp"
 #include "cli/CliConfig.h"  // For QUICKSTEP_USE_LINENOISE.
+#include "cli/Constants.hpp"
 #include "cli/Flags.hpp"
 
 #ifdef QUICKSTEP_USE_LINENOISE
@@ -49,6 +50,7 @@ typedef quickstep::LineReaderDumb LineReaderImpl;
 #include "query_execution/QueryExecutionUtil.hpp"
 #include "storage/DataExchangerAsync.hpp"
 #include "storage/StorageBlockInfo.hpp"
+#include "utility/SqlError.hpp"
 #include "utility/StringUtil.hpp"
 
 #include "tmb/address.h"
@@ -76,6 +78,7 @@ using tmb::client_id;
 
 namespace quickstep {
 
+namespace C = cli;
 namespace S = serialization;
 
 void Cli::init() {
@@ -127,6 +130,10 @@ void Cli::init() {
   bus_.RegisterClientAsSender(cli_id_, kQueryResultTeardownMessage);
 
   bus_.RegisterClientAsReceiver(cli_id_, kQueryExecutionErrorMessage);
+
+  // Prepare for submitting a command.
+  bus_.RegisterClientAsSender(cli_id_, kCommandMessage);
+  bus_.RegisterClientAsReceiver(cli_id_, kCommandResponseMessage);
 }
 
 void Cli::run() {
@@ -158,27 +165,51 @@ void Cli::run() {
           break;
         }
 
-        CHECK_NE(statement.getStatementType(), ParseStatement::kCommand)
-            << "TODO(quickstep-team)";
+        if (statement.getStatementType() == ParseStatement::kCommand) {
+          const ParseCommand &command = static_cast<const ParseCommand &>(statement);
+          const std::string &command_str = command.command()->value();
+          try {
+            if (command_str == C::kAnalyzeCommand) {
+              // TODO(zuyu): support '\analyze'.
+              THROW_SQL_ERROR_AT(command.command()) << "Unsupported Command";
+            } else if (command_str != C::kDescribeDatabaseCommand &&
+                       command_str != C::kDescribeTableCommand) {
+              THROW_SQL_ERROR_AT(command.command()) << "Invalid Command";
+            }
+          } catch (const SqlError &error) {
+            fprintf(stderr, "%s", error.formatMessage(*command_string).c_str());
+            reset_parser = true;
+            break;
+          }
+
+          DLOG(INFO) << "DistributedCli sent CommandMessage (typed '" << kCommandMessage
+                     << "') to Conductor";
+          S::CommandMessage proto;
+          proto.set_command(*command_string);
+
+          const size_t proto_length = proto.ByteSize();
+          char *proto_bytes = static_cast<char*>(malloc(proto_length));
+          CHECK(proto.SerializeToArray(proto_bytes, proto_length));
 
-        DLOG(INFO) << "DistributedCli sent SqlQueryMessage (typed '" << kSqlQueryMessage
-                   << "') to Conductor";
-        S::SqlQueryMessage proto;
-        proto.set_sql_query(*command_string);
+          TaggedMessage command_message(static_cast<const void*>(proto_bytes), proto_length, kCommandMessage);
+          free(proto_bytes);
 
-        const size_t proto_length = proto.ByteSize();
-        char *proto_bytes = static_cast<char*>(malloc(proto_length));
-        CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+          QueryExecutionUtil::SendTMBMessage(&bus_, cli_id_, conductor_client_id_, move(command_message));
+        } else {
+          DLOG(INFO) << "DistributedCli sent SqlQueryMessage (typed '" << kSqlQueryMessage
+                     << "') to Conductor";
+          S::SqlQueryMessage proto;
+          proto.set_sql_query(*command_string);
 
-        TaggedMessage sql_query_message(static_cast<const void*>(proto_bytes),
-                                        proto_length,
-                                        kSqlQueryMessage);
-        free(proto_bytes);
+          const size_t proto_length = proto.ByteSize();
+          char *proto_bytes = static_cast<char*>(malloc(proto_length));
+          CHECK(proto.SerializeToArray(proto_bytes, proto_length));
 
-        QueryExecutionUtil::SendTMBMessage(&bus_,
-                                           cli_id_,
-                                           conductor_client_id_,
-                                           move(sql_query_message));
+          TaggedMessage sql_query_message(static_cast<const void*>(proto_bytes), proto_length, kSqlQueryMessage);
+          free(proto_bytes);
+
+          QueryExecutionUtil::SendTMBMessage(&bus_, cli_id_, conductor_client_id_, move(sql_query_message));
+        }
 
         start = std::chrono::steady_clock::now();
 
@@ -187,6 +218,13 @@ void Cli::run() {
         DLOG(INFO) << "DistributedCli received typed '" << tagged_message.message_type()
                    << "' message from client " << annotated_message.sender;
         switch (tagged_message.message_type()) {
+          case kCommandResponseMessage: {
+            S::CommandResponseMessage proto;
+            CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+            printf("%s", proto.command_response().c_str());
+            break;
+          }
           case kQueryExecutionSuccessMessage: {
             end = std::chrono::steady_clock::now();
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/87bbb262/cli/distributed/Conductor.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Conductor.cpp b/cli/distributed/Conductor.cpp
index 3c68bfb..b877b04 100644
--- a/cli/distributed/Conductor.cpp
+++ b/cli/distributed/Conductor.cpp
@@ -29,6 +29,8 @@
 #include <utility>
 
 #include "catalog/CatalogDatabase.hpp"
+#include "cli/CommandExecutorUtil.hpp"
+#include "cli/Constants.hpp"
 #include "cli/DefaultsConfigurator.hpp"
 #include "cli/Flags.hpp"
 #include "parser/ParseStatement.hpp"
@@ -42,6 +44,7 @@
 #include "query_optimizer/QueryProcessor.hpp"
 #include "storage/StorageConstants.hpp"
 #include "utility/SqlError.hpp"
+#include "utility/StringUtil.hpp"
 
 #include "tmb/id_typedefs.h"
 #include "tmb/native_net_client_message_bus.h"
@@ -63,6 +66,7 @@ using tmb::client_id;
 
 namespace quickstep {
 
+namespace C = cli;
 namespace S = serialization;
 
 void Conductor::init() {
@@ -91,6 +95,9 @@ void Conductor::init() {
   bus_.RegisterClientAsReceiver(conductor_client_id_, kDistributedCliRegistrationMessage);
   bus_.RegisterClientAsSender(conductor_client_id_, kDistributedCliRegistrationResponseMessage);
 
+  bus_.RegisterClientAsReceiver(conductor_client_id_, kCommandMessage);
+  bus_.RegisterClientAsSender(conductor_client_id_, kCommandResponseMessage);
+
   bus_.RegisterClientAsReceiver(conductor_client_id_, kSqlQueryMessage);
   bus_.RegisterClientAsSender(conductor_client_id_, kQueryExecutionErrorMessage);
   bus_.RegisterClientAsSender(conductor_client_id_, kAdmitRequestMessage);
@@ -125,6 +132,14 @@ void Conductor::run() {
             QueryExecutionUtil::SendTMBMessage(&bus_, conductor_client_id_, sender, move(message)));
         break;
       }
+      case kCommandMessage: {
+        S::CommandMessage proto;
+        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+        DLOG(INFO) << "Conductor received the following command: " << proto.command();
+
+        processCommandMessage(sender, new string(move(proto.command())));
+        break;
+      }
       case kSqlQueryMessage: {
         S::SqlQueryMessage proto;
         CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
@@ -146,6 +161,69 @@ void Conductor::run() {
   }
 }
 
+void Conductor::processCommandMessage(const tmb::client_id sender, string *command_string) {
+  parser_wrapper_.feedNextBuffer(command_string);
+  ParseResult parse_result = parser_wrapper_.getNextStatement();
+
+  CHECK(parse_result.condition == ParseResult::kSuccess)
+      << "Any syntax error should be addressed in the DistributedCli.";
+
+  const ParseStatement &statement = *parse_result.parsed_statement;
+  DCHECK_EQ(ParseStatement::kCommand, statement.getStatementType());
+
+  const ParseCommand &command = static_cast<const ParseCommand &>(statement);
+  const PtrVector<ParseString> &arguments = *(command.arguments());
+  const string &command_str = command.command()->value();
+
+  string command_response;
+
+  try {
+    if (command_str == C::kDescribeDatabaseCommand) {
+      command_response = C::ExecuteDescribeDatabase(arguments, *catalog_database_);
+    } else if (command_str == C::kDescribeTableCommand) {
+      if (arguments.empty()) {
+        command_response = C::ExecuteDescribeDatabase(arguments, *catalog_database_);
+      } else {
+        command_response = C::ExecuteDescribeTable(arguments, *catalog_database_);
+      }
+    }
+  } catch (const SqlError &command_error) {
+    // Set the query execution status along with the error message.
+    S::QueryExecutionErrorMessage proto;
+    proto.set_error_message(command_error.formatMessage(*command_string));
+
+    const size_t proto_length = proto.ByteSize();
+    char *proto_bytes = static_cast<char*>(malloc(proto_length));
+    CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+    TaggedMessage message(static_cast<const void*>(proto_bytes),
+                          proto_length,
+                          kQueryExecutionErrorMessage);
+    free(proto_bytes);
+
+    DLOG(INFO) << "Conductor sent QueryExecutionErrorMessage (typed '"
+               << kQueryExecutionErrorMessage
+               << "') to Distributed CLI " << sender;
+    CHECK(MessageBus::SendStatus::kOK ==
+        QueryExecutionUtil::SendTMBMessage(&bus_, conductor_client_id_, sender, move(message)));
+  }
+
+  S::CommandResponseMessage proto;
+  proto.set_command_response(command_response);
+
+  const size_t proto_length = proto.ByteSize();
+  char *proto_bytes = static_cast<char*>(malloc(proto_length));
+  CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+  TaggedMessage message(static_cast<const void*>(proto_bytes), proto_length, kCommandResponseMessage);
+  free(proto_bytes);
+
+  DLOG(INFO) << "Conductor sent CommandResponseMessage (typed '" << kCommandResponseMessage
+             << "') to Distributed CLI " << sender;
+  CHECK(MessageBus::SendStatus::kOK ==
+      QueryExecutionUtil::SendTMBMessage(&bus_, conductor_client_id_, sender, move(message)));
+}
+
 void Conductor::processSqlQueryMessage(const tmb::client_id sender, string *command_string) {
   parser_wrapper_.feedNextBuffer(command_string);
   ParseResult parse_result = parser_wrapper_.getNextStatement();
@@ -154,8 +232,7 @@ void Conductor::processSqlQueryMessage(const tmb::client_id sender, string *comm
       << "Any SQL syntax error should be addressed in the DistributedCli.";
 
   const ParseStatement &statement = *parse_result.parsed_statement;
-  CHECK(statement.getStatementType() != ParseStatement::kCommand)
-     << "TODO(quickstep-team)";
+  DCHECK_NE(ParseStatement::kCommand, statement.getStatementType());
 
   try {
     auto query_handle = make_unique<QueryHandle>(query_processor_->query_id(),

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/87bbb262/cli/distributed/Conductor.hpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Conductor.hpp b/cli/distributed/Conductor.hpp
index 09bf2b9..e7e003f 100644
--- a/cli/distributed/Conductor.hpp
+++ b/cli/distributed/Conductor.hpp
@@ -35,6 +35,7 @@
 namespace quickstep {
 
 class CatalogDatabase;
+class ParseCommand;
 
 /** \addtogroup CliDistributed
  *  @{
@@ -60,6 +61,8 @@ class Conductor final : public Role {
   void run() override;
 
  private:
+  void processCommandMessage(const tmb::client_id sender, std::string *command_string);
+
   void processSqlQueryMessage(const tmb::client_id sender, std::string *command_string);
 
   SqlParserWrapper parser_wrapper_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/87bbb262/cli/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cli/tests/CMakeLists.txt b/cli/tests/CMakeLists.txt
index 48f27bb..7f8150f 100644
--- a/cli/tests/CMakeLists.txt
+++ b/cli/tests/CMakeLists.txt
@@ -23,6 +23,14 @@ add_executable(quickstep_cli_tests_CommandExecutorTest
                CommandExecutorTestRunner.hpp
                "${PROJECT_SOURCE_DIR}/utility/textbased_test/TextBasedTest.cpp"
                "${PROJECT_SOURCE_DIR}/utility/textbased_test/TextBasedTest.hpp")
+if (ENABLE_DISTRIBUTED)
+  add_executable(quickstep_cli_tests_DistributedCommandExecutorTest
+                 DistributedCommandExecutorTest.cpp
+                 DistributedCommandExecutorTestRunner.cpp
+                 DistributedCommandExecutorTestRunner.hpp
+                 "${PROJECT_SOURCE_DIR}/utility/textbased_test/TextBasedTest.cpp"
+                 "${PROJECT_SOURCE_DIR}/utility/textbased_test/TextBasedTest.hpp")
+endif(ENABLE_DISTRIBUTED)
 
 target_link_libraries(quickstep_cli_tests_CommandExecutorTest
                       glog
@@ -49,3 +57,36 @@ target_link_libraries(quickstep_cli_tests_CommandExecutorTest
                       quickstep_utility_TextBasedTestDriver
                       tmb
                       ${LIBS})
+if (ENABLE_DISTRIBUTED)
+  target_link_libraries(quickstep_cli_tests_DistributedCommandExecutorTest
+                        glog
+                        gtest
+                        quickstep_catalog_CatalogTypedefs
+                        quickstep_cli_CommandExecutorUtil
+                        quickstep_cli_Constants
+                        quickstep_cli_DropRelation
+                        quickstep_cli_PrintToScreen
+                        quickstep_parser_ParseStatement
+                        quickstep_parser_SqlParserWrapper
+                        quickstep_queryexecution_BlockLocator
+                        quickstep_queryexecution_BlockLocatorUtil
+                        quickstep_queryexecution_ForemanDistributed
+                        quickstep_queryexecution_QueryExecutionTypedefs
+                        quickstep_queryexecution_QueryExecutionUtil
+                        quickstep_queryexecution_Shiftboss
+                        quickstep_queryexecution_Worker
+                        quickstep_queryexecution_WorkerDirectory
+                        quickstep_queryoptimizer_Optimizer
+                        quickstep_queryoptimizer_OptimizerContext
+                        quickstep_queryoptimizer_QueryHandle
+                        quickstep_queryoptimizer_tests_TestDatabaseLoader
+                        quickstep_storage_DataExchangerAsync
+                        quickstep_storage_StorageManager
+                        quickstep_utility_Macros
+                        quickstep_utility_MemStream
+                        quickstep_utility_SqlError
+                        quickstep_utility_TextBasedTestDriver
+                        tmb
+                        ${GFLAGS_LIB_NAME}
+                        ${LIBS})
+endif(ENABLE_DISTRIBUTED)

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/87bbb262/cli/tests/DistributedCommandExecutorTest.cpp
----------------------------------------------------------------------
diff --git a/cli/tests/DistributedCommandExecutorTest.cpp b/cli/tests/DistributedCommandExecutorTest.cpp
new file mode 100644
index 0000000..b41a70f
--- /dev/null
+++ b/cli/tests/DistributedCommandExecutorTest.cpp
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include <iostream>
+#include <fstream>
+#include <memory>
+
+#include "cli/tests/DistributedCommandExecutorTestRunner.hpp"
+#include "utility/textbased_test/TextBasedTestDriver.hpp"
+
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+#include "gtest/gtest.h"
+
+using quickstep::TextBasedTest;
+
+using std::make_unique;
+
+QUICKSTEP_GENERATE_TEXT_TEST(DISTRIBUTED_COMMAND_EXECUTOR_TEST);
+
+int main(int argc, char** argv) {
+  google::InitGoogleLogging(argv[0]);
+  // Honor FLAGS_buffer_pool_slots in StorageManager.
+  gflags::ParseCommandLineFlags(&argc, &argv, true);
+
+  if (argc < 4) {
+    LOG(ERROR) << "Must have at least 3 arguments, but " << argc - 1
+               << " are provided";
+  }
+
+  std::ifstream input_file(argv[1]);
+  CHECK(input_file.is_open()) << argv[1];
+
+  auto test_runner = make_unique<quickstep::DistributedCommandExecutorTestRunner>(argv[3]);
+  test_driver = make_unique<quickstep::TextBasedTestDriver>(&input_file, test_runner.get());
+  test_driver->registerOption(
+      quickstep::DistributedCommandExecutorTestRunner::kResetOption);
+
+  ::testing::InitGoogleTest(&argc, argv);
+  const int success = RUN_ALL_TESTS();
+  if (success != 0) {
+    test_driver->writeActualOutputToFile(argv[2]);
+  }
+
+  return success;
+}

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/87bbb262/cli/tests/DistributedCommandExecutorTestRunner.cpp
----------------------------------------------------------------------
diff --git a/cli/tests/DistributedCommandExecutorTestRunner.cpp b/cli/tests/DistributedCommandExecutorTestRunner.cpp
new file mode 100644
index 0000000..66d0767
--- /dev/null
+++ b/cli/tests/DistributedCommandExecutorTestRunner.cpp
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "cli/tests/DistributedCommandExecutorTestRunner.hpp"
+
+#include <cstdio>
+#include <functional>
+#include <memory>
+#include <set>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "cli/CommandExecutorUtil.hpp"
+#include "cli/Constants.hpp"
+#include "cli/DropRelation.hpp"
+#include "cli/PrintToScreen.hpp"
+#include "parser/ParseStatement.hpp"
+#include "query_execution/BlockLocator.hpp"
+#include "query_execution/BlockLocatorUtil.hpp"
+#include "query_execution/ForemanDistributed.hpp"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
+#include "query_optimizer/Optimizer.hpp"
+#include "query_optimizer/OptimizerContext.hpp"
+#include "query_optimizer/QueryHandle.hpp"
+#include "query_optimizer/tests/TestDatabaseLoader.hpp"
+#include "storage/DataExchangerAsync.hpp"
+#include "storage/StorageManager.hpp"
+#include "utility/MemStream.hpp"
+#include "utility/SqlError.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
+#include "tmb/tagged_message.h"
+
+using std::make_unique;
+using std::string;
+using std::vector;
+
+using tmb::TaggedMessage;
+
+namespace quickstep {
+
+class CatalogRelation;
+
+namespace {
+
+void nop() {}
+
+}  // namespace
+
+namespace C = cli;
+
+const char *DistributedCommandExecutorTestRunner::kResetOption =
+    "reset_before_execution";
+
+DistributedCommandExecutorTestRunner::DistributedCommandExecutorTestRunner(const string &storage_path)
+    : query_id_(0) {
+  bus_.Initialize();
+
+  cli_id_ = bus_.Connect();
+  bus_.RegisterClientAsSender(cli_id_, kAdmitRequestMessage);
+  bus_.RegisterClientAsSender(cli_id_, kPoisonMessage);
+  bus_.RegisterClientAsReceiver(cli_id_, kQueryExecutionSuccessMessage);
+
+  bus_.RegisterClientAsSender(cli_id_, kBlockDomainRegistrationMessage);
+  bus_.RegisterClientAsReceiver(cli_id_, kBlockDomainRegistrationResponseMessage);
+
+  block_locator_ = make_unique<BlockLocator>(&bus_);
+  block_locator_->start();
+
+  test_database_loader_ = make_unique<optimizer::TestDatabaseLoader>(
+      storage_path,
+      block_locator::getBlockDomain(
+          test_database_loader_data_exchanger_.network_address(), cli_id_, &locator_client_id_, &bus_),
+      locator_client_id_,
+      &bus_);
+  DCHECK_EQ(block_locator_->getBusClientID(), locator_client_id_);
+  test_database_loader_data_exchanger_.set_storage_manager(test_database_loader_->storage_manager());
+  test_database_loader_data_exchanger_.start();
+
+  test_database_loader_->createTestRelation(false /* allow_vchar */);
+  test_database_loader_->loadTestRelation();
+
+  // NOTE(zuyu): Foreman should initialize before Shiftboss so that the former
+  // could receive a registration message from the latter.
+  foreman_ = make_unique<ForemanDistributed>(*block_locator_, std::bind(&nop), &bus_,
+                                             test_database_loader_->catalog_database());
+
+  // We don't use the NUMA aware version of worker code.
+  const vector<numa_node_id> numa_nodes(1 /* Number of worker threads per instance */,
+                                        kAnyNUMANodeID);
+
+  bus_local_.Initialize();
+
+  worker_ = make_unique<Worker>(0 /* worker_thread_index */, &bus_local_);
+
+  const vector<tmb::client_id> worker_client_ids(1, worker_->getBusClientID());
+  worker_directory_ = make_unique<WorkerDirectory>(worker_client_ids.size(), worker_client_ids, numa_nodes);
+
+  storage_manager_ = make_unique<StorageManager>(
+      storage_path,
+      block_locator::getBlockDomain(
+          data_exchanger_.network_address(), cli_id_, &locator_client_id_, &bus_),
+      locator_client_id_, &bus_);
+  DCHECK_EQ(block_locator_->getBusClientID(), locator_client_id_);
+
+  data_exchanger_.set_storage_manager(storage_manager_.get());
+  shiftboss_ =
+      make_unique<Shiftboss>(&bus_, &bus_local_, storage_manager_.get(), worker_directory_.get(),
+                             storage_manager_->hdfs());
+
+  foreman_->start();
+
+  data_exchanger_.start();
+  shiftboss_->start();
+  worker_->start();
+}
+
+DistributedCommandExecutorTestRunner::~DistributedCommandExecutorTestRunner() {
+  const tmb::MessageBus::SendStatus send_status =
+      QueryExecutionUtil::SendTMBMessage(&bus_, cli_id_, foreman_->getBusClientID(), TaggedMessage(kPoisonMessage));
+  CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
+
+  worker_->join();
+  shiftboss_->join();
+
+  foreman_->join();
+
+  test_database_loader_data_exchanger_.shutdown();
+  test_database_loader_.reset();
+  data_exchanger_.shutdown();
+  storage_manager_.reset();
+
+  CHECK(MessageBus::SendStatus::kOK ==
+      QueryExecutionUtil::SendTMBMessage(&bus_, cli_id_, locator_client_id_, TaggedMessage(kPoisonMessage)));
+
+  test_database_loader_data_exchanger_.join();
+  data_exchanger_.join();
+  block_locator_->join();
+}
+
+void DistributedCommandExecutorTestRunner::runTestCase(
+    const string &input, const std::set<string> &options, string *output) {
+  // TODO(qzeng): Test multi-threaded query execution when we have a Sort operator.
+
+  VLOG(4) << "Test SQL(s): " << input;
+
+  if (options.find(kResetOption) != options.end()) {
+    test_database_loader_->clear();
+    test_database_loader_->createTestRelation(false /* allow_vchar */);
+    test_database_loader_->loadTestRelation();
+  }
+
+  MemStream output_stream;
+  sql_parser_.feedNextBuffer(new string(input));
+
+  while (true) {
+    ParseResult result = sql_parser_.getNextStatement();
+    if (result.condition != ParseResult::kSuccess) {
+      if (result.condition == ParseResult::kError) {
+        *output = result.error_message;
+      }
+      break;
+    }
+
+    const ParseStatement &parse_statement = *result.parsed_statement;
+    std::printf("%s\n", parse_statement.toString().c_str());
+
+    try {
+      if (parse_statement.getStatementType() == ParseStatement::kCommand) {
+        const ParseCommand &command = static_cast<const ParseCommand &>(parse_statement);
+        const PtrVector<ParseString> &arguments = *(command.arguments());
+        const string &command_str = command.command()->value();
+
+        string command_response;
+        if (command_str == C::kDescribeDatabaseCommand) {
+          command_response = C::ExecuteDescribeDatabase(arguments, *test_database_loader_->catalog_database());
+        } else if (command_str == C::kDescribeTableCommand) {
+          if (arguments.empty()) {
+            command_response = C::ExecuteDescribeDatabase(arguments, *test_database_loader_->catalog_database());
+          } else {
+            command_response = C::ExecuteDescribeTable(arguments, *test_database_loader_->catalog_database());
+          }
+        } else {
+          THROW_SQL_ERROR_AT(command.command()) << "Unsupported command";
+        }
+
+        std::fprintf(output_stream.file(), "%s", command_response.c_str());
+      } else {
+        optimizer::OptimizerContext optimizer_context;
+        auto query_handle = std::make_unique<QueryHandle>(query_id_++, cli_id_);
+
+        optimizer_.generateQueryHandle(parse_statement,
+                                       test_database_loader_->catalog_database(),
+                                       &optimizer_context,
+                                       query_handle.get());
+        const CatalogRelation *query_result_relation = query_handle->getQueryResultRelation();
+
+        QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
+            cli_id_, foreman_->getBusClientID(), query_handle.release(), &bus_);
+
+        const tmb::AnnotatedMessage annotated_message = bus_.Receive(cli_id_, 0, true);
+        DCHECK_EQ(kQueryExecutionSuccessMessage, annotated_message.tagged_message.message_type());
+
+        if (query_result_relation) {
+          PrintToScreen::PrintRelation(*query_result_relation,
+                                       test_database_loader_->storage_manager(),
+                                       output_stream.file());
+          DropRelation::Drop(*query_result_relation,
+                             test_database_loader_->catalog_database(),
+                             test_database_loader_->storage_manager());
+        }
+      }
+    } catch (const SqlError &error) {
+      *output = error.formatMessage(input);
+      break;
+    }
+  }
+
+  if (output->empty()) {
+    *output = output_stream.str();
+  }
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/87bbb262/cli/tests/DistributedCommandExecutorTestRunner.hpp
----------------------------------------------------------------------
diff --git a/cli/tests/DistributedCommandExecutorTestRunner.hpp b/cli/tests/DistributedCommandExecutorTestRunner.hpp
new file mode 100644
index 0000000..0427a85
--- /dev/null
+++ b/cli/tests/DistributedCommandExecutorTestRunner.hpp
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_CLI_TESTS_DISTRIBUTED_COMMAND_EXECUTOR_TEST_RUNNER_HPP_
+#define QUICKSTEP_CLI_TESTS_DISTRIBUTED_COMMAND_EXECUTOR_TEST_RUNNER_HPP_
+
+#include <cstddef>
+#include <memory>
+#include <set>
+#include <string>
+#include <utility>
+
+#include "parser/SqlParserWrapper.hpp"
+#include "query_execution/BlockLocator.hpp"
+#include "query_execution/ForemanDistributed.hpp"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/Shiftboss.hpp"
+#include "query_execution/Worker.hpp"
+#include "query_execution/WorkerDirectory.hpp"
+#include "query_optimizer/Optimizer.hpp"
+#include "query_optimizer/tests/TestDatabaseLoader.hpp"
+#include "storage/DataExchangerAsync.hpp"
+#include "storage/StorageManager.hpp"
+#include "utility/Macros.hpp"
+#include "utility/textbased_test/TextBasedTestRunner.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+
+namespace quickstep {
+
+/**
+ * @brief TextBasedTestRunner for testing the CommandExecutor in the
+ *        distributed version.
+ */
+class DistributedCommandExecutorTestRunner : public TextBasedTestRunner {
+ public:
+  /**
+   * @brief If this option is enabled, recreate the entire database and
+   *        repopulate the data before every test.
+   */
+  static const char *kResetOption;
+
+  /**
+   * @brief Constructor.
+   */
+  explicit DistributedCommandExecutorTestRunner(const std::string &storage_path);
+
+  ~DistributedCommandExecutorTestRunner();
+
+  void runTestCase(const std::string &input,
+                   const std::set<std::string> &options,
+                   std::string *output) override;
+
+ private:
+  std::size_t query_id_;
+
+  SqlParserWrapper sql_parser_;
+  std::unique_ptr<optimizer::TestDatabaseLoader> test_database_loader_;
+  DataExchangerAsync test_database_loader_data_exchanger_;
+  optimizer::Optimizer optimizer_;
+
+  MessageBusImpl bus_;
+  tmb::client_id cli_id_, locator_client_id_;
+
+  std::unique_ptr<BlockLocator> block_locator_;
+
+  std::unique_ptr<ForemanDistributed> foreman_;
+
+  MessageBusImpl bus_local_;
+  std::unique_ptr<Worker> worker_;
+  std::unique_ptr<WorkerDirectory> worker_directory_;
+  DataExchangerAsync data_exchanger_;
+  std::unique_ptr<StorageManager> storage_manager_;
+  std::unique_ptr<Shiftboss> shiftboss_;
+
+  DISALLOW_COPY_AND_ASSIGN(DistributedCommandExecutorTestRunner);
+};
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_CLI_TESTS_DISTRIBUTED_COMMAND_EXECUTOR_TEST_RUNNER_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/87bbb262/cli/tests/command_executor/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cli/tests/command_executor/CMakeLists.txt b/cli/tests/command_executor/CMakeLists.txt
index 9cf1869..e62d954 100644
--- a/cli/tests/command_executor/CMakeLists.txt
+++ b/cli/tests/command_executor/CMakeLists.txt
@@ -26,7 +26,25 @@ add_test(quickstep_cli_tests_commandexecutor_dt
          "${CMAKE_CURRENT_BINARY_DIR}/Dt.test"
          "${CMAKE_CURRENT_BINARY_DIR}/Dt/")
 
+if (ENABLE_DISTRIBUTED)
+  add_test(quickstep_cli_tests_commandexecutor_d_distributed
+           "../quickstep_cli_tests_DistributedCommandExecutorTest"
+           "${CMAKE_CURRENT_SOURCE_DIR}/D.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DDistributed.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DDistributed/")
+  add_test(quickstep_cli_tests_commandexecutor_dt_distributed
+           "../quickstep_cli_tests_DistributedCommandExecutorTest"
+           "${CMAKE_CURRENT_SOURCE_DIR}/Dt.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DtDistributed.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/DtDistributed/")
+endif(ENABLE_DISTRIBUTED)
+
 # Create the folders where the unit tests will store their data blocks for the
 # duration of their test.
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/D)
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Dt)
+
+if (ENABLE_DISTRIBUTED)
+  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DDistributed)
+  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DtDistributed)
+endif(ENABLE_DISTRIBUTED)

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/87bbb262/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index 68f286d..47246d8 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -81,6 +81,10 @@ message ShiftbossRegistrationResponseMessage {
   required uint64 shiftboss_index = 1;
 }
 
+message CommandMessage {
+  required string command = 1;
+}
+
 message SqlQueryMessage {
   required string sql_query = 1;
 }
@@ -134,6 +138,10 @@ message SaveQueryResultResponseMessage {
   required uint64 shiftboss_index = 4;
 }
 
+message CommandResponseMessage {
+  required string command_response = 1;
+}
+
 message QueryExecutionSuccessMessage {
   optional CatalogRelationSchema result_relation = 1;
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/87bbb262/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index 994bd60..0fd0bdf 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -89,7 +89,11 @@ enum QueryExecutionMessageType : message_type_id {
                                           // Shiftboss to Worker.
   kDistributedCliRegistrationMessage,  // From CLI to Conductor.
   kDistributedCliRegistrationResponseMessage,  // From Conductor to CLI.
-  kSqlQueryMessage,  // From CLI to Conductor.
+
+  // From CLI to Conductor.
+  kCommandMessage,
+  kSqlQueryMessage,
+
   kQueryInitiateMessage,  // From Foreman to Shiftboss.
   kQueryInitiateResponseMessage,  // From Shiftboss to Foreman.
 
@@ -101,8 +105,10 @@ enum QueryExecutionMessageType : message_type_id {
   kSaveQueryResultMessage,  // From Foreman to Shiftboss.
   kSaveQueryResultResponseMessage,  // From Shiftboss to Foreman.
 
+  kQueryExecutionSuccessMessage,  // From Foreman to CLI.
+
   // From Foreman / Conductor to CLI.
-  kQueryExecutionSuccessMessage,
+  kCommandResponseMessage,
   kQueryExecutionErrorMessage,
 
   kQueryResultTeardownMessage,  // From CLI to Conductor.



[2/4] incubator-quickstep git commit: Init release scripts Includes: Script to do common release tasks - subcommand to create artifacts - subcommand to publish release candidates - subcommand to test a release candidate

Posted by zu...@apache.org.
Init release scripts
Includes:
Script to do common release tasks
- subcommand to create artifacts
- subcommand to publish release candidates
- subcommand to test a release candidate

All commands are parameterized by the .profile file


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/6a240fcd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/6a240fcd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/6a240fcd

Branch: refs/heads/dist-cmd
Commit: 6a240fcd1452abb7870a9e8ec82ed8f60f75e9c4
Parents: e896b61
Author: cramja <ma...@gmail.com>
Authored: Tue Feb 28 11:13:30 2017 -0600
Committer: cramja <ma...@gmail.com>
Committed: Wed Mar 1 16:07:58 2017 -0600

----------------------------------------------------------------------
 .gitattributes                  |   1 +
 release/.gitignore              |   3 +
 release/README.md               |  34 +++++++
 release/release_cmds.sh         | 172 +++++++++++++++++++++++++++++++++++
 release/release_manager.profile |  29 ++++++
 5 files changed, 239 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6a240fcd/.gitattributes
----------------------------------------------------------------------
diff --git a/.gitattributes b/.gitattributes
new file mode 100644
index 0000000..c0aaef0
--- /dev/null
+++ b/.gitattributes
@@ -0,0 +1 @@
+release export-ignore

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6a240fcd/release/.gitignore
----------------------------------------------------------------------
diff --git a/release/.gitignore b/release/.gitignore
new file mode 100644
index 0000000..0716d9f
--- /dev/null
+++ b/release/.gitignore
@@ -0,0 +1,3 @@
+.release_manager.profile
+svn-*
+apache-*

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6a240fcd/release/README.md
----------------------------------------------------------------------
diff --git a/release/README.md b/release/README.md
new file mode 100644
index 0000000..5776e09
--- /dev/null
+++ b/release/README.md
@@ -0,0 +1,34 @@
+## Scripts to help release Quickstep
+
+### Preqs
+
+You must fill out `release_manager.profile` first.
+
+You'll need:
+  * md5sum
+  * shasum
+  * svn
+  * gpg
+  * tar
+  * gzip
+
+This is of course in addition to the regular build tools.
+
+### Usage Overview
+
+```
+# Test a candidate
+./release_cmds.sh test
+
+# Create candidate artifacts
+./release_cmds.sh create
+
+# Publish candidate artifacts (after you've created them)
+./release_cmds.sh publish
+```
+
+* A full guide to releases can be found on [confluence][cwiki-howto].
+* PGP key generation is discussed [here][pgp-keys]
+
+[cwiki-howto]: https://cwiki.apache.org/confluence/display/QUICKSTEP/How+To+Release
+[pgp-keys]: http://quickstep.apache.org/release-signing/
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6a240fcd/release/release_cmds.sh
----------------------------------------------------------------------
diff --git a/release/release_cmds.sh b/release/release_cmds.sh
new file mode 100755
index 0000000..d23b4ef
--- /dev/null
+++ b/release/release_cmds.sh
@@ -0,0 +1,172 @@
+#!/usr/bin/env bash
+
+# Functions for Releasing Quickstep
+# Usage: Fill out your details in release_manager.profile.
+#        ./release_cmds [create|publish|test]
+#
+# Note: This script is a scratch pad with most of the relevent commands. Modify 
+#       it to see fit. We'll finalize it when we understand the process better.
+#       
+#       Everything in this script assumes that you are running in the release/ 
+#       folder of your quickstep directory.
+#
+# PREREQS:
+#  - full details on how to do a release is in a confluence article: How to Release
+#  - You must have previously created a pgp key using your apache username
+#    and uploaded it to a keyserver [http://quickstep.apache.org/release-signing/]
+#
+
+### Helper functions
+
+create_artifacts() {
+  # Updates submodules, archives everything, signs it using your key (requires 
+  # user input), makes checksums, moves artifacts to release/ folder
+  #
+
+  export RELEASE_DIR=`pwd`
+  cd ..
+
+  # need the submodules to be included for the compile to work.
+  # likely, this will be a no-op
+  git submodule init
+  git submodule update
+
+  # you'll need to push this if you want the tag to be visible to committers
+  # the tag is necesary for the archiving to work correctly
+  git tag -a rc-$VERSION -m 'release candidate $VERSION'
+  git archive --format "tar" --prefix=$PROJECT_NAME-$VERSION/ -o $PROJECT_NAME-$VERSION.tar rc-$VERSION
+  git submodule foreach --recursive 'git archive --verbose --prefix=$PROJECT_NAME-$VERSION/$path/ --format tar master --output $RELEASE_DIR/submodule-$sha1.tar'
+  if [[ $(ls submodule-*.tar | wc -l) != 0  ]]; then
+    # combine all archives into one tar
+    tar --concatenate --file $PROJECT_NAME-$VERSION.tar submodule-*.tar
+    # remove sub tars
+    rm -rf submodule-*.tar
+  fi
+
+  # gzip final tar
+  gzip --force --verbose $PROJECT_NAME-$VERSION.tar
+
+  # Make the signature. This requires human input
+  gpg -u $APACHE_USERNAME@apache.org --armor --output $PROJECT_NAME-$VERSION.tar.gz.asc --detach-sign $PROJECT_NAME-$VERSION.tar.gz
+  # Make hashes
+  md5sum $PROJECT_NAME-$VERSION.tar.gz > $PROJECT_NAME-$VERSION.tar.gz.md5
+  sha1sum $PROJECT_NAME-$VERSION.tar.gz > $PROJECT_NAME-$VERSION.tar.gz.sha
+
+
+  # Make sure these three artifacts are good
+  gpg --verify $PROJECT_NAME-$VERSION.tar.gz.asc
+  md5sum --check $PROJECT_NAME-$VERSION.tar.gz.md5
+  sha1sum --check $PROJECT_NAME-$VERSION.tar.gz.sha
+
+  mv $PROJECT_NAME-$VERSION.tar.gz* $RELEASE_DIR
+
+  cd $RELEASE_DIR
+}
+
+publish_candidate() {
+  # push a RC to subversion. We use SVN sparse directories so as not to pull
+  # every artifact ever. Assumes that you have already created the artifacts
+  # and they reside in the release/ folder.
+  # directory layout is x.y.z/RCw, where w is the release candidate number
+  #
+
+  if [ ! -d "$SVN_DEV" ]; then
+    svn checkout --depth immediates $SVN_DEV_URL $SVN_DEV
+  fi
+
+  BASE_DIR=`pwd`
+  cd $SVN_DEV
+
+  if [ ! -d "$VERSION" ]; then
+    mkdir $VERSION
+    cd $VERSION
+  else
+    cd $VERSION
+    svn update --set-depth immediates .
+  fi
+
+  # now in $VERSION folder
+
+  RCFOLDER=RC$CANDIDATE
+  mkdir $RCFOLDER
+  cd $RCFOLDER
+  cp $BASE_DIR/$PROJECT_NAME-$VERSION.tar.gz* ./
+  cd ..
+
+  svn add $VERSION/$RCFOLDER
+  svn commit --username=$APACHE_USERNAME -m "Quickstep-$VERSION RC$CANDIDATE"
+
+  cd $BASE_DIR
+}
+
+test_candidate() {
+  # This is best run in /tmp
+  # tries checking out a release candidate and building
+  #
+
+  svn checkout --depth immediates $SVN_DEV_URL $SVN_DEV
+  cd $SVN_DEV
+
+  # add the KEYS file from quickstep to your gpg key ring
+  gpg --import KEYS
+
+  cd $VERSION
+  svn update --set-depth infinity RC$CANDIDATE
+  cd RC$CANDIDATE
+
+  tar -xzf $PROJECT_NAME-$VERSION.tar.gz
+
+  # verify artifacts
+  gpg --verify $PROJECT_NAME-$VERSION.tar.gz.asc
+  md5sum --check $PROJECT_NAME-$VERSION.tar.gz.md5
+  sha1sum --check $PROJECT_NAME-$VERSION.tar.gz.sha
+
+  # now build the release artifacts
+  cd $PROJECT_NAME-$VERSION
+
+  # first download third_party, then do your normal build
+  cd third_party/
+  ./download_and_patch_prerequisites.sh
+  cd ..
+  cd build
+  cmake ..
+  make
+}
+
+usage() {
+  echo "usage: $0 [create|publish|test]"
+  exit $1
+}
+
+### main
+
+source release_manager.profile
+if [ -z $APACHE_USERNAME ] || [ -z $VERSION ] || [ -z $CANDIDATE ]; then 
+  echo "please set release_manager.profile"
+  exit 1
+fi
+
+alias svn="svn --non-interactive"
+
+set -e
+
+if [ -z $1 ] ; then
+  usage 0
+fi
+
+echo "user: $APACHE_USERNAME"
+echo "version: $VERSION"
+echo "candidate: $CANDIDATE"
+
+if [ "create" == "$1" ] ; then
+  echo "creating artifacts ..."
+  create_artifacts
+elif [ "publish" == "$1" ] ; then
+  echo "publishing candidate artifacts ..."
+  publish_candidate 
+elif [ "test" == "$1" ] ; then
+  echo "testing candidate artifacts ..."
+  test_candidate
+else
+  usage 0
+fi

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6a240fcd/release/release_manager.profile
----------------------------------------------------------------------
diff --git a/release/release_manager.profile b/release/release_manager.profile
new file mode 100644
index 0000000..6b51003
--- /dev/null
+++ b/release/release_manager.profile
@@ -0,0 +1,29 @@
+# Profile for Releasing Quickstep
+#
+# This script is for release managers. It is to be used in conjunction with 
+# the release_cmds script.
+#
+# Usage: Fill out this file with your apache details. Then source this file.
+#        Sourcing the release_cmds file will automatically source this.
+#
+
+# you must be registered with apache
+export APACHE_USERNAME=
+
+# quickstep version, Major.Minor.Increment
+export VERSION=
+
+# release candidate number. For a new version, this starts at 0
+export CANDIDATE=
+
+# folders corresponding to SVN repos where we keep our release artifacts
+export SVN_DEV=svn-quickstep-dev
+export SVN_REL=svn-quickstep-rel
+
+# you probably will not need to change things below this line
+export PROJECT_NAME=apache-quickstep-incubating
+
+# path to apache repo with quickstep releases
+
+export SVN_DEV_URL=https://dist.apache.org/repos/dist/dev/incubator/quickstep
+export SVN_REL_URL=https://dist.apache.org/repos/dist/rel/incubator/quickstep
\ No newline at end of file