You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2016/08/19 02:18:14 UTC

[1/6] incubator-quickstep git commit: Added ForemanDistributed.

Repository: incubator-quickstep
Updated Branches:
  refs/heads/LIP-for-tpch-merged [created] f4af59652


Added ForemanDistributed.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/1111ec58
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/1111ec58
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/1111ec58

Branch: refs/heads/LIP-for-tpch-merged
Commit: 1111ec585be350236ac7631cd1883b1e74c28af6
Parents: 47d1248
Author: Zuyu Zhang <zu...@twitter.com>
Authored: Sat Aug 13 23:37:59 2016 -0700
Committer: Jianqiao Zhu <ji...@cs.wisc.edu>
Committed: Thu Aug 18 16:10:16 2016 -0500

----------------------------------------------------------------------
 query_execution/CMakeLists.txt         |  24 ++
 query_execution/ForemanDistributed.cpp | 335 ++++++++++++++++++++++++++++
 query_execution/ForemanDistributed.hpp | 130 +++++++++++
 3 files changed, 489 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1111ec58/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index c7b9d61..f0c988e 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -33,6 +33,9 @@ if (ENABLE_DISTRIBUTED)
   add_library(quickstep_queryexecution_BlockLocator BlockLocator.cpp BlockLocator.hpp)
 endif(ENABLE_DISTRIBUTED)
 add_library(quickstep_queryexecution_ForemanBase ../empty_src.cpp ForemanBase.hpp)
+if (ENABLE_DISTRIBUTED)
+  add_library(quickstep_queryexecution_ForemanDistributed ForemanDistributed.cpp ForemanDistributed.hpp)
+endif(ENABLE_DISTRIBUTED)
 add_library(quickstep_queryexecution_ForemanSingleNode ForemanSingleNode.cpp ForemanSingleNode.hpp)
 add_library(quickstep_queryexecution_PolicyEnforcerBase PolicyEnforcerBase.cpp PolicyEnforcerBase.hpp)
 if (ENABLE_DISTRIBUTED)
@@ -86,6 +89,26 @@ target_link_libraries(quickstep_queryexecution_ForemanBase
                       quickstep_threading_Thread
                       quickstep_utility_Macros
                       tmb)
+if (ENABLE_DISTRIBUTED)
+  target_link_libraries(quickstep_queryexecution_ForemanDistributed
+                        glog
+                        quickstep_catalog_CatalogDatabase
+                        quickstep_catalog_CatalogRelation
+                        quickstep_catalog_CatalogTypedefs
+                        quickstep_catalog_Catalog_proto
+                        quickstep_queryexecution_AdmitRequestMessage
+                        quickstep_queryexecution_ForemanBase
+                        quickstep_queryexecution_PolicyEnforcerDistributed
+                        quickstep_queryexecution_QueryExecutionMessages_proto
+                        quickstep_queryexecution_QueryExecutionTypedefs
+                        quickstep_queryexecution_QueryExecutionUtil
+                        quickstep_queryexecution_ShiftbossDirectory
+                        quickstep_threading_ThreadUtil
+                        quickstep_utility_EqualsAnyConstant
+                        quickstep_utility_Macros
+                        tmb
+                        ${GFLAGS_LIB_NAME})
+endif(ENABLE_DISTRIBUTED)
 target_link_libraries(quickstep_queryexecution_ForemanSingleNode
                       glog
                       quickstep_queryexecution_AdmitRequestMessage
@@ -317,6 +340,7 @@ target_link_libraries(quickstep_queryexecution
 if (ENABLE_DISTRIBUTED)
   target_link_libraries(quickstep_queryexecution
                         quickstep_queryexecution_BlockLocator
+                        quickstep_queryexecution_ForemanDistributed
                         quickstep_queryexecution_PolicyEnforcerDistributed
                         quickstep_queryexecution_QueryManagerDistributed
                         quickstep_queryexecution_Shiftboss

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1111ec58/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
new file mode 100644
index 0000000..29f5b9b
--- /dev/null
+++ b/query_execution/ForemanDistributed.cpp
@@ -0,0 +1,335 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include "query_execution/ForemanDistributed.hpp"
+
+#include <cstddef>
+#include <cstdio>
+#include <cstdlib>
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include "catalog/Catalog.pb.h"
+#include "catalog/CatalogDatabase.hpp"
+#include "catalog/CatalogRelation.hpp"
+#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/AdmitRequestMessage.hpp"
+#include "query_execution/PolicyEnforcerDistributed.hpp"
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
+#include "query_execution/ShiftbossDirectory.hpp"
+#include "threading/ThreadUtil.hpp"
+#include "utility/EqualsAnyConstant.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/address.h"
+#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
+#include "tmb/message_style.h"
+#include "tmb/tagged_message.h"
+
+using std::move;
+using std::size_t;
+using std::unique_ptr;
+using std::vector;
+
+using tmb::AnnotatedMessage;
+using tmb::MessageBus;
+using tmb::TaggedMessage;
+using tmb::client_id;
+
+namespace quickstep {
+
+namespace S = serialization;
+
+class QueryHandle;
+
+ForemanDistributed::ForemanDistributed(
+    MessageBus *bus,
+    CatalogDatabaseLite *catalog_database,
+    const int cpu_id,
+    const bool profile_individual_workorders)
+    : ForemanBase(bus, cpu_id),
+      catalog_database_(DCHECK_NOTNULL(catalog_database)) {
+  const std::vector<QueryExecutionMessageType> sender_message_types{
+      kShiftbossRegistrationResponseMessage,
+      kQueryInitiateMessage,
+      kWorkOrderMessage,
+      kInitiateRebuildMessage,
+      kQueryTeardownMessage,
+      kSaveQueryResultMessage,
+      kQueryExecutionSuccessMessage,
+      kPoisonMessage};
+
+  for (const auto message_type : sender_message_types) {
+    bus_->RegisterClientAsSender(foreman_client_id_, message_type);
+  }
+
+  const std::vector<QueryExecutionMessageType> receiver_message_types{
+      kShiftbossRegistrationMessage,
+      kAdmitRequestMessage,
+      kQueryInitiateResponseMessage,
+      kCatalogRelationNewBlockMessage,
+      kDataPipelineMessage,
+      kInitiateRebuildResponseMessage,
+      kWorkOrderCompleteMessage,
+      kRebuildWorkOrderCompleteMessage,
+      kWorkOrderFeedbackMessage,
+      kSaveQueryResultResponseMessage,
+      kPoisonMessage};
+
+  for (const auto message_type : receiver_message_types) {
+    bus_->RegisterClientAsReceiver(foreman_client_id_, message_type);
+  }
+
+  policy_enforcer_.reset(new PolicyEnforcerDistributed(
+      foreman_client_id_,
+      catalog_database_,
+      &shiftboss_directory_,
+      bus_,
+      profile_individual_workorders));
+}
+
+void ForemanDistributed::run() {
+  if (cpu_id_ >= 0) {
+    // We can pin the foreman thread to a CPU if specified.
+    ThreadUtil::BindToCPU(cpu_id_);
+  }
+
+  // Block until at least one Shiftboss has registered.
+  if (shiftboss_directory_.empty()) {
+    const AnnotatedMessage annotated_message = bus_->Receive(foreman_client_id_, 0, true);
+    const TaggedMessage &tagged_message = annotated_message.tagged_message;
+    DCHECK_EQ(kShiftbossRegistrationMessage, tagged_message.message_type());
+    DLOG(INFO) << "ForemanDistributed received typed '" << tagged_message.message_type()
+               << "' message from client " << annotated_message.sender;
+
+    S::ShiftbossRegistrationMessage proto;
+    CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+    processShiftbossRegistrationMessage(annotated_message.sender, proto.work_order_capacity());
+    DCHECK_EQ(1u, shiftboss_directory_.size());
+  }
+
+  // Event loop
+  for (;;) {
+    // Receive() causes this thread to sleep until next message is received.
+    const AnnotatedMessage annotated_message =
+        bus_->Receive(foreman_client_id_, 0, true);
+    const TaggedMessage &tagged_message = annotated_message.tagged_message;
+    const tmb::message_type_id message_type = tagged_message.message_type();
+    DLOG(INFO) << "ForemanDistributed received typed '" << message_type
+               << "' message from client " << annotated_message.sender;
+    switch (message_type) {
+      case kShiftbossRegistrationMessage: {
+        S::ShiftbossRegistrationMessage proto;
+        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+        processShiftbossRegistrationMessage(annotated_message.sender, proto.work_order_capacity());
+        break;
+      }
+      case kAdmitRequestMessage: {
+        const AdmitRequestMessage *request_message =
+            static_cast<const AdmitRequestMessage*>(tagged_message.message());
+
+        const vector<QueryHandle *> &query_handles = request_message->getQueryHandles();
+        DCHECK(!query_handles.empty());
+
+        bool all_queries_admitted = true;
+        if (query_handles.size() == 1u) {
+          all_queries_admitted =
+              policy_enforcer_->admitQuery(query_handles.front());
+        } else {
+          all_queries_admitted = policy_enforcer_->admitQueries(query_handles);
+        }
+        if (!all_queries_admitted) {
+          LOG(WARNING) << "The scheduler could not admit all the queries";
+          // TODO(harshad) - Inform the main thread about the failure.
+        }
+        break;
+      }
+      case kQueryInitiateResponseMessage: {
+        // TODO(zuyu): check the query id.
+        break;
+      }
+      case kCatalogRelationNewBlockMessage:  // Fall through
+      case kDataPipelineMessage:
+      case kRebuildWorkOrderCompleteMessage:
+      case kWorkOrderCompleteMessage:
+      case kWorkOrderFeedbackMessage: {
+        policy_enforcer_->processMessage(tagged_message);
+        break;
+      }
+      case kInitiateRebuildResponseMessage: {
+        // A unique case in the distributed version.
+        policy_enforcer_->processInitiateRebuildResponseMessage(tagged_message);
+        break;
+      }
+      case kSaveQueryResultResponseMessage: {
+        S::SaveQueryResultResponseMessage proto;
+        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+        processSaveQueryResultResponseMessage(proto.cli_id(), proto.relation_id());
+        break;
+      }
+      case kPoisonMessage: {
+        if (policy_enforcer_->hasQueries()) {
+          LOG(WARNING) << "ForemanDistributed thread exiting while some queries are "
+                          "under execution or waiting to be admitted";
+        }
+
+        // Shutdown all Shiftbosses.
+        tmb::Address shiftboss_addresses;
+        for (std::size_t i = 0; i < shiftboss_directory_.size(); ++i) {
+          shiftboss_addresses.AddRecipient(shiftboss_directory_.getClientId(i));
+        }
+
+        tmb::MessageStyle broadcast_style;
+        broadcast_style.Broadcast(true);
+
+        TaggedMessage poison_message(kPoisonMessage);
+
+        const MessageBus::SendStatus send_status =
+            bus_->Send(foreman_client_id_,
+                       shiftboss_addresses,
+                       broadcast_style,
+                       move(poison_message));
+        DCHECK(send_status == MessageBus::SendStatus::kOK);
+        return;
+      }
+      default:
+        LOG(FATAL) << "Unknown message type to ForemanDistributed";
+    }
+
+    if (canCollectNewMessages(message_type)) {
+      vector<unique_ptr<S::WorkOrderMessage>> new_messages;
+      policy_enforcer_->getWorkOrderProtoMessages(&new_messages);
+      dispatchWorkOrderMessages(new_messages);
+    }
+  }
+}
+
+bool ForemanDistributed::canCollectNewMessages(const tmb::message_type_id message_type) {
+  return !QUICKSTEP_EQUALS_ANY_CONSTANT(message_type,
+                                        kCatalogRelationNewBlockMessage,
+                                        kWorkOrderFeedbackMessage) &&
+         // TODO(zuyu): Multiple Shiftbosses support.
+         !shiftboss_directory_.hasReachedCapacity(0);
+}
+
+void ForemanDistributed::dispatchWorkOrderMessages(const vector<unique_ptr<S::WorkOrderMessage>> &messages) {
+  for (const auto &message : messages) {
+    DCHECK(message != nullptr);
+    // TODO(zuyu): Multiple Shiftbosses support.
+    sendWorkOrderMessage(0, *message);
+    shiftboss_directory_.incrementNumQueuedWorkOrders(0);
+  }
+}
+
+void ForemanDistributed::sendWorkOrderMessage(const size_t shiftboss_index,
+                                              const S::WorkOrderMessage &proto) {
+  const size_t proto_length = proto.ByteSize();
+  char *proto_bytes = static_cast<char*>(malloc(proto_length));
+  CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+  TaggedMessage message(static_cast<const void*>(proto_bytes),
+                        proto_length,
+                        kWorkOrderMessage);
+  free(proto_bytes);
+
+  const client_id shiftboss_client_id = shiftboss_directory_.getClientId(shiftboss_index);
+  DLOG(INFO) << "ForemanDistributed sent WorkOrderMessage (typed '" << kWorkOrderMessage
+             << "') to Shiftboss with TMB client ID " << shiftboss_client_id;
+  const MessageBus::SendStatus send_status =
+      QueryExecutionUtil::SendTMBMessage(bus_,
+                                         foreman_client_id_,
+                                         shiftboss_client_id,
+                                         move(message));
+  CHECK(send_status == MessageBus::SendStatus::kOK);
+}
+
+void ForemanDistributed::printWorkOrderProfilingResults(const std::size_t query_id,
+                                                        std::FILE *out) const {
+  const std::vector<WorkOrderTimeEntry> &recorded_times =
+      policy_enforcer_->getProfilingResults(query_id);
+  fputs("Query ID,Worker ID,Operator ID,Time (microseconds)\n", out);  // CSV header row.
+  for (const auto &workorder_entry : recorded_times) {
+    const std::size_t worker_id = workorder_entry.worker_id;
+    fprintf(out,
+            "%zu,%zu,%lu,%lu\n",  // %zu is the conversion for std::size_t.
+            query_id,
+            worker_id,
+            workorder_entry.operator_id,  // Operator ID.
+            workorder_entry.end_time - workorder_entry.start_time);  // Time.
+  }
+}
+
+void ForemanDistributed::processShiftbossRegistrationMessage(const client_id shiftboss_client_id,
+                                                              const std::size_t work_order_capacity) {
+  S::ShiftbossRegistrationResponseMessage proto;
+  proto.set_shiftboss_index(shiftboss_directory_.size());
+
+  const size_t proto_length = proto.ByteSize();
+  char *proto_bytes = static_cast<char*>(malloc(proto_length));
+  CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+  TaggedMessage message(static_cast<const void*>(proto_bytes),
+                        proto_length,
+                        kShiftbossRegistrationResponseMessage);
+  free(proto_bytes);
+
+  shiftboss_directory_.addShiftboss(shiftboss_client_id, work_order_capacity);
+
+  DLOG(INFO) << "ForemanDistributed sent ShiftbossRegistrationResponseMessage (typed '"
+             << kShiftbossRegistrationResponseMessage
+             << "') to Shiftboss with TMB client id " << shiftboss_client_id;
+  const MessageBus::SendStatus send_status =
+      QueryExecutionUtil::SendTMBMessage(bus_,
+                                         foreman_client_id_,
+                                         shiftboss_client_id,
+                                         move(message));
+  CHECK(send_status == MessageBus::SendStatus::kOK);
+}
+
+void ForemanDistributed::processSaveQueryResultResponseMessage(const client_id cli_id,
+                                                               const relation_id result_relation_id) {
+  S::QueryExecutionSuccessMessage proto;
+  proto.mutable_result_relation()->MergeFrom(
+      static_cast<CatalogDatabase*>(catalog_database_)->getRelationById(result_relation_id)->getProto());
+
+  const size_t proto_length = proto.ByteSize();
+  char *proto_bytes = static_cast<char*>(malloc(proto_length));
+  CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+  TaggedMessage message(static_cast<const void*>(proto_bytes),
+                        proto_length,
+                        kQueryExecutionSuccessMessage);
+  free(proto_bytes);
+
+  // Notify the CLI regarding the query result.
+  DLOG(INFO) << "ForemanDistributed sent QueryExecutionSuccessMessage (typed '"
+             << kQueryExecutionSuccessMessage
+             << "') to CLI with TMB client id " << cli_id;
+  const MessageBus::SendStatus send_status =
+      QueryExecutionUtil::SendTMBMessage(bus_,
+                                         foreman_client_id_,
+                                         cli_id,
+                                         move(message));
+  CHECK(send_status == MessageBus::SendStatus::kOK);
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1111ec58/query_execution/ForemanDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.hpp b/query_execution/ForemanDistributed.hpp
new file mode 100644
index 0000000..f9a326a
--- /dev/null
+++ b/query_execution/ForemanDistributed.hpp
@@ -0,0 +1,130 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_FOREMAN_DISTRIBUTED_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_FOREMAN_DISTRIBUTED_HPP_
+
+#include <cstddef>
+#include <cstdio>
+#include <memory>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/ForemanBase.hpp"
+#include "query_execution/PolicyEnforcerDistributed.hpp"
+#include "query_execution/ShiftbossDirectory.hpp"
+#include "utility/Macros.hpp"
+
+#include "tmb/id_typedefs.h"
+
+namespace tmb { class MessageBus; }
+
+namespace quickstep {
+
+class CatalogDatabaseLite;
+
+namespace serialization { class WorkOrderMessage; }
+
+/** \addtogroup QueryExecution
+ *  @{
+ */
+
+/**
+ * @brief The Foreman receives queries from the main thread, messages from the
+ *        policy enforcer and dispatches the work to Shiftbosses. It also
+ *        receives work completion messages from Shiftbosses.
+ **/
+class ForemanDistributed final : public ForemanBase {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param bus A pointer to the TMB.
+   * @param catalog_database The catalog database where this query is executed.
+   * @param cpu_id The ID of the CPU to which the Foreman thread can be pinned.
+   * @param profile_individual_workorders Whether every workorder's execution
+   *        should be profiled or not.
+   *
+   * @note If cpu_id is not specified, Foreman thread can be possibly moved
+   *       around on different CPUs by the OS.
+  **/
+  ForemanDistributed(tmb::MessageBus *bus,
+                     CatalogDatabaseLite *catalog_database,
+                     const int cpu_id = -1,
+                     const bool profile_individual_workorders = false);
+
+  ~ForemanDistributed() override {}
+
+  /**
+   * @brief Print the results of profiling individual work orders for a given
+   *        query.
+   *
+   * TODO(harshad) - Add the name of the operator to the output.
+   *
+   * @param query_id The ID of the query for which the results are to be printed.
+   * @param out The file stream.
+   **/
+  void printWorkOrderProfilingResults(const std::size_t query_id,
+                                      std::FILE *out) const;
+
+ protected:
+  void run() override;
+
+ private:
+  /**
+   * @brief Dispatch schedulable WorkOrders, wrapped in WorkOrderMessages, to
+   *        the Shiftboss.
+   *
+   * @param messages The messages to be dispatched.
+   **/
+  void dispatchWorkOrderMessages(
+      const std::vector<std::unique_ptr<serialization::WorkOrderMessage>> &messages);
+
+  /**
+   * @brief Send the given message to the specified Shiftboss.
+   *
+   * @param worker_index The logical index of the recipient Shiftboss in
+   *        ShiftbossDirectory.
+   * @param proto The WorkOrderMessage to be sent.
+   **/
+  void sendWorkOrderMessage(const std::size_t worker_index,
+                            const serialization::WorkOrderMessage &proto);
+
+  void processShiftbossRegistrationMessage(const tmb::client_id shiftboss_client_id,
+                                            const std::size_t work_order_capacity);
+
+  void processSaveQueryResultResponseMessage(const tmb::client_id cli_id,
+                                             const relation_id result_relation_id);
+
+  /**
+   * @brief Check if we can collect new messages from the PolicyEnforcer.
+   *
+   * @param message_type The type of the last received message.
+   **/
+  bool canCollectNewMessages(const tmb::message_type_id message_type);
+
+  ShiftbossDirectory shiftboss_directory_;
+
+  CatalogDatabaseLite *catalog_database_;
+
+  std::unique_ptr<PolicyEnforcerDistributed> policy_enforcer_;
+
+  DISALLOW_COPY_AND_ASSIGN(ForemanDistributed);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_EXECUTION_FOREMAN_DISTRIBUTED_HPP_


[4/6] incubator-quickstep git commit: Initial commit.

Posted by ji...@apache.org.
Initial commit.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/9ba18c07
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/9ba18c07
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/9ba18c07

Branch: refs/heads/LIP-for-tpch-merged
Commit: 9ba18c07b3a2ef9f23054f358a94dabdfb772414
Parents: 1111ec5
Author: Hakan Memisoglu <ha...@gmail.com>
Authored: Tue Aug 16 16:40:27 2016 -0500
Committer: Jianqiao Zhu <ji...@cs.wisc.edu>
Committed: Thu Aug 18 16:17:51 2016 -0500

----------------------------------------------------------------------
 query_optimizer/CMakeLists.txt               |  1 +
 query_optimizer/ExecutionGenerator.cpp       |  9 +++++
 query_optimizer/PhysicalGenerator.cpp        |  3 ++
 query_optimizer/physical/HashJoin.cpp        |  5 +++
 query_optimizer/physical/HashJoin.hpp        | 13 ++++++-
 query_optimizer/rules/AttachBloomFilters.cpp |  1 +
 query_optimizer/rules/CMakeLists.txt         | 11 ++++++
 query_optimizer/rules/FuseJoinSelect.cpp     | 43 +++++++++++++++++++++++
 query_optimizer/rules/FuseJoinSelect.hpp     | 33 +++++++++++++++++
 relational_operators/HashJoinOperator.cpp    | 13 ++++++-
 relational_operators/HashJoinOperator.hpp    | 12 +++++++
 relational_operators/WorkOrder.proto         |  1 +
 12 files changed, 143 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ba18c07/query_optimizer/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt
index 7440151..849caaa 100644
--- a/query_optimizer/CMakeLists.txt
+++ b/query_optimizer/CMakeLists.txt
@@ -201,6 +201,7 @@ target_link_libraries(quickstep_queryoptimizer_PhysicalGenerator
                       quickstep_queryoptimizer_logical_Logical
                       quickstep_queryoptimizer_physical_Physical
                       quickstep_queryoptimizer_rules_AttachBloomFilters
+                      quickstep_queryoptimizer_rules_FuseJoinSelect
                       quickstep_queryoptimizer_rules_PruneColumns
                       quickstep_queryoptimizer_rules_StarSchemaHashJoinOrderOptimization
                       quickstep_queryoptimizer_rules_SwapProbeBuild

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ba18c07/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index f8559ec..457366e 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -692,6 +692,14 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
     query_context_proto_->add_predicates()->CopyFrom(residual_predicate->getProto());
   }
 
+  // Convert the left (probe-side) filter predicate proto, if the plan has one.
+  QueryContext::predicate_id left_filter_predicate_index = QueryContext::kInvalidPredicateId;
+  if (physical_plan->left_filter_predicate()) {
+    left_filter_predicate_index = query_context_proto_->predicates_size();
+    unique_ptr<const Predicate> left_filter_predicate(convertPredicate(physical_plan->left_filter_predicate()));
+    query_context_proto_->add_predicates()->CopyFrom(left_filter_predicate->getProto());
+  }
+
   // Convert the project expressions proto.
   const QueryContext::scalar_group_id project_expressions_group_index =
       query_context_proto_->scalar_groups_size();
@@ -796,6 +804,7 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
               insert_destination_index,
               join_hash_table_index,
               residual_predicate_index,
+              left_filter_predicate_index,
               project_expressions_group_index,
               is_selection_on_build.get(),
               join_type));

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ba18c07/query_optimizer/PhysicalGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/PhysicalGenerator.cpp b/query_optimizer/PhysicalGenerator.cpp
index 9ee685d..e093272 100644
--- a/query_optimizer/PhysicalGenerator.cpp
+++ b/query_optimizer/PhysicalGenerator.cpp
@@ -27,6 +27,7 @@
 #include "query_optimizer/logical/Logical.hpp"
 #include "query_optimizer/physical/Physical.hpp"
 #include "query_optimizer/rules/AttachBloomFilters.hpp"
+#include "query_optimizer/rules/FuseJoinSelect.hpp"
 #include "query_optimizer/rules/PruneColumns.hpp"
 #include "query_optimizer/rules/StarSchemaHashJoinOrderOptimization.hpp"
 #include "query_optimizer/rules/SwapProbeBuild.hpp"
@@ -102,6 +103,8 @@ P::PhysicalPtr PhysicalGenerator::optimizePlan() {
   }
   rules.emplace_back(new PruneColumns());
   // rules.emplace_back(new SwapProbeBuild());
+  rules.emplace_back(new FuseJoinSelect());
+  rules.emplace_back(new PruneColumns());
   rules.emplace_back(new AttachBloomFilters());
 
   for (std::unique_ptr<Rule<P::Physical>> &rule : rules) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ba18c07/query_optimizer/physical/HashJoin.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/HashJoin.cpp b/query_optimizer/physical/HashJoin.cpp
index 883c87a..dc564a7 100644
--- a/query_optimizer/physical/HashJoin.cpp
+++ b/query_optimizer/physical/HashJoin.cpp
@@ -111,6 +111,11 @@ void HashJoin::getFieldStringItems(
     non_container_child_field_names->push_back("residual_predicate");
     non_container_child_fields->push_back(residual_predicate_);
   }
+  if (left_filter_predicate_ != nullptr) {
+    non_container_child_field_names->push_back("left_filter_predicate");
+    non_container_child_fields->push_back(left_filter_predicate_);
+  }
+
   container_child_field_names->push_back("left_join_attributes");
   container_child_fields->push_back(CastSharedPtrVector<OptimizerTreeBase>(left_join_attributes_));
   container_child_field_names->push_back("right_join_attributes");

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ba18c07/query_optimizer/physical/HashJoin.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/HashJoin.hpp b/query_optimizer/physical/HashJoin.hpp
index a830d0b..e24dbeb 100644
--- a/query_optimizer/physical/HashJoin.hpp
+++ b/query_optimizer/physical/HashJoin.hpp
@@ -107,6 +107,10 @@ class HashJoin : public BinaryJoin {
     return join_type_;
   }
 
+  const expressions::PredicatePtr& left_filter_predicate() const {
+    return left_filter_predicate_;
+  }
+
   PhysicalPtr copyWithNewChildren(
       const std::vector<PhysicalPtr> &new_children) const override {
     DCHECK_EQ(children().size(), new_children.size());
@@ -117,7 +121,8 @@ class HashJoin : public BinaryJoin {
                   residual_predicate_,
                   project_expressions(),
                   join_type_,
-                  bloom_filter_config_);
+                  left_filter_predicate_,
+                  bloom_filter_config_);
   }
 
   std::vector<expressions::AttributeReferencePtr> getReferencedAttributes() const override;
@@ -144,6 +149,7 @@ class HashJoin : public BinaryJoin {
    * @param residual_predicate Optional filtering predicate evaluated after join.
    * @param project_expressions The project expressions.
    * @param Join type of this hash join.
+   * @param left_filter_predicate Optional filtering predicate for probe side before join.
    * @return An immutable physical HashJoin.
    */
   static HashJoinPtr Create(
@@ -154,6 +160,7 @@ class HashJoin : public BinaryJoin {
       const expressions::PredicatePtr &residual_predicate,
       const std::vector<expressions::NamedExpressionPtr> &project_expressions,
       const JoinType join_type,
+      const expressions::PredicatePtr &left_filter_predicate = nullptr,
       const BloomFilterConfig bloom_filter_config = BloomFilterConfig()) {
     return HashJoinPtr(
         new HashJoin(left,
@@ -163,6 +170,7 @@ class HashJoin : public BinaryJoin {
                      residual_predicate,
                      project_expressions,
                      join_type,
+                     left_filter_predicate,
                      bloom_filter_config));
   }
 
@@ -184,12 +192,14 @@ class HashJoin : public BinaryJoin {
       const expressions::PredicatePtr &residual_predicate,
       const std::vector<expressions::NamedExpressionPtr> &project_expressions,
       const JoinType join_type,
+      const expressions::PredicatePtr &left_filter_predicate,
       const BloomFilterConfig &bloom_filter_config)
       : BinaryJoin(left, right, project_expressions),
         left_join_attributes_(left_join_attributes),
         right_join_attributes_(right_join_attributes),
         residual_predicate_(residual_predicate),
         join_type_(join_type),
+        left_filter_predicate_(left_filter_predicate),
         bloom_filter_config_(bloom_filter_config) {
   }
 
@@ -197,6 +207,7 @@ class HashJoin : public BinaryJoin {
   std::vector<expressions::AttributeReferencePtr> right_join_attributes_;
   expressions::PredicatePtr residual_predicate_;
   JoinType join_type_;
+  expressions::PredicatePtr left_filter_predicate_;
   BloomFilterConfig bloom_filter_config_;
 
   DISALLOW_COPY_AND_ASSIGN(HashJoin);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ba18c07/query_optimizer/rules/AttachBloomFilters.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/AttachBloomFilters.cpp b/query_optimizer/rules/AttachBloomFilters.cpp
index 898d831..f86ba60 100644
--- a/query_optimizer/rules/AttachBloomFilters.cpp
+++ b/query_optimizer/rules/AttachBloomFilters.cpp
@@ -338,6 +338,7 @@ P::PhysicalPtr AttachBloomFilters::performAttach(const physical::PhysicalPtr &no
           hash_join->residual_predicate(),
           hash_join->project_expressions(),
           hash_join->join_type(),
+          hash_join->left_filter_predicate(),
           attach_it->second);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ba18c07/query_optimizer/rules/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/CMakeLists.txt b/query_optimizer/rules/CMakeLists.txt
index 54b1e59..9990a4d 100644
--- a/query_optimizer/rules/CMakeLists.txt
+++ b/query_optimizer/rules/CMakeLists.txt
@@ -21,6 +21,7 @@ add_subdirectory(tests)
 add_library(quickstep_queryoptimizer_rules_AttachBloomFilters AttachBloomFilters.cpp AttachBloomFilters.hpp)
 add_library(quickstep_queryoptimizer_rules_BottomUpRule ../../empty_src.cpp BottomUpRule.hpp)
 add_library(quickstep_queryoptimizer_rules_CollapseProject CollapseProject.cpp CollapseProject.hpp)
+add_library(quickstep_queryoptimizer_rules_FuseJoinSelect FuseJoinSelect.cpp FuseJoinSelect.hpp)
 add_library(quickstep_queryoptimizer_rules_GenerateJoins GenerateJoins.cpp GenerateJoins.hpp)
 add_library(quickstep_queryoptimizer_rules_PruneColumns PruneColumns.cpp PruneColumns.hpp)
 add_library(quickstep_queryoptimizer_rules_PushDownFilter PushDownFilter.cpp PushDownFilter.hpp)
@@ -65,6 +66,15 @@ target_link_libraries(quickstep_queryoptimizer_rules_CollapseProject
                       quickstep_queryoptimizer_rules_Rule
                       quickstep_queryoptimizer_rules_RuleHelper
                       quickstep_utility_Macros)
+target_link_libraries(quickstep_queryoptimizer_rules_FuseJoinSelect
+                      quickstep_queryoptimizer_expressions_Predicate
+                      quickstep_queryoptimizer_physical_HashJoin
+                      quickstep_queryoptimizer_physical_PatternMatcher
+                      quickstep_queryoptimizer_physical_Physical
+                      quickstep_queryoptimizer_physical_Selection
+                      quickstep_queryoptimizer_physical_TableReference
+                      quickstep_queryoptimizer_rules_BottomUpRule
+                      quickstep_queryoptimizer_rules_Rule)
 target_link_libraries(quickstep_queryoptimizer_rules_GenerateJoins
                       glog
                       quickstep_queryoptimizer_expressions_AttributeReference
@@ -206,6 +216,7 @@ target_link_libraries(quickstep_queryoptimizer_rules
                       quickstep_queryoptimizer_rules_AttachBloomFilters
                       quickstep_queryoptimizer_rules_BottomUpRule
                       quickstep_queryoptimizer_rules_CollapseProject
+                      quickstep_queryoptimizer_rules_FuseJoinSelect
                       quickstep_queryoptimizer_rules_GenerateJoins
                       quickstep_queryoptimizer_rules_PruneColumns
                       quickstep_queryoptimizer_rules_PushDownFilter

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ba18c07/query_optimizer/rules/FuseJoinSelect.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/FuseJoinSelect.cpp b/query_optimizer/rules/FuseJoinSelect.cpp
new file mode 100644
index 0000000..6a8885f
--- /dev/null
+++ b/query_optimizer/rules/FuseJoinSelect.cpp
@@ -0,0 +1,43 @@
+#include "query_optimizer/rules/FuseJoinSelect.hpp"
+
+#include "query_optimizer/expressions/Predicate.hpp"
+#include "query_optimizer/physical/HashJoin.hpp"
+#include "query_optimizer/physical/PatternMatcher.hpp"
+#include "query_optimizer/physical/Selection.hpp"
+#include "query_optimizer/physical/TableReference.hpp"
+#include "query_optimizer/rules/Rule.hpp"
+
+namespace quickstep {
+namespace optimizer {
+
+namespace P = ::quickstep::optimizer::physical;
+namespace E = ::quickstep::optimizer::expressions;
+
+P::PhysicalPtr FuseJoinSelect::applyToNode(const P::PhysicalPtr &input) {
+  P::HashJoinPtr hash_join;
+  P::SelectionPtr selection;
+  P::TableReferencePtr table_reference;
+
+  if (P::SomeHashJoin::MatchesWithConditionalCast(input, &hash_join)
+      && hash_join->join_type() == P::HashJoin::JoinType::kInnerJoin
+      && P::SomeSelection::MatchesWithConditionalCast(hash_join->left(), &selection)
+      && P::SomeTableReference::MatchesWithConditionalCast(selection->input(), &table_reference)) {
+    const E::PredicatePtr filter_predicate = selection->filter_predicate();
+    P::PhysicalPtr output = P::HashJoin::Create(hash_join->left(),
+                                                table_reference,
+                                                hash_join->left_join_attributes(),
+                                                hash_join->right_join_attributes(),
+                                                hash_join->residual_predicate(),
+                                                hash_join->project_expressions(),
+                                                hash_join->join_type(),
+                                                filter_predicate);
+    LOG_APPLYING_RULE(input, output);
+    return output;
+  }
+
+  LOG_IGNORING_RULE(input);
+  return input;
+}
+
+}
+}

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ba18c07/query_optimizer/rules/FuseJoinSelect.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/FuseJoinSelect.hpp b/query_optimizer/rules/FuseJoinSelect.hpp
new file mode 100644
index 0000000..24ac08b
--- /dev/null
+++ b/query_optimizer/rules/FuseJoinSelect.hpp
@@ -0,0 +1,33 @@
+#ifndef QUICKSTEP_QUERY_OPTIMIZER_RULES_FUSE_JOIN_SELECT_HPP_
+#define QUICKSTEP_QUERY_OPTIMIZER_RULES_FUSE_JOIN_SELECT_HPP_
+
+#include <string>
+
+#include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/rules/Rule.hpp"
+#include "query_optimizer/rules/BottomUpRule.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+namespace optimizer {
+
+namespace P = ::quickstep::optimizer::physical;
+
+class FuseJoinSelect : public BottomUpRule<P::Physical> {
+ public:
+  FuseJoinSelect() {
+  }
+
+  std::string getName() const override { return "FuseJoinSelect"; }
+
+ protected:
+  P::PhysicalPtr applyToNode(const P::PhysicalPtr &input) override;
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(FuseJoinSelect);
+};
+
+}  // namespace optimizer
+}  // namespace quickstep
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ba18c07/relational_operators/HashJoinOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.cpp b/relational_operators/HashJoinOperator.cpp
index 9b573ac..7357acd 100644
--- a/relational_operators/HashJoinOperator.cpp
+++ b/relational_operators/HashJoinOperator.cpp
@@ -192,6 +192,8 @@ bool HashJoinOperator::getAllNonOuterJoinWorkOrders(
 
     const Predicate *residual_predicate =
         query_context->getPredicate(residual_predicate_index_);
+    const Predicate *left_filter_predicate =
+        query_context->getPredicate(left_filter_predicate_index_);
     const vector<unique_ptr<const Scalar>> &selection =
         query_context->getScalarGroup(selection_index_);
     InsertDestination *output_destination =
@@ -210,6 +212,7 @@ bool HashJoinOperator::getAllNonOuterJoinWorkOrders(
                                      any_join_key_attributes_nullable_,
                                      probe_block_id,
                                      residual_predicate,
+                                     left_filter_predicate,
                                      selection,
                                      hash_table,
                                      output_destination,
@@ -230,6 +233,7 @@ bool HashJoinOperator::getAllNonOuterJoinWorkOrders(
                 any_join_key_attributes_nullable_,
                 probe_relation_block_ids_[num_workorders_generated_],
                 residual_predicate,
+                left_filter_predicate,
                 selection,
                 hash_table,
                 output_destination,
@@ -370,6 +374,7 @@ serialization::WorkOrder* HashJoinOperator::createNonOuterJoinWorkOrderProto(
   proto->SetExtension(serialization::HashJoinWorkOrder::selection_index, selection_index_);
   proto->SetExtension(serialization::HashJoinWorkOrder::block_id, block);
   proto->SetExtension(serialization::HashJoinWorkOrder::residual_predicate_index, residual_predicate_index_);
+  proto->SetExtension(serialization::HashJoinWorkOrder::left_filter_predicate_index, left_filter_predicate_index_);
 
   return proto;
 }
@@ -432,7 +437,13 @@ void HashInnerJoinWorkOrder::execute() {
       storage_manager_->getBlock(block_id_, probe_relation_));
   const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock();
 
-  std::unique_ptr<ValueAccessor> probe_accessor(probe_store.createValueAccessor());
+  std::unique_ptr<ValueAccessor> probe_accessor(
+      probe_store.createValueAccessor(
+          left_filter_predicate_ == nullptr
+          ? nullptr
+          : probe_block->getMatchesForPredicate(left_filter_predicate_)));
+
+
   MapBasedJoinedTupleCollector collector;
   if (join_key_attributes_.size() == 1) {
     hash_table_.getAllFromValueAccessor(

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ba18c07/relational_operators/HashJoinOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.hpp b/relational_operators/HashJoinOperator.hpp
index 30571a1..05e16a4 100644
--- a/relational_operators/HashJoinOperator.hpp
+++ b/relational_operators/HashJoinOperator.hpp
@@ -128,6 +128,7 @@ class HashJoinOperator : public RelationalOperator {
       const QueryContext::insert_destination_id output_destination_index,
       const QueryContext::join_hash_table_id hash_table_index,
       const QueryContext::predicate_id residual_predicate_index,
+      const QueryContext::predicate_id left_filter_predicate_index,
       const QueryContext::scalar_group_id selection_index,
       const std::vector<bool> *is_selection_on_build = nullptr,
       const JoinType join_type = JoinType::kInnerJoin)
@@ -141,6 +142,7 @@ class HashJoinOperator : public RelationalOperator {
         output_destination_index_(output_destination_index),
         hash_table_index_(hash_table_index),
         residual_predicate_index_(residual_predicate_index),
+        left_filter_predicate_index_(left_filter_predicate_index),
         selection_index_(selection_index),
         is_selection_on_build_(is_selection_on_build == nullptr
                                    ? std::vector<bool>()
@@ -256,6 +258,7 @@ class HashJoinOperator : public RelationalOperator {
   const QueryContext::insert_destination_id output_destination_index_;
   const QueryContext::join_hash_table_id hash_table_index_;
   const QueryContext::predicate_id residual_predicate_index_;
+  const QueryContext::predicate_id left_filter_predicate_index_;
   const QueryContext::scalar_group_id selection_index_;
   const std::vector<bool> is_selection_on_build_;
   const JoinType join_type_;
@@ -304,6 +307,7 @@ class HashInnerJoinWorkOrder : public WorkOrder {
       const bool any_join_key_attributes_nullable,
       const block_id lookup_block_id,
       const Predicate *residual_predicate,
+      const Predicate *left_filter_predicate,
       const std::vector<std::unique_ptr<const Scalar>> &selection,
       const JoinHashTable &hash_table,
       InsertDestination *output_destination,
@@ -316,6 +320,7 @@ class HashInnerJoinWorkOrder : public WorkOrder {
         any_join_key_attributes_nullable_(any_join_key_attributes_nullable),
         block_id_(lookup_block_id),
         residual_predicate_(residual_predicate),
+        left_filter_predicate_(left_filter_predicate),
         selection_(selection),
         hash_table_(hash_table),
         output_destination_(DCHECK_NOTNULL(output_destination)),
@@ -388,6 +393,7 @@ class HashInnerJoinWorkOrder : public WorkOrder {
   const bool any_join_key_attributes_nullable_;
   const block_id block_id_;
   const Predicate *residual_predicate_;
+  const Predicate *left_filter_predicate_;
   const std::vector<std::unique_ptr<const Scalar>> &selection_;
   const JoinHashTable &hash_table_;
 
@@ -434,6 +440,7 @@ class HashSemiJoinWorkOrder : public WorkOrder {
       const bool any_join_key_attributes_nullable,
       const block_id lookup_block_id,
       const Predicate *residual_predicate,
+      const Predicate *left_filter_predicate,
       const std::vector<std::unique_ptr<const Scalar>> &selection,
       const JoinHashTable &hash_table,
       InsertDestination *output_destination,
@@ -446,6 +453,7 @@ class HashSemiJoinWorkOrder : public WorkOrder {
         any_join_key_attributes_nullable_(any_join_key_attributes_nullable),
         block_id_(lookup_block_id),
         residual_predicate_(residual_predicate),
+        left_filter_predicate_(left_filter_predicate),
         selection_(selection),
         hash_table_(hash_table),
         output_destination_(DCHECK_NOTNULL(output_destination)),
@@ -514,6 +522,7 @@ class HashSemiJoinWorkOrder : public WorkOrder {
   const bool any_join_key_attributes_nullable_;
   const block_id block_id_;
   const Predicate *residual_predicate_;
+  const Predicate *left_filter_predicate_;
   const std::vector<std::unique_ptr<const Scalar>> &selection_;
   const JoinHashTable &hash_table_;
 
@@ -560,6 +569,7 @@ class HashAntiJoinWorkOrder : public WorkOrder {
       const bool any_join_key_attributes_nullable,
       const block_id lookup_block_id,
       const Predicate *residual_predicate,
+      const Predicate *left_filter_predicate,
       const std::vector<std::unique_ptr<const Scalar>> &selection,
       const JoinHashTable &hash_table,
       InsertDestination *output_destination,
@@ -572,6 +582,7 @@ class HashAntiJoinWorkOrder : public WorkOrder {
         any_join_key_attributes_nullable_(any_join_key_attributes_nullable),
         block_id_(lookup_block_id),
         residual_predicate_(residual_predicate),
+        left_filter_predicate_(left_filter_predicate),
         selection_(selection),
         hash_table_(hash_table),
         output_destination_(DCHECK_NOTNULL(output_destination)),
@@ -645,6 +656,7 @@ class HashAntiJoinWorkOrder : public WorkOrder {
   const bool any_join_key_attributes_nullable_;
   const block_id block_id_;
   const Predicate *residual_predicate_;
+  const Predicate *left_filter_predicate_;
   const std::vector<std::unique_ptr<const Scalar>> &selection_;
   const JoinHashTable &hash_table_;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ba18c07/relational_operators/WorkOrder.proto
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrder.proto b/relational_operators/WorkOrder.proto
index 02aa50e..4874450 100644
--- a/relational_operators/WorkOrder.proto
+++ b/relational_operators/WorkOrder.proto
@@ -128,6 +128,7 @@ message HashJoinWorkOrder {
 
     // Used by all but HashOuterJoinWorkOrder.
     optional int32 residual_predicate_index = 169;
+    optional int32 left_filter_predicate_index = 400;
     // Used by HashOuterJoinWorkOrder only.
     repeated bool is_selection_on_build = 170;
   }


[2/6] incubator-quickstep git commit: Fixed a bug in deserializing WindowAggrWorkOrder.

Posted by ji...@apache.org.
Fixed a bug in deserializing WindowAggrWorkOrder.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/9838fcd1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/9838fcd1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/9838fcd1

Branch: refs/heads/LIP-for-tpch-merged
Commit: 9838fcd16d7e70edba4dfca3ec979de1231e37e7
Parents: e7524cb
Author: Zuyu Zhang <zu...@twitter.com>
Authored: Fri Aug 12 08:59:12 2016 -0700
Committer: Jianqiao Zhu <ji...@cs.wisc.edu>
Committed: Thu Aug 18 16:10:16 2016 -0500

----------------------------------------------------------------------
 relational_operators/WorkOrderFactory.cpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9838fcd1/relational_operators/WorkOrderFactory.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp
index 7d7af59..6970486 100644
--- a/relational_operators/WorkOrderFactory.cpp
+++ b/relational_operators/WorkOrderFactory.cpp
@@ -434,7 +434,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
               proto.GetExtension(serialization::WindowAggregationWorkOrder::window_aggr_state_index)),
           move(blocks),
           query_context->getInsertDestination(
-              proto.GetExtension(serialization::FinalizeAggregationWorkOrder::insert_destination_index)));
+              proto.GetExtension(serialization::WindowAggregationWorkOrder::insert_destination_index)));
     }
     default:
       LOG(FATAL) << "Unknown WorkOrder Type in WorkOrderFactory::ReconstructFromProto";


[5/6] incubator-quickstep git commit: Fixed the bug.

Posted by ji...@apache.org.
Fixed the bug.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/2ea7a846
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/2ea7a846
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/2ea7a846

Branch: refs/heads/LIP-for-tpch-merged
Commit: 2ea7a84630a184a40b54b88917f8c750307989f6
Parents: 9ba18c0
Author: Hakan Memisoglu <ha...@gmail.com>
Authored: Wed Aug 17 14:39:40 2016 -0500
Committer: Jianqiao Zhu <ji...@cs.wisc.edu>
Committed: Thu Aug 18 16:17:51 2016 -0500

----------------------------------------------------------------------
 query_optimizer/rules/FuseJoinSelect.cpp | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ea7a846/query_optimizer/rules/FuseJoinSelect.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/FuseJoinSelect.cpp b/query_optimizer/rules/FuseJoinSelect.cpp
index 6a8885f..e40acfc 100644
--- a/query_optimizer/rules/FuseJoinSelect.cpp
+++ b/query_optimizer/rules/FuseJoinSelect.cpp
@@ -23,8 +23,8 @@ P::PhysicalPtr FuseJoinSelect::applyToNode(const P::PhysicalPtr &input) {
       && P::SomeSelection::MatchesWithConditionalCast(hash_join->left(), &selection)
       && P::SomeTableReference::MatchesWithConditionalCast(selection->input(), &table_reference)) {
     const E::PredicatePtr filter_predicate = selection->filter_predicate();
-    P::PhysicalPtr output = P::HashJoin::Create(hash_join->left(),
-                                                table_reference,
+    P::PhysicalPtr output = P::HashJoin::Create(table_reference,
+                                                hash_join->right(),
                                                 hash_join->left_join_attributes(),
                                                 hash_join->right_join_attributes(),
                                                 hash_join->residual_predicate(),


[6/6] incubator-quickstep git commit: Merged with fuse-select-join

Posted by ji...@apache.org.
Merged with fuse-select-join


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/f4af5965
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/f4af5965
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/f4af5965

Branch: refs/heads/LIP-for-tpch-merged
Commit: f4af596523b17d1d5523e882f1b73e3d6c35d1b3
Parents: 2ea7a84
Author: Jianqiao Zhu <ji...@cs.wisc.edu>
Authored: Thu Aug 18 21:17:56 2016 -0500
Committer: Jianqiao Zhu <ji...@cs.wisc.edu>
Committed: Thu Aug 18 21:17:56 2016 -0500

----------------------------------------------------------------------
 query_optimizer/physical/HashJoin.hpp     | 4 ++--
 relational_operators/HashJoinOperator.hpp | 8 ++++++++
 2 files changed, 10 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f4af5965/query_optimizer/physical/HashJoin.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/HashJoin.hpp b/query_optimizer/physical/HashJoin.hpp
index e24dbeb..32b4f21 100644
--- a/query_optimizer/physical/HashJoin.hpp
+++ b/query_optimizer/physical/HashJoin.hpp
@@ -121,8 +121,8 @@ class HashJoin : public BinaryJoin {
                   residual_predicate_,
                   project_expressions(),
                   join_type_,
-                  bloom_filter_config_,
-                  left_filter_predicate_);
+                  left_filter_predicate_,
+                  bloom_filter_config_);
   }
 
   std::vector<expressions::AttributeReferencePtr> getReferencedAttributes() const override;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/f4af5965/relational_operators/HashJoinOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.hpp b/relational_operators/HashJoinOperator.hpp
index 05e16a4..4f53daa 100644
--- a/relational_operators/HashJoinOperator.hpp
+++ b/relational_operators/HashJoinOperator.hpp
@@ -511,6 +511,10 @@ class HashSemiJoinWorkOrder : public WorkOrder {
 
   void execute() override;
 
+  const Predicate *left_filter_predicate() const {
+    return left_filter_predicate_;
+  }
+
  private:
   void executeWithoutResidualPredicate();
 
@@ -645,6 +649,10 @@ class HashAntiJoinWorkOrder : public WorkOrder {
     }
   }
 
+  const Predicate *left_filter_predicate() const {
+    return left_filter_predicate_;
+  }
+
  private:
   void executeWithoutResidualPredicate();
 


[3/6] incubator-quickstep git commit: Minor fixes for the distributed version.

Posted by ji...@apache.org.
Minor fixes for the distributed version.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/47d1248a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/47d1248a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/47d1248a

Branch: refs/heads/LIP-for-tpch-merged
Commit: 47d1248a5c61ba0458246a137b788f238ae094d0
Parents: 9838fcd
Author: Zuyu Zhang <zu...@twitter.com>
Authored: Sat Aug 13 23:22:41 2016 -0700
Committer: Jianqiao Zhu <ji...@cs.wisc.edu>
Committed: Thu Aug 18 16:10:16 2016 -0500

----------------------------------------------------------------------
 query_execution/CMakeLists.txt                  | 16 +++++++-------
 query_execution/PolicyEnforcerDistributed.cpp   | 10 ++++-----
 query_execution/PolicyEnforcerDistributed.hpp   |  6 +++---
 query_execution/QueryExecutionTypedefs.hpp      |  4 ++--
 query_execution/Shiftboss.cpp                   | 20 +++++++++++++++---
 query_execution/Shiftboss.hpp                   | 22 +++++++++++++-------
 .../tests/execution_generator/CMakeLists.txt    |  2 +-
 7 files changed, 51 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/47d1248a/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index ff0fe08..c7b9d61 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -31,7 +31,7 @@ endif()
 add_library(quickstep_queryexecution_AdmitRequestMessage ../empty_src.cpp AdmitRequestMessage.hpp)
 if (ENABLE_DISTRIBUTED)
   add_library(quickstep_queryexecution_BlockLocator BlockLocator.cpp BlockLocator.hpp)
-endif()
+endif(ENABLE_DISTRIBUTED)
 add_library(quickstep_queryexecution_ForemanBase ../empty_src.cpp ForemanBase.hpp)
 add_library(quickstep_queryexecution_ForemanSingleNode ForemanSingleNode.cpp ForemanSingleNode.hpp)
 add_library(quickstep_queryexecution_PolicyEnforcerBase PolicyEnforcerBase.cpp PolicyEnforcerBase.hpp)
@@ -52,12 +52,12 @@ add_library(quickstep_queryexecution_QueryExecutionUtil ../empty_src.cpp QueryEx
 add_library(quickstep_queryexecution_QueryManagerBase QueryManagerBase.cpp QueryManagerBase.hpp)
 if (ENABLE_DISTRIBUTED)
   add_library(quickstep_queryexecution_QueryManagerDistributed QueryManagerDistributed.cpp QueryManagerDistributed.hpp)
-endif()
+endif(ENABLE_DISTRIBUTED)
 add_library(quickstep_queryexecution_QueryManagerSingleNode QueryManagerSingleNode.cpp QueryManagerSingleNode.hpp)
 if (ENABLE_DISTRIBUTED)
   add_library(quickstep_queryexecution_Shiftboss Shiftboss.cpp Shiftboss.hpp)
   add_library(quickstep_queryexecution_ShiftbossDirectory ../empty_src.cpp ShiftbossDirectory.hpp)
-endif()
+endif(ENABLE_DISTRIBUTED)
 add_library(quickstep_queryexecution_WorkOrderProtosContainer ../empty_src.cpp WorkOrderProtosContainer.hpp)
 add_library(quickstep_queryexecution_WorkOrdersContainer WorkOrdersContainer.cpp WorkOrdersContainer.hpp)
 add_library(quickstep_queryexecution_Worker Worker.cpp Worker.hpp)
@@ -80,7 +80,7 @@ if (ENABLE_DISTRIBUTED)
                         quickstep_threading_ThreadUtil
                         quickstep_utility_Macros
                         tmb)
-endif()
+endif(ENABLE_DISTRIBUTED)
 target_link_libraries(quickstep_queryexecution_ForemanBase
                       glog
                       quickstep_threading_Thread
@@ -223,7 +223,7 @@ if (ENABLE_DISTRIBUTED)
                         quickstep_utility_DAG
                         quickstep_utility_Macros
                         tmb)
-endif()
+endif(ENABLE_DISTRIBUTED)
 target_link_libraries(quickstep_queryexecution_QueryManagerSingleNode
                       quickstep_catalog_CatalogTypedefs
                       quickstep_queryexecution_QueryContext
@@ -262,7 +262,7 @@ if (ENABLE_DISTRIBUTED)
   target_link_libraries(quickstep_queryexecution_ShiftbossDirectory
                         quickstep_utility_Macros
                         tmb)
-endif()
+endif(ENABLE_DISTRIBUTED)
 target_link_libraries(quickstep_queryexecution_WorkOrderProtosContainer
                       glog
                       quickstep_relationaloperators_WorkOrder_proto
@@ -321,7 +321,7 @@ if (ENABLE_DISTRIBUTED)
                         quickstep_queryexecution_QueryManagerDistributed
                         quickstep_queryexecution_Shiftboss
                         quickstep_queryexecution_ShiftbossDirectory)
-endif()
+endif(ENABLE_DISTRIBUTED)
 
 # Tests:
 if (ENABLE_DISTRIBUTED)
@@ -347,7 +347,7 @@ if (ENABLE_DISTRIBUTED)
                         tmb
                         ${LIBS})
   add_test(BlockLocator_unittest BlockLocator_unittest)
-endif()
+endif(ENABLE_DISTRIBUTED)
 
 add_executable(QueryManagerSingleNode_unittest
   "${CMAKE_CURRENT_SOURCE_DIR}/tests/QueryManagerSingleNode_unittest.cpp")

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/47d1248a/query_execution/PolicyEnforcerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp
index c76a9e1..47491ed 100644
--- a/query_execution/PolicyEnforcerDistributed.cpp
+++ b/query_execution/PolicyEnforcerDistributed.cpp
@@ -58,16 +58,16 @@ DEFINE_uint64(max_msgs_per_dispatch_round, 20, "Maximum number of messages that"
               " can be allocated in a single round of dispatch of messages to"
               " the workers.");
 
-void PolicyEnforcerDistributed::getWorkOrderMessages(
-    vector<unique_ptr<S::WorkOrderMessage>> *work_order_messages) {
+void PolicyEnforcerDistributed::getWorkOrderProtoMessages(
+    vector<unique_ptr<S::WorkOrderMessage>> *work_order_proto_messages) {
   // Iterate over admitted queries until either there are no more
   // messages available, or the maximum number of messages have
   // been collected.
-  DCHECK(work_order_messages->empty());
+  DCHECK(work_order_proto_messages->empty());
   // TODO(harshad) - Make this function generic enough so that it
   // works well when multiple queries are getting executed.
   if (admitted_queries_.empty()) {
-    LOG(WARNING) << "Requesting WorkerMessages when no query is running";
+    LOG(WARNING) << "Requesting WorkOrderProtoMessages when no query is running";
     return;
   }
 
@@ -86,7 +86,7 @@ void PolicyEnforcerDistributed::getWorkOrderMessages(
           static_cast<QueryManagerDistributed*>(curr_query_manager)->getNextWorkOrderMessage(0);
       if (next_work_order_message != nullptr) {
         ++messages_collected_curr_query;
-        work_order_messages->push_back(unique_ptr<S::WorkOrderMessage>(next_work_order_message));
+        work_order_proto_messages->push_back(unique_ptr<S::WorkOrderMessage>(next_work_order_message));
       } else {
         // No more work ordes from the current query at this time.
         // Check if the query's execution is over.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/47d1248a/query_execution/PolicyEnforcerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.hpp b/query_execution/PolicyEnforcerDistributed.hpp
index 16ebe36..bce3e0c 100644
--- a/query_execution/PolicyEnforcerDistributed.hpp
+++ b/query_execution/PolicyEnforcerDistributed.hpp
@@ -76,10 +76,10 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
    * @brief Get work order messages to be dispatched. These messages come from
    *        the active queries.
    *
-   * @param work_order_messages The work order messages to be dispatched.
+   * @param work_order_proto_messages The work order messages to be dispatched.
    **/
-  void getWorkOrderMessages(
-      std::vector<std::unique_ptr<serialization::WorkOrderMessage>> *work_order_messages);
+  void getWorkOrderProtoMessages(
+      std::vector<std::unique_ptr<serialization::WorkOrderMessage>> *work_order_proto_messages);
 
   /**
    * @brief Process the initiate rebuild work order response message.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/47d1248a/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index 33a93b0..bba67e3 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -63,8 +63,8 @@ using ClientIDMap = ThreadIDBasedMap<client_id,
 
 // We sort the following message types in the order of a life cycle of a query.
 enum QueryExecutionMessageType : message_type_id {
-  kAdmitRequestMessage,  // Requesting a query (or queries) to be admitted, from
-                         // the main thread to Foreman.
+  kAdmitRequestMessage = 0,  // Requesting a query (or queries) to be admitted, from
+                             // the main thread to Foreman.
   kWorkOrderMessage,  // From Foreman to Worker.
   kWorkOrderCompleteMessage,  // From Worker to Foreman.
   kCatalogRelationNewBlockMessage,  // From InsertDestination to Foreman.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/47d1248a/query_execution/Shiftboss.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp
index ddfd47f..5c2c5e0 100644
--- a/query_execution/Shiftboss.cpp
+++ b/query_execution/Shiftboss.cpp
@@ -147,10 +147,11 @@ void Shiftboss::run() {
                                       proto.relation_id());
         break;
       }
-      case kWorkOrderCompleteMessage:  // Fall through.
-      case kRebuildWorkOrderCompleteMessage:
+      case kCatalogRelationNewBlockMessage:  // Fall through.
       case kDataPipelineMessage:
-      case kWorkOrderFeedbackMessage: {
+      case kWorkOrderFeedbackMessage:
+      case kWorkOrderCompleteMessage:
+      case kRebuildWorkOrderCompleteMessage: {
         DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
                    << "') forwarded typed '" << annotated_message.tagged_message.message_type()
                    << "' message from Worker with TMB client ID '" << annotated_message.sender
@@ -165,6 +166,15 @@ void Shiftboss::run() {
         CHECK(send_status == MessageBus::SendStatus::kOK);
         break;
       }
+      case kQueryTeardownMessage: {
+        const TaggedMessage &tagged_message = annotated_message.tagged_message;
+
+        serialization::QueryTeardownMessage proto;
+        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+        query_contexts_.erase(proto.query_id());
+        break;
+      }
       case kSaveQueryResultMessage: {
         const TaggedMessage &tagged_message = annotated_message.tagged_message;
 
@@ -175,8 +185,12 @@ void Shiftboss::run() {
           storage_manager_->saveBlockOrBlob(proto.blocks(i));
         }
 
+        // Clean up query execution states, i.e., QueryContext.
+        query_contexts_.erase(proto.query_id());
+
         serialization::SaveQueryResultResponseMessage proto_response;
         proto_response.set_relation_id(proto.relation_id());
+        proto_response.set_cli_id(proto.cli_id());
 
         const size_t proto_response_length = proto_response.ByteSize();
         char *proto_response_bytes = static_cast<char*>(malloc(proto_response_length));

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/47d1248a/query_execution/Shiftboss.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.hpp b/query_execution/Shiftboss.hpp
index 30a8d1a..94b10a2 100644
--- a/query_execution/Shiftboss.hpp
+++ b/query_execution/Shiftboss.hpp
@@ -21,6 +21,7 @@
 #define QUICKSTEP_QUERY_EXECUTION_SHIFTBOSS_HPP_
 
 #include <cstddef>
+#include <cstdint>
 #include <memory>
 #include <unordered_map>
 
@@ -97,27 +98,34 @@ class Shiftboss : public Thread {
     bus_->RegisterClientAsReceiver(shiftboss_client_id_, kInitiateRebuildMessage);
     bus_->RegisterClientAsSender(shiftboss_client_id_, kInitiateRebuildResponseMessage);
 
+    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kSaveQueryResultMessage);
+    bus_->RegisterClientAsSender(shiftboss_client_id_, kSaveQueryResultResponseMessage);
+
     // Message sent to Worker.
     bus_->RegisterClientAsSender(shiftboss_client_id_, kRebuildWorkOrderMessage);
 
-    // Message sent to Foreman.
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kCatalogRelationNewBlockMessage);
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kDataPipelineMessage);
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderFeedbackMessage);
-
     // Forward the following message types from Foreman to Workers.
     bus_->RegisterClientAsReceiver(shiftboss_client_id_, kWorkOrderMessage);
     bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderMessage);
 
     // Forward the following message types from Workers to Foreman.
+    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kCatalogRelationNewBlockMessage);
+    bus_->RegisterClientAsSender(shiftboss_client_id_, kCatalogRelationNewBlockMessage);
+
+    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kDataPipelineMessage);
+    bus_->RegisterClientAsSender(shiftboss_client_id_, kDataPipelineMessage);
+
+    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kWorkOrderFeedbackMessage);
+    bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderFeedbackMessage);
+
     bus_->RegisterClientAsReceiver(shiftboss_client_id_, kWorkOrderCompleteMessage);
     bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderCompleteMessage);
 
     bus_->RegisterClientAsReceiver(shiftboss_client_id_, kRebuildWorkOrderCompleteMessage);
     bus_->RegisterClientAsSender(shiftboss_client_id_, kRebuildWorkOrderCompleteMessage);
 
-    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kSaveQueryResultMessage);
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kSaveQueryResultResponseMessage);
+    // Clean up query execution states, i.e., QueryContext.
+    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kQueryTeardownMessage);
 
     // Stop itself.
     bus_->RegisterClientAsReceiver(shiftboss_client_id_, kPoisonMessage);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/47d1248a/query_optimizer/tests/execution_generator/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/execution_generator/CMakeLists.txt b/query_optimizer/tests/execution_generator/CMakeLists.txt
index 1980980..0c00ff6 100644
--- a/query_optimizer/tests/execution_generator/CMakeLists.txt
+++ b/query_optimizer/tests/execution_generator/CMakeLists.txt
@@ -83,4 +83,4 @@ file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Join)
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Select)
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/StringPatternMatching)
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/TableGenerator)
-file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Update)
\ No newline at end of file
+file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Update)