You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/01/11 01:01:35 UTC

[43/50] incubator-quickstep git commit: Added DistributedCli executable.

Added DistributedCli executable.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/b949c504
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/b949c504
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/b949c504

Branch: refs/heads/quickstep_partition_parser_support
Commit: b949c50450625f16c40a581db3ed811be2b9ccf2
Parents: bc4086b
Author: Zuyu Zhang <zu...@apache.org>
Authored: Sun Nov 27 22:32:24 2016 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Mon Dec 5 00:15:16 2016 -0800

----------------------------------------------------------------------
 CMakeLists.txt                               |  17 ++
 cli/CMakeLists.txt                           |   3 +
 cli/distributed/CMakeLists.txt               |  93 +++++++++
 cli/distributed/Cli.cpp                      | 239 ++++++++++++++++++++++
 cli/distributed/Cli.hpp                      |  71 +++++++
 cli/distributed/CliDistributedModule.hpp     |  23 +++
 cli/distributed/Conductor.cpp                | 180 ++++++++++++++++
 cli/distributed/Conductor.hpp                |  80 ++++++++
 cli/distributed/Executor.cpp                 |  87 ++++++++
 cli/distributed/Executor.hpp                 |  83 ++++++++
 cli/distributed/QuickstepDistributedCli.cpp  |  81 ++++++++
 cli/distributed/Role.cpp                     |  51 +++++
 cli/distributed/Role.hpp                     |  69 +++++++
 query_execution/QueryExecutionMessages.proto |   8 +
 query_execution/QueryExecutionTypedefs.hpp   |   6 +-
 validate_cmakelists.py                       |   4 +-
 16 files changed, 1093 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b949c504/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 126b47b..391cb26 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -782,3 +782,20 @@ endif()
 
 # Link against other required system and third-party libraries.
 target_link_libraries(quickstep_cli_shell ${LIBS})
+
+if (ENABLE_DISTRIBUTED)
+  # Build the quickstep_distributed_cli_shell executable.
+  add_executable (quickstep_distributed_cli_shell cli/distributed/QuickstepDistributedCli.cpp)
+  # Link against direct deps (will transitively pull in everything needed).
+  # NOTE(zuyu): Link quickstep_cli_LineReader on behalf of quickstep_cli_distributed_Cli,
+  # as a workaround for bypassing conditionally built target checks in validate_cmakelists.py.
+  target_link_libraries(quickstep_distributed_cli_shell
+                        glog
+                        quickstep_cli_LineReader
+                        quickstep_cli_distributed_Cli
+                        quickstep_cli_distributed_Conductor
+                        quickstep_cli_distributed_Executor
+                        quickstep_utility_StringUtil
+                        ${GFLAGS_LIB_NAME}
+                        ${GRPCPLUSPLUS_LIBRARIES})
+endif(ENABLE_DISTRIBUTED)

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b949c504/cli/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cli/CMakeLists.txt b/cli/CMakeLists.txt
index 9b62af9..be13c82 100644
--- a/cli/CMakeLists.txt
+++ b/cli/CMakeLists.txt
@@ -16,6 +16,9 @@
 # under the License.
 
 include_directories(${CMAKE_CURRENT_BINARY_DIR})
