Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/02/09 23:42:43 UTC

[1/9] incubator-quickstep git commit: Refactor building.md [Forced Update!]

Repository: incubator-quickstep
Updated Branches:
  refs/heads/two-level-tmb 22cd19223 -> d8fc9461b (forced update)


Refactor building.md


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/5773027f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/5773027f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/5773027f

Branch: refs/heads/two-level-tmb
Commit: 5773027ffcc55eed51a693df524a5558ac7305db
Parents: a9fe07d
Author: cramja <ma...@gmail.com>
Authored: Tue Feb 7 18:41:16 2017 -0600
Committer: cramja <ma...@gmail.com>
Committed: Wed Feb 8 19:27:14 2017 -0600

----------------------------------------------------------------------
 BUILDING.md | 167 +++++++++++++++++++++++++++++++------------------------
 1 file changed, 94 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5773027f/BUILDING.md
----------------------------------------------------------------------
diff --git a/BUILDING.md b/BUILDING.md
index 02a3a58..cc362fc 100644
--- a/BUILDING.md
+++ b/BUILDING.md
@@ -1,60 +1,73 @@
-Quickstep Build Guide
-=====================
+# Quickstep Build Guide
 
+**Contents**
+* [Basic Instructions](#basic-instructions)
+  * [Prerequisites](#prerequisites)
+  * [Building](#building)
+  * [Running Quickstep](#running-quickstep)
+  * [Running Tests](#running-tests)
+  * [Configuring with CMake](#configuring-with-cmake)
+* [Advanced Configuration](#advanced-configuration)
+* [Appendix](#appendix)
+  * [Building on Windows](#building-on-windows)
+  * [Building in Vagrant](#building-in-vagrant)
 
-What You Will Need
-------------------
 
-To build quickstep, you will need a C++ compiler that supports the C++14
-standard (GCC 4.9 or Clang 3.4 or higher are known to work), and CMake. If you
-have GNU Bison and Flex as well, they will be used to build the parser and
-lexer sources (otherwise, preprocessed sources made with Bison and Flex will be
-used).
+**Short Version**
 
-### Vagrant
+```sh
+git submodule init
+git submodule update
+cd third_party
+./download_and_patch_prerequisites.sh
+cd ../build
+cmake ..
+make quickstep_cli_shell
+./quickstep_cli_shell -initialize_db=true
+```
 
-For your convenience, we have provided Vagrant virtual machine configurations
-that have a complete development environment for Quickstep with all necessary
-tools and dependencies already installed. [See here for instructions on how to
-use them](build/vagrant/README.md).
+# Basic Instructions
 
-### Getting CMake
+## Prerequisites
 
-Quickstep uses the CMake build system.
+- C++ compiler that supports the C++14 standard (GCC 4.9+ or Clang 3.4+ are known to work)
+- [cmake](http://www.cmake.org/download/) 2.8.6+
+- curl
 
-If you're on a Linux machine, most distributions' package managers have a
-package for CMake that you can install. The same goes for all of the major
-flavors of BSD UNIX (Free/Net/Open/Dragonfly), OpenSolaris, and Cygwin.
+All of these programs should be available through your distribution's package manager.
 
-If you're using Mac OS X or Windows, you can download CMake binaries from:
-http://www.cmake.org/download/
+**Optional**
 
-On Mac or Windows, be sure to let the installer put links to the CMake command-
-line tools in bin or add them to your PATH.
+- GNU Bison and Flex (They will be used to build the parser and lexer, but pre-processed copies are provided)
 
-### Special Note: Building on Windows
+## Building
 
-To build on Windows, you will need some variety of Microsoft's C++ compiler and
-the nmake tool (either from Visual Studio, Visual C++ Express, or the Windows
-SDK). Only Visual Studio 2015 or higher is sufficiently modern to build
-Quickstep.
+Once cmake finishes, you are ready to actually build quickstep by running
+`make` (or `nmake` on Windows) (this will also build bundled third-party
+libraries as necessary). If you want to see the actual commands that make is
+running, you can do `make VERBOSE=1`. It is highly recommended to do a parallel
+make to speed up the build time, which you can do with `make -jX`, where X is
+the number of parallel jobs (the number of CPU cores on your system is a good
+choice, unless you are low on RAM, in which case you may want to reduce the
+number of jobs).
 
-Once you have the necessary tools installed, run the "Visual Studio Command
-Prompt" (use the 64-bit version if you have it). Change into the build
-directory and run:
+## Running Quickstep
 
-    cmake -G "NMake Makefiles" ..
+To use quickstep, just run `quickstep_cli_shell` in the build directory. For the
+first time user, run once with `-initialize_db=true` to set up an empty catalog.
+Quickstep has a number of command-line flags that control its behavior. Run
+`quickstep_cli_shell --help` to see a listing of the options and how to use
+them.
 
-The `-G "NMake Makefiles"` option tells CMake to generate makefiles for the nmake
-tool instead of project files for Visual Studio. You can also specify the usual
-cmake options described below like `-D CMAKE_BUILD_TYPE=Release`.
+## Running Tests
 
-Once cmake finishes, run `nmake` to actually build quickstep. Unfortunately,
-nmake does not support parallel jobs like UNIX make, so you're in for a bit of
-a wait.
+Quickstep comes with an extensive suite of unit tests. After a successful
+build, you can run the whole test suite by doing `make test` or `ctest`. If
+you use `ctest`, you may also run tests in parallel with `ctest -jX`, where
+X is the number of parallel jobs (as with `make`, your number of CPU cores is
+usually a good choice).
 
-Configuring with CMake
-----------------------
+## Configuring with CMake
 
 CMake recommends building outside of the source tree (a recommendation which we
 follow). For your convenience, a "build" directory with a skeleton of files
@@ -65,21 +78,25 @@ Like a conventional configure script, you can configure some settings about how
 quickstep is built when you invoke cmake. The most important is the build type.
 You can build an unoptimized build with debugging information by doing:
 
-    cmake -D CMAKE_BUILD_TYPE=Debug ..
+```
+cmake -D CMAKE_BUILD_TYPE=Debug ..
+```
 
 You can build a fast, optimized release build by doing:
 
-    cmake -D CMAKE_BUILD_TYPE=Release ..
+```
+cmake -D CMAKE_BUILD_TYPE=Release ..
+```
 
 The first time you check out the Quickstep source repo, you will also need to
-fetch some third-party dependencies that are packaged as git submodules. Do
-this by running the following 2 commands in the root quickstep directory:
+fetch some third-party dependencies. Do this by running the following commands 
+in the root quickstep directory:
 
-    git submodule init
-    git submodule update
-    cd third_party && ./download_and_patch_prerequisites.sh
+```
+cd third_party && ./download_and_patch_prerequisites.sh
+```
 
-### Advanced Configuration
+# Advanced Configuration
 
 There are a number of advanced options you can pass to CMake to control how
 Quickstep is built. These all have sensible defaults, so you may skip this
@@ -91,7 +108,9 @@ section and go straight to "Building" below if you are not interested.
   `CMAKE_C_COMPILER` and `CMAKE_CXX_COMPILER` options. For example, if you
   wish to use clang instead of gcc, you would do this:
 
-      cmake -D CMAKE_BUILD_TYPE=Release -D CMAKE_C_COMPILER=clang -D CMAKE_CXX_COMPILER=clang++ ../
+```
+cmake -D CMAKE_BUILD_TYPE=Release -D CMAKE_C_COMPILER=clang -D CMAKE_CXX_COMPILER=clang++ ../
+```
 
 * **Disabling TCMalloc**: You can configure whether quickstep should use
   tcmalloc (it does by default). tcmalloc stands for thread-caching malloc, it
@@ -160,6 +179,7 @@ section and go straight to "Building" below if you are not interested.
   default, the Quickstep storage engine will always try to rebuild an index if
   it runs out of space, but this behavior can be disabled by setting
   `-D REBUILD_INDEX_ON_UPDATE_OVERFLOW=0`.
+
 * **Building With libc++**: The Clang compiler is usually used with the
   system-default C++ standard library (on most Linux systems, this is GNU
   libstdc++, which is packaged with GCC). Clang can also be used with the LLVM
@@ -168,6 +188,7 @@ section and go straight to "Building" below if you are not interested.
   standard library). If you are using Clang on a system that has libc++
   installed but doesn't use it by default, add `-D USE_LIBCXX=1` to make
   Clang use libc++.
+
 * **Link-Time Optimization**: Some compilers support link-time optimization,
   where all the objects linked into an executable are analyzed and optimized
   together as if they were a single translation unit. This potentially enables
@@ -176,32 +197,32 @@ section and go straight to "Building" below if you are not interested.
   release builds with GCC or ICC by doing `-D ENABLE_LTO=1`. Be aware that the
   build may take a very long time.
 
-Building
---------
+# Appendix
 
-Once cmake finishes, you are ready to actually build quickstep by running
-`make` (or `nmake` on Windows) (this will also build bundled third-party
-libraries as necesary). If you want to see the actual commands that make is
-running, you can do `make VERBOSE=1`. It is highly recommended to do a parallel
-make to speed up the build time, which you can do with `make -jX`, where X is
-the number of parallel jobs (the number of CPU cores on your system is a good
-choice, unless you are low on RAM, in which case you may want to reduce the
-number of jobs).
+## Building on Windows
 
-Running Quickstep
------------------
+To build on Windows, you will need some variety of Microsoft's C++ compiler and
+the nmake tool (either from Visual Studio, Visual C++ Express, or the Windows
+SDK). Only Visual Studio 2015 or higher is sufficiently modern to build
+Quickstep.
 
-To use quickstep, just run `quickstep_cli_shell` in the build directory. For the
-first time user, run once with `-initialize_db=true` to set up an empty catalog.
-Quickstep has number of command-line flags that control its behavior. Run
-`quickstep_cli_shell --help` to see a listing of the options and how to use
-them.
+Once you have the necessary tools installed, run the "Visual Studio Command
+Prompt" (use the 64-bit version if you have it). Change into the build
+directory and run:
 
-Running Tests
--------------
+    cmake -G "NMake Makefiles" ..
 
-Quickstep comes with an extensive suite of unit tests. After a successful
-build, you can run the whole test suite by doing `make test` or `ctest`. If
-you use `ctest`, you may also run tests in parallel with `ctest -jX`, where
-X is the number of parallel jobs (as with `make`, your number of CPU cores is
-usually a good choice).
+The `-G "NMake Makefiles"` option tells CMake to generate makefiles for the nmake
+tool instead of project files for Visual Studio. You can also specify the usual
+cmake options described above like `-D CMAKE_BUILD_TYPE=Release`.
+
+Once cmake finishes, run `nmake` to actually build quickstep. Unfortunately,
+nmake does not support parallel jobs like UNIX make, so you're in for a bit of
+a wait.
+
+## Building in Vagrant
+
+For your convenience, we have provided Vagrant virtual machine configurations
+that have a complete development environment for Quickstep with all necessary
+tools and dependencies already installed. [See here for instructions on how to
+use them](build/vagrant/README.md).


[6/9] incubator-quickstep git commit: Defined TMB Message Poll Interval as a gflag.

Posted by zu...@apache.org.
Defined TMB Message Poll Interval as a gflag.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/1cfc1c40
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/1cfc1c40
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/1cfc1c40

Branch: refs/heads/two-level-tmb
Commit: 1cfc1c40e2bcf7ff8671d5b899a8304c9e9fd455
Parents: a28b1e4
Author: Zuyu Zhang <zu...@apache.org>
Authored: Mon Feb 6 12:26:56 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Thu Feb 9 10:31:08 2017 -0800

----------------------------------------------------------------------
 third_party/src/tmb/CMakeLists.txt            | 41 ++++++++++++----------
 third_party/src/tmb/include/tmb/message_bus.h |  5 ---
 third_party/src/tmb/src/message_bus.cc        | 20 +++++++++--
 3 files changed, 40 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1cfc1c40/third_party/src/tmb/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/third_party/src/tmb/CMakeLists.txt b/third_party/src/tmb/CMakeLists.txt
index 14e467b..a280a93 100644
--- a/third_party/src/tmb/CMakeLists.txt
+++ b/third_party/src/tmb/CMakeLists.txt
@@ -379,6 +379,26 @@ if (ENABLE_ZOOKEEPER)
   include_directories(${ZOOKEEPER_INCLUDE_DIRS})
 endif()
 
+set_gflags_lib_name ()
+
+# NOTE(chasseur): We only add gflags and gtest to the build if those targets
+# don't already exist, so that TMB can be brought in to a build that already
+# uses one or both of those libraries with add_subdirectory() and not cause
+# name collisions.
+
+# Build GFlags command-line processing library if needed.
+if ((NOT TARGET ${GFLAGS_LIB_NAME}) AND (BUILD_BENCHMARKS OR ENABLE_NATIVENET))
+  add_subdirectory(third_party/gflags)
+  include_directories(${CMAKE_CURRENT_BINARY_DIR}/third_party/gflags/include)
+endif()
+
+# Googletest Framework For Unit Testing
+if (NOT TARGET gtest)
+  add_subdirectory(third_party/gtest)
+  include_directories(third_party/gtest/include)
+  enable_testing()
+endif()
+
 # Include path for TMB.
 include_directories(${PROJECT_SOURCE_DIR}/include)
 set(TMB_INCLUDE_DIRS ${TMB_INCLUDE_DIRS} CACHE STRING
@@ -391,7 +411,8 @@ link_directories(${tmb_BINARY_DIR}/src)
 add_library(tmb
             ${TMB_SRCS})
 target_link_libraries(tmb
-                      ${CMAKE_THREAD_LIBS_INIT})
+                      ${CMAKE_THREAD_LIBS_INIT}
+                      ${GFLAGS_LIB_NAME})
 
 if (ENABLE_LEVELDB)
   target_link_libraries(tmb
@@ -418,24 +439,6 @@ if (ENABLE_ZOOKEEPER)
                         ${ZOOKEEPER_LIBRARIES})
 endif()
 
-# NOTE(chasseur): We only add gflags and gtest to the build if those targets
-# don't already exist, so that TMB can be brought in to a build that already
-# uses one or both of those libraries with add_subdirectory() and not cause
-# name collisions.
-
-# Build GFlags command-line processing library if needed.
-if ((NOT TARGET gflags_nothreads-static) AND (BUILD_BENCHMARKS OR ENABLE_NATIVENET))
-  add_subdirectory(third_party/gflags)
-  include_directories(${CMAKE_CURRENT_BINARY_DIR}/third_party/gflags/include)
-endif()
-
-# Googletest Framework For Unit Testing
-if (NOT TARGET gtest)
-  add_subdirectory(third_party/gtest)
-  include_directories(third_party/gtest/include)
-  enable_testing()
-endif()
-
 # Build the tmb_net_server executable if enabled.
 if (ENABLE_NATIVENET)
   add_executable(tmb_net_server src/tmb_net_server.cc)

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1cfc1c40/third_party/src/tmb/include/tmb/message_bus.h
----------------------------------------------------------------------
diff --git a/third_party/src/tmb/include/tmb/message_bus.h b/third_party/src/tmb/include/tmb/message_bus.h
index a4ca525..74e298d 100644
--- a/third_party/src/tmb/include/tmb/message_bus.h
+++ b/third_party/src/tmb/include/tmb/message_bus.h
@@ -496,11 +496,6 @@ class MessageBus {
       internal::IteratorAdapter<const AnnotatedMessage> *adapter) = 0;
 
  private:
-  // The number of milliseconds to sleep between calls to
-  // ReceiveIfAvailableImpl() in the default active-polling implementation of
-  // ReceiveImpl().
-  static const unsigned int kReceivePollIntervalMS = 100;
-
   // Disallow copy and assign:
   MessageBus(const MessageBus &orig) = delete;
   MessageBus& operator=(const MessageBus &rhs) = delete;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1cfc1c40/third_party/src/tmb/src/message_bus.cc
----------------------------------------------------------------------
diff --git a/third_party/src/tmb/src/message_bus.cc b/third_party/src/tmb/src/message_bus.cc
index 44324ec..7fd0efb 100644
--- a/third_party/src/tmb/src/message_bus.cc
+++ b/third_party/src/tmb/src/message_bus.cc
@@ -24,9 +24,25 @@
 #include <cstdlib>
 #include <thread>  // NOLINT(build/c++11)
 
+#include "gflags/gflags.h"
+
 namespace tmb {
 
-const unsigned int MessageBus::kReceivePollIntervalMS;
+static bool ValidateTmbReceivePollInterval(const char *flagname,
+                                           std::int32_t value) {
+  if (value > 0) {
+    return true;
+  } else {
+    std::fprintf(stderr, "--%s must be at least 1\n", flagname);
+    return false;
+  }
+}
+DEFINE_int32(tmb_receive_poll_interval, 50,
+             "The number of milliseconds to sleep between calls to ReceiveIfAvailableImpl() "
+             "in the default active-polling implementation of ReceiveImpl().");
+static const bool tmb_receive_poll_interval_dummy = gflags::RegisterFlagValidator(
+    &FLAGS_tmb_receive_poll_interval,
+    &ValidateTmbReceivePollInterval);
 
 internal::NetMessageRemovalInterface*
     MessageBus::GetNetMessageRemovalInterface() {
@@ -49,7 +65,7 @@ std::size_t MessageBus::ReceiveImpl(const client_id receiver_id,
                                                 pusher);
   while (received == 0) {
     std::this_thread::sleep_for(
-        std::chrono::milliseconds(kReceivePollIntervalMS));
+        std::chrono::milliseconds(FLAGS_tmb_receive_poll_interval));
     received = ReceiveIfAvailableImpl(receiver_id,
                                       minimum_priority,
                                       max_messages,

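The change above replaces a compile-time poll interval with a validated runtime flag. As a minimal sketch of that pattern without the gflags dependency, the snippet below pairs a validator with a sleep-and-retry receive loop. `ValidatePollIntervalMs` and `ReceiveWithPolling` are hypothetical names chosen for illustration; they are not part of TMB, and the `try_receive` callable only stands in for `ReceiveIfAvailableImpl()`.

```cpp
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <functional>
#include <thread>

// Hypothetical stand-in for the gflags validator in the diff: accepts only
// intervals of at least 1 millisecond and reports the offending flag name.
bool ValidatePollIntervalMs(const char *flagname, std::int32_t value) {
  if (value > 0) {
    return true;
  }
  std::fprintf(stderr, "--%s must be at least 1\n", flagname);
  return false;
}

// Hypothetical blocking receive built on a non-blocking try_receive callable,
// mirroring the sleep-and-retry structure of MessageBus::ReceiveImpl().
// Returns the first non-zero count reported by try_receive.
std::size_t ReceiveWithPolling(const std::function<std::size_t()> &try_receive,
                               std::int32_t poll_interval_ms) {
  std::size_t received = try_receive();
  while (received == 0) {
    std::this_thread::sleep_for(std::chrono::milliseconds(poll_interval_ms));
    received = try_receive();
  }
  return received;
}
```

A shorter interval lowers receive latency at the cost of more wake-ups while the queue is empty, which is presumably why the commit exposes the value as a flag instead of fixing it at build time.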

[4/9] incubator-quickstep git commit: Fuse Aggregate with LeftOuterJoin to accelerate evaluation.

Posted by zu...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/relational_operators/BuildAggregationExistenceMapOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/BuildAggregationExistenceMapOperator.hpp b/relational_operators/BuildAggregationExistenceMapOperator.hpp
new file mode 100644
index 0000000..e2928a8
--- /dev/null
+++ b/relational_operators/BuildAggregationExistenceMapOperator.hpp
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_RELATIONAL_OPERATORS_BUILD_AGGREGATION_EXISTENCE_MAP_OPERATOR_HPP_
+#define QUICKSTEP_RELATIONAL_OPERATORS_BUILD_AGGREGATION_EXISTENCE_MAP_OPERATOR_HPP_
+
+#include <cstddef>
+
+#include <string>
+#include <vector>
+
+#include "catalog/CatalogRelation.hpp"
+#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/QueryContext.hpp"
+#include "relational_operators/RelationalOperator.hpp"
+#include "relational_operators/WorkOrder.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+
+namespace tmb { class MessageBus; }
+
+namespace quickstep {
+
+class AggregationOperationState;
+class CatalogRelationSchema;
+class StorageManager;
+class WorkOrderProtosContainer;
+class WorkOrdersContainer;
+
+namespace serialization { class WorkOrder; }
+
+/** \addtogroup RelationalOperators
+ *  @{
+ */
+
+/**
+ * @brief An operator which builds a bit vector on the input relation's one
+ *        attribute where the bit vector serves as the existence map for an
+ *        AggregationOperationState's CollisionFreeVectorTable.
+ **/
+class BuildAggregationExistenceMapOperator : public RelationalOperator {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param query_id The ID of the query to which this operator belongs.
+   * @param input_relation The relation to build the existence map on.
+   * @param build_attribute The ID of the attribute to build the existence map on.
+   * @param input_relation_is_stored If input_relation is a stored relation and
+   *        is fully available to the operator before it can start generating
+   *        workorders.
+   * @param aggr_state_index The index of the AggregationState in QueryContext.
+   **/
+  BuildAggregationExistenceMapOperator(const std::size_t query_id,
+                                       const CatalogRelation &input_relation,
+                                       const attribute_id build_attribute,
+                                       const bool input_relation_is_stored,
+                                       const QueryContext::aggregation_state_id aggr_state_index)
+      : RelationalOperator(query_id),
+        input_relation_(input_relation),
+        build_attribute_(build_attribute),
+        input_relation_is_stored_(input_relation_is_stored),
+        input_relation_block_ids_(input_relation_is_stored ? input_relation.getBlocksSnapshot()
+                                                           : std::vector<block_id>()),
+        aggr_state_index_(aggr_state_index),
+        num_workorders_generated_(0),
+        started_(false) {}
+
+  ~BuildAggregationExistenceMapOperator() override {}
+
+  std::string getName() const override {
+    return "BuildAggregationExistenceMapOperator";
+  }
+
+  /**
+   * @return The input relation.
+   */
+  const CatalogRelation& input_relation() const {
+    return input_relation_;
+  }
+
+  bool getAllWorkOrders(WorkOrdersContainer *container,
+                        QueryContext *query_context,
+                        StorageManager *storage_manager,
+                        const tmb::client_id scheduler_client_id,
+                        tmb::MessageBus *bus) override;
+
+  bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
+
+  void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id,
+                      const partition_id part_id) override {
+    input_relation_block_ids_.push_back(input_block_id);
+  }
+
+ private:
+  serialization::WorkOrder* createWorkOrderProto(const block_id block);
+
+  const CatalogRelation &input_relation_;
+  const attribute_id build_attribute_;
+  const bool input_relation_is_stored_;
+  std::vector<block_id> input_relation_block_ids_;
+  const QueryContext::aggregation_state_id aggr_state_index_;
+
+  std::vector<block_id>::size_type num_workorders_generated_;
+  bool started_;
+
+  DISALLOW_COPY_AND_ASSIGN(BuildAggregationExistenceMapOperator);
+};
+
+/**
+ * @brief A WorkOrder produced by BuildAggregationExistenceMapOperator.
+ **/
+class BuildAggregationExistenceMapWorkOrder : public WorkOrder {
+ public:
+  /**
+   * @brief Constructor
+   *
+   * @param query_id The ID of this query.
+   * @param input_relation The relation to build the existence map on.
+   * @param build_block_id The block id.
+   * @param build_attribute The ID of the attribute to build on.
+   * @param state The AggregationState to use.
+   * @param storage_manager The StorageManager to use.
+   **/
+  BuildAggregationExistenceMapWorkOrder(const std::size_t query_id,
+                                        const CatalogRelationSchema &input_relation,
+                                        const block_id build_block_id,
+                                        const attribute_id build_attribute,
+                                        AggregationOperationState *state,
+                                        StorageManager *storage_manager)
+      : WorkOrder(query_id),
+        input_relation_(input_relation),
+        build_block_id_(build_block_id),
+        build_attribute_(build_attribute),
+        state_(DCHECK_NOTNULL(state)),
+        storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+
+  ~BuildAggregationExistenceMapWorkOrder() override {}
+
+  void execute() override;
+
+ private:
+  const CatalogRelationSchema &input_relation_;
+  const block_id build_block_id_;
+  const attribute_id build_attribute_;
+  AggregationOperationState *state_;
+
+  StorageManager *storage_manager_;
+
+  DISALLOW_COPY_AND_ASSIGN(BuildAggregationExistenceMapWorkOrder);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_RELATIONAL_OPERATORS_BUILD_AGGREGATION_EXISTENCE_MAP_OPERATOR_HPP_

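The header above describes the operator as building a bit vector over one attribute, to serve as an existence map. The following is a rough, single-threaded sketch of that idea only. `ExistenceMap` and its methods are hypothetical names, and the sketch assumes keys are small non-negative integers; the real work order depends on `BarrieredReadWriteConcurrentBitVector` and `CollisionFreeVectorTable`, whose interfaces are not shown in this commit.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical single-threaded existence map: bit i is set when key i
// appears in at least one scanned block of the build attribute.
class ExistenceMap {
 public:
  explicit ExistenceMap(std::size_t num_keys) : bits_(num_keys, false) {}

  // Marks every in-range key found in one block of attribute values.
  // Out-of-range keys are ignored in this sketch.
  void buildFromBlock(const std::vector<std::int64_t> &keys) {
    for (const std::int64_t key : keys) {
      if (key >= 0 && static_cast<std::size_t>(key) < bits_.size()) {
        bits_[static_cast<std::size_t>(key)] = true;
      }
    }
  }

  // Returns true if the key was seen in any block processed so far.
  bool exists(std::size_t key) const {
    return key < bits_.size() && bits_[key];
  }

  // Counts the distinct keys recorded in the map.
  std::size_t countExisting() const {
    std::size_t count = 0;
    for (const bool bit : bits_) {
      count += bit ? 1 : 0;
    }
    return count;
  }

 private:
  std::vector<bool> bits_;
};
```

In this sketch each call to `buildFromBlock` plays the role of one work order: blocks can be processed in any order because setting a bit is idempotent.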
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index df4114d..457d58a 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -33,6 +33,9 @@ set_gflags_lib_name ()
 
 # Declare micro-libs:
 add_library(quickstep_relationaloperators_AggregationOperator AggregationOperator.cpp AggregationOperator.hpp)
+add_library(quickstep_relationaloperators_BuildAggregationExistenceMapOperator
+            BuildAggregationExistenceMapOperator.cpp
+            BuildAggregationExistenceMapOperator.hpp)
 add_library(quickstep_relationaloperators_BuildHashOperator BuildHashOperator.cpp BuildHashOperator.hpp)
 add_library(quickstep_relationaloperators_BuildLIPFilterOperator BuildLIPFilterOperator.cpp BuildLIPFilterOperator.hpp)
 add_library(quickstep_relationaloperators_CreateIndexOperator CreateIndexOperator.cpp CreateIndexOperator.hpp)
@@ -95,6 +98,31 @@ target_link_libraries(quickstep_relationaloperators_AggregationOperator
                       quickstep_utility_lipfilter_LIPFilterAdaptiveProber
                       quickstep_utility_lipfilter_LIPFilterUtil
                       tmb)
+target_link_libraries(quickstep_relationaloperators_BuildAggregationExistenceMapOperator
+                      glog
+                      quickstep_catalog_CatalogAttribute
+                      quickstep_catalog_CatalogRelation
+                      quickstep_catalog_CatalogRelationSchema
+                      quickstep_catalog_CatalogTypedefs
+                      quickstep_queryexecution_QueryContext
+                      quickstep_queryexecution_WorkOrderProtosContainer
+                      quickstep_queryexecution_WorkOrdersContainer
+                      quickstep_relationaloperators_RelationalOperator
+                      quickstep_relationaloperators_WorkOrder
+                      quickstep_relationaloperators_WorkOrder_proto
+                      quickstep_storage_AggregationOperationState
+                      quickstep_storage_CollisionFreeVectorTable
+                      quickstep_storage_StorageBlock
+                      quickstep_storage_StorageBlockInfo
+                      quickstep_storage_StorageManager
+                      quickstep_storage_TupleStorageSubBlock
+                      quickstep_storage_ValueAccessor
+                      quickstep_storage_ValueAccessorUtil
+                      quickstep_types_Type
+                      quickstep_types_TypeID
+                      quickstep_utility_BarrieredReadWriteConcurrentBitVector
+                      quickstep_utility_Macros
+                      tmb)
 target_link_libraries(quickstep_relationaloperators_BuildHashOperator
                       glog
                       quickstep_catalog_CatalogRelation
@@ -518,6 +546,7 @@ target_link_libraries(quickstep_relationaloperators_WorkOrderFactory
                       quickstep_catalog_CatalogTypedefs
                       quickstep_queryexecution_QueryContext
                       quickstep_relationaloperators_AggregationOperator
+                      quickstep_relationaloperators_BuildAggregationExistenceMapOperator
                       quickstep_relationaloperators_BuildHashOperator
                       quickstep_relationaloperators_BuildLIPFilterOperator
                       quickstep_relationaloperators_DeleteOperator
@@ -552,6 +581,7 @@ target_link_libraries(quickstep_relationaloperators_WorkOrder_proto
 add_library(quickstep_relationaloperators ../empty_src.cpp RelationalOperatorsModule.hpp)
 target_link_libraries(quickstep_relationaloperators
                       quickstep_relationaloperators_AggregationOperator
+                      quickstep_relationaloperators_BuildAggregationExistenceMapOperator
                       quickstep_relationaloperators_BuildLIPFilterOperator
                       quickstep_relationaloperators_BuildHashOperator
                       quickstep_relationaloperators_CreateIndexOperator

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/relational_operators/WorkOrder.proto
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrder.proto b/relational_operators/WorkOrder.proto
index 76753d2..d0d0753 100644
--- a/relational_operators/WorkOrder.proto
+++ b/relational_operators/WorkOrder.proto
@@ -44,6 +44,7 @@ enum WorkOrderType {
   UPDATE = 20;
   WINDOW_AGGREGATION = 21;
   DESTROY_AGGREGATION_STATE = 22;
+  BUILD_AGGREGATION_EXISTENCE_MAP = 23;
 }
 
 message WorkOrder {
@@ -278,6 +279,15 @@ message WindowAggregationWorkOrder {
 
 message DestroyAggregationStateWorkOrder {
   extend WorkOrder {
-    optional uint32 aggr_state_index = 339;
+    optional uint32 aggr_state_index = 352;
+  }
+}
+
+message BuildAggregationExistenceMapWorkOrder {
+  extend WorkOrder {
+    optional int32 relation_id = 368;
+    optional fixed64 build_block_id = 369;
+    optional int32 build_attribute = 370;
+    optional uint32 aggr_state_index = 371;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/relational_operators/WorkOrderFactory.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp
index bd2a0f8..d2c8251 100644
--- a/relational_operators/WorkOrderFactory.cpp
+++ b/relational_operators/WorkOrderFactory.cpp
@@ -29,6 +29,7 @@
 #include "catalog/CatalogTypedefs.hpp"
 #include "query_execution/QueryContext.hpp"
 #include "relational_operators/AggregationOperator.hpp"
+#include "relational_operators/BuildAggregationExistenceMapOperator.hpp"
 #include "relational_operators/BuildHashOperator.hpp"
 #include "relational_operators/BuildLIPFilterOperator.hpp"
 #include "relational_operators/DeleteOperator.hpp"
@@ -91,6 +92,19 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
           CreateLIPFilterAdaptiveProberHelper(
               proto.GetExtension(serialization::AggregationWorkOrder::lip_deployment_index), query_context));
     }
+    case serialization::BUILD_AGGREGATION_EXISTENCE_MAP: {
+      LOG(INFO) << "Creating BuildAggregationExistenceMapWorkOrder in Shiftboss " << shiftboss_index;
+
+      return new BuildAggregationExistenceMapWorkOrder(
+          proto.query_id(),
+          catalog_database->getRelationSchemaById(
+              proto.GetExtension(serialization::BuildAggregationExistenceMapWorkOrder::relation_id)),
+          proto.GetExtension(serialization::BuildAggregationExistenceMapWorkOrder::build_block_id),
+          proto.GetExtension(serialization::BuildAggregationExistenceMapWorkOrder::build_attribute),
+          query_context->getAggregationState(
+              proto.GetExtension(serialization::BuildAggregationExistenceMapWorkOrder::aggr_state_index)),
+          storage_manager);
+    }
     case serialization::BUILD_LIP_FILTER: {
       LOG(INFO) << "Creating BuildLIPFilterWorkOrder in Shiftboss " << shiftboss_index;
 
@@ -525,6 +539,29 @@ bool WorkOrderFactory::ProtoIsValid(const serialization::WorkOrder &proto,
              query_context.isValidAggregationStateId(
                  proto.GetExtension(serialization::AggregationWorkOrder::aggr_state_index));
     }
+    case serialization::BUILD_AGGREGATION_EXISTENCE_MAP: {
+      if (!proto.HasExtension(serialization::BuildAggregationExistenceMapWorkOrder::relation_id)) {
+        return false;
+      }
+
+      const relation_id rel_id =
+          proto.GetExtension(serialization::BuildAggregationExistenceMapWorkOrder::relation_id);
+      if (!catalog_database.hasRelationWithId(rel_id)) {
+        return false;
+      }
+
+      const CatalogRelationSchema &relation = catalog_database.getRelationSchemaById(rel_id);
+      const attribute_id build_attribute =
+          proto.GetExtension(serialization::BuildAggregationExistenceMapWorkOrder::build_attribute);
+      if (!relation.hasAttributeWithId(build_attribute)) {
+        return false;
+      }
+
+      return proto.HasExtension(serialization::BuildAggregationExistenceMapWorkOrder::build_block_id) &&
+             proto.HasExtension(serialization::BuildAggregationExistenceMapWorkOrder::aggr_state_index) &&
+             query_context.isValidAggregationStateId(
+                 proto.GetExtension(serialization::BuildAggregationExistenceMapWorkOrder::aggr_state_index));
+    }
     case serialization::BUILD_HASH: {
       if (!proto.HasExtension(serialization::BuildHashWorkOrder::relation_id)) {
         return false;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index 0b34908..0f39b41 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -412,12 +412,18 @@ std::size_t AggregationOperationState::getNumFinalizationPartitions() const {
   }
 }
 
+CollisionFreeVectorTable* AggregationOperationState
+    ::getCollisionFreeVectorTable() const {
+  return static_cast<CollisionFreeVectorTable *>(
+      collision_free_hashtable_.get());
+}
+
 void AggregationOperationState::initialize(const std::size_t partition_id) {
   if (is_aggregate_collision_free_) {
     static_cast<CollisionFreeVectorTable *>(
         collision_free_hashtable_.get())->initialize(partition_id);
   } else {
-    LOG(FATAL) << "AggregationOperationState::initializeState() "
+    LOG(FATAL) << "AggregationOperationState::initialize() "
                << "is not supported by this aggregation";
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/storage/AggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index 13ee377..c8930ee 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -41,6 +41,7 @@ namespace serialization { class AggregationOperationState; }
 class AggregateFunction;
 class CatalogDatabaseLite;
 class CatalogRelationSchema;
+class CollisionFreeVectorTable;
 class InsertDestination;
 class LIPFilterAdaptiveProber;
 class StorageManager;
@@ -198,6 +199,14 @@ class AggregationOperationState {
   void finalizeAggregate(const std::size_t partition_id,
                          InsertDestination *output_destination);
 
+  /**
+   * @brief Get the collision-free vector table used by this aggregation.
+   *
+   * @return The collision-free vector table used by this aggregation.
+   *         Returns NULL if collision-free vector table is not used.
+   */
+  CollisionFreeVectorTable* getCollisionFreeVectorTable() const;
+
  private:
   // Check whether partitioned aggregation can be applied.
   bool checkAggregatePartitioned(

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/storage/CollisionFreeVectorTable.hpp
----------------------------------------------------------------------
diff --git a/storage/CollisionFreeVectorTable.hpp b/storage/CollisionFreeVectorTable.hpp
index 4f3e238..772d47d 100644
--- a/storage/CollisionFreeVectorTable.hpp
+++ b/storage/CollisionFreeVectorTable.hpp
@@ -105,6 +105,15 @@ class CollisionFreeVectorTable : public AggregationStateHashTableBase {
   }
 
   /**
+   * @brief Get the existence map for this vector table.
+   *
+   * @return The existence map for this vector table.
+   */
+  inline BarrieredReadWriteConcurrentBitVector* getExistenceMap() const {
+    return existence_map_.get();
+  }
+
+  /**
    * @brief Initialize the specified partition of this aggregation table.
    *
    * @param partition_id ID of the partition to be initialized.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/utility/lip_filter/BitVectorExactFilter.hpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/BitVectorExactFilter.hpp b/utility/lip_filter/BitVectorExactFilter.hpp
index 6ad0567..48fd5e1 100644
--- a/utility/lip_filter/BitVectorExactFilter.hpp
+++ b/utility/lip_filter/BitVectorExactFilter.hpp
@@ -20,17 +20,16 @@
 #ifndef QUICKSTEP_UTILITY_LIP_FILTER_BIT_VECTOR_EXACT_FILTER_HPP_
 #define QUICKSTEP_UTILITY_LIP_FILTER_BIT_VECTOR_EXACT_FILTER_HPP_
 
-#include <atomic>
+#include <cstddef>
 #include <cstdint>
-#include <cstring>
 #include <vector>
 
 #include "catalog/CatalogTypedefs.hpp"
 #include "storage/StorageBlockInfo.hpp"
-#include "storage/StorageConstants.hpp"
 #include "storage/ValueAccessor.hpp"
 #include "storage/ValueAccessorUtil.hpp"
 #include "types/Type.hpp"
+#include "utility/BarrieredReadWriteConcurrentBitVector.hpp"
 #include "utility/Macros.hpp"
 #include "utility/lip_filter/LIPFilter.hpp"
 
@@ -64,14 +63,10 @@ class BitVectorExactFilter : public LIPFilter {
       : LIPFilter(LIPFilterType::kBitVectorExactFilter),
         min_value_(static_cast<CppType>(min_value)),
         max_value_(static_cast<CppType>(max_value)),
-        bit_array_(GetByteSize(max_value - min_value + 1)) {
+        bit_vector_(max_value - min_value + 1) {
     DCHECK_EQ(min_value, static_cast<std::int64_t>(min_value_));
     DCHECK_EQ(max_value, static_cast<std::int64_t>(max_value_));
     DCHECK_GE(max_value_, min_value_);
-
-    std::memset(bit_array_.data(),
-                0x0,
-                sizeof(std::atomic<std::uint8_t>) * GetByteSize(max_value - min_value + 1));
   }
 
   void insertValueAccessor(ValueAccessor *accessor,
@@ -109,13 +104,6 @@ class BitVectorExactFilter : public LIPFilter {
 
  private:
   /**
-   * @brief Round up bit_size to multiples of 8.
-   */
-  inline static std::size_t GetByteSize(const std::size_t bit_size) {
-    return (bit_size + 7u) / 8u;
-  }
-
-  /**
    * @brief Iterate through the accessor and hash values into the internal bit
    *        array.
    */
@@ -164,8 +152,7 @@ class BitVectorExactFilter : public LIPFilter {
     DCHECK_GE(value, min_value_);
     DCHECK_LE(value, max_value_);
 
-    const CppType loc = value - min_value_;
-    bit_array_[loc >> 3u].fetch_or(1u << (loc & 7u), std::memory_order_relaxed);
+    bit_vector_.setBit(value - min_value_);
   }
 
   /**
@@ -177,9 +164,7 @@ class BitVectorExactFilter : public LIPFilter {
       return is_anti_filter;
     }
 
-    const CppType loc = value - min_value_;
-    const bool is_bit_set =
-        (bit_array_[loc >> 3u].load(std::memory_order_relaxed) & (1u << (loc & 7u))) != 0;
+    const bool is_bit_set = bit_vector_.getBit(value - min_value_);
 
     if (is_anti_filter) {
       return !is_bit_set;
@@ -190,7 +175,7 @@ class BitVectorExactFilter : public LIPFilter {
 
   const CppType min_value_;
   const CppType max_value_;
-  alignas(kCacheLineBytes) std::vector<std::atomic<std::uint8_t>> bit_array_;
+  BarrieredReadWriteConcurrentBitVector bit_vector_;
 
   DISALLOW_COPY_AND_ASSIGN(BitVectorExactFilter);
 };
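
Note on the hunk above: the removed lines packed one bit per value into a vector of atomic bytes and set bits with a relaxed fetch_or. The replacement delegates to BarrieredReadWriteConcurrentBitVector, whose definition is not part of this diff. A minimal sketch of the same bit-packing idea follows. The class name ConcurrentBitVectorSketch is hypothetical, and the relaxed memory ordering is copied from the removed code rather than taken from the real class. It assumes all writers finish (for example at a barrier between build and probe phases) before readers start.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>

// Hypothetical stand-in for BarrieredReadWriteConcurrentBitVector; the real
// class is not shown in this diff and may be implemented differently.
class ConcurrentBitVectorSketch {
 public:
  explicit ConcurrentBitVectorSketch(const std::size_t num_bits)
      : num_bits_(num_bits),
        num_bytes_((num_bits + 7u) / 8u),  // Round up to whole bytes.
        bytes_(new std::atomic<std::uint8_t>[num_bytes_]) {
    // Zero every byte explicitly instead of using memset on atomics.
    for (std::size_t i = 0; i < num_bytes_; ++i) {
      bytes_[i].store(0u, std::memory_order_relaxed);
    }
  }

  // Safe to call from many threads: fetch_or is an atomic read-modify-write.
  void setBit(const std::size_t bit) {
    assert(bit < num_bits_);
    bytes_[bit >> 3u].fetch_or(static_cast<std::uint8_t>(1u << (bit & 7u)),
                               std::memory_order_relaxed);
  }

  // Intended to be called only after all setBit() calls have completed.
  bool getBit(const std::size_t bit) const {
    assert(bit < num_bits_);
    return (bytes_[bit >> 3u].load(std::memory_order_relaxed) &
            (1u << (bit & 7u))) != 0;
  }

 private:
  const std::size_t num_bits_;
  const std::size_t num_bytes_;
  std::unique_ptr<std::atomic<std::uint8_t>[]> bytes_;
};
```

Centralizing this logic in one class is what lets both LIP filters drop their own GetByteSize() helpers and memset calls.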

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/utility/lip_filter/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/utility/lip_filter/CMakeLists.txt b/utility/lip_filter/CMakeLists.txt
index edd0d24..519d3e9 100644
--- a/utility/lip_filter/CMakeLists.txt
+++ b/utility/lip_filter/CMakeLists.txt
@@ -35,12 +35,12 @@ add_library(quickstep_utility_lipfilter_SingleIdentityHashFilter ../../empty_src
 target_link_libraries(quickstep_utility_lipfilter_BitVectorExactFilter
                       quickstep_catalog_CatalogTypedefs
                       quickstep_storage_StorageBlockInfo
-                      quickstep_storage_StorageConstants
                       quickstep_storage_ValueAccessor
                       quickstep_storage_ValueAccessorUtil
                       quickstep_types_Type
-                      quickstep_utility_lipfilter_LIPFilter
-                      quickstep_utility_Macros)
+                      quickstep_utility_BarrieredReadWriteConcurrentBitVector
+                      quickstep_utility_Macros
+                      quickstep_utility_lipfilter_LIPFilter)
 target_link_libraries(quickstep_utility_lipfilter_LIPFilter
                       quickstep_catalog_CatalogTypedefs
                       quickstep_storage_StorageBlockInfo
@@ -79,9 +79,9 @@ target_link_libraries(quickstep_utility_lipfilter_LIPFilter_proto
 target_link_libraries(quickstep_utility_lipfilter_SingleIdentityHashFilter
                       quickstep_catalog_CatalogTypedefs
                       quickstep_storage_StorageBlockInfo
-                      quickstep_storage_StorageConstants
                       quickstep_storage_ValueAccessor
                       quickstep_storage_ValueAccessorUtil
                       quickstep_types_Type
-                      quickstep_utility_lipfilter_LIPFilter
-                      quickstep_utility_Macros)
+                      quickstep_utility_BarrieredReadWriteConcurrentBitVector
+                      quickstep_utility_Macros
+                      quickstep_utility_lipfilter_LIPFilter)

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/utility/lip_filter/SingleIdentityHashFilter.hpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/SingleIdentityHashFilter.hpp b/utility/lip_filter/SingleIdentityHashFilter.hpp
index 5c0e8a2..d7e3475 100644
--- a/utility/lip_filter/SingleIdentityHashFilter.hpp
+++ b/utility/lip_filter/SingleIdentityHashFilter.hpp
@@ -20,18 +20,15 @@
 #ifndef QUICKSTEP_UTILITY_LIP_FILTER_SINGLE_IDENTITY_HASH_FILTER_HPP_
 #define QUICKSTEP_UTILITY_LIP_FILTER_SINGLE_IDENTITY_HASH_FILTER_HPP_
 
-#include <atomic>
 #include <cstddef>
-#include <cstdint>
-#include <cstring>
 #include <vector>
 
 #include "catalog/CatalogTypedefs.hpp"
 #include "storage/StorageBlockInfo.hpp"
-#include "storage/StorageConstants.hpp"
 #include "storage/ValueAccessor.hpp"
 #include "storage/ValueAccessorUtil.hpp"
 #include "types/Type.hpp"
+#include "utility/BarrieredReadWriteConcurrentBitVector.hpp"
 #include "utility/Macros.hpp"
 #include "utility/lip_filter/LIPFilter.hpp"
 
@@ -65,11 +62,8 @@ class SingleIdentityHashFilter : public LIPFilter {
   explicit SingleIdentityHashFilter(const std::size_t filter_cardinality)
       : LIPFilter(LIPFilterType::kSingleIdentityHashFilter),
         filter_cardinality_(filter_cardinality),
-        bit_array_(GetByteSize(filter_cardinality)) {
+        bit_vector_(filter_cardinality) {
     DCHECK_GE(filter_cardinality, 1u);
-    std::memset(bit_array_.data(),
-                0x0,
-                sizeof(std::atomic<std::uint8_t>) * GetByteSize(filter_cardinality));
   }
 
   void insertValueAccessor(ValueAccessor *accessor,
@@ -158,8 +152,9 @@ class SingleIdentityHashFilter : public LIPFilter {
    * @brief Inserts a given value into the hash filter.
    */
   inline void insert(const void *key_begin) {
-    const CppType hash = *reinterpret_cast<const CppType *>(key_begin) % filter_cardinality_;
-    bit_array_[hash >> 3u].fetch_or(1u << (hash & 7u), std::memory_order_relaxed);
+    const CppType hash =
+        *reinterpret_cast<const CppType *>(key_begin) % filter_cardinality_;
+    bit_vector_.setBit(hash);
   }
 
   /**
@@ -168,12 +163,13 @@ class SingleIdentityHashFilter : public LIPFilter {
    *        If false is returned, a value is certainly not present in the hash filter.
    */
   inline bool contains(const void *key_begin) const {
-    const CppType hash = *reinterpret_cast<const CppType *>(key_begin) % filter_cardinality_;
-    return (bit_array_[hash >> 3u].load(std::memory_order_relaxed) & (1u << (hash & 7u)));
+    const CppType hash =
+        *reinterpret_cast<const CppType *>(key_begin) % filter_cardinality_;
+    return bit_vector_.getBit(hash);
   }
 
   std::size_t filter_cardinality_;
-  alignas(kCacheLineBytes) std::vector<std::atomic<std::uint8_t>> bit_array_;
+  BarrieredReadWriteConcurrentBitVector bit_vector_;
 
   DISALLOW_COPY_AND_ASSIGN(SingleIdentityHashFilter);
 };
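
Note on the hunk above: the filter hashes a key by taking it modulo filter_cardinality_, so two keys that differ by a multiple of the cardinality share a bit. That is why contains() returning false is conclusive while true is not. The following is a single-threaded sketch of that behavior only. IdentityHashFilterSketch is a hypothetical name, and std::vector<bool> stands in for the concurrent bit vector used by the real class.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical, single-threaded illustration; not the Quickstep class.
class IdentityHashFilterSketch {
 public:
  explicit IdentityHashFilterSketch(const std::size_t cardinality)
      : cardinality_(cardinality), bits_(cardinality, false) {
    assert(cardinality >= 1u);  // Mirrors the DCHECK_GE in the constructor above.
  }

  // The "hash" is the value itself, reduced modulo the filter size.
  void insert(const std::uint64_t value) {
    bits_[value % cardinality_] = true;
  }

  // false: the value was certainly never inserted.
  // true: the value, or another value with the same remainder, was inserted.
  bool contains(const std::uint64_t value) const {
    return bits_[value % cardinality_];
  }

 private:
  std::size_t cardinality_;
  std::vector<bool> bits_;
};
```

For example, with a cardinality of 8, inserting 3 also makes contains(11) return true, because both values map to bit 3.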


[8/9] incubator-quickstep git commit: Fixed a bug in the distributed version to check rebuild finished.

Posted by zu...@apache.org.
Fixed a bug in the distributed version to check rebuild finished.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/c40c5534
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/c40c5534
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/c40c5534

Branch: refs/heads/two-level-tmb
Commit: c40c5534cbf6f8bc5f1dd5adc43ba705021a8c74
Parents: 8229994
Author: Zuyu Zhang <zu...@apache.org>
Authored: Thu Feb 9 15:41:04 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Thu Feb 9 15:41:04 2017 -0800

----------------------------------------------------------------------
 query_execution/QueryManagerDistributed.hpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c40c5534/query_execution/QueryManagerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.hpp b/query_execution/QueryManagerDistributed.hpp
index 759fa70..14401a6 100644
--- a/query_execution/QueryManagerDistributed.hpp
+++ b/query_execution/QueryManagerDistributed.hpp
@@ -152,7 +152,7 @@ class QueryManagerDistributed final : public QueryManagerBase {
 
   bool checkRebuildOver(const dag_node_index index) const override {
     return query_exec_state_->hasRebuildInitiated(index) &&
-           (query_exec_state_->getNumRebuildWorkOrders(index) == 0);
+           query_exec_state_->hasRebuildFinished(index, num_shiftbosses_);
   }
 
   const tmb::client_id foreman_client_id_;
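
Note on the hunk above: the commit message does not explain the bug, but the old condition only tested that no rebuild work orders were outstanding. One plausible reading is that, with several Shiftbosses, the count can be zero before every Shiftboss has reported how many rebuild work orders it generated. The sketch below illustrates that reading. RebuildStatusSketch and all of its members are hypothetical, and the real hasRebuildFinished() may track this differently.

```cpp
#include <cassert>
#include <cstddef>
#include <unordered_set>

// Hypothetical bookkeeping for the rebuild phase of a single operator.
struct RebuildStatusSketch {
  bool initiated = false;
  std::size_t pending_work_orders = 0;
  std::unordered_set<std::size_t> reported_shiftbosses;

  void initiate() { initiated = true; }

  // A Shiftboss reports how many rebuild work orders it created.
  void onShiftbossReport(const std::size_t shiftboss_index,
                         const std::size_t num_work_orders) {
    pending_work_orders += num_work_orders;
    reported_shiftbosses.insert(shiftboss_index);
  }

  void onWorkOrderComplete() {
    assert(pending_work_orders > 0u);
    --pending_work_orders;
  }

  // Shape of the old check: true as soon as nothing is pending.
  bool zeroPendingOnly() const {
    return initiated && pending_work_orders == 0u;
  }

  // Shape of the new check: also require a report from every Shiftboss.
  bool finished(const std::size_t num_shiftbosses) const {
    return zeroPendingOnly() &&
           reported_shiftbosses.size() == num_shiftbosses;
  }
};
```

Under this model, a rebuild that has been initiated but has received no reports yet passes the old check and fails the new one.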


[9/9] incubator-quickstep git commit: Used two TMB implementations in Shiftboss.

Posted by zu...@apache.org.
Used two TMB implementations in Shiftboss.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/d8fc9461
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/d8fc9461
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/d8fc9461

Branch: refs/heads/two-level-tmb
Commit: d8fc9461b985ebbdc6c8feee3f3ce874de410f05
Parents: c40c553
Author: Zuyu Zhang <zu...@apache.org>
Authored: Wed Feb 8 12:48:31 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Thu Feb 9 15:42:36 2017 -0800

----------------------------------------------------------------------
 cli/distributed/Executor.cpp                    |   7 +-
 cli/distributed/Executor.hpp                    |   4 +
 query_execution/Shiftboss.cpp                   | 334 ++++++++++---------
 query_execution/Shiftboss.hpp                   |  79 +++--
 .../DistributedExecutionGeneratorTestRunner.cpp |   8 +-
 .../DistributedExecutionGeneratorTestRunner.hpp |   1 +
 6 files changed, 229 insertions(+), 204 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d8fc9461/cli/distributed/Executor.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Executor.cpp b/cli/distributed/Executor.cpp
index 1d03579..5cc7df0 100644
--- a/cli/distributed/Executor.cpp
+++ b/cli/distributed/Executor.cpp
@@ -35,6 +35,7 @@
 
 #include "tmb/id_typedefs.h"
 #include "tmb/native_net_client_message_bus.h"
+#include "tmb/pure_memory_message_bus.h"
 
 #include "glog/logging.h"
 
@@ -47,6 +48,8 @@ using tmb::client_id;
 namespace quickstep {
 
 void Executor::init() {
+  bus_local_.Initialize();
+
   executor_client_id_ = bus_.Connect();
   DLOG(INFO) << "Executor TMB Client ID: " << executor_client_id_;
 
@@ -59,7 +62,7 @@ void Executor::init() {
   for (std::size_t worker_thread_index = 0;
        worker_thread_index < FLAGS_num_workers;
        ++worker_thread_index) {
-    workers_.push_back(make_unique<Worker>(worker_thread_index, &bus_));
+    workers_.push_back(make_unique<Worker>(worker_thread_index, &bus_local_));
     worker_client_ids.push_back(workers_.back()->getBusClientID());
   }
 
@@ -76,7 +79,7 @@ void Executor::init() {
   data_exchanger_.start();
 
   shiftboss_ =
-      make_unique<Shiftboss>(&bus_, storage_manager_.get(), worker_directory_.get());
+      make_unique<Shiftboss>(&bus_, &bus_local_, storage_manager_.get(), worker_directory_.get());
   shiftboss_->start();
 
   for (const auto &worker : workers_) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d8fc9461/cli/distributed/Executor.hpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Executor.hpp b/cli/distributed/Executor.hpp
index 6ffa756..aafeeae 100644
--- a/cli/distributed/Executor.hpp
+++ b/cli/distributed/Executor.hpp
@@ -24,6 +24,7 @@
 #include <vector>
 
 #include "cli/distributed/Role.hpp"
+#include "query_execution/QueryExecutionTypedefs.hpp"
 #include "query_execution/Shiftboss.hpp"
 #include "query_execution/Worker.hpp"
 #include "query_execution/WorkerDirectory.hpp"
@@ -65,6 +66,9 @@ class Executor final : public Role {
   void run() override {}
 
  private:
+  // Used between Shiftboss and Workers.
+  MessageBusImpl bus_local_;
+
   tmb::client_id executor_client_id_;
 
   std::vector<std::unique_ptr<Worker>> workers_;
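
Note on the Executor changes above: Workers now connect to bus_local_, while the Shiftboss is handed both bus_ and bus_local_, so it sits between a network-facing bus and an in-process one. The sketch below shows the general relay pattern under simplifying assumptions. TwoLevelRelaySketch and its members are hypothetical, the std::deque inboxes stand in for the two TMB buses, and messages are plain strings with no type dispatch.

```cpp
#include <cassert>
#include <deque>
#include <string>
#include <vector>

// Hypothetical in-process stand-ins for the global and local message buses.
struct TwoLevelRelaySketch {
  std::deque<std::string> global_inbox;  // Messages arriving from the Foreman.
  std::deque<std::string> local_inbox;   // Messages arriving from Workers.
  std::vector<std::string> to_workers;   // Forwarded over the local bus.
  std::vector<std::string> to_foreman;   // Forwarded over the global bus.

  // One loop iteration: take at most one global message without blocking,
  // then drain everything currently waiting on the local side.
  void pollOnce() {
    if (!global_inbox.empty()) {
      to_workers.push_back(global_inbox.front());
      global_inbox.pop_front();
    }
    while (!local_inbox.empty()) {
      to_foreman.push_back(local_inbox.front());
      local_inbox.pop_front();
    }
    assert(local_inbox.empty());
  }
};
```

Neither inbox may be read with a blocking call in this arrangement, since waiting on one side would leave messages on the other side unserved.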

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d8fc9461/query_execution/Shiftboss.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp
index 2ed42d0..5e6014d 100644
--- a/query_execution/Shiftboss.cpp
+++ b/query_execution/Shiftboss.cpp
@@ -75,156 +75,164 @@ void Shiftboss::run() {
   for (;;) {
     // Receive() is a blocking call, causing this thread to sleep until next
     // message is received.
-    AnnotatedMessage annotated_message(bus_->Receive(shiftboss_client_id_, 0, true));
-    DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
-               << "') received the typed '" << annotated_message.tagged_message.message_type()
-               << "' message from client " << annotated_message.sender;
-    switch (annotated_message.tagged_message.message_type()) {
-      case kQueryInitiateMessage: {
-        const TaggedMessage &tagged_message = annotated_message.tagged_message;
-
-        serialization::QueryInitiateMessage proto;
-        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
-        processQueryInitiateMessage(proto.query_id(), proto.catalog_database_cache(), proto.query_context());
-        break;
-      }
-      case kWorkOrderMessage: {
-        const TaggedMessage &tagged_message = annotated_message.tagged_message;
-
-        serialization::WorkOrderMessage proto;
-        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
-        const std::size_t query_id = proto.query_id();
-        DCHECK_EQ(1u, query_contexts_.count(query_id));
-
-        WorkOrder *work_order = WorkOrderFactory::ReconstructFromProto(proto.work_order(),
-                                                                       shiftboss_index_,
-                                                                       &database_cache_,
-                                                                       query_contexts_[query_id].get(),
-                                                                       storage_manager_,
-                                                                       shiftboss_client_id_,
-                                                                       bus_);
-
-        unique_ptr<WorkerMessage> worker_message(
-            WorkerMessage::WorkOrderMessage(work_order, proto.operator_index()));
-
-        TaggedMessage worker_tagged_message(worker_message.get(),
-                                            sizeof(*worker_message),
-                                            kWorkOrderMessage);
-
-        const size_t worker_index = getSchedulableWorker();
-        DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
-                   << "') forwarded WorkOrderMessage (typed '" << kWorkOrderMessage
-                   << "') from Foreman to worker " << worker_index;
-
-        const MessageBus::SendStatus send_status =
-            QueryExecutionUtil::SendTMBMessage(bus_,
-                                               shiftboss_client_id_,
-                                               workers_->getClientID(worker_index),
-                                               move(worker_tagged_message));
-        CHECK(send_status == MessageBus::SendStatus::kOK);
-        break;
-      }
-      case kInitiateRebuildMessage: {
-        // Construct rebuild work orders, and send back their number to
-        // 'ForemanDistributed'.
-        const TaggedMessage &tagged_message = annotated_message.tagged_message;
-
-        serialization::InitiateRebuildMessage proto;
-        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
-        processInitiateRebuildMessage(proto.query_id(),
-                                      proto.operator_index(),
-                                      proto.insert_destination_index(),
-                                      proto.relation_id());
-        break;
-      }
-      case kCatalogRelationNewBlockMessage:  // Fall through.
-      case kDataPipelineMessage:
-      case kWorkOrderFeedbackMessage:
-      case kWorkOrderCompleteMessage:
-      case kRebuildWorkOrderCompleteMessage: {
-        DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
-                   << "') forwarded typed '" << annotated_message.tagged_message.message_type()
-                   << "' message from Worker with TMB client ID '" << annotated_message.sender
-                   << "' to Foreman with TMB client ID " << foreman_client_id_;
-
-        DCHECK_NE(foreman_client_id_, tmb::kClientIdNone);
-        const MessageBus::SendStatus send_status =
-            QueryExecutionUtil::SendTMBMessage(bus_,
-                                               shiftboss_client_id_,
-                                               foreman_client_id_,
-                                               move(annotated_message.tagged_message));
-        CHECK(send_status == MessageBus::SendStatus::kOK);
-        break;
-      }
-      case kQueryTeardownMessage: {
-        const TaggedMessage &tagged_message = annotated_message.tagged_message;
+    AnnotatedMessage annotated_message;
+    if (bus_global_->ReceiveIfAvailable(shiftboss_client_id_global_, &annotated_message, 0, true)) {
+      DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_global_
+                 << "') received the typed '" << annotated_message.tagged_message.message_type()
+                 << "' message from Foreman " << annotated_message.sender;
+      switch (annotated_message.tagged_message.message_type()) {
+        case kQueryInitiateMessage: {
+          const TaggedMessage &tagged_message = annotated_message.tagged_message;
+
+          serialization::QueryInitiateMessage proto;
+          CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+          processQueryInitiateMessage(proto.query_id(), proto.catalog_database_cache(), proto.query_context());
+          break;
+        }
+        case kWorkOrderMessage: {
+          const TaggedMessage &tagged_message = annotated_message.tagged_message;
+
+          serialization::WorkOrderMessage proto;
+          CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+          const std::size_t query_id = proto.query_id();
+          DCHECK_EQ(1u, query_contexts_.count(query_id));
+
+          WorkOrder *work_order = WorkOrderFactory::ReconstructFromProto(proto.work_order(),
+                                                                         shiftboss_index_,
+                                                                         &database_cache_,
+                                                                         query_contexts_[query_id].get(),
+                                                                         storage_manager_,
+                                                                         shiftboss_client_id_local_,
+                                                                         bus_local_);
+
+          unique_ptr<WorkerMessage> worker_message(
+              WorkerMessage::WorkOrderMessage(work_order, proto.operator_index()));
+
+          TaggedMessage worker_tagged_message(worker_message.get(),
+                                              sizeof(*worker_message),
+                                              kWorkOrderMessage);
+
+          const size_t worker_index = getSchedulableWorker();
+          DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_local_
+                     << "') forwarded WorkOrderMessage (typed '" << kWorkOrderMessage
+                     << "') from Foreman to worker " << worker_index;
+
+          const MessageBus::SendStatus send_status =
+              QueryExecutionUtil::SendTMBMessage(bus_local_,
+                                                 shiftboss_client_id_local_,
+                                                 workers_->getClientID(worker_index),
+                                                 move(worker_tagged_message));
+          CHECK(send_status == MessageBus::SendStatus::kOK);
+          break;
+        }
+        case kInitiateRebuildMessage: {
+          // Construct rebuild work orders, and send back their number to
+          // 'ForemanDistributed'.
+          const TaggedMessage &tagged_message = annotated_message.tagged_message;
+
+          serialization::InitiateRebuildMessage proto;
+          CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+          processInitiateRebuildMessage(proto.query_id(),
+                                        proto.operator_index(),
+                                        proto.insert_destination_index(),
+                                        proto.relation_id());
+          break;
+        }
+        case kQueryTeardownMessage: {
+          const TaggedMessage &tagged_message = annotated_message.tagged_message;
 
-        serialization::QueryTeardownMessage proto;
-        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+          serialization::QueryTeardownMessage proto;
+          CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
 
-        query_contexts_.erase(proto.query_id());
-        break;
+          query_contexts_.erase(proto.query_id());
+          break;
+        }
+        case kSaveQueryResultMessage: {
+          const TaggedMessage &tagged_message = annotated_message.tagged_message;
+
+          serialization::SaveQueryResultMessage proto;
+          CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+          for (int i = 0; i < proto.blocks_size(); ++i) {
+            storage_manager_->saveBlockOrBlob(proto.blocks(i));
+          }
+
+          // Clean up query execution states, i.e., QueryContext.
+          query_contexts_.erase(proto.query_id());
+
+          serialization::SaveQueryResultResponseMessage proto_response;
+          proto_response.set_query_id(proto.query_id());
+          proto_response.set_relation_id(proto.relation_id());
+          proto_response.set_cli_id(proto.cli_id());
+          proto_response.set_shiftboss_index(shiftboss_index_);
+
+          const size_t proto_response_length = proto_response.ByteSize();
+          char *proto_response_bytes = static_cast<char*>(malloc(proto_response_length));
+          CHECK(proto_response.SerializeToArray(proto_response_bytes, proto_response_length));
+
+          TaggedMessage message_response(static_cast<const void*>(proto_response_bytes),
+                                         proto_response_length,
+                                         kSaveQueryResultResponseMessage);
+          free(proto_response_bytes);
+
+          DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_global_
+                     << "') sent SaveQueryResultResponseMessage (typed '" << kSaveQueryResultResponseMessage
+                     << "') to Foreman with TMB client ID " << foreman_client_id_;
+          const MessageBus::SendStatus send_status =
+              QueryExecutionUtil::SendTMBMessage(bus_global_,
+                                                 shiftboss_client_id_global_,
+                                                 foreman_client_id_,
+                                                 move(message_response));
+          CHECK(send_status == MessageBus::SendStatus::kOK);
+          break;
+        }
+        case kPoisonMessage: {
+          DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_global_
+                     << "') forwarded PoisonMessage (typed '" << kPoisonMessage
+                     << "') from Foreman to all workers";
+
+          tmb::MessageStyle broadcast_style;
+          broadcast_style.Broadcast(true);
+
+          const MessageBus::SendStatus send_status =
+              bus_local_->Send(shiftboss_client_id_local_, worker_addresses_, broadcast_style,
+                               move(annotated_message.tagged_message));
+          CHECK(send_status == MessageBus::SendStatus::kOK);
+          return;
+        }
+        default: {
+          LOG(FATAL) << "Unknown TMB message type";
+        }
       }
-      case kSaveQueryResultMessage: {
-        const TaggedMessage &tagged_message = annotated_message.tagged_message;
-
-        serialization::SaveQueryResultMessage proto;
-        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+    }
 
-        for (int i = 0; i < proto.blocks_size(); ++i) {
-          storage_manager_->saveBlockOrBlob(proto.blocks(i));
+    while (bus_local_->ReceiveIfAvailable(shiftboss_client_id_local_, &annotated_message, 0, true)) {
+      switch (annotated_message.tagged_message.message_type()) {
+        case kCatalogRelationNewBlockMessage:
+        case kDataPipelineMessage:
+        case kWorkOrderFeedbackMessage:
+        case kWorkOrderCompleteMessage:
+        case kRebuildWorkOrderCompleteMessage: {
+          DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_global_
+                     << "') forwarded typed '" << annotated_message.tagged_message.message_type()
+                     << "' message from Worker with TMB client ID '" << annotated_message.sender
+                     << "' to Foreman with TMB client ID " << foreman_client_id_;
+
+          DCHECK_NE(foreman_client_id_, tmb::kClientIdNone);
+          const MessageBus::SendStatus send_status =
+              QueryExecutionUtil::SendTMBMessage(bus_global_,
+                                                 shiftboss_client_id_global_,
+                                                 foreman_client_id_,
+                                                 move(annotated_message.tagged_message));
+          CHECK(send_status == MessageBus::SendStatus::kOK);
+          break;
+        }
+        default: {
+          LOG(FATAL) << "Unknown TMB message type";
         }
-
-        // Clean up query execution states, i.e., QueryContext.
-        query_contexts_.erase(proto.query_id());
-
-        serialization::SaveQueryResultResponseMessage proto_response;
-        proto_response.set_query_id(proto.query_id());
-        proto_response.set_relation_id(proto.relation_id());
-        proto_response.set_cli_id(proto.cli_id());
-        proto_response.set_shiftboss_index(shiftboss_index_);
-
-        const size_t proto_response_length = proto_response.ByteSize();
-        char *proto_response_bytes = static_cast<char*>(malloc(proto_response_length));
-        CHECK(proto_response.SerializeToArray(proto_response_bytes, proto_response_length));
-
-        TaggedMessage message_response(static_cast<const void*>(proto_response_bytes),
-                                       proto_response_length,
-                                       kSaveQueryResultResponseMessage);
-        free(proto_response_bytes);
-
-        DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
-                   << "') sent SaveQueryResultResponseMessage (typed '" << kSaveQueryResultResponseMessage
-                   << "') to Foreman with TMB client ID " << foreman_client_id_;
-        const MessageBus::SendStatus send_status =
-            QueryExecutionUtil::SendTMBMessage(bus_,
-                                               shiftboss_client_id_,
-                                               foreman_client_id_,
-                                               move(message_response));
-        CHECK(send_status == MessageBus::SendStatus::kOK);
-        break;
-      }
-      case kPoisonMessage: {
-        DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
-                   << "') forwarded PoisonMessage (typed '" << kPoisonMessage
-                   << "') from Foreman to all workers";
-
-        tmb::MessageStyle broadcast_style;
-        broadcast_style.Broadcast(true);
-
-        const MessageBus::SendStatus send_status =
-            bus_->Send(shiftboss_client_id_,
-                       worker_addresses_,
-                       broadcast_style,
-                       move(annotated_message.tagged_message));
-        CHECK(send_status == MessageBus::SendStatus::kOK);
-        return;
-      }
-      default: {
-        LOG(FATAL) << "Unknown TMB message type";
       }
     }
   }
@@ -264,21 +272,21 @@ void Shiftboss::registerWithForeman() {
                         kShiftbossRegistrationMessage);
   free(proto_bytes);
 
-  DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
+  DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_global_
              << "') sent ShiftbossRegistrationMessage (typed '" << kShiftbossRegistrationMessage
              << "') to all";
   tmb::MessageBus::SendStatus send_status =
-      bus_->Send(shiftboss_client_id_, all_addresses, style, move(message));
+      bus_global_->Send(shiftboss_client_id_global_, all_addresses, style, move(message));
   DCHECK(send_status == tmb::MessageBus::SendStatus::kOK);
 }
 
 void Shiftboss::processShiftbossRegistrationResponseMessage() {
-  AnnotatedMessage annotated_message(bus_->Receive(shiftboss_client_id_, 0, true));
+  AnnotatedMessage annotated_message(bus_global_->Receive(shiftboss_client_id_global_, 0, true));
   const TaggedMessage &tagged_message = annotated_message.tagged_message;
   DCHECK_EQ(kShiftbossRegistrationResponseMessage, tagged_message.message_type());
 
   foreman_client_id_ = annotated_message.sender;
-  DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+  DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_local_
              << "') received the typed '" << kShiftbossRegistrationResponseMessage
              << "' message from ForemanDistributed with client " << foreman_client_id_;
 
@@ -289,10 +297,10 @@ void Shiftboss::processShiftbossRegistrationResponseMessage() {
   storage_manager_->sendBlockDomainToShiftbossIndexMessage(shiftboss_index_);
 
   // Forward this message to Workers regarding <shiftboss_index_>.
-  QueryExecutionUtil::BroadcastMessage(shiftboss_client_id_,
+  QueryExecutionUtil::BroadcastMessage(shiftboss_client_id_local_,
                                        worker_addresses_,
                                        move(annotated_message.tagged_message),
-                                       bus_);
+                                       bus_local_);
 }
 
 void Shiftboss::processQueryInitiateMessage(
@@ -302,7 +310,7 @@ void Shiftboss::processQueryInitiateMessage(
   database_cache_.update(catalog_database_cache_proto);
 
   auto query_context = std::make_unique<QueryContext>(
-      query_context_proto, database_cache_, storage_manager_, shiftboss_client_id_, bus_);
+      query_context_proto, database_cache_, storage_manager_, shiftboss_client_id_local_, bus_local_);
   query_contexts_.emplace(query_id, move(query_context));
 
   serialization::QueryInitiateResponseMessage proto;
@@ -317,12 +325,12 @@ void Shiftboss::processQueryInitiateMessage(
                                  kQueryInitiateResponseMessage);
   free(proto_bytes);
 
-  DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
+  DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_global_
              << "') sent QueryInitiateResponseMessage (typed '" << kQueryInitiateResponseMessage
              << "') to Foreman with TMB client ID " << foreman_client_id_;
   const MessageBus::SendStatus send_status =
-      QueryExecutionUtil::SendTMBMessage(bus_,
-                                         shiftboss_client_id_,
+      QueryExecutionUtil::SendTMBMessage(bus_global_,
+                                         shiftboss_client_id_global_,
                                          foreman_client_id_,
                                          move(message_response));
   CHECK(send_status == MessageBus::SendStatus::kOK);
@@ -356,12 +364,12 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
                                  kInitiateRebuildResponseMessage);
   free(proto_bytes);
 
-  DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
+  DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_global_
              << "') sent InitiateRebuildResponseMessage (typed '" << kInitiateRebuildResponseMessage
              << "') to Foreman with TMB client ID " << foreman_client_id_;
   const MessageBus::SendStatus send_status =
-      QueryExecutionUtil::SendTMBMessage(bus_,
-                                         shiftboss_client_id_,
+      QueryExecutionUtil::SendTMBMessage(bus_global_,
+                                         shiftboss_client_id_global_,
                                          foreman_client_id_,
                                          move(message_response));
   CHECK(send_status == MessageBus::SendStatus::kOK);
@@ -374,8 +382,8 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
                              move(partially_filled_block_refs[i]),
                              op_index,
                              rel_id,
-                             shiftboss_client_id_,
-                             bus_);
+                             shiftboss_client_id_local_,
+                             bus_local_);
 
     unique_ptr<WorkerMessage> worker_message(
         WorkerMessage::RebuildWorkOrderMessage(rebuild_work_order, op_index));
@@ -385,13 +393,13 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
                                         kRebuildWorkOrderMessage);
 
     const size_t worker_index = getSchedulableWorker();
-    DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_
+    DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_local_
                << "') sent RebuildWorkOrderMessage (typed '" << kRebuildWorkOrderMessage
                << "') to worker " << worker_index;
 
     const MessageBus::SendStatus send_status =
-        QueryExecutionUtil::SendTMBMessage(bus_,
-                                           shiftboss_client_id_,
+        QueryExecutionUtil::SendTMBMessage(bus_local_,
+                                           shiftboss_client_id_local_,
                                            workers_->getClientID(worker_index),
                                            move(worker_tagged_message));
     CHECK(send_status == MessageBus::SendStatus::kOK);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d8fc9461/query_execution/Shiftboss.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.hpp b/query_execution/Shiftboss.hpp
index 6538d48..4864988 100644
--- a/query_execution/Shiftboss.hpp
+++ b/query_execution/Shiftboss.hpp
@@ -61,7 +61,8 @@ class Shiftboss : public Thread {
   /**
    * @brief Constructor.
    *
-   * @param bus A pointer to the TMB.
+   * @param bus_global A pointer to the TMB for Foreman.
+   * @param bus_local A pointer to the TMB for Workers.
    * @param storage_manager The StorageManager to use.
    * @param workers A pointer to the WorkerDirectory.
    * @param cpu_id The ID of the CPU to which the Shiftboss thread can be pinned.
@@ -69,69 +70,75 @@ class Shiftboss : public Thread {
    * @note If cpu_id is not specified, Shiftboss thread can be possibly moved
    *       around on different CPUs by the OS.
   **/
-  Shiftboss(tmb::MessageBus *bus,
+  Shiftboss(tmb::MessageBus *bus_global,
+            tmb::MessageBus *bus_local,
             StorageManager *storage_manager,
             WorkerDirectory *workers,
             const int cpu_id = -1)
-      : bus_(DCHECK_NOTNULL(bus)),
+      : bus_global_(DCHECK_NOTNULL(bus_global)),
+        bus_local_(DCHECK_NOTNULL(bus_local)),
         storage_manager_(DCHECK_NOTNULL(storage_manager)),
         workers_(DCHECK_NOTNULL(workers)),
         cpu_id_(cpu_id),
-        shiftboss_client_id_(tmb::kClientIdNone),
+        shiftboss_client_id_global_(tmb::kClientIdNone),
+        shiftboss_client_id_local_(tmb::kClientIdNone),
         foreman_client_id_(tmb::kClientIdNone),
         max_msgs_per_worker_(1),
         start_worker_index_(0u) {
     // Check to have at least one Worker.
     DCHECK_GT(workers->getNumWorkers(), 0u);
 
-    shiftboss_client_id_ = bus_->Connect();
-    LOG(INFO) << "Shiftboss TMB client ID: " << shiftboss_client_id_;
-    DCHECK_NE(shiftboss_client_id_, tmb::kClientIdNone);
+    shiftboss_client_id_global_ = bus_global_->Connect();
+    LOG(INFO) << "Shiftboss TMB client ID: " << shiftboss_client_id_global_;
+    DCHECK_NE(shiftboss_client_id_global_, tmb::kClientIdNone);
+
+    shiftboss_client_id_local_ = bus_local_->Connect();
+    DCHECK_NE(shiftboss_client_id_local_, tmb::kClientIdNone);
 
     // Messages between Foreman and Shiftboss.
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kShiftbossRegistrationMessage);
-    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kShiftbossRegistrationResponseMessage);
+    bus_global_->RegisterClientAsSender(shiftboss_client_id_global_, kShiftbossRegistrationMessage);
+    bus_global_->RegisterClientAsReceiver(shiftboss_client_id_global_, kShiftbossRegistrationResponseMessage);
 
-    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kQueryInitiateMessage);
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kQueryInitiateResponseMessage);
+    bus_global_->RegisterClientAsReceiver(shiftboss_client_id_global_, kQueryInitiateMessage);
+    bus_global_->RegisterClientAsSender(shiftboss_client_id_global_, kQueryInitiateResponseMessage);
 
-    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kInitiateRebuildMessage);
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kInitiateRebuildResponseMessage);
+    bus_global_->RegisterClientAsReceiver(shiftboss_client_id_global_, kInitiateRebuildMessage);
+    bus_global_->RegisterClientAsSender(shiftboss_client_id_global_, kInitiateRebuildResponseMessage);
 
-    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kSaveQueryResultMessage);
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kSaveQueryResultResponseMessage);
+    bus_global_->RegisterClientAsReceiver(shiftboss_client_id_global_, kSaveQueryResultMessage);
+    bus_global_->RegisterClientAsSender(shiftboss_client_id_global_, kSaveQueryResultResponseMessage);
 
     // Message sent to Worker.
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kShiftbossRegistrationResponseMessage);
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kRebuildWorkOrderMessage);
+    bus_local_->RegisterClientAsSender(shiftboss_client_id_local_, kShiftbossRegistrationResponseMessage);
+    bus_local_->RegisterClientAsSender(shiftboss_client_id_local_, kRebuildWorkOrderMessage);
 
     // Forward the following message types from Foreman to Workers.
-    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kWorkOrderMessage);
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderMessage);
+    bus_global_->RegisterClientAsReceiver(shiftboss_client_id_global_, kWorkOrderMessage);
+    bus_local_->RegisterClientAsSender(shiftboss_client_id_local_, kWorkOrderMessage);
 
     // Forward the following message types from Workers to Foreman.
-    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kCatalogRelationNewBlockMessage);
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kCatalogRelationNewBlockMessage);
+    bus_local_->RegisterClientAsReceiver(shiftboss_client_id_local_, kCatalogRelationNewBlockMessage);
+    bus_global_->RegisterClientAsSender(shiftboss_client_id_global_, kCatalogRelationNewBlockMessage);
 
-    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kDataPipelineMessage);
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kDataPipelineMessage);
+    bus_local_->RegisterClientAsReceiver(shiftboss_client_id_local_, kDataPipelineMessage);
+    bus_global_->RegisterClientAsSender(shiftboss_client_id_global_, kDataPipelineMessage);
 
-    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kWorkOrderFeedbackMessage);
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderFeedbackMessage);
+    bus_local_->RegisterClientAsReceiver(shiftboss_client_id_local_, kWorkOrderFeedbackMessage);
+    bus_global_->RegisterClientAsSender(shiftboss_client_id_global_, kWorkOrderFeedbackMessage);
 
-    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kWorkOrderCompleteMessage);
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderCompleteMessage);
+    bus_local_->RegisterClientAsReceiver(shiftboss_client_id_local_, kWorkOrderCompleteMessage);
+    bus_global_->RegisterClientAsSender(shiftboss_client_id_global_, kWorkOrderCompleteMessage);
 
-    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kRebuildWorkOrderCompleteMessage);
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kRebuildWorkOrderCompleteMessage);
+    bus_local_->RegisterClientAsReceiver(shiftboss_client_id_local_, kRebuildWorkOrderCompleteMessage);
+    bus_global_->RegisterClientAsSender(shiftboss_client_id_global_, kRebuildWorkOrderCompleteMessage);
 
     // Clean up query execution states, i.e., QueryContext.
-    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kQueryTeardownMessage);
+    bus_global_->RegisterClientAsReceiver(shiftboss_client_id_global_, kQueryTeardownMessage);
 
     // Stop itself.
-    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kPoisonMessage);
+    bus_global_->RegisterClientAsReceiver(shiftboss_client_id_global_, kPoisonMessage);
     // Stop all workers.
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kPoisonMessage);
+    bus_local_->RegisterClientAsSender(shiftboss_client_id_local_, kPoisonMessage);
 
     for (std::size_t i = 0; i < workers_->getNumWorkers(); ++i) {
       worker_addresses_.AddRecipient(workers_->getClientID(i));
@@ -149,7 +156,7 @@ class Shiftboss : public Thread {
    * @return TMB client ID of shiftboss thread.
    **/
   inline tmb::client_id getBusClientID() const {
-    return shiftboss_client_id_;
+    return shiftboss_client_id_global_;
   }
 
   /**
@@ -220,9 +227,7 @@ class Shiftboss : public Thread {
                                      const QueryContext::insert_destination_id dest_index,
                                      const relation_id rel_id);
 
-  // TODO(zuyu): Use two buses for the message communication between Foreman and Shiftboss,
-  // and Shiftboss and Worker thread pool.
-  tmb::MessageBus *bus_;
+  tmb::MessageBus *bus_global_, *bus_local_;
 
   CatalogDatabaseCache database_cache_;
   StorageManager *storage_manager_;
@@ -231,7 +236,7 @@ class Shiftboss : public Thread {
   // The ID of the CPU that the Shiftboss thread can optionally be pinned to.
   const int cpu_id_;
 
-  tmb::client_id shiftboss_client_id_, foreman_client_id_;
+  tmb::client_id shiftboss_client_id_global_, shiftboss_client_id_local_, foreman_client_id_;
 
   // Unique per Shiftboss instance.
   std::uint64_t shiftboss_index_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d8fc9461/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
index 2e18467..71965e6 100644
--- a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
+++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
@@ -76,6 +76,7 @@ const char *DistributedExecutionGeneratorTestRunner::kResetOption =
 
 DistributedExecutionGeneratorTestRunner::DistributedExecutionGeneratorTestRunner(const string &storage_path)
     : query_id_(0),
+      bus_locals_(kNumInstances),
       data_exchangers_(kNumInstances) {
   bus_.Initialize();
 
@@ -113,7 +114,10 @@ DistributedExecutionGeneratorTestRunner::DistributedExecutionGeneratorTestRunner
                                         kAnyNUMANodeID);
 
   for (int i = 0; i < kNumInstances; ++i) {
-    workers_.push_back(make_unique<Worker>(0 /* worker_thread_index */, &bus_));
+    tmb::MessageBus *bus_local = &bus_locals_[i];
+    bus_local->Initialize();
+
+    workers_.push_back(make_unique<Worker>(0 /* worker_thread_index */, bus_local));
 
     const vector<tmb::client_id> worker_client_ids(1, workers_.back()->getBusClientID());
     worker_directories_.push_back(
@@ -128,7 +132,7 @@ DistributedExecutionGeneratorTestRunner::DistributedExecutionGeneratorTestRunner
 
     data_exchangers_[i].set_storage_manager(storage_manager.get());
     shiftbosses_.push_back(
-        make_unique<Shiftboss>(&bus_, storage_manager.get(), worker_directories_.back().get()));
+        make_unique<Shiftboss>(&bus_, bus_local, storage_manager.get(), worker_directories_.back().get()));
 
     storage_managers_.push_back(move(storage_manager));
   }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d8fc9461/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
index 63e320d..2cd2427 100644
--- a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
+++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
@@ -129,6 +129,7 @@ class DistributedExecutionGeneratorTestRunner : public TextBasedTestRunner {
 
   std::unique_ptr<ForemanDistributed> foreman_;
 
+  std::vector<MessageBusImpl> bus_locals_;
   std::vector<std::unique_ptr<Worker>> workers_;
   std::vector<std::unique_ptr<WorkerDirectory>> worker_directories_;
   std::vector<DataExchangerAsync> data_exchangers_;
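
Note on the change above: the Shiftboss now holds one connection per bus, with a separate client ID on each, and relays messages between the Foreman (global bus) and its Workers (local bus). The sketch below is only a toy model of that relay shape, assuming in-memory FIFO inboxes. ToyBus, ToyMessage, ToyRelay and the k...On... constants are hypothetical names invented for illustration; none of them are TMB or Quickstep APIs.

```cpp
#include <cassert>
#include <deque>
#include <map>
#include <string>
#include <utility>

// Hypothetical stand-in for a message; not tmb::TaggedMessage.
struct ToyMessage {
  std::string payload;
};

// Hypothetical in-memory bus with one FIFO inbox per client ID.
class ToyBus {
 public:
  void Send(const int receiver, ToyMessage message) {
    inboxes_[receiver].push_back(std::move(message));
  }

  // Non-blocking receive: returns false when the inbox is empty.
  bool ReceiveIfAvailable(const int receiver, ToyMessage *out) {
    auto it = inboxes_.find(receiver);
    if (it == inboxes_.end() || it->second.empty()) {
      return false;
    }
    *out = std::move(it->second.front());
    it->second.pop_front();
    return true;
  }

 private:
  std::map<int, std::deque<ToyMessage>> inboxes_;
};

// The relay has a different client ID on each bus.
constexpr int kForemanOnGlobal = 0;
constexpr int kRelayOnGlobal = 1;
constexpr int kRelayOnLocal = 0;
constexpr int kWorkerOnLocal = 1;

class ToyRelay {
 public:
  ToyRelay(ToyBus *bus_global, ToyBus *bus_local)
      : bus_global_(bus_global), bus_local_(bus_local) {}

  // One pass: drain the global inbox toward the worker,
  // then drain the local inbox toward the foreman.
  void PumpOnce() {
    ToyMessage message;
    while (bus_global_->ReceiveIfAvailable(kRelayOnGlobal, &message)) {
      bus_local_->Send(kWorkerOnLocal, std::move(message));
    }
    while (bus_local_->ReceiveIfAvailable(kRelayOnLocal, &message)) {
      bus_global_->Send(kForemanOnGlobal, std::move(message));
    }
  }

 private:
  ToyBus *bus_global_;
  ToyBus *bus_local_;
};

// Small self-check of the relay in both directions.
bool DemoRelay() {
  ToyBus bus_global;
  ToyBus bus_local;
  ToyRelay relay(&bus_global, &bus_local);

  bus_global.Send(kRelayOnGlobal, ToyMessage{"work order"});
  bus_local.Send(kRelayOnLocal, ToyMessage{"work order complete"});
  relay.PumpOnce();

  ToyMessage received;
  assert(bus_local.ReceiveIfAvailable(kWorkerOnLocal, &received));
  assert(received.payload == "work order");
  assert(bus_global.ReceiveIfAvailable(kForemanOnGlobal, &received));
  assert(received.payload == "work order complete");
  assert(!bus_global.ReceiveIfAvailable(kRelayOnGlobal, &received));
  return true;
}
```

In this toy, worker traffic never touches the global bus directly, which appears to be the motivation behind giving each instance in the test runner its own local bus.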



[2/9] incubator-quickstep git commit: Minor refactored distributed query execution.

Posted by zu...@apache.org.
Minor refactored distributed query execution.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/3011ddf6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/3011ddf6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/3011ddf6

Branch: refs/heads/two-level-tmb
Commit: 3011ddf61ec92efcb833ef0a1168255ff97fb9f9
Parents: 5773027
Author: Zuyu Zhang <zu...@apache.org>
Authored: Wed Feb 8 17:36:45 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Wed Feb 8 17:42:42 2017 -0800

----------------------------------------------------------------------
 query_execution/ForemanDistributed.cpp        |  1 -
 query_execution/PolicyEnforcerBase.cpp        |  2 -
 query_execution/PolicyEnforcerBase.hpp        | 14 -----
 query_execution/PolicyEnforcerDistributed.cpp | 59 ++++++++++------------
 4 files changed, 27 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3011ddf6/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
index 4d95f16..8c20e65 100644
--- a/query_execution/ForemanDistributed.cpp
+++ b/query_execution/ForemanDistributed.cpp
@@ -175,7 +175,6 @@ void ForemanDistributed::run() {
       case kQueryInitiateResponseMessage: {
         S::QueryInitiateResponseMessage proto;
         CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-        CHECK(policy_enforcer_->existQuery(proto.query_id()));
         break;
       }
       case kCatalogRelationNewBlockMessage:  // Fall through

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3011ddf6/query_execution/PolicyEnforcerBase.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerBase.cpp b/query_execution/PolicyEnforcerBase.cpp
index a26b84e..082f6e9 100644
--- a/query_execution/PolicyEnforcerBase.cpp
+++ b/query_execution/PolicyEnforcerBase.cpp
@@ -156,8 +156,6 @@ void PolicyEnforcerBase::removeQuery(const std::size_t query_id) {
                  << " that hasn't finished its execution";
   }
   admitted_queries_.erase(query_id);
-
-  removed_query_ids_.insert(query_id);
 }
 
 bool PolicyEnforcerBase::admitQueries(

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3011ddf6/query_execution/PolicyEnforcerBase.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerBase.hpp b/query_execution/PolicyEnforcerBase.hpp
index baf9c68..4107817 100644
--- a/query_execution/PolicyEnforcerBase.hpp
+++ b/query_execution/PolicyEnforcerBase.hpp
@@ -103,16 +103,6 @@ class PolicyEnforcerBase {
   void processMessage(const TaggedMessage &tagged_message);
 
   /**
-   * @brief Check if the given query id ever exists.
-   *
-   * @return True if the query ever exists, otherwise false.
-   **/
-  inline bool existQuery(const std::size_t query_id) const {
-    return admitted_queries_.find(query_id) != admitted_queries_.end() ||
-           removed_query_ids_.find(query_id) != removed_query_ids_.end();
-  }
-
-  /**
    * @brief Check if there are any queries to be executed.
    *
    * @return True if there is at least one active or waiting query, false if
@@ -179,10 +169,6 @@ class PolicyEnforcerBase {
   // Key = query ID, value = QueryManagerBase* for the key query.
   std::unordered_map<std::size_t, std::unique_ptr<QueryManagerBase>> admitted_queries_;
 
-  // TODO(quickstep-team): Delete a 'query_id' after receiving all
-  // 'QueryInitiateResponseMessage's for the 'query_id'.
-  std::unordered_set<std::size_t> removed_query_ids_;
-
   // The queries which haven't been admitted yet.
   std::queue<QueryHandle*> waiting_queries_;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3011ddf6/query_execution/PolicyEnforcerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp
index 49a1d9a..ef5abb0 100644
--- a/query_execution/PolicyEnforcerDistributed.cpp
+++ b/query_execution/PolicyEnforcerDistributed.cpp
@@ -68,8 +68,15 @@ void PolicyEnforcerDistributed::getWorkOrderProtoMessages(
   // TODO(harshad) - Make this function generic enough so that it
   // works well when multiple queries are getting executed.
   if (admitted_queries_.empty()) {
-    LOG(WARNING) << "Requesting WorkOrderProtoMessages when no query is running";
-    return;
+    if (waiting_queries_.empty()) {
+      LOG(WARNING) << "Requesting WorkOrderProtoMessages when no query is running";
+      return;
+    } else {
+      // Admit the earliest waiting query.
+      QueryHandle *new_query = waiting_queries_.front();
+      waiting_queries_.pop();
+      admitQuery(new_query);
+    }
   }
 
   const std::size_t per_query_share =
@@ -106,28 +113,28 @@ void PolicyEnforcerDistributed::getWorkOrderProtoMessages(
 }
 
 bool PolicyEnforcerDistributed::admitQuery(QueryHandle *query_handle) {
-  if (admitted_queries_.size() < PolicyEnforcerBase::kMaxConcurrentQueries) {
-    // Ok to admit the query.
-    const std::size_t query_id = query_handle->query_id();
-    if (admitted_queries_.find(query_id) == admitted_queries_.end()) {
-      // NOTE(zuyu): Should call before constructing a 'QueryManager'.
-      // Otherwise, an InitiateRebuildMessage may be sent before 'QueryContext'
-      // initializes.
-      initiateQueryInShiftboss(query_handle);
-
-      // Query with the same ID not present, ok to admit.
-      admitted_queries_[query_id].reset(
-          new QueryManagerDistributed(query_handle, shiftboss_directory_, foreman_client_id_, bus_));
-      return true;
-    } else {
-      LOG(ERROR) << "Query with the same ID " << query_id << " exists";
-      return false;
-    }
-  } else {
+  if (admitted_queries_.size() >= PolicyEnforcerBase::kMaxConcurrentQueries) {
     // This query will have to wait.
     waiting_queries_.push(query_handle);
     return false;
   }
+
+  const std::size_t query_id = query_handle->query_id();
+  if (admitted_queries_.find(query_id) != admitted_queries_.end()) {
+    LOG(ERROR) << "Query with the same ID " << query_id << " exists";
+    return false;
+  }
+
+  // Ok to admit the query.
+  // NOTE(zuyu): Should call before constructing a 'QueryManager'.
+  // Otherwise, an InitiateRebuildMessage may be sent before 'QueryContext'
+  // initializes.
+  initiateQueryInShiftboss(query_handle);
+
+  // Query with the same ID not present, ok to admit.
+  admitted_queries_[query_id].reset(
+      new QueryManagerDistributed(query_handle, shiftboss_directory_, foreman_client_id_, bus_));
+  return true;
 }
 
 void PolicyEnforcerDistributed::processInitiateRebuildResponseMessage(const tmb::TaggedMessage &tagged_message) {
@@ -144,18 +151,6 @@ void PolicyEnforcerDistributed::processInitiateRebuildResponseMessage(const tmb:
   query_manager->processInitiateRebuildResponseMessage(
       proto.operator_index(), num_rebuild_work_orders, shiftboss_index);
   shiftboss_directory_->addNumQueuedWorkOrders(shiftboss_index, num_rebuild_work_orders);
-
-  if (query_manager->getQueryExecutionState().hasQueryExecutionFinished()) {
-    onQueryCompletion(query_manager);
-
-    removeQuery(query_id);
-    if (!waiting_queries_.empty()) {
-      // Admit the earliest waiting query.
-      QueryHandle *new_query = waiting_queries_.front();
-      waiting_queries_.pop();
-      admitQuery(new_query);
-    }
-  }
 }
 
 void PolicyEnforcerDistributed::getShiftbossIndexForAggregation(
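
Note on the change above: within this file, admitQuery() is flattened into early returns, and the "admit the earliest waiting query" step moves out of processInitiateRebuildResponseMessage() and into the top of getWorkOrderProtoMessages(). The following is a minimal sketch of that control flow, assuming a toy model keyed only by query ID. ToyAdmission and all of its members are hypothetical names for illustration, not Quickstep classes.

```cpp
#include <cassert>
#include <cstddef>
#include <queue>
#include <unordered_set>

// Hypothetical, simplified model of the admission flow.
class ToyAdmission {
 public:
  explicit ToyAdmission(const std::size_t max_concurrent)
      : max_concurrent_(max_concurrent) {}

  // Same early-return shape as the refactored admitQuery():
  // queue when full, reject duplicates, otherwise admit.
  bool Admit(const std::size_t query_id) {
    if (admitted_.size() >= max_concurrent_) {
      waiting_.push(query_id);  // This query will have to wait.
      return false;
    }
    if (admitted_.count(query_id) != 0) {
      return false;  // A query with the same ID already exists.
    }
    admitted_.insert(query_id);
    return true;
  }

  void Remove(const std::size_t query_id) { admitted_.erase(query_id); }

  // Same shape as the new check before generating work orders:
  // when nothing is running, admit the earliest waiting query.
  // Returns true if some query is admitted afterwards.
  bool PrepareForWork() {
    if (admitted_.empty()) {
      if (waiting_.empty()) {
        return false;
      }
      const std::size_t next = waiting_.front();
      waiting_.pop();
      Admit(next);
    }
    return !admitted_.empty();
  }

  bool IsAdmitted(const std::size_t query_id) const {
    return admitted_.count(query_id) != 0;
  }

  std::size_t NumWaiting() const { return waiting_.size(); }

 private:
  const std::size_t max_concurrent_;
  std::unordered_set<std::size_t> admitted_;
  std::queue<std::size_t> waiting_;
};

// Small self-check: the second query waits until the first is removed.
bool DemoAdmission() {
  ToyAdmission policy(1);
  assert(policy.Admit(7));
  assert(!policy.Admit(8));
  assert(policy.NumWaiting() == 1);

  policy.Remove(7);
  assert(policy.PrepareForWork());
  assert(policy.IsAdmitted(8));
  assert(policy.NumWaiting() == 0);
  return true;
}
```

The early returns keep the success path at a single indentation level, which is the main readability gain of the rewritten admitQuery().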


[3/9] incubator-quickstep git commit: Fixed the dangling reference bug in CreateIndexOperator.

Posted by zu...@apache.org.
Fixed the dangling reference bug in CreateIndexOperator.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/266b9b9a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/266b9b9a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/266b9b9a

Branch: refs/heads/two-level-tmb
Commit: 266b9b9a96d94a4461eb451cb214ea545f3e9415
Parents: 3011ddf
Author: Zuyu Zhang <zu...@apache.org>
Authored: Wed Feb 8 17:44:00 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Wed Feb 8 17:44:00 2017 -0800

----------------------------------------------------------------------
 relational_operators/CreateIndexOperator.hpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/266b9b9a/relational_operators/CreateIndexOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/CreateIndexOperator.hpp b/relational_operators/CreateIndexOperator.hpp
index fa992c9..0286e1c 100644
--- a/relational_operators/CreateIndexOperator.hpp
+++ b/relational_operators/CreateIndexOperator.hpp
@@ -94,7 +94,7 @@ class CreateIndexOperator : public RelationalOperator {
 
  private:
   CatalogRelation *relation_;
-  const std::string &index_name_;
+  const std::string index_name_;
   IndexSubBlockDescription index_description_;
 
   DISALLOW_COPY_AND_ASSIGN(CreateIndexOperator);
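
Note on the change above: a `const std::string &` member only aliases the caller's string, so it dangles once that string is destroyed, while a `const std::string` member owns its own copy. The sketch below shows the owning form under that assumption. NamedOperator, MakeOperator and DemoOwnedName are hypothetical names for illustration and are not part of Quickstep.

```cpp
#include <cassert>
#include <string>
#include <utility>

// Hypothetical stand-in for an operator that remembers an index name.
class NamedOperator {
 public:
  explicit NamedOperator(std::string index_name)
      : index_name_(std::move(index_name)) {}

  const std::string& index_name() const { return index_name_; }

 private:
  // Owning copy: valid for the whole lifetime of the operator.
  // Declared as 'const std::string &', this member would refer to the
  // caller's string and dangle after that string is destroyed.
  const std::string index_name_;
};

// The caller's string is a local that dies when this function returns.
NamedOperator MakeOperator() {
  const std::string local_name = std::string("idx_on_") + "lineitem";
  return NamedOperator(local_name);
}

// Small self-check: the name is still readable after the caller's
// string has gone out of scope.
bool DemoOwnedName() {
  const NamedOperator op = MakeOperator();
  assert(op.index_name() == "idx_on_lineitem");
  return true;
}
```

Storing by value costs one string copy (or move) per operator, which is usually negligible next to the risk of reading freed memory.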


[5/9] incubator-quickstep git commit: Fuse Aggregate with LeftOuterJoin to accelerate evaluation.

Posted by zu...@apache.org.
Fuse Aggregate with LeftOuterJoin to accelerate evaluation.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/a28b1e4d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/a28b1e4d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/a28b1e4d

Branch: refs/heads/two-level-tmb
Commit: a28b1e4d77ee12466b0801a5a7c5185f7a83e7f8
Parents: 266b9b9
Author: Jianqiao Zhu <ji...@cs.wisc.edu>
Authored: Mon Jan 30 14:46:39 2017 -0600
Committer: Jianqiao Zhu <ji...@cs.wisc.edu>
Committed: Wed Feb 8 23:55:32 2017 -0600

----------------------------------------------------------------------
 query_optimizer/CMakeLists.txt                  |   6 +-
 query_optimizer/ExecutionGenerator.cpp          | 261 +++++++++++--------
 query_optimizer/ExecutionGenerator.hpp          |  20 +-
 query_optimizer/PhysicalGenerator.cpp           |   3 +
 query_optimizer/cost_model/CMakeLists.txt       |   8 +
 query_optimizer/cost_model/SimpleCostModel.cpp  |   9 +
 query_optimizer/cost_model/SimpleCostModel.hpp  |   5 +
 .../cost_model/StarSchemaSimpleCostModel.cpp    | 148 ++++++++++-
 .../cost_model/StarSchemaSimpleCostModel.hpp    |  20 ++
 query_optimizer/physical/CMakeLists.txt         |  14 +
 .../CrossReferenceCoalesceAggregate.cpp         | 105 ++++++++
 .../CrossReferenceCoalesceAggregate.hpp         | 232 +++++++++++++++++
 query_optimizer/physical/PatternMatcher.hpp     |   3 +
 query_optimizer/physical/PhysicalType.hpp       |   1 +
 query_optimizer/rules/BottomUpRule.hpp          |  39 +--
 query_optimizer/rules/CMakeLists.txt            |  23 ++
 query_optimizer/rules/FuseAggregateJoin.cpp     | 170 ++++++++++++
 query_optimizer/rules/FuseAggregateJoin.hpp     |  71 +++++
 .../BuildAggregationExistenceMapOperator.cpp    | 196 ++++++++++++++
 .../BuildAggregationExistenceMapOperator.hpp    | 177 +++++++++++++
 relational_operators/CMakeLists.txt             |  30 +++
 relational_operators/WorkOrder.proto            |  12 +-
 relational_operators/WorkOrderFactory.cpp       |  37 +++
 storage/AggregationOperationState.cpp           |   8 +-
 storage/AggregationOperationState.hpp           |   9 +
 storage/CollisionFreeVectorTable.hpp            |   9 +
 utility/lip_filter/BitVectorExactFilter.hpp     |  27 +-
 utility/lip_filter/CMakeLists.txt               |  12 +-
 utility/lip_filter/SingleIdentityHashFilter.hpp |  22 +-
 29 files changed, 1489 insertions(+), 188 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/query_optimizer/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt
index e750a1e..3ff783c 100644
--- a/query_optimizer/CMakeLists.txt
+++ b/query_optimizer/CMakeLists.txt
@@ -64,7 +64,6 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
                       quickstep_expressions_Expressions_proto
                       quickstep_expressions_aggregation_AggregateFunction
                       quickstep_expressions_aggregation_AggregateFunction_proto
-                      quickstep_expressions_aggregation_AggregationID
                       quickstep_expressions_predicate_Predicate
                       quickstep_expressions_scalar_Scalar
                       quickstep_expressions_scalar_ScalarAttribute
@@ -95,6 +94,7 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
                       quickstep_queryoptimizer_physical_CopyFrom
                       quickstep_queryoptimizer_physical_CreateIndex
                       quickstep_queryoptimizer_physical_CreateTable
+                      quickstep_queryoptimizer_physical_CrossReferenceCoalesceAggregate
                       quickstep_queryoptimizer_physical_DeleteTuples
                       quickstep_queryoptimizer_physical_DropTable
                       quickstep_queryoptimizer_physical_FilterJoin
@@ -116,6 +116,7 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
                       quickstep_queryoptimizer_physical_UpdateTable
                       quickstep_queryoptimizer_physical_WindowAggregate
                       quickstep_relationaloperators_AggregationOperator
+                      quickstep_relationaloperators_BuildAggregationExistenceMapOperator
                       quickstep_relationaloperators_BuildHashOperator
                       quickstep_relationaloperators_BuildLIPFilterOperator
                       quickstep_relationaloperators_CreateIndexOperator
@@ -147,12 +148,10 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
                       quickstep_storage_StorageBlockLayout_proto
                       quickstep_storage_SubBlockTypeRegistry
                       quickstep_types_Type
-                      quickstep_types_TypeID
                       quickstep_types_Type_proto
                       quickstep_types_TypedValue
                       quickstep_types_TypedValue_proto
                       quickstep_types_containers_Tuple_proto
-                      quickstep_utility_EqualsAnyConstant
                       quickstep_utility_Macros
                       quickstep_utility_SqlError)
 if (ENABLE_DISTRIBUTED)
@@ -213,6 +212,7 @@ target_link_libraries(quickstep_queryoptimizer_PhysicalGenerator
                       quickstep_queryoptimizer_logical_Logical
                       quickstep_queryoptimizer_physical_Physical
                       quickstep_queryoptimizer_rules_AttachLIPFilters
+                      quickstep_queryoptimizer_rules_FuseAggregateJoin
                       quickstep_queryoptimizer_rules_InjectJoinFilters
                       quickstep_queryoptimizer_rules_PruneColumns
                       quickstep_queryoptimizer_rules_PushDownLowCostDisjunctivePredicate

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 1b50caa..70b69e0 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -49,7 +49,6 @@
 #include "expressions/Expressions.pb.h"
 #include "expressions/aggregation/AggregateFunction.hpp"
 #include "expressions/aggregation/AggregateFunction.pb.h"
-#include "expressions/aggregation/AggregationID.hpp"
 #include "expressions/predicate/Predicate.hpp"
 #include "expressions/scalar/Scalar.hpp"
 #include "expressions/scalar/ScalarAttribute.hpp"
@@ -72,9 +71,11 @@
 #include "query_optimizer/expressions/Scalar.hpp"
 #include "query_optimizer/expressions/ScalarLiteral.hpp"
 #include "query_optimizer/expressions/WindowAggregateFunction.hpp"
+#include "query_optimizer/physical/Aggregate.hpp"
 #include "query_optimizer/physical/CopyFrom.hpp"
 #include "query_optimizer/physical/CreateIndex.hpp"
 #include "query_optimizer/physical/CreateTable.hpp"
+#include "query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp"
 #include "query_optimizer/physical/DeleteTuples.hpp"
 #include "query_optimizer/physical/DropTable.hpp"
 #include "query_optimizer/physical/FilterJoin.hpp"
@@ -96,6 +97,7 @@
 #include "query_optimizer/physical/UpdateTable.hpp"
 #include "query_optimizer/physical/WindowAggregate.hpp"
 #include "relational_operators/AggregationOperator.hpp"
+#include "relational_operators/BuildAggregationExistenceMapOperator.hpp"
 #include "relational_operators/BuildHashOperator.hpp"
 #include "relational_operators/BuildLIPFilterOperator.hpp"
 #include "relational_operators/CreateIndexOperator.hpp"
@@ -128,11 +130,9 @@
 #include "storage/SubBlockTypeRegistry.hpp"
 #include "types/Type.hpp"
 #include "types/Type.pb.h"
-#include "types/TypeID.hpp"
 #include "types/TypedValue.hpp"
 #include "types/TypedValue.pb.h"
 #include "types/containers/Tuple.pb.h"
-#include "utility/EqualsAnyConstant.hpp"
 #include "utility/SqlError.hpp"
 
 #include "gflags/gflags.h"
@@ -163,10 +163,6 @@ static const volatile bool aggregate_hashtable_type_dummy
 
 DEFINE_bool(parallelize_load, true, "Parallelize loading data files.");
 
-DEFINE_int64(collision_free_vector_table_max_size, 1000000000,
-              "The maximum allowed key range (number of entries) for using a "
-              "CollisionFreeVectorTable.");
-
 namespace E = ::quickstep::optimizer::expressions;
 namespace P = ::quickstep::optimizer::physical;
 namespace S = ::quickstep::serialization;
@@ -266,6 +262,9 @@ void ExecutionGenerator::generatePlanInternal(
     case P::PhysicalType::kAggregate:
       return convertAggregate(
           std::static_pointer_cast<const P::Aggregate>(physical_plan));
+    case P::PhysicalType::kCrossReferenceCoalesceAggregate:
+      return convertCrossReferenceCoalesceAggregate(
+          std::static_pointer_cast<const P::CrossReferenceCoalesceAggregate>(physical_plan));
     case P::PhysicalType::kCopyFrom:
       return convertCopyFrom(
           std::static_pointer_cast<const P::CopyFrom>(physical_plan));
@@ -379,105 +378,6 @@ void ExecutionGenerator::dropAllTemporaryRelations() {
   }
 }
 
-bool ExecutionGenerator::canUseCollisionFreeAggregation(
-    const P::AggregatePtr &aggregate,
-    const std::size_t estimated_num_groups,
-    std::size_t *max_num_groups) const {
-#ifdef QUICKSTEP_DISTRIBUTED
-  // Currently we cannot do this fast path with the distributed setting. See
-  // the TODOs at InitializeAggregationOperator::getAllWorkOrderProtos() and
-  // FinalizeAggregationOperator::getAllWorkOrderProtos().
-  return false;
-#endif
-
-  // Supports only single group-by key.
-  if (aggregate->grouping_expressions().size() != 1) {
-    return false;
-  }
-
-  // We need to know the exact min/max stats of the group-by key.
-  // So it must be a CatalogAttribute (but not an expression).
-  E::AttributeReferencePtr group_by_key_attr;
-  const E::ExpressionPtr agg_expr = aggregate->grouping_expressions().front();
-  if (!E::SomeAttributeReference::MatchesWithConditionalCast(agg_expr, &group_by_key_attr)) {
-    return false;
-  }
-
-  bool min_value_stat_is_exact;
-  bool max_value_stat_is_exact;
-  const TypedValue min_value =
-      cost_model_for_aggregation_->findMinValueStat(
-          aggregate, group_by_key_attr, &min_value_stat_is_exact);
-  const TypedValue max_value =
-      cost_model_for_aggregation_->findMaxValueStat(
-          aggregate, group_by_key_attr, &max_value_stat_is_exact);
-  if (min_value.isNull() || max_value.isNull() ||
-      (!min_value_stat_is_exact) || (!max_value_stat_is_exact)) {
-    return false;
-  }
-
-  std::int64_t min_cpp_value;
-  std::int64_t max_cpp_value;
-  switch (group_by_key_attr->getValueType().getTypeID()) {
-    case TypeID::kInt: {
-      min_cpp_value = min_value.getLiteral<int>();
-      max_cpp_value = max_value.getLiteral<int>();
-      break;
-    }
-    case TypeID::kLong: {
-      min_cpp_value = min_value.getLiteral<std::int64_t>();
-      max_cpp_value = max_value.getLiteral<std::int64_t>();
-      break;
-    }
-    default:
-      return false;
-  }
-
-  // TODO(jianqiao):
-  // 1. Handle the case where min_cpp_value is below 0 or far greater than 0.
-  // 2. Reason about the table size bound (e.g. by checking memory size) instead
-  //    of hardcoding it as a gflag.
-  if (min_cpp_value < 0 ||
-      max_cpp_value >= FLAGS_collision_free_vector_table_max_size ||
-      max_cpp_value / static_cast<double>(estimated_num_groups) > 256.0) {
-    return false;
-  }
-
-  for (const auto &agg_expr : aggregate->aggregate_expressions()) {
-    const E::AggregateFunctionPtr agg_func =
-        std::static_pointer_cast<const E::AggregateFunction>(agg_expr->expression());
-
-    if (agg_func->is_distinct()) {
-      return false;
-    }
-
-    // TODO(jianqiao): Support AggregationID::AVG.
-    if (!QUICKSTEP_EQUALS_ANY_CONSTANT(agg_func->getAggregate().getAggregationID(),
-                                       AggregationID::kCount,
-                                       AggregationID::kSum)) {
-      return false;
-    }
-
-    const auto &arguments = agg_func->getArguments();
-    if (arguments.size() > 1u) {
-      return false;
-    }
-
-    if (arguments.size() == 1u) {
-      if (!QUICKSTEP_EQUALS_ANY_CONSTANT(arguments.front()->getValueType().getTypeID(),
-                                         TypeID::kInt,
-                                         TypeID::kLong,
-                                         TypeID::kFloat,
-                                         TypeID::kDouble)) {
-        return false;
-      }
-    }
-  }
-
-  *max_num_groups = static_cast<std::size_t>(max_cpp_value) + 1;
-  return true;
-}
-
 void ExecutionGenerator::convertNamedExpressions(
     const std::vector<E::NamedExpressionPtr> &named_expressions,
     S::QueryContext::ScalarGroup *scalar_group_proto) {
@@ -1608,9 +1508,10 @@ void ExecutionGenerator::convertAggregate(
         cost_model_for_aggregation_->estimateNumGroupsForAggregate(physical_plan);
 
     std::size_t max_num_groups;
-    if (canUseCollisionFreeAggregation(physical_plan,
-                                       estimated_num_groups,
-                                       &max_num_groups)) {
+    if (cost_model_for_aggregation_
+            ->canUseCollisionFreeAggregation(physical_plan,
+                                             estimated_num_groups,
+                                             &max_num_groups)) {
       aggr_state_proto->set_hash_table_impl_type(
           serialization::HashTableImplType::COLLISION_FREE_VECTOR);
       aggr_state_proto->set_estimated_num_entries(max_num_groups);
@@ -1730,6 +1631,148 @@ void ExecutionGenerator::convertAggregate(
   }
 }
 
+void ExecutionGenerator::convertCrossReferenceCoalesceAggregate(
+    const P::CrossReferenceCoalesceAggregatePtr &physical_plan) {
+  DCHECK_EQ(1u, physical_plan->left_join_attributes().size());
+  DCHECK_EQ(1u, physical_plan->right_join_attributes().size());
+
+  const CatalogRelationInfo *left_relation_info =
+      findRelationInfoOutputByPhysical(physical_plan->left_child());
+  const CatalogRelationInfo *right_relation_info =
+      findRelationInfoOutputByPhysical(physical_plan->right_child());
+
+  // Create aggr state proto.
+  const QueryContext::aggregation_state_id aggr_state_index =
+      query_context_proto_->aggregation_states_size();
+  S::AggregationOperationState *aggr_state_proto = query_context_proto_->add_aggregation_states();
+
+  aggr_state_proto->set_relation_id(right_relation_info->relation->getID());
+
+  // Group by the right join attribute.
+  std::unique_ptr<const Scalar> execution_group_by_expression(
+      physical_plan->right_join_attributes().front()->concretize(
+          attribute_substitution_map_));
+  aggr_state_proto->add_group_by_expressions()->CopyFrom(
+      execution_group_by_expression->getProto());
+
+  aggr_state_proto->set_hash_table_impl_type(
+      serialization::HashTableImplType::COLLISION_FREE_VECTOR);
+  aggr_state_proto->set_estimated_num_entries(
+      physical_plan->group_by_key_value_range());
+
+  if (physical_plan->right_filter_predicate() != nullptr) {
+    std::unique_ptr<const Predicate> predicate(
+        convertPredicate(physical_plan->right_filter_predicate()));
+    aggr_state_proto->mutable_predicate()->CopyFrom(predicate->getProto());
+  }
+
+  for (const E::AliasPtr &named_aggregate_expression : physical_plan->aggregate_expressions()) {
+    const E::AggregateFunctionPtr unnamed_aggregate_expression =
+        std::static_pointer_cast<const E::AggregateFunction>(named_aggregate_expression->expression());
+
+    // Add a new entry in 'aggregates'.
+    S::Aggregate *aggr_proto = aggr_state_proto->add_aggregates();
+
+    // Set the AggregateFunction.
+    aggr_proto->mutable_function()->CopyFrom(
+        unnamed_aggregate_expression->getAggregate().getProto());
+
+    // Add each of the aggregate's arguments.
+    for (const E::ScalarPtr &argument : unnamed_aggregate_expression->getArguments()) {
+      unique_ptr<const Scalar> concretized_argument(argument->concretize(attribute_substitution_map_));
+      aggr_proto->add_argument()->CopyFrom(concretized_argument->getProto());
+    }
+
+    // Set whether it is a DISTINCT aggregation.
+    DCHECK(!unnamed_aggregate_expression->is_distinct());
+    aggr_proto->set_is_distinct(false);
+  }
+
+  const QueryPlan::DAGNodeIndex initialize_aggregation_operator_index =
+      execution_plan_->addRelationalOperator(
+          new InitializeAggregationOperator(
+              query_handle_->query_id(),
+              aggr_state_index));
+
+  const QueryPlan::DAGNodeIndex build_aggregation_existence_map_operator_index =
+      execution_plan_->addRelationalOperator(
+          new BuildAggregationExistenceMapOperator(
+              query_handle_->query_id(),
+              *left_relation_info->relation,
+              physical_plan->left_join_attributes().front()->id(),
+              left_relation_info->isStoredRelation(),
+              aggr_state_index));
+
+  if (!left_relation_info->isStoredRelation()) {
+    execution_plan_->addDirectDependency(build_aggregation_existence_map_operator_index,
+                                         left_relation_info->producer_operator_index,
+                                         false /* is_pipeline_breaker */);
+  }
+
+  const QueryPlan::DAGNodeIndex aggregation_operator_index =
+      execution_plan_->addRelationalOperator(
+          new AggregationOperator(
+              query_handle_->query_id(),
+              *right_relation_info->relation,
+              right_relation_info->isStoredRelation(),
+              aggr_state_index));
+
+  if (!right_relation_info->isStoredRelation()) {
+    execution_plan_->addDirectDependency(aggregation_operator_index,
+                                         right_relation_info->producer_operator_index,
+                                         false /* is_pipeline_breaker */);
+  }
+
+  // Build aggregation existence map once initialization is done.
+  execution_plan_->addDirectDependency(build_aggregation_existence_map_operator_index,
+                                       initialize_aggregation_operator_index,
+                                       true /* is_pipeline_breaker */);
+
+  // Start aggregation after building existence map.
+  execution_plan_->addDirectDependency(aggregation_operator_index,
+                                       build_aggregation_existence_map_operator_index,
+                                       true /* is_pipeline_breaker */);
+
+
+  // Create InsertDestination proto.
+  const CatalogRelation *output_relation = nullptr;
+  const QueryContext::insert_destination_id insert_destination_index =
+      query_context_proto_->insert_destinations_size();
+  S::InsertDestination *insert_destination_proto = query_context_proto_->add_insert_destinations();
+  createTemporaryCatalogRelation(physical_plan,
+                                 &output_relation,
+                                 insert_destination_proto);
+
+  const QueryPlan::DAGNodeIndex finalize_aggregation_operator_index =
+      execution_plan_->addRelationalOperator(
+          new FinalizeAggregationOperator(query_handle_->query_id(),
+                                          aggr_state_index,
+                                          *output_relation,
+                                          insert_destination_index));
+
+  insert_destination_proto->set_relational_op_index(finalize_aggregation_operator_index);
+
+  execution_plan_->addDirectDependency(finalize_aggregation_operator_index,
+                                       aggregation_operator_index,
+                                       true /* is_pipeline_breaker */);
+
+  physical_to_output_relation_map_.emplace(
+      std::piecewise_construct,
+      std::forward_as_tuple(physical_plan),
+      std::forward_as_tuple(finalize_aggregation_operator_index, output_relation));
+  temporary_relation_info_vec_.emplace_back(finalize_aggregation_operator_index,
+                                            output_relation);
+
+  const QueryPlan::DAGNodeIndex destroy_aggregation_state_operator_index =
+      execution_plan_->addRelationalOperator(
+          new DestroyAggregationStateOperator(query_handle_->query_id(),
+                                              aggr_state_index));
+
+  execution_plan_->addDirectDependency(destroy_aggregation_state_operator_index,
+                                       finalize_aggregation_operator_index,
+                                       true);
+}
+
 void ExecutionGenerator::convertSort(const P::SortPtr &physical_sort) {
   // Create sort configuration for run generation.
   vector<bool> sort_ordering(physical_sort->sort_ascending());
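
Note on `convertCrossReferenceCoalesceAggregate` above: the function wires five operators into a chain of blocking dependencies (initialize, build existence map, aggregate, finalize, destroy). The sketch below reproduces only that ordering with a hypothetical `TinyPlan` class. It is not Quickstep's `QueryPlan` API, it ignores the non-blocking edges to upstream producers, and it assumes, as the call sites in the diff suggest, that `addDirectDependency` takes the dependent operator first.

```cpp
#include <cassert>
#include <cstddef>
#include <queue>
#include <string>
#include <vector>

// Hypothetical miniature of an operator DAG, for illustration only.
class TinyPlan {
 public:
  std::size_t addOperator(const std::string &name) {
    names_.push_back(name);
    consumers_.emplace_back();
    num_pending_.push_back(0);
    return names_.size() - 1;
  }

  // Same argument order as the calls in the diff: the dependent
  // (consumer) operator first, then the operator it waits for.
  void addDirectDependency(std::size_t consumer, std::size_t producer) {
    consumers_[producer].push_back(consumer);
    ++num_pending_[consumer];
  }

  // Kahn's algorithm: repeatedly emit an operator whose dependencies
  // have all been emitted already.
  std::vector<std::string> executionOrder() const {
    std::vector<std::size_t> pending(num_pending_);
    std::queue<std::size_t> ready;
    for (std::size_t i = 0; i < pending.size(); ++i) {
      if (pending[i] == 0) ready.push(i);
    }
    std::vector<std::string> order;
    while (!ready.empty()) {
      const std::size_t op = ready.front();
      ready.pop();
      order.push_back(names_[op]);
      for (const std::size_t consumer : consumers_[op]) {
        if (--pending[consumer] == 0) ready.push(consumer);
      }
    }
    return order;
  }

 private:
  std::vector<std::string> names_;
  std::vector<std::vector<std::size_t>> consumers_;
  std::vector<std::size_t> num_pending_;
};

// Wires the five operators the way the blocking edges in the diff do.
std::vector<std::string> fusedAggregateOrder() {
  TinyPlan plan;
  const std::size_t init = plan.addOperator("InitializeAggregation");
  const std::size_t build = plan.addOperator("BuildAggregationExistenceMap");
  const std::size_t aggregate = plan.addOperator("Aggregation");
  const std::size_t finalize = plan.addOperator("FinalizeAggregation");
  const std::size_t destroy = plan.addOperator("DestroyAggregationState");
  plan.addDirectDependency(build, init);
  plan.addDirectDependency(aggregate, build);
  plan.addDirectDependency(finalize, aggregate);
  plan.addDirectDependency(destroy, finalize);
  return plan.executionOrder();
}
```

Because every edge in this chain is blocking, only one order is possible: the existence map over the left join attribute must be complete before the right side is aggregated into the same state.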

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/query_optimizer/ExecutionGenerator.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.hpp b/query_optimizer/ExecutionGenerator.hpp
index 987f11a..f4e614a 100644
--- a/query_optimizer/ExecutionGenerator.hpp
+++ b/query_optimizer/ExecutionGenerator.hpp
@@ -46,6 +46,7 @@
 #include "query_optimizer/physical/CopyFrom.hpp"
 #include "query_optimizer/physical/CreateIndex.hpp"
 #include "query_optimizer/physical/CreateTable.hpp"
+#include "query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp"
 #include "query_optimizer/physical/DeleteTuples.hpp"
 #include "query_optimizer/physical/DropTable.hpp"
 #include "query_optimizer/physical/FilterJoin.hpp"
@@ -206,22 +207,6 @@ class ExecutionGenerator {
   std::string getNewRelationName();
 
   /**
-   * @brief Checks whether an aggregate node can be efficiently evaluated with
-   *        the collision-free aggregation fast path.
-   *
-   * @param aggregate The physical aggregate node to be checked.
-   * @param estimated_num_groups The estimated number of groups for the aggregate.
-   * @param exact_num_groups If collision-free aggregation is applicable, the
-   *        pointed content of this pointer will be set as the maximum possible
-   *        number of groups that the collision-free hash table need to hold.
-   * @return A bool value indicating whether collision-free aggregation can be
-   *         used to evaluate \p aggregate.
-   */
-  bool canUseCollisionFreeAggregation(const physical::AggregatePtr &aggregate,
-                                      const std::size_t estimated_num_groups,
-                                      std::size_t *max_num_groups) const;
-
-  /**
    * @brief Sets up the info of the CatalogRelation represented by TableReference.
    *        TableReference is not converted to any operator.
    *
@@ -356,6 +341,9 @@ class ExecutionGenerator {
    */
   void convertAggregate(const physical::AggregatePtr &physical_plan);
 
+  void convertCrossReferenceCoalesceAggregate(
+      const physical::CrossReferenceCoalesceAggregatePtr &physical_plan);
+
   /**
    * @brief Converts a physical Sort to SortRunGeneration and SortMergeRun.
    *
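
Note on `canUseCollisionFreeAggregation`, whose declaration is removed from this header: the key-range part of the check (visible in the lines removed from ExecutionGenerator.cpp) can be restated as a small standalone function. The sketch below is a simplified illustration and not the moved implementation. The name `keyRangeFitsVectorTable`, the explicit `max_table_size` parameter (standing in for the gflag), and the zero-groups guard are assumptions made here.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical restatement of the key-range test: the group-by key must
// be non-negative, its maximum must stay below the table size limit, and
// the key range must not be too sparse relative to the estimated number
// of groups (at most 256 slots per group).
bool keyRangeFitsVectorTable(const std::int64_t min_key,
                             const std::int64_t max_key,
                             const std::size_t estimated_num_groups,
                             const std::int64_t max_table_size,
                             std::size_t *max_num_groups) {
  // Guard added for this sketch only; the removed code has no such check.
  if (estimated_num_groups == 0) {
    return false;
  }
  if (min_key < 0 || max_key >= max_table_size) {
    return false;
  }
  if (max_key / static_cast<double>(estimated_num_groups) > 256.0) {
    return false;
  }
  // One slot per possible key value in [0, max_key].
  *max_num_groups = static_cast<std::size_t>(max_key) + 1;
  return true;
}
```

For example, keys in [0, 999] with about 10 estimated groups give a ratio of 99.9 and pass, with 1000 slots reserved. Keys up to 1,000,000 with about 100 groups give a ratio of 10,000 and are rejected as too sparse.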

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/query_optimizer/PhysicalGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/PhysicalGenerator.cpp b/query_optimizer/PhysicalGenerator.cpp
index 1b68f49..ac51c31 100644
--- a/query_optimizer/PhysicalGenerator.cpp
+++ b/query_optimizer/PhysicalGenerator.cpp
@@ -27,6 +27,7 @@
 #include "query_optimizer/logical/Logical.hpp"
 #include "query_optimizer/physical/Physical.hpp"
 #include "query_optimizer/rules/AttachLIPFilters.hpp"
+#include "query_optimizer/rules/FuseAggregateJoin.hpp"
 #include "query_optimizer/rules/InjectJoinFilters.hpp"
 #include "query_optimizer/rules/PruneColumns.hpp"
 #include "query_optimizer/rules/PushDownLowCostDisjunctivePredicate.hpp"
@@ -145,6 +146,8 @@ P::PhysicalPtr PhysicalGenerator::optimizePlan() {
     rules.emplace_back(new ReorderColumns());
   }
 
+  rules.emplace_back(new FuseAggregateJoin());
+
   // NOTE(jianqiao): Adding rules after InjectJoinFilters (or AttachLIPFilters) requires
   // extra handling of LIPFilterConfiguration for transformed nodes. So currently it is
   // suggested that all the new rules be placed before this point.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/query_optimizer/cost_model/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/CMakeLists.txt b/query_optimizer/cost_model/CMakeLists.txt
index 5f28bb3..4042915 100644
--- a/query_optimizer/cost_model/CMakeLists.txt
+++ b/query_optimizer/cost_model/CMakeLists.txt
@@ -33,6 +33,7 @@ target_link_libraries(quickstep_queryoptimizer_costmodel_SimpleCostModel
                       quickstep_catalog_CatalogRelationStatistics
                       quickstep_queryoptimizer_costmodel_CostModel
                       quickstep_queryoptimizer_physical_Aggregate
+                      quickstep_queryoptimizer_physical_CrossReferenceCoalesceAggregate
                       quickstep_queryoptimizer_physical_FilterJoin
                       quickstep_queryoptimizer_physical_HashJoin
                       quickstep_queryoptimizer_physical_NestedLoopsJoin
@@ -51,7 +52,10 @@ target_link_libraries(quickstep_queryoptimizer_costmodel_StarSchemaSimpleCostMod
                       quickstep_catalog_CatalogRelation
                       quickstep_catalog_CatalogRelationStatistics
                       quickstep_catalog_CatalogTypedefs
+                      quickstep_expressions_aggregation_AggregateFunction
+                      quickstep_expressions_aggregation_AggregationID
                       quickstep_queryoptimizer_costmodel_CostModel
+                      quickstep_queryoptimizer_expressions_AggregateFunction
                       quickstep_queryoptimizer_expressions_AttributeReference
                       quickstep_queryoptimizer_expressions_ComparisonExpression
                       quickstep_queryoptimizer_expressions_ExprId
@@ -62,6 +66,7 @@ target_link_libraries(quickstep_queryoptimizer_costmodel_StarSchemaSimpleCostMod
                       quickstep_queryoptimizer_expressions_PatternMatcher
                       quickstep_queryoptimizer_expressions_Predicate
                       quickstep_queryoptimizer_physical_Aggregate
+                      quickstep_queryoptimizer_physical_CrossReferenceCoalesceAggregate
                       quickstep_queryoptimizer_physical_FilterJoin
                       quickstep_queryoptimizer_physical_HashJoin
                       quickstep_queryoptimizer_physical_NestedLoopsJoin
@@ -76,7 +81,10 @@ target_link_libraries(quickstep_queryoptimizer_costmodel_StarSchemaSimpleCostMod
                       quickstep_queryoptimizer_physical_TopLevelPlan
                       quickstep_queryoptimizer_physical_WindowAggregate
                       quickstep_types_NullType
+                      quickstep_types_Type
+                      quickstep_types_TypeID
                       quickstep_types_TypedValue
+                      quickstep_utility_EqualsAnyConstant
                       quickstep_utility_Macros)
 
 # Module all-in-one library:

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/query_optimizer/cost_model/SimpleCostModel.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/SimpleCostModel.cpp b/query_optimizer/cost_model/SimpleCostModel.cpp
index e9d2e3a..cfd8a75 100644
--- a/query_optimizer/cost_model/SimpleCostModel.cpp
+++ b/query_optimizer/cost_model/SimpleCostModel.cpp
@@ -26,6 +26,7 @@
 #include "catalog/CatalogRelationStatistics.hpp"
 #include "query_optimizer/cost_model/CostModel.hpp"
 #include "query_optimizer/physical/Aggregate.hpp"
+#include "query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp"
 #include "query_optimizer/physical/NestedLoopsJoin.hpp"
 #include "query_optimizer/physical/FilterJoin.hpp"
 #include "query_optimizer/physical/HashJoin.hpp"
@@ -74,6 +75,9 @@ std::size_t SimpleCostModel::estimateCardinality(
     case P::PhysicalType::kAggregate:
       return estimateCardinalityForAggregate(
           std::static_pointer_cast<const P::Aggregate>(physical_plan));
+    case P::PhysicalType::kCrossReferenceCoalesceAggregate:
+      return estimateCardinalityForCrossReferenceCoalesceAggregate(
+          std::static_pointer_cast<const P::CrossReferenceCoalesceAggregate>(physical_plan));
     case P::PhysicalType::kSharedSubplanReference: {
       const P::SharedSubplanReferencePtr shared_subplan_reference =
           std::static_pointer_cast<const P::SharedSubplanReference>(physical_plan);
@@ -149,6 +153,11 @@ std::size_t SimpleCostModel::estimateCardinalityForAggregate(
                   estimateCardinality(physical_plan->input()) / 10);
 }
 
+std::size_t SimpleCostModel::estimateCardinalityForCrossReferenceCoalesceAggregate(
+    const physical::CrossReferenceCoalesceAggregatePtr &physical_plan) {
+  return estimateCardinality(physical_plan->left_child());
+}
+
 std::size_t SimpleCostModel::estimateCardinalityForWindowAggregate(
     const physical::WindowAggregatePtr &physical_plan) {
   return estimateCardinality(physical_plan->input());

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/query_optimizer/cost_model/SimpleCostModel.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/SimpleCostModel.hpp b/query_optimizer/cost_model/SimpleCostModel.hpp
index 4edc2fe..0660c37 100644
--- a/query_optimizer/cost_model/SimpleCostModel.hpp
+++ b/query_optimizer/cost_model/SimpleCostModel.hpp
@@ -25,6 +25,7 @@
 
 #include "query_optimizer/cost_model/CostModel.hpp"
 #include "query_optimizer/physical/Aggregate.hpp"
+#include "query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp"
 #include "query_optimizer/physical/NestedLoopsJoin.hpp"
 #include "query_optimizer/physical/FilterJoin.hpp"
 #include "query_optimizer/physical/HashJoin.hpp"
@@ -100,6 +101,10 @@ class SimpleCostModel : public CostModel {
   std::size_t estimateCardinalityForAggregate(
       const physical::AggregatePtr &physical_plan);
 
+  // Returns the cardinality of the left child plan.
+  std::size_t estimateCardinalityForCrossReferenceCoalesceAggregate(
+      const physical::CrossReferenceCoalesceAggregatePtr &physical_plan);
+
   // Return the estimated cardinality of the input plan.
   std::size_t estimateCardinalityForWindowAggregate(
       const physical::WindowAggregatePtr &physical_plan);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp b/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
index 7afa1c3..fc775c7 100644
--- a/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
+++ b/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
@@ -20,13 +20,18 @@
 #include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp"
 
 #include <algorithm>
+#include <cstddef>
+#include <cstdint>
 #include <memory>
 #include <vector>
 
 #include "catalog/CatalogRelation.hpp"
 #include "catalog/CatalogRelationStatistics.hpp"
 #include "catalog/CatalogTypedefs.hpp"
+#include "expressions/aggregation/AggregateFunction.hpp"
+#include "expressions/aggregation/AggregationID.hpp"
 #include "query_optimizer/cost_model/CostModel.hpp"
+#include "query_optimizer/expressions/AggregateFunction.hpp"
 #include "query_optimizer/expressions/AttributeReference.hpp"
 #include "query_optimizer/expressions/ComparisonExpression.hpp"
 #include "query_optimizer/expressions/ExprId.hpp"
@@ -37,6 +42,7 @@
 #include "query_optimizer/expressions/Predicate.hpp"
 #include "query_optimizer/expressions/PatternMatcher.hpp"
 #include "query_optimizer/physical/Aggregate.hpp"
+#include "query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp"
 #include "query_optimizer/physical/NestedLoopsJoin.hpp"
 #include "query_optimizer/physical/FilterJoin.hpp"
 #include "query_optimizer/physical/HashJoin.hpp"
@@ -49,8 +55,13 @@
 #include "query_optimizer/physical/TableGenerator.hpp"
 #include "query_optimizer/physical/TableReference.hpp"
 #include "query_optimizer/physical/TopLevelPlan.hpp"
+#include "types/Type.hpp"
+#include "types/TypeID.hpp"
 #include "types/TypedValue.hpp"
 #include "types/NullType.hpp"
+#include "utility/EqualsAnyConstant.hpp"
+
+#include "gflags/gflags.h"
 
 #include "glog/logging.h"
 
@@ -58,6 +69,10 @@ namespace quickstep {
 namespace optimizer {
 namespace cost {
 
+DEFINE_int64(collision_free_vector_table_max_size, 1000000000,
+              "The maximum allowed key range (number of entries) for using a "
+              "CollisionFreeVectorTable.");
+
 namespace E = ::quickstep::optimizer::expressions;
 namespace P = ::quickstep::optimizer::physical;
 
@@ -88,6 +103,9 @@ std::size_t StarSchemaSimpleCostModel::estimateCardinality(
     case P::PhysicalType::kAggregate:
       return estimateCardinalityForAggregate(
           std::static_pointer_cast<const P::Aggregate>(physical_plan));
+    case P::PhysicalType::kCrossReferenceCoalesceAggregate:
+      return estimateCardinalityForCrossReferenceCoalesceAggregate(
+          std::static_pointer_cast<const P::CrossReferenceCoalesceAggregate>(physical_plan));
     case P::PhysicalType::kSharedSubplanReference: {
       const P::SharedSubplanReferencePtr shared_subplan_reference =
           std::static_pointer_cast<const P::SharedSubplanReference>(physical_plan);
@@ -175,6 +193,11 @@ std::size_t StarSchemaSimpleCostModel::estimateCardinalityForAggregate(
       estimateNumGroupsForAggregate(physical_plan) * filter_selectivity);
 }
 
+std::size_t StarSchemaSimpleCostModel::estimateCardinalityForCrossReferenceCoalesceAggregate(
+    const P::CrossReferenceCoalesceAggregatePtr &physical_plan) {
+  return estimateCardinality(physical_plan->left_child());
+}
+
 std::size_t StarSchemaSimpleCostModel::estimateCardinalityForWindowAggregate(
     const P::WindowAggregatePtr &physical_plan) {
   return estimateCardinality(physical_plan->input());
@@ -233,6 +256,13 @@ std::size_t StarSchemaSimpleCostModel::estimateNumDistinctValues(
       }
       break;
     }
+    case P::PhysicalType::kCrossReferenceCoalesceAggregate: {
+      const P::PhysicalPtr left_child = physical_plan->children()[0];
+      if (E::ContainsExprId(left_child->getOutputAttributes(), attribute_id)) {
+        return estimateNumDistinctValues(attribute_id, left_child);
+      }
+      break;
+    }
     case P::PhysicalType::kFilterJoin: {
       const P::FilterJoinPtr &filter_join =
           std::static_pointer_cast<const P::FilterJoin>(physical_plan);
@@ -275,6 +305,17 @@ std::size_t StarSchemaSimpleCostModel::estimateNumDistinctValues(
 double StarSchemaSimpleCostModel::estimateSelectivity(
     const physical::PhysicalPtr &physical_plan) {
   switch (physical_plan->getPhysicalType()) {
+    case P::PhysicalType::kAggregate: {
+      const P::AggregatePtr &aggregate =
+          std::static_pointer_cast<const P::Aggregate>(physical_plan);
+      return estimateSelectivity(aggregate->input()) *
+          estimateSelectivityForFilterPredicate(aggregate);
+    }
+    case P::PhysicalType::kCrossReferenceCoalesceAggregate: {
+      const P::CrossReferenceCoalesceAggregatePtr &aggregate_on_left_outer_join =
+          std::static_pointer_cast<const P::CrossReferenceCoalesceAggregate>(physical_plan);
+      return estimateSelectivity(aggregate_on_left_outer_join->left_child());
+    }
     case P::PhysicalType::kSelection: {
       const P::SelectionPtr &selection =
           std::static_pointer_cast<const P::Selection>(physical_plan);
@@ -331,6 +372,7 @@ double StarSchemaSimpleCostModel::estimateSelectivity(
 
 double StarSchemaSimpleCostModel::estimateSelectivityForFilterPredicate(
     const physical::PhysicalPtr &physical_plan) {
+  P::PhysicalPtr target_plan = physical_plan;
   E::PredicatePtr filter_predicate = nullptr;
   switch (physical_plan->getPhysicalType()) {
     case P::PhysicalType::kSelection:
@@ -340,6 +382,7 @@ double StarSchemaSimpleCostModel::estimateSelectivityForFilterPredicate(
     case P::PhysicalType::kAggregate:
       filter_predicate =
           std::static_pointer_cast<const P::Aggregate>(physical_plan)->filter_predicate();
+      target_plan = physical_plan->children()[0];
       break;
     case P::PhysicalType::kHashJoin:
       filter_predicate =
@@ -356,7 +399,7 @@ double StarSchemaSimpleCostModel::estimateSelectivityForFilterPredicate(
   if (filter_predicate == nullptr) {
     return 1.0;
   } else {
-    return estimateSelectivityForPredicate(filter_predicate, physical_plan);
+    return estimateSelectivityForPredicate(filter_predicate, target_plan);
   }
 }
 
@@ -443,6 +486,12 @@ bool StarSchemaSimpleCostModel::impliesUniqueAttributes(
           std::static_pointer_cast<const P::Aggregate>(physical_plan);
       return E::SubsetOfExpressions(aggregate->grouping_expressions(), attributes);
     }
+    case P::PhysicalType::kCrossReferenceCoalesceAggregate: {
+      const P::CrossReferenceCoalesceAggregatePtr &aggregate_on_left_outer_join =
+          std::static_pointer_cast<const P::CrossReferenceCoalesceAggregate>(physical_plan);
+      return E::SubsetOfExpressions(
+          aggregate_on_left_outer_join->left_join_attributes(), attributes);
+    }
     case P::PhysicalType::kHashJoin: {
       const P::HashJoinPtr &hash_join =
           std::static_pointer_cast<const P::HashJoin>(physical_plan);
@@ -542,6 +591,103 @@ attribute_id StarSchemaSimpleCostModel::findCatalogRelationAttributeId(
   return kInvalidAttributeID;
 }
 
+bool StarSchemaSimpleCostModel::canUseCollisionFreeAggregation(
+    const P::AggregatePtr &aggregate,
+    const std::size_t estimated_num_groups,
+    std::size_t *max_num_groups) {
+#ifdef QUICKSTEP_DISTRIBUTED
+  // Currently we cannot use this fast path in the distributed setting. See
+  // the TODOs at InitializeAggregationOperator::getAllWorkOrderProtos() and
+  // FinalizeAggregationOperator::getAllWorkOrderProtos().
+  return false;
+#endif
+
+  // Supports only a single group-by key.
+  if (aggregate->grouping_expressions().size() != 1) {
+    return false;
+  }
+
+  // We need to know the exact min/max stats of the group-by key.
+  // So it must be a CatalogAttribute (but not an expression).
+  E::AttributeReferencePtr group_by_key_attr;
+  const E::ExpressionPtr agg_expr = aggregate->grouping_expressions().front();
+  if (!E::SomeAttributeReference::MatchesWithConditionalCast(agg_expr, &group_by_key_attr)) {
+    return false;
+  }
+
+  bool min_value_stat_is_exact;
+  bool max_value_stat_is_exact;
+  const TypedValue min_value = findMinValueStat(
+          aggregate, group_by_key_attr, &min_value_stat_is_exact);
+  const TypedValue max_value = findMaxValueStat(
+          aggregate, group_by_key_attr, &max_value_stat_is_exact);
+  if (min_value.isNull() || max_value.isNull() ||
+      (!min_value_stat_is_exact) || (!max_value_stat_is_exact)) {
+    return false;
+  }
+
+  std::int64_t min_cpp_value;
+  std::int64_t max_cpp_value;
+  switch (group_by_key_attr->getValueType().getTypeID()) {
+    case TypeID::kInt: {
+      min_cpp_value = min_value.getLiteral<int>();
+      max_cpp_value = max_value.getLiteral<int>();
+      break;
+    }
+    case TypeID::kLong: {
+      min_cpp_value = min_value.getLiteral<std::int64_t>();
+      max_cpp_value = max_value.getLiteral<std::int64_t>();
+      break;
+    }
+    default:
+      return false;
+  }
+
+  // TODO(jianqiao):
+  // 1. Handle the case where min_cpp_value is below 0 or far greater than 0.
+  // 2. Reason about the table size bound (e.g. by checking memory size) instead
+  //    of hardcoding it as a gflag.
+  if (min_cpp_value < 0 ||
+      max_cpp_value >= FLAGS_collision_free_vector_table_max_size ||
+      max_cpp_value / static_cast<double>(estimated_num_groups) > 256.0) {
+    return false;
+  }
+
+  for (const auto &agg_expr : aggregate->aggregate_expressions()) {
+    const E::AggregateFunctionPtr agg_func =
+        std::static_pointer_cast<const E::AggregateFunction>(agg_expr->expression());
+
+    if (agg_func->is_distinct()) {
+      return false;
+    }
+
+    // TODO(jianqiao): Support AggregationID::kAvg.
+    if (!QUICKSTEP_EQUALS_ANY_CONSTANT(agg_func->getAggregate().getAggregationID(),
+                                       AggregationID::kCount,
+                                       AggregationID::kSum)) {
+      return false;
+    }
+
+    const auto &arguments = agg_func->getArguments();
+    if (arguments.size() > 1u) {
+      return false;
+    }
+
+    if (arguments.size() == 1u) {
+      if (!QUICKSTEP_EQUALS_ANY_CONSTANT(arguments.front()->getValueType().getTypeID(),
+                                         TypeID::kInt,
+                                         TypeID::kLong,
+                                         TypeID::kFloat,
+                                         TypeID::kDouble)) {
+        return false;
+      }
+    }
+  }
+
+  *max_num_groups = static_cast<std::size_t>(max_cpp_value) + 1;
+  return true;
+}
+
 }  // namespace cost
 }  // namespace optimizer
 }  // namespace quickstep
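
The key-range test in canUseCollisionFreeAggregation() above can be read in isolation. The following is a minimal sketch of that test only, not the Quickstep implementation: the function name KeyRangeFitsVectorTable and the two constants are hypothetical stand-ins for the gflag and the hard-coded ratio in the patch, and the explicit zero check on the estimate is an addition of this sketch.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical stand-ins for FLAGS_collision_free_vector_table_max_size and
// the hard-coded sparsity ratio used in the patch.
constexpr std::int64_t kMaxTableSize = 1000000000;
constexpr double kMaxSparsity = 256.0;

// Returns true and sets *max_num_groups when a dense table indexed directly
// by the key looks affordable: keys are non-negative, the largest key is
// below the size limit, and the key range is not too sparse relative to the
// estimated number of groups.
bool KeyRangeFitsVectorTable(const std::int64_t min_key,
                             const std::int64_t max_key,
                             const std::size_t estimated_num_groups,
                             std::size_t *max_num_groups) {
  assert(max_num_groups != nullptr);
  if (min_key < 0 || max_key >= kMaxTableSize) {
    return false;
  }
  // Assumption of this sketch: a zero estimate is rejected up front instead
  // of being fed into the floating-point division.
  if (estimated_num_groups == 0 ||
      max_key / static_cast<double>(estimated_num_groups) > kMaxSparsity) {
    return false;
  }
  // One slot per key value in [0, max_key].
  *max_num_groups = static_cast<std::size_t>(max_key) + 1;
  return true;
}
```

With keys in [0, 999] and an estimate of 500 groups the check passes and the table is sized at 1000 slots; with keys up to 100000 and only 10 estimated groups it is rejected as too sparse.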

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp b/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp
index cbe18f4..afb2ef9 100644
--- a/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp
+++ b/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp
@@ -29,6 +29,7 @@
 #include "query_optimizer/expressions/ExprId.hpp"
 #include "query_optimizer/expressions/Predicate.hpp"
 #include "query_optimizer/physical/Aggregate.hpp"
+#include "query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp"
 #include "query_optimizer/physical/NestedLoopsJoin.hpp"
 #include "query_optimizer/physical/FilterJoin.hpp"
 #include "query_optimizer/physical/HashJoin.hpp"
@@ -166,10 +167,29 @@ class StarSchemaSimpleCostModel : public CostModel {
         physical_plan, attribute->id(), StatType::kMax, is_exact_stat);
   }
 
+  /**
+   * @brief Checks whether an aggregate node can be efficiently evaluated with
+   *        the collision-free aggregation fast path.
+   *
+   * @param aggregate The physical aggregate node to be checked.
+   * @param estimated_num_groups The estimated number of groups for the aggregate.
+   * @param max_num_groups If collision-free aggregation is applicable, the
+   *        value pointed to is set to the maximum possible number of groups
+   *        that the collision-free hash table needs to hold.
+   * @return A bool value indicating whether collision-free aggregation can be
+   *         used to evaluate \p aggregate.
+   */
+  bool canUseCollisionFreeAggregation(const physical::AggregatePtr &aggregate,
+                                      const std::size_t estimated_num_groups,
+                                      std::size_t *max_num_groups);
+
  private:
   std::size_t estimateCardinalityForAggregate(
       const physical::AggregatePtr &physical_plan);
 
+  std::size_t estimateCardinalityForCrossReferenceCoalesceAggregate(
+      const physical::CrossReferenceCoalesceAggregatePtr &physical_plan);
+
   std::size_t estimateCardinalityForFilterJoin(
       const physical::FilterJoinPtr &physical_plan);
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/query_optimizer/physical/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/CMakeLists.txt b/query_optimizer/physical/CMakeLists.txt
index f68ed39..77ae75e 100644
--- a/query_optimizer/physical/CMakeLists.txt
+++ b/query_optimizer/physical/CMakeLists.txt
@@ -21,6 +21,9 @@ add_library(quickstep_queryoptimizer_physical_BinaryJoin BinaryJoin.cpp BinaryJo
 add_library(quickstep_queryoptimizer_physical_CopyFrom CopyFrom.cpp CopyFrom.hpp)
 add_library(quickstep_queryoptimizer_physical_CreateIndex CreateIndex.cpp CreateIndex.hpp)
 add_library(quickstep_queryoptimizer_physical_CreateTable CreateTable.cpp CreateTable.hpp)
+add_library(quickstep_queryoptimizer_physical_CrossReferenceCoalesceAggregate
+            CrossReferenceCoalesceAggregate.cpp
+            CrossReferenceCoalesceAggregate.hpp)
 add_library(quickstep_queryoptimizer_physical_DeleteTuples DeleteTuples.cpp DeleteTuples.hpp)
 add_library(quickstep_queryoptimizer_physical_DropTable DropTable.cpp DropTable.hpp)
 add_library(quickstep_queryoptimizer_physical_FilterJoin FilterJoin.cpp FilterJoin.hpp)
@@ -95,6 +98,16 @@ target_link_libraries(quickstep_queryoptimizer_physical_CreateTable
                       quickstep_queryoptimizer_physical_PhysicalType
                       quickstep_utility_Cast
                       quickstep_utility_Macros)
+target_link_libraries(quickstep_queryoptimizer_physical_CrossReferenceCoalesceAggregate
+                      quickstep_queryoptimizer_OptimizerTree
+                      quickstep_queryoptimizer_expressions_Alias
+                      quickstep_queryoptimizer_expressions_AttributeReference
+                      quickstep_queryoptimizer_expressions_ExpressionUtil
+                      quickstep_queryoptimizer_expressions_Predicate
+                      quickstep_queryoptimizer_physical_Physical
+                      quickstep_queryoptimizer_physical_PhysicalType
+                      quickstep_utility_Cast
+                      quickstep_utility_Macros)
 target_link_libraries(quickstep_queryoptimizer_physical_DeleteTuples
                       glog
                       quickstep_catalog_CatalogRelation
@@ -293,6 +306,7 @@ target_link_libraries(quickstep_queryoptimizer_physical
                       quickstep_queryoptimizer_physical_CopyFrom
                       quickstep_queryoptimizer_physical_CreateIndex
                       quickstep_queryoptimizer_physical_CreateTable
+                      quickstep_queryoptimizer_physical_CrossReferenceCoalesceAggregate
                       quickstep_queryoptimizer_physical_DeleteTuples
                       quickstep_queryoptimizer_physical_DropTable
                       quickstep_queryoptimizer_physical_FilterJoin

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/query_optimizer/physical/CrossReferenceCoalesceAggregate.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/CrossReferenceCoalesceAggregate.cpp b/query_optimizer/physical/CrossReferenceCoalesceAggregate.cpp
new file mode 100644
index 0000000..6bed215
--- /dev/null
+++ b/query_optimizer/physical/CrossReferenceCoalesceAggregate.cpp
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp"
+
+#include <string>
+#include <unordered_set>
+#include <vector>
+
+#include "query_optimizer/OptimizerTree.hpp"
+#include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/expressions/ExpressionUtil.hpp"
+#include "utility/Cast.hpp"
+
+namespace quickstep {
+namespace optimizer {
+namespace physical {
+
+namespace E = ::quickstep::optimizer::expressions;
+
+std::vector<E::AttributeReferencePtr> CrossReferenceCoalesceAggregate
+    ::getOutputAttributes() const {
+  std::vector<E::AttributeReferencePtr> output_attributes(left_join_attributes_);
+  for (const auto &aggregate_expr : aggregate_expressions_) {
+    output_attributes.emplace_back(E::ToRef(aggregate_expr));
+  }
+  return output_attributes;
+}
+
+std::vector<E::AttributeReferencePtr> CrossReferenceCoalesceAggregate
+    ::getReferencedAttributes() const {
+  std::unordered_set<E::AttributeReferencePtr> referenced_attributes;
+
+  referenced_attributes.insert(left_join_attributes_.begin(),
+                               left_join_attributes_.end());
+  referenced_attributes.insert(right_join_attributes_.begin(),
+                               right_join_attributes_.end());
+
+  if (right_filter_predicate_ != nullptr) {
+    const std::vector<E::AttributeReferencePtr> attrs_in_predicate =
+        right_filter_predicate_->getReferencedAttributes();
+    referenced_attributes.insert(attrs_in_predicate.begin(),
+                                 attrs_in_predicate.end());
+  }
+
+  for (const auto &aggregate_expr : aggregate_expressions_) {
+    const std::vector<E::AttributeReferencePtr> attrs_in_expr =
+        aggregate_expr->getReferencedAttributes();
+    referenced_attributes.insert(attrs_in_expr.begin(), attrs_in_expr.end());
+  }
+
+  return std::vector<E::AttributeReferencePtr>(
+      referenced_attributes.begin(), referenced_attributes.end());
+}
+
+void CrossReferenceCoalesceAggregate::getFieldStringItems(
+    std::vector<std::string> *inline_field_names,
+    std::vector<std::string> *inline_field_values,
+    std::vector<std::string> *non_container_child_field_names,
+    std::vector<OptimizerTreeBaseNodePtr> *non_container_child_fields,
+    std::vector<std::string> *container_child_field_names,
+    std::vector<std::vector<OptimizerTreeBaseNodePtr>> *container_child_fields) const {
+  inline_field_names->push_back("group_by_key_value_range");
+  inline_field_values->push_back(std::to_string(group_by_key_value_range_));
+
+  non_container_child_field_names->push_back("left_child");
+  non_container_child_fields->push_back(left_child_);
+  non_container_child_field_names->push_back("right_child");
+  non_container_child_fields->push_back(right_child_);
+
+  container_child_field_names->push_back("left_join_attributes");
+  container_child_fields->push_back(
+      CastSharedPtrVector<OptimizerTreeBase>(left_join_attributes_));
+  container_child_field_names->push_back("right_join_attributes");
+  container_child_fields->push_back(
+      CastSharedPtrVector<OptimizerTreeBase>(right_join_attributes_));
+
+  if (right_filter_predicate_ != nullptr) {
+    non_container_child_field_names->push_back("right_filter_predicate");
+    non_container_child_fields->push_back(right_filter_predicate_);
+  }
+  container_child_field_names->push_back("aggregate_expressions");
+  container_child_fields->push_back(
+      CastSharedPtrVector<OptimizerTreeBase>(aggregate_expressions_));
+}
+
+}  // namespace physical
+}  // namespace optimizer
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp b/query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp
new file mode 100644
index 0000000..44f8a33
--- /dev/null
+++ b/query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_OPTIMIZER_PHYSICAL_CROSS_REFERENCE_COALESCE_AGGREGATE_HPP_
+#define QUICKSTEP_QUERY_OPTIMIZER_PHYSICAL_CROSS_REFERENCE_COALESCE_AGGREGATE_HPP_
+
+#include <cstddef>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "query_optimizer/OptimizerTree.hpp"
+#include "query_optimizer/expressions/Alias.hpp"
+#include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/expressions/ExpressionUtil.hpp"
+#include "query_optimizer/expressions/Predicate.hpp"
+#include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/physical/PhysicalType.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+namespace optimizer {
+namespace physical {
+
+/** \addtogroup OptimizerPhysical
+ *  @{
+ */
+
+class CrossReferenceCoalesceAggregate;
+typedef std::shared_ptr<const CrossReferenceCoalesceAggregate> CrossReferenceCoalesceAggregatePtr;
+
+/**
+ * @brief A physical node that fuses a HashJoin with an Aggregate to enable
+ *        fast-path execution.
+ *
+ * Below we briefly describe the semantics of this physical node.
+ *
+ * Let L be a table with PRIMARY KEY u. Let R be a table with FOREIGN KEY x
+ * referring to L(u). Then CrossReferenceCoalesceAggregate represents a common
+ * class of analytical queries:
+ * - For each u in L, COUNT/SUM the records in R that correspond to u (i.e.
+ *   those records satisfying R.x = L.u).
+ *   If R has no record for u, the result value is 0.
+ *
+ * And we have the mapping:
+ *   L -> left_child_
+ *   R -> right_child_
+ *   u -> left_join_attributes_
+ *   x -> right_join_attributes_
+ *   COUNT/SUM -> aggregate_expressions_
+ */
+class CrossReferenceCoalesceAggregate : public Physical {
+ public:
+  PhysicalType getPhysicalType() const override {
+    return PhysicalType::kCrossReferenceCoalesceAggregate;
+  }
+
+  std::string getName() const override {
+    return "CrossReferenceCoalesceAggregate";
+  }
+
+  /**
+   * @return The left physical child.
+   */
+  const PhysicalPtr& left_child() const {
+    return left_child_;
+  }
+
+  /**
+   * @return The right physical child.
+   */
+  const PhysicalPtr& right_child() const {
+    return right_child_;
+  }
+
+  /**
+   * @return The left join attributes.
+   */
+  const std::vector<expressions::AttributeReferencePtr>& left_join_attributes() const {
+    return left_join_attributes_;
+  }
+
+  /**
+   * @return The right join attributes.
+   */
+  const std::vector<expressions::AttributeReferencePtr>& right_join_attributes() const {
+    return right_join_attributes_;
+  }
+
+  /**
+   * @return The predicate to be applied to the right child before aggregation.
+   */
+  const expressions::PredicatePtr& right_filter_predicate() const {
+    return right_filter_predicate_;
+  }
+
+  /**
+   * @return Aggregate expressions.
+   */
+  const std::vector<expressions::AliasPtr>& aggregate_expressions() const {
+    return aggregate_expressions_;
+  }
+
+  /**
+   * @return The maximum possible value of the group-by keys when mapped to
+   *         integer.
+   */
+  std::size_t group_by_key_value_range() const {
+    return group_by_key_value_range_;
+  }
+
+  PhysicalPtr copyWithNewChildren(
+      const std::vector<PhysicalPtr> &new_children) const override {
+    DCHECK_EQ(getNumChildren(), new_children.size());
+    return Create(new_children[0],
+                  new_children[1],
+                  left_join_attributes_,
+                  right_join_attributes_,
+                  right_filter_predicate_,
+                  aggregate_expressions_,
+                  group_by_key_value_range_);
+  }
+
+  std::vector<expressions::AttributeReferencePtr> getOutputAttributes() const override;
+
+  std::vector<expressions::AttributeReferencePtr> getReferencedAttributes() const override;
+
+  bool maybeCopyWithPrunedExpressions(
+      const expressions::UnorderedNamedExpressionSet &referenced_expressions,
+      PhysicalPtr *output) const override {
+    return false;
+  }
+
+  /**
+   * @brief Creates a physical CrossReferenceCoalesceAggregate.
+   *
+   * @param left_child The left child.
+   * @param right_child The right child.
+   * @param left_join_attributes The join attributes of the left child.
+   * @param right_join_attributes The join attributes of the right child.
+   * @param right_filter_predicate Optional filtering predicate evaluated on
+   *        the right child before aggregation.
+   * @param aggregate_expressions The aggregate expressions.
+   * @param group_by_key_value_range The maximum possible value of the group-by
+   *        keys when mapped to integer.
+   * @return An immutable physical CrossReferenceCoalesceAggregate.
+   */
+  static CrossReferenceCoalesceAggregatePtr Create(
+      const PhysicalPtr &left_child,
+      const PhysicalPtr &right_child,
+      const std::vector<expressions::AttributeReferencePtr> &left_join_attributes,
+      const std::vector<expressions::AttributeReferencePtr> &right_join_attributes,
+      const expressions::PredicatePtr right_filter_predicate,
+      const std::vector<expressions::AliasPtr> &aggregate_expressions,
+      const std::size_t group_by_key_value_range) {
+    return CrossReferenceCoalesceAggregatePtr(
+        new CrossReferenceCoalesceAggregate(left_child,
+                                            right_child,
+                                            left_join_attributes,
+                                            right_join_attributes,
+                                            right_filter_predicate,
+                                            aggregate_expressions,
+                                            group_by_key_value_range));
+  }
+
+ protected:
+  void getFieldStringItems(
+      std::vector<std::string> *inline_field_names,
+      std::vector<std::string> *inline_field_values,
+      std::vector<std::string> *non_container_child_field_names,
+      std::vector<OptimizerTreeBaseNodePtr> *non_container_child_fields,
+      std::vector<std::string> *container_child_field_names,
+      std::vector<std::vector<OptimizerTreeBaseNodePtr>> *container_child_fields) const override;
+
+ private:
+  CrossReferenceCoalesceAggregate(
+      const PhysicalPtr &left_child,
+      const PhysicalPtr &right_child,
+      const std::vector<expressions::AttributeReferencePtr> &left_join_attributes,
+      const std::vector<expressions::AttributeReferencePtr> &right_join_attributes,
+      const expressions::PredicatePtr right_filter_predicate,
+      const std::vector<expressions::AliasPtr> &aggregate_expressions,
+      const std::size_t group_by_key_value_range)
+      : left_child_(left_child),
+        right_child_(right_child),
+        left_join_attributes_(left_join_attributes),
+        right_join_attributes_(right_join_attributes),
+        right_filter_predicate_(right_filter_predicate),
+        aggregate_expressions_(aggregate_expressions),
+        group_by_key_value_range_(group_by_key_value_range) {
+    addChild(left_child_);
+    addChild(right_child_);
+  }
+
+  // TODO(jianqiao): For the left child, support filter predicate fusing and
+  // attachment of LIPFilters.
+  PhysicalPtr left_child_;
+  PhysicalPtr right_child_;
+  std::vector<expressions::AttributeReferencePtr> left_join_attributes_;
+  std::vector<expressions::AttributeReferencePtr> right_join_attributes_;
+  expressions::PredicatePtr right_filter_predicate_;
+  std::vector<expressions::AliasPtr> aggregate_expressions_;
+  std::size_t group_by_key_value_range_;
+
+  DISALLOW_COPY_AND_ASSIGN(CrossReferenceCoalesceAggregate);
+};
+
+/** @} */
+
+}  // namespace physical
+}  // namespace optimizer
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_OPTIMIZER_PHYSICAL_CROSS_REFERENCE_COALESCE_AGGREGATE_HPP_
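
The semantics described in the class comment above (for each u in L, COUNT the records in R with R.x = L.u, using 0 when there is none) can be sketched with a dense array indexed by key. This is only an illustration under stated assumptions: CountPerLeftKey is a hypothetical helper, keys are assumed to be already mapped to integers in [0, key_range), and the real operator works on storage blocks rather than on vectors.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// For each key u in `left_keys`, counts the entries x in `right_keys` with
// x == u. Keys with no match get 0. `key_range` plays the role of
// group_by_key_value_range: every key must lie in [0, key_range).
std::vector<std::int64_t> CountPerLeftKey(
    const std::vector<std::size_t> &left_keys,
    const std::vector<std::size_t> &right_keys,
    const std::size_t key_range) {
  // One slot per possible key value, so no hashing or collision handling.
  std::vector<std::int64_t> counts(key_range, 0);
  for (const std::size_t x : right_keys) {
    assert(x < key_range);
    ++counts[x];
  }

  // Emit one result per left tuple, in left order.
  std::vector<std::int64_t> result;
  result.reserve(left_keys.size());
  for (const std::size_t u : left_keys) {
    assert(u < key_range);
    result.push_back(counts[u]);
  }
  return result;
}
```

Calling CountPerLeftKey({0, 1, 2, 3}, {1, 1, 3}, 4) yields one count per left key, with zeros for keys 0 and 2, which have no matching right-side record.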

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/query_optimizer/physical/PatternMatcher.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/PatternMatcher.hpp b/query_optimizer/physical/PatternMatcher.hpp
index 4336767..0204504 100644
--- a/query_optimizer/physical/PatternMatcher.hpp
+++ b/query_optimizer/physical/PatternMatcher.hpp
@@ -33,6 +33,7 @@ class Aggregate;
 class BinaryJoin;
 class CopyFrom;
 class CreateTable;
+class CrossReferenceCoalesceAggregate;
 class DeleteTuples;
 class DropTable;
 class FilterJoin;
@@ -112,6 +113,8 @@ using SomeAggregate = SomePhysicalNode<Aggregate, PhysicalType::kAggregate>;
 using SomeBinaryJoin = SomePhysicalNode<BinaryJoin, PhysicalType::kHashJoin, PhysicalType::kNestedLoopsJoin>;
 using SomeCopyFrom = SomePhysicalNode<CopyFrom, PhysicalType::kCopyFrom>;
 using SomeCreateTable = SomePhysicalNode<CreateTable, PhysicalType::kCreateTable>;
+using SomeCrossReferenceCoalesceAggregate = SomePhysicalNode<CrossReferenceCoalesceAggregate,
+                                                             PhysicalType::kCrossReferenceCoalesceAggregate>;
 using SomeDeleteTuples = SomePhysicalNode<DeleteTuples, PhysicalType::kDeleteTuples>;
 using SomeDropTable = SomePhysicalNode<DropTable, PhysicalType::kDropTable>;
 using SomeFilterJoin = SomePhysicalNode<FilterJoin, PhysicalType::kFilterJoin>;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/query_optimizer/physical/PhysicalType.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/PhysicalType.hpp b/query_optimizer/physical/PhysicalType.hpp
index 1da5929..077bd54 100644
--- a/query_optimizer/physical/PhysicalType.hpp
+++ b/query_optimizer/physical/PhysicalType.hpp
@@ -36,6 +36,7 @@ enum class PhysicalType {
   kCopyFrom,
   kCreateIndex,
   kCreateTable,
+  kCrossReferenceCoalesceAggregate,
   kDeleteTuples,
   kDropTable,
   kFilterJoin,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/query_optimizer/rules/BottomUpRule.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/BottomUpRule.hpp b/query_optimizer/rules/BottomUpRule.hpp
index 53dff0d..6c14e64 100644
--- a/query_optimizer/rules/BottomUpRule.hpp
+++ b/query_optimizer/rules/BottomUpRule.hpp
@@ -57,21 +57,7 @@ class BottomUpRule : public Rule<TreeType> {
     DCHECK(tree != nullptr);
 
     init(tree);
-    std::vector<std::shared_ptr<const TreeType>> new_children;
-    bool has_changed_children = false;
-    for (const std::shared_ptr<const TreeType> &child : tree->children()) {
-      std::shared_ptr<const TreeType> new_child = apply(child);
-      if (child != new_child && !has_changed_children) {
-        has_changed_children = true;
-      }
-      new_children.push_back(new_child);
-    }
-
-    if (has_changed_children) {
-      return applyToNode(tree->copyWithNewChildren(new_children));
-    } else {
-      return applyToNode(tree);
-    }
+    return applyInternal(tree);
   }
 
  protected:
@@ -89,10 +75,29 @@ class BottomUpRule : public Rule<TreeType> {
    *
    * @param input The input tree.
    */
-  virtual void init(const TreeNodePtr &input) {
-  }
+  virtual void init(const TreeNodePtr &input) {}
 
  private:
+  TreeNodePtr applyInternal(const TreeNodePtr &tree) {
+    DCHECK(tree != nullptr);
+
+    std::vector<std::shared_ptr<const TreeType>> new_children;
+    bool has_changed_children = false;
+    for (const std::shared_ptr<const TreeType> &child : tree->children()) {
+      std::shared_ptr<const TreeType> new_child = applyInternal(child);
+      if (child != new_child && !has_changed_children) {
+        has_changed_children = true;
+      }
+      new_children.push_back(new_child);
+    }
+
+    if (has_changed_children) {
+      return applyToNode(tree->copyWithNewChildren(new_children));
+    } else {
+      return applyToNode(tree);
+    }
+  }
+
   DISALLOW_COPY_AND_ASSIGN(BottomUpRule);
 };
 

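Note on the BottomUpRule change above: moving the recursion into applyInternal() means init() runs once for the root instead of once per node. A minimal sketch of that shape follows, assuming a hypothetical immutable Node type and a toy rule (DoubleOddValues); none of these names are Quickstep code.

```cpp
#include <cassert>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical immutable tree node, used only for illustration.
struct Node;
using NodePtr = std::shared_ptr<const Node>;

struct Node {
  int value;
  std::vector<NodePtr> children;
};

NodePtr MakeNode(int value, std::vector<NodePtr> children = {}) {
  return std::make_shared<Node>(Node{value, std::move(children)});
}

// Toy bottom-up rule: doubles every odd value.
class DoubleOddValues {
 public:
  NodePtr apply(const NodePtr &tree) {
    ++init_calls_;  // Stands in for init(tree); reached once per apply().
    return applyInternal(tree);
  }

  int init_calls() const { return init_calls_; }

 private:
  // Rewrites children first, then the node itself.
  NodePtr applyInternal(const NodePtr &tree) {
    std::vector<NodePtr> new_children;
    bool has_changed_children = false;
    for (const NodePtr &child : tree->children) {
      NodePtr new_child = applyInternal(child);
      if (child != new_child) {
        has_changed_children = true;
      }
      new_children.push_back(new_child);
    }
    // Copy the node only when at least one child was replaced.
    if (has_changed_children) {
      return applyToNode(MakeNode(tree->value, std::move(new_children)));
    }
    return applyToNode(tree);
  }

  NodePtr applyToNode(const NodePtr &node) {
    if (node->value % 2 == 0) {
      return node;
    }
    return MakeNode(node->value * 2, node->children);
  }

  int init_calls_ = 0;
};
```

Unchanged subtrees keep their original pointers, so callers can detect "no rewrite" by pointer comparison, as the loop itself does.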
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/query_optimizer/rules/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/CMakeLists.txt b/query_optimizer/rules/CMakeLists.txt
index 029d816..427500d 100644
--- a/query_optimizer/rules/CMakeLists.txt
+++ b/query_optimizer/rules/CMakeLists.txt
@@ -21,6 +21,7 @@ add_subdirectory(tests)
 add_library(quickstep_queryoptimizer_rules_AttachLIPFilters AttachLIPFilters.cpp AttachLIPFilters.hpp)
 add_library(quickstep_queryoptimizer_rules_BottomUpRule ../../empty_src.cpp BottomUpRule.hpp)
 add_library(quickstep_queryoptimizer_rules_CollapseProject CollapseProject.cpp CollapseProject.hpp)
+add_library(quickstep_queryoptimizer_rules_FuseAggregateJoin FuseAggregateJoin.cpp FuseAggregateJoin.hpp)
 add_library(quickstep_queryoptimizer_rules_GenerateJoins GenerateJoins.cpp GenerateJoins.hpp)
 add_library(quickstep_queryoptimizer_rules_InjectJoinFilters InjectJoinFilters.cpp InjectJoinFilters.hpp)
 add_library(quickstep_queryoptimizer_rules_PruneColumns PruneColumns.cpp PruneColumns.hpp)
@@ -75,6 +76,27 @@ target_link_libraries(quickstep_queryoptimizer_rules_CollapseProject
                       quickstep_queryoptimizer_rules_Rule
                       quickstep_queryoptimizer_rules_RuleHelper
                       quickstep_utility_Macros)
+target_link_libraries(quickstep_queryoptimizer_rules_FuseAggregateJoin
+                      quickstep_queryoptimizer_costmodel_StarSchemaSimpleCostModel
+                      quickstep_queryoptimizer_expressions_AggregateFunction
+                      quickstep_queryoptimizer_expressions_Alias
+                      quickstep_queryoptimizer_expressions_AttributeReference
+                      quickstep_queryoptimizer_expressions_ExprId
+                      quickstep_queryoptimizer_expressions_ExpressionUtil
+                      quickstep_queryoptimizer_expressions_NamedExpression
+                      quickstep_queryoptimizer_expressions_PatternMatcher
+                      quickstep_queryoptimizer_expressions_Predicate
+                      quickstep_queryoptimizer_expressions_Scalar
+                      quickstep_queryoptimizer_physical_Aggregate
+                      quickstep_queryoptimizer_physical_CrossReferenceCoalesceAggregate
+                      quickstep_queryoptimizer_physical_HashJoin
+                      quickstep_queryoptimizer_physical_PatternMatcher
+                      quickstep_queryoptimizer_physical_Physical
+                      quickstep_queryoptimizer_physical_PhysicalType
+                      quickstep_queryoptimizer_physical_Selection
+                      quickstep_queryoptimizer_physical_TopLevelPlan
+                      quickstep_queryoptimizer_rules_BottomUpRule
+                      quickstep_utility_Macros)
 target_link_libraries(quickstep_queryoptimizer_rules_GenerateJoins
                       glog
                       quickstep_queryoptimizer_expressions_AttributeReference
@@ -288,6 +310,7 @@ target_link_libraries(quickstep_queryoptimizer_rules
                       quickstep_queryoptimizer_rules_AttachLIPFilters
                       quickstep_queryoptimizer_rules_BottomUpRule
                       quickstep_queryoptimizer_rules_CollapseProject
+                      quickstep_queryoptimizer_rules_FuseAggregateJoin
                       quickstep_queryoptimizer_rules_GenerateJoins
                       quickstep_queryoptimizer_rules_InjectJoinFilters
                       quickstep_queryoptimizer_rules_PruneColumns

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/query_optimizer/rules/FuseAggregateJoin.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/FuseAggregateJoin.cpp b/query_optimizer/rules/FuseAggregateJoin.cpp
new file mode 100644
index 0000000..6efc7e8
--- /dev/null
+++ b/query_optimizer/rules/FuseAggregateJoin.cpp
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "query_optimizer/rules/FuseAggregateJoin.hpp"
+
+#include <algorithm>
+#include <cstddef>
+#include <unordered_set>
+#include <vector>
+
+#include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp"
+#include "query_optimizer/expressions/AggregateFunction.hpp"
+#include "query_optimizer/expressions/Alias.hpp"
+#include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/expressions/ExprId.hpp"
+#include "query_optimizer/expressions/ExpressionUtil.hpp"
+#include "query_optimizer/expressions/NamedExpression.hpp"
+#include "query_optimizer/expressions/PatternMatcher.hpp"
+#include "query_optimizer/expressions/Predicate.hpp"
+#include "query_optimizer/expressions/Scalar.hpp"
+#include "query_optimizer/physical/Aggregate.hpp"
+#include "query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp"
+#include "query_optimizer/physical/HashJoin.hpp"
+#include "query_optimizer/physical/PatternMatcher.hpp"
+#include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/physical/PhysicalType.hpp"
+#include "query_optimizer/physical/Selection.hpp"
+#include "query_optimizer/physical/TopLevelPlan.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+namespace optimizer {
+
+namespace E = ::quickstep::optimizer::expressions;
+namespace P = ::quickstep::optimizer::physical;
+
+P::PhysicalPtr FuseAggregateJoin::applyToNode(
+    const P::PhysicalPtr &node) {
+  // Currently we consider only Aggregate on HashLeftOuterJoin.
+  P::AggregatePtr aggregate;
+  if (!P::SomeAggregate::MatchesWithConditionalCast(node, &aggregate) ||
+      aggregate->filter_predicate() != nullptr) {
+    return node;
+  }
+
+  P::HashJoinPtr hash_join;
+  if ((!P::SomeHashJoin::MatchesWithConditionalCast(aggregate->input(), &hash_join)) ||
+      hash_join->join_type() != P::HashJoin::JoinType::kLeftOuterJoin ||
+      hash_join->residual_predicate() != nullptr) {
+    return node;
+  }
+
+  // Single left join attribute with unique values.
+  const std::vector<E::AttributeReferencePtr> &left_join_attributes =
+      hash_join->left_join_attributes();
+  if (left_join_attributes.size() != 1u ||
+      (!cost_model_->impliesUniqueAttributes(hash_join->left(), left_join_attributes))) {
+    return node;
+  }
+
+  // Single group-by attribute that is the same as the left join attribute.
+  const std::vector<E::NamedExpressionPtr> &grouping_expressions =
+      aggregate->grouping_expressions();
+  if (grouping_expressions.size() != 1u ||
+      grouping_expressions.front()->id() != left_join_attributes.front()->id()) {
+    return node;
+  }
+
+  std::unordered_set<E::ExprId> right_side_attr_ids;
+  for (const auto &attr : hash_join->right()->getOutputAttributes()) {
+    right_side_attr_ids.insert(attr->id());
+  }
+
+  // Aggregate expressions only depend on attributes from the right child.
+  const std::vector<E::AliasPtr> &aggregate_expressions =
+      aggregate->aggregate_expressions();
+  for (const auto &expr : aggregate_expressions) {
+    const E::AggregateFunctionPtr aggr_expr =
+        std::static_pointer_cast<const E::AggregateFunction>(expr->expression());
+
+    const std::vector<E::ScalarPtr> &arguments = aggr_expr->getArguments();
+    if (arguments.size() != 1u) {
+      return node;
+    }
+
+    E::AttributeReferencePtr arg_attr;
+    if (!E::SomeAttributeReference::MatchesWithConditionalCast(arguments.front(), &arg_attr) ||
+        right_side_attr_ids.find(arg_attr->id()) == right_side_attr_ids.end()) {
+      return node;
+    }
+  }
+
+  // Collision-free vector aggregation is applicable, and both the left and right
+  // join attributes are range-bounded integer values.
+  const std::size_t estimated_num_groups =
+      cost_model_->estimateNumGroupsForAggregate(aggregate);
+
+  std::size_t max_num_groups_left;
+  if (!cost_model_->canUseCollisionFreeAggregation(aggregate,
+                                                   estimated_num_groups,
+                                                   &max_num_groups_left)) {
+    return node;
+  }
+
+  std::size_t max_num_groups_right;
+  if (!cost_model_->canUseCollisionFreeAggregation(
+           P::Aggregate::Create(hash_join->right(),
+                                E::ToNamedExpressions(hash_join->right_join_attributes()),
+                                aggregate->aggregate_expressions(),
+                                nullptr),
+           estimated_num_groups,
+           &max_num_groups_right)) {
+    return node;
+  }
+
+  // Fuse right child's filter predicate.
+  P::PhysicalPtr right_child = hash_join->right();
+  const std::vector<E::AttributeReferencePtr> &right_join_attributes =
+      hash_join->right_join_attributes();
+  E::PredicatePtr right_filter_predicate = nullptr;
+
+  P::SelectionPtr selection;
+  if (P::SomeSelection::MatchesWithConditionalCast(right_child, &selection)) {
+    if (E::SubsetOfExpressions(right_join_attributes,
+                               selection->input()->getOutputAttributes())) {
+      right_child = selection->input();
+      right_filter_predicate = selection->filter_predicate();
+    }
+  }
+
+  const std::size_t max_num_groups =
+      std::max(max_num_groups_left, max_num_groups_right);
+
+  return P::CrossReferenceCoalesceAggregate::Create(hash_join->left(),
+                                                    right_child,
+                                                    left_join_attributes,
+                                                    right_join_attributes,
+                                                    right_filter_predicate,
+                                                    aggregate_expressions,
+                                                    max_num_groups);
+}
+
+void FuseAggregateJoin::init(const P::PhysicalPtr &input) {
+  DCHECK(input->getPhysicalType() == P::PhysicalType::kTopLevelPlan);
+
+  const P::TopLevelPlanPtr top_level_plan =
+      std::static_pointer_cast<const P::TopLevelPlan>(input);
+  cost_model_.reset(
+      new cost::StarSchemaSimpleCostModel(top_level_plan->shared_subplans()));
+}
+
+}  // namespace optimizer
+}  // namespace quickstep
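
Note on the preconditions checked in applyToNode() above: one way to read them is that, with a unique, range-bounded integer key on the left side and aggregates that only touch right-side attributes, the left outer join followed by GROUP BY can be computed directly into a key-indexed vector. The sketch below illustrates that reading for COUNT only. It is an assumption about the intended semantics, not the CrossReferenceCoalesceAggregate implementation; RightRow, JoinThenCount and FusedCount are hypothetical names, and left keys are assumed to lie in [0, max_num_groups).

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <utility>
#include <vector>

// Hypothetical right-side row: (join key, value).
using RightRow = std::pair<int, int>;
using Counts = std::map<int, std::size_t>;

// Reference plan: LEFT OUTER JOIN on the key, then GROUP BY left key
// with COUNT(value). Left keys are assumed unique.
Counts JoinThenCount(const std::vector<int> &left_keys,
                     const std::vector<RightRow> &right) {
  Counts counts;
  for (const int key : left_keys) {
    counts[key] = 0;  // An unmatched left row still forms a group.
    for (const RightRow &row : right) {
      if (row.first == key) {
        ++counts[key];
      }
    }
  }
  return counts;
}

// Fused plan: mark which keys exist on the left, then aggregate the right
// side straight into a vector indexed by key.
Counts FusedCount(const std::vector<int> &left_keys,
                  const std::vector<RightRow> &right,
                  const std::size_t max_num_groups) {
  std::vector<bool> exists(max_num_groups, false);
  std::vector<std::size_t> counts(max_num_groups, 0);
  for (const int key : left_keys) {
    exists[static_cast<std::size_t>(key)] = true;
  }
  for (const RightRow &row : right) {
    if (row.first < 0) {
      continue;
    }
    const std::size_t key = static_cast<std::size_t>(row.first);
    if (key < max_num_groups && exists[key]) {
      ++counts[key];
    }
  }
  Counts result;
  for (std::size_t key = 0; key < max_num_groups; ++key) {
    if (exists[key]) {
      result[static_cast<int>(key)] = counts[key];
    }
  }
  return result;
}
```

If the left key were not unique, the join would duplicate right-side rows and the two plans could disagree, which is consistent with the rule bailing out in that case.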

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/query_optimizer/rules/FuseAggregateJoin.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/FuseAggregateJoin.hpp b/query_optimizer/rules/FuseAggregateJoin.hpp
new file mode 100644
index 0000000..f2d4c47
--- /dev/null
+++ b/query_optimizer/rules/FuseAggregateJoin.hpp
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_OPTIMIZER_RULES_FUSE_AGGREGATE_JOIN_HPP_
+#define QUICKSTEP_QUERY_OPTIMIZER_RULES_FUSE_AGGREGATE_JOIN_HPP_
+
+#include <memory>
+#include <string>
+
+#include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp"
+#include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/rules/BottomUpRule.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+namespace optimizer {
+
+/** \addtogroup OptimizerRules
+ *  @{
+ */
+
+/**
+ * @brief Rule that applies to a physical plan to fuse Aggregate nodes with
+ *        HashJoin nodes.
+ */
+class FuseAggregateJoin : public BottomUpRule<physical::Physical> {
+ public:
+  /**
+   * @brief Constructor.
+   */
+  FuseAggregateJoin() {}
+
+  ~FuseAggregateJoin() override {}
+
+  std::string getName() const override {
+    return "FuseAggregateJoin";
+  }
+
+ protected:
+  physical::PhysicalPtr applyToNode(const physical::PhysicalPtr &node) override;
+
+  void init(const physical::PhysicalPtr &input) override;
+
+ private:
+  std::unique_ptr<cost::StarSchemaSimpleCostModel> cost_model_;
+
+  DISALLOW_COPY_AND_ASSIGN(FuseAggregateJoin);
+};
+
+/** @} */
+
+}  // namespace optimizer
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_OPTIMIZER_RULES_FUSE_AGGREGATE_JOIN_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/relational_operators/BuildAggregationExistenceMapOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/BuildAggregationExistenceMapOperator.cpp b/relational_operators/BuildAggregationExistenceMapOperator.cpp
new file mode 100644
index 0000000..648e291
--- /dev/null
+++ b/relational_operators/BuildAggregationExistenceMapOperator.cpp
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "relational_operators/BuildAggregationExistenceMapOperator.hpp"
+
+#include <memory>
+#include <vector>
+
+#include "catalog/CatalogAttribute.hpp"
+#include "catalog/CatalogRelationSchema.hpp"
+#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/QueryContext.hpp"
+#include "query_execution/WorkOrderProtosContainer.hpp"
+#include "query_execution/WorkOrdersContainer.hpp"
+#include "relational_operators/WorkOrder.pb.h"
+#include "storage/AggregationOperationState.hpp"
+#include "storage/CollisionFreeVectorTable.hpp"
+#include "storage/StorageBlock.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageManager.hpp"
+#include "storage/TupleStorageSubBlock.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "storage/ValueAccessorUtil.hpp"
+#include "types/Type.hpp"
+#include "types/TypeID.hpp"
+#include "utility/BarrieredReadWriteConcurrentBitVector.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+
+namespace quickstep {
+
+namespace {
+
+template <typename CppType, bool is_attr_nullable>
+void ExecuteBuild(const attribute_id attr_id,
+                  ValueAccessor *accessor,
+                  BarrieredReadWriteConcurrentBitVector *existence_map) {
+  InvokeOnAnyValueAccessor(
+      accessor,
+      [&](auto *accessor) -> void {  // NOLINT(build/c++11)
+    accessor->beginIteration();
+    while (accessor->next()) {
+      const void *value = accessor->template getUntypedValue<is_attr_nullable>(attr_id);
+      if (!is_attr_nullable || value != nullptr) {
+        existence_map->setBit(*reinterpret_cast<const CppType *>(value));
+      }
+    }
+  });
+}
+
+// Dispatch helper.
+template <typename CppType>
+void ExecuteHelper(const attribute_id attr_id,
+                   const bool is_attr_nullable,
+                   ValueAccessor *accessor,
+                   BarrieredReadWriteConcurrentBitVector *existence_map)  {
+  if (is_attr_nullable) {
+    ExecuteBuild<CppType, true>(attr_id, accessor, existence_map);
+  } else {
+    ExecuteBuild<CppType, false>(attr_id, accessor, existence_map);
+  }
+}
+
+}  // namespace
+
+bool BuildAggregationExistenceMapOperator::getAllWorkOrders(
+    WorkOrdersContainer *container,
+    QueryContext *query_context,
+    StorageManager *storage_manager,
+    const tmb::client_id scheduler_client_id,
+    tmb::MessageBus *bus) {
+  if (input_relation_is_stored_) {
+    if (!started_) {
+      for (const block_id input_block_id : input_relation_block_ids_) {
+        container->addNormalWorkOrder(
+            new BuildAggregationExistenceMapWorkOrder(
+                query_id_,
+                input_relation_,
+                input_block_id,
+                build_attribute_,
+                query_context->getAggregationState(aggr_state_index_),
+                storage_manager),
+            op_index_);
+      }
+      started_ = true;
+    }
+    return true;
+  } else {
+    while (num_workorders_generated_ < input_relation_block_ids_.size()) {
+      container->addNormalWorkOrder(
+          new BuildAggregationExistenceMapWorkOrder(
+                query_id_,
+                input_relation_,
+                input_relation_block_ids_[num_workorders_generated_],
+                build_attribute_,
+                query_context->getAggregationState(aggr_state_index_),
+                storage_manager),
+          op_index_);
+      ++num_workorders_generated_;
+    }
+    return done_feeding_input_relation_;
+  }
+}
+
+bool BuildAggregationExistenceMapOperator
+    ::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
+  if (input_relation_is_stored_) {
+    if (!started_) {
+      for (const block_id block : input_relation_block_ids_) {
+        container->addWorkOrderProto(createWorkOrderProto(block), op_index_);
+      }
+      started_ = true;
+    }
+    return true;
+  } else {
+    while (num_workorders_generated_ < input_relation_block_ids_.size()) {
+      container->addWorkOrderProto(
+          createWorkOrderProto(input_relation_block_ids_[num_workorders_generated_]),
+          op_index_);
+      ++num_workorders_generated_;
+    }
+    return done_feeding_input_relation_;
+  }
+}
+
+serialization::WorkOrder* BuildAggregationExistenceMapOperator
+    ::createWorkOrderProto(const block_id block) {
+  serialization::WorkOrder *proto = new serialization::WorkOrder;
+  proto->set_work_order_type(serialization::BUILD_AGGREGATION_EXISTENCE_MAP);
+  proto->set_query_id(query_id_);
+
+  proto->SetExtension(serialization::BuildAggregationExistenceMapWorkOrder::relation_id,
+                      input_relation_.getID());
+  proto->SetExtension(serialization::BuildAggregationExistenceMapWorkOrder::build_block_id,
+                      block);
+  proto->SetExtension(serialization::BuildAggregationExistenceMapWorkOrder::build_attribute,
+                      build_attribute_);
+  proto->SetExtension(serialization::BuildAggregationExistenceMapWorkOrder::aggr_state_index,
+                      aggr_state_index_);
+  return proto;
+}
+
+void BuildAggregationExistenceMapWorkOrder::execute() {
+  BlockReference block(
+      storage_manager_->getBlock(build_block_id_, input_relation_));
+  std::unique_ptr<ValueAccessor> accessor(
+      block->getTupleStorageSubBlock().createValueAccessor());
+
+  CollisionFreeVectorTable *aggregate_table =
+      state_->getCollisionFreeVectorTable();
+  DCHECK(aggregate_table != nullptr);
+
+  BarrieredReadWriteConcurrentBitVector *existence_map =
+      aggregate_table->getExistenceMap();
+
+  const Type &attr_type =
+      input_relation_.getAttributeById(build_attribute_)->getType();
+  switch (attr_type.getTypeID()) {
+    case TypeID::kInt:
+      ExecuteHelper<int>(build_attribute_,
+                         attr_type.isNullable(),
+                         accessor.get(),
+                         existence_map);
+      return;
+    case TypeID::kLong:
+      ExecuteHelper<std::int64_t>(build_attribute_,
+                                  attr_type.isNullable(),
+                                  accessor.get(),
+                                  existence_map);
+      return;
+    default:
+      LOG(FATAL) << "Build attribute type not supported by "
+                 << "BuildAggregationExistenceMapOperator: "
+                 << attr_type.getName();
+  }
+}
+
+}  // namespace quickstep
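
Note on ExecuteBuild/ExecuteHelper above: the runtime nullability flag is turned into a template parameter so the per-tuple NULL check becomes a compile-time constant on the non-nullable path. A minimal sketch of that dispatch pattern follows, assuming a hypothetical Column type (a vector of pointers, nullptr meaning NULL) and std::vector<bool> in place of BarrieredReadWriteConcurrentBitVector.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical column: each entry points to a value, or is nullptr for NULL.
using Column = std::vector<const int *>;

template <bool is_attr_nullable>
void BuildExistenceMap(const Column &column,
                       std::vector<bool> *existence_map) {
  for (const int *value : column) {
    // When is_attr_nullable is false, this condition is a compile-time
    // constant, so the pointer is never tested in the loop.
    if (!is_attr_nullable || value != nullptr) {
      (*existence_map)[static_cast<std::size_t>(*value)] = true;
    }
  }
}

// Runtime-to-compile-time dispatch on the nullability flag.
void BuildExistenceMapDispatch(const Column &column,
                               const bool is_attr_nullable,
                               std::vector<bool> *existence_map) {
  if (is_attr_nullable) {
    BuildExistenceMap<true>(column, existence_map);
  } else {
    BuildExistenceMap<false>(column, existence_map);
  }
}
```

The sketch assumes every non-NULL value is a valid index into the map; the real operator relies on the optimizer having established the key range beforehand.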


[7/9] incubator-quickstep git commit: Minor refactored QueryManagerDistributed.

Posted by zu...@apache.org.
Minor refactored QueryManagerDistributed.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/8229994f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/8229994f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/8229994f

Branch: refs/heads/two-level-tmb
Commit: 8229994ff813b990830a48617d64362aa4c20c63
Parents: 1cfc1c4
Author: Zuyu Zhang <zu...@apache.org>
Authored: Thu Feb 9 14:21:28 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Thu Feb 9 14:21:28 2017 -0800

----------------------------------------------------------------------
 query_execution/CMakeLists.txt                |  1 -
 query_execution/PolicyEnforcerDistributed.cpp | 12 ++++++++++--
 query_execution/QueryManagerDistributed.cpp   | 17 ++++++-----------
 query_execution/QueryManagerDistributed.hpp   | 15 ++++++++++-----
 4 files changed, 26 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8229994f/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index e26bde0..5ad6999 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -257,7 +257,6 @@ if (ENABLE_DISTRIBUTED)
                         quickstep_queryexecution_QueryExecutionTypedefs
                         quickstep_queryexecution_QueryExecutionUtil
                         quickstep_queryexecution_QueryManagerBase
-                        quickstep_queryexecution_ShiftbossDirectory
                         quickstep_queryexecution_WorkOrderProtosContainer
                         quickstep_relationaloperators_RelationalOperator
                         quickstep_relationaloperators_WorkOrder_proto

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8229994f/query_execution/PolicyEnforcerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp
index ef5abb0..12d2037 100644
--- a/query_execution/PolicyEnforcerDistributed.cpp
+++ b/query_execution/PolicyEnforcerDistributed.cpp
@@ -131,9 +131,17 @@ bool PolicyEnforcerDistributed::admitQuery(QueryHandle *query_handle) {
   // initializes.
   initiateQueryInShiftboss(query_handle);
 
+  const std::size_t num_shiftbosses = shiftboss_directory_->size();
+
+  tmb::Address shiftboss_addresses;
+  for (std::size_t i = 0; i < num_shiftbosses; ++i) {
+    shiftboss_addresses.AddRecipient(shiftboss_directory_->getClientId(i));
+  }
+
   // Query with the same ID not present, ok to admit.
-  admitted_queries_[query_id].reset(
-      new QueryManagerDistributed(query_handle, shiftboss_directory_, foreman_client_id_, bus_));
+  admitted_queries_.emplace(query_id,
+                            std::make_unique<QueryManagerDistributed>(
+                                query_handle, foreman_client_id_, num_shiftbosses, move(shiftboss_addresses), bus_));
   return true;
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8229994f/query_execution/QueryManagerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.cpp b/query_execution/QueryManagerDistributed.cpp
index 6ac96ab..174c490 100644
--- a/query_execution/QueryManagerDistributed.cpp
+++ b/query_execution/QueryManagerDistributed.cpp
@@ -30,7 +30,6 @@
 #include "query_execution/QueryExecutionMessages.pb.h"
 #include "query_execution/QueryExecutionTypedefs.hpp"
 #include "query_execution/QueryExecutionUtil.hpp"
-#include "query_execution/ShiftbossDirectory.hpp"
 #include "query_execution/WorkOrderProtosContainer.hpp"
 #include "relational_operators/RelationalOperator.hpp"
 #include "relational_operators/WorkOrder.pb.h"
@@ -52,12 +51,14 @@ using std::vector;
 namespace quickstep {
 
 QueryManagerDistributed::QueryManagerDistributed(QueryHandle *query_handle,
-                                                 const ShiftbossDirectory *shiftboss_directory,
                                                  const tmb::client_id foreman_client_id,
+                                                 const std::size_t num_shiftbosses,
+                                                 tmb::Address &&shiftboss_addresses,
                                                  tmb::MessageBus *bus)
     : QueryManagerBase(query_handle),
-      shiftboss_directory_(shiftboss_directory),
       foreman_client_id_(foreman_client_id),
+      num_shiftbosses_(num_shiftbosses),
+      shiftboss_addresses_(move(shiftboss_addresses)),
       bus_(bus),
       normal_workorder_protos_container_(
           new WorkOrderProtosContainer(num_operators_in_dag_)) {
@@ -142,7 +143,7 @@ void QueryManagerDistributed::processInitiateRebuildResponseMessage(const dag_no
                                                                     const std::size_t shiftboss_index) {
   query_exec_state_->updateRebuildStatus(op_index, num_rebuild_work_orders, shiftboss_index);
 
-  if (!query_exec_state_->hasRebuildFinished(op_index, shiftboss_directory_->size())) {
+  if (!query_exec_state_->hasRebuildFinished(op_index, num_shiftbosses_)) {
     // Wait for the rebuild work orders to finish.
     return;
   }
@@ -181,16 +182,10 @@ bool QueryManagerDistributed::initiateRebuild(const dag_node_index index) {
                            kInitiateRebuildMessage);
   free(proto_bytes);
 
-  // TODO(quickstep-team): Dynamically scale-up/down Shiftbosses.
-  tmb::Address shiftboss_addresses;
-  for (std::size_t i = 0; i < shiftboss_directory_->size(); ++i) {
-    shiftboss_addresses.AddRecipient(shiftboss_directory_->getClientId(i));
-  }
-
   DLOG(INFO) << "ForemanDistributed sent InitiateRebuildMessage (typed '" << kInitiateRebuildMessage
              << "') to all Shiftbosses";
   QueryExecutionUtil::BroadcastMessage(foreman_client_id_,
-                                       shiftboss_addresses,
+                                       shiftboss_addresses_,
                                        move(tagged_msg),
                                        bus_);
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8229994f/query_execution/QueryManagerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.hpp b/query_execution/QueryManagerDistributed.hpp
index 631b15a..759fa70 100644
--- a/query_execution/QueryManagerDistributed.hpp
+++ b/query_execution/QueryManagerDistributed.hpp
@@ -32,6 +32,7 @@
 #include "query_execution/WorkOrderProtosContainer.hpp"
 #include "utility/Macros.hpp"
 
+#include "tmb/address.h"
 #include "tmb/id_typedefs.h"
 
 namespace tmb { class MessageBus; }
@@ -39,7 +40,6 @@ namespace tmb { class MessageBus; }
 namespace quickstep {
 
 class QueryHandle;
-class ShiftbossDirectory;
 
 namespace serialization { class WorkOrderMessage; }
 
@@ -57,13 +57,15 @@ class QueryManagerDistributed final : public QueryManagerBase {
    * @brief Constructor.
    *
    * @param query_handle The QueryHandle object for this query.
-   * @param shiftboss_directory The ShiftbossDirectory to use.
    * @param foreman_client_id The TMB client ID of the foreman thread.
+   * @param num_shiftbosses The number of Shiftbosses for rebuild.
+   * @param shiftboss_addresses The TMB Address of Shiftbosses for rebuild.
    * @param bus The TMB used for communication.
    **/
   QueryManagerDistributed(QueryHandle *query_handle,
-                          const ShiftbossDirectory *shiftboss_directory,
                           const tmb::client_id foreman_client_id,
+                          const std::size_t num_shiftbosses,
+                          tmb::Address &&shiftboss_addresses,
                           tmb::MessageBus *bus);
 
   ~QueryManagerDistributed() override {}
@@ -153,9 +155,12 @@ class QueryManagerDistributed final : public QueryManagerBase {
            (query_exec_state_->getNumRebuildWorkOrders(index) == 0);
   }
 
-  const ShiftbossDirectory *shiftboss_directory_;
-
   const tmb::client_id foreman_client_id_;
+
+  // TODO(quickstep-team): deal with Shiftboss failure.
+  const std::size_t num_shiftbosses_;
+  const tmb::Address shiftboss_addresses_;
+
   tmb::MessageBus *bus_;
 
   std::unique_ptr<WorkOrderProtosContainer> normal_workorder_protos_container_;
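
Note on the QueryManagerDistributed refactoring above: the recipient list is now built once at admission time and moved into the manager, instead of being rebuilt from the directory on every rebuild broadcast. A minimal sketch of that ownership hand-off follows, using hypothetical stand-ins (ClientId, Address, Manager, AdmitQuery) rather than the TMB or Quickstep types.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical stand-ins for tmb::client_id and tmb::Address.
using ClientId = unsigned;

struct Address {
  std::vector<ClientId> recipients;
  void AddRecipient(const ClientId id) { recipients.push_back(id); }
};

class Manager {
 public:
  // Takes the address list by rvalue reference and stores it by move.
  Manager(const std::size_t num_workers, Address &&worker_addresses)
      : num_workers_(num_workers),
        worker_addresses_(std::move(worker_addresses)) {}

  std::size_t num_workers() const { return num_workers_; }
  const Address &worker_addresses() const { return worker_addresses_; }

 private:
  // Fixed for the lifetime of the manager.
  const std::size_t num_workers_;
  const Address worker_addresses_;
};

// Builds the recipient list once, then hands it over to the manager.
std::unique_ptr<Manager> AdmitQuery(const std::vector<ClientId> &directory) {
  Address addresses;
  for (const ClientId id : directory) {
    addresses.AddRecipient(id);
  }
  return std::make_unique<Manager>(directory.size(), std::move(addresses));
}
```

The trade-off is that the stored list is a snapshot: workers added or removed after admission are not reflected, which matches the TODO about Shiftboss failure in the header.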