You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2017/03/29 00:08:17 UTC

[13/40] incubator-quickstep git commit: Added the query print support in the distributed version.

Added the query print support in the distributed version.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/68fc7456
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/68fc7456
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/68fc7456

Branch: refs/heads/new-op
Commit: 68fc745648fa51b13631510682054dedc01ab094
Parents: 6909e7c
Author: Zuyu Zhang <zu...@apache.org>
Authored: Mon Mar 13 13:37:18 2017 -0700
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Mon Mar 13 16:41:53 2017 -0700

----------------------------------------------------------------------
 cli/Flags.cpp                             |  4 ++
 cli/Flags.hpp                             |  2 +
 cli/QuickstepCli.cpp                      |  3 --
 cli/distributed/Conductor.cpp             | 16 +++++--
 relational_operators/WorkOrderFactory.cpp | 64 ++++++++++++++++----------
 5 files changed, 59 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/68fc7456/cli/Flags.cpp
----------------------------------------------------------------------
diff --git a/cli/Flags.cpp b/cli/Flags.cpp
index 74915ae..362eac3 100644
--- a/cli/Flags.cpp
+++ b/cli/Flags.cpp
@@ -32,6 +32,10 @@ using std::fprintf;
 
 namespace quickstep {
 
+DEFINE_bool(print_query, false,
+            "Print each input query statement. This is useful when running a "
+            "large number of queries in a batch.");
+
 DEFINE_bool(initialize_db, false, "If true, initialize a database.");
 
 static bool ValidateNumWorkers(const char *flagname, int value) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/68fc7456/cli/Flags.hpp
----------------------------------------------------------------------
diff --git a/cli/Flags.hpp b/cli/Flags.hpp
index a268e39..1ae37c4 100644
--- a/cli/Flags.hpp
+++ b/cli/Flags.hpp
@@ -33,6 +33,8 @@ namespace quickstep {
  * single-node and the distributed version.
  **/
 
+DECLARE_bool(print_query);
+
 DECLARE_bool(initialize_db);
 
 DECLARE_int32(num_workers);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/68fc7456/cli/QuickstepCli.cpp
----------------------------------------------------------------------
diff --git a/cli/QuickstepCli.cpp b/cli/QuickstepCli.cpp
index 26cb154..c2634bc 100644
--- a/cli/QuickstepCli.cpp
+++ b/cli/QuickstepCli.cpp
@@ -119,9 +119,6 @@ using tmb::client_id;
 
 namespace quickstep {
 
-DEFINE_bool(print_query, false,
-            "Print each input query statement. This is useful when running a "
-            "large number of queries in a batch.");
 DEFINE_string(profile_file_name, "",
               "If nonempty, enable profiling using GOOGLE CPU Profiler, and write "
               "its output to the given file name. This flag has no effect if "

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/68fc7456/cli/distributed/Conductor.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Conductor.cpp b/cli/distributed/Conductor.cpp
index a8408ef..a13ab21 100644
--- a/cli/distributed/Conductor.cpp
+++ b/cli/distributed/Conductor.cpp
@@ -214,9 +214,11 @@ void Conductor::processSqlQueryMessage(const tmb::client_id sender, string *comm
       CHECK(MessageBus::SendStatus::kOK ==
           QueryExecutionUtil::SendTMBMessage(&bus_, conductor_client_id_, sender, move(message)));
     } else {
-      auto query_handle = make_unique<QueryHandle>(query_processor_->query_id(),
-                                                   sender,
-                                                   statement.getPriority());
+      if (FLAGS_print_query) {
+        printf("\nQuery %zu: %s\n", query_processor_->query_id(), command_string->c_str());
+      }
+
+      auto query_handle = make_unique<QueryHandle>(query_processor_->query_id(), sender, statement.getPriority());
       query_processor_->generateQueryHandle(statement, query_handle.get());
       DCHECK(query_handle->getQueryPlanMutable() != nullptr);
 
@@ -299,6 +301,10 @@ void Conductor::executeAnalyze(const tmb::client_id sender, const PtrVector<Pars
       query_string->append(rel_name);
       query_string->append("\";");
 
+      if (FLAGS_print_query) {
+        printf("\nQuery %zu: %s\n", query_processor_->query_id(), query_string->c_str());
+      }
+
       parser_wrapper.feedNextBuffer(query_string);
       const ParseResult parse_result = parser_wrapper.getNextStatement();
       DCHECK_EQ(ParseResult::kSuccess, parse_result.condition);
@@ -322,6 +328,10 @@ void Conductor::executeAnalyze(const tmb::client_id sender, const PtrVector<Pars
     query_string->append(rel_name);
     query_string->append("\";");
 
+    if (FLAGS_print_query) {
+      printf("\nQuery %zu: %s\n", query_processor_->query_id(), query_string->c_str());
+    }
+
     parser_wrapper.feedNextBuffer(query_string);
     const ParseResult parse_result = parser_wrapper.getNextStatement();
     DCHECK_EQ(ParseResult::kSuccess, parse_result.condition);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/68fc7456/relational_operators/WorkOrderFactory.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp
index ae57e6f..56f431b 100644
--- a/relational_operators/WorkOrderFactory.cpp
+++ b/relational_operators/WorkOrderFactory.cpp
@@ -84,7 +84,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
 
   switch (proto.work_order_type()) {
     case serialization::AGGREGATION: {
-      LOG(INFO) << "Creating AggregationWorkOrder in Shiftboss " << shiftboss_index;
+      LOG(INFO) << "Creating AggregationWorkOrder for Query " << proto.query_id()
+                << " in Shiftboss " << shiftboss_index;
       return new AggregationWorkOrder(
           proto.query_id(),
           proto.GetExtension(serialization::AggregationWorkOrder::block_id),
@@ -94,7 +95,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
               proto.GetExtension(serialization::AggregationWorkOrder::lip_deployment_index), query_context));
     }
     case serialization::BUILD_AGGREGATION_EXISTENCE_MAP: {
-      LOG(INFO) << "Creating BuildAggregationExistenceMapWorkOrder in Shiftboss " << shiftboss_index;
+      LOG(INFO) << "Creating BuildAggregationExistenceMapWorkOrder for Query " << proto.query_id()
+                << " in Shiftboss " << shiftboss_index;
 
       return new BuildAggregationExistenceMapWorkOrder(
           proto.query_id(),
@@ -107,7 +109,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
           storage_manager);
     }
     case serialization::BUILD_LIP_FILTER: {
-      LOG(INFO) << "Creating BuildLIPFilterWorkOrder in Shiftboss " << shiftboss_index;
+      LOG(INFO) << "Creating BuildLIPFilterWorkOrder for Query " << proto.query_id()
+                << " in Shiftboss " << shiftboss_index;
 
       const QueryContext::lip_deployment_id lip_deployment_index =
           proto.GetExtension(serialization::BuildLIPFilterWorkOrder::lip_deployment_index);
@@ -124,7 +127,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
           CreateLIPFilterBuilderHelper(lip_deployment_index, query_context));
     }
     case serialization::BUILD_HASH: {
-      LOG(INFO) << "Creating BuildHashWorkOrder in Shiftboss " << shiftboss_index;
+      LOG(INFO) << "Creating BuildHashWorkOrder for Query " << proto.query_id() << " in Shiftboss " << shiftboss_index;
       vector<attribute_id> join_key_attributes;
       for (int i = 0; i < proto.ExtensionSize(serialization::BuildHashWorkOrder::join_key_attributes); ++i) {
         join_key_attributes.push_back(
@@ -149,7 +152,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
               proto.GetExtension(serialization::BuildHashWorkOrder::lip_deployment_index), query_context));
     }
     case serialization::DELETE: {
-      LOG(INFO) << "Creating DeleteWorkOrder in Shiftboss " << shiftboss_index;
+      LOG(INFO) << "Creating DeleteWorkOrder for Query " << proto.query_id() << " in Shiftboss " << shiftboss_index;
       return new DeleteWorkOrder(
           proto.query_id(),
           catalog_database->getRelationSchemaById(
@@ -163,7 +166,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
           bus);
     }
     case serialization::DESTROY_AGGREGATION_STATE: {
-      LOG(INFO) << "Creating DestroyAggregationStateWorkOrder in Shiftboss " << shiftboss_index;
+      LOG(INFO) << "Creating DestroyAggregationStateWorkOrder for Query " << proto.query_id()
+                << " in Shiftboss " << shiftboss_index;
       return new DestroyAggregationStateWorkOrder(
           proto.query_id(),
           proto.GetExtension(
@@ -171,7 +175,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
           query_context);
     }
     case serialization::DESTROY_HASH: {
-      LOG(INFO) << "Creating DestroyHashWorkOrder in Shiftboss " << shiftboss_index;
+      LOG(INFO) << "Creating DestroyHashWorkOrder for Query " << proto.query_id()
+                << " in Shiftboss " << shiftboss_index;
       return new DestroyHashWorkOrder(
           proto.query_id(),
           proto.GetExtension(
@@ -181,7 +186,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
           query_context);
     }
     case serialization::DROP_TABLE: {
-      LOG(INFO) << "Creating DropTableWorkOrder in Shiftboss " << shiftboss_index;
+      LOG(INFO) << "Creating DropTableWorkOrder for Query " << proto.query_id() << " in Shiftboss " << shiftboss_index;
       vector<block_id> blocks;
       for (int i = 0; i < proto.ExtensionSize(serialization::DropTableWorkOrder::block_ids); ++i) {
         blocks.push_back(
@@ -198,7 +203,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
           catalog_database);
     }
     case serialization::FINALIZE_AGGREGATION: {
-      LOG(INFO) << "Creating FinalizeAggregationWorkOrder in Shiftboss " << shiftboss_index;
+      LOG(INFO) << "Creating FinalizeAggregationWorkOrder for Query " << proto.query_id()
+                << " in Shiftboss " << shiftboss_index;
       // TODO(quickstep-team): Handle inner-table partitioning in the distributed
       // setting.
       return new FinalizeAggregationWorkOrder(
@@ -258,7 +264,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
 
       switch (hash_join_work_order_type) {
         case serialization::HashJoinWorkOrder::HASH_ANTI_JOIN: {
-          LOG(INFO) << "Creating HashAntiJoinWorkOrder in Shiftboss " << shiftboss_index;
+          LOG(INFO) << "Creating HashAntiJoinWorkOrder for Query " << proto.query_id()
+                    << " in Shiftboss " << shiftboss_index;
           return new HashAntiJoinWorkOrder(
               proto.query_id(),
               build_relation,
@@ -275,7 +282,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
               lip_filter_adaptive_prober);
         }
         case serialization::HashJoinWorkOrder::HASH_INNER_JOIN: {
-          LOG(INFO) << "Creating HashInnerJoinWorkOrder in Shiftboss " << shiftboss_index;
+          LOG(INFO) << "Creating HashInnerJoinWorkOrder for Query " << proto.query_id()
+                    << " in Shiftboss " << shiftboss_index;
           return new HashInnerJoinWorkOrder(
               proto.query_id(),
               build_relation,
@@ -300,7 +308,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
                 proto.GetExtension(serialization::HashJoinWorkOrder::is_selection_on_build, i));
           }
 
-          LOG(INFO) << "Creating HashOuterJoinWorkOrder in Shiftboss " << shiftboss_index;
+          LOG(INFO) << "Creating HashOuterJoinWorkOrder for Query " << proto.query_id()
+                    << " in Shiftboss " << shiftboss_index;
           return new HashOuterJoinWorkOrder(
               proto.query_id(),
               build_relation,
@@ -317,7 +326,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
               lip_filter_adaptive_prober);
         }
         case serialization::HashJoinWorkOrder::HASH_SEMI_JOIN: {
-          LOG(INFO) << "Creating HashSemiJoinWorkOrder in Shiftboss " << shiftboss_index;
+          LOG(INFO) << "Creating HashSemiJoinWorkOrder for Query " << proto.query_id()
+                    << " in Shiftboss " << shiftboss_index;
           return new HashSemiJoinWorkOrder(
               proto.query_id(),
               build_relation,
@@ -338,7 +348,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
       }
     }
     case serialization::INSERT: {
-      LOG(INFO) << "Creating InsertWorkOrder in Shiftboss " << shiftboss_index;
+      LOG(INFO) << "Creating InsertWorkOrder for Query " << proto.query_id() << " in Shiftboss " << shiftboss_index;
       return new InsertWorkOrder(
           proto.query_id(),
           query_context->getInsertDestination(
@@ -347,7 +357,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
               proto.GetExtension(serialization::InsertWorkOrder::tuple_index)));
     }
     case serialization::NESTED_LOOP_JOIN: {
-      LOG(INFO) << "Creating NestedLoopsJoinWorkOrder in Shiftboss " << shiftboss_index;
+      LOG(INFO) << "Creating NestedLoopsJoinWorkOrder for Query " << proto.query_id()
+                << " in Shiftboss " << shiftboss_index;
       return new NestedLoopsJoinWorkOrder(
           proto.query_id(),
           catalog_database->getRelationSchemaById(
@@ -365,7 +376,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
           storage_manager);
     }
     case serialization::SAMPLE: {
-      LOG(INFO) << "Creating SampleWorkOrder in Shiftboss " << shiftboss_index;
+      LOG(INFO) << "Creating SampleWorkOrder for Query " << proto.query_id() << " in Shiftboss " << shiftboss_index;
       return new SampleWorkOrder(
           proto.query_id(),
           catalog_database->getRelationSchemaById(
@@ -378,7 +389,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
           storage_manager);
     }
     case serialization::SAVE_BLOCKS: {
-      LOG(INFO) << "Creating SaveBlocksWorkOrder in Shiftboss " << shiftboss_index;
+      LOG(INFO) << "Creating SaveBlocksWorkOrder for Query " << proto.query_id()
+                << " in Shiftboss " << shiftboss_index;
       return new SaveBlocksWorkOrder(
           proto.query_id(),
           proto.GetExtension(serialization::SaveBlocksWorkOrder::block_id),
@@ -386,7 +398,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
           storage_manager);
     }
     case serialization::SELECT: {
-      LOG(INFO) << "Creating SelectWorkOrder in Shiftboss " << shiftboss_index;
+      LOG(INFO) << "Creating SelectWorkOrder for Query " << proto.query_id() << " in Shiftboss " << shiftboss_index;
       const bool simple_projection =
           proto.GetExtension(serialization::SelectWorkOrder::simple_projection);
       vector<attribute_id> simple_selection;
@@ -414,7 +426,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
               proto.GetExtension(serialization::SelectWorkOrder::lip_deployment_index), query_context));
     }
     case serialization::SORT_MERGE_RUN: {
-      LOG(INFO) << "Creating SortMergeRunWorkOrder in Shiftboss " << shiftboss_index;
+      LOG(INFO) << "Creating SortMergeRunWorkOrder for Query " << proto.query_id()
+                << " in Shiftboss " << shiftboss_index;
       vector<merge_run_operator::Run> runs;
       for (int i = 0; i < proto.ExtensionSize(serialization::SortMergeRunWorkOrder::runs); ++i) {
         merge_run_operator::Run run;
@@ -443,7 +456,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
           bus);
     }
     case serialization::SORT_RUN_GENERATION: {
-      LOG(INFO) << "Creating SortRunGenerationWorkOrder in Shiftboss " << shiftboss_index;
+      LOG(INFO) << "Creating SortRunGenerationWorkOrder for Query " << proto.query_id()
+                << " in Shiftboss " << shiftboss_index;
       return new SortRunGenerationWorkOrder(
           proto.query_id(),
           catalog_database->getRelationSchemaById(
@@ -456,7 +470,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
           storage_manager);
     }
     case serialization::TABLE_GENERATOR: {
-      LOG(INFO) << "Creating SortRunGenerationWorkOrder in Shiftboss " << shiftboss_index;
+      LOG(INFO) << "Creating TableGeneratorWorkOrder for Query " << proto.query_id()
+                << " in Shiftboss " << shiftboss_index;
       return new TableGeneratorWorkOrder(
           proto.query_id(),
           query_context->getGeneratorFunctionHandle(
@@ -465,7 +480,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
               proto.GetExtension(serialization::TableGeneratorWorkOrder::insert_destination_index)));
     }
     case serialization::TEXT_SCAN: {
-      LOG(INFO) << "Creating TextScanWorkOrder in Shiftboss " << shiftboss_index;
+      LOG(INFO) << "Creating TextScanWorkOrder for Query " << proto.query_id() << " in Shiftboss " << shiftboss_index;
       return new TextScanWorkOrder(
           proto.query_id(),
           proto.GetExtension(serialization::TextScanWorkOrder::filename),
@@ -478,7 +493,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
           hdfs);
     }
     case serialization::UPDATE: {
-      LOG(INFO) << "Creating UpdateWorkOrder in Shiftboss " << shiftboss_index;
+      LOG(INFO) << "Creating UpdateWorkOrder for Query " << proto.query_id() << " in Shiftboss " << shiftboss_index;
       return new UpdateWorkOrder(
           proto.query_id(),
           catalog_database->getRelationSchemaById(
@@ -496,7 +511,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
           bus);
     }
     case serialization::WINDOW_AGGREGATION: {
-      LOG(INFO) << "Creating WindowAggregationWorkOrder in Shiftboss " << shiftboss_index;
+      LOG(INFO) << "Creating WindowAggregationWorkOrder for Query " << proto.query_id()
+                << " in Shiftboss " << shiftboss_index;
       vector<block_id> blocks;
       for (int i = 0; i < proto.ExtensionSize(serialization::WindowAggregationWorkOrder::block_ids); ++i) {
         blocks.push_back(