+if (ENABLE_DISTRIBUTED)
+  add_subdirectory(distributed)
+endif(ENABLE_DISTRIBUTED)
 add_subdirectory(tests)
 
 if (WIN32)

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b949c504/cli/distributed/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cli/distributed/CMakeLists.txt b/cli/distributed/CMakeLists.txt
new file mode 100644
index 0000000..e16d8af
--- /dev/null
+++ b/cli/distributed/CMakeLists.txt
@@ -0,0 +1,93 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+if (BUILD_SHARED_LIBS)
+  set(GFLAGS_LIB_NAME gflags_nothreads-shared)
+else()
+  set(GFLAGS_LIB_NAME gflags_nothreads-static)
+endif()
+
+# Declare micro-libs and link dependencies:
+add_library(quickstep_cli_distributed_Cli Cli.cpp Cli.hpp)
+add_library(quickstep_cli_distributed_Conductor Conductor.cpp Conductor.hpp)
+add_library(quickstep_cli_distributed_Executor Executor.cpp Executor.hpp)
+add_library(quickstep_cli_distributed_Role Role.cpp Role.hpp)
+
+# Link dependencies:
+target_link_libraries(quickstep_cli_distributed_Cli
+                      glog
+                      quickstep_catalog_CatalogRelation
+                      quickstep_cli_Flags
+                      quickstep_cli_PrintToScreen
+                      quickstep_cli_distributed_Role
+                      quickstep_parser_ParseStatement
+                      quickstep_parser_SqlParserWrapper
+                      quickstep_queryexecution_BlockLocatorUtil
+                      quickstep_queryexecution_QueryExecutionMessages_proto
+                      quickstep_queryexecution_QueryExecutionTypedefs
+                      quickstep_queryexecution_QueryExecutionUtil
+                      quickstep_storage_DataExchangerAsync
+                      quickstep_storage_StorageBlockInfo
+                      quickstep_storage_StorageManager
+                      quickstep_utility_Macros
+                      quickstep_utility_StringUtil
+                      tmb)
+target_link_libraries(quickstep_cli_distributed_Conductor
+                      glog
+                      quickstep_cli_DefaultsConfigurator
+                      quickstep_cli_Flags
+                      quickstep_cli_distributed_Role
+                      quickstep_parser_ParseStatement
+                      quickstep_parser_SqlParserWrapper
+                      quickstep_queryexecution_BlockLocator
+                      quickstep_queryexecution_ForemanDistributed
+                      quickstep_queryexecution_QueryExecutionMessages_proto
+                      quickstep_queryexecution_QueryExecutionTypedefs
+                      quickstep_queryexecution_QueryExecutionUtil
+                      quickstep_queryoptimizer_QueryHandle
+                      quickstep_queryoptimizer_QueryProcessor
+                      quickstep_storage_StorageConstants
+                      quickstep_utility_Macros
+                      quickstep_utility_SqlError
+                      tmb)
+target_link_libraries(quickstep_cli_distributed_Executor
+                      glog
+                      quickstep_catalog_CatalogTypedefs
+                      quickstep_cli_Flags
+                      quickstep_cli_distributed_Role
+                      quickstep_queryexecution_BlockLocatorUtil
+                      quickstep_queryexecution_QueryExecutionTypedefs
+                      quickstep_queryexecution_Shiftboss
+                      quickstep_queryexecution_Worker
+                      quickstep_queryexecution_WorkerDirectory
+                      quickstep_storage_DataExchangerAsync
+                      quickstep_storage_StorageManager
+                      quickstep_utility_Macros
+                      tmb)
+target_link_libraries(quickstep_cli_distributed_Role
+                      quickstep_utility_Macros
+                      tmb
+                      ${GFLAGS_LIB_NAME})
+
+# Module all-in-one library:
+add_library(quickstep_cli_distributed ../../empty_src.cpp CliDistributedModule.hpp)
+
+target_link_libraries(quickstep_cli_distributed
+                      quickstep_cli_distributed_Cli
+                      quickstep_cli_distributed_Conductor
+                      quickstep_cli_distributed_Executor
+                      quickstep_cli_distributed_Role)

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b949c504/cli/distributed/Cli.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Cli.cpp b/cli/distributed/Cli.cpp
new file mode 100644
index 0000000..01f824d
--- /dev/null
+++ b/cli/distributed/Cli.cpp
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "cli/distributed/Cli.hpp"
+
+#include <chrono>
+#include <cstddef>
+#include <cstdio>
+#include <cstdlib>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "catalog/CatalogRelation.hpp"
+#include "cli/CliConfig.h"  // For QUICKSTEP_USE_LINENOISE.
+#include "cli/Flags.hpp"
+
+#ifdef QUICKSTEP_USE_LINENOISE
+#include "cli/LineReaderLineNoise.hpp"
+typedef quickstep::LineReaderLineNoise LineReaderImpl;
+#else
+#include "cli/LineReaderDumb.hpp"
+typedef quickstep::LineReaderDumb LineReaderImpl;
+#endif
+
+#include "cli/PrintToScreen.hpp"
+#include "parser/ParseStatement.hpp"
+#include "parser/SqlParserWrapper.hpp"
+#include "query_execution/BlockLocatorUtil.hpp"
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
+#include "storage/DataExchangerAsync.hpp"
+#include "utility/StringUtil.hpp"
+#include "storage/StorageBlockInfo.hpp"
+
+#include "tmb/address.h"
+#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
+#include "tmb/message_style.h"
+#include "tmb/native_net_client_message_bus.h"
+#include "tmb/tagged_message.h"
+
+#include "glog/logging.h"
+
+using std::fprintf;
+using std::free;
+using std::make_unique;
+using std::malloc;
+using std::move;
+using std::printf;
+using std::size_t;
+using std::string;
+using std::vector;
+
+using tmb::AnnotatedMessage;
+using tmb::TaggedMessage;
+using tmb::client_id;
+
+namespace quickstep {
+
+namespace S = ::quickstep::serialization;
+
+void Cli::init() {
+  cli_id_ = bus_.Connect();
+  DLOG(INFO) << "DistributedCli TMB Client ID: " << cli_id_;
+
+  bus_.RegisterClientAsSender(cli_id_, kDistributedCliRegistrationMessage);
+  bus_.RegisterClientAsReceiver(cli_id_, kDistributedCliRegistrationResponseMessage);
+
+  DLOG(INFO) << "DistributedCli sent DistributedCliRegistrationMessage (typed '"
+             << kDistributedCliRegistrationMessage
+             << "') to all";
+
+  tmb::Address all_addresses;
+  all_addresses.All(true);
+  // NOTE(zuyu): The singleton Conductor would need one copy of the message.
+  tmb::MessageStyle style;
+
+  TaggedMessage cli_reg_message(kDistributedCliRegistrationMessage);
+  DCHECK(tmb::MessageBus::SendStatus::kOK ==
+      bus_.Send(cli_id_, all_addresses, style, move(cli_reg_message)));
+
+  // Wait for Conductor to response.
+  const AnnotatedMessage cli_reg_response_message(bus_.Receive(cli_id_, 0, true));
+  DCHECK_EQ(kDistributedCliRegistrationResponseMessage,
+            cli_reg_response_message.tagged_message.message_type());
+  conductor_client_id_ = cli_reg_response_message.sender;
+
+  DLOG(INFO) << "DistributedCli received typed '" << kDistributedCliRegistrationResponseMessage
+             << "' message from Conductor (id'" << conductor_client_id_ << "').";
+
+  // Setup StorageManager.
+  bus_.RegisterClientAsSender(cli_id_, kBlockDomainRegistrationMessage);
+  bus_.RegisterClientAsReceiver(cli_id_, kBlockDomainRegistrationResponseMessage);
+
+  client_id locator_client_id;
+  storage_manager_ = make_unique<StorageManager>(
+      FLAGS_storage_path,
+      block_locator::getBlockDomain(data_exchanger_.network_address(), cli_id_, &locator_client_id, &bus_),
+      locator_client_id, &bus_);
+
+  data_exchanger_.set_storage_manager(storage_manager_.get());
+  data_exchanger_.start();
+
+  // Prepare for submitting a query.
+  bus_.RegisterClientAsSender(cli_id_, kSqlQueryMessage);
+  bus_.RegisterClientAsReceiver(cli_id_, kQueryExecutionSuccessMessage);
+  bus_.RegisterClientAsReceiver(cli_id_, kQueryExecutionErrorMessage);
+}
+
+void Cli::run() {
+  LineReaderImpl line_reader("distributed_quickstep> ",
+                             "                  ...> ");
+  auto parser_wrapper = make_unique<SqlParserWrapper>();
+  std::chrono::time_point<std::chrono::steady_clock> start, end;
+
+  for (;;) {
+    string *command_string = new string();
+    *command_string = line_reader.getNextCommand();
+    if (command_string->size() == 0) {
+      delete command_string;
+      break;
+    }
+
+    parser_wrapper->feedNextBuffer(command_string);
+
+    bool quitting = false;
+    // A parse error should reset the parser. This is because the thrown quickstep
+    // SqlError does not do the proper reset work of the YYABORT macro.
+    bool reset_parser = false;
+    for (;;) {
+      ParseResult result = parser_wrapper->getNextStatement();
+      const ParseStatement &statement = *result.parsed_statement;
+      if (result.condition == ParseResult::kSuccess) {
+        if (statement.getStatementType() == ParseStatement::kQuit) {
+          quitting = true;
+          break;
+        }
+
+        CHECK_NE(statement.getStatementType(), ParseStatement::kCommand)
+            << "TODO(quickstep-team)";
+
+        DLOG(INFO) << "DistributedCli sent SqlQueryMessage (typed '" << kSqlQueryMessage
+                   << "') to Conductor";
+        S::SqlQueryMessage proto;
+        proto.set_sql_query(*command_string);
+
+        const size_t proto_length = proto.ByteSize();
+        char *proto_bytes = static_cast<char*>(malloc(proto_length));
+        CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+        TaggedMessage sql_query_message(static_cast<const void*>(proto_bytes),
+                                        proto_length,
+                                        kSqlQueryMessage);
+        free(proto_bytes);
+
+        QueryExecutionUtil::SendTMBMessage(&bus_,
+                                           cli_id_,
+                                           conductor_client_id_,
+                                           move(sql_query_message));
+
+        start = std::chrono::steady_clock::now();
+
+        const AnnotatedMessage annotated_message(bus_.Receive(cli_id_, 0, true));
+        const TaggedMessage &tagged_message = annotated_message.tagged_message;
+        DLOG(INFO) << "DistributedCli received typed '" << tagged_message.message_type()
+                   << "' message from client " << annotated_message.sender;
+        switch (tagged_message.message_type()) {
+          case kQueryExecutionSuccessMessage: {
+            end = std::chrono::steady_clock::now();
+
+            S::QueryExecutionSuccessMessage proto;
+            CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+            if (proto.has_result_relation()) {
+              CatalogRelation result_relation(proto.result_relation());
+
+              PrintToScreen::PrintRelation(result_relation, storage_manager_.get(), stdout);
+
+              const vector<block_id> blocks(result_relation.getBlocksSnapshot());
+              for (const block_id block : blocks) {
+                storage_manager_->deleteBlockOrBlobFile(block);
+              }
+            }
+
+            std::chrono::duration<double, std::milli> time_in_ms = end - start;
+            printf("Time: %s ms\n", DoubleToStringWithSignificantDigits(time_in_ms.count(), 3).c_str());
+            break;
+          }
+          case kQueryExecutionErrorMessage: {
+            S::QueryExecutionErrorMessage proto;
+            CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+            fprintf(stderr, "%s", proto.error_message().c_str());
+            break;
+          }
+          default: {
+            LOG(ERROR) << "Unknown TMB message type";
+          }
+        }
+      } else {
+        if (result.condition == ParseResult::kError) {
+          fprintf(stderr, "%s", result.error_message.c_str());
+        }
+        reset_parser = true;
+        break;
+      }
+    }
+
+    if (quitting) {
+      break;
+    } else if (reset_parser) {
+      parser_wrapper = make_unique<SqlParserWrapper>();
+      reset_parser = false;
+    }
+  }
+
+  bus_.Disconnect(cli_id_);
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b949c504/cli/distributed/Cli.hpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Cli.hpp b/cli/distributed/Cli.hpp
new file mode 100644
index 0000000..32c178f
--- /dev/null
+++ b/cli/distributed/Cli.hpp
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_CLI_DISTRIBUTED_CLI_HPP_
+#define QUICKSTEP_CLI_DISTRIBUTED_CLI_HPP_
+
+#include <memory>
+
+#include "cli/distributed/Role.hpp"
+#include "storage/DataExchangerAsync.hpp"
+#include "storage/StorageManager.hpp"
+#include "utility/Macros.hpp"
+
+#include "tmb/id_typedefs.h"
+
+namespace quickstep {
+
+/** \addtogroup CliDistributed
+ *  @{
+ */
+
+/**
+ * @brief A class for the Cli component in the distributed version.
+ **/
+class Cli final : public Role {
+ public:
+  /**
+   * @brief Constructor.
+   **/
+  Cli() = default;
+
+  ~Cli() override {
+    data_exchanger_.shutdown();
+    storage_manager_.reset();
+    data_exchanger_.join();
+  }
+
+  void init() override;
+
+  void run() override;
+
+ private:
+  tmb::client_id cli_id_, conductor_client_id_;
+
+  DataExchangerAsync data_exchanger_;
+  std::unique_ptr<StorageManager> storage_manager_;
+
+  DISALLOW_COPY_AND_ASSIGN(Cli);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_CLI_DISTRIBUTED_CLI_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b949c504/cli/distributed/CliDistributedModule.hpp
----------------------------------------------------------------------
diff --git a/cli/distributed/CliDistributedModule.hpp b/cli/distributed/CliDistributedModule.hpp
new file mode 100644
index 0000000..cfa1e1b
--- /dev/null
+++ b/cli/distributed/CliDistributedModule.hpp
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+/** @defgroup CliDistributed
+ *
+ * The distributed QuickStep command-line interface.
+ **/

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b949c504/cli/distributed/Conductor.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Conductor.cpp b/cli/distributed/Conductor.cpp
new file mode 100644
index 0000000..c4a2721
--- /dev/null
+++ b/cli/distributed/Conductor.cpp
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "cli/distributed/Conductor.hpp"
+
+#include <cstddef>
+#include <cstdlib>
+#include <exception>
+#include <memory>
+#include <sstream>
+#include <string>
+#include <utility>
+
+#include "cli/DefaultsConfigurator.hpp"
+#include "cli/Flags.hpp"
+#include "parser/ParseStatement.hpp"
+#include "parser/SqlParserWrapper.hpp"
+#include "query_execution/BlockLocator.hpp"
+#include "query_execution/ForemanDistributed.hpp"
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
+#include "query_optimizer/QueryHandle.hpp"
+#include "query_optimizer/QueryProcessor.hpp"
+#include "storage/StorageConstants.hpp"
+#include "utility/SqlError.hpp"
+
+#include "tmb/id_typedefs.h"
+#include "tmb/native_net_client_message_bus.h"
+#include "tmb/tagged_message.h"
+
+#include "glog/logging.h"
+
+using std::free;
+using std::make_unique;
+using std::malloc;
+using std::move;
+using std::size_t;
+using std::string;
+
+using tmb::AnnotatedMessage;
+using tmb::MessageBus;
+using tmb::TaggedMessage;
+using tmb::client_id;
+
+namespace quickstep {
+
+namespace S = ::quickstep::serialization;
+
+void Conductor::init() {
+  try {
+    string catalog_path = FLAGS_storage_path + kCatalogFilename;
+
+    if (quickstep::FLAGS_initialize_db) {  // Initialize the database
+      DefaultsConfigurator::InitializeDefaultDatabase(FLAGS_storage_path, catalog_path);
+    }
+
+    query_processor_ = make_unique<QueryProcessor>(move(catalog_path));
+  } catch (const std::exception &e) {
+    LOG(FATAL) << "FATAL ERROR DURING STARTUP: " << e.what()
+               << "\nIf you intended to create a new database, "
+               << "please use the \"-initialize_db=true\" command line option.";
+  } catch (...) {
+    LOG(FATAL) << "NON-STANDARD EXCEPTION DURING STARTUP";
+  }
+
+  bus_.ResetBus();
+
+  conductor_client_id_ = bus_.Connect();
+  DLOG(INFO) << "Conductor TMB Client ID: " << conductor_client_id_;
+
+  bus_.RegisterClientAsReceiver(conductor_client_id_, kDistributedCliRegistrationMessage);
+  bus_.RegisterClientAsSender(conductor_client_id_, kDistributedCliRegistrationResponseMessage);
+
+  bus_.RegisterClientAsReceiver(conductor_client_id_, kSqlQueryMessage);
+  bus_.RegisterClientAsSender(conductor_client_id_, kQueryExecutionErrorMessage);
+  bus_.RegisterClientAsSender(conductor_client_id_, kAdmitRequestMessage);
+
+  block_locator_ = make_unique<BlockLocator>(&bus_);
+  block_locator_->start();
+
+  foreman_ = make_unique<ForemanDistributed>(*block_locator_, &bus_, query_processor_->getDefaultDatabase());
+  foreman_->start();
+}
+
+void Conductor::run() {
+  for (;;) {
+    AnnotatedMessage annotated_message(bus_.Receive(conductor_client_id_, 0, true));
+    const TaggedMessage &tagged_message = annotated_message.tagged_message;
+    const client_id sender = annotated_message.sender;
+
+    DLOG(INFO) << "Conductor received typed '" << tagged_message.message_type()
+               << "' message from client " << sender;
+    switch (tagged_message.message_type()) {
+      case kDistributedCliRegistrationMessage: {
+        TaggedMessage message(kDistributedCliRegistrationResponseMessage);
+
+        DLOG(INFO) << "Conductor sent DistributedCliRegistrationResponseMessage (typed '"
+                   << kDistributedCliRegistrationResponseMessage
+                   << "') to Distributed CLI " << sender;
+        CHECK(MessageBus::SendStatus::kOK ==
+            QueryExecutionUtil::SendTMBMessage(&bus_, conductor_client_id_, sender, move(message)));
+        break;
+      }
+      case kSqlQueryMessage: {
+        S::SqlQueryMessage proto;
+        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+        DLOG(INFO) << "Conductor received the following SQL query: " << proto.sql_query();
+
+        processSqlQueryMessage(sender, new string(move(proto.sql_query())));
+        break;
+      }
+      default:
+        LOG(FATAL) << "Unknown TMB message type";
+    }
+  }
+}
+
+void Conductor::processSqlQueryMessage(const tmb::client_id sender, string *command_string) {
+  parser_wrapper_.feedNextBuffer(command_string);
+  ParseResult parse_result = parser_wrapper_.getNextStatement();
+
+  CHECK(parse_result.condition == ParseResult::kSuccess)
+      << "Any SQL syntax error should be addressed in the DistributedCli.";
+
+  const ParseStatement &statement = *parse_result.parsed_statement;
+  CHECK(statement.getStatementType() != ParseStatement::kCommand)
+     << "TODO(quickstep-team)";
+
+  try {
+    auto query_handle = make_unique<QueryHandle>(query_processor_->query_id(),
+                                                 sender,
+                                                 statement.getPriority());
+    query_processor_->generateQueryHandle(statement, query_handle.get());
+    DCHECK(query_handle->getQueryPlanMutable() != nullptr);
+
+    QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
+        conductor_client_id_,
+        foreman_->getBusClientID(),
+        query_handle.release(),
+        &bus_);
+  } catch (const SqlError &sql_error) {
+    // Set the query execution status along with the error message.
+    S::QueryExecutionErrorMessage proto;
+    proto.set_error_message(sql_error.formatMessage(*command_string));
+
+    const size_t proto_length = proto.ByteSize();
+    char *proto_bytes = static_cast<char*>(malloc(proto_length));
+    CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+    TaggedMessage message(static_cast<const void*>(proto_bytes),
+                          proto_length,
+                          kQueryExecutionErrorMessage);
+    free(proto_bytes);
+
+    DLOG(INFO) << "Conductor (on behalf of Optimizer) sent QueryExecutionErrorMessage (typed '"
+               << kQueryExecutionErrorMessage
+               << "') to Distributed CLI " << sender;
+    CHECK(MessageBus::SendStatus::kOK ==
+        QueryExecutionUtil::SendTMBMessage(&bus_, conductor_client_id_, sender, move(message)));
+  }
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b949c504/cli/distributed/Conductor.hpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Conductor.hpp b/cli/distributed/Conductor.hpp
new file mode 100644
index 0000000..e8c9582
--- /dev/null
+++ b/cli/distributed/Conductor.hpp
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_CLI_DISTRIBUTED_CONDUCTOR_HPP_
+#define QUICKSTEP_CLI_DISTRIBUTED_CONDUCTOR_HPP_
+
+#include <memory>
+#include <string>
+
+#include "cli/distributed/Role.hpp"
+#include "parser/SqlParserWrapper.hpp"
+#include "query_execution/BlockLocator.hpp"
+#include "query_execution/ForemanDistributed.hpp"
+#include "query_optimizer/QueryProcessor.hpp"
+#include "utility/Macros.hpp"
+
+#include "tmb/id_typedefs.h"
+
+namespace quickstep {
+
+/** \addtogroup CliDistributed
+ *  @{
+ */
+
+/**
+ * @brief A class for the Conductor component in the distributed version.
+ **/
+class Conductor final : public Role {
+ public:
+  /**
+   * @brief Constructor.
+   **/
+  Conductor() = default;
+
+  ~Conductor() override {
+    foreman_->join();
+    block_locator_->join();
+  }
+
+  void init() override;
+
+  void run() override;
+
+ private:
+  void processSqlQueryMessage(const tmb::client_id sender, std::string *command_string);
+
+  SqlParserWrapper parser_wrapper_;
+
+  std::unique_ptr<QueryProcessor> query_processor_;
+
+  tmb::client_id conductor_client_id_;
+
+  std::unique_ptr<BlockLocator> block_locator_;
+
+  std::unique_ptr<ForemanDistributed> foreman_;
+
+  DISALLOW_COPY_AND_ASSIGN(Conductor);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_CLI_DISTRIBUTED_CONDUCTOR_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b949c504/cli/distributed/Executor.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Executor.cpp b/cli/distributed/Executor.cpp
new file mode 100644
index 0000000..1d03579
--- /dev/null
+++ b/cli/distributed/Executor.cpp
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "cli/distributed/Executor.hpp"
+
+#include <cstddef>
+#include <memory>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "cli/Flags.hpp"
+#include "query_execution/BlockLocatorUtil.hpp"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/Shiftboss.hpp"
+#include "query_execution/Worker.hpp"
+#include "query_execution/WorkerDirectory.hpp"
+#include "storage/DataExchangerAsync.hpp"
+#include "storage/StorageManager.hpp"
+
+#include "tmb/id_typedefs.h"
+#include "tmb/native_net_client_message_bus.h"
+
+#include "glog/logging.h"
+
+using std::make_unique;
+using std::size_t;
+using std::vector;
+
+using tmb::client_id;
+
+namespace quickstep {
+
+void Executor::init() {
+  executor_client_id_ = bus_.Connect();
+  DLOG(INFO) << "Executor TMB Client ID: " << executor_client_id_;
+
+  bus_.RegisterClientAsSender(executor_client_id_, kBlockDomainRegistrationMessage);
+  bus_.RegisterClientAsReceiver(executor_client_id_, kBlockDomainRegistrationResponseMessage);
+
+  vector<client_id> worker_client_ids;
+  vector<numa_node_id> worker_numa_nodes(FLAGS_num_workers, kAnyNUMANodeID);
+
+  for (std::size_t worker_thread_index = 0;
+       worker_thread_index < FLAGS_num_workers;
+       ++worker_thread_index) {
+    workers_.push_back(make_unique<Worker>(worker_thread_index, &bus_));
+    worker_client_ids.push_back(workers_.back()->getBusClientID());
+  }
+
+  worker_directory_ =
+      make_unique<WorkerDirectory>(worker_client_ids.size(), worker_client_ids, worker_numa_nodes);
+
+  client_id locator_client_id;
+  storage_manager_ = make_unique<StorageManager>(
+      FLAGS_storage_path,
+      block_locator::getBlockDomain(data_exchanger_.network_address(), executor_client_id_, &locator_client_id, &bus_),
+      locator_client_id, &bus_);
+
+  data_exchanger_.set_storage_manager(storage_manager_.get());
+  data_exchanger_.start();
+
+  shiftboss_ =
+      make_unique<Shiftboss>(&bus_, storage_manager_.get(), worker_directory_.get());
+  shiftboss_->start();
+
+  for (const auto &worker : workers_) {
+    worker->start();
+  }
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b949c504/cli/distributed/Executor.hpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Executor.hpp b/cli/distributed/Executor.hpp
new file mode 100644
index 0000000..6ffa756
--- /dev/null
+++ b/cli/distributed/Executor.hpp
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_CLI_DISTRIBUTED_EXECUTOR_HPP_
+#define QUICKSTEP_CLI_DISTRIBUTED_EXECUTOR_HPP_
+
+#include <memory>
+#include <vector>
+
+#include "cli/distributed/Role.hpp"
+#include "query_execution/Shiftboss.hpp"
+#include "query_execution/Worker.hpp"
+#include "query_execution/WorkerDirectory.hpp"
+#include "storage/DataExchangerAsync.hpp"
+#include "storage/StorageManager.hpp"
+#include "utility/Macros.hpp"
+
+#include "tmb/id_typedefs.h"
+
+namespace quickstep {
+
+/** \addtogroup CliDistributed
+ *  @{
+ */
+
+/**
+ * @brief A class for the Executor component in the distributed version.
+ **/
+class Executor final : public Role {
+ public:
+  /**
+   * @brief Constructor.
+   **/
+  Executor() = default;
+
+  ~Executor() override {
+    for (const auto &worker : workers_) {
+      worker->join();
+    }
+    shiftboss_->join();
+
+    data_exchanger_.shutdown();
+    storage_manager_.reset();
+    data_exchanger_.join();
+  }
+
+  void init() override;
+
+  void run() override {}
+
+ private:
+  tmb::client_id executor_client_id_;
+
+  std::vector<std::unique_ptr<Worker>> workers_;
+  std::unique_ptr<WorkerDirectory> worker_directory_;
+  DataExchangerAsync data_exchanger_;
+  std::unique_ptr<StorageManager> storage_manager_;
+  std::unique_ptr<Shiftboss> shiftboss_;
+
+  DISALLOW_COPY_AND_ASSIGN(Executor);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_CLI_DISTRIBUTED_EXECUTOR_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b949c504/cli/distributed/QuickstepDistributedCli.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/QuickstepDistributedCli.cpp b/cli/distributed/QuickstepDistributedCli.cpp
new file mode 100644
index 0000000..f01cd13
--- /dev/null
+++ b/cli/distributed/QuickstepDistributedCli.cpp
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+/* A standalone command-line interface to Distributed QuickStep */
+
+#include <memory>
+#include <string>
+
+#include "cli/distributed/Cli.hpp"
+#include "cli/distributed/Conductor.hpp"
+#include "cli/distributed/Executor.hpp"
+#include "cli/distributed/Role.hpp"
+#include "utility/StringUtil.hpp"
+
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+#include "grpc/grpc.h"
+
+using std::make_unique;
+
+namespace quickstep {
+
+constexpr char kCliRole[] = "cli";
+constexpr char kConductorRole[] = "conductor";
+constexpr char kExecutorRole[] = "executor";
+
+DEFINE_string(role, "",
+              "The role in the distributed Quickstep: Conductor, Executor, or Cli.");
+static bool ValidateRole(const char *flagname,
+                         const std::string &value) {
+  if (value.empty()) {
+    return false;
+  }
+
+  FLAGS_role = ToLower(value);
+  return FLAGS_role == kCliRole ||
+         FLAGS_role == kConductorRole ||
+         FLAGS_role == kExecutorRole;
+}
+static const volatile bool role_dummy
+    = gflags::RegisterFlagValidator(&FLAGS_role, &ValidateRole);
+
+}  // namespace quickstep
+
+using quickstep::FLAGS_role;
+
+int main(int argc, char *argv[]) {
+  google::InitGoogleLogging(argv[0]);
+  gflags::ParseCommandLineFlags(&argc, &argv, true);
+  grpc_init();
+
+  std::unique_ptr<quickstep::Role> role;
+  if (FLAGS_role == quickstep::kCliRole) {
+    role = make_unique<quickstep::Cli>();
+  } else if (FLAGS_role == quickstep::kConductorRole) {
+    role = make_unique<quickstep::Conductor>();
+  } else if (FLAGS_role == quickstep::kExecutorRole) {
+    role = make_unique<quickstep::Executor>();
+  }
+
+  role->init();
+  role->run();
+
+  return 0;
+}

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b949c504/cli/distributed/Role.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Role.cpp b/cli/distributed/Role.cpp
new file mode 100644
index 0000000..d56ef09
--- /dev/null
+++ b/cli/distributed/Role.cpp
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "cli/distributed/Role.hpp"
+
+#include <cstdio>
+#include <cstdint>
+
+#include "gflags/gflags.h"
+
+#include "tmb/native_net_client_message_bus.h"
+
+namespace quickstep {
+
+DEFINE_string(tmb_server_ip, "127.0.0.1", "IP Address of the TMB Server.");
+
+static bool ValidateTmbServerPort(const char *flagname,
+                                 std::int32_t value) {
+  if (value > 0 && value < 65536) {
+    return true;
+  } else {
+    std::fprintf(stderr, "--%s must be between 1 and 65535 (inclusive)\n", flagname);
+    return false;
+  }
+}
+DEFINE_int32(tmb_server_port, 4575, "Port of the TMB Server.");
+static const bool tmb_server_port_dummy
+    = gflags::RegisterFlagValidator(&FLAGS_tmb_server_port, &ValidateTmbServerPort);
+
+Role::Role() {
+  bus_.AddServer(FLAGS_tmb_server_ip, FLAGS_tmb_server_port);
+  bus_.Initialize();
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b949c504/cli/distributed/Role.hpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Role.hpp b/cli/distributed/Role.hpp
new file mode 100644
index 0000000..b802543
--- /dev/null
+++ b/cli/distributed/Role.hpp
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_CLI_DISTRIBUTED_ROLE_HPP_
+#define QUICKSTEP_CLI_DISTRIBUTED_ROLE_HPP_
+
+#include "utility/Macros.hpp"
+
+#include "tmb/native_net_client_message_bus.h"
+
+namespace quickstep {
+
+/** \addtogroup CliDistributed
+ *  @{
+ */
+
+/**
+ * @brief A base class for all components in the distributed version.
+ **/
+class Role {
+ public:
+  /**
+   * @brief Constructor.
+   **/
+  Role();
+
+  /**
+   * @brief Virtual destructor.
+   **/
+  virtual ~Role() {}
+
+  /**
+   * @brief Initialize the component.
+   **/
+  virtual void init() = 0;
+
+  /**
+   * @brief Start the component.
+   **/
+  virtual void run() = 0;
+
+ protected:
+  tmb::NativeNetClientMessageBus bus_;
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(Role);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_CLI_DISTRIBUTED_ROLE_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b949c504/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index 93e458c..28b5ebd 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -78,6 +78,10 @@ message ShiftbossRegistrationResponseMessage {
   required uint64 shiftboss_index = 1;
 }
 
+message SqlQueryMessage {
+  required string sql_query = 1;
+}
+
 message QueryInitiateMessage {
   required uint64 query_id = 1;
   required CatalogDatabase catalog_database_cache = 2;
@@ -131,6 +135,10 @@ message QueryExecutionSuccessMessage {
   optional CatalogRelationSchema result_relation = 1;
 }
 
+message QueryExecutionErrorMessage {
+  required string error_message = 1;
+}
+
 // BlockLocator related messages.
 message BlockDomainRegistrationMessage {
   // Format IP:Port, i.e., "0.0.0.0:0".

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b949c504/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index 919e45b..faf2132 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -81,6 +81,9 @@ enum QueryExecutionMessageType : message_type_id {
   kShiftbossRegistrationMessage,  // From Shiftboss to Foreman.
   kShiftbossRegistrationResponseMessage,  // From Foreman to Shiftboss, or from
                                           // Shiftboss to Worker.
+  kDistributedCliRegistrationMessage,  // From CLI to Conductor.
+  kDistributedCliRegistrationResponseMessage,  // From Conductor to CLI.
+  kSqlQueryMessage,  // From CLI to Conductor.
   kQueryInitiateMessage,  // From Foreman to Shiftboss.
   kQueryInitiateResponseMessage,  // From Shiftboss to Foreman.
 
@@ -92,8 +95,9 @@ enum QueryExecutionMessageType : message_type_id {
   kSaveQueryResultMessage,  // From Foreman to Shiftboss.
   kSaveQueryResultResponseMessage,  // From Shiftboss to Foreman.
 
-  // From Foreman to CLI.
+  // From Foreman / Conductor to CLI.
   kQueryExecutionSuccessMessage,
+  kQueryExecutionErrorMessage,
 
   // BlockLocator related messages, sorted in a life cycle of StorageManager
   // with a unique block domain.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b949c504/validate_cmakelists.py
----------------------------------------------------------------------
diff --git a/validate_cmakelists.py b/validate_cmakelists.py
index f691d1f..9d1f530 100755
--- a/validate_cmakelists.py
+++ b/validate_cmakelists.py
@@ -46,7 +46,9 @@ EXCLUDED_TOP_LEVEL_DIRS = ["build", "third_party"]
 # Explicitly ignored dependencies (special headers with no other quickstep
 # dependencies).
 IGNORED_DEPENDENCIES = frozenset(
-    ["quickstep_storage_DataExchange.grpc_proto",
+    ["quickstep_cli_LineReaderDumb",
+     "quickstep_cli_LineReaderLineNoise",
+     "quickstep_storage_DataExchange.grpc_proto",
      "quickstep_threading_WinThreadsAPI",
      "quickstep_utility_textbasedtest_TextBasedTest",
      "quickstep_utility_textbasedtest_TextBasedTestDriver",