You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2016/08/03 08:12:59 UTC

[1/3] incubator-quickstep git commit: Cleaned up the messages w/ a dummy payload. [Forced Update!]

Repository: incubator-quickstep
Updated Branches:
  refs/heads/refactor-hashjoin-probe-build 7c5bdf92e -> a61b99e9e (forced update)


Cleaned up the messages w/ a dummy payload.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/ccea2ff8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/ccea2ff8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/ccea2ff8

Branch: refs/heads/refactor-hashjoin-probe-build
Commit: ccea2ff83ea73e950d52c152cc422a9e93cf6aad
Parents: 52a32a3
Author: Zuyu Zhang <zu...@twitter.com>
Authored: Fri Jul 29 23:52:33 2016 -0700
Committer: Zuyu Zhang <zu...@twitter.com>
Committed: Mon Aug 1 10:11:19 2016 -0700

----------------------------------------------------------------------
 query_execution/ForemanSingleNode.cpp           |   4 +-
 query_execution/QueryExecutionMessages.proto    |   4 -
 query_execution/QueryExecutionUtil.hpp          |   5 +-
 query_execution/README.md                       | 110 +++++++++----------
 query_execution/WorkerMessage.hpp               |  13 +--
 query_execution/tests/BlockLocator_unittest.cpp |  11 +-
 storage/tests/DataExchange_unittest.cpp         |  11 +-
 7 files changed, 60 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccea2ff8/query_execution/ForemanSingleNode.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanSingleNode.cpp b/query_execution/ForemanSingleNode.cpp
