You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/03/06 20:18:22 UTC

[25/38] incubator-quickstep git commit: Added \analyze support in the distributed version.

Added \analyze support in the distributed version.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/e37ec541
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/e37ec541
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/e37ec541

Branch: refs/heads/reorder-partitioned-hash-join
Commit: e37ec541c1490d8434eb441a1c0389e20f80ec6e
Parents: 7f25d1c
Author: Zuyu Zhang <zu...@apache.org>
Authored: Thu Mar 2 23:36:28 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Fri Mar 3 04:09:36 2017 -0800

----------------------------------------------------------------------
 cli/CMakeLists.txt                            |   2 +-
 cli/distributed/CMakeLists.txt                |   7 +
 cli/distributed/Cli.cpp                       |   8 +-
 cli/distributed/Conductor.cpp                 |  99 ++++++++++++-
 cli/distributed/Conductor.hpp                 |   8 +-
 query_execution/CMakeLists.txt                |  12 ++
 query_execution/ForemanDistributed.cpp        |  25 +++-
 query_execution/ForemanDistributed.hpp        |  12 +-
 query_execution/PolicyEnforcerDistributed.cpp | 155 ++++++++++++++++++---
 query_execution/PolicyEnforcerDistributed.hpp |  14 ++
 query_execution/QueryExecutionTypedefs.hpp    |   2 +-
 query_optimizer/CMakeLists.txt                |   1 +
 query_optimizer/QueryHandle.hpp               |  43 +++++-
 13 files changed, 351 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e37ec541/cli/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cli/CMakeLists.txt b/cli/CMakeLists.txt
index ffeed2f..4562fe6 100644
--- a/cli/CMakeLists.txt
+++ b/cli/CMakeLists.txt
@@ -84,6 +84,7 @@ target_link_libraries(quickstep_cli_CommandExecutor
                       quickstep_expressions_aggregation_AggregateFunctionMax
                       quickstep_expressions_aggregation_AggregateFunctionMin
                       quickstep_parser_ParseStatement
+                      quickstep_parser_ParseString
                       quickstep_parser_SqlParserWrapper
                       quickstep_queryoptimizer_QueryHandle
                       quickstep_queryoptimizer_QueryPlan
@@ -93,7 +94,6 @@ target_link_libraries(quickstep_cli_CommandExecutor
                       quickstep_storage_StorageManager
                       quickstep_storage_TupleIdSequence
                       quickstep_storage_TupleStorageSubBlock
-                      quickstep_parser_ParseString
                       quickstep_types_Type
                       quickstep_types_TypeID
                       quickstep_types_TypedValue

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e37ec541/cli/distributed/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cli/distributed/CMakeLists.txt b/cli/distributed/CMakeLists.txt
index 1f7dee0..2db27e5 100644
--- a/cli/distributed/CMakeLists.txt
+++ b/cli/distributed/CMakeLists.txt
@@ -25,13 +25,19 @@ add_library(quickstep_cli_distributed_Role Role.cpp Role.hpp)
 # Link dependencies:
 target_link_libraries(quickstep_cli_distributed_Conductor
                       glog
+                      quickstep_catalog_CatalogAttribute
                       quickstep_catalog_CatalogDatabase
+                      quickstep_catalog_CatalogRelation
+                      quickstep_catalog_CatalogTypedefs
                       quickstep_cli_CommandExecutorUtil
                       quickstep_cli_Constants
                       quickstep_cli_DefaultsConfigurator
                       quickstep_cli_Flags
                       quickstep_cli_distributed_Role
+                      quickstep_expressions_aggregation_AggregateFunctionMax
+                      quickstep_expressions_aggregation_AggregateFunctionMin
                       quickstep_parser_ParseStatement
+                      quickstep_parser_ParseString
                       quickstep_parser_SqlParserWrapper
                       quickstep_queryexecution_BlockLocator
                       quickstep_queryexecution_ForemanDistributed
@@ -42,6 +48,7 @@ target_link_libraries(quickstep_cli_distributed_Conductor
                       quickstep_queryoptimizer_QueryProcessor
                       quickstep_storage_StorageConstants
                       quickstep_utility_Macros
+                      quickstep_utility_PtrVector
                       quickstep_utility_SqlError
                       quickstep_utility_StringUtil
                       tmb)

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e37ec541/cli/distributed/Cli.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Cli.cpp b/cli/distributed/Cli.cpp
index 63f3259..14880a7 100644
--- a/cli/distributed/Cli.cpp
+++ b/cli/distributed/Cli.cpp
@@ -167,11 +167,9 @@ void Cli::run() {
           const ParseCommand &parse_command = static_cast<const ParseCommand &>(statement);
           const std::string &command = parse_command.command()->value();
           try {
-            if (command == C::kAnalyzeCommand) {
-              // TODO(zuyu): support '\analyze'.
-              THROW_SQL_ERROR_AT(parse_command.command()) << "Unsupported Command";
-            } else if (command != C::kDescribeDatabaseCommand &&
-                       command != C::kDescribeTableCommand) {
+            if (command != C::kAnalyzeCommand &&
+                command != C::kDescribeDatabaseCommand &&
+                command != C::kDescribeTableCommand) {
               THROW_SQL_ERROR_AT(parse_command.command()) << "Invalid Command";
             }
           } catch (const SqlError &error) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e37ec541/cli/distributed/Conductor.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Conductor.cpp b/cli/distributed/Conductor.cpp
index 1b8bfb2..5fb4453 100644
--- a/cli/distributed/Conductor.cpp
+++ b/cli/distributed/Conductor.cpp
@@ -27,13 +27,20 @@
 #include <sstream>
 #include <string>
 #include <utility>
+#include <vector>
 
+#include "catalog/CatalogAttribute.hpp"
 #include "catalog/CatalogDatabase.hpp"
+#include "catalog/CatalogRelation.hpp"
+#include "catalog/CatalogTypedefs.hpp"
 #include "cli/CommandExecutorUtil.hpp"
 #include "cli/Constants.hpp"
 #include "cli/DefaultsConfigurator.hpp"
 #include "cli/Flags.hpp"
+#include "expressions/aggregation/AggregateFunctionMax.hpp"
+#include "expressions/aggregation/AggregateFunctionMin.hpp"
 #include "parser/ParseStatement.hpp"
+#include "parser/ParseString.hpp"
 #include "parser/SqlParserWrapper.hpp"
 #include "query_execution/BlockLocator.hpp"
 #include "query_execution/ForemanDistributed.hpp"
@@ -43,6 +50,7 @@
 #include "query_optimizer/QueryHandle.hpp"
 #include "query_optimizer/QueryProcessor.hpp"
 #include "storage/StorageConstants.hpp"
+#include "utility/PtrVector.hpp"
 #include "utility/SqlError.hpp"
 #include "utility/StringUtil.hpp"
 
@@ -69,6 +77,8 @@ namespace quickstep {
 namespace C = cli;
 namespace S = serialization;
 
+class Type;
+
 void Conductor::init() {
   try {
     string catalog_path = FLAGS_storage_path + kCatalogFilename;
@@ -154,7 +164,7 @@ void Conductor::processSqlQueryMessage(const tmb::client_id sender, string *comm
   SqlParserWrapper parser_wrapper;
   parser_wrapper.feedNextBuffer(command_string);
   ParseResult parse_result = parser_wrapper.getNextStatement();
-  CHECK(parse_result.condition == ParseResult::kSuccess)
+  CHECK_EQ(ParseResult::kSuccess, parse_result.condition)
       << "Any syntax error should be addressed in the DistributedCli.";
 
   const ParseStatement &statement = *parse_result.parsed_statement;
@@ -165,6 +175,11 @@ void Conductor::processSqlQueryMessage(const tmb::client_id sender, string *comm
       const PtrVector<ParseString> &arguments = *(parse_command.arguments());
       const string &command = parse_command.command()->value();
 
+      if (command == C::kAnalyzeCommand) {
+        executeAnalyze(sender, arguments);
+        return;
+      }
+
       string command_response;
       if (command == C::kDescribeDatabaseCommand) {
         command_response = C::ExecuteDescribeDatabase(arguments, *catalog_database_);
@@ -225,4 +240,86 @@ void Conductor::processSqlQueryMessage(const tmb::client_id sender, string *comm
   }
 }
 
+void Conductor::executeAnalyze(const tmb::client_id sender, const PtrVector<ParseString> &arguments) {
+  std::vector<std::reference_wrapper<const CatalogRelation>> relations;
+  if (arguments.empty()) {  // No argument: analyze every relation in the catalog database.
+    relations.insert(relations.end(), catalog_database_->begin(), catalog_database_->end());
+  } else {
+    for (const auto &argument : arguments) {
+      const CatalogRelation *relation = catalog_database_->getRelationByName(argument.value());
+      if (relation == nullptr) {
+        THROW_SQL_ERROR_AT(&argument) << "Table does not exist";
+      }
+
+      relations.emplace_back(*relation);
+    }
+  }
+
+  // Analyze each selected relation: one query per attribute, then one COUNT(*) for the relation.
+  for (const CatalogRelation &relation : relations) {
+    const relation_id rel_id = relation.getID();
+    const string rel_name = EscapeQuotes(relation.getName(), '"');
+
+    // Get the number of distinct values, plus MIN / MAX where applicable, for each column.
+    for (const CatalogAttribute &attribute : relation) {
+      const string attr_name = EscapeQuotes(attribute.getName(), '"');
+      const Type &attr_type = attribute.getType();
+      const bool is_min_applicable =
+          AggregateFunctionMin::Instance().canApplyToTypes({&attr_type});
+      const bool is_max_applicable =
+          AggregateFunctionMax::Instance().canApplyToTypes({&attr_type});
+
+      // NOTE(jianqiao): Note that the relation name and the attribute names may
+      // contain non-letter characters, e.g. CREATE TABLE "with space"("1" int).
+      // So here we need to format the names with double quotes (").
+      string *query_string = new string("SELECT COUNT(DISTINCT \"");
+      query_string->append(attr_name);
+      query_string->append("\")");
+      if (is_min_applicable) {
+        query_string->append(", MIN(\"");
+        query_string->append(attr_name);
+        query_string->append("\")");
+      }
+      if (is_max_applicable) {
+        query_string->append(", MAX(\"");
+        query_string->append(attr_name);
+        query_string->append("\")");
+      }
+      query_string->append(" FROM \"");
+      query_string->append(rel_name);
+      query_string->append("\";");
+
+      submitQuery(sender, query_string,
+                  new QueryHandle::AnalyzeQueryInfo(true /* is_analyze_attribute_query */, rel_id, relations.size(),
+                                                    attribute.getID(), is_min_applicable, is_max_applicable));
+    }
+
+    // Get the number of tuples for the relation (submitted after its per-attribute queries).
+    string *query_string = new string("SELECT COUNT(*) FROM \"");
+    query_string->append(rel_name);
+    query_string->append("\";");
+
+    submitQuery(sender, query_string,
+                new QueryHandle::AnalyzeQueryInfo(false /* is_analyze_attribute_query */, rel_id, relations.size()));
+  }
+}
+
+void Conductor::submitQuery(const tmb::client_id sender, string *query, QueryHandle::AnalyzeQueryInfo *query_info) {
+  SqlParserWrapper parser_wrapper;
+  parser_wrapper.feedNextBuffer(query);
+  ParseResult parse_result = parser_wrapper.getNextStatement();
+  DCHECK_EQ(ParseResult::kSuccess, parse_result.condition);
+
+  const ParseStatement &statement = *parse_result.parsed_statement;
+
+  // Generate the query plan for a handle that carries 'query_info' and replies to 'sender'.
+  auto query_handle =
+      make_unique<QueryHandle>(query_processor_->query_id(), sender, statement.getPriority(), query_info);
+  query_processor_->generateQueryHandle(statement, query_handle.get());
+  DCHECK(query_handle->getQueryPlanMutable() != nullptr);
+
+  QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
+      conductor_client_id_, foreman_->getBusClientID(), query_handle.release(), &bus_);  // Hands over the raw handle.
+}
+
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e37ec541/cli/distributed/Conductor.hpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Conductor.hpp b/cli/distributed/Conductor.hpp
index 0c0f7e5..28c8e02 100644
--- a/cli/distributed/Conductor.hpp
+++ b/cli/distributed/Conductor.hpp
@@ -26,15 +26,17 @@
 #include "cli/distributed/Role.hpp"
 #include "query_execution/BlockLocator.hpp"
 #include "query_execution/ForemanDistributed.hpp"
+#include "query_optimizer/QueryHandle.hpp"
 #include "query_optimizer/QueryProcessor.hpp"
 #include "utility/Macros.hpp"
+#include "utility/PtrVector.hpp"
 
 #include "tmb/id_typedefs.h"
 
 namespace quickstep {
 
 class CatalogDatabase;
-class ParseCommand;
+class ParseString;
 
 /** \addtogroup CliDistributed
  *  @{
@@ -62,6 +64,10 @@ class Conductor final : public Role {
  private:
   void processSqlQueryMessage(const tmb::client_id sender, std::string *command_string);
 
+  void executeAnalyze(const tmb::client_id sender, const PtrVector<ParseString> &arguments);
+
+  void submitQuery(const tmb::client_id sender, std::string *query, QueryHandle::AnalyzeQueryInfo *query_info);
+
   std::unique_ptr<QueryProcessor> query_processor_;
   // Not owned.
   CatalogDatabase *catalog_database_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e37ec541/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 23b706f..82ff183 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -101,8 +101,10 @@ if (ENABLE_DISTRIBUTED)
                         quickstep_catalog_CatalogRelation
                         quickstep_catalog_CatalogTypedefs
                         quickstep_catalog_Catalog_proto
+                        quickstep_cli_Flags
                         quickstep_queryexecution_AdmitRequestMessage
                         quickstep_queryexecution_BlockLocator
+                        quickstep_queryexecution_BlockLocatorUtil
                         quickstep_queryexecution_ForemanBase
                         quickstep_queryexecution_PolicyEnforcerBase
                         quickstep_queryexecution_PolicyEnforcerDistributed
@@ -112,7 +114,9 @@ if (ENABLE_DISTRIBUTED)
                         quickstep_queryexecution_QueryExecutionUtil
                         quickstep_queryexecution_ShiftbossDirectory
                         quickstep_relationaloperators_WorkOrder_proto
+                        quickstep_storage_DataExchangerAsync
                         quickstep_storage_StorageBlockInfo
+                        quickstep_storage_StorageManager
                         quickstep_threading_ThreadUtil
                         quickstep_utility_EqualsAnyConstant
                         quickstep_utility_Macros
@@ -151,7 +155,10 @@ target_link_libraries(quickstep_queryexecution_PolicyEnforcerBase
 if (ENABLE_DISTRIBUTED)
   target_link_libraries(quickstep_queryexecution_PolicyEnforcerDistributed
                         glog
+                        quickstep_catalog_CatalogDatabase
                         quickstep_catalog_CatalogRelation
+                        quickstep_catalog_CatalogRelationSchema
+                        quickstep_catalog_CatalogRelationStatistics
                         quickstep_catalog_CatalogTypedefs
                         quickstep_catalog_Catalog_proto
                         quickstep_queryexecution_PolicyEnforcerBase
@@ -166,7 +173,12 @@ if (ENABLE_DISTRIBUTED)
                         quickstep_queryexecution_ShiftbossDirectory
                         quickstep_queryoptimizer_QueryHandle
                         quickstep_queryoptimizer_QueryProcessor
+                        quickstep_storage_StorageBlock
                         quickstep_storage_StorageBlockInfo
+                        quickstep_storage_StorageManager
+                        quickstep_storage_TupleIdSequence
+                        quickstep_storage_TupleStorageSubBlock
+                        quickstep_types_TypedValue
                         quickstep_utility_ExecutionDAGVisualizer
                         quickstep_utility_Macros
                         tmb

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e37ec541/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
index 57f432f..3d47fb6 100644
--- a/query_execution/ForemanDistributed.cpp
+++ b/query_execution/ForemanDistributed.cpp
@@ -27,8 +27,10 @@
 #include "catalog/CatalogDatabase.hpp"
 #include "catalog/CatalogRelation.hpp"
 #include "catalog/CatalogTypedefs.hpp"
+#include "cli/Flags.hpp"
 #include "query_execution/AdmitRequestMessage.hpp"
 #include "query_execution/BlockLocator.hpp"
+#include "query_execution/BlockLocatorUtil.hpp"
 #include "query_execution/PolicyEnforcerBase.hpp"
 #include "query_execution/PolicyEnforcerDistributed.hpp"
 #include "query_execution/QueryContext.hpp"
@@ -37,7 +39,9 @@
 #include "query_execution/QueryExecutionUtil.hpp"
 #include "query_execution/ShiftbossDirectory.hpp"
 #include "relational_operators/WorkOrder.pb.h"
+#include "storage/DataExchangerAsync.hpp"
 #include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageManager.hpp"
 #include "threading/ThreadUtil.hpp"
 #include "utility/EqualsAnyConstant.hpp"
 
@@ -49,6 +53,7 @@
 #include "tmb/message_style.h"
 #include "tmb/tagged_message.h"
 
+using std::make_unique;
 using std::move;
 using std::size_t;
 using std::unique_ptr;
@@ -75,6 +80,7 @@ ForemanDistributed::ForemanDistributed(
       block_locator_(block_locator),
       catalog_database_(DCHECK_NOTNULL(catalog_database)) {
   const std::vector<QueryExecutionMessageType> sender_message_types{
+      kBlockDomainRegistrationMessage,
       kShiftbossRegistrationResponseMessage,
       kQueryInitiateMessage,
       kWorkOrderMessage,
@@ -82,6 +88,7 @@ ForemanDistributed::ForemanDistributed(
       kQueryTeardownMessage,
       kSaveQueryResultMessage,
       kQueryExecutionSuccessMessage,
+      kCommandResponseMessage,
       kPoisonMessage};
 
   for (const auto message_type : sender_message_types) {
@@ -89,6 +96,7 @@ ForemanDistributed::ForemanDistributed(
   }
 
   const std::vector<QueryExecutionMessageType> receiver_message_types{
+      kBlockDomainRegistrationResponseMessage,
       kShiftbossRegistrationMessage,
       kAdmitRequestMessage,
       kQueryInitiateResponseMessage,
@@ -105,12 +113,17 @@ ForemanDistributed::ForemanDistributed(
     bus_->RegisterClientAsReceiver(foreman_client_id_, message_type);
   }
 
-  policy_enforcer_ = std::make_unique<PolicyEnforcerDistributed>(
-      foreman_client_id_,
-      catalog_database_,
-      query_processor,
-      &shiftboss_directory_,
-      bus_);
+  client_id locator_client_id;
+  storage_manager_ = make_unique<StorageManager>(
+      FLAGS_storage_path,
+      block_locator::getBlockDomain(data_exchanger_.network_address(), foreman_client_id_, &locator_client_id, bus_),
+      locator_client_id, bus_);
+
+  data_exchanger_.set_storage_manager(storage_manager_.get());
+  data_exchanger_.start();
+
+  policy_enforcer_ = make_unique<PolicyEnforcerDistributed>(
+      foreman_client_id_, catalog_database_, query_processor, storage_manager_.get(), &shiftboss_directory_, bus_);
 }
 
 void ForemanDistributed::run() {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e37ec541/query_execution/ForemanDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.hpp b/query_execution/ForemanDistributed.hpp
index 7fc98bd..4053b9d 100644
--- a/query_execution/ForemanDistributed.hpp
+++ b/query_execution/ForemanDistributed.hpp
@@ -25,6 +25,8 @@
 #include "catalog/CatalogTypedefs.hpp"
 #include "query_execution/ForemanBase.hpp"
 #include "query_execution/ShiftbossDirectory.hpp"
+#include "storage/DataExchangerAsync.hpp"
+#include "storage/StorageManager.hpp"
 #include "utility/Macros.hpp"
 
 #include "tmb/id_typedefs.h"
@@ -70,7 +72,11 @@ class ForemanDistributed final : public ForemanBase {
       QueryProcessor *query_processor,
       const int cpu_id = -1);
 
-  ~ForemanDistributed() override {}
+  ~ForemanDistributed() override {
+    data_exchanger_.shutdown();  // Counterpart of data_exchanger_.start() in the constructor.
+    storage_manager_.reset();  // Releases the StorageManager created in the constructor.
+    data_exchanger_.join();
+  }
 
   void printWorkOrderProfilingResults(const std::size_t query_id,
                                       std::FILE *out) const override;
@@ -126,6 +132,10 @@ class ForemanDistributed final : public ForemanBase {
 
   CatalogDatabaseLite *catalog_database_;
 
+  // Used for '\analyze'.
+  DataExchangerAsync data_exchanger_;
+  std::unique_ptr<StorageManager> storage_manager_;
+
   // From a query id to a set of Shiftbosses that save query result.
   std::unordered_map<std::size_t, std::unordered_set<std::size_t>> query_result_saved_shiftbosses_;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e37ec541/query_execution/PolicyEnforcerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp
index 25f2d72..424452e 100644
--- a/query_execution/PolicyEnforcerDistributed.cpp
+++ b/query_execution/PolicyEnforcerDistributed.cpp
@@ -26,7 +26,10 @@
 #include <vector>
 
 #include "catalog/Catalog.pb.h"
+#include "catalog/CatalogDatabase.hpp"
 #include "catalog/CatalogRelation.hpp"
+#include "catalog/CatalogRelationSchema.hpp"
+#include "catalog/CatalogRelationStatistics.hpp"
 #include "query_execution/QueryContext.pb.h"
 #include "query_execution/QueryExecutionMessages.pb.h"
 #include "query_execution/QueryExecutionState.hpp"
@@ -36,7 +39,12 @@
 #include "query_execution/QueryManagerDistributed.hpp"
 #include "query_optimizer/QueryHandle.hpp"
 #include "query_optimizer/QueryProcessor.hpp"
+#include "storage/StorageBlock.hpp"
 #include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageManager.hpp"
+#include "storage/TupleIdSequence.hpp"
+#include "storage/TupleStorageSubBlock.hpp"
+#include "types/TypedValue.hpp"
 #include "utility/ExecutionDAGVisualizer.hpp"
 
 #include "gflags/gflags.h"
@@ -50,7 +58,9 @@
 using std::free;
 using std::malloc;
 using std::move;
+using std::ostringstream;
 using std::size_t;
+using std::string;
 using std::unique_ptr;
 using std::vector;
 
@@ -231,7 +241,7 @@ void PolicyEnforcerDistributed::initiateQueryInShiftboss(QueryHandle *query_hand
 void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manager) {
   const QueryHandle *query_handle = query_manager->query_handle();
 
-  const CatalogRelation *query_result = query_handle->getQueryResultRelation();
+  const CatalogRelation *query_result_relation = query_handle->getQueryResultRelation();
   const tmb::client_id cli_id = query_handle->getClientId();
   const std::size_t query_id = query_handle->query_id();
 
@@ -259,7 +269,7 @@ void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manage
     shiftboss_addresses.AddRecipient(shiftboss_directory_->getClientId(i));
   }
 
-  if (query_result == nullptr) {
+  if (query_result_relation == nullptr) {
     if (query_processor_) {
       query_processor_->saveCatalog();
     }
@@ -272,17 +282,12 @@ void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manage
     char *proto_bytes = static_cast<char*>(malloc(proto_length));
     CHECK(proto.SerializeToArray(proto_bytes, proto_length));
 
-    TaggedMessage message(static_cast<const void*>(proto_bytes),
-                          proto_length,
-                          kQueryTeardownMessage);
+    TaggedMessage message(static_cast<const void*>(proto_bytes), proto_length, kQueryTeardownMessage);
     free(proto_bytes);
 
     DLOG(INFO) << "PolicyEnforcerDistributed sent QueryTeardownMessage (typed '" << kQueryTeardownMessage
                << "') to all Shiftbosses";
-    QueryExecutionUtil::BroadcastMessage(foreman_client_id_,
-                                         shiftboss_addresses,
-                                         move(message),
-                                         bus_);
+    QueryExecutionUtil::BroadcastMessage(foreman_client_id_, shiftboss_addresses, move(message), bus_);
 
     TaggedMessage cli_message(kQueryExecutionSuccessMessage);
 
@@ -299,12 +304,33 @@ void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manage
     return;
   }
 
+  const QueryHandle::AnalyzeQueryInfo *analyze_query_info = query_handle->analyze_query_info();
+  if (analyze_query_info) {
+    processAnalyzeQueryResult(cli_id, query_result_relation, analyze_query_info);
+
+    // Clean up query execution states, i.e., QueryContext, in Shiftbosses.
+    S::QueryTeardownMessage proto;
+    proto.set_query_id(query_id);
+
+    const size_t proto_length = proto.ByteSize();
+    char *proto_bytes = static_cast<char*>(malloc(proto_length));
+    CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+    TaggedMessage message(static_cast<const void*>(proto_bytes), proto_length, kQueryTeardownMessage);
+    free(proto_bytes);
+
+    DLOG(INFO) << "PolicyEnforcerDistributed sent QueryTeardownMessage (typed '" << kQueryTeardownMessage
+               << "') to all Shiftbosses";
+    QueryExecutionUtil::BroadcastMessage(foreman_client_id_, shiftboss_addresses, move(message), bus_);
+    return;
+  }
+
   // NOTE(zuyu): SaveQueryResultMessage implicitly triggers QueryTeardown in Shiftboss.
   S::SaveQueryResultMessage proto;
   proto.set_query_id(query_id);
-  proto.set_relation_id(query_result->getID());
+  proto.set_relation_id(query_result_relation->getID());
 
-  const vector<block_id> blocks(query_result->getBlocksSnapshot());
+  const vector<block_id> blocks(query_result_relation->getBlocksSnapshot());
   for (const block_id block : blocks) {
     proto.add_blocks(block);
   }
@@ -315,18 +341,111 @@ void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manage
   char *proto_bytes = static_cast<char*>(malloc(proto_length));
   CHECK(proto.SerializeToArray(proto_bytes, proto_length));
 
-  TaggedMessage message(static_cast<const void*>(proto_bytes),
-                        proto_length,
-                        kSaveQueryResultMessage);
+  TaggedMessage message(static_cast<const void*>(proto_bytes), proto_length, kSaveQueryResultMessage);
   free(proto_bytes);
 
   // TODO(quickstep-team): Dynamically scale-up/down Shiftbosses.
   DLOG(INFO) << "PolicyEnforcerDistributed sent SaveQueryResultMessage (typed '" << kSaveQueryResultMessage
              << "') to all Shiftbosses";
-  QueryExecutionUtil::BroadcastMessage(foreman_client_id_,
-                                       shiftboss_addresses,
-                                       move(message),
-                                       bus_);
+  QueryExecutionUtil::BroadcastMessage(foreman_client_id_, shiftboss_addresses, move(message), bus_);
+}
+
+void PolicyEnforcerDistributed::processAnalyzeQueryResult(const tmb::client_id cli_id,
+                                                          const CatalogRelation *query_result_relation,
+                                                          const QueryHandle::AnalyzeQueryInfo *analyze_query_info) {
+  const relation_id rel_id = analyze_query_info->rel_id;
+  CatalogRelation *mutable_relation =
+      static_cast<CatalogDatabase*>(catalog_database_)->getRelationByIdMutable(rel_id);
+  CatalogRelationStatistics *mutable_stat = mutable_relation->getStatisticsMutable();
+
+  const auto analyze_query_result = [this, &query_result_relation]() {  // Invoked immediately.
+    const vector<block_id> blocks = query_result_relation->getBlocksSnapshot();
+    DCHECK_EQ(1u, blocks.size());
+
+    vector<TypedValue> query_result;
+    {
+      BlockReference block = storage_manager_->getBlock(blocks.front(), *query_result_relation);
+      const TupleStorageSubBlock &tuple_store = block->getTupleStorageSubBlock();
+      DCHECK_EQ(1, tuple_store.numTuples());
+
+      const std::size_t num_columns = tuple_store.getRelation().size();
+      if (tuple_store.isPacked()) {
+        for (std::size_t i = 0; i < num_columns; ++i) {
+          query_result.emplace_back(tuple_store.getAttributeValueTyped(0, i));
+        }
+      } else {  // Not packed: read the tuple whose id comes first in the existence map.
+        std::unique_ptr<TupleIdSequence> existence_map(tuple_store.getExistenceMap());
+        for (std::size_t i = 0; i < num_columns; ++i) {
+          query_result.emplace_back(
+              tuple_store.getAttributeValueTyped(*existence_map->begin(), i));
+        }
+      }
+    }
+
+    // Clean up the query result relation: delete its blocks, then drop it from the catalog.
+    for (const block_id block : blocks) {
+      storage_manager_->deleteBlockOrBlobFile(block);
+    }
+    catalog_database_->dropRelationById(query_result_relation->getID());
+
+    return query_result;
+  }();
+
+  if (analyze_query_info->is_analyze_attribute_query) {
+    const attribute_id attr_id = analyze_query_info->attr_id;
+
+    auto cit = analyze_query_result.begin();  // First column: the distinct-value count.
+    DCHECK_EQ(TypeID::kLong, cit->getTypeID());
+    mutable_stat->setNumDistinctValues(attr_id, cit->getLiteral<std::int64_t>());
+
+    if (analyze_query_info->is_min_applicable) {
+      ++cit;  // The MIN column is present only when MIN was applicable.
+      mutable_stat->setMinValue(attr_id, *cit);
+    }
+
+    if (analyze_query_info->is_max_applicable) {
+      ++cit;  // Likewise for the MAX column.
+      mutable_stat->setMaxValue(attr_id, *cit);
+    }
+  } else {  // Result of the per-relation tuple-count query.
+    completed_analyze_relations_[cli_id].push_back(rel_id);
+
+    DCHECK_EQ(1u, analyze_query_result.size());
+    const TypedValue &num_tuples = analyze_query_result.front();
+    DCHECK_EQ(TypeID::kLong, num_tuples.getTypeID());
+    mutable_stat->setNumTuples(num_tuples.getLiteral<std::int64_t>());
+
+    mutable_stat->setExactness(true);
+
+    if (completed_analyze_relations_[cli_id].size() == analyze_query_info->num_relations) {
+      query_processor_->markCatalogAltered();
+      query_processor_->saveCatalog();
+
+      ostringstream analyze_command_response;
+      for (const relation_id rel_id : completed_analyze_relations_[cli_id]) {
+        analyze_command_response << "Analyzing " << catalog_database_->getRelationSchemaById(rel_id).getName()
+                                 << " ... done\n";
+      }
+
+      S::CommandResponseMessage proto;
+      proto.set_command_response(analyze_command_response.str());
+
+      const size_t proto_length = proto.ByteSize();
+      char *proto_bytes = static_cast<char*>(malloc(proto_length));
+      CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+      TaggedMessage message(static_cast<const void*>(proto_bytes), proto_length, kCommandResponseMessage);
+      free(proto_bytes);
+
+      DLOG(INFO) << "PolicyEnforcerDistributed sent CommandResponseMessage (typed '" << kCommandResponseMessage
+                 << "') to CLI with TMB client id " << cli_id;
+      const tmb::MessageBus::SendStatus send_status =
+          QueryExecutionUtil::SendTMBMessage(bus_, foreman_client_id_, cli_id, move(message));
+      CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
+
+      completed_analyze_relations_.erase(cli_id);  // Forget this client's completed relations.
+    }
+  }
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e37ec541/query_execution/PolicyEnforcerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.hpp b/query_execution/PolicyEnforcerDistributed.hpp
index 18fd9ae..5334da0 100644
--- a/query_execution/PolicyEnforcerDistributed.hpp
+++ b/query_execution/PolicyEnforcerDistributed.hpp
@@ -28,6 +28,7 @@
 #include "query_execution/QueryManagerBase.hpp"
 #include "query_execution/ShiftbossDirectory.hpp"
 #include "query_optimizer/QueryHandle.hpp"
+#include "types/TypedValue.hpp"
 #include "utility/Macros.hpp"
 
 #include "glog/logging.h"
@@ -42,7 +43,9 @@ class TaggedMessage;
 namespace quickstep {
 
 class CatalogDatabaseLite;
+class CatalogRelation;
 class QueryProcessor;
+class StorageManager;
 
 /** \addtogroup QueryExecution
  *  @{
@@ -61,16 +64,20 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
    * @param catalog_database The CatalogDatabase used.
    * @param query_processor The QueryProcessor to save catalog upon the query
    *        completion.
+   * @param storage_manager The StorageManager to use.
+   * @param shiftboss_directory The ShiftbossDirectory to manage Shiftbosses.
    * @param bus The TMB.
    **/
   PolicyEnforcerDistributed(const tmb::client_id foreman_client_id,
                             CatalogDatabaseLite *catalog_database,
                             QueryProcessor *query_processor,
+                            StorageManager *storage_manager,
                             ShiftbossDirectory *shiftboss_directory,
                             tmb::MessageBus *bus)
       : PolicyEnforcerBase(catalog_database),
         foreman_client_id_(foreman_client_id),
         query_processor_(query_processor),
+        storage_manager_(storage_manager),
         shiftboss_directory_(shiftboss_directory),
         bus_(bus) {}
 
@@ -157,13 +164,20 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
 
   void initiateQueryInShiftboss(QueryHandle *query_handle);
 
+  void processAnalyzeQueryResult(const tmb::client_id cli_id,
+                                 const CatalogRelation *query_result_relation,
+                                 const QueryHandle::AnalyzeQueryInfo *analyze_query_info);
+
   const tmb::client_id foreman_client_id_;
 
   QueryProcessor *query_processor_;
+  StorageManager *storage_manager_;
   ShiftbossDirectory *shiftboss_directory_;
 
   tmb::MessageBus *bus_;
 
+  std::unordered_map<tmb::client_id, std::vector<relation_id>> completed_analyze_relations_;
+
   DISALLOW_COPY_AND_ASSIGN(PolicyEnforcerDistributed);
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e37ec541/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index a49de5e..081852f 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -90,7 +90,7 @@ enum QueryExecutionMessageType : message_type_id {
   kDistributedCliRegistrationMessage,  // From CLI to Conductor.
   kDistributedCliRegistrationResponseMessage,  // From Conductor to CLI.
 
-  kSqlQueryMessage, // From CLI to Conductor.
+  kSqlQueryMessage,  // From CLI to Conductor.
 
   kQueryInitiateMessage,  // From Foreman to Shiftboss.
   kQueryInitiateResponseMessage,  // From Shiftboss to Foreman.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e37ec541/query_optimizer/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt
index 3ff783c..9bdb753 100644
--- a/query_optimizer/CMakeLists.txt
+++ b/query_optimizer/CMakeLists.txt
@@ -229,6 +229,7 @@ target_link_libraries(quickstep_queryoptimizer_PhysicalGenerator
                       quickstep_utility_Macros
                       quickstep_utility_PlanVisualizer)
 target_link_libraries(quickstep_queryoptimizer_QueryHandle
+                      quickstep_catalog_CatalogTypedefs
                       quickstep_catalog_Catalog_proto
                       quickstep_queryexecution_QueryContext_proto
                       quickstep_queryoptimizer_QueryPlan

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e37ec541/query_optimizer/QueryHandle.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/QueryHandle.hpp b/query_optimizer/QueryHandle.hpp
index 7cb4f68..6feeb4c 100644
--- a/query_optimizer/QueryHandle.hpp
+++ b/query_optimizer/QueryHandle.hpp
@@ -25,6 +25,7 @@
 #include <memory>
 
 #include "catalog/Catalog.pb.h"
+#include "catalog/CatalogTypedefs.hpp"
 #include "query_execution/QueryContext.pb.h"
 #include "query_optimizer/QueryOptimizerConfig.h"  // For QUICKSTEP_DISTRIBUTED.
 #include "query_optimizer/QueryPlan.hpp"
@@ -45,19 +46,47 @@ class CatalogRelation;
  **/
 class QueryHandle {
  public:
+  // Info about one query generated while executing the '\analyze' command.
+  struct AnalyzeQueryInfo {
+    AnalyzeQueryInfo(const bool is_analyze_attribute_query_in,
+                     const relation_id rel_id_in,
+                     const std::size_t num_relations_in,
+                     const attribute_id attr_id_in = kInvalidCatalogId,
+                     const bool is_min_applicable_in = false,
+                     const bool is_max_applicable_in = false)
+      : is_analyze_attribute_query(is_analyze_attribute_query_in),
+        rel_id(rel_id_in),
+        num_relations(num_relations_in),
+        attr_id(attr_id_in),
+        is_min_applicable(is_min_applicable_in),
+        is_max_applicable(is_max_applicable_in) {}
+
+    const bool is_analyze_attribute_query;
+    const relation_id rel_id;
+    const std::size_t num_relations;  // Relations that must complete before the CLI gets a response.
+
+    // The fields below are only valid if 'is_analyze_attribute_query' is true.
+    const attribute_id attr_id;
+    const bool is_min_applicable;
+    const bool is_max_applicable;
+  };
+
   /**
    * @brief Constructor.
    *
    * @param query_id The given query id.
    * @param cli_id The client id of the CLI which submits the query.
    * @param query_priority The priority of this query.
+   * @param analyze_query_info The info of this analyze query (ownership is taken), or nullptr.
    */
-  explicit QueryHandle(const std::size_t query_id,
-                       const tmb::client_id cli_id,
-                       const std::uint64_t query_priority = 1)
+  QueryHandle(const std::size_t query_id,
+              const tmb::client_id cli_id,
+              const std::uint64_t query_priority = 1,
+              AnalyzeQueryInfo *analyze_query_info = nullptr)
       : query_id_(query_id),
         cli_id_(cli_id),
         query_priority_(query_priority),
+        analyze_query_info_(analyze_query_info),
         query_plan_(new QueryPlan()),
         query_result_relation_(nullptr) {}
 
@@ -87,6 +116,13 @@ class QueryHandle {
   }
 
   /**
+   * @brief Get the info for the '\analyze' command, or nullptr if none was supplied at construction.
+   **/
+  const AnalyzeQueryInfo* analyze_query_info() const {
+    return analyze_query_info_.get();
+  }
+
+  /**
    * @return The const query plan.
    */
   const QueryPlan& getQueryPlan() const {
@@ -165,6 +201,7 @@ class QueryHandle {
   const tmb::client_id cli_id_;
 
   const std::uint64_t query_priority_;
+  std::unique_ptr<AnalyzeQueryInfo> analyze_query_info_;
 
   std::unique_ptr<QueryPlan> query_plan_;