You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/03/03 12:14:45 UTC

[1/8] incubator-quickstep git commit: Minor refactor regarding CatalogDatabase. [Forced Update!]

Repository: incubator-quickstep
Updated Branches:
  refs/heads/dist-patch 57ca76249 -> c5aa04127 (forced update)


Minor refactor regarding CatalogDatabase.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/964a8064
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/964a8064
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/964a8064

Branch: refs/heads/dist-patch
Commit: 964a80649f37502020c35a15401236f3f48719a2
Parents: 87bbb26
Author: Zuyu Zhang <zu...@apache.org>
Authored: Thu Mar 2 16:02:20 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Thu Mar 2 16:02:20 2017 -0800

----------------------------------------------------------------------
 catalog/CatalogDatabase.hpp           | 2 +-
 catalog/CatalogDatabaseCache.hpp      | 2 +-
 query_execution/ForemanSingleNode.cpp | 3 +--
 query_execution/ForemanSingleNode.hpp | 1 -
 4 files changed, 3 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/964a8064/catalog/CatalogDatabase.hpp
----------------------------------------------------------------------
diff --git a/catalog/CatalogDatabase.hpp b/catalog/CatalogDatabase.hpp
index b071d09..86cfcba 100644
--- a/catalog/CatalogDatabase.hpp
+++ b/catalog/CatalogDatabase.hpp
@@ -138,7 +138,7 @@ class RelationIdNotFound : public std::exception {
 /**
  * @brief A single database in the catalog.
  **/
-class CatalogDatabase : public CatalogDatabaseLite {
+class CatalogDatabase final : public CatalogDatabaseLite {
  public:
   typedef std::unordered_map<std::string, CatalogRelation*>::size_type size_type;
   typedef PtrVector<CatalogRelation, true>::const_skip_iterator const_iterator;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/964a8064/catalog/CatalogDatabaseCache.hpp
----------------------------------------------------------------------
diff --git a/catalog/CatalogDatabaseCache.hpp b/catalog/CatalogDatabaseCache.hpp
index 88ca61d..88e23ab 100644
--- a/catalog/CatalogDatabaseCache.hpp
+++ b/catalog/CatalogDatabaseCache.hpp
@@ -54,7 +54,7 @@ namespace serialization { class CatalogDatabase; }
  * @note A CatalogRelationSchema should be kept unless all associated blocks
  *       have been deleted.
  **/
-class CatalogDatabaseCache : public CatalogDatabaseLite {
+class CatalogDatabaseCache final : public CatalogDatabaseLite {
  public:
   /**
    * @brief Constructor.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/964a8064/query_execution/ForemanSingleNode.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanSingleNode.cpp b/query_execution/ForemanSingleNode.cpp
index 02799c7..3eac0ff 100644
--- a/query_execution/ForemanSingleNode.cpp
+++ b/query_execution/ForemanSingleNode.cpp
@@ -67,7 +67,6 @@ ForemanSingleNode::ForemanSingleNode(
     : ForemanBase(bus, cpu_id),
       main_thread_client_id_(main_thread_client_id),
       worker_directory_(DCHECK_NOTNULL(worker_directory)),
-      catalog_database_(DCHECK_NOTNULL(catalog_database)),
       storage_manager_(DCHECK_NOTNULL(storage_manager)) {
   const std::vector<QueryExecutionMessageType> sender_message_types{
       kPoisonMessage,
@@ -95,7 +94,7 @@ ForemanSingleNode::ForemanSingleNode(
   policy_enforcer_ = std::make_unique<PolicyEnforcerSingleNode>(
       foreman_client_id_,
       num_numa_nodes,
-      catalog_database_,
+      catalog_database,
       storage_manager_,
       worker_directory_,
       bus_);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/964a8064/query_execution/ForemanSingleNode.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanSingleNode.hpp b/query_execution/ForemanSingleNode.hpp
index d2db51b..9fa3b9a 100644
--- a/query_execution/ForemanSingleNode.hpp
+++ b/query_execution/ForemanSingleNode.hpp
@@ -111,7 +111,6 @@ class ForemanSingleNode final : public ForemanBase {
 
   WorkerDirectory *worker_directory_;
 
-  CatalogDatabaseLite *catalog_database_;
   StorageManager *storage_manager_;
 
   DISALLOW_COPY_AND_ASSIGN(ForemanSingleNode);


[3/8] incubator-quickstep git commit: Refactored command execution in the distributed version.

Posted by zu...@apache.org.
Refactored command execution in the distributed version.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/7f25d1c1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/7f25d1c1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/7f25d1c1

Branch: refs/heads/dist-patch
Commit: 7f25d1c1473def1f6b5733bf55673f91d11d0195
Parents: ccb2852
Author: Zuyu Zhang <zu...@apache.org>
Authored: Thu Mar 2 21:30:28 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Thu Mar 2 21:30:28 2017 -0800

----------------------------------------------------------------------
 cli/distributed/Cli.cpp                      |  54 +++------
 cli/distributed/Conductor.cpp                | 137 ++++++++--------------
 cli/distributed/Conductor.hpp                |   5 -
 query_execution/QueryExecutionMessages.proto |   4 -
 query_execution/QueryExecutionTypedefs.hpp   |   4 +-
 5 files changed, 67 insertions(+), 137 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7f25d1c1/cli/distributed/Cli.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Cli.cpp b/cli/distributed/Cli.cpp
index 49b7dc1..63f3259 100644
--- a/cli/distributed/Cli.cpp
+++ b/cli/distributed/Cli.cpp
@@ -123,7 +123,7 @@ void Cli::init() {
   data_exchanger_.set_storage_manager(storage_manager_.get());
   data_exchanger_.start();
 
-  // Prepare for submitting a query.
+  // Prepare for submitting a query or a command.
   bus_.RegisterClientAsSender(cli_id_, kSqlQueryMessage);
 
   bus_.RegisterClientAsReceiver(cli_id_, kQueryExecutionSuccessMessage);
@@ -131,8 +131,6 @@ void Cli::init() {
 
   bus_.RegisterClientAsReceiver(cli_id_, kQueryExecutionErrorMessage);
 
-  // Prepare for submitting a command.
-  bus_.RegisterClientAsSender(cli_id_, kCommandMessage);
   bus_.RegisterClientAsReceiver(cli_id_, kCommandResponseMessage);
 }
 
@@ -166,50 +164,36 @@ void Cli::run() {
         }
 
         if (statement.getStatementType() == ParseStatement::kCommand) {
-          const ParseCommand &command = static_cast<const ParseCommand &>(statement);
-          const std::string &command_str = command.command()->value();
+          const ParseCommand &parse_command = static_cast<const ParseCommand &>(statement);
+          const std::string &command = parse_command.command()->value();
           try {
-            if (command_str == C::kAnalyzeCommand) {
+            if (command == C::kAnalyzeCommand) {
               // TODO(zuyu): support '\analyze'.
-              THROW_SQL_ERROR_AT(command.command()) << "Unsupported Command";
-            } else if (command_str != C::kDescribeDatabaseCommand &&
-                       command_str != C::kDescribeTableCommand) {
-              THROW_SQL_ERROR_AT(command.command()) << "Invalid Command";
+              THROW_SQL_ERROR_AT(parse_command.command()) << "Unsupported Command";
+            } else if (command != C::kDescribeDatabaseCommand &&
+                       command != C::kDescribeTableCommand) {
+              THROW_SQL_ERROR_AT(parse_command.command()) << "Invalid Command";
             }
           } catch (const SqlError &error) {
             fprintf(stderr, "%s", error.formatMessage(*command_string).c_str());
             reset_parser = true;
             break;
           }
+        }
 
-          DLOG(INFO) << "DistributedCli sent CommandMessage (typed '" << kCommandMessage
-                     << "') to Conductor";
-          S::CommandMessage proto;
-          proto.set_command(*command_string);
-
-          const size_t proto_length = proto.ByteSize();
-          char *proto_bytes = static_cast<char*>(malloc(proto_length));
-          CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
-          TaggedMessage command_message(static_cast<const void*>(proto_bytes), proto_length, kCommandMessage);
-          free(proto_bytes);
-
-          QueryExecutionUtil::SendTMBMessage(&bus_, cli_id_, conductor_client_id_, move(command_message));
-        } else {
-          DLOG(INFO) << "DistributedCli sent SqlQueryMessage (typed '" << kSqlQueryMessage
-                     << "') to Conductor";
-          S::SqlQueryMessage proto;
-          proto.set_sql_query(*command_string);
+        DLOG(INFO) << "DistributedCli sent SqlQueryMessage (typed '" << kSqlQueryMessage
+                   << "') to Conductor";
+        S::SqlQueryMessage proto;
+        proto.set_sql_query(*command_string);
 
-          const size_t proto_length = proto.ByteSize();
-          char *proto_bytes = static_cast<char*>(malloc(proto_length));
-          CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+        const size_t proto_length = proto.ByteSize();
+        char *proto_bytes = static_cast<char*>(malloc(proto_length));
+        CHECK(proto.SerializeToArray(proto_bytes, proto_length));
 
-          TaggedMessage sql_query_message(static_cast<const void*>(proto_bytes), proto_length, kSqlQueryMessage);
-          free(proto_bytes);
+        TaggedMessage sql_query_message(static_cast<const void*>(proto_bytes), proto_length, kSqlQueryMessage);
+        free(proto_bytes);
 
-          QueryExecutionUtil::SendTMBMessage(&bus_, cli_id_, conductor_client_id_, move(sql_query_message));
-        }
+        QueryExecutionUtil::SendTMBMessage(&bus_, cli_id_, conductor_client_id_, move(sql_query_message));
 
         start = std::chrono::steady_clock::now();
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7f25d1c1/cli/distributed/Conductor.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Conductor.cpp b/cli/distributed/Conductor.cpp
index b877b04..1b8bfb2 100644
--- a/cli/distributed/Conductor.cpp
+++ b/cli/distributed/Conductor.cpp
@@ -95,21 +95,18 @@ void Conductor::init() {
   bus_.RegisterClientAsReceiver(conductor_client_id_, kDistributedCliRegistrationMessage);
   bus_.RegisterClientAsSender(conductor_client_id_, kDistributedCliRegistrationResponseMessage);
 
-  bus_.RegisterClientAsReceiver(conductor_client_id_, kCommandMessage);
-  bus_.RegisterClientAsSender(conductor_client_id_, kCommandResponseMessage);
-
   bus_.RegisterClientAsReceiver(conductor_client_id_, kSqlQueryMessage);
+  bus_.RegisterClientAsSender(conductor_client_id_, kCommandResponseMessage);
   bus_.RegisterClientAsSender(conductor_client_id_, kQueryExecutionErrorMessage);
   bus_.RegisterClientAsSender(conductor_client_id_, kAdmitRequestMessage);
 
+
   bus_.RegisterClientAsReceiver(conductor_client_id_, kQueryResultTeardownMessage);
 
   block_locator_ = make_unique<BlockLocator>(&bus_);
   block_locator_->start();
 
-  foreman_ = make_unique<ForemanDistributed>(*block_locator_,
-                                             std::bind(&QueryProcessor::saveCatalog, query_processor_.get()), &bus_,
-                                             catalog_database_);
+  foreman_ = make_unique<ForemanDistributed>(*block_locator_, &bus_, catalog_database_, query_processor_.get());
   foreman_->start();
 }
 
@@ -132,14 +129,6 @@ void Conductor::run() {
             QueryExecutionUtil::SendTMBMessage(&bus_, conductor_client_id_, sender, move(message)));
         break;
       }
-      case kCommandMessage: {
-        S::CommandMessage proto;
-        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-        DLOG(INFO) << "Conductor received the following command: " << proto.command();
-
-        processCommandMessage(sender, new string(move(proto.command())));
-        break;
-      }
       case kSqlQueryMessage: {
         S::SqlQueryMessage proto;
         CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
@@ -161,91 +150,59 @@ void Conductor::run() {
   }
 }
 
-void Conductor::processCommandMessage(const tmb::client_id sender, string *command_string) {
-  parser_wrapper_.feedNextBuffer(command_string);
-  ParseResult parse_result = parser_wrapper_.getNextStatement();
-
+void Conductor::processSqlQueryMessage(const tmb::client_id sender, string *command_string) {
+  SqlParserWrapper parser_wrapper;
+  parser_wrapper.feedNextBuffer(command_string);
+  ParseResult parse_result = parser_wrapper.getNextStatement();
   CHECK(parse_result.condition == ParseResult::kSuccess)
       << "Any syntax error should be addressed in the DistributedCli.";
 
   const ParseStatement &statement = *parse_result.parsed_statement;
-  DCHECK_EQ(ParseStatement::kCommand, statement.getStatementType());
-
-  const ParseCommand &command = static_cast<const ParseCommand &>(statement);
-  const PtrVector<ParseString> &arguments = *(command.arguments());
-  const string &command_str = command.command()->value();
-
-  string command_response;
 
   try {
-    if (command_str == C::kDescribeDatabaseCommand) {
-      command_response = C::ExecuteDescribeDatabase(arguments, *catalog_database_);
-    } else if (command_str == C::kDescribeTableCommand) {
-      if (arguments.empty()) {
+    if (statement.getStatementType() == ParseStatement::kCommand) {
+      const ParseCommand &parse_command = static_cast<const ParseCommand &>(statement);
+      const PtrVector<ParseString> &arguments = *(parse_command.arguments());
+      const string &command = parse_command.command()->value();
+
+      string command_response;
+      if (command == C::kDescribeDatabaseCommand) {
         command_response = C::ExecuteDescribeDatabase(arguments, *catalog_database_);
-      } else {
-        command_response = C::ExecuteDescribeTable(arguments, *catalog_database_);
+      } else if (command == C::kDescribeTableCommand) {
+        if (arguments.empty()) {
+          command_response = C::ExecuteDescribeDatabase(arguments, *catalog_database_);
+        } else {
+          command_response = C::ExecuteDescribeTable(arguments, *catalog_database_);
+        }
       }
-    }
-  } catch (const SqlError &command_error) {
-    // Set the query execution status along with the error message.
-    S::QueryExecutionErrorMessage proto;
-    proto.set_error_message(command_error.formatMessage(*command_string));
-
-    const size_t proto_length = proto.ByteSize();
-    char *proto_bytes = static_cast<char*>(malloc(proto_length));
-    CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
-    TaggedMessage message(static_cast<const void*>(proto_bytes),
-                          proto_length,
-                          kQueryExecutionErrorMessage);
-    free(proto_bytes);
-
-    DLOG(INFO) << "Conductor sent QueryExecutionErrorMessage (typed '"
-               << kQueryExecutionErrorMessage
-               << "') to Distributed CLI " << sender;
-    CHECK(MessageBus::SendStatus::kOK ==
-        QueryExecutionUtil::SendTMBMessage(&bus_, conductor_client_id_, sender, move(message)));
-  }
-
-  S::CommandResponseMessage proto;
-  proto.set_command_response(command_response);
-
-  const size_t proto_length = proto.ByteSize();
-  char *proto_bytes = static_cast<char*>(malloc(proto_length));
-  CHECK(proto.SerializeToArray(proto_bytes, proto_length));
 
-  TaggedMessage message(static_cast<const void*>(proto_bytes), proto_length, kCommandResponseMessage);
-  free(proto_bytes);
-
-  DLOG(INFO) << "Conductor sent CommandResponseMessage (typed '" << kCommandResponseMessage
-             << "') to Distributed CLI " << sender;
-  CHECK(MessageBus::SendStatus::kOK ==
-      QueryExecutionUtil::SendTMBMessage(&bus_, conductor_client_id_, sender, move(message)));
-}
-
-void Conductor::processSqlQueryMessage(const tmb::client_id sender, string *command_string) {
-  parser_wrapper_.feedNextBuffer(command_string);
-  ParseResult parse_result = parser_wrapper_.getNextStatement();
-
-  CHECK(parse_result.condition == ParseResult::kSuccess)
-      << "Any SQL syntax error should be addressed in the DistributedCli.";
-
-  const ParseStatement &statement = *parse_result.parsed_statement;
-  DCHECK_NE(ParseStatement::kCommand, statement.getStatementType());
-
-  try {
-    auto query_handle = make_unique<QueryHandle>(query_processor_->query_id(),
-                                                 sender,
-                                                 statement.getPriority());
-    query_processor_->generateQueryHandle(statement, query_handle.get());
-    DCHECK(query_handle->getQueryPlanMutable() != nullptr);
-
-    QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
-        conductor_client_id_,
-        foreman_->getBusClientID(),
-        query_handle.release(),
-        &bus_);
+      S::CommandResponseMessage proto;
+      proto.set_command_response(command_response);
+
+      const size_t proto_length = proto.ByteSize();
+      char *proto_bytes = static_cast<char*>(malloc(proto_length));
+      CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+      TaggedMessage message(static_cast<const void*>(proto_bytes), proto_length, kCommandResponseMessage);
+      free(proto_bytes);
+
+      DLOG(INFO) << "Conductor sent CommandResponseMessage (typed '" << kCommandResponseMessage
+                 << "') to Distributed CLI " << sender;
+      CHECK(MessageBus::SendStatus::kOK ==
+          QueryExecutionUtil::SendTMBMessage(&bus_, conductor_client_id_, sender, move(message)));
+    } else {
+      auto query_handle = make_unique<QueryHandle>(query_processor_->query_id(),
+                                                   sender,
+                                                   statement.getPriority());
+      query_processor_->generateQueryHandle(statement, query_handle.get());
+      DCHECK(query_handle->getQueryPlanMutable() != nullptr);
+
+      QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
+          conductor_client_id_,
+          foreman_->getBusClientID(),
+          query_handle.release(),
+          &bus_);
+    }
   } catch (const SqlError &sql_error) {
     // Set the query execution status along with the error message.
     S::QueryExecutionErrorMessage proto;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7f25d1c1/cli/distributed/Conductor.hpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Conductor.hpp b/cli/distributed/Conductor.hpp
index e7e003f..0c0f7e5 100644
--- a/cli/distributed/Conductor.hpp
+++ b/cli/distributed/Conductor.hpp
@@ -24,7 +24,6 @@
 #include <string>
 
 #include "cli/distributed/Role.hpp"
-#include "parser/SqlParserWrapper.hpp"
 #include "query_execution/BlockLocator.hpp"
 #include "query_execution/ForemanDistributed.hpp"
 #include "query_optimizer/QueryProcessor.hpp"
@@ -61,12 +60,8 @@ class Conductor final : public Role {
   void run() override;
 
  private:
-  void processCommandMessage(const tmb::client_id sender, std::string *command_string);
-
   void processSqlQueryMessage(const tmb::client_id sender, std::string *command_string);
 
-  SqlParserWrapper parser_wrapper_;
-
   std::unique_ptr<QueryProcessor> query_processor_;
   // Not owned.
   CatalogDatabase *catalog_database_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7f25d1c1/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index 47246d8..a45e8df 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -81,10 +81,6 @@ message ShiftbossRegistrationResponseMessage {
   required uint64 shiftboss_index = 1;
 }
 
-message CommandMessage {
-  required string command = 1;
-}
-
 message SqlQueryMessage {
   required string sql_query = 1;
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7f25d1c1/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index 0fd0bdf..a49de5e 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -90,9 +90,7 @@ enum QueryExecutionMessageType : message_type_id {
   kDistributedCliRegistrationMessage,  // From CLI to Conductor.
   kDistributedCliRegistrationResponseMessage,  // From Conductor to CLI.
 
-  // From CLI to Conductor.
-  kCommandMessage,
-  kSqlQueryMessage,
+  kSqlQueryMessage, // From CLI to Conductor.
 
   kQueryInitiateMessage,  // From Foreman to Shiftboss.
   kQueryInitiateResponseMessage,  // From Shiftboss to Foreman.


[6/8] incubator-quickstep git commit: Set the default tmb_implementation value to purememory.

Posted by zu...@apache.org.
Set the default tmb_implementation value to purememory.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/04b5f7e0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/04b5f7e0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/04b5f7e0

Branch: refs/heads/dist-patch
Commit: 04b5f7e07973cfc2e9267a8d15178920ad5d88c5
Parents: e37ec54
Author: Zuyu Zhang <zu...@apache.org>
Authored: Sat Feb 11 11:17:55 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Fri Mar 3 04:14:37 2017 -0800

----------------------------------------------------------------------
 third_party/src/tmb/src/tmb_net_server.cc | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/04b5f7e0/third_party/src/tmb/src/tmb_net_server.cc
----------------------------------------------------------------------
diff --git a/third_party/src/tmb/src/tmb_net_server.cc b/third_party/src/tmb/src/tmb_net_server.cc
index c2be7ef..abe0efe 100644
--- a/third_party/src/tmb/src/tmb_net_server.cc
+++ b/third_party/src/tmb/src/tmb_net_server.cc
@@ -256,7 +256,7 @@ static bool ValidateTmbImplementation(const char *flagname,
                "Invalid value for --%s: %s\n", flagname, value.c_str());
   return false;
 }
-DEFINE_string(tmb_implementation, "nativelog",
+DEFINE_string(tmb_implementation, "purememory",
               "Which underlying TMB implementation to use. Valid choices are "
               "leveldb, nativelog, purememory, sqlite, voltdb, and zookeeper. "
               "Depending on how the TMB library was built, some of these may "


[5/8] incubator-quickstep git commit: Enabled dag viz by default.

Posted by zu...@apache.org.
Enabled dag viz by default.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/1c6e16c6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/1c6e16c6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/1c6e16c6

Branch: refs/heads/dist-patch
Commit: 1c6e16c6825fe9e74d1f46f12a44c382f309b648
Parents: 04b5f7e
Author: Zuyu Zhang <zu...@apache.org>
Authored: Sat Feb 11 11:40:33 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Fri Mar 3 04:14:37 2017 -0800

----------------------------------------------------------------------
 query_execution/QueryManagerBase.cpp                        | 2 +-
 query_optimizer/tests/DistributedExecutionGeneratorTest.cpp | 3 +++
 2 files changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1c6e16c6/query_execution/QueryManagerBase.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerBase.cpp b/query_execution/QueryManagerBase.cpp
index f84ad4e..2dd5467 100644
--- a/query_execution/QueryManagerBase.cpp
+++ b/query_execution/QueryManagerBase.cpp
@@ -37,7 +37,7 @@ using std::pair;
 
 namespace quickstep {
 
-DEFINE_bool(visualize_execution_dag, false,
+DEFINE_bool(visualize_execution_dag, true,
             "If true, visualize the execution plan DAG into a graph in DOT "
             "format (DOT is a plain text graph description language) which is "
             "then printed via stderr.");

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1c6e16c6/query_optimizer/tests/DistributedExecutionGeneratorTest.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTest.cpp b/query_optimizer/tests/DistributedExecutionGeneratorTest.cpp
index b18b5ec..9b96e12 100644
--- a/query_optimizer/tests/DistributedExecutionGeneratorTest.cpp
+++ b/query_optimizer/tests/DistributedExecutionGeneratorTest.cpp
@@ -43,6 +43,8 @@ using std::make_unique;
 
 QUICKSTEP_GENERATE_TEXT_TEST(DISTRIBUTED_EXECUTION_GENERATOR_TEST);
 
+namespace quickstep { DECLARE_bool(visualize_execution_dag); }
+
 int main(int argc, char** argv) {
   google::InitGoogleLogging(argv[0]);
 
@@ -52,6 +54,7 @@ int main(int argc, char** argv) {
   quickstep::optimizer::FLAGS_use_filter_joins = false;
 
   // Honor FLAGS_buffer_pool_slots in StorageManager.
+  quickstep::FLAGS_visualize_execution_dag = false;
   gflags::ParseCommandLineFlags(&argc, &argv, true);
 
   if (argc < 4) {


[7/8] incubator-quickstep git commit: Disabled LIP in the distributed version.

Posted by zu...@apache.org.
Disabled LIP in the distributed version.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/09d6f731
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/09d6f731
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/09d6f731

Branch: refs/heads/dist-patch
Commit: 09d6f731e62bb00b900f1f073f4049ba2f686953
Parents: 1c6e16c
Author: Zuyu Zhang <zu...@apache.org>
Authored: Sun Feb 12 17:48:53 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Fri Mar 3 04:14:37 2017 -0800

----------------------------------------------------------------------
 query_optimizer/PhysicalGenerator.cpp | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/09d6f731/query_optimizer/PhysicalGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/PhysicalGenerator.cpp b/query_optimizer/PhysicalGenerator.cpp
index ac51c31..ca6db3f 100644
--- a/query_optimizer/PhysicalGenerator.cpp
+++ b/query_optimizer/PhysicalGenerator.cpp
@@ -59,7 +59,7 @@ DEFINE_bool(reorder_hash_joins, true,
             "cardinality and selective tables to be joined first, which is suitable "
             "for queries on star-schema tables.");
 
-DEFINE_bool(use_filter_joins, true,
+DEFINE_bool(use_filter_joins, false,
             "If true, apply an optimization that strength-reduces HashJoins to "
             "FilterJoins (implemented as LIPFilters attached to some anchoring "
             "operators. Briefly speaking, in the case that the join attribute has "
@@ -67,7 +67,7 @@ DEFINE_bool(use_filter_joins, true,
             "build a BitVector on the build-side attribute and use the BitVector "
             "to filter the probe side table.");
 
-DEFINE_bool(use_lip_filters, true,
+DEFINE_bool(use_lip_filters, false,
             "If true, use LIP (Lookahead Information Passing) filters to accelerate "
             "query processing. LIP filters are effective for queries on star schema "
             "tables (e.g. the SSB benchmark) and snowflake schema tables (e.g. the "


[8/8] incubator-quickstep git commit: Refactored block loading order in StorageManager::loadBlockOrBlob.

Posted by zu...@apache.org.
Refactored block loading order in StorageManager::loadBlockOrBlob.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/c5aa0412
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/c5aa0412
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/c5aa0412

Branch: refs/heads/dist-patch
Commit: c5aa041270cd4db8ae9b71af70f325ebda855598
Parents: 09d6f73
Author: Zuyu Zhang <zu...@apache.org>
Authored: Sat Feb 11 11:24:54 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Fri Mar 3 04:14:37 2017 -0800

----------------------------------------------------------------------
 storage/StorageManager.cpp | 60 ++++++++++++++++++++---------------------
 1 file changed, 29 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c5aa0412/storage/StorageManager.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.cpp b/storage/StorageManager.cpp
index 94e1b67..b920c17 100644
--- a/storage/StorageManager.cpp
+++ b/storage/StorageManager.cpp
@@ -666,45 +666,43 @@ StorageManager::BlockHandle StorageManager::loadBlockOrBlob(
   // already loaded before this function gets called.
   BlockHandle loaded_handle;
 
-#ifdef QUICKSTEP_DISTRIBUTED
-  // TODO(quickstep-team): Use a cost model to determine whether to load from
-  // a remote peer or the disk.
-  if (BlockIdUtil::Domain(block) != block_domain_) {
-    DLOG(INFO) << "Pulling Block " << BlockIdUtil::ToString(block) << " from a remote peer";
-    const vector<string> peer_domain_network_addresses = getPeerDomainNetworkAddresses(block);
-    for (const string &peer_domain_network_address : peer_domain_network_addresses) {
-      DataExchangerClientAsync client(
-          grpc::CreateChannel(peer_domain_network_address, grpc::InsecureChannelCredentials()),
-          this);
-
-      if (client.Pull(block, numa_node, &loaded_handle)) {
-        sendBlockLocationMessage(block, kAddBlockLocationMessage);
-        return loaded_handle;
-      }
-    }
+  const size_t num_slots = file_manager_->numSlots(block);
+  if (num_slots != 0) {
+    void *block_buffer = allocateSlots(num_slots, numa_node);
 
-    DLOG(INFO) << "Failed to pull Block " << BlockIdUtil::ToString(block)
-               << " from remote peers, so try to load from disk.";
-  }
-#endif
+    const bool status = file_manager_->readBlockOrBlob(block, block_buffer, kSlotSizeBytes * num_slots);
+    CHECK(status) << "Failed to read block from persistent storage: " << block;
 
-  const size_t num_slots = file_manager_->numSlots(block);
-  DEBUG_ASSERT(num_slots != 0);
-  void *block_buffer = allocateSlots(num_slots, numa_node);
+    loaded_handle.block_memory = block_buffer;
+    loaded_handle.block_memory_size = num_slots;
 
-  const bool status = file_manager_->readBlockOrBlob(block, block_buffer, kSlotSizeBytes * num_slots);
-  CHECK(status) << "Failed to read block from persistent storage: " << block;
+#ifdef QUICKSTEP_DISTRIBUTED
+    if (bus_) {
+      sendBlockLocationMessage(block, kAddBlockLocationMessage);
+    }
+#endif
 
-  loaded_handle.block_memory = block_buffer;
-  loaded_handle.block_memory_size = num_slots;
+    return loaded_handle;
+  }
 
 #ifdef QUICKSTEP_DISTRIBUTED
-  if (bus_) {
-    sendBlockLocationMessage(block, kAddBlockLocationMessage);
+  // TODO(quickstep-team): Use a cost model to determine whether to load from
+  // a remote peer or the disk.
+  DLOG(INFO) << "Pulling Block " << BlockIdUtil::ToString(block) << " from a remote peer";
+  const vector<string> peer_domain_network_addresses = getPeerDomainNetworkAddresses(block);
+  for (const string &peer_domain_network_address : peer_domain_network_addresses) {
+    DataExchangerClientAsync client(
+        grpc::CreateChannel(peer_domain_network_address, grpc::InsecureChannelCredentials()),
+        this);
+
+    if (client.Pull(block, numa_node, &loaded_handle)) {
+      sendBlockLocationMessage(block, kAddBlockLocationMessage);
+      return loaded_handle;
+    }
   }
+  LOG(FATAL) << "Failed to pull Block " << BlockIdUtil::ToString(block)
+             << " from remote peers.";
 #endif
-
-  return loaded_handle;
 }
 
 void StorageManager::insertBlockHandleAfterLoad(const block_id block,


[4/8] incubator-quickstep git commit: Added \analyze support in the distributed version.

Posted by zu...@apache.org.
Added \analyze support in the distributed version.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/e37ec541
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/e37ec541
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/e37ec541

Branch: refs/heads/dist-patch
Commit: e37ec541c1490d8434eb441a1c0389e20f80ec6e
Parents: 7f25d1c
Author: Zuyu Zhang <zu...@apache.org>
Authored: Thu Mar 2 23:36:28 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Fri Mar 3 04:09:36 2017 -0800

----------------------------------------------------------------------
 cli/CMakeLists.txt                            |   2 +-
 cli/distributed/CMakeLists.txt                |   7 +
 cli/distributed/Cli.cpp                       |   8 +-
 cli/distributed/Conductor.cpp                 |  99 ++++++++++++-
 cli/distributed/Conductor.hpp                 |   8 +-
 query_execution/CMakeLists.txt                |  12 ++
 query_execution/ForemanDistributed.cpp        |  25 +++-
 query_execution/ForemanDistributed.hpp        |  12 +-
 query_execution/PolicyEnforcerDistributed.cpp | 155 ++++++++++++++++++---
 query_execution/PolicyEnforcerDistributed.hpp |  14 ++
 query_execution/QueryExecutionTypedefs.hpp    |   2 +-
 query_optimizer/CMakeLists.txt                |   1 +
 query_optimizer/QueryHandle.hpp               |  43 +++++-
 13 files changed, 351 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e37ec541/cli/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cli/CMakeLists.txt b/cli/CMakeLists.txt
index ffeed2f..4562fe6 100644
--- a/cli/CMakeLists.txt
+++ b/cli/CMakeLists.txt
@@ -84,6 +84,7 @@ target_link_libraries(quickstep_cli_CommandExecutor
                       quickstep_expressions_aggregation_AggregateFunctionMax
                       quickstep_expressions_aggregation_AggregateFunctionMin
                       quickstep_parser_ParseStatement
+                      quickstep_parser_ParseString
                       quickstep_parser_SqlParserWrapper
                       quickstep_queryoptimizer_QueryHandle
                       quickstep_queryoptimizer_QueryPlan
@@ -93,7 +94,6 @@ target_link_libraries(quickstep_cli_CommandExecutor
                       quickstep_storage_StorageManager
                       quickstep_storage_TupleIdSequence
                       quickstep_storage_TupleStorageSubBlock
-                      quickstep_parser_ParseString
                       quickstep_types_Type
                       quickstep_types_TypeID
                       quickstep_types_TypedValue

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e37ec541/cli/distributed/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cli/distributed/CMakeLists.txt b/cli/distributed/CMakeLists.txt
index 1f7dee0..2db27e5 100644
--- a/cli/distributed/CMakeLists.txt
+++ b/cli/distributed/CMakeLists.txt
@@ -25,13 +25,19 @@ add_library(quickstep_cli_distributed_Role Role.cpp Role.hpp)
 # Link dependencies:
 target_link_libraries(quickstep_cli_distributed_Conductor
                       glog
+                      quickstep_catalog_CatalogAttribute
                       quickstep_catalog_CatalogDatabase
+                      quickstep_catalog_CatalogRelation
+                      quickstep_catalog_CatalogTypedefs
                       quickstep_cli_CommandExecutorUtil
                       quickstep_cli_Constants
                       quickstep_cli_DefaultsConfigurator
                       quickstep_cli_Flags
                       quickstep_cli_distributed_Role
+                      quickstep_expressions_aggregation_AggregateFunctionMax
+                      quickstep_expressions_aggregation_AggregateFunctionMin
                       quickstep_parser_ParseStatement
+                      quickstep_parser_ParseString
                       quickstep_parser_SqlParserWrapper
                       quickstep_queryexecution_BlockLocator
                       quickstep_queryexecution_ForemanDistributed
@@ -42,6 +48,7 @@ target_link_libraries(quickstep_cli_distributed_Conductor
                       quickstep_queryoptimizer_QueryProcessor
                       quickstep_storage_StorageConstants
                       quickstep_utility_Macros
+                      quickstep_utility_PtrVector
                       quickstep_utility_SqlError
                       quickstep_utility_StringUtil
                       tmb)

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e37ec541/cli/distributed/Cli.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Cli.cpp b/cli/distributed/Cli.cpp
index 63f3259..14880a7 100644
--- a/cli/distributed/Cli.cpp
+++ b/cli/distributed/Cli.cpp
@@ -167,11 +167,9 @@ void Cli::run() {
           const ParseCommand &parse_command = static_cast<const ParseCommand &>(statement);
           const std::string &command = parse_command.command()->value();
           try {
-            if (command == C::kAnalyzeCommand) {
-              // TODO(zuyu): support '\analyze'.
-              THROW_SQL_ERROR_AT(parse_command.command()) << "Unsupported Command";
-            } else if (command != C::kDescribeDatabaseCommand &&
-                       command != C::kDescribeTableCommand) {
+            if (command != C::kAnalyzeCommand &&
+                command != C::kDescribeDatabaseCommand &&
+                command != C::kDescribeTableCommand) {
               THROW_SQL_ERROR_AT(parse_command.command()) << "Invalid Command";
             }
           } catch (const SqlError &error) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e37ec541/cli/distributed/Conductor.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Conductor.cpp b/cli/distributed/Conductor.cpp
index 1b8bfb2..5fb4453 100644
--- a/cli/distributed/Conductor.cpp
+++ b/cli/distributed/Conductor.cpp
@@ -27,13 +27,20 @@
 #include <sstream>
 #include <string>
 #include <utility>
+#include <vector>
 
+#include "catalog/CatalogAttribute.hpp"
 #include "catalog/CatalogDatabase.hpp"
+#include "catalog/CatalogRelation.hpp"
+#include "catalog/CatalogTypedefs.hpp"
 #include "cli/CommandExecutorUtil.hpp"
 #include "cli/Constants.hpp"
 #include "cli/DefaultsConfigurator.hpp"
 #include "cli/Flags.hpp"
+#include "expressions/aggregation/AggregateFunctionMax.hpp"
+#include "expressions/aggregation/AggregateFunctionMin.hpp"
 #include "parser/ParseStatement.hpp"
+#include "parser/ParseString.hpp"
 #include "parser/SqlParserWrapper.hpp"
 #include "query_execution/BlockLocator.hpp"
 #include "query_execution/ForemanDistributed.hpp"
@@ -43,6 +50,7 @@
 #include "query_optimizer/QueryHandle.hpp"
 #include "query_optimizer/QueryProcessor.hpp"
 #include "storage/StorageConstants.hpp"
+#include "utility/PtrVector.hpp"
 #include "utility/SqlError.hpp"
 #include "utility/StringUtil.hpp"
 
@@ -69,6 +77,8 @@ namespace quickstep {
 namespace C = cli;
 namespace S = serialization;
 
+class Type;
+
 void Conductor::init() {
   try {
     string catalog_path = FLAGS_storage_path + kCatalogFilename;
@@ -154,7 +164,7 @@ void Conductor::processSqlQueryMessage(const tmb::client_id sender, string *comm
   SqlParserWrapper parser_wrapper;
   parser_wrapper.feedNextBuffer(command_string);
   ParseResult parse_result = parser_wrapper.getNextStatement();
-  CHECK(parse_result.condition == ParseResult::kSuccess)
+  CHECK_EQ(ParseResult::kSuccess, parse_result.condition)
       << "Any syntax error should be addressed in the DistributedCli.";
 
   const ParseStatement &statement = *parse_result.parsed_statement;
@@ -165,6 +175,11 @@ void Conductor::processSqlQueryMessage(const tmb::client_id sender, string *comm
       const PtrVector<ParseString> &arguments = *(parse_command.arguments());
       const string &command = parse_command.command()->value();
 
+      if (command == C::kAnalyzeCommand) {
+        executeAnalyze(sender, arguments);
+        return;
+      }
+
       string command_response;
       if (command == C::kDescribeDatabaseCommand) {
         command_response = C::ExecuteDescribeDatabase(arguments, *catalog_database_);
@@ -225,4 +240,86 @@ void Conductor::processSqlQueryMessage(const tmb::client_id sender, string *comm
   }
 }
 
+void Conductor::executeAnalyze(const tmb::client_id sender, const PtrVector<ParseString> &arguments) {
+  std::vector<std::reference_wrapper<const CatalogRelation>> relations;
+  if (arguments.empty()) {
+    relations.insert(relations.end(), catalog_database_->begin(), catalog_database_->end());
+  } else {
+    for (const auto &argument : arguments) {
+      const CatalogRelation *relation = catalog_database_->getRelationByName(argument.value());
+      if (relation == nullptr) {
+        THROW_SQL_ERROR_AT(&argument) << "Table does not exist";
+      }
+
+      relations.emplace_back(*relation);
+    }
+  }
+
+  // Analyze each relation in the database.
+  for (const CatalogRelation &relation : relations) {
+    const relation_id rel_id = relation.getID();
+    const string rel_name = EscapeQuotes(relation.getName(), '"');
+
+    // Get the number of distinct values for each column.
+    for (const CatalogAttribute &attribute : relation) {
+      const string attr_name = EscapeQuotes(attribute.getName(), '"');
+      const Type &attr_type = attribute.getType();
+      const bool is_min_applicable =
+          AggregateFunctionMin::Instance().canApplyToTypes({&attr_type});
+      const bool is_max_applicable =
+          AggregateFunctionMax::Instance().canApplyToTypes({&attr_type});
+
+      // NOTE(jianqiao): Note that the relation name and the attribute names may
+      // contain non-letter characters, e.g. CREATE TABLE "with space"("1" int).
+      // So here we need to format the names with double quotes (").
+      string *query_string = new string("SELECT COUNT(DISTINCT \"");
+      query_string->append(attr_name);
+      query_string->append("\")");
+      if (is_min_applicable) {
+        query_string->append(", MIN(\"");
+        query_string->append(attr_name);
+        query_string->append("\")");
+      }
+      if (is_max_applicable) {
+        query_string->append(", MAX(\"");
+        query_string->append(attr_name);
+        query_string->append("\")");
+      }
+      query_string->append(" FROM \"");
+      query_string->append(rel_name);
+      query_string->append("\";");
+
+      submitQuery(sender, query_string,
+                  new QueryHandle::AnalyzeQueryInfo(true /* is_analyze_attribute_query */, rel_id, relations.size(),
+                                                    attribute.getID(), is_min_applicable, is_max_applicable));
+    }
+
+    // Get the number of tuples for the relation.
+    string *query_string = new string("SELECT COUNT(*) FROM \"");
+    query_string->append(rel_name);
+    query_string->append("\";");
+
+    submitQuery(sender, query_string,
+                new QueryHandle::AnalyzeQueryInfo(false /* is_analyze_attribute_query */, rel_id, relations.size()));
+  }
+}
+
+void Conductor::submitQuery(const tmb::client_id sender, string *query, QueryHandle::AnalyzeQueryInfo *query_info) {
+  SqlParserWrapper parser_wrapper;
+  parser_wrapper.feedNextBuffer(query);
+  ParseResult parse_result = parser_wrapper.getNextStatement();
+  DCHECK_EQ(ParseResult::kSuccess, parse_result.condition);
+
+  const ParseStatement &statement = *parse_result.parsed_statement;
+
+  // Generate the query plan.
+  auto query_handle =
+      make_unique<QueryHandle>(query_processor_->query_id(), sender, statement.getPriority(), query_info);
+  query_processor_->generateQueryHandle(statement, query_handle.get());
+  DCHECK(query_handle->getQueryPlanMutable() != nullptr);
+
+  QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
+      conductor_client_id_, foreman_->getBusClientID(), query_handle.release(), &bus_);
+}
+
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e37ec541/cli/distributed/Conductor.hpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Conductor.hpp b/cli/distributed/Conductor.hpp
index 0c0f7e5..28c8e02 100644
--- a/cli/distributed/Conductor.hpp
+++ b/cli/distributed/Conductor.hpp
@@ -26,15 +26,17 @@
 #include "cli/distributed/Role.hpp"
 #include "query_execution/BlockLocator.hpp"
 #include "query_execution/ForemanDistributed.hpp"
+#include "query_optimizer/QueryHandle.hpp"
 #include "query_optimizer/QueryProcessor.hpp"
 #include "utility/Macros.hpp"
+#include "utility/PtrVector.hpp"
 
 #include "tmb/id_typedefs.h"
 
 namespace quickstep {
 
 class CatalogDatabase;
-class ParseCommand;
+class ParseString;
 
 /** \addtogroup CliDistributed
  *  @{
@@ -62,6 +64,10 @@ class Conductor final : public Role {
  private:
   void processSqlQueryMessage(const tmb::client_id sender, std::string *command_string);
 
+  void executeAnalyze(const tmb::client_id sender, const PtrVector<ParseString> &arguments);
+
+  void submitQuery(const tmb::client_id sender, std::string *query, QueryHandle::AnalyzeQueryInfo *query_info);
+
   std::unique_ptr<QueryProcessor> query_processor_;
   // Not owned.
   CatalogDatabase *catalog_database_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e37ec541/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 23b706f..82ff183 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -101,8 +101,10 @@ if (ENABLE_DISTRIBUTED)
                         quickstep_catalog_CatalogRelation
                         quickstep_catalog_CatalogTypedefs
                         quickstep_catalog_Catalog_proto
+                        quickstep_cli_Flags
                         quickstep_queryexecution_AdmitRequestMessage
                         quickstep_queryexecution_BlockLocator
+                        quickstep_queryexecution_BlockLocatorUtil
                         quickstep_queryexecution_ForemanBase
                         quickstep_queryexecution_PolicyEnforcerBase
                         quickstep_queryexecution_PolicyEnforcerDistributed
@@ -112,7 +114,9 @@ if (ENABLE_DISTRIBUTED)
                         quickstep_queryexecution_QueryExecutionUtil
                         quickstep_queryexecution_ShiftbossDirectory
                         quickstep_relationaloperators_WorkOrder_proto
+                        quickstep_storage_DataExchangerAsync
                         quickstep_storage_StorageBlockInfo
+                        quickstep_storage_StorageManager
                         quickstep_threading_ThreadUtil
                         quickstep_utility_EqualsAnyConstant
                         quickstep_utility_Macros
@@ -151,7 +155,10 @@ target_link_libraries(quickstep_queryexecution_PolicyEnforcerBase
 if (ENABLE_DISTRIBUTED)
   target_link_libraries(quickstep_queryexecution_PolicyEnforcerDistributed
                         glog
+                        quickstep_catalog_CatalogDatabase
                         quickstep_catalog_CatalogRelation
+                        quickstep_catalog_CatalogRelationSchema
+                        quickstep_catalog_CatalogRelationStatistics
                         quickstep_catalog_CatalogTypedefs
                         quickstep_catalog_Catalog_proto
                         quickstep_queryexecution_PolicyEnforcerBase
@@ -166,7 +173,12 @@ if (ENABLE_DISTRIBUTED)
                         quickstep_queryexecution_ShiftbossDirectory
                         quickstep_queryoptimizer_QueryHandle
                         quickstep_queryoptimizer_QueryProcessor
+                        quickstep_storage_StorageBlock
                         quickstep_storage_StorageBlockInfo
+                        quickstep_storage_StorageManager
+                        quickstep_storage_TupleIdSequence
+                        quickstep_storage_TupleStorageSubBlock
+                        quickstep_types_TypedValue
                         quickstep_utility_ExecutionDAGVisualizer
                         quickstep_utility_Macros
                         tmb

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e37ec541/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
index 57f432f..3d47fb6 100644
--- a/query_execution/ForemanDistributed.cpp
+++ b/query_execution/ForemanDistributed.cpp
@@ -27,8 +27,10 @@
 #include "catalog/CatalogDatabase.hpp"
 #include "catalog/CatalogRelation.hpp"
 #include "catalog/CatalogTypedefs.hpp"
+#include "cli/Flags.hpp"
 #include "query_execution/AdmitRequestMessage.hpp"
 #include "query_execution/BlockLocator.hpp"
+#include "query_execution/BlockLocatorUtil.hpp"
 #include "query_execution/PolicyEnforcerBase.hpp"
 #include "query_execution/PolicyEnforcerDistributed.hpp"
 #include "query_execution/QueryContext.hpp"
@@ -37,7 +39,9 @@
 #include "query_execution/QueryExecutionUtil.hpp"
 #include "query_execution/ShiftbossDirectory.hpp"
 #include "relational_operators/WorkOrder.pb.h"
+#include "storage/DataExchangerAsync.hpp"
 #include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageManager.hpp"
 #include "threading/ThreadUtil.hpp"
 #include "utility/EqualsAnyConstant.hpp"
 
@@ -49,6 +53,7 @@
 #include "tmb/message_style.h"
 #include "tmb/tagged_message.h"
 
+using std::make_unique;
 using std::move;
 using std::size_t;
 using std::unique_ptr;
@@ -75,6 +80,7 @@ ForemanDistributed::ForemanDistributed(
       block_locator_(block_locator),
       catalog_database_(DCHECK_NOTNULL(catalog_database)) {
   const std::vector<QueryExecutionMessageType> sender_message_types{
+      kBlockDomainRegistrationMessage,
       kShiftbossRegistrationResponseMessage,
       kQueryInitiateMessage,
       kWorkOrderMessage,
@@ -82,6 +88,7 @@ ForemanDistributed::ForemanDistributed(
       kQueryTeardownMessage,
       kSaveQueryResultMessage,
       kQueryExecutionSuccessMessage,
+      kCommandResponseMessage,
       kPoisonMessage};
 
   for (const auto message_type : sender_message_types) {
@@ -89,6 +96,7 @@ ForemanDistributed::ForemanDistributed(
   }
 
   const std::vector<QueryExecutionMessageType> receiver_message_types{
+      kBlockDomainRegistrationResponseMessage,
       kShiftbossRegistrationMessage,
       kAdmitRequestMessage,
       kQueryInitiateResponseMessage,
@@ -105,12 +113,17 @@ ForemanDistributed::ForemanDistributed(
     bus_->RegisterClientAsReceiver(foreman_client_id_, message_type);
   }
 
-  policy_enforcer_ = std::make_unique<PolicyEnforcerDistributed>(
-      foreman_client_id_,
-      catalog_database_,
-      query_processor,
-      &shiftboss_directory_,
-      bus_);
+  client_id locator_client_id;
+  storage_manager_ = make_unique<StorageManager>(
+      FLAGS_storage_path,
+      block_locator::getBlockDomain(data_exchanger_.network_address(), foreman_client_id_, &locator_client_id, bus_),
+      locator_client_id, bus_);
+
+  data_exchanger_.set_storage_manager(storage_manager_.get());
+  data_exchanger_.start();
+
+  policy_enforcer_ = make_unique<PolicyEnforcerDistributed>(
+      foreman_client_id_, catalog_database_, query_processor, storage_manager_.get(), &shiftboss_directory_, bus_);
 }
 
 void ForemanDistributed::run() {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e37ec541/query_execution/ForemanDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.hpp b/query_execution/ForemanDistributed.hpp
index 7fc98bd..4053b9d 100644
--- a/query_execution/ForemanDistributed.hpp
+++ b/query_execution/ForemanDistributed.hpp
@@ -25,6 +25,8 @@
 #include "catalog/CatalogTypedefs.hpp"
 #include "query_execution/ForemanBase.hpp"
 #include "query_execution/ShiftbossDirectory.hpp"
+#include "storage/DataExchangerAsync.hpp"
+#include "storage/StorageManager.hpp"
 #include "utility/Macros.hpp"
 
 #include "tmb/id_typedefs.h"
@@ -70,7 +72,11 @@ class ForemanDistributed final : public ForemanBase {
       QueryProcessor *query_processor,
       const int cpu_id = -1);
 
-  ~ForemanDistributed() override {}
+  ~ForemanDistributed() override {
+    data_exchanger_.shutdown();
+    storage_manager_.reset();
+    data_exchanger_.join();
+  }
 
   void printWorkOrderProfilingResults(const std::size_t query_id,
                                       std::FILE *out) const override;
@@ -126,6 +132,10 @@ class ForemanDistributed final : public ForemanBase {
 
   CatalogDatabaseLite *catalog_database_;
 
+  // Used for '\analyze'.
+  DataExchangerAsync data_exchanger_;
+  std::unique_ptr<StorageManager> storage_manager_;
+
   // From a query id to a set of Shiftbosses that save query result.
   std::unordered_map<std::size_t, std::unordered_set<std::size_t>> query_result_saved_shiftbosses_;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e37ec541/query_execution/PolicyEnforcerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp
index 25f2d72..424452e 100644
--- a/query_execution/PolicyEnforcerDistributed.cpp
+++ b/query_execution/PolicyEnforcerDistributed.cpp
@@ -26,7 +26,10 @@
 #include <vector>
 
 #include "catalog/Catalog.pb.h"
+#include "catalog/CatalogDatabase.hpp"
 #include "catalog/CatalogRelation.hpp"
+#include "catalog/CatalogRelationSchema.hpp"
+#include "catalog/CatalogRelationStatistics.hpp"
 #include "query_execution/QueryContext.pb.h"
 #include "query_execution/QueryExecutionMessages.pb.h"
 #include "query_execution/QueryExecutionState.hpp"
@@ -36,7 +39,12 @@
 #include "query_execution/QueryManagerDistributed.hpp"
 #include "query_optimizer/QueryHandle.hpp"
 #include "query_optimizer/QueryProcessor.hpp"
+#include "storage/StorageBlock.hpp"
 #include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageManager.hpp"
+#include "storage/TupleIdSequence.hpp"
+#include "storage/TupleStorageSubBlock.hpp"
+#include "types/TypedValue.hpp"
 #include "utility/ExecutionDAGVisualizer.hpp"
 
 #include "gflags/gflags.h"
@@ -50,7 +58,9 @@
 using std::free;
 using std::malloc;
 using std::move;
+using std::ostringstream;
 using std::size_t;
+using std::string;
 using std::unique_ptr;
 using std::vector;
 
@@ -231,7 +241,7 @@ void PolicyEnforcerDistributed::initiateQueryInShiftboss(QueryHandle *query_hand
 void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manager) {
   const QueryHandle *query_handle = query_manager->query_handle();
 
-  const CatalogRelation *query_result = query_handle->getQueryResultRelation();
+  const CatalogRelation *query_result_relation = query_handle->getQueryResultRelation();
   const tmb::client_id cli_id = query_handle->getClientId();
   const std::size_t query_id = query_handle->query_id();
 
@@ -259,7 +269,7 @@ void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manage
     shiftboss_addresses.AddRecipient(shiftboss_directory_->getClientId(i));
   }
 
-  if (query_result == nullptr) {
+  if (query_result_relation == nullptr) {
     if (query_processor_) {
       query_processor_->saveCatalog();
     }
@@ -272,17 +282,12 @@ void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manage
     char *proto_bytes = static_cast<char*>(malloc(proto_length));
     CHECK(proto.SerializeToArray(proto_bytes, proto_length));
 
-    TaggedMessage message(static_cast<const void*>(proto_bytes),
-                          proto_length,
-                          kQueryTeardownMessage);
+    TaggedMessage message(static_cast<const void*>(proto_bytes), proto_length, kQueryTeardownMessage);
     free(proto_bytes);
 
     DLOG(INFO) << "PolicyEnforcerDistributed sent QueryTeardownMessage (typed '" << kQueryTeardownMessage
                << "') to all Shiftbosses";
-    QueryExecutionUtil::BroadcastMessage(foreman_client_id_,
-                                         shiftboss_addresses,
-                                         move(message),
-                                         bus_);
+    QueryExecutionUtil::BroadcastMessage(foreman_client_id_, shiftboss_addresses, move(message), bus_);
 
     TaggedMessage cli_message(kQueryExecutionSuccessMessage);
 
@@ -299,12 +304,33 @@ void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manage
     return;
   }
 
+  const QueryHandle::AnalyzeQueryInfo *analyze_query_info = query_handle->analyze_query_info();
+  if (analyze_query_info) {
+    processAnalyzeQueryResult(cli_id, query_result_relation, analyze_query_info);
+
+    // Clean up query execution states, i.e., QueryContext, in Shiftbosses.
+    S::QueryTeardownMessage proto;
+    proto.set_query_id(query_id);
+
+    const size_t proto_length = proto.ByteSize();
+    char *proto_bytes = static_cast<char*>(malloc(proto_length));
+    CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+    TaggedMessage message(static_cast<const void*>(proto_bytes), proto_length, kQueryTeardownMessage);
+    free(proto_bytes);
+
+    DLOG(INFO) << "PolicyEnforcerDistributed sent QueryTeardownMessage (typed '" << kQueryTeardownMessage
+               << "') to all Shiftbosses";
+    QueryExecutionUtil::BroadcastMessage(foreman_client_id_, shiftboss_addresses, move(message), bus_);
+    return;
+  }
+
   // NOTE(zuyu): SaveQueryResultMessage implicitly triggers QueryTeardown in Shiftboss.
   S::SaveQueryResultMessage proto;
   proto.set_query_id(query_id);
-  proto.set_relation_id(query_result->getID());
+  proto.set_relation_id(query_result_relation->getID());
 
-  const vector<block_id> blocks(query_result->getBlocksSnapshot());
+  const vector<block_id> blocks(query_result_relation->getBlocksSnapshot());
   for (const block_id block : blocks) {
     proto.add_blocks(block);
   }
@@ -315,18 +341,111 @@ void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manage
   char *proto_bytes = static_cast<char*>(malloc(proto_length));
   CHECK(proto.SerializeToArray(proto_bytes, proto_length));
 
-  TaggedMessage message(static_cast<const void*>(proto_bytes),
-                        proto_length,
-                        kSaveQueryResultMessage);
+  TaggedMessage message(static_cast<const void*>(proto_bytes), proto_length, kSaveQueryResultMessage);
   free(proto_bytes);
 
   // TODO(quickstep-team): Dynamically scale-up/down Shiftbosses.
   DLOG(INFO) << "PolicyEnforcerDistributed sent SaveQueryResultMessage (typed '" << kSaveQueryResultMessage
              << "') to all Shiftbosses";
-  QueryExecutionUtil::BroadcastMessage(foreman_client_id_,
-                                       shiftboss_addresses,
-                                       move(message),
-                                       bus_);
+  QueryExecutionUtil::BroadcastMessage(foreman_client_id_, shiftboss_addresses, move(message), bus_);
+}
+
+void PolicyEnforcerDistributed::processAnalyzeQueryResult(const tmb::client_id cli_id,
+                                                          const CatalogRelation *query_result_relation,
+                                                          const QueryHandle::AnalyzeQueryInfo *analyze_query_info) {
+  const relation_id rel_id = analyze_query_info->rel_id;
+  CatalogRelation *mutable_relation =  // NOTE(review): unchecked downcast below; confirm the dynamic type.
+      static_cast<CatalogDatabase*>(catalog_database_)->getRelationByIdMutable(rel_id);
+  CatalogRelationStatistics *mutable_stat = mutable_relation->getStatisticsMutable();
+  // Copy the result row out of storage, then delete the result relation; the lambda is invoked immediately.
+  const auto analyze_query_result = [this, &query_result_relation]() {
+    const vector<block_id> blocks = query_result_relation->getBlocksSnapshot();
+    DCHECK_EQ(1u, blocks.size());  // Expect exactly one result block.
+
+    vector<TypedValue> query_result;
+    {
+      BlockReference block = storage_manager_->getBlock(blocks.front(), *query_result_relation);
+      const TupleStorageSubBlock &tuple_store = block->getTupleStorageSubBlock();
+      DCHECK_EQ(1, tuple_store.numTuples());  // Expect exactly one result tuple.
+
+      const std::size_t num_columns = tuple_store.getRelation().size();
+      if (tuple_store.isPacked()) {  // Read tuple id 0 directly.
+        for (std::size_t i = 0; i < num_columns; ++i) {
+          query_result.emplace_back(tuple_store.getAttributeValueTyped(0, i));
+        }
+      } else {  // Use the first tuple id present in the existence map.
+        std::unique_ptr<TupleIdSequence> existence_map(tuple_store.getExistenceMap());
+        for (std::size_t i = 0; i < num_columns; ++i) {
+          query_result.emplace_back(
+              tuple_store.getAttributeValueTyped(*existence_map->begin(), i));
+        }
+      }
+    }
+
+    // Delete the result relation's blocks, then drop the relation from the catalog.
+    for (const block_id block : blocks) {
+      storage_manager_->deleteBlockOrBlobFile(block);
+    }
+    catalog_database_->dropRelationById(query_result_relation->getID());
+
+    return query_result;
+  }();
+
+  if (analyze_query_info->is_analyze_attribute_query) {  // Columns: distinct count [, min] [, max].
+    const attribute_id attr_id = analyze_query_info->attr_id;
+
+    auto cit = analyze_query_result.begin();  // First column: number of distinct values.
+    DCHECK_EQ(TypeID::kLong, cit->getTypeID());
+    mutable_stat->setNumDistinctValues(attr_id, cit->getLiteral<std::int64_t>());
+
+    if (analyze_query_info->is_min_applicable) {
+      ++cit;  // Advance to the min value column.
+      mutable_stat->setMinValue(attr_id, *cit);
+    }
+
+    if (analyze_query_info->is_max_applicable) {
+      ++cit;  // Advance to the max value column.
+      mutable_stat->setMaxValue(attr_id, *cit);
+    }
+  } else {  // Single column: the number of tuples.
+    completed_analyze_relations_[cli_id].push_back(rel_id);
+
+    DCHECK_EQ(1u, analyze_query_result.size());
+    const TypedValue &num_tuples = analyze_query_result.front();
+    DCHECK_EQ(TypeID::kLong, num_tuples.getTypeID());
+    mutable_stat->setNumTuples(num_tuples.getLiteral<std::int64_t>());
+
+    mutable_stat->setExactness(true);
+    // Once this CLI's completed-relation count reaches num_relations, save the catalog and reply to the CLI.
+    if (completed_analyze_relations_[cli_id].size() == analyze_query_info->num_relations) {
+      query_processor_->markCatalogAltered();  // NOTE(review): query_processor_ is not null-checked here; confirm.
+      query_processor_->saveCatalog();
+
+      ostringstream analyze_command_response;
+      for (const relation_id rel_id : completed_analyze_relations_[cli_id]) {
+        analyze_command_response << "Analyzing " << catalog_database_->getRelationSchemaById(rel_id).getName()
+                                 << " ... done\n";
+      }
+
+      S::CommandResponseMessage proto;
+      proto.set_command_response(analyze_command_response.str());
+
+      const size_t proto_length = proto.ByteSize();
+      char *proto_bytes = static_cast<char*>(malloc(proto_length));
+      CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+      TaggedMessage message(static_cast<const void*>(proto_bytes), proto_length, kCommandResponseMessage);
+      free(proto_bytes);
+
+      DLOG(INFO) << "PolicyEnforcerDistributed sent CommandResponseMessage (typed '" << kCommandResponseMessage
+                 << "') to CLI with TMB client id " << cli_id;
+      const tmb::MessageBus::SendStatus send_status =
+          QueryExecutionUtil::SendTMBMessage(bus_, foreman_client_id_, cli_id, move(message));
+      CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
+
+      completed_analyze_relations_.erase(cli_id);  // Forget this CLI's progress.
+    }
+  }
+}
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e37ec541/query_execution/PolicyEnforcerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.hpp b/query_execution/PolicyEnforcerDistributed.hpp
index 18fd9ae..5334da0 100644
--- a/query_execution/PolicyEnforcerDistributed.hpp
+++ b/query_execution/PolicyEnforcerDistributed.hpp
@@ -28,6 +28,7 @@
 #include "query_execution/QueryManagerBase.hpp"
 #include "query_execution/ShiftbossDirectory.hpp"
 #include "query_optimizer/QueryHandle.hpp"
+#include "types/TypedValue.hpp"
 #include "utility/Macros.hpp"
 
 #include "glog/logging.h"
@@ -42,7 +43,9 @@ class TaggedMessage;
 namespace quickstep {
 
 class CatalogDatabaseLite;
+class CatalogRelation;
 class QueryProcessor;
+class StorageManager;
 
 /** \addtogroup QueryExecution
  *  @{
@@ -61,16 +64,20 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
    * @param catalog_database The CatalogDatabase used.
    * @param query_processor The QueryProcessor to save catalog upon the query
    *        completion.
+   * @param storage_manager The StorageManager to use.
+   * @param shiftboss_directory The ShiftbossDirectory to manage Shiftbosses.
    * @param bus The TMB.
    **/
   PolicyEnforcerDistributed(const tmb::client_id foreman_client_id,
                             CatalogDatabaseLite *catalog_database,
                             QueryProcessor *query_processor,
+                            StorageManager *storage_manager,
                             ShiftbossDirectory *shiftboss_directory,
                             tmb::MessageBus *bus)
       : PolicyEnforcerBase(catalog_database),
         foreman_client_id_(foreman_client_id),
         query_processor_(query_processor),
+        storage_manager_(storage_manager),
         shiftboss_directory_(shiftboss_directory),
         bus_(bus) {}
 
@@ -157,13 +164,20 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
 
   void initiateQueryInShiftboss(QueryHandle *query_handle);
 
+  void processAnalyzeQueryResult(const tmb::client_id cli_id,
+                                 const CatalogRelation *query_result_relation,
+                                 const QueryHandle::AnalyzeQueryInfo *analyze_query_info);
+
   const tmb::client_id foreman_client_id_;
 
   QueryProcessor *query_processor_;
+  StorageManager *storage_manager_;
   ShiftbossDirectory *shiftboss_directory_;
 
   tmb::MessageBus *bus_;
 
+  std::unordered_map<tmb::client_id, std::vector<relation_id>> completed_analyze_relations_;
+
   DISALLOW_COPY_AND_ASSIGN(PolicyEnforcerDistributed);
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e37ec541/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index a49de5e..081852f 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -90,7 +90,7 @@ enum QueryExecutionMessageType : message_type_id {
   kDistributedCliRegistrationMessage,  // From CLI to Conductor.
   kDistributedCliRegistrationResponseMessage,  // From Conductor to CLI.
 
-  kSqlQueryMessage, // From CLI to Conductor.
+  kSqlQueryMessage,  // From CLI to Conductor.
 
   kQueryInitiateMessage,  // From Foreman to Shiftboss.
   kQueryInitiateResponseMessage,  // From Shiftboss to Foreman.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e37ec541/query_optimizer/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt
index 3ff783c..9bdb753 100644
--- a/query_optimizer/CMakeLists.txt
+++ b/query_optimizer/CMakeLists.txt
@@ -229,6 +229,7 @@ target_link_libraries(quickstep_queryoptimizer_PhysicalGenerator
                       quickstep_utility_Macros
                       quickstep_utility_PlanVisualizer)
 target_link_libraries(quickstep_queryoptimizer_QueryHandle
+                      quickstep_catalog_CatalogTypedefs
                       quickstep_catalog_Catalog_proto
                       quickstep_queryexecution_QueryContext_proto
                       quickstep_queryoptimizer_QueryPlan

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e37ec541/query_optimizer/QueryHandle.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/QueryHandle.hpp b/query_optimizer/QueryHandle.hpp
index 7cb4f68..6feeb4c 100644
--- a/query_optimizer/QueryHandle.hpp
+++ b/query_optimizer/QueryHandle.hpp
@@ -25,6 +25,7 @@
 #include <memory>
 
 #include "catalog/Catalog.pb.h"
+#include "catalog/CatalogTypedefs.hpp"
 #include "query_execution/QueryContext.pb.h"
 #include "query_optimizer/QueryOptimizerConfig.h"  // For QUICKSTEP_DISTRIBUTED.
 #include "query_optimizer/QueryPlan.hpp"
@@ -45,19 +46,47 @@ class CatalogRelation;
  **/
 class QueryHandle {
  public:
+  // Describes a query generated while executing the '\analyze' command.
+  struct AnalyzeQueryInfo {
+    AnalyzeQueryInfo(const bool is_analyze_attribute_query_in,
+                     const relation_id rel_id_in,
+                     const std::size_t num_relations_in,
+                     const attribute_id attr_id_in = kInvalidCatalogId,
+                     const bool is_min_applicable_in = false,
+                     const bool is_max_applicable_in = false)
+      : is_analyze_attribute_query(is_analyze_attribute_query_in),
+        rel_id(rel_id_in),
+        num_relations(num_relations_in),
+        attr_id(attr_id_in),
+        is_min_applicable(is_min_applicable_in),
+        is_max_applicable(is_max_applicable_in) {}
+
+    const bool is_analyze_attribute_query;  // True for a per-attribute query; false for a per-relation one.
+    const relation_id rel_id;  // The relation whose statistics get updated.
+    const std::size_t num_relations;  // Completed-relation count at which the CLI is answered.
+
+    // The fields below are only meaningful if 'is_analyze_attribute_query' is true.
+    const attribute_id attr_id;
+    const bool is_min_applicable;
+    const bool is_max_applicable;
+  };
+
   /**
    * @brief Constructor.
    *
    * @param query_id The given query id.
    * @param cli_id The client id of the CLI which submits the query.
    * @param query_priority The priority of this query.
+   * @param analyze_query_info The info of this analyze query; may be nullptr (the default). Ownership is taken.
    */
-  explicit QueryHandle(const std::size_t query_id,
-                       const tmb::client_id cli_id,
-                       const std::uint64_t query_priority = 1)
+  QueryHandle(const std::size_t query_id,
+              const tmb::client_id cli_id,
+              const std::uint64_t query_priority = 1,
+              AnalyzeQueryInfo *analyze_query_info = nullptr)
       : query_id_(query_id),
         cli_id_(cli_id),
         query_priority_(query_priority),
+        analyze_query_info_(analyze_query_info),
         query_plan_(new QueryPlan()),
         query_result_relation_(nullptr) {}
 
@@ -87,6 +116,13 @@ class QueryHandle {
   }
 
   /**
+   * @brief Get the info for a query generated by '\analyze', or nullptr if none was given at construction.
+   **/
+  const AnalyzeQueryInfo* analyze_query_info() const {
+    return analyze_query_info_.get();
+  }
+
+  /**
    * @return The const query plan.
    */
   const QueryPlan& getQueryPlan() const {
@@ -165,6 +201,7 @@ class QueryHandle {
   const tmb::client_id cli_id_;
 
   const std::uint64_t query_priority_;
+  std::unique_ptr<AnalyzeQueryInfo> analyze_query_info_;
 
   std::unique_ptr<QueryPlan> query_plan_;
 



[2/8] incubator-quickstep git commit: Refactored catalog saving in the distributed version.

Posted by zu...@apache.org.
Refactored catalog saving in the distributed version.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/ccb2852f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/ccb2852f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/ccb2852f

Branch: refs/heads/dist-patch
Commit: ccb2852f71da77d364d4bfcb276cb6318b751a8c
Parents: 964a806
Author: Zuyu Zhang <zu...@apache.org>
Authored: Thu Mar 2 17:14:50 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Thu Mar 2 17:14:50 2017 -0800

----------------------------------------------------------------------
 query_execution/CMakeLists.txt                         |  1 +
 query_execution/ForemanDistributed.cpp                 |  5 ++---
 query_execution/ForemanDistributed.hpp                 |  6 +++---
 query_execution/PolicyEnforcerDistributed.cpp          |  5 ++++-
 query_execution/PolicyEnforcerDistributed.hpp          | 13 ++++++-------
 .../tests/DistributedExecutionGeneratorTestRunner.cpp  |  6 ++----
 6 files changed, 18 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccb2852f/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 12d6be0..23b706f 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -165,6 +165,7 @@ if (ENABLE_DISTRIBUTED)
                         quickstep_queryexecution_QueryManagerDistributed
                         quickstep_queryexecution_ShiftbossDirectory
                         quickstep_queryoptimizer_QueryHandle
+                        quickstep_queryoptimizer_QueryProcessor
                         quickstep_storage_StorageBlockInfo
                         quickstep_utility_ExecutionDAGVisualizer
                         quickstep_utility_Macros

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccb2852f/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
index 389d6ab..57f432f 100644
--- a/query_execution/ForemanDistributed.cpp
+++ b/query_execution/ForemanDistributed.cpp
@@ -17,7 +17,6 @@
 #include <cstddef>
 #include <cstdio>
 #include <cstdlib>
-#include <functional>
 #include <memory>
 #include <unordered_map>
 #include <unordered_set>
@@ -68,9 +67,9 @@ class QueryHandle;
 
 ForemanDistributed::ForemanDistributed(
     const BlockLocator &block_locator,
-    std::function<void()> &&save_catalog_callback,
     MessageBus *bus,
     CatalogDatabaseLite *catalog_database,
+    QueryProcessor *query_processor,
     const int cpu_id)
     : ForemanBase(bus, cpu_id),
       block_locator_(block_locator),
@@ -108,8 +107,8 @@ ForemanDistributed::ForemanDistributed(
 
   policy_enforcer_ = std::make_unique<PolicyEnforcerDistributed>(
       foreman_client_id_,
-      move(save_catalog_callback),
       catalog_database_,
+      query_processor,
       &shiftboss_directory_,
       bus_);
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccb2852f/query_execution/ForemanDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.hpp b/query_execution/ForemanDistributed.hpp
index 5f1a14b..7fc98bd 100644
--- a/query_execution/ForemanDistributed.hpp
+++ b/query_execution/ForemanDistributed.hpp
@@ -17,7 +17,6 @@
 
 #include <cstddef>
 #include <cstdio>
-#include <functional>
 #include <memory>
 #include <unordered_map>
 #include <unordered_set>
@@ -36,6 +35,7 @@ namespace quickstep {
 
 class BlockLocator;
 class CatalogDatabaseLite;
+class QueryProcessor;
 
 namespace serialization { class WorkOrderMessage; }
 
@@ -56,7 +56,7 @@ class ForemanDistributed final : public ForemanBase {
    * @param block_locator The block locator that manages block location info.
    * @param bus A pointer to the TMB.
    * @param catalog_database The catalog database where this query is executed.
-   * @param save_catalog_callback The callback used to save catalog upon the query
+   * @param query_processor The QueryProcessor to save catalog upon the query
    *        completion.
    * @param cpu_id The ID of the CPU to which the Foreman thread can be pinned.
    *
@@ -65,9 +65,9 @@ class ForemanDistributed final : public ForemanBase {
   **/
   ForemanDistributed(
       const BlockLocator &block_locator,
-      std::function<void()> &&save_catalog_callback,
       tmb::MessageBus *bus,
       CatalogDatabaseLite *catalog_database,
+      QueryProcessor *query_processor,
       const int cpu_id = -1);
 
   ~ForemanDistributed() override {}

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccb2852f/query_execution/PolicyEnforcerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp
index 6ee58a8..25f2d72 100644
--- a/query_execution/PolicyEnforcerDistributed.cpp
+++ b/query_execution/PolicyEnforcerDistributed.cpp
@@ -35,6 +35,7 @@
 #include "query_execution/QueryManagerBase.hpp"
 #include "query_execution/QueryManagerDistributed.hpp"
 #include "query_optimizer/QueryHandle.hpp"
+#include "query_optimizer/QueryProcessor.hpp"
 #include "storage/StorageBlockInfo.hpp"
 #include "utility/ExecutionDAGVisualizer.hpp"
 
@@ -259,7 +260,9 @@ void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manage
   }
 
   if (query_result == nullptr) {
-    save_catalog_callback_();
+    if (query_processor_) {
+      query_processor_->saveCatalog();
+    }
 
     // Clean up query execution states, i.e., QueryContext, in Shiftbosses.
     serialization::QueryTeardownMessage proto;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccb2852f/query_execution/PolicyEnforcerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.hpp b/query_execution/PolicyEnforcerDistributed.hpp
index f8476c8..18fd9ae 100644
--- a/query_execution/PolicyEnforcerDistributed.hpp
+++ b/query_execution/PolicyEnforcerDistributed.hpp
@@ -16,7 +16,6 @@
 #define QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_DISTRIBUTED_HPP_
 
 #include <cstddef>
-#include <functional>
 #include <memory>
 #include <unordered_map>
 #include <utility>
@@ -43,6 +42,7 @@ class TaggedMessage;
 namespace quickstep {
 
 class CatalogDatabaseLite;
+class QueryProcessor;
 
 /** \addtogroup QueryExecution
  *  @{
@@ -58,19 +58,19 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
    * @brief Constructor.
    *
    * @param foreman_client_id The TMB client ID of the Foreman.
-   * @param save_catalog_callback The callback used to save catalog upon the query
-   *        completion.
    * @param catalog_database The CatalogDatabase used.
+   * @param query_processor The QueryProcessor to save catalog upon the query
+   *        completion.
    * @param bus The TMB.
    **/
   PolicyEnforcerDistributed(const tmb::client_id foreman_client_id,
-                            std::function<void()> &&save_catalog_callback,
                             CatalogDatabaseLite *catalog_database,
+                            QueryProcessor *query_processor,
                             ShiftbossDirectory *shiftboss_directory,
                             tmb::MessageBus *bus)
       : PolicyEnforcerBase(catalog_database),
         foreman_client_id_(foreman_client_id),
-        save_catalog_callback_(std::move(save_catalog_callback)),
+        query_processor_(query_processor),
         shiftboss_directory_(shiftboss_directory),
         bus_(bus) {}
 
@@ -159,8 +159,7 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
 
   const tmb::client_id foreman_client_id_;
 
-  const std::function<void()> save_catalog_callback_;
-
+  QueryProcessor *query_processor_;
   ShiftbossDirectory *shiftboss_directory_;
 
   tmb::MessageBus *bus_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccb2852f/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
index 3b1259a..0eeb83f 100644
--- a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
+++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
@@ -71,8 +71,6 @@ namespace {
 
 constexpr int kNumInstances = 3;
 
-void nop() {}
-
 }  // namespace
 
 const char *DistributedExecutionGeneratorTestRunner::kResetOption =
@@ -110,8 +108,8 @@ DistributedExecutionGeneratorTestRunner::DistributedExecutionGeneratorTestRunner
 
   // NOTE(zuyu): Foreman should initialize before Shiftboss so that the former
   // could receive a registration message from the latter.
-  foreman_ = make_unique<ForemanDistributed>(*block_locator_, std::bind(&nop), &bus_,
-                                             test_database_loader_->catalog_database());
+  foreman_ = make_unique<ForemanDistributed>(*block_locator_, &bus_, test_database_loader_->catalog_database(),
+                                             nullptr /* query_processor */);
 
   // We don't use the NUMA aware version of worker code.
   const vector<numa_node_id> numa_nodes(1 /* Number of worker threads per instance */,