You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2017/02/08 19:43:05 UTC

[1/3] incubator-quickstep git commit: Refactor building.md [Forced Update!]

Repository: incubator-quickstep
Updated Branches:
  refs/heads/aggregate-on-left-outer-join 1db1e0887 -> ccd707b3b (forced update)


Refactor building.md


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/712ffd12
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/712ffd12
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/712ffd12

Branch: refs/heads/aggregate-on-left-outer-join
Commit: 712ffd126e9bf219a8278a71a7fdddf9bb2fbf92
Parents: a9fe07d
Author: cramja <ma...@gmail.com>
Authored: Tue Feb 7 18:41:16 2017 -0600
Committer: cramja <ma...@gmail.com>
Committed: Wed Feb 8 12:53:51 2017 -0600

----------------------------------------------------------------------
 BUILDING.md | 165 +++++++++++++++++++++++++++++++------------------------
 1 file changed, 92 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/712ffd12/BUILDING.md
----------------------------------------------------------------------
diff --git a/BUILDING.md b/BUILDING.md
index 02a3a58..235fa03 100644
--- a/BUILDING.md
+++ b/BUILDING.md
@@ -1,60 +1,71 @@
-Quickstep Build Guide
-=====================
+# Quickstep Build Guide
 
+**Contents**
+* [Basic Instructions](#basic-instructions)
+  * [Prerequisites](#prerequisites)
+  * [Building](#building)
+  * [Running Quickstep](#running-quickstep)
+  * [Running Tests](#running-tests)
+  * [Configuring with CMake](#configuring-with-cmake)
+* [Advanced Configuration](#advanced-configuration)
+* [Appendix](#appendix)
+  * [Building on Windows](#building-on-windows)
+  * [Building in Vagrant](#building-in-vagrant)
 
-What You Will Need
-------------------
 
-To build quickstep, you will need a C++ compiler that supports the C++14
-standard (GCC 4.9 or Clang 3.4 or higher are known to work), and CMake. If you
-have GNU Bison and Flex as well, they will be used to build the parser and
-lexer sources (otherwise, preprocessed sources made with Bison and Flex will be
-used).
+**Short Version**
 
-### Vagrant
+```sh
+cd third_party
+./download_and_patch_prerequisites.sh
+cd ../build
+cmake ..
+make quickstep_cli_shell
+./quickstep_cli_shell -initialize_db=true
+```
 
-For your convenience, we have provided Vagrant virtual machine configurations
-that have a complete development environment for Quickstep with all necessary
-tools and dependencies already installed. [See here for instructions on how to
-use them](build/vagrant/README.md).
+# Basic Instructions
 
-### Getting CMake
+## Prerequisites
 
-Quickstep uses the CMake build system.
+- C++ compiler that supports the C++14 standard (GCC 4.9+ or Clang 3.4+ are good)
+- [cmake](http://www.cmake.org/download/) 2.8.6+
+- curl
 
-If you're on a Linux machine, most distributions' package managers have a
-package for CMake that you can install. The same goes for all of the major
-flavors of BSD UNIX (Free/Net/Open/Dragonfly), OpenSolaris, and Cygwin.
+All these programs should be available on your distro's package manager.
 
-If you're using Mac OS X or Windows, you can download CMake binaries from:
-http://www.cmake.org/download/
+**Optional**
 
-On Mac or Windows, be sure to let the installer put links to the CMake command-
-line tools in bin or add them to your PATH.
+- GNU Bison and Flex (They will be used to build the parser and lexer, but pre-processed copies are provided)
 
-### Special Note: Building on Windows
+## Building
 
-To build on Windows, you will need some variety of Microsoft's C++ compiler and
-the nmake tool (either from Visual Studio, Visual C++ Express, or the Windows
-SDK). Only Visual Studio 2015 or higher is sufficiently modern to build
-Quickstep.
+Once cmake finishes, you are ready to actually build quickstep by running
+`make` (or `nmake` on Windows) (this will also build bundled third-party
+libraries as necesary). If you want to see the actual commands that make is
+running, you can do `make VERBOSE=1`. It is highly recommended to do a parallel
+make to speed up the build time, which you can do with `make -jX`, where X is
+the number of parallel jobs (the number of CPU cores on your system is a good
+choice, unless you are low on RAM, in which case you may want to reduce the
+number of jobs).
 
-Once you have the necessary tools installed, run the "Visual Studio Command
-Prompt" (use the 64-bit version if you have it). Change into the build
-directory and run:
+## Running Quickstep
 
-    cmake -G "NMake Makefiles" ..
+To use quickstep, just run `quickstep_cli_shell` in the build directory. For the
+first time user, run once with `-initialize_db=true` to set up an empty catalog.
+Quickstep has number of command-line flags that control its behavior. Run
+`quickstep_cli_shell --help` to see a listing of the options and how to use
+them.
 
-The `-G "NMake Makefiles"` option tells CMake to generate makefiles for the nmake
-tool instead of project files for Visual Studio. You can also specify the usual
-cmake options described below like `-D CMAKE_BUILD_TYPE=Release`.
+## Running Tests
 
-Once cmake finishes, run `nmake` to actually build quickstep. Unfortunately,
-nmake does not support parallel jobs like UNIX make, so you're in for a bit of
-a wait.
+Quickstep comes with an extensive suite of unit tests. After a successful
+build, you can run the whole test suite by doing `make test` or `ctest`. If
+you use `ctest`, you may also run tests in parallel with `ctest -jX`, where
+X is the number of parallel jobs (as with `make`, your number of CPU cores is
+usually a good choice).
 
-Configuring with CMake
-----------------------
+## Configuring with CMake
 
 CMake recommends building outside of the source tree (a recommendation which we
 follow). For your convenience, a "build" directory with a skeleton of files
@@ -65,21 +76,25 @@ Like a conventional configure script, you can configure some settings about how
 quickstep is built when you invoke cmake. The most important is the build type.
 You can build an unoptimized build with debugging information by doing:
 
-    cmake -D CMAKE_BUILD_TYPE=Debug ..
+```
+cmake -D CMAKE_BUILD_TYPE=Debug ..
+```
 
 You can build a fast, optimized release build by doing:
 
-    cmake -D CMAKE_BUILD_TYPE=Release ..
+```
+cmake -D CMAKE_BUILD_TYPE=Release ..
+```
 
 The first time you check out the Quickstep source repo, you will also need to
-fetch some third-party dependencies that are packaged as git submodules. Do
-this by running the following 2 commands in the root quickstep directory:
+fetch some third-party dependencies. Do this by running the following commands 
+in the root quickstep directory:
 
-    git submodule init
-    git submodule update
-    cd third_party && ./download_and_patch_prerequisites.sh
+```
+cd third_party && ./download_and_patch_prerequisites.sh
+```
 
-### Advanced Configuration
+# Advanced Configuration
 
 There are a number of advanced options you can pass to CMake to control how
 Quickstep is built. These all have sensible defaults, so you may skip this
@@ -91,7 +106,9 @@ section and go straight to "Building" below if you are not interested.
   `CMAKE_C_COMPILER` and `CMAKE_CXX_COMPILER` options. For example, if you
   wish to use clang instead of gcc, you would do this:
 
-      cmake -D CMAKE_BUILD_TYPE=Release -D CMAKE_C_COMPILER=clang -D CMAKE_CXX_COMPILER=clang++ ../
+```
+cmake -D CMAKE_BUILD_TYPE=Release -D CMAKE_C_COMPILER=clang -D CMAKE_CXX_COMPILER=clang++ ../
+```
 
 * **Disabling TCMalloc**: You can configure whether quickstep should use
   tcmalloc (it does by default). tcmalloc stands for thread-cacheing malloc, it
@@ -160,6 +177,7 @@ section and go straight to "Building" below if you are not interested.
   default, the Quickstep storage engine will always try to rebuild an index if
   it runs out of space, but this behavior can be disabled by setting
   `-D REBUILD_INDEX_ON_UPDATE_OVERFLOW=0`.
+
 * **Building With libc++**: The Clang compiler is usually used with the
   system-default C++ standard library (on most Linux systems, this is GNU
   libstdc++, which is packaged with GCC). Clang can also be used with the LLVM
@@ -168,6 +186,7 @@ section and go straight to "Building" below if you are not interested.
   standard library). If you are using Clang on a system that has libc++
   installed but doesn't use it by default, add `-D USE_LIBCXX=1` to make
   Clang use libc++.
+
 * **Link-Time Optimization**: Some compilers support link-time optimization,
   where all the objects linked into an executable are analyzed and optimized
   together as if they were a single translation unit. This potentially enables
@@ -176,32 +195,32 @@ section and go straight to "Building" below if you are not interested.
   release builds with GCC or ICC by doing `-D ENABLE_LTO=1`. Be aware that the
   build may take a very long time.
 
-Building
---------
+# Appendix
 
-Once cmake finishes, you are ready to actually build quickstep by running
-`make` (or `nmake` on Windows) (this will also build bundled third-party
-libraries as necesary). If you want to see the actual commands that make is
-running, you can do `make VERBOSE=1`. It is highly recommended to do a parallel
-make to speed up the build time, which you can do with `make -jX`, where X is
-the number of parallel jobs (the number of CPU cores on your system is a good
-choice, unless you are low on RAM, in which case you may want to reduce the
-number of jobs).
+## Building on Windows
 
-Running Quickstep
------------------
+To build on Windows, you will need some variety of Microsoft's C++ compiler and
+the nmake tool (either from Visual Studio, Visual C++ Express, or the Windows
+SDK). Only Visual Studio 2015 or higher is sufficiently modern to build
+Quickstep.
 
-To use quickstep, just run `quickstep_cli_shell` in the build directory. For the
-first time user, run once with `-initialize_db=true` to set up an empty catalog.
-Quickstep has number of command-line flags that control its behavior. Run
-`quickstep_cli_shell --help` to see a listing of the options and how to use
-them.
+Once you have the necessary tools installed, run the "Visual Studio Command
+Prompt" (use the 64-bit version if you have it). Change into the build
+directory and run:
 
-Running Tests
--------------
+    cmake -G "NMake Makefiles" ..
 
-Quickstep comes with an extensive suite of unit tests. After a successful
-build, you can run the whole test suite by doing `make test` or `ctest`. If
-you use `ctest`, you may also run tests in parallel with `ctest -jX`, where
-X is the number of parallel jobs (as with `make`, your number of CPU cores is
-usually a good choice).
+The `-G "NMake Makefiles"` option tells CMake to generate makefiles for the nmake
+tool instead of project files for Visual Studio. You can also specify the usual
+cmake options described below like `-D CMAKE_BUILD_TYPE=Release`.
+
+Once cmake finishes, run `nmake` to actually build quickstep. Unfortunately,
+nmake does not support parallel jobs like UNIX make, so you're in for a bit of
+a wait.
+
+## Building in Vagrant
+
+For your convenience, we have provided Vagrant virtual machine configurations
+that have a complete development environment for Quickstep with all necessary
+tools and dependencies already installed. [See here for instructions on how to
+use them](build/vagrant/README.md).


[2/3] incubator-quickstep git commit: Fuse Aggregate with LeftOuterJoin to accelerate evaluation.

Posted by ji...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccd707b3/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index df4114d..33321d3 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -33,6 +33,9 @@ set_gflags_lib_name ()
 
 # Declare micro-libs:
 add_library(quickstep_relationaloperators_AggregationOperator AggregationOperator.cpp AggregationOperator.hpp)
+add_library(quickstep_relationaloperators_BuildAggregationExistenceMapOperator
+            BuildAggregationExistenceMapOperator.cpp
+            BuildAggregationExistenceMapOperator.hpp)
 add_library(quickstep_relationaloperators_BuildHashOperator BuildHashOperator.cpp BuildHashOperator.hpp)
 add_library(quickstep_relationaloperators_BuildLIPFilterOperator BuildLIPFilterOperator.cpp BuildLIPFilterOperator.hpp)
 add_library(quickstep_relationaloperators_CreateIndexOperator CreateIndexOperator.cpp CreateIndexOperator.hpp)
@@ -95,6 +98,27 @@ target_link_libraries(quickstep_relationaloperators_AggregationOperator
                       quickstep_utility_lipfilter_LIPFilterAdaptiveProber
                       quickstep_utility_lipfilter_LIPFilterUtil
                       tmb)
+target_link_libraries(quickstep_relationaloperators_BuildAggregationExistenceMapOperator
+                      glog
+                      quickstep_catalog_CatalogRelation
+                      quickstep_catalog_CatalogTypedefs
+                      quickstep_queryexecution_QueryContext
+                      quickstep_queryexecution_WorkOrderProtosContainer
+                      quickstep_queryexecution_WorkOrdersContainer
+                      quickstep_relationaloperators_RelationalOperator
+                      quickstep_relationaloperators_WorkOrder
+                      quickstep_relationaloperators_WorkOrder_proto
+                      quickstep_storage_AggregationOperationState
+                      quickstep_storage_StorageBlock
+                      quickstep_storage_StorageBlockInfo
+                      quickstep_storage_StorageManager
+                      quickstep_storage_ValueAccessor
+                      quickstep_storage_ValueAccessorUtil
+                      quickstep_types_Type
+                      quickstep_types_TypeID
+                      quickstep_utility_BarrieredReadWriteConcurrentBitVector
+                      quickstep_utility_Macros
+                      tmb)
 target_link_libraries(quickstep_relationaloperators_BuildHashOperator
                       glog
                       quickstep_catalog_CatalogRelation
@@ -552,6 +576,7 @@ target_link_libraries(quickstep_relationaloperators_WorkOrder_proto
 add_library(quickstep_relationaloperators ../empty_src.cpp RelationalOperatorsModule.hpp)
 target_link_libraries(quickstep_relationaloperators
                       quickstep_relationaloperators_AggregationOperator
+                      quickstep_relationaloperators_BuildAggregationExistenceMapOperator
                       quickstep_relationaloperators_BuildLIPFilterOperator
                       quickstep_relationaloperators_BuildHashOperator
                       quickstep_relationaloperators_CreateIndexOperator

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccd707b3/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index 0b34908..2d85312 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -412,12 +412,23 @@ std::size_t AggregationOperationState::getNumFinalizationPartitions() const {
   }
 }
 
+BarrieredReadWriteConcurrentBitVector* AggregationOperationState
+    ::getExistenceMap() const {
+  if (is_aggregate_collision_free_) {
+    return static_cast<CollisionFreeVectorTable *>(
+        collision_free_hashtable_.get())->getExistenceMap();
+  } else {
+    LOG(FATAL) << "AggregationOperationState::getExistenceMap() "
+               << "is not supported by this aggregation";
+  }
+}
+
 void AggregationOperationState::initialize(const std::size_t partition_id) {
   if (is_aggregate_collision_free_) {
     static_cast<CollisionFreeVectorTable *>(
         collision_free_hashtable_.get())->initialize(partition_id);
   } else {
-    LOG(FATAL) << "AggregationOperationState::initializeState() "
+    LOG(FATAL) << "AggregationOperationState::initialize() "
                << "is not supported by this aggregation";
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccd707b3/storage/AggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index 13ee377..23f23b0 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -39,6 +39,7 @@ namespace quickstep {
 namespace serialization { class AggregationOperationState; }
 
 class AggregateFunction;
+class BarrieredReadWriteConcurrentBitVector;
 class CatalogDatabaseLite;
 class CatalogRelationSchema;
 class InsertDestination;
@@ -167,6 +168,8 @@ class AggregationOperationState {
    **/
   std::size_t getNumFinalizationPartitions() const;
 
+  BarrieredReadWriteConcurrentBitVector* getExistenceMap() const;
+
   /**
    * @brief Initialize the specified partition of this aggregation.
    *

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccd707b3/storage/CollisionFreeVectorTable.hpp
----------------------------------------------------------------------
diff --git a/storage/CollisionFreeVectorTable.hpp b/storage/CollisionFreeVectorTable.hpp
index 4f3e238..cd76854 100644
--- a/storage/CollisionFreeVectorTable.hpp
+++ b/storage/CollisionFreeVectorTable.hpp
@@ -104,6 +104,10 @@ class CollisionFreeVectorTable : public AggregationStateHashTableBase {
     return existence_map_->onesCountInRange(start_position, end_position);
   }
 
+  inline BarrieredReadWriteConcurrentBitVector* getExistenceMap() const {
+    return existence_map_.get();
+  }
+
   /**
    * @brief Initialize the specified partition of this aggregation table.
    *

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccd707b3/utility/lip_filter/BitVectorExactFilter.hpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/BitVectorExactFilter.hpp b/utility/lip_filter/BitVectorExactFilter.hpp
index 6ad0567..7c4c238 100644
--- a/utility/lip_filter/BitVectorExactFilter.hpp
+++ b/utility/lip_filter/BitVectorExactFilter.hpp
@@ -20,9 +20,8 @@
 #ifndef QUICKSTEP_UTILITY_LIP_FILTER_BIT_VECTOR_EXACT_FILTER_HPP_
 #define QUICKSTEP_UTILITY_LIP_FILTER_BIT_VECTOR_EXACT_FILTER_HPP_
 
-#include <atomic>
+#include <cstddef>
 #include <cstdint>
-#include <cstring>
 #include <vector>
 
 #include "catalog/CatalogTypedefs.hpp"
@@ -31,6 +30,7 @@
 #include "storage/ValueAccessor.hpp"
 #include "storage/ValueAccessorUtil.hpp"
 #include "types/Type.hpp"
+#include "utility/BarrieredReadWriteConcurrentBitVector.hpp"
 #include "utility/Macros.hpp"
 #include "utility/lip_filter/LIPFilter.hpp"
 
@@ -64,14 +64,10 @@ class BitVectorExactFilter : public LIPFilter {
       : LIPFilter(LIPFilterType::kBitVectorExactFilter),
         min_value_(static_cast<CppType>(min_value)),
         max_value_(static_cast<CppType>(max_value)),
-        bit_array_(GetByteSize(max_value - min_value + 1)) {
+        bit_vector_(max_value - min_value + 1) {
     DCHECK_EQ(min_value, static_cast<std::int64_t>(min_value_));
     DCHECK_EQ(max_value, static_cast<std::int64_t>(max_value_));
     DCHECK_GE(max_value_, min_value_);
-
-    std::memset(bit_array_.data(),
-                0x0,
-                sizeof(std::atomic<std::uint8_t>) * GetByteSize(max_value - min_value + 1));
   }
 
   void insertValueAccessor(ValueAccessor *accessor,
@@ -109,13 +105,6 @@ class BitVectorExactFilter : public LIPFilter {
 
  private:
   /**
-   * @brief Round up bit_size to multiples of 8.
-   */
-  inline static std::size_t GetByteSize(const std::size_t bit_size) {
-    return (bit_size + 7u) / 8u;
-  }
-
-  /**
    * @brief Iterate through the accessor and hash values into the internal bit
    *        array.
    */
@@ -164,8 +153,7 @@ class BitVectorExactFilter : public LIPFilter {
     DCHECK_GE(value, min_value_);
     DCHECK_LE(value, max_value_);
 
-    const CppType loc = value - min_value_;
-    bit_array_[loc >> 3u].fetch_or(1u << (loc & 7u), std::memory_order_relaxed);
+    bit_vector_.setBit(value - min_value_);
   }
 
   /**
@@ -177,9 +165,7 @@ class BitVectorExactFilter : public LIPFilter {
       return is_anti_filter;
     }
 
-    const CppType loc = value - min_value_;
-    const bool is_bit_set =
-        (bit_array_[loc >> 3u].load(std::memory_order_relaxed) & (1u << (loc & 7u))) != 0;
+    const bool is_bit_set = bit_vector_.getBit(value - min_value_);
 
     if (is_anti_filter) {
       return !is_bit_set;
@@ -190,7 +176,7 @@ class BitVectorExactFilter : public LIPFilter {
 
   const CppType min_value_;
   const CppType max_value_;
-  alignas(kCacheLineBytes) std::vector<std::atomic<std::uint8_t>> bit_array_;
+  BarrieredReadWriteConcurrentBitVector bit_vector_;
 
   DISALLOW_COPY_AND_ASSIGN(BitVectorExactFilter);
 };

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccd707b3/utility/lip_filter/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/utility/lip_filter/CMakeLists.txt b/utility/lip_filter/CMakeLists.txt
index edd0d24..0a820cf 100644
--- a/utility/lip_filter/CMakeLists.txt
+++ b/utility/lip_filter/CMakeLists.txt
@@ -39,8 +39,9 @@ target_link_libraries(quickstep_utility_lipfilter_BitVectorExactFilter
                       quickstep_storage_ValueAccessor
                       quickstep_storage_ValueAccessorUtil
                       quickstep_types_Type
-                      quickstep_utility_lipfilter_LIPFilter
-                      quickstep_utility_Macros)
+                      quickstep_utility_BarrieredReadWriteConcurrentBitVector
+                      quickstep_utility_Macros
+                      quickstep_utility_lipfilter_LIPFilter)
 target_link_libraries(quickstep_utility_lipfilter_LIPFilter
                       quickstep_catalog_CatalogTypedefs
                       quickstep_storage_StorageBlockInfo
@@ -83,5 +84,6 @@ target_link_libraries(quickstep_utility_lipfilter_SingleIdentityHashFilter
                       quickstep_storage_ValueAccessor
                       quickstep_storage_ValueAccessorUtil
                       quickstep_types_Type
-                      quickstep_utility_lipfilter_LIPFilter
-                      quickstep_utility_Macros)
+                      quickstep_utility_BarrieredReadWriteConcurrentBitVector
+                      quickstep_utility_Macros
+                      quickstep_utility_lipfilter_LIPFilter)

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccd707b3/utility/lip_filter/SingleIdentityHashFilter.hpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/SingleIdentityHashFilter.hpp b/utility/lip_filter/SingleIdentityHashFilter.hpp
index 5c0e8a2..6eaa93e 100644
--- a/utility/lip_filter/SingleIdentityHashFilter.hpp
+++ b/utility/lip_filter/SingleIdentityHashFilter.hpp
@@ -20,10 +20,8 @@
 #ifndef QUICKSTEP_UTILITY_LIP_FILTER_SINGLE_IDENTITY_HASH_FILTER_HPP_
 #define QUICKSTEP_UTILITY_LIP_FILTER_SINGLE_IDENTITY_HASH_FILTER_HPP_
 
-#include <atomic>
 #include <cstddef>
 #include <cstdint>
-#include <cstring>
 #include <vector>
 
 #include "catalog/CatalogTypedefs.hpp"
@@ -32,6 +30,7 @@
 #include "storage/ValueAccessor.hpp"
 #include "storage/ValueAccessorUtil.hpp"
 #include "types/Type.hpp"
+#include "utility/BarrieredReadWriteConcurrentBitVector.hpp"
 #include "utility/Macros.hpp"
 #include "utility/lip_filter/LIPFilter.hpp"
 
@@ -65,11 +64,8 @@ class SingleIdentityHashFilter : public LIPFilter {
   explicit SingleIdentityHashFilter(const std::size_t filter_cardinality)
       : LIPFilter(LIPFilterType::kSingleIdentityHashFilter),
         filter_cardinality_(filter_cardinality),
-        bit_array_(GetByteSize(filter_cardinality)) {
+        bit_vector_(filter_cardinality) {
     DCHECK_GE(filter_cardinality, 1u);
-    std::memset(bit_array_.data(),
-                0x0,
-                sizeof(std::atomic<std::uint8_t>) * GetByteSize(filter_cardinality));
   }
 
   void insertValueAccessor(ValueAccessor *accessor,
@@ -158,8 +154,9 @@ class SingleIdentityHashFilter : public LIPFilter {
    * @brief Inserts a given value into the hash filter.
    */
   inline void insert(const void *key_begin) {
-    const CppType hash = *reinterpret_cast<const CppType *>(key_begin) % filter_cardinality_;
-    bit_array_[hash >> 3u].fetch_or(1u << (hash & 7u), std::memory_order_relaxed);
+    const CppType hash =
+        *reinterpret_cast<const CppType *>(key_begin) % filter_cardinality_;
+    bit_vector_.setBit(hash);
   }
 
   /**
@@ -168,12 +165,13 @@ class SingleIdentityHashFilter : public LIPFilter {
    *        If false is returned, a value is certainly not present in the hash filter.
    */
   inline bool contains(const void *key_begin) const {
-    const CppType hash = *reinterpret_cast<const CppType *>(key_begin) % filter_cardinality_;
-    return (bit_array_[hash >> 3u].load(std::memory_order_relaxed) & (1u << (hash & 7u)));
+    const CppType hash =
+        *reinterpret_cast<const CppType *>(key_begin) % filter_cardinality_;
+    return bit_vector_.getBit(hash);
   }
 
   std::size_t filter_cardinality_;
-  alignas(kCacheLineBytes) std::vector<std::atomic<std::uint8_t>> bit_array_;
+  BarrieredReadWriteConcurrentBitVector bit_vector_;
 
   DISALLOW_COPY_AND_ASSIGN(SingleIdentityHashFilter);
 };


[3/3] incubator-quickstep git commit: Fuse Aggregate with LeftOuterJoin to accelerate evaluation.

Posted by ji...@apache.org.
Fuse Aggregate with LeftOuterJoin to accelerate evaluation.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/ccd707b3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/ccd707b3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/ccd707b3

Branch: refs/heads/aggregate-on-left-outer-join
Commit: ccd707b3b2cb4655739d3cd0de8c2b62e2041cf5
Parents: 712ffd1
Author: Jianqiao Zhu <ji...@cs.wisc.edu>
Authored: Mon Jan 30 14:46:39 2017 -0600
Committer: Jianqiao Zhu <ji...@cs.wisc.edu>
Committed: Wed Feb 8 13:43:33 2017 -0600

----------------------------------------------------------------------
 query_optimizer/CMakeLists.txt                  |   6 +-
 query_optimizer/ExecutionGenerator.cpp          | 261 +++++++++++--------
 query_optimizer/ExecutionGenerator.hpp          |  20 +-
 query_optimizer/PhysicalGenerator.cpp           |   3 +
 query_optimizer/cost_model/CMakeLists.txt       |   7 +
 query_optimizer/cost_model/SimpleCostModel.cpp  |   9 +
 query_optimizer/cost_model/SimpleCostModel.hpp  |   5 +
 .../cost_model/StarSchemaSimpleCostModel.cpp    | 147 ++++++++++-
 .../cost_model/StarSchemaSimpleCostModel.hpp    |  20 ++
 query_optimizer/physical/CMakeLists.txt         |  14 +
 .../CrossReferenceCoalesceAggregate.cpp         | 105 ++++++++
 .../CrossReferenceCoalesceAggregate.hpp         | 176 +++++++++++++
 query_optimizer/physical/PatternMatcher.hpp     |   3 +
 query_optimizer/physical/PhysicalType.hpp       |   1 +
 query_optimizer/rules/BottomUpRule.hpp          |  39 +--
 query_optimizer/rules/CMakeLists.txt            |  23 ++
 query_optimizer/rules/FuseAggregateJoin.cpp     | 164 ++++++++++++
 query_optimizer/rules/FuseAggregateJoin.hpp     |  69 +++++
 .../BuildAggregationExistenceMapOperator.cpp    | 148 +++++++++++
 .../BuildAggregationExistenceMapOperator.hpp    | 146 +++++++++++
 relational_operators/CMakeLists.txt             |  25 ++
 storage/AggregationOperationState.cpp           |  13 +-
 storage/AggregationOperationState.hpp           |   3 +
 storage/CollisionFreeVectorTable.hpp            |   4 +
 utility/lip_filter/BitVectorExactFilter.hpp     |  26 +-
 utility/lip_filter/CMakeLists.txt               |  10 +-
 utility/lip_filter/SingleIdentityHashFilter.hpp |  20 +-
 27 files changed, 1285 insertions(+), 182 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccd707b3/query_optimizer/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt
index e750a1e..3ff783c 100644
--- a/query_optimizer/CMakeLists.txt
+++ b/query_optimizer/CMakeLists.txt
@@ -64,7 +64,6 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
                       quickstep_expressions_Expressions_proto
                       quickstep_expressions_aggregation_AggregateFunction
                       quickstep_expressions_aggregation_AggregateFunction_proto
-                      quickstep_expressions_aggregation_AggregationID
                       quickstep_expressions_predicate_Predicate
                       quickstep_expressions_scalar_Scalar
                       quickstep_expressions_scalar_ScalarAttribute
@@ -95,6 +94,7 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
                       quickstep_queryoptimizer_physical_CopyFrom
                       quickstep_queryoptimizer_physical_CreateIndex
                       quickstep_queryoptimizer_physical_CreateTable
+                      quickstep_queryoptimizer_physical_CrossReferenceCoalesceAggregate
                       quickstep_queryoptimizer_physical_DeleteTuples
                       quickstep_queryoptimizer_physical_DropTable
                       quickstep_queryoptimizer_physical_FilterJoin
@@ -116,6 +116,7 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
                       quickstep_queryoptimizer_physical_UpdateTable
                       quickstep_queryoptimizer_physical_WindowAggregate
                       quickstep_relationaloperators_AggregationOperator
+                      quickstep_relationaloperators_BuildAggregationExistenceMapOperator
                       quickstep_relationaloperators_BuildHashOperator
                       quickstep_relationaloperators_BuildLIPFilterOperator
                       quickstep_relationaloperators_CreateIndexOperator
@@ -147,12 +148,10 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
                       quickstep_storage_StorageBlockLayout_proto
                       quickstep_storage_SubBlockTypeRegistry
                       quickstep_types_Type
-                      quickstep_types_TypeID
                       quickstep_types_Type_proto
                       quickstep_types_TypedValue
                       quickstep_types_TypedValue_proto
                       quickstep_types_containers_Tuple_proto
-                      quickstep_utility_EqualsAnyConstant
                       quickstep_utility_Macros
                       quickstep_utility_SqlError)
 if (ENABLE_DISTRIBUTED)
@@ -213,6 +212,7 @@ target_link_libraries(quickstep_queryoptimizer_PhysicalGenerator
                       quickstep_queryoptimizer_logical_Logical
                       quickstep_queryoptimizer_physical_Physical
                       quickstep_queryoptimizer_rules_AttachLIPFilters
+                      quickstep_queryoptimizer_rules_FuseAggregateJoin
                       quickstep_queryoptimizer_rules_InjectJoinFilters
                       quickstep_queryoptimizer_rules_PruneColumns
                       quickstep_queryoptimizer_rules_PushDownLowCostDisjunctivePredicate

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccd707b3/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 1b50caa..1b41cdd 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -49,7 +49,6 @@
 #include "expressions/Expressions.pb.h"
 #include "expressions/aggregation/AggregateFunction.hpp"
 #include "expressions/aggregation/AggregateFunction.pb.h"
-#include "expressions/aggregation/AggregationID.hpp"
 #include "expressions/predicate/Predicate.hpp"
 #include "expressions/scalar/Scalar.hpp"
 #include "expressions/scalar/ScalarAttribute.hpp"
@@ -72,9 +71,11 @@
 #include "query_optimizer/expressions/Scalar.hpp"
 #include "query_optimizer/expressions/ScalarLiteral.hpp"
 #include "query_optimizer/expressions/WindowAggregateFunction.hpp"
+#include "query_optimizer/physical/Aggregate.hpp"
 #include "query_optimizer/physical/CopyFrom.hpp"
 #include "query_optimizer/physical/CreateIndex.hpp"
 #include "query_optimizer/physical/CreateTable.hpp"
+#include "query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp"
 #include "query_optimizer/physical/DeleteTuples.hpp"
 #include "query_optimizer/physical/DropTable.hpp"
 #include "query_optimizer/physical/FilterJoin.hpp"
@@ -96,6 +97,7 @@
 #include "query_optimizer/physical/UpdateTable.hpp"
 #include "query_optimizer/physical/WindowAggregate.hpp"
 #include "relational_operators/AggregationOperator.hpp"
+#include "relational_operators/BuildAggregationExistenceMapOperator.hpp"
 #include "relational_operators/BuildHashOperator.hpp"
 #include "relational_operators/BuildLIPFilterOperator.hpp"
 #include "relational_operators/CreateIndexOperator.hpp"
@@ -128,11 +130,9 @@
 #include "storage/SubBlockTypeRegistry.hpp"
 #include "types/Type.hpp"
 #include "types/Type.pb.h"
-#include "types/TypeID.hpp"
 #include "types/TypedValue.hpp"
 #include "types/TypedValue.pb.h"
 #include "types/containers/Tuple.pb.h"
-#include "utility/EqualsAnyConstant.hpp"
 #include "utility/SqlError.hpp"
 
 #include "gflags/gflags.h"
@@ -163,10 +163,6 @@ static const volatile bool aggregate_hashtable_type_dummy
 
 DEFINE_bool(parallelize_load, true, "Parallelize loading data files.");
 
-DEFINE_int64(collision_free_vector_table_max_size, 1000000000,
-              "The maximum allowed key range (number of entries) for using a "
-              "CollisionFreeVectorTable.");
-
 namespace E = ::quickstep::optimizer::expressions;
 namespace P = ::quickstep::optimizer::physical;
 namespace S = ::quickstep::serialization;
@@ -266,6 +262,9 @@ void ExecutionGenerator::generatePlanInternal(
     case P::PhysicalType::kAggregate:
       return convertAggregate(
           std::static_pointer_cast<const P::Aggregate>(physical_plan));
+    case P::PhysicalType::kCrossReferenceCoalesceAggregate:
+      return convertCrossReferenceCoalesceAggregate(
+          std::static_pointer_cast<const P::CrossReferenceCoalesceAggregate>(physical_plan));
     case P::PhysicalType::kCopyFrom:
       return convertCopyFrom(
           std::static_pointer_cast<const P::CopyFrom>(physical_plan));
@@ -379,105 +378,6 @@ void ExecutionGenerator::dropAllTemporaryRelations() {
   }
 }
 
-bool ExecutionGenerator::canUseCollisionFreeAggregation(
-    const P::AggregatePtr &aggregate,
-    const std::size_t estimated_num_groups,
-    std::size_t *max_num_groups) const {
-#ifdef QUICKSTEP_DISTRIBUTED
-  // Currently we cannot do this fast path with the distributed setting. See
-  // the TODOs at InitializeAggregationOperator::getAllWorkOrderProtos() and
-  // FinalizeAggregationOperator::getAllWorkOrderProtos().
-  return false;
-#endif
-
-  // Supports only single group-by key.
-  if (aggregate->grouping_expressions().size() != 1) {
-    return false;
-  }
-
-  // We need to know the exact min/max stats of the group-by key.
-  // So it must be a CatalogAttribute (but not an expression).
-  E::AttributeReferencePtr group_by_key_attr;
-  const E::ExpressionPtr agg_expr = aggregate->grouping_expressions().front();
-  if (!E::SomeAttributeReference::MatchesWithConditionalCast(agg_expr, &group_by_key_attr)) {
-    return false;
-  }
-
-  bool min_value_stat_is_exact;
-  bool max_value_stat_is_exact;
-  const TypedValue min_value =
-      cost_model_for_aggregation_->findMinValueStat(
-          aggregate, group_by_key_attr, &min_value_stat_is_exact);
-  const TypedValue max_value =
-      cost_model_for_aggregation_->findMaxValueStat(
-          aggregate, group_by_key_attr, &max_value_stat_is_exact);
-  if (min_value.isNull() || max_value.isNull() ||
-      (!min_value_stat_is_exact) || (!max_value_stat_is_exact)) {
-    return false;
-  }
-
-  std::int64_t min_cpp_value;
-  std::int64_t max_cpp_value;
-  switch (group_by_key_attr->getValueType().getTypeID()) {
-    case TypeID::kInt: {
-      min_cpp_value = min_value.getLiteral<int>();
-      max_cpp_value = max_value.getLiteral<int>();
-      break;
-    }
-    case TypeID::kLong: {
-      min_cpp_value = min_value.getLiteral<std::int64_t>();
-      max_cpp_value = max_value.getLiteral<std::int64_t>();
-      break;
-    }
-    default:
-      return false;
-  }
-
-  // TODO(jianqiao):
-  // 1. Handle the case where min_cpp_value is below 0 or far greater than 0.
-  // 2. Reason about the table size bound (e.g. by checking memory size) instead
-  //    of hardcoding it as a gflag.
-  if (min_cpp_value < 0 ||
-      max_cpp_value >= FLAGS_collision_free_vector_table_max_size ||
-      max_cpp_value / static_cast<double>(estimated_num_groups) > 256.0) {
-    return false;
-  }
-
-  for (const auto &agg_expr : aggregate->aggregate_expressions()) {
-    const E::AggregateFunctionPtr agg_func =
-        std::static_pointer_cast<const E::AggregateFunction>(agg_expr->expression());
-
-    if (agg_func->is_distinct()) {
-      return false;
-    }
-
-    // TODO(jianqiao): Support AggregationID::AVG.
-    if (!QUICKSTEP_EQUALS_ANY_CONSTANT(agg_func->getAggregate().getAggregationID(),
-                                       AggregationID::kCount,
-                                       AggregationID::kSum)) {
-      return false;
-    }
-
-    const auto &arguments = agg_func->getArguments();
-    if (arguments.size() > 1u) {
-      return false;
-    }
-
-    if (arguments.size() == 1u) {
-      if (!QUICKSTEP_EQUALS_ANY_CONSTANT(arguments.front()->getValueType().getTypeID(),
-                                         TypeID::kInt,
-                                         TypeID::kLong,
-                                         TypeID::kFloat,
-                                         TypeID::kDouble)) {
-        return false;
-      }
-    }
-  }
-
-  *max_num_groups = static_cast<std::size_t>(max_cpp_value) + 1;
-  return true;
-}
-
 void ExecutionGenerator::convertNamedExpressions(
     const std::vector<E::NamedExpressionPtr> &named_expressions,
     S::QueryContext::ScalarGroup *scalar_group_proto) {
@@ -1608,9 +1508,10 @@ void ExecutionGenerator::convertAggregate(
         cost_model_for_aggregation_->estimateNumGroupsForAggregate(physical_plan);
 
     std::size_t max_num_groups;
-    if (canUseCollisionFreeAggregation(physical_plan,
-                                       estimated_num_groups,
-                                       &max_num_groups)) {
+    if (cost_model_for_aggregation_
+            ->canUseCollisionFreeAggregation(physical_plan,
+                                             estimated_num_groups,
+                                             &max_num_groups)) {
       aggr_state_proto->set_hash_table_impl_type(
           serialization::HashTableImplType::COLLISION_FREE_VECTOR);
       aggr_state_proto->set_estimated_num_entries(max_num_groups);
@@ -1730,6 +1631,148 @@ void ExecutionGenerator::convertAggregate(
   }
 }
 
+void ExecutionGenerator::convertCrossReferenceCoalesceAggregate(
+    const P::CrossReferenceCoalesceAggregatePtr &physical_plan) {
+  DCHECK_EQ(1u, physical_plan->left_join_attributes().size());
+  DCHECK_EQ(1u, physical_plan->right_join_attributes().size());
+
+  const CatalogRelationInfo *left_relation_info =
+      findRelationInfoOutputByPhysical(physical_plan->left_child());
+  const CatalogRelationInfo *right_relation_info =
+      findRelationInfoOutputByPhysical(physical_plan->right_child());
+
+  // Create aggr state proto.
+  const QueryContext::aggregation_state_id aggr_state_index =
+      query_context_proto_->aggregation_states_size();
+  S::AggregationOperationState *aggr_state_proto = query_context_proto_->add_aggregation_states();
+
+  aggr_state_proto->set_relation_id(right_relation_info->relation->getID());
+
+  // Group by the right join attribute.
+  std::unique_ptr<const Scalar> execution_group_by_expression(
+      physical_plan->right_join_attributes().front()->concretize(
+          attribute_substitution_map_));
+  aggr_state_proto->add_group_by_expressions()->CopyFrom(
+      execution_group_by_expression->getProto());
+
+  aggr_state_proto->set_hash_table_impl_type(
+      serialization::HashTableImplType::COLLISION_FREE_VECTOR);
+  aggr_state_proto->set_estimated_num_entries(
+      physical_plan->hash_table_num_entries());
+
+  if (physical_plan->right_filter_predicate() != nullptr) {
+    std::unique_ptr<const Predicate> predicate(
+        convertPredicate(physical_plan->right_filter_predicate()));
+    aggr_state_proto->mutable_predicate()->CopyFrom(predicate->getProto());
+  }
+
+  for (const E::AliasPtr &named_aggregate_expression : physical_plan->aggregate_expressions()) {
+    const E::AggregateFunctionPtr unnamed_aggregate_expression =
+        std::static_pointer_cast<const E::AggregateFunction>(named_aggregate_expression->expression());
+
+    // Add a new entry in 'aggregates'.
+    S::Aggregate *aggr_proto = aggr_state_proto->add_aggregates();
+
+    // Set the AggregateFunction.
+    aggr_proto->mutable_function()->CopyFrom(
+        unnamed_aggregate_expression->getAggregate().getProto());
+
+    // Add each of the aggregate's arguments.
+    for (const E::ScalarPtr &argument : unnamed_aggregate_expression->getArguments()) {
+      unique_ptr<const Scalar> concretized_argument(argument->concretize(attribute_substitution_map_));
+      aggr_proto->add_argument()->CopyFrom(concretized_argument->getProto());
+    }
+
+    // Set whether it is a DISTINCT aggregation.
+    DCHECK(!unnamed_aggregate_expression->is_distinct());
+    aggr_proto->set_is_distinct(false);
+  }
+
+  const QueryPlan::DAGNodeIndex initialize_aggregation_operator_index =
+      execution_plan_->addRelationalOperator(
+          new InitializeAggregationOperator(
+              query_handle_->query_id(),
+              aggr_state_index));
+
+  const QueryPlan::DAGNodeIndex build_aggregation_existence_map_operator_index =
+      execution_plan_->addRelationalOperator(
+          new BuildAggregationExistenceMapOperator(
+              query_handle_->query_id(),
+              *left_relation_info->relation,
+              physical_plan->left_join_attributes().front()->id(),
+              left_relation_info->isStoredRelation(),
+              aggr_state_index));
+
+  if (!left_relation_info->isStoredRelation()) {
+    execution_plan_->addDirectDependency(build_aggregation_existence_map_operator_index,
+                                         left_relation_info->producer_operator_index,
+                                         false /* is_pipeline_breaker */);
+  }
+
+  const QueryPlan::DAGNodeIndex aggregation_operator_index =
+      execution_plan_->addRelationalOperator(
+          new AggregationOperator(
+              query_handle_->query_id(),
+              *right_relation_info->relation,
+              right_relation_info->isStoredRelation(),
+              aggr_state_index));
+
+  if (!right_relation_info->isStoredRelation()) {
+    execution_plan_->addDirectDependency(aggregation_operator_index,
+                                         right_relation_info->producer_operator_index,
+                                         false /* is_pipeline_breaker */);
+  }
+
+  // Build aggregation existence map once initialization is done.
+  execution_plan_->addDirectDependency(build_aggregation_existence_map_operator_index,
+                                       initialize_aggregation_operator_index,
+                                       true /* is_pipeline_breaker */);
+
+  // Start aggregation after building existence map.
+  execution_plan_->addDirectDependency(aggregation_operator_index,
+                                       build_aggregation_existence_map_operator_index,
+                                       true /* is_pipeline_breaker */);
+
+
+  // Create InsertDestination proto.
+  const CatalogRelation *output_relation = nullptr;
+  const QueryContext::insert_destination_id insert_destination_index =
+      query_context_proto_->insert_destinations_size();
+  S::InsertDestination *insert_destination_proto = query_context_proto_->add_insert_destinations();
+  createTemporaryCatalogRelation(physical_plan,
+                                 &output_relation,
+                                 insert_destination_proto);
+
+  const QueryPlan::DAGNodeIndex finalize_aggregation_operator_index =
+      execution_plan_->addRelationalOperator(
+          new FinalizeAggregationOperator(query_handle_->query_id(),
+                                          aggr_state_index,
+                                          *output_relation,
+                                          insert_destination_index));
+
+  insert_destination_proto->set_relational_op_index(finalize_aggregation_operator_index);
+
+  execution_plan_->addDirectDependency(finalize_aggregation_operator_index,
+                                       aggregation_operator_index,
+                                       true /* is_pipeline_breaker */);
+
+  physical_to_output_relation_map_.emplace(
+      std::piecewise_construct,
+      std::forward_as_tuple(physical_plan),
+      std::forward_as_tuple(finalize_aggregation_operator_index, output_relation));
+  temporary_relation_info_vec_.emplace_back(finalize_aggregation_operator_index,
+                                            output_relation);
+
+  const QueryPlan::DAGNodeIndex destroy_aggregation_state_operator_index =
+      execution_plan_->addRelationalOperator(
+          new DestroyAggregationStateOperator(query_handle_->query_id(),
+                                              aggr_state_index));
+
+  execution_plan_->addDirectDependency(destroy_aggregation_state_operator_index,
+                                       finalize_aggregation_operator_index,
+                                       true);
+}
+
 void ExecutionGenerator::convertSort(const P::SortPtr &physical_sort) {
   // Create sort configuration for run generation.
   vector<bool> sort_ordering(physical_sort->sort_ascending());

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccd707b3/query_optimizer/ExecutionGenerator.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.hpp b/query_optimizer/ExecutionGenerator.hpp
index 987f11a..f4e614a 100644
--- a/query_optimizer/ExecutionGenerator.hpp
+++ b/query_optimizer/ExecutionGenerator.hpp
@@ -46,6 +46,7 @@
 #include "query_optimizer/physical/CopyFrom.hpp"
 #include "query_optimizer/physical/CreateIndex.hpp"
 #include "query_optimizer/physical/CreateTable.hpp"
+#include "query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp"
 #include "query_optimizer/physical/DeleteTuples.hpp"
 #include "query_optimizer/physical/DropTable.hpp"
 #include "query_optimizer/physical/FilterJoin.hpp"
@@ -206,22 +207,6 @@ class ExecutionGenerator {
   std::string getNewRelationName();
 
   /**
-   * @brief Checks whether an aggregate node can be efficiently evaluated with
-   *        the collision-free aggregation fast path.
-   *
-   * @param aggregate The physical aggregate node to be checked.
-   * @param estimated_num_groups The estimated number of groups for the aggregate.
-   * @param exact_num_groups If collision-free aggregation is applicable, the
-   *        pointed content of this pointer will be set as the maximum possible
-   *        number of groups that the collision-free hash table need to hold.
-   * @return A bool value indicating whether collision-free aggregation can be
-   *         used to evaluate \p aggregate.
-   */
-  bool canUseCollisionFreeAggregation(const physical::AggregatePtr &aggregate,
-                                      const std::size_t estimated_num_groups,
-                                      std::size_t *max_num_groups) const;
-
-  /**
    * @brief Sets up the info of the CatalogRelation represented by TableReference.
    *        TableReference is not converted to any operator.
    *
@@ -356,6 +341,9 @@ class ExecutionGenerator {
    */
   void convertAggregate(const physical::AggregatePtr &physical_plan);
 
+  void convertCrossReferenceCoalesceAggregate(
+      const physical::CrossReferenceCoalesceAggregatePtr &physical_plan);
+
   /**
    * @brief Converts a physical Sort to SortRunGeneration and SortMergeRun.
    *

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccd707b3/query_optimizer/PhysicalGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/PhysicalGenerator.cpp b/query_optimizer/PhysicalGenerator.cpp
index 1b68f49..ac51c31 100644
--- a/query_optimizer/PhysicalGenerator.cpp
+++ b/query_optimizer/PhysicalGenerator.cpp
@@ -27,6 +27,7 @@
 #include "query_optimizer/logical/Logical.hpp"
 #include "query_optimizer/physical/Physical.hpp"
 #include "query_optimizer/rules/AttachLIPFilters.hpp"
+#include "query_optimizer/rules/FuseAggregateJoin.hpp"
 #include "query_optimizer/rules/InjectJoinFilters.hpp"
 #include "query_optimizer/rules/PruneColumns.hpp"
 #include "query_optimizer/rules/PushDownLowCostDisjunctivePredicate.hpp"
@@ -145,6 +146,8 @@ P::PhysicalPtr PhysicalGenerator::optimizePlan() {
     rules.emplace_back(new ReorderColumns());
   }
 
+  rules.emplace_back(new FuseAggregateJoin());
+
   // NOTE(jianqiao): Adding rules after InjectJoinFilters (or AttachLIPFilters) requires
   // extra handling of LIPFilterConfiguration for transformed nodes. So currently it is
   // suggested that all the new rules be placed before this point.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccd707b3/query_optimizer/cost_model/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/CMakeLists.txt b/query_optimizer/cost_model/CMakeLists.txt
index 5f28bb3..e222fdd 100644
--- a/query_optimizer/cost_model/CMakeLists.txt
+++ b/query_optimizer/cost_model/CMakeLists.txt
@@ -33,6 +33,7 @@ target_link_libraries(quickstep_queryoptimizer_costmodel_SimpleCostModel
                       quickstep_catalog_CatalogRelationStatistics
                       quickstep_queryoptimizer_costmodel_CostModel
                       quickstep_queryoptimizer_physical_Aggregate
+                      quickstep_queryoptimizer_physical_CrossReferenceCoalesceAggregate
                       quickstep_queryoptimizer_physical_FilterJoin
                       quickstep_queryoptimizer_physical_HashJoin
                       quickstep_queryoptimizer_physical_NestedLoopsJoin
@@ -51,7 +52,10 @@ target_link_libraries(quickstep_queryoptimizer_costmodel_StarSchemaSimpleCostMod
                       quickstep_catalog_CatalogRelation
                       quickstep_catalog_CatalogRelationStatistics
                       quickstep_catalog_CatalogTypedefs
+                      quickstep_expressions_aggregation_AggregateFunction
+                      quickstep_expressions_aggregation_AggregationID
                       quickstep_queryoptimizer_costmodel_CostModel
+                      quickstep_queryoptimizer_expressions_AggregateFunction
                       quickstep_queryoptimizer_expressions_AttributeReference
                       quickstep_queryoptimizer_expressions_ComparisonExpression
                       quickstep_queryoptimizer_expressions_ExprId
@@ -62,6 +66,7 @@ target_link_libraries(quickstep_queryoptimizer_costmodel_StarSchemaSimpleCostMod
                       quickstep_queryoptimizer_expressions_PatternMatcher
                       quickstep_queryoptimizer_expressions_Predicate
                       quickstep_queryoptimizer_physical_Aggregate
+                      quickstep_queryoptimizer_physical_CrossReferenceCoalesceAggregate
                       quickstep_queryoptimizer_physical_FilterJoin
                       quickstep_queryoptimizer_physical_HashJoin
                       quickstep_queryoptimizer_physical_NestedLoopsJoin
@@ -76,7 +81,9 @@ target_link_libraries(quickstep_queryoptimizer_costmodel_StarSchemaSimpleCostMod
                       quickstep_queryoptimizer_physical_TopLevelPlan
                       quickstep_queryoptimizer_physical_WindowAggregate
                       quickstep_types_NullType
+                      quickstep_types_TypeID
                       quickstep_types_TypedValue
+                      quickstep_utility_EqualsAnyConstant
                       quickstep_utility_Macros)
 
 # Module all-in-one library:

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccd707b3/query_optimizer/cost_model/SimpleCostModel.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/SimpleCostModel.cpp b/query_optimizer/cost_model/SimpleCostModel.cpp
index e9d2e3a..cfd8a75 100644
--- a/query_optimizer/cost_model/SimpleCostModel.cpp
+++ b/query_optimizer/cost_model/SimpleCostModel.cpp
@@ -26,6 +26,7 @@
 #include "catalog/CatalogRelationStatistics.hpp"
 #include "query_optimizer/cost_model/CostModel.hpp"
 #include "query_optimizer/physical/Aggregate.hpp"
+#include "query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp"
 #include "query_optimizer/physical/NestedLoopsJoin.hpp"
 #include "query_optimizer/physical/FilterJoin.hpp"
 #include "query_optimizer/physical/HashJoin.hpp"
@@ -74,6 +75,9 @@ std::size_t SimpleCostModel::estimateCardinality(
     case P::PhysicalType::kAggregate:
       return estimateCardinalityForAggregate(
           std::static_pointer_cast<const P::Aggregate>(physical_plan));
+    case P::PhysicalType::kCrossReferenceCoalesceAggregate:
+      return estimateCardinalityForCrossReferenceCoalesceAggregate(
+          std::static_pointer_cast<const P::CrossReferenceCoalesceAggregate>(physical_plan));
     case P::PhysicalType::kSharedSubplanReference: {
       const P::SharedSubplanReferencePtr shared_subplan_reference =
           std::static_pointer_cast<const P::SharedSubplanReference>(physical_plan);
@@ -149,6 +153,11 @@ std::size_t SimpleCostModel::estimateCardinalityForAggregate(
                   estimateCardinality(physical_plan->input()) / 10);
 }
 
+std::size_t SimpleCostModel::estimateCardinalityForCrossReferenceCoalesceAggregate(
+    const physical::CrossReferenceCoalesceAggregatePtr &physical_plan) {
+  return estimateCardinality(physical_plan->left_child());
+}
+
 std::size_t SimpleCostModel::estimateCardinalityForWindowAggregate(
     const physical::WindowAggregatePtr &physical_plan) {
   return estimateCardinality(physical_plan->input());

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccd707b3/query_optimizer/cost_model/SimpleCostModel.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/SimpleCostModel.hpp b/query_optimizer/cost_model/SimpleCostModel.hpp
index 4edc2fe..0660c37 100644
--- a/query_optimizer/cost_model/SimpleCostModel.hpp
+++ b/query_optimizer/cost_model/SimpleCostModel.hpp
@@ -25,6 +25,7 @@
 
 #include "query_optimizer/cost_model/CostModel.hpp"
 #include "query_optimizer/physical/Aggregate.hpp"
+#include "query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp"
 #include "query_optimizer/physical/NestedLoopsJoin.hpp"
 #include "query_optimizer/physical/FilterJoin.hpp"
 #include "query_optimizer/physical/HashJoin.hpp"
@@ -100,6 +101,10 @@ class SimpleCostModel : public CostModel {
   std::size_t estimateCardinalityForAggregate(
       const physical::AggregatePtr &physical_plan);
 
+  // Returns the cardinality of the left child plan.
+  std::size_t estimateCardinalityForCrossReferenceCoalesceAggregate(
+      const physical::CrossReferenceCoalesceAggregatePtr &physical_plan);
+
   // Return the estimated cardinality of the input plan.
   std::size_t estimateCardinalityForWindowAggregate(
       const physical::WindowAggregatePtr &physical_plan);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccd707b3/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp b/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
index 7afa1c3..f546737 100644
--- a/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
+++ b/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
@@ -20,13 +20,18 @@
 #include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp"
 
 #include <algorithm>
+#include <cstddef>
+#include <cstdint>
 #include <memory>
 #include <vector>
 
 #include "catalog/CatalogRelation.hpp"
 #include "catalog/CatalogRelationStatistics.hpp"
 #include "catalog/CatalogTypedefs.hpp"
+#include "expressions/aggregation/AggregateFunction.hpp"
+#include "expressions/aggregation/AggregationID.hpp"
 #include "query_optimizer/cost_model/CostModel.hpp"
+#include "query_optimizer/expressions/AggregateFunction.hpp"
 #include "query_optimizer/expressions/AttributeReference.hpp"
 #include "query_optimizer/expressions/ComparisonExpression.hpp"
 #include "query_optimizer/expressions/ExprId.hpp"
@@ -37,6 +42,7 @@
 #include "query_optimizer/expressions/Predicate.hpp"
 #include "query_optimizer/expressions/PatternMatcher.hpp"
 #include "query_optimizer/physical/Aggregate.hpp"
+#include "query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp"
 #include "query_optimizer/physical/NestedLoopsJoin.hpp"
 #include "query_optimizer/physical/FilterJoin.hpp"
 #include "query_optimizer/physical/HashJoin.hpp"
@@ -49,8 +55,12 @@
 #include "query_optimizer/physical/TableGenerator.hpp"
 #include "query_optimizer/physical/TableReference.hpp"
 #include "query_optimizer/physical/TopLevelPlan.hpp"
+#include "types/TypeID.hpp"
 #include "types/TypedValue.hpp"
 #include "types/NullType.hpp"
+#include "utility/EqualsAnyConstant.hpp"
+
+#include "gflags/gflags.h"
 
 #include "glog/logging.h"
 
@@ -58,6 +68,10 @@ namespace quickstep {
 namespace optimizer {
 namespace cost {
 
+DEFINE_int64(collision_free_vector_table_max_size, 1000000000,
+              "The maximum allowed key range (number of entries) for using a "
+              "CollisionFreeVectorTable.");
+
 namespace E = ::quickstep::optimizer::expressions;
 namespace P = ::quickstep::optimizer::physical;
 
@@ -88,6 +102,9 @@ std::size_t StarSchemaSimpleCostModel::estimateCardinality(
     case P::PhysicalType::kAggregate:
       return estimateCardinalityForAggregate(
           std::static_pointer_cast<const P::Aggregate>(physical_plan));
+    case P::PhysicalType::kCrossReferenceCoalesceAggregate:
+      return estimateCardinalityForCrossReferenceCoalesceAggregate(
+          std::static_pointer_cast<const P::CrossReferenceCoalesceAggregate>(physical_plan));
     case P::PhysicalType::kSharedSubplanReference: {
       const P::SharedSubplanReferencePtr shared_subplan_reference =
           std::static_pointer_cast<const P::SharedSubplanReference>(physical_plan);
@@ -175,6 +192,11 @@ std::size_t StarSchemaSimpleCostModel::estimateCardinalityForAggregate(
       estimateNumGroupsForAggregate(physical_plan) * filter_selectivity);
 }
 
+std::size_t StarSchemaSimpleCostModel::estimateCardinalityForCrossReferenceCoalesceAggregate(
+    const P::CrossReferenceCoalesceAggregatePtr &physical_plan) {
+  return estimateCardinality(physical_plan->left_child());
+}
+
 std::size_t StarSchemaSimpleCostModel::estimateCardinalityForWindowAggregate(
     const P::WindowAggregatePtr &physical_plan) {
   return estimateCardinality(physical_plan->input());
@@ -233,6 +255,13 @@ std::size_t StarSchemaSimpleCostModel::estimateNumDistinctValues(
       }
       break;
     }
+    case P::PhysicalType::kCrossReferenceCoalesceAggregate: {
+      const P::PhysicalPtr left_child = physical_plan->children()[0];
+      if (E::ContainsExprId(left_child->getOutputAttributes(), attribute_id)) {
+        return estimateNumDistinctValues(attribute_id, left_child);
+      }
+      break;
+    }
     case P::PhysicalType::kFilterJoin: {
       const P::FilterJoinPtr &filter_join =
           std::static_pointer_cast<const P::FilterJoin>(physical_plan);
@@ -275,6 +304,17 @@ std::size_t StarSchemaSimpleCostModel::estimateNumDistinctValues(
 double StarSchemaSimpleCostModel::estimateSelectivity(
     const physical::PhysicalPtr &physical_plan) {
   switch (physical_plan->getPhysicalType()) {
+    case P::PhysicalType::kAggregate: {
+      const P::AggregatePtr &aggregate =
+          std::static_pointer_cast<const P::Aggregate>(physical_plan);
+      return estimateSelectivity(aggregate->input()) *
+          estimateSelectivityForFilterPredicate(aggregate);
+    }
+    case P::PhysicalType::kCrossReferenceCoalesceAggregate: {
+      const P::CrossReferenceCoalesceAggregatePtr &aggregate_on_left_outer_join =
+          std::static_pointer_cast<const P::CrossReferenceCoalesceAggregate>(physical_plan);
+      return estimateSelectivity(aggregate_on_left_outer_join->left_child());
+    }
     case P::PhysicalType::kSelection: {
       const P::SelectionPtr &selection =
           std::static_pointer_cast<const P::Selection>(physical_plan);
@@ -331,6 +371,7 @@ double StarSchemaSimpleCostModel::estimateSelectivity(
 
 double StarSchemaSimpleCostModel::estimateSelectivityForFilterPredicate(
     const physical::PhysicalPtr &physical_plan) {
+  P::PhysicalPtr target_plan = physical_plan;
   E::PredicatePtr filter_predicate = nullptr;
   switch (physical_plan->getPhysicalType()) {
     case P::PhysicalType::kSelection:
@@ -340,6 +381,7 @@ double StarSchemaSimpleCostModel::estimateSelectivityForFilterPredicate(
     case P::PhysicalType::kAggregate:
       filter_predicate =
           std::static_pointer_cast<const P::Aggregate>(physical_plan)->filter_predicate();
+      target_plan = physical_plan->children()[0];
       break;
     case P::PhysicalType::kHashJoin:
       filter_predicate =
@@ -356,7 +398,7 @@ double StarSchemaSimpleCostModel::estimateSelectivityForFilterPredicate(
   if (filter_predicate == nullptr) {
     return 1.0;
   } else {
-    return estimateSelectivityForPredicate(filter_predicate, physical_plan);
+    return estimateSelectivityForPredicate(filter_predicate, target_plan);
   }
 }
 
@@ -443,6 +485,12 @@ bool StarSchemaSimpleCostModel::impliesUniqueAttributes(
           std::static_pointer_cast<const P::Aggregate>(physical_plan);
       return E::SubsetOfExpressions(aggregate->grouping_expressions(), attributes);
     }
+    case P::PhysicalType::kCrossReferenceCoalesceAggregate: {
+      const P::CrossReferenceCoalesceAggregatePtr &aggregate_on_left_outer_join =
+          std::static_pointer_cast<const P::CrossReferenceCoalesceAggregate>(physical_plan);
+      return E::SubsetOfExpressions(
+          aggregate_on_left_outer_join->left_join_attributes(), attributes);
+    }
     case P::PhysicalType::kHashJoin: {
       const P::HashJoinPtr &hash_join =
           std::static_pointer_cast<const P::HashJoin>(physical_plan);
@@ -542,6 +590,103 @@ attribute_id StarSchemaSimpleCostModel::findCatalogRelationAttributeId(
   return kInvalidAttributeID;
 }
 
+bool StarSchemaSimpleCostModel::canUseCollisionFreeAggregation(
+    const P::AggregatePtr &aggregate,
+    const std::size_t estimated_num_groups,
+    std::size_t *max_num_groups) {
+#ifdef QUICKSTEP_DISTRIBUTED
+  // Currently we cannot do this fast path with the distributed setting. See
+  // the TODOs at InitializeAggregationOperator::getAllWorkOrderProtos() and
+  // FinalizeAggregationOperator::getAllWorkOrderProtos().
+  return false;
+#endif
+
+  // Supports only single group-by key.
+  if (aggregate->grouping_expressions().size() != 1) {
+    return false;
+  }
+
+  // We need to know the exact min/max stats of the group-by key.
+  // So it must be a CatalogAttribute (but not an expression).
+  E::AttributeReferencePtr group_by_key_attr;
+  const E::ExpressionPtr agg_expr = aggregate->grouping_expressions().front();
+  if (!E::SomeAttributeReference::MatchesWithConditionalCast(agg_expr, &group_by_key_attr)) {
+    return false;
+  }
+
+  bool min_value_stat_is_exact;
+  bool max_value_stat_is_exact;
+  const TypedValue min_value = findMinValueStat(
+          aggregate, group_by_key_attr, &min_value_stat_is_exact);
+  const TypedValue max_value = findMaxValueStat(
+          aggregate, group_by_key_attr, &max_value_stat_is_exact);
+  if (min_value.isNull() || max_value.isNull() ||
+      (!min_value_stat_is_exact) || (!max_value_stat_is_exact)) {
+    return false;
+  }
+
+  std::int64_t min_cpp_value;
+  std::int64_t max_cpp_value;
+  switch (group_by_key_attr->getValueType().getTypeID()) {
+    case TypeID::kInt: {
+      min_cpp_value = min_value.getLiteral<int>();
+      max_cpp_value = max_value.getLiteral<int>();
+      break;
+    }
+    case TypeID::kLong: {
+      min_cpp_value = min_value.getLiteral<std::int64_t>();
+      max_cpp_value = max_value.getLiteral<std::int64_t>();
+      break;
+    }
+    default:
+      return false;
+  }
+
+  // TODO(jianqiao):
+  // 1. Handle the case where min_cpp_value is below 0 or far greater than 0.
+  // 2. Reason about the table size bound (e.g. by checking memory size) instead
+  //    of hardcoding it as a gflag.
+  if (min_cpp_value < 0 ||
+      max_cpp_value >= FLAGS_collision_free_vector_table_max_size ||
+      max_cpp_value / static_cast<double>(estimated_num_groups) > 256.0) {
+    return false;
+  }
+
+  for (const auto &agg_expr : aggregate->aggregate_expressions()) {
+    const E::AggregateFunctionPtr agg_func =
+        std::static_pointer_cast<const E::AggregateFunction>(agg_expr->expression());
+
+    if (agg_func->is_distinct()) {
+      return false;
+    }
+
+    // TODO(jianqiao): Support AggregationID::AVG.
+    if (!QUICKSTEP_EQUALS_ANY_CONSTANT(agg_func->getAggregate().getAggregationID(),
+                                       AggregationID::kCount,
+                                       AggregationID::kSum)) {
+      return false;
+    }
+
+    const auto &arguments = agg_func->getArguments();
+    if (arguments.size() > 1u) {
+      return false;
+    }
+
+    if (arguments.size() == 1u) {
+      if (!QUICKSTEP_EQUALS_ANY_CONSTANT(arguments.front()->getValueType().getTypeID(),
+                                         TypeID::kInt,
+                                         TypeID::kLong,
+                                         TypeID::kFloat,
+                                         TypeID::kDouble)) {
+        return false;
+      }
+    }
+  }
+
+  *max_num_groups = static_cast<std::size_t>(max_cpp_value) + 1;
+  return true;
+}
+
 }  // namespace cost
 }  // namespace optimizer
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccd707b3/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp b/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp
index cbe18f4..afb2ef9 100644
--- a/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp
+++ b/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp
@@ -29,6 +29,7 @@
 #include "query_optimizer/expressions/ExprId.hpp"
 #include "query_optimizer/expressions/Predicate.hpp"
 #include "query_optimizer/physical/Aggregate.hpp"
+#include "query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp"
 #include "query_optimizer/physical/NestedLoopsJoin.hpp"
 #include "query_optimizer/physical/FilterJoin.hpp"
 #include "query_optimizer/physical/HashJoin.hpp"
@@ -166,10 +167,29 @@ class StarSchemaSimpleCostModel : public CostModel {
         physical_plan, attribute->id(), StatType::kMax, is_exact_stat);
   }
 
+  /**
+   * @brief Checks whether an aggregate node can be efficiently evaluated with
+   *        the collision-free aggregation fast path.
+   *
+   * @param aggregate The physical aggregate node to be checked.
+   * @param estimated_num_groups The estimated number of groups for the aggregate.
+   * @param exact_num_groups If collision-free aggregation is applicable, the
+   *        pointed content of this pointer will be set as the maximum possible
+   *        number of groups that the collision-free hash table need to hold.
+   * @return A bool value indicating whether collision-free aggregation can be
+   *         used to evaluate \p aggregate.
+   */
+  bool canUseCollisionFreeAggregation(const physical::AggregatePtr &aggregate,
+                                      const std::size_t estimated_num_groups,
+                                      std::size_t *max_num_groups);
+
  private:
   std::size_t estimateCardinalityForAggregate(
       const physical::AggregatePtr &physical_plan);
 
+  std::size_t estimateCardinalityForCrossReferenceCoalesceAggregate(
+      const physical::CrossReferenceCoalesceAggregatePtr &physical_plan);
+
   std::size_t estimateCardinalityForFilterJoin(
       const physical::FilterJoinPtr &physical_plan);
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccd707b3/query_optimizer/physical/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/CMakeLists.txt b/query_optimizer/physical/CMakeLists.txt
index f68ed39..77ae75e 100644
--- a/query_optimizer/physical/CMakeLists.txt
+++ b/query_optimizer/physical/CMakeLists.txt
@@ -21,6 +21,9 @@ add_library(quickstep_queryoptimizer_physical_BinaryJoin BinaryJoin.cpp BinaryJo
 add_library(quickstep_queryoptimizer_physical_CopyFrom CopyFrom.cpp CopyFrom.hpp)
 add_library(quickstep_queryoptimizer_physical_CreateIndex CreateIndex.cpp CreateIndex.hpp)
 add_library(quickstep_queryoptimizer_physical_CreateTable CreateTable.cpp CreateTable.hpp)
+add_library(quickstep_queryoptimizer_physical_CrossReferenceCoalesceAggregate
+            CrossReferenceCoalesceAggregate.cpp
+            CrossReferenceCoalesceAggregate.hpp)
 add_library(quickstep_queryoptimizer_physical_DeleteTuples DeleteTuples.cpp DeleteTuples.hpp)
 add_library(quickstep_queryoptimizer_physical_DropTable DropTable.cpp DropTable.hpp)
 add_library(quickstep_queryoptimizer_physical_FilterJoin FilterJoin.cpp FilterJoin.hpp)
@@ -95,6 +98,16 @@ target_link_libraries(quickstep_queryoptimizer_physical_CreateTable
                       quickstep_queryoptimizer_physical_PhysicalType
                       quickstep_utility_Cast
                       quickstep_utility_Macros)
+target_link_libraries(quickstep_queryoptimizer_physical_CrossReferenceCoalesceAggregate
+                      quickstep_queryoptimizer_OptimizerTree
+                      quickstep_queryoptimizer_expressions_Alias
+                      quickstep_queryoptimizer_expressions_AttributeReference
+                      quickstep_queryoptimizer_expressions_ExpressionUtil
+                      quickstep_queryoptimizer_expressions_Predicate
+                      quickstep_queryoptimizer_physical_Physical
+                      quickstep_queryoptimizer_physical_PhysicalType
+                      quickstep_utility_Cast
+                      quickstep_utility_Macros)
 target_link_libraries(quickstep_queryoptimizer_physical_DeleteTuples
                       glog
                       quickstep_catalog_CatalogRelation
@@ -293,6 +306,7 @@ target_link_libraries(quickstep_queryoptimizer_physical
                       quickstep_queryoptimizer_physical_CopyFrom
                       quickstep_queryoptimizer_physical_CreateIndex
                       quickstep_queryoptimizer_physical_CreateTable
+                      quickstep_queryoptimizer_physical_CrossReferenceCoalesceAggregate
                       quickstep_queryoptimizer_physical_DeleteTuples
                       quickstep_queryoptimizer_physical_DropTable
                       quickstep_queryoptimizer_physical_FilterJoin

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccd707b3/query_optimizer/physical/CrossReferenceCoalesceAggregate.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/CrossReferenceCoalesceAggregate.cpp b/query_optimizer/physical/CrossReferenceCoalesceAggregate.cpp
new file mode 100644
index 0000000..16700c0
--- /dev/null
+++ b/query_optimizer/physical/CrossReferenceCoalesceAggregate.cpp
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp"
+
+#include <string>
+#include <unordered_set>
+#include <vector>
+
+#include "query_optimizer/OptimizerTree.hpp"
+#include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/expressions/ExpressionUtil.hpp"
+#include "utility/Cast.hpp"
+
+namespace quickstep {
+namespace optimizer {
+namespace physical {
+
+namespace E = ::quickstep::optimizer::expressions;
+
+std::vector<E::AttributeReferencePtr> CrossReferenceCoalesceAggregate
+    ::getOutputAttributes() const {
+  std::vector<E::AttributeReferencePtr> output_attributes(left_join_attributes_);
+  for (const auto &aggregate_expr : aggregate_expressions_) {
+    output_attributes.emplace_back(E::ToRef(aggregate_expr));
+  }
+  return output_attributes;
+}
+
+std::vector<E::AttributeReferencePtr> CrossReferenceCoalesceAggregate
+    ::getReferencedAttributes() const {
+  std::unordered_set<E::AttributeReferencePtr> referenced_attributes;
+
+  referenced_attributes.insert(left_join_attributes_.begin(),
+                               left_join_attributes_.end());
+  referenced_attributes.insert(right_join_attributes_.begin(),
+                               right_join_attributes_.end());
+
+  if (right_filter_predicate_ != nullptr) {
+    const std::vector<E::AttributeReferencePtr> attrs_in_predicate =
+        right_filter_predicate_->getReferencedAttributes();
+    referenced_attributes.insert(attrs_in_predicate.begin(),
+                                 attrs_in_predicate.end());
+  }
+
+  for (const auto &aggregate_expr : aggregate_expressions_) {
+    const std::vector<E::AttributeReferencePtr> attrs_in_expr =
+        aggregate_expr->getReferencedAttributes();
+    referenced_attributes.insert(attrs_in_expr.begin(), attrs_in_expr.end());
+  }
+
+  return std::vector<E::AttributeReferencePtr>(
+      referenced_attributes.begin(), referenced_attributes.end());
+}
+
+void CrossReferenceCoalesceAggregate::getFieldStringItems(
+    std::vector<std::string> *inline_field_names,
+    std::vector<std::string> *inline_field_values,
+    std::vector<std::string> *non_container_child_field_names,
+    std::vector<OptimizerTreeBaseNodePtr> *non_container_child_fields,
+    std::vector<std::string> *container_child_field_names,
+    std::vector<std::vector<OptimizerTreeBaseNodePtr>> *container_child_fields) const {
+  inline_field_names->push_back("hash_table_num_entries");
+  inline_field_values->push_back(std::to_string(hash_table_num_entries_));
+
+  non_container_child_field_names->push_back("left_child");
+  non_container_child_fields->push_back(left_child_);
+  non_container_child_field_names->push_back("right_child");
+  non_container_child_fields->push_back(right_child_);
+
+  container_child_field_names->push_back("left_join_attributes");
+  container_child_fields->push_back(
+      CastSharedPtrVector<OptimizerTreeBase>(left_join_attributes_));
+  container_child_field_names->push_back("right_join_attributes");
+  container_child_fields->push_back(
+      CastSharedPtrVector<OptimizerTreeBase>(right_join_attributes_));
+
+  if (right_filter_predicate_ != nullptr) {
+    non_container_child_field_names->push_back("right_filter_predicate");
+    non_container_child_fields->push_back(right_filter_predicate_);
+  }
+  container_child_field_names->push_back("aggregate_expressions");
+  container_child_fields->push_back(
+      CastSharedPtrVector<OptimizerTreeBase>(aggregate_expressions_));
+}
+
+}  // namespace physical
+}  // namespace optimizer
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccd707b3/query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp b/query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp
new file mode 100644
index 0000000..9fb02e2
--- /dev/null
+++ b/query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_OPTIMIZER_PHYSICAL_CROSS_REFERENCE_COALESCE_AGGREGATE_HPP_
+#define QUICKSTEP_QUERY_OPTIMIZER_PHYSICAL_CROSS_REFERENCE_COALESCE_AGGREGATE_HPP_
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "query_optimizer/OptimizerTree.hpp"
+#include "query_optimizer/expressions/Alias.hpp"
+#include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/expressions/ExpressionUtil.hpp"
+#include "query_optimizer/expressions/Predicate.hpp"
+#include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/physical/PhysicalType.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+namespace optimizer {
+namespace physical {
+
+/** \addtogroup OptimizerLogical
+ *  @{
+ */
+
+class CrossReferenceCoalesceAggregate;
+typedef std::shared_ptr<const CrossReferenceCoalesceAggregate> CrossReferenceCoalesceAggregatePtr;
+
+/**
+ * @brief TODO.
+ */
+class CrossReferenceCoalesceAggregate : public Physical {
+ public:
+  PhysicalType getPhysicalType() const override {
+    return PhysicalType::kCrossReferenceCoalesceAggregate;
+  }
+
+  std::string getName() const override {
+    return "CrossReferenceCoalesceAggregate";
+  }
+
+  const PhysicalPtr& left_child() const {
+    return left_child_;
+  }
+
+  const PhysicalPtr& right_child() const {
+    return right_child_;
+  }
+
+  const std::vector<expressions::AttributeReferencePtr>& left_join_attributes() const {
+    return left_join_attributes_;
+  }
+
+  const std::vector<expressions::AttributeReferencePtr>& right_join_attributes() const {
+    return right_join_attributes_;
+  }
+
+  const expressions::PredicatePtr& right_filter_predicate() const {
+    return right_filter_predicate_;
+  }
+
+  const std::vector<expressions::AliasPtr>& aggregate_expressions() const {
+    return aggregate_expressions_;
+  }
+
+  inline std::size_t hash_table_num_entries() const {
+    return hash_table_num_entries_;
+  }
+
+  PhysicalPtr copyWithNewChildren(
+      const std::vector<PhysicalPtr> &new_children) const override {
+    DCHECK_EQ(getNumChildren(), new_children.size());
+    return Create(new_children[0],
+                  new_children[1],
+                  left_join_attributes_,
+                  right_join_attributes_,
+                  right_filter_predicate_,
+                  aggregate_expressions_,
+                  hash_table_num_entries_);
+  }
+
+  std::vector<expressions::AttributeReferencePtr> getOutputAttributes() const override;
+
+  std::vector<expressions::AttributeReferencePtr> getReferencedAttributes() const override;
+
+  bool maybeCopyWithPrunedExpressions(
+      const expressions::UnorderedNamedExpressionSet &referenced_expressions,
+      PhysicalPtr *output) const override {
+    return false;
+  }
+
+  static CrossReferenceCoalesceAggregatePtr Create(
+      const PhysicalPtr &left_child,
+      const PhysicalPtr &right_child,
+      const std::vector<expressions::AttributeReferencePtr> &left_join_attributes,
+      const std::vector<expressions::AttributeReferencePtr> &right_join_attributes,
+      const expressions::PredicatePtr right_filter_predicate,
+      const std::vector<expressions::AliasPtr> &aggregate_expressions,
+      const std::size_t hash_table_num_entries) {
+    return CrossReferenceCoalesceAggregatePtr(
+        new CrossReferenceCoalesceAggregate(left_child,
+                                            right_child,
+                                            left_join_attributes,
+                                            right_join_attributes,
+                                            right_filter_predicate,
+                                            aggregate_expressions,
+                                            hash_table_num_entries));
+  }
+
+ protected:
+  void getFieldStringItems(
+      std::vector<std::string> *inline_field_names,
+      std::vector<std::string> *inline_field_values,
+      std::vector<std::string> *non_container_child_field_names,
+      std::vector<OptimizerTreeBaseNodePtr> *non_container_child_fields,
+      std::vector<std::string> *container_child_field_names,
+      std::vector<std::vector<OptimizerTreeBaseNodePtr>> *container_child_fields) const override;
+
+ private:
+  CrossReferenceCoalesceAggregate(
+      const PhysicalPtr &left_child,
+      const PhysicalPtr &right_child,
+      const std::vector<expressions::AttributeReferencePtr> &left_join_attributes,
+      const std::vector<expressions::AttributeReferencePtr> &right_join_attributes,
+      const expressions::PredicatePtr right_filter_predicate,
+      const std::vector<expressions::AliasPtr> &aggregate_expressions,
+      const std::size_t hash_table_num_entries)
+      : left_child_(left_child),
+        right_child_(right_child),
+        left_join_attributes_(left_join_attributes),
+        right_join_attributes_(right_join_attributes),
+        right_filter_predicate_(right_filter_predicate),
+        aggregate_expressions_(aggregate_expressions),
+        hash_table_num_entries_(hash_table_num_entries) {
+    addChild(left_child_);
+    addChild(right_child_);
+  }
+
+  PhysicalPtr left_child_;
+  PhysicalPtr right_child_;
+  std::vector<expressions::AttributeReferencePtr> left_join_attributes_;
+  std::vector<expressions::AttributeReferencePtr> right_join_attributes_;
+  expressions::PredicatePtr right_filter_predicate_;
+  std::vector<expressions::AliasPtr> aggregate_expressions_;
+  std::size_t hash_table_num_entries_;
+
+  DISALLOW_COPY_AND_ASSIGN(CrossReferenceCoalesceAggregate);
+};
+
+/** @} */
+
+}  // namespace physical
+}  // namespace optimizer
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_OPTIMIZER_PHYSICAL_CROSS_REFERENCE_COALESCE_AGGREGATE_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccd707b3/query_optimizer/physical/PatternMatcher.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/PatternMatcher.hpp b/query_optimizer/physical/PatternMatcher.hpp
index 4336767..0204504 100644
--- a/query_optimizer/physical/PatternMatcher.hpp
+++ b/query_optimizer/physical/PatternMatcher.hpp
@@ -33,6 +33,7 @@ class Aggregate;
 class BinaryJoin;
 class CopyFrom;
 class CreateTable;
+class CrossReferenceCoalesceAggregate;
 class DeleteTuples;
 class DropTable;
 class FilterJoin;
@@ -112,6 +113,8 @@ using SomeAggregate = SomePhysicalNode<Aggregate, PhysicalType::kAggregate>;
 using SomeBinaryJoin = SomePhysicalNode<BinaryJoin, PhysicalType::kHashJoin, PhysicalType::kNestedLoopsJoin>;
 using SomeCopyFrom = SomePhysicalNode<CopyFrom, PhysicalType::kCopyFrom>;
 using SomeCreateTable = SomePhysicalNode<CreateTable, PhysicalType::kCreateTable>;
+using SomeCrossReferenceCoalesceAggregate = SomePhysicalNode<CrossReferenceCoalesceAggregate,
+                                                             PhysicalType::kCrossReferenceCoalesceAggregate>;
 using SomeDeleteTuples = SomePhysicalNode<DeleteTuples, PhysicalType::kDeleteTuples>;
 using SomeDropTable = SomePhysicalNode<DropTable, PhysicalType::kDropTable>;
 using SomeFilterJoin = SomePhysicalNode<FilterJoin, PhysicalType::kFilterJoin>;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccd707b3/query_optimizer/physical/PhysicalType.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/PhysicalType.hpp b/query_optimizer/physical/PhysicalType.hpp
index 1da5929..077bd54 100644
--- a/query_optimizer/physical/PhysicalType.hpp
+++ b/query_optimizer/physical/PhysicalType.hpp
@@ -36,6 +36,7 @@ enum class PhysicalType {
   kCopyFrom,
   kCreateIndex,
   kCreateTable,
+  kCrossReferenceCoalesceAggregate,
   kDeleteTuples,
   kDropTable,
   kFilterJoin,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccd707b3/query_optimizer/rules/BottomUpRule.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/BottomUpRule.hpp b/query_optimizer/rules/BottomUpRule.hpp
index 53dff0d..6c14e64 100644
--- a/query_optimizer/rules/BottomUpRule.hpp
+++ b/query_optimizer/rules/BottomUpRule.hpp
@@ -57,21 +57,7 @@ class BottomUpRule : public Rule<TreeType> {
     DCHECK(tree != nullptr);
 
     init(tree);
-    std::vector<std::shared_ptr<const TreeType>> new_children;
-    bool has_changed_children = false;
-    for (const std::shared_ptr<const TreeType> &child : tree->children()) {
-      std::shared_ptr<const TreeType> new_child = apply(child);
-      if (child != new_child && !has_changed_children) {
-        has_changed_children = true;
-      }
-      new_children.push_back(new_child);
-    }
-
-    if (has_changed_children) {
-      return applyToNode(tree->copyWithNewChildren(new_children));
-    } else {
-      return applyToNode(tree);
-    }
+    return applyInternal(tree);
   }
 
  protected:
@@ -89,10 +75,29 @@ class BottomUpRule : public Rule<TreeType> {
    *
    * @param input The input tree.
    */
-  virtual void init(const TreeNodePtr &input) {
-  }
+  virtual void init(const TreeNodePtr &input) {}
 
  private:
+  TreeNodePtr applyInternal(const TreeNodePtr &tree) {
+    DCHECK(tree != nullptr);
+
+    std::vector<std::shared_ptr<const TreeType>> new_children;
+    bool has_changed_children = false;
+    for (const std::shared_ptr<const TreeType> &child : tree->children()) {
+      std::shared_ptr<const TreeType> new_child = applyInternal(child);
+      if (child != new_child && !has_changed_children) {
+        has_changed_children = true;
+      }
+      new_children.push_back(new_child);
+    }
+
+    if (has_changed_children) {
+      return applyToNode(tree->copyWithNewChildren(new_children));
+    } else {
+      return applyToNode(tree);
+    }
+  }
+
   DISALLOW_COPY_AND_ASSIGN(BottomUpRule);
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccd707b3/query_optimizer/rules/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/CMakeLists.txt b/query_optimizer/rules/CMakeLists.txt
index 029d816..a614bde 100644
--- a/query_optimizer/rules/CMakeLists.txt
+++ b/query_optimizer/rules/CMakeLists.txt
@@ -21,6 +21,7 @@ add_subdirectory(tests)
 add_library(quickstep_queryoptimizer_rules_AttachLIPFilters AttachLIPFilters.cpp AttachLIPFilters.hpp)
 add_library(quickstep_queryoptimizer_rules_BottomUpRule ../../empty_src.cpp BottomUpRule.hpp)
 add_library(quickstep_queryoptimizer_rules_CollapseProject CollapseProject.cpp CollapseProject.hpp)
+add_library(quickstep_queryoptimizer_rules_FuseAggregateJoin FuseAggregateJoin.cpp FuseAggregateJoin.hpp)
 add_library(quickstep_queryoptimizer_rules_GenerateJoins GenerateJoins.cpp GenerateJoins.hpp)
 add_library(quickstep_queryoptimizer_rules_InjectJoinFilters InjectJoinFilters.cpp InjectJoinFilters.hpp)
 add_library(quickstep_queryoptimizer_rules_PruneColumns PruneColumns.cpp PruneColumns.hpp)
@@ -75,6 +76,27 @@ target_link_libraries(quickstep_queryoptimizer_rules_CollapseProject
                       quickstep_queryoptimizer_rules_Rule
                       quickstep_queryoptimizer_rules_RuleHelper
                       quickstep_utility_Macros)
+target_link_libraries(quickstep_queryoptimizer_rules_FuseAggregateJoin
+                      quickstep_expressions_aggregation_AggregateFunction
+                      quickstep_expressions_aggregation_AggregationID
+                      quickstep_queryoptimizer_costmodel_StarSchemaSimpleCostModel
+                      quickstep_queryoptimizer_expressions_AggregateFunction
+                      quickstep_queryoptimizer_expressions_Alias
+                      quickstep_queryoptimizer_expressions_AttributeReference
+                      quickstep_queryoptimizer_expressions_ExpressionUtil
+                      quickstep_queryoptimizer_expressions_Predicate
+                      quickstep_queryoptimizer_expressions_NamedExpression
+                      quickstep_queryoptimizer_expressions_PatternMatcher
+                      quickstep_queryoptimizer_physical_Aggregate
+                      quickstep_queryoptimizer_physical_CrossReferenceCoalesceAggregate
+                      quickstep_queryoptimizer_physical_HashJoin
+                      quickstep_queryoptimizer_physical_PatternMatcher
+                      quickstep_queryoptimizer_physical_Physical
+                      quickstep_queryoptimizer_physical_PhysicalType
+                      quickstep_queryoptimizer_physical_Selection
+                      quickstep_queryoptimizer_physical_TopLevelPlan
+                      quickstep_queryoptimizer_rules_BottomUpRule
+                      quickstep_utility_Macros)
 target_link_libraries(quickstep_queryoptimizer_rules_GenerateJoins
                       glog
                       quickstep_queryoptimizer_expressions_AttributeReference
@@ -288,6 +310,7 @@ target_link_libraries(quickstep_queryoptimizer_rules
                       quickstep_queryoptimizer_rules_AttachLIPFilters
                       quickstep_queryoptimizer_rules_BottomUpRule
                       quickstep_queryoptimizer_rules_CollapseProject
+                      quickstep_queryoptimizer_rules_FuseAggregateJoin
                       quickstep_queryoptimizer_rules_GenerateJoins
                       quickstep_queryoptimizer_rules_InjectJoinFilters
                       quickstep_queryoptimizer_rules_PruneColumns

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccd707b3/query_optimizer/rules/FuseAggregateJoin.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/FuseAggregateJoin.cpp b/query_optimizer/rules/FuseAggregateJoin.cpp
new file mode 100644
index 0000000..206188e
--- /dev/null
+++ b/query_optimizer/rules/FuseAggregateJoin.cpp
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "query_optimizer/rules/FuseAggregateJoin.hpp"
+
+#include <algorithm>
+#include <cstddef>
+#include <unordered_set>
+#include <vector>
+
+#include "expressions/aggregation/AggregateFunction.hpp"
+#include "expressions/aggregation/AggregationID.hpp"
+#include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp"
+#include "query_optimizer/expressions/AggregateFunction.hpp"
+#include "query_optimizer/expressions/Alias.hpp"
+#include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/expressions/ExpressionUtil.hpp"
+#include "query_optimizer/expressions/NamedExpression.hpp"
+#include "query_optimizer/expressions/PatternMatcher.hpp"
+#include "query_optimizer/expressions/Predicate.hpp"
+#include "query_optimizer/physical/Aggregate.hpp"
+#include "query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp"
+#include "query_optimizer/physical/HashJoin.hpp"
+#include "query_optimizer/physical/PatternMatcher.hpp"
+#include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/physical/PhysicalType.hpp"
+#include "query_optimizer/physical/Selection.hpp"
+#include "query_optimizer/physical/TopLevelPlan.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+namespace optimizer {
+
+namespace E = ::quickstep::optimizer::expressions;
+namespace P = ::quickstep::optimizer::physical;
+
+P::PhysicalPtr FuseAggregateJoin::applyToNode(
+    const P::PhysicalPtr &node) {
+  P::AggregatePtr aggregate;
+  if (!P::SomeAggregate::MatchesWithConditionalCast(node, &aggregate) ||
+      aggregate->filter_predicate() != nullptr) {
+    return node;
+  }
+
+  P::HashJoinPtr hash_join;
+  if ((!P::SomeHashJoin::MatchesWithConditionalCast(aggregate->input(), &hash_join)) ||
+      hash_join->join_type() != P::HashJoin::JoinType::kLeftOuterJoin ||
+      hash_join->residual_predicate() != nullptr) {
+    return node;
+  }
+
+  const std::vector<E::AttributeReferencePtr> &left_join_attributes =
+      hash_join->left_join_attributes();
+  if (left_join_attributes.size() != 1u ||
+      (!cost_model_->impliesUniqueAttributes(hash_join->left(), left_join_attributes))) {
+    return node;
+  }
+
+  const std::vector<E::NamedExpressionPtr> &grouping_expressions =
+      aggregate->grouping_expressions();
+  if (grouping_expressions.size() != 1u ||
+      grouping_expressions.front()->id() != left_join_attributes.front()->id()) {
+    return node;
+  }
+
+  std::unordered_set<E::ExprId> right_side_attr_ids;
+  for (const auto &attr : hash_join->right()->getOutputAttributes()) {
+    right_side_attr_ids.insert(attr->id());
+  }
+
+  const std::vector<E::AliasPtr> &aggregate_expressions =
+      aggregate->aggregate_expressions();
+  for (const auto &expr : aggregate_expressions) {
+    const E::AggregateFunctionPtr aggr_expr =
+        std::static_pointer_cast<const E::AggregateFunction>(expr->expression());
+
+    const std::vector<E::ScalarPtr> &arguments = aggr_expr->getArguments();
+    if (arguments.size() != 1u) {
+      return node;
+    }
+
+    E::AttributeReferencePtr arg_attr;
+    if (!E::SomeAttributeReference::MatchesWithConditionalCast(arguments.front(), &arg_attr) ||
+        right_side_attr_ids.find(arg_attr->id()) == right_side_attr_ids.end()) {
+      return node;
+    }
+  }
+
+  const std::size_t estimated_num_groups =
+      cost_model_->estimateNumGroupsForAggregate(aggregate);
+
+  std::size_t max_num_groups_left;
+  if (!cost_model_->canUseCollisionFreeAggregation(aggregate,
+                                                   estimated_num_groups,
+                                                   &max_num_groups_left)) {
+    return node;
+  }
+
+  std::size_t max_num_groups_right;
+  if (!cost_model_->canUseCollisionFreeAggregation(
+           P::Aggregate::Create(hash_join->right(),
+                                E::ToNamedExpressions(hash_join->right_join_attributes()),
+                                aggregate->aggregate_expressions(),
+                                nullptr),
+           estimated_num_groups,
+           &max_num_groups_right)) {
+    return node;
+  }
+
+  // Fuse filter predicate.
+  P::PhysicalPtr right_child = hash_join->right();
+  const std::vector<E::AttributeReferencePtr> &right_join_attributes =
+      hash_join->right_join_attributes();
+  E::PredicatePtr right_filter_predicate = nullptr;
+
+  P::SelectionPtr selection;
+  if (P::SomeSelection::MatchesWithConditionalCast(right_child, &selection)) {
+    if (E::SubsetOfExpressions(right_join_attributes,
+                               selection->input()->getOutputAttributes())) {
+      right_child = selection->input();
+      right_filter_predicate = selection->filter_predicate();
+    }
+  }
+
+  const std::size_t max_num_groups =
+      std::max(max_num_groups_left, max_num_groups_right);
+
+  return P::CrossReferenceCoalesceAggregate::Create(hash_join->left(),
+                                                    right_child,
+                                                    left_join_attributes,
+                                                    right_join_attributes,
+                                                    right_filter_predicate,
+                                                    aggregate_expressions,
+                                                    max_num_groups);
+}
+
+void FuseAggregateJoin::init(const P::PhysicalPtr &input) {
+  DCHECK(input->getPhysicalType() == P::PhysicalType::kTopLevelPlan);
+
+  const P::TopLevelPlanPtr top_level_plan =
+      std::static_pointer_cast<const P::TopLevelPlan>(input);
+  cost_model_.reset(
+      new cost::StarSchemaSimpleCostModel(top_level_plan->shared_subplans()));
+}
+
+}  // namespace optimizer
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccd707b3/query_optimizer/rules/FuseAggregateJoin.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/FuseAggregateJoin.hpp b/query_optimizer/rules/FuseAggregateJoin.hpp
new file mode 100644
index 0000000..240aaf1
--- /dev/null
+++ b/query_optimizer/rules/FuseAggregateJoin.hpp
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_OPTIMIZER_RULES_FUSE_AGGREGATE_JOIN_HPP_
+#define QUICKSTEP_QUERY_OPTIMIZER_RULES_FUSE_AGGREGATE_JOIN_HPP_
+
+#include <cstddef>
+#include <memory>
+#include <string>
+
+#include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp"
+#include "query_optimizer/physical/Aggregate.hpp"
+#include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/rules/BottomUpRule.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+namespace optimizer {
+
+/** \addtogroup OptimizerRules
+ *  @{
+ */
+
+class FuseAggregateJoin : public BottomUpRule<physical::Physical> {
+ public:
+  /**
+   * @brief Constructor.
+   */
+  FuseAggregateJoin() {}
+
+  ~FuseAggregateJoin() override {}
+
+  std::string getName() const override {
+    return "FuseAggregateJoin";
+  }
+
+ protected:
+  physical::PhysicalPtr applyToNode(const physical::PhysicalPtr &node) override;
+
+  void init(const physical::PhysicalPtr &input) override;
+
+ private:
+  std::unique_ptr<cost::StarSchemaSimpleCostModel> cost_model_;
+
+  DISALLOW_COPY_AND_ASSIGN(FuseAggregateJoin);
+};
+
+/** @} */
+
+}  // namespace optimizer
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_OPTIMIZER_RULES_FUSE_AGGREGATE_JOIN_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccd707b3/relational_operators/BuildAggregationExistenceMapOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/BuildAggregationExistenceMapOperator.cpp b/relational_operators/BuildAggregationExistenceMapOperator.cpp
new file mode 100644
index 0000000..6a77696
--- /dev/null
+++ b/relational_operators/BuildAggregationExistenceMapOperator.cpp
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "relational_operators/BuildAggregationExistenceMapOperator.hpp"
+
+#include <vector>
+
+#include "catalog/CatalogRelation.hpp"
+#include "query_execution/QueryContext.hpp"
+#include "query_execution/WorkOrderProtosContainer.hpp"
+#include "query_execution/WorkOrdersContainer.hpp"
+#include "relational_operators/WorkOrder.pb.h"
+#include "storage/AggregationOperationState.hpp"
+#include "storage/StorageBlock.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageManager.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "storage/ValueAccessorUtil.hpp"
+#include "types/Type.hpp"
+#include "types/TypeID.hpp"
+#include "utility/BarrieredReadWriteConcurrentBitVector.hpp"
+
+#include "tmb/id_typedefs.h"
+
+namespace quickstep {
+
+namespace {
+
+template <typename CppType, bool is_attr_nullable>
+void ExecuteBuild(const attribute_id attr_id,
+                  ValueAccessor *accessor,
+                  BarrieredReadWriteConcurrentBitVector *existence_map) {
+  InvokeOnAnyValueAccessor(
+      accessor,
+      [&](auto *accessor) -> void {  // NOLINT(build/c++11)
+    accessor->beginIteration();
+    while (accessor->next()) {
+      const void *value = accessor->template getUntypedValue<is_attr_nullable>(attr_id);
+      if (!is_attr_nullable || value != nullptr) {
+        existence_map->setBit(*reinterpret_cast<const CppType *>(value));
+      }
+    }
+  });
+}
+
+template <typename CppType>
+void ExecuteHelper(const attribute_id attr_id,
+                   const bool is_attr_nullable,
+                   ValueAccessor *accessor,
+                   BarrieredReadWriteConcurrentBitVector *existence_map)  {
+  if (is_attr_nullable) {
+    ExecuteBuild<CppType, true>(attr_id, accessor, existence_map);
+  } else {
+    ExecuteBuild<CppType, false>(attr_id, accessor, existence_map);
+  }
+}
+
+}
+
+bool BuildAggregationExistenceMapOperator::getAllWorkOrders(
+    WorkOrdersContainer *container,
+    QueryContext *query_context,
+    StorageManager *storage_manager,
+    const tmb::client_id scheduler_client_id,
+    tmb::MessageBus *bus) {
+  if (input_relation_is_stored_) {
+    if (!started_) {
+      for (const block_id input_block_id : input_relation_block_ids_) {
+        container->addNormalWorkOrder(
+            new BuildAggregationExistenceMapWorkOrder(
+                query_id_,
+                input_relation_,
+                input_block_id,
+                build_attribute_,
+                query_context->getAggregationState(aggr_state_index_),
+                storage_manager),
+            op_index_);
+      }
+      started_ = true;
+    }
+    return true;
+  } else {
+    while (num_workorders_generated_ < input_relation_block_ids_.size()) {
+      container->addNormalWorkOrder(
+          new BuildAggregationExistenceMapWorkOrder(
+                query_id_,
+                input_relation_,
+                input_relation_block_ids_[num_workorders_generated_],
+                build_attribute_,
+                query_context->getAggregationState(aggr_state_index_),
+                storage_manager),
+          op_index_);
+      ++num_workorders_generated_;
+    }
+    return done_feeding_input_relation_;
+  }
+}
+
+bool BuildAggregationExistenceMapOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
+  LOG(FATAL) << "Not implemented";
+}
+
+void BuildAggregationExistenceMapWorkOrder::execute() {
+  BlockReference block(
+      storage_manager_->getBlock(build_block_id_, input_relation_));
+
+  std::unique_ptr<ValueAccessor> accessor(
+      block->getTupleStorageSubBlock().createValueAccessor());
+
+  const Type &attr_type =
+      input_relation_.getAttributeById(build_attribute_)->getType();
+  switch (attr_type.getTypeID()) {
+    case TypeID::kInt:
+      ExecuteHelper<int>(build_attribute_,
+                         attr_type.isNullable(),
+                         accessor.get(),
+                         state_->getExistenceMap());
+      return;
+    case TypeID::kLong:
+      ExecuteHelper<std::int64_t>(build_attribute_,
+                                  attr_type.isNullable(),
+                                  accessor.get(),
+                                  state_->getExistenceMap());
+      return;
+    default:
+      LOG(FATAL) << "Build attribute type not supported by "
+                 << "BuildAggregationExistenceMapOperator: "
+                 << attr_type.getName();
+  }
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccd707b3/relational_operators/BuildAggregationExistenceMapOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/BuildAggregationExistenceMapOperator.hpp b/relational_operators/BuildAggregationExistenceMapOperator.hpp
new file mode 100644
index 0000000..2d13bda
--- /dev/null
+++ b/relational_operators/BuildAggregationExistenceMapOperator.hpp
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_RELATIONAL_OPERATORS_BUILD_AGGREGATION_EXISTENCE_MAP_OPERATOR_HPP_
+#define QUICKSTEP_RELATIONAL_OPERATORS_BUILD_AGGREGATION_EXISTENCE_MAP_OPERATOR_HPP_
+
+#include <string>
+#include <vector>
+
+#include "catalog/CatalogRelation.hpp"
+#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/QueryContext.hpp"
+#include "relational_operators/RelationalOperator.hpp"
+#include "relational_operators/WorkOrder.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+
+namespace tmb { class MessageBus; }
+
+namespace quickstep {
+
+class AggregationOperationState;
+class CatalogRelationSchema;
+class StorageManager;
+class WorkOrderProtosContainer;
+class WorkOrdersContainer;
+
+namespace serialization { class WorkOrder; }
+
+/** \addtogroup RelationalOperators
+ *  @{
+ */
+
+class BuildAggregationExistenceMapOperator : public RelationalOperator {
+ public:
+  BuildAggregationExistenceMapOperator(const std::size_t query_id,
+                                       const CatalogRelation &input_relation,
+                                       const attribute_id build_attribute,
+                                       const bool input_relation_is_stored,
+                                       const QueryContext::aggregation_state_id aggr_state_index)
+      : RelationalOperator(query_id),
+        input_relation_(input_relation),
+        build_attribute_(build_attribute),
+        input_relation_is_stored_(input_relation_is_stored),
+        input_relation_block_ids_(input_relation_is_stored ? input_relation.getBlocksSnapshot()
+                                                           : std::vector<block_id>()),
+        aggr_state_index_(aggr_state_index),
+        num_workorders_generated_(0),
+        started_(false) {}
+
+  ~BuildAggregationExistenceMapOperator() override {}
+
+  std::string getName() const override {
+    return "BuildAggregationExistenceMapOperator";
+  }
+
+  const CatalogRelation& input_relation() const {
+    return input_relation_;
+  }
+
+  bool getAllWorkOrders(WorkOrdersContainer *container,
+                        QueryContext *query_context,
+                        StorageManager *storage_manager,
+                        const tmb::client_id scheduler_client_id,
+                        tmb::MessageBus *bus) override;
+
+  bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
+
+  void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id,
+                      const partition_id part_id) override {
+    input_relation_block_ids_.push_back(input_block_id);
+  }
+
+ private:
+  serialization::WorkOrder* createWorkOrderProto(const block_id block);
+
+  const CatalogRelation &input_relation_;
+  const attribute_id build_attribute_;
+  const bool input_relation_is_stored_;
+  std::vector<block_id> input_relation_block_ids_;
+  const QueryContext::aggregation_state_id aggr_state_index_;
+
+  std::vector<block_id>::size_type num_workorders_generated_;
+  bool started_;
+
+  DISALLOW_COPY_AND_ASSIGN(BuildAggregationExistenceMapOperator);
+};
+
+/**
+ * @brief A WorkOrder produced by BuildAggregationExistenceMapOperator.
+ **/
+class BuildAggregationExistenceMapWorkOrder : public WorkOrder {
+ public:
+  BuildAggregationExistenceMapWorkOrder(const std::size_t query_id,
+                                        const CatalogRelationSchema &input_relation,
+                                        const block_id build_block_id,
+                                        const attribute_id build_attribute,
+                                        AggregationOperationState *state,
+                                        StorageManager *storage_manager)
+      : WorkOrder(query_id),
+        input_relation_(input_relation),
+        build_block_id_(build_block_id),
+        build_attribute_(build_attribute),
+        state_(DCHECK_NOTNULL(state)),
+        storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+
+  ~BuildAggregationExistenceMapWorkOrder() override {}
+
+  void execute() override;
+
+ private:
+  const CatalogRelationSchema &input_relation_;
+  const block_id build_block_id_;
+  const attribute_id build_attribute_;
+  AggregationOperationState *state_;
+
+  StorageManager *storage_manager_;
+
+  DISALLOW_COPY_AND_ASSIGN(BuildAggregationExistenceMapWorkOrder);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_RELATIONAL_OPERATORS_BUILD_AGGREGATION_EXISTENCE_MAP_OPERATOR_HPP_