index cda02a7..d2b56ae 100644
--- a/query_execution/ForemanSingleNode.cpp
+++ b/query_execution/ForemanSingleNode.cpp
@@ -167,9 +167,7 @@ void ForemanSingleNode::run() {
     if (!policy_enforcer_->hasQueries()) {
       // Signal the main thread that there are no queries to be executed.
       // Currently the message doesn't have any real content.
-      const int dummy_payload = 0;
-      TaggedMessage completion_tagged_message(
-          &dummy_payload, sizeof(dummy_payload), kWorkloadCompletionMessage);
+      TaggedMessage completion_tagged_message(kWorkloadCompletionMessage);
       const tmb::MessageBus::SendStatus send_status =
           QueryExecutionUtil::SendTMBMessage(
               bus_,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccea2ff8/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index 308d736..f2219f6 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -20,10 +20,6 @@ import "catalog/Catalog.proto";
 import "query_execution/QueryContext.proto";
 import "relational_operators/WorkOrder.proto";
 
-// Used for any messages that do not carry payloads.
-message EmptyMessage {
-}
-
 // Note: There are different types of completion messages for normal work orders
 // rebuild work orders. This can be potentially helpful when we want to collect
 // different statistics for executing different types of work orders.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccea2ff8/query_execution/QueryExecutionUtil.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionUtil.hpp b/query_execution/QueryExecutionUtil.hpp
index 6ea4a29..5994f22 100644
--- a/query_execution/QueryExecutionUtil.hpp
+++ b/query_execution/QueryExecutionUtil.hpp
@@ -123,10 +123,7 @@ class QueryExecutionUtil {
     style.Broadcast(true);
     Address address;
     address.All(true);
-    std::unique_ptr<WorkerMessage> poison_message(WorkerMessage::PoisonMessage());
-    TaggedMessage poison_tagged_message(poison_message.get(),
-                                        sizeof(*poison_message),
-                                        kPoisonMessage);
+    TaggedMessage poison_tagged_message(kPoisonMessage);
 
     const tmb::MessageBus::SendStatus send_status = bus->Send(
         sender_id, address, style, std::move(poison_tagged_message));

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccea2ff8/query_execution/README.md
----------------------------------------------------------------------
diff --git a/query_execution/README.md b/query_execution/README.md
index 22ad91d..12e0f57 100644
--- a/query_execution/README.md
+++ b/query_execution/README.md
@@ -3,19 +3,19 @@
 ## Types of threads
 There are two kinds of threads in Quickstep - Foreman and Worker. The foreman
 thread controls the query execution progress, finds schedulable work (called as
-WorkOrder) and assigns (or schedules) it for execution to the Worker threads. 
+WorkOrder) and assigns (or schedules) it for execution to the Worker threads.
 The Worker threads receive the WorkOrders and execute them. After execution they
-send a completion message (or response message) back to Foreman. 
+send a completion message (or response message) back to Foreman.
 
 ## High level functionality of Foreman
-Foreman requests all the RelationalOperators in the physical query plan 
+Foreman requests all the RelationalOperators in the physical query plan
 represented as a DAG to give any schedulable work (in the form of WorkOrders).
 While doing so, Foreman has to respect dependencies between operators. There are
-two kinds of dependencies between operators - pipeline breaking (or blocking) 
+two kinds of dependencies between operators - pipeline breaking (or blocking)
 and pipeline non-breaking (or non-blocking). In the first case, the output of
 the producer operator can't be pipelined to the consumer operator. In the second
 case, the Foreman will facilitate the pipelining of the intermediate output
-produced by the producer operator to the consumer operator. 
+produced by the producer operator to the consumer operator.
 
 ## Messages in execution engine
 
@@ -26,110 +26,110 @@ of the message.
 Foreman -> Worker : WorkerMessage which consists of the following things
 - A pointer to the WorkOrder to be executed. The WorkOrder could be a normal
 WorkOrder or a rebuild WorkOrder. A normal WorkOrder involves the invocation of
-WorkOrder::execute() method which is overriden by all of the RelationalOperator 
-classes. A rebuild WorkOrder has one StorageBlock as input and calls a 
+WorkOrder::execute() method which is overridden by all of the RelationalOperator
+classes. A rebuild WorkOrder has one StorageBlock as input and calls a
 rebuild() method on the block. More details about rebuild() can be found in the
-storage module. 
+storage module.
 - The index of the relational operator in the query plan DAG that produced the
-WorkOrder. 
-
-Main thread -> Worker : WorkerMessage of type PoisonMessage. This message is
-used to terminate the Worker thread, typically when shutting down the Quickstep
-process. 
+WorkOrder.
 
 ### ForemanMessage
-Multiple senders are possible for this message. There are multiple types of 
+Multiple senders are possible for this message. There are multiple types of
 ForemanMessages, each of which indicates the purpose of the message.
 
-Worker -> Foreman : ForemanMessage of types WorkOrderCompletion and 
-RebuildCompletion are sent after a Worker finishes executing a respective type 
-of WorkOrder. This message helps the Foreman track the progress of individual 
-operators as well as the whole query. 
+Worker -> Foreman : ForemanMessage of types WorkOrderCompletion and
+RebuildCompletion are sent after a Worker finishes executing a respective type
+of WorkOrder. This message helps the Foreman track the progress of individual
+operators as well as the whole query.
 
 Some relational operators and InsertDestination -> Foreman : ForemanMessage of
-types DataPipeline and WorkOrdersAvailable. InsertDestination first determines 
+types DataPipeline and WorkOrdersAvailable. InsertDestination first determines
 when an output block of a relational operator gets full. Once a block is full,
-it streams the unique block ID of the filled block along with the index of the 
-relational operator that produced the block to Foreman with the message type 
+it streams the unique block ID of the filled block along with the index of the
+relational operator that produced the block to Foreman with the message type
 DataPipeline. Some operators which modify the block in place also send similar
-messages to Foreman. 
+messages to Foreman.
 
 ### FeedbackMessage
 This message is sent from Workers to the Foreman during a WorkOrder execution.
 
 In certain operators, e.g. TextScan (used for bulk loading data from text files)
-and Sort, there is a communication between the relational operator and its 
-WorkOrders. In such cases, when a WorkOrder is under execution on a Worker 
+and Sort, there is a communication between the relational operator and its
+WorkOrders. In such cases, when a WorkOrder is under execution on a Worker
 thread, a FeedbackMessage is sent from the WorkOrder via the Worker to Foreman.
 Foreman relays this message to the relational operator that produced the sender
-WorkOrder. The relational operator uses this message to update its internal 
-state to potentially generate newer WorkOrders. 
+WorkOrder. The relational operator uses this message to update its internal
+state to potentially generate newer WorkOrders.
+
+### PoisonMessage
+This message is used to terminate a thread (i.e., the Foreman or a Worker), typically
+when shutting down the Quickstep process.
 
 ## How does the Foreman react after receiving various messages?
 ### WorkOrder completion message
 * Update the book-keeping of pending WorkOrders per Worker and per operator.
 * Fetch new WorkOrders if available for the operator of whose WorkOrder was
-just executed.  
+just executed.
 * Update the state of an operator - the possible options are:
   - Normal WorkOrders are still under execution
   - All normal WorkOrders have finished execution and rebuild WorkOrders are yet
-  to be generated. 
+  to be generated.
   - All normal WorkOrders have finished execution, rebuild WorkOrders have been
-  generated and issued to Workers. 
+  generated and issued to Workers.
   - All normal and rebuild WorkOrders have been executed AND all the dependency
-  operators for the given operator have finished execution, therefore the given 
-  operator has finished its execution. 
-* Fetch the WorkOrders from the dependents of the given operator. 
+  operators for the given operator have finished execution, therefore the given
+  operator has finished its execution.
+* Fetch the WorkOrders from the dependents of the given operator.
 
 ### Rebuild WorkOrder completion message
 * Update the book-keeping of pending WorkOrders per Worker and per operator.
 * If all the rebuild WorkOrders have finished their execution, try to fetch the
 WorkOrders of the dependent operators of the operator whose rebuild WorkOrder
-was just executed. 
+was just executed.
 
 ### Data pipeline message
-* Find the consumer operators (i.e. operators which have a non 
-pipeline-breaking link) of the producer operator. 
-* Stream the block ID to the eligible consumer operators. 
-* Fetch new WorkOrders from these consumer operators which may have become 
-available because of the streaming of data. 
+* Find the consumer operators (i.e. operators which have a non
+pipeline-breaking link) of the producer operator.
+* Stream the block ID to the eligible consumer operators.
+* Fetch new WorkOrders from these consumer operators which may have become
+available because of the streaming of data.
 
 ### WorkOrder available message
 * Fetch new WorkOrders that may have become available.
 
 ### Feedback message
-* Relay the feedback message to a specified relational operator. The recipient 
-operator is specified in the header of the message. 
+* Relay the feedback message to a specified relational operator. The recipient
+operator is specified in the header of the message.
 
 ## Example
-We look at a sample query to better describe the flow of messages - 
+We look at a sample query to better describe the flow of messages:
 
 SELECT R.a, S.b from R, S where R.a = S.a and R.c < 20;
 
-This is an equi-join query which can be implemented using a hash join. We assume 
-that S is a larger relation and the build relation is the output of the 
+This is an equi-join query which can be implemented using a hash join. We assume
+that S is a larger relation and the build relation is the output of the
 selection on R.
 
 The query execution plan involves the following operators:
+* SelectOperator to filter R based on predicate R.c < 20 (we call the output
+R')
+* SelectOperator to filter R based on predicate R.c < 20 (We call the output as
+R')
 * BuildHashOperator to construct a hash table on R'
 * HashJoinOperator to probe the hash table, where the probe relation is S
 * DestroyHashTableOperator to destroy the hash table after the join is done
-* Multiple DropTableOperators to destroy the temporaray relations produced as 
-output. 
+* Multiple DropTableOperators to destroy the temporary relations produced as
+output.
 
 R has two blocks with IDs as 1 and 2. S has two blocks with IDs as 3 and 4.
-We assume that the SelectOperator produces one filled block and one partially 
-filled block as output. Note that in the query plan DAG, the link between 
-SelectOperator and BuildHashOperator allows streaming of data. The 
-HashJoinOperator's WorkOrder can't be generated unless all of the 
+We assume that the SelectOperator produces one filled block and one partially
+filled block as output. Note that in the query plan DAG, the link between
+SelectOperator and BuildHashOperator allows streaming of data. The
+HashJoinOperator's WorkOrder can't be generated unless all of the
 BuildHashOperator's WorkOrders have finished their execution. The execution is
-assumed to be performed by a single Worker thread. 
+assumed to be performed by a single Worker thread.
 
-The following table describes the message exchange that happens during the 
-query excution. We primarily focus on three operators - Select, BuildHash and 
-HashJoin (probe). 
+The following table describes the message exchange that happens during the
+query execution. We primarily focus on three operators - Select, BuildHash and
+HashJoin (probe).
 
 | Sender | Receiver | Message  | Message Description |
 |:-----------------:|----------|---------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccea2ff8/query_execution/WorkerMessage.hpp
----------------------------------------------------------------------
diff --git a/query_execution/WorkerMessage.hpp b/query_execution/WorkerMessage.hpp
index 560c1ba..a0434de 100644
--- a/query_execution/WorkerMessage.hpp
+++ b/query_execution/WorkerMessage.hpp
@@ -35,7 +35,6 @@ class WorkerMessage {
   enum class WorkerMessageType {
     kRebuildWorkOrder = 0,
     kWorkOrder,
-    kPoison
   };
 
   /**
@@ -70,15 +69,6 @@ class WorkerMessage {
   }
 
   /**
-   * @brief A static factory method for generating a poison message.
-   *
-   * @return The constructed PoisonMessage.
-   **/
-  static WorkerMessage* PoisonMessage() {
-    return new WorkerMessage(nullptr, 0, WorkerMessageType::kPoison);
-  }
-
-  /**
    * @brief Destructor.
    **/
   ~WorkerMessage() {
@@ -128,8 +118,7 @@ class WorkerMessage {
   /**
    * @brief Constructor.
    *
-   * @param work_unit The work order to be executed by the worker. A NULL
-   *        workorder indicates a poison message.
+   * @param work_unit The work order to be executed by the worker.
    * @param relational_op_index The index of the relational operator in the
    *        query plan DAG that generated the given WorkOrder.
    * @param type Type of the WorkerMessage.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccea2ff8/query_execution/tests/BlockLocator_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/BlockLocator_unittest.cpp b/query_execution/tests/BlockLocator_unittest.cpp
index fe7b86b..fd25e9e 100644
--- a/query_execution/tests/BlockLocator_unittest.cpp
+++ b/query_execution/tests/BlockLocator_unittest.cpp
@@ -90,16 +90,7 @@ class BlockLocatorTest : public ::testing::Test {
   virtual void TearDown() {
     storage_manager_.reset();
 
-    serialization::EmptyMessage proto;
-
-    const int proto_length = proto.ByteSize();
-    char *proto_bytes = static_cast<char*>(malloc(proto_length));
-    CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
-    TaggedMessage message(static_cast<const void*>(proto_bytes),
-                          proto_length,
-                          kPoisonMessage);
-    free(proto_bytes);
+    TaggedMessage message(kPoisonMessage);
 
     LOG(INFO) << "Worker (id '" << worker_client_id_
               << "') sent PoisonMessage (typed '" << kPoisonMessage

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccea2ff8/storage/tests/DataExchange_unittest.cpp
----------------------------------------------------------------------
diff --git a/storage/tests/DataExchange_unittest.cpp b/storage/tests/DataExchange_unittest.cpp
index 38d12f6..4bad17b 100644
--- a/storage/tests/DataExchange_unittest.cpp
+++ b/storage/tests/DataExchange_unittest.cpp
@@ -105,16 +105,7 @@ class DataExchangeTest : public ::testing::Test {
     data_exchanger_expected_.shutdown();
     storage_manager_expected_.reset();
 
-    serialization::EmptyMessage proto;
-
-    const int proto_length = proto.ByteSize();
-    char *proto_bytes = static_cast<char*>(malloc(proto_length));
-    CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
-    TaggedMessage message(static_cast<const void*>(proto_bytes),
-                          proto_length,
-                          kPoisonMessage);
-    free(proto_bytes);
+    TaggedMessage message(kPoisonMessage);
 
     LOG(INFO) << "Worker (id '" << worker_client_id_
               << "') sent PoisonMessage (typed '" << kPoisonMessage


[3/3] incubator-quickstep git commit: Implemented hashjoin optimization class and removed the logic from ExecutionGenerator.

Posted by ji...@apache.org.
Implemented hashjoin optimization class and removed the logic from ExecutionGenerator.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/a61b99e9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/a61b99e9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/a61b99e9

Branch: refs/heads/refactor-hashjoin-probe-build
Commit: a61b99e9e1fcbbe84c60d63b8277cbb67e518030
Parents: ccea2ff
Author: Hakan Memisoglu <ha...@gmail.com>
Authored: Mon Aug 1 16:39:07 2016 -0500
Committer: Jianqiao Zhu <ji...@cs.wisc.edu>
Committed: Wed Aug 3 03:10:58 2016 -0500

----------------------------------------------------------------------
 query_optimizer/CMakeLists.txt                  |  1 +
 query_optimizer/ExecutionGenerator.cpp          | 19 +-----
 query_optimizer/PhysicalGenerator.cpp           |  2 +
 query_optimizer/physical/HashJoin.hpp           |  1 +
 query_optimizer/rules/BottomUpRule.hpp          | 10 ++++
 query_optimizer/rules/CMakeLists.txt            | 12 ++++
 query_optimizer/rules/SwapProbeBuild.cpp        | 62 ++++++++++++++++++++
 query_optimizer/rules/SwapProbeBuild.hpp        | 48 +++++++++++++++
 .../tests/physical_generator/Select.test        | 36 ++++++------
 9 files changed, 156 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a61b99e9/query_optimizer/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt
index a56b714..c55881f 100644
--- a/query_optimizer/CMakeLists.txt
+++ b/query_optimizer/CMakeLists.txt
@@ -199,6 +199,7 @@ target_link_libraries(quickstep_queryoptimizer_PhysicalGenerator
                       quickstep_queryoptimizer_physical_Physical
                       quickstep_queryoptimizer_rules_PruneColumns
                       quickstep_queryoptimizer_rules_StarSchemaHashJoinOrderOptimization
+                      quickstep_queryoptimizer_rules_SwapProbeBuild
                       quickstep_queryoptimizer_strategy_Aggregate
                       quickstep_queryoptimizer_strategy_Join
                       quickstep_queryoptimizer_strategy_OneToOne

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a61b99e9/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 88103df..fb24489 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -604,6 +604,8 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
   const CatalogRelation *referenced_stored_probe_relation = nullptr;
   const CatalogRelation *referenced_stored_build_relation = nullptr;
 
+  std::size_t build_cardinality = cost_model_->estimateCardinality(build_physical);
+
   bool any_probe_attributes_nullable = false;
   bool any_build_attributes_nullable = false;
 
@@ -671,23 +673,6 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
     key_types.push_back(&left_attribute_type);
   }
 
-  std::size_t probe_cardinality = cost_model_->estimateCardinality(probe_physical);
-  std::size_t build_cardinality = cost_model_->estimateCardinality(build_physical);
-  // For inner join, we may swap the probe table and the build table.
-  if (physical_plan->join_type() == P::HashJoin::JoinType::kInnerJoin)  {
-    // Choose the smaller table as the inner build table,
-    // and the other one as the outer probe table.
-    if (probe_cardinality < build_cardinality) {
-      // Switch the probe and build physical nodes.
-      std::swap(probe_physical, build_physical);
-      std::swap(probe_cardinality, build_cardinality);
-      std::swap(probe_attribute_ids, build_attribute_ids);
-      std::swap(any_probe_attributes_nullable, any_build_attributes_nullable);
-      std::swap(probe_original_attribute_ids, build_original_attribute_ids);
-      std::swap(referenced_stored_probe_relation, referenced_stored_build_relation);
-    }
-  }
-
   // Convert the residual predicate proto.
   QueryContext::predicate_id residual_predicate_index = QueryContext::kInvalidPredicateId;
   if (physical_plan->residual_predicate()) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a61b99e9/query_optimizer/PhysicalGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/PhysicalGenerator.cpp b/query_optimizer/PhysicalGenerator.cpp
index 75a7bc9..897b212 100644
--- a/query_optimizer/PhysicalGenerator.cpp
+++ b/query_optimizer/PhysicalGenerator.cpp
@@ -28,6 +28,7 @@
 #include "query_optimizer/physical/Physical.hpp"
 #include "query_optimizer/rules/PruneColumns.hpp"
 #include "query_optimizer/rules/StarSchemaHashJoinOrderOptimization.hpp"
+#include "query_optimizer/rules/SwapProbeBuild.hpp"
 #include "query_optimizer/strategy/Aggregate.hpp"
 #include "query_optimizer/strategy/Join.hpp"
 #include "query_optimizer/strategy/OneToOne.hpp"
@@ -98,6 +99,7 @@ P::PhysicalPtr PhysicalGenerator::optimizePlan() {
     rules.emplace_back(new StarSchemaHashJoinOrderOptimization());
   }
   rules.emplace_back(new PruneColumns());
+  rules.emplace_back(new SwapProbeBuild());
 
   for (std::unique_ptr<Rule<P::Physical>> &rule : rules) {
     physical_plan_ = rule->apply(physical_plan_);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a61b99e9/query_optimizer/physical/HashJoin.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/HashJoin.hpp b/query_optimizer/physical/HashJoin.hpp
index b904b5f..988b139 100644
--- a/query_optimizer/physical/HashJoin.hpp
+++ b/query_optimizer/physical/HashJoin.hpp
@@ -20,6 +20,7 @@
 #ifndef QUICKSTEP_QUERY_OPTIMIZER_PHYSICAL_HASHJOIN_HPP_
 #define QUICKSTEP_QUERY_OPTIMIZER_PHYSICAL_HASHJOIN_HPP_
 
+#include <cstddef>
 #include <memory>
 #include <string>
 #include <type_traits>

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a61b99e9/query_optimizer/rules/BottomUpRule.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/BottomUpRule.hpp b/query_optimizer/rules/BottomUpRule.hpp
index 0acc199..f98dadf 100644
--- a/query_optimizer/rules/BottomUpRule.hpp
+++ b/query_optimizer/rules/BottomUpRule.hpp
@@ -54,6 +54,7 @@ class BottomUpRule : public Rule<TreeType> {
   TreeNodePtr apply(const TreeNodePtr &tree) override {
     DCHECK(tree != nullptr);
 
+    init(tree);
     std::vector<std::shared_ptr<const TreeType>> new_children;
     bool has_changed_children = false;
     for (const std::shared_ptr<const TreeType> &child : tree->children()) {
@@ -80,6 +81,15 @@ class BottomUpRule : public Rule<TreeType> {
    */
   virtual TreeNodePtr applyToNode(const TreeNodePtr &node) = 0;
 
+  /**
+   * @brief Hook called at the start of apply(), before children are visited.
+   *        Default is a no-op. NOTE(review): if apply() recurses into children
+   *        it runs once per subtree, so overrides should tolerate repeat calls.
+   * @param input The tree passed to apply().
+   */
+  virtual void init(const TreeNodePtr &input) {
+  }
+
  private:
   DISALLOW_COPY_AND_ASSIGN(BottomUpRule);
 };

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a61b99e9/query_optimizer/rules/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/CMakeLists.txt b/query_optimizer/rules/CMakeLists.txt
index 1990174..04a9814 100644
--- a/query_optimizer/rules/CMakeLists.txt
+++ b/query_optimizer/rules/CMakeLists.txt
@@ -29,6 +29,7 @@ add_library(quickstep_queryoptimizer_rules_RuleHelper RuleHelper.cpp RuleHelper.
 add_library(quickstep_queryoptimizer_rules_StarSchemaHashJoinOrderOptimization
             StarSchemaHashJoinOrderOptimization.cpp
             StarSchemaHashJoinOrderOptimization.hpp)
+add_library(quickstep_queryoptimizer_rules_SwapProbeBuild SwapProbeBuild.cpp SwapProbeBuild.hpp)
 add_library(quickstep_queryoptimizer_rules_TopDownRule ../../empty_src.cpp TopDownRule.hpp)
 add_library(quickstep_queryoptimizer_rules_UpdateExpression UpdateExpression.cpp UpdateExpression.hpp)
 add_library(quickstep_queryoptimizer_rules_UnnestSubqueries UnnestSubqueries.cpp UnnestSubqueries.hpp)
@@ -127,6 +128,16 @@ target_link_libraries(quickstep_queryoptimizer_rules_StarSchemaHashJoinOrderOpti
                       quickstep_queryoptimizer_physical_TopLevelPlan
                       quickstep_queryoptimizer_rules_Rule
                       quickstep_utility_Macros)
+target_link_libraries(quickstep_queryoptimizer_rules_SwapProbeBuild
+                      quickstep_queryoptimizer_costmodel_SimpleCostModel
+                      quickstep_queryoptimizer_expressions_AttributeReference
+                      quickstep_queryoptimizer_physical_HashJoin
+                      quickstep_queryoptimizer_physical_PatternMatcher
+                      quickstep_queryoptimizer_physical_Physical
+                      quickstep_queryoptimizer_physical_TopLevelPlan
+                      quickstep_queryoptimizer_rules_BottomUpRule
+                      quickstep_queryoptimizer_rules_Rule
+                      quickstep_utility_Macros)
 target_link_libraries(quickstep_queryoptimizer_rules_TopDownRule
                       quickstep_queryoptimizer_rules_Rule
                       quickstep_utility_Macros)
@@ -185,6 +196,7 @@ target_link_libraries(quickstep_queryoptimizer_rules
                       quickstep_queryoptimizer_rules_Rule
                       quickstep_queryoptimizer_rules_RuleHelper
                       quickstep_queryoptimizer_rules_StarSchemaHashJoinOrderOptimization
+                      quickstep_queryoptimizer_rules_SwapProbeBuild
                       quickstep_queryoptimizer_rules_TopDownRule
                       quickstep_queryoptimizer_rules_UpdateExpression
                       quickstep_queryoptimizer_rules_UnnestSubqueries)

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a61b99e9/query_optimizer/rules/SwapProbeBuild.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/SwapProbeBuild.cpp b/query_optimizer/rules/SwapProbeBuild.cpp
new file mode 100644
index 0000000..4b4b38f
--- /dev/null
+++ b/query_optimizer/rules/SwapProbeBuild.cpp
@@ -0,0 +1,62 @@
+#include "query_optimizer/rules/SwapProbeBuild.hpp"
+
+#include <cstddef>
+#include <memory>
+#include <vector>
+
+#include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/physical/HashJoin.hpp"
+#include "query_optimizer/physical/PatternMatcher.hpp"
+#include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/physical/TopLevelPlan.hpp"
+#include "query_optimizer/rules/Rule.hpp"
+
+namespace quickstep {
+namespace optimizer {
+
+// For an inner HashJoin whose right (build) child has a larger estimated
+// cardinality than its left (probe) child, swap the two; else return input.
+P::PhysicalPtr SwapProbeBuild::applyToNode(const P::PhysicalPtr &input) {
+  P::HashJoinPtr hash_join;
+
+  if (P::SomeHashJoin::MatchesWithConditionalCast(input, &hash_join)
+      && hash_join->join_type() == P::HashJoin::JoinType::kInnerJoin) {
+    const P::PhysicalPtr &left = hash_join->left();
+    const P::PhysicalPtr &right = hash_join->right();
+
+    const std::size_t left_cardinality = cost_model_->estimateCardinality(left);
+    const std::size_t right_cardinality = cost_model_->estimateCardinality(right);
+
+    if (right_cardinality > left_cardinality) {
+      // Swap children and join-key lists together so the keys stay paired.
+      P::PhysicalPtr output = P::HashJoin::Create(right,
+                                                  left,
+                                                  hash_join->right_join_attributes(),
+                                                  hash_join->left_join_attributes(),
+                                                  hash_join->residual_predicate(),
+                                                  hash_join->project_expressions(),
+                                                  hash_join->join_type());
+      LOG_APPLYING_RULE(input, output);
+      return output;
+    }
+  }
+
+  LOG_IGNORING_RULE(input);
+  return input;
+}
+
+// Builds the cost model on the first call only; later calls are no-ops.
+void SwapProbeBuild::init(const P::PhysicalPtr &input) {
+  if (cost_model_ == nullptr) {
+    P::TopLevelPlanPtr top_level;
+    if (P::SomeTopLevelPlan::MatchesWithConditionalCast(input, &top_level)) {
+      cost_model_.reset(new C::SimpleCostModel(top_level->shared_subplans()));
+    } else {
+      std::vector<P::PhysicalPtr> plans = {input};
+      cost_model_.reset(new C::SimpleCostModel(plans));
+    }
+  }
+}
+
+}  // namespace optimizer
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a61b99e9/query_optimizer/rules/SwapProbeBuild.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/SwapProbeBuild.hpp b/query_optimizer/rules/SwapProbeBuild.hpp
new file mode 100644
index 0000000..da8fdf9
--- /dev/null
+++ b/query_optimizer/rules/SwapProbeBuild.hpp
@@ -0,0 +1,48 @@
+#ifndef QUICKSTEP_QUERY_OPTIMIZER_RULES_SWAP_PROBE_BUILD_HPP_
+#define QUICKSTEP_QUERY_OPTIMIZER_RULES_SWAP_PROBE_BUILD_HPP_
+
+#include <memory>
+#include <string>
+
+#include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/rules/Rule.hpp"
+#include "query_optimizer/rules/BottomUpRule.hpp"
+#include "query_optimizer/cost_model/SimpleCostModel.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+namespace optimizer {
+
+/** \addtogroup OptimizerRules
+ *  @{
+ */
+
+namespace P = ::quickstep::optimizer::physical;
+namespace E = ::quickstep::optimizer::expressions;
+namespace C = ::quickstep::optimizer::cost;
+
+/**
+ * @brief Physical-plan rule that may exchange the two inputs of a HashJoin
+ *        according to their estimated cardinalities.
+ *
+ * When the right input is estimated to be larger than the left one, the
+ * inputs (and their join attributes) are swapped, so that the input with the
+ * larger estimated cardinality ends up on the left.
+ */
+class SwapProbeBuild : public BottomUpRule<P::Physical> {
+ public:
+  /**
+   * @brief Constructor.
+   */
+  SwapProbeBuild() = default;
+
+  std::string getName() const override {
+    return "SwapProbeBuild";
+  }
+
+ protected:
+  /**
+   * @brief Rewrites a single node; returns the node unchanged when the rule
+   *        does not apply to it.
+   */
+  P::PhysicalPtr applyToNode(const P::PhysicalPtr &input) override;
+
+  /**
+   * @brief Sets up the cost model that applyToNode() consults.
+   */
+  void init(const P::PhysicalPtr &input) override;
+
+ private:
+  // Cardinality estimator consulted by applyToNode(); created in init().
+  std::unique_ptr<C::SimpleCostModel> cost_model_;
+
+  DISALLOW_COPY_AND_ASSIGN(SwapProbeBuild);
+};
+
+}  // namespace optimizer
+}  // namespace quickstep
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a61b99e9/query_optimizer/tests/physical_generator/Select.test
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/physical_generator/Select.test b/query_optimizer/tests/physical_generator/Select.test
index d99916c..76c5683 100644
--- a/query_optimizer/tests/physical_generator/Select.test
+++ b/query_optimizer/tests/physical_generator/Select.test
@@ -2232,10 +2232,7 @@ TopLevelPlan
 [Physical Plan]
 TopLevelPlan
 +-plan=HashJoin
-| +-left=TableReference[relation=b]
-| | +-AttributeReference[id=0,name=w,relation=b,type=Int]
-| | +-AttributeReference[id=1,name=x,relation=b,type=Int]
-| +-right=Aggregate
+| +-left=Aggregate
 | | +-input=TableReference[relation=c]
 | | | +-AttributeReference[id=2,name=x,relation=c,type=Int]
 | | | +-AttributeReference[id=3,name=y,relation=c,type=Int]
@@ -2245,6 +2242,9 @@ TopLevelPlan
 | |   +-Alias[id=4,name=,alias=$aggregate0,relation=$aggregate,type=Long NULL]
 | |     +-AggregateFunction[function=SUM]
 | |       +-AttributeReference[id=3,name=y,relation=c,type=Int]
+| +-right=TableReference[relation=b]
+| | +-AttributeReference[id=0,name=w,relation=b,type=Int]
+| | +-AttributeReference[id=1,name=x,relation=b,type=Int]
 | +-project_expressions=
 | | +-Alias[id=5,name=,alias=(x+SubqueryExpression),relation=,type=Long NULL]
 | |   +-Add
@@ -2252,9 +2252,9 @@ TopLevelPlan
 | |     +-AttributeReference[id=4,name=,alias=$aggregate0,relation=$aggregate,
 | |       type=Long NULL]
 | +-left_join_attributes=
-| | +-AttributeReference[id=0,name=w,relation=b,type=Int]
+| | +-AttributeReference[id=2,name=x,relation=c,type=Int]
 | +-right_join_attributes=
-|   +-AttributeReference[id=2,name=x,relation=c,type=Int]
+|   +-AttributeReference[id=0,name=w,relation=b,type=Int]
 +-output_attributes=
   +-AttributeReference[id=5,name=,alias=(x+SubqueryExpression),relation=,
     type=Long NULL]
@@ -2369,10 +2369,7 @@ TopLevelPlan
 [Physical Plan]
 TopLevelPlan
 +-plan=HashJoin
-| +-left=TableReference[relation=b]
-| | +-AttributeReference[id=0,name=w,relation=b,type=Int]
-| | +-AttributeReference[id=1,name=x,relation=b,type=Int]
-| +-right=Aggregate
+| +-left=Aggregate
 | | +-input=TableReference[relation=c]
 | | | +-AttributeReference[id=2,name=x,relation=c,type=Int]
 | | | +-AttributeReference[id=3,name=y,relation=c,type=Int]
@@ -2382,6 +2379,9 @@ TopLevelPlan
 | |   +-Alias[id=4,name=,alias=$aggregate0,relation=$aggregate,type=Long NULL]
 | |     +-AggregateFunction[function=SUM]
 | |       +-AttributeReference[id=3,name=y,relation=c,type=Int]
+| +-right=TableReference[relation=b]
+| | +-AttributeReference[id=0,name=w,relation=b,type=Int]
+| | +-AttributeReference[id=1,name=x,relation=b,type=Int]
 | +-residual_predicate=Greater
 | | +-AttributeReference[id=1,name=x,relation=b,type=Int]
 | | +-AttributeReference[id=4,name=,alias=$aggregate0,relation=$aggregate,
@@ -2390,9 +2390,9 @@ TopLevelPlan
 | | +-AttributeReference[id=0,name=w,relation=b,type=Int]
 | | +-AttributeReference[id=1,name=x,relation=b,type=Int]
 | +-left_join_attributes=
-| | +-AttributeReference[id=0,name=w,relation=b,type=Int]
+| | +-AttributeReference[id=2,name=x,relation=c,type=Int]
 | +-right_join_attributes=
-|   +-AttributeReference[id=2,name=x,relation=c,type=Int]
+|   +-AttributeReference[id=0,name=w,relation=b,type=Int]
 +-output_attributes=
   +-AttributeReference[id=0,name=w,relation=b,type=Int]
   +-AttributeReference[id=1,name=x,relation=b,type=Int]
@@ -2476,10 +2476,7 @@ TopLevelPlan
 [Physical Plan]
 TopLevelPlan
 +-plan=HashJoin
-| +-left=TableReference[relation=b]
-| | +-AttributeReference[id=0,name=w,relation=b,type=Int]
-| | +-AttributeReference[id=1,name=x,relation=b,type=Int]
-| +-right=NestedLoopsJoin
+| +-left=NestedLoopsJoin
 | | +-left=Aggregate
 | | | +-input=TableReference[relation=c]
 | | | | +-AttributeReference[id=2,name=x,relation=c,type=Int]
@@ -2518,6 +2515,9 @@ TopLevelPlan
 | |       | type=Long NULL]
 | |       +-AttributeReference[id=9,name=,alias=$aggregate0,relation=$aggregate,
 | |         type=Long NULL]
+| +-right=TableReference[relation=b]
+| | +-AttributeReference[id=0,name=w,relation=b,type=Int]
+| | +-AttributeReference[id=1,name=x,relation=b,type=Int]
 | +-project_expressions=
 | | +-Alias[id=11,name=,alias=(x+SubqueryExpression),relation=,type=Long NULL]
 | |   +-Add
@@ -2525,9 +2525,9 @@ TopLevelPlan
 | |     +-AttributeReference[id=10,name=,alias=(SUM(y)+SubqueryExpression),
 | |       relation=,type=Long NULL]
 | +-left_join_attributes=
-| | +-AttributeReference[id=0,name=w,relation=b,type=Int]
+| | +-AttributeReference[id=2,name=x,relation=c,type=Int]
 | +-right_join_attributes=
-|   +-AttributeReference[id=2,name=x,relation=c,type=Int]
+|   +-AttributeReference[id=0,name=w,relation=b,type=Int]
 +-output_attributes=
   +-AttributeReference[id=11,name=,alias=(x+SubqueryExpression),relation=,
     type=Long NULL]


[2/3] incubator-quickstep git commit: TMB: Added Support for an Empty, but Typed TaggedMessage.

Posted by ji...@apache.org.
TMB: Added Support for an Empty, but Typed TaggedMessage.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/52a32a37
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/52a32a37
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/52a32a37

Branch: refs/heads/refactor-hashjoin-probe-build
Commit: 52a32a372612a3e8f5ab268886d4b9cbb546c205
Parents: 260b862
Author: Zuyu Zhang <zu...@twitter.com>
Authored: Fri Jul 29 19:57:49 2016 -0700
Committer: Zuyu Zhang <zu...@twitter.com>
Committed: Mon Aug 1 10:11:19 2016 -0700

----------------------------------------------------------------------
 third_party/tmb/include/tmb/tagged_message.h | 9 +++++++++
 1 file changed, 9 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/52a32a37/third_party/tmb/include/tmb/tagged_message.h
----------------------------------------------------------------------
diff --git a/third_party/tmb/include/tmb/tagged_message.h b/third_party/tmb/include/tmb/tagged_message.h
index 49dcee7..f3a77ee 100644
--- a/third_party/tmb/include/tmb/tagged_message.h
+++ b/third_party/tmb/include/tmb/tagged_message.h
@@ -63,6 +63,15 @@ class TaggedMessage {
   }
 
   /**
+   * @brief Constructor which creates an empty, but typed message.
+   *
+   * The payload is stored inline with a size of 0, so the message carries
+   * no data beyond its type tag.
+   *
+   * @param message_type The type tag to attach to the message.
+   **/
+  explicit TaggedMessage(const message_type_id message_type)
+      : payload_inline_(true),
+        message_type_(message_type) {
+    payload_.in_line.size = 0;
+  }
+
+  /**
    * @brief Constructor.
    *
    * @param msg A pointer to the message contents in memory, which will be