You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2016/08/11 20:27:50 UTC

[10/16] incubator-quickstep git commit: Initial commit

Initial commit


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/b7150fb1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/b7150fb1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/b7150fb1

Branch: refs/heads/LIP-for-tpch
Commit: b7150fb13b117841919ac8dfcc7aca9043a30e82
Parents: d9135a8
Author: Jianqiao Zhu <ji...@cs.wisc.edu>
Authored: Sat Jun 11 23:14:00 2016 -0500
Committer: Jianqiao Zhu <ji...@cs.wisc.edu>
Committed: Thu Aug 11 15:23:00 2016 -0500

----------------------------------------------------------------------
 CMakeLists.txt                                  |   1 +
 catalog/CMakeLists.txt                          |   9 +
 catalog/Catalog.proto                           |   5 +
 catalog/CatalogRelationConstraints.cpp          |  55 ++
 catalog/CatalogRelationConstraints.hpp          | 110 ++++
 catalog/CatalogRelationSchema.cpp               |  15 +
 catalog/CatalogRelationSchema.hpp               |  16 +-
 cli/CommandExecutor.cpp                         |  25 +-
 cli/QuickstepCli.cpp                            |  65 +++
 compression/CompressionDictionaryLite.hpp       |  42 ++
 query_execution/CMakeLists.txt                  |   1 +
 query_execution/QueryContext.cpp                |  11 +-
 query_execution/Worker.cpp                      |   5 +
 query_optimizer/CMakeLists.txt                  |   2 +
 query_optimizer/ExecutionGenerator.cpp          |  74 +--
 query_optimizer/ExecutionGenerator.hpp          |   2 +-
 query_optimizer/ExecutionHeuristics.cpp         | 171 ++++---
 query_optimizer/ExecutionHeuristics.hpp         |  79 ++-
 query_optimizer/PhysicalGenerator.cpp           |   7 +-
 query_optimizer/cost_model/SimpleCostModel.cpp  |   4 +-
 .../cost_model/StarSchemaSimpleCostModel.cpp    |  42 +-
 query_optimizer/expressions/ExpressionUtil.hpp  |   8 +-
 query_optimizer/physical/Aggregate.cpp          |   5 +
 query_optimizer/physical/Aggregate.hpp          |  23 +-
 query_optimizer/physical/HashJoin.cpp           |  27 +
 query_optimizer/physical/HashJoin.hpp           |  23 +-
 query_optimizer/physical/Physical.hpp           |  55 ++
 query_optimizer/physical/Selection.cpp          |   6 +
 query_optimizer/physical/Selection.hpp          |   3 +
 query_optimizer/physical/TableReference.cpp     |  18 +
 query_optimizer/physical/TableReference.hpp     |   3 +
 query_optimizer/rules/AttachBloomFilters.cpp    | 308 ++++++++++++
 query_optimizer/rules/AttachBloomFilters.hpp    | 118 +++++
 query_optimizer/rules/CMakeLists.txt            |  17 +
 .../StarSchemaHashJoinOrderOptimization.cpp     | 277 ++++++----
 .../StarSchemaHashJoinOrderOptimization.hpp     | 100 ++--
 .../tests/ExecutionHeuristics_unittest.cpp      |   3 +-
 relational_operators/HashJoinOperator.cpp       |  10 +
 relational_operators/HashJoinOperator.hpp       |  25 +-
 relational_operators/WorkOrder.hpp              |  11 +-
 storage/AggregationOperationState.cpp           |  98 +++-
 storage/AggregationOperationState.hpp           |  10 +-
 storage/AggregationOperationState.proto         |   6 +
 storage/BasicColumnStoreValueAccessor.hpp       |  26 +-
 storage/BloomFilterIndexSubBlock.cpp            |   4 +-
 storage/BloomFilterIndexSubBlock.hpp            |   6 -
 storage/CMakeLists.txt                          |   2 +
 storage/CompressedColumnStoreValueAccessor.hpp  |  22 +
 .../CompressedPackedRowStoreValueAccessor.hpp   |  22 +
 storage/HashTable.hpp                           | 185 ++++---
 storage/HashTable.proto                         |  10 +-
 storage/HashTableFactory.hpp                    |  23 +-
 storage/PackedRowStoreValueAccessor.hpp         |  25 +-
 storage/SplitRowStoreValueAccessor.hpp          |  45 ++
 storage/StorageBlock.cpp                        |  28 +-
 storage/StorageBlock.hpp                        |   7 +-
 storage/ValueAccessor.hpp                       |  36 ++
 types/containers/ColumnVector.hpp               |  35 ++
 types/containers/ColumnVectorsValueAccessor.hpp |  17 +
 utility/BloomFilter.hpp                         | 502 ++++++++++++++-----
 utility/BloomFilter.proto                       |   6 +-
 utility/BloomFilterAdapter.hpp                  | 142 ++++++
 utility/CMakeLists.txt                          |  13 +
 utility/DisjointTreeForest.hpp                  | 116 +++++
 utility/EventProfiler.cpp                       |  29 ++
 utility/EventProfiler.hpp                       | 188 +++++++
 utility/PlanVisualizer.cpp                      |  42 +-
 67 files changed, 2867 insertions(+), 559 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 487aaf9..27db15e 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -770,6 +770,7 @@ target_link_libraries(quickstep_cli_shell
                       quickstep_queryoptimizer_QueryProcessor
                       quickstep_storage_PreloaderThread
                       quickstep_threading_ThreadIDBasedMap
+                      quickstep_utility_EventProfiler
                       quickstep_utility_ExecutionDAGVisualizer
                       quickstep_utility_Macros
                       quickstep_utility_PtrVector

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/catalog/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/catalog/CMakeLists.txt b/catalog/CMakeLists.txt
index dd4ef99..fc01458 100644
--- a/catalog/CMakeLists.txt
+++ b/catalog/CMakeLists.txt
@@ -35,6 +35,9 @@ add_library(quickstep_catalog_CatalogDatabaseCache CatalogDatabaseCache.cpp Cata
 add_library(quickstep_catalog_CatalogDatabaseLite ../empty_src.cpp CatalogDatabaseLite.hpp)
 add_library(quickstep_catalog_CatalogErrors ../empty_src.cpp CatalogErrors.hpp)
 add_library(quickstep_catalog_CatalogRelation CatalogRelation.cpp CatalogRelation.hpp)
+add_library(quickstep_catalog_CatalogRelationConstraints
+            CatalogRelationConstraints.cpp
+            CatalogRelationConstraints.hpp)
 add_library(quickstep_catalog_CatalogRelationSchema
             CatalogRelationSchema.cpp
             CatalogRelationSchema.hpp)
@@ -117,6 +120,10 @@ target_link_libraries(quickstep_catalog_CatalogRelation
                       quickstep_threading_SpinSharedMutex
                       quickstep_utility_Macros
                       quickstep_utility_PtrVector)
+target_link_libraries(quickstep_catalog_CatalogRelationConstraints
+                      quickstep_catalog_CatalogTypedefs
+                      quickstep_catalog_Catalog_proto
+                      quickstep_utility_Macros)
 target_link_libraries(quickstep_catalog_CatalogRelationStatistics
                       quickstep_catalog_CatalogTypedefs
                       quickstep_catalog_Catalog_proto
@@ -136,6 +143,7 @@ target_link_libraries(quickstep_catalog_CatalogRelationSchema
                       glog
                       quickstep_catalog_CatalogAttribute
                       quickstep_catalog_CatalogErrors
+                      quickstep_catalog_CatalogRelationConstraints
                       quickstep_catalog_CatalogTypedefs
                       quickstep_catalog_Catalog_proto
                       quickstep_types_Type
@@ -182,6 +190,7 @@ target_link_libraries(quickstep_catalog
                       quickstep_catalog_CatalogDatabaseLite
                       quickstep_catalog_CatalogErrors
                       quickstep_catalog_CatalogRelation
+                      quickstep_catalog_CatalogRelationConstraints
                       quickstep_catalog_CatalogRelationSchema
                       quickstep_catalog_CatalogRelationStatistics
                       quickstep_catalog_CatalogTypedefs

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/catalog/Catalog.proto
----------------------------------------------------------------------
diff --git a/catalog/Catalog.proto b/catalog/Catalog.proto
index 90ce37e..a4cc3ce 100644
--- a/catalog/Catalog.proto
+++ b/catalog/Catalog.proto
@@ -80,6 +80,10 @@ message IndexScheme {
   repeated IndexEntry index_entries = 1;
 }
 
+message CatalogRelationConstraints {
+  repeated int32 primary_key = 1;  // Attribute ids that make up the primary key.
+}
+
 message CatalogRelationStatistics {
   optional fixed64 num_tuples = 1;
 
@@ -96,6 +100,7 @@ message CatalogRelationSchema {
   required bool temporary = 3;
 
   repeated CatalogAttribute attributes = 4;
+  optional CatalogRelationConstraints constraints = 5;
 
   extensions 16 to max;
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/catalog/CatalogRelationConstraints.cpp
----------------------------------------------------------------------
diff --git a/catalog/CatalogRelationConstraints.cpp b/catalog/CatalogRelationConstraints.cpp
new file mode 100644
index 0000000..4525a98
--- /dev/null
+++ b/catalog/CatalogRelationConstraints.cpp
@@ -0,0 +1,55 @@
+/**
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#include "catalog/CatalogRelationConstraints.hpp"
+
+#include "catalog/Catalog.pb.h"
+
+namespace quickstep {
+
+CatalogRelationConstraints::CatalogRelationConstraints(
+    const serialization::CatalogRelationConstraints &proto) {
+  // An empty repeated field encodes "no primary key": primary_key_ then stays
+  // null. Building the set straight from the field's range also avoids the
+  // signed/unsigned index comparison against primary_key_size().
+  if (proto.primary_key_size() > 0) {
+    primary_key_.reset(new std::set<attribute_id>(proto.primary_key().begin(),
+                                                  proto.primary_key().end()));
+  }
+}
+
+serialization::CatalogRelationConstraints CatalogRelationConstraints::getProto() const {
+  serialization::CatalogRelationConstraints serialized;  // Stays empty when no key is set.
+  if (hasPrimaryKey()) {
+    for (const attribute_id key_attr : *primary_key_) {
+      serialized.add_primary_key(key_attr);
+    }
+  }
+  return serialized;
+}
+
+bool CatalogRelationConstraints::ProtoIsValid(
+    const serialization::CatalogRelationConstraints &proto,
+    const std::size_t num_attributes) {
+  for (const auto attr_id : proto.primary_key()) {  // Valid ids lie in [0, num_attributes).
+    if (attr_id < 0 || static_cast<std::size_t>(attr_id) >= num_attributes) {
+      return false;  // Negative ids are rejected explicitly, not via unsigned wrap-around.
+    }
+  }
+  return true;
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/catalog/CatalogRelationConstraints.hpp
----------------------------------------------------------------------
diff --git a/catalog/CatalogRelationConstraints.hpp b/catalog/CatalogRelationConstraints.hpp
new file mode 100644
index 0000000..896c072
--- /dev/null
+++ b/catalog/CatalogRelationConstraints.hpp
@@ -0,0 +1,110 @@
+/**
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_CATALOG_CATALOG_RELATION_CONSTRAINTS_HPP_
+#define QUICKSTEP_CATALOG_CATALOG_RELATION_CONSTRAINTS_HPP_
+
+#include <algorithm>
+#include <cstddef>
+#include <memory>
+#include <set>
+#include <utility>
+
+#include "catalog/Catalog.pb.h"
+#include "catalog/CatalogTypedefs.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+/** \addtogroup Catalog
+ *  @{
+ */
+
+/**
+ * @brief Constraints on a catalog relation.
+ **/
+class CatalogRelationConstraints {
+ public:
+  /** @brief Constructor. The new object has no primary key. **/
+  CatalogRelationConstraints() {}
+
+  /**
+   * @brief Reconstruct a CatalogRelationConstraints object from its serialized
+   *        Protocol Buffer form.
+   *
+   * @param proto The Protocol Buffer serialization of a CatalogRelationConstraints
+   *        object, previously produced by getProto().
+   **/
+  explicit CatalogRelationConstraints(const serialization::CatalogRelationConstraints &proto);
+
+  /**
+   * @brief Serialize the CatalogRelationConstraints object as Protocol Buffer.
+   *
+   * @return The Protocol Buffer representation of the CatalogRelationConstraints
+   *         object.
+   **/
+  serialization::CatalogRelationConstraints getProto() const;
+
+  /** @brief Check that every primary key id in proto is below num_attributes. **/
+  static bool ProtoIsValid(const serialization::CatalogRelationConstraints &proto,
+                           const std::size_t num_attributes);
+
+  /** @brief Whether a primary key is currently set. **/
+  bool hasPrimaryKey() const { return primary_key_ != nullptr; }
+
+  /** @brief The primary key attribute ids, or nullptr if no key is set. **/
+  const std::set<attribute_id>* getPrimaryKey() const { return primary_key_.get(); }
+
+  /**
+   * @brief Set the primary key to the attribute ids held by a non-empty container.
+   *
+   * @param primary_key Container of attribute ids providing empty()/begin()/end().
+   **/
+  template <typename IterableT>
+  void setPrimaryKey(IterableT &&primary_key) {
+    CHECK(!primary_key.empty());
+    primary_key_.reset(
+        new std::set<attribute_id>(primary_key.begin(), primary_key.end()));
+  }
+
+  /** @brief Drop the primary key, if any. **/
+  void removePrimaryKey() { primary_key_.reset(); }
+
+  /**
+   * @brief Whether every primary key attribute is contained in the given set;
+   *        always false when no primary key is set.
+   **/
+  bool impliesUniqueAttributes(const std::set<attribute_id> &attributes) const {
+    // Both std::sets iterate in sorted order, which std::includes requires.
+    return primary_key_ != nullptr &&
+           std::includes(attributes.begin(), attributes.end(),
+                         primary_key_->begin(), primary_key_->end());
+  }
+
+ private:
+  std::unique_ptr<std::set<attribute_id>> primary_key_;
+
+  DISALLOW_COPY_AND_ASSIGN(CatalogRelationConstraints);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_CATALOG_CATALOG_RELATION_CONSTRAINTS_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/catalog/CatalogRelationSchema.cpp
----------------------------------------------------------------------
diff --git a/catalog/CatalogRelationSchema.cpp b/catalog/CatalogRelationSchema.cpp
index 5568cef..d474ef6 100644
--- a/catalog/CatalogRelationSchema.cpp
+++ b/catalog/CatalogRelationSchema.cpp
@@ -27,6 +27,7 @@
 #include "catalog/Catalog.pb.h"
 #include "catalog/CatalogAttribute.hpp"
 #include "catalog/CatalogErrors.hpp"
+#include "catalog/CatalogRelationConstraints.hpp"
 #include "catalog/CatalogTypedefs.hpp"
 #include "types/Type.hpp"
 #include "utility/PtrVector.hpp"
@@ -70,6 +71,12 @@ CatalogRelationSchema::CatalogRelationSchema(const serialization::CatalogRelatio
       attr_vec_.push_back(nullptr);
     }
   }
+
+  if (proto.has_constraints()) {
+    constraints_.reset(new CatalogRelationConstraints(proto.constraints()));
+  } else {
+    constraints_.reset(new CatalogRelationConstraints());
+  }
 }
 
 bool CatalogRelationSchema::ProtoIsValid(const serialization::CatalogRelationSchema &proto) {
@@ -84,6 +91,12 @@ bool CatalogRelationSchema::ProtoIsValid(const serialization::CatalogRelationSch
     }
   }
 
+  if (proto.has_constraints()
+      && !CatalogRelationConstraints::ProtoIsValid(proto.constraints(),
+                                                   proto.attributes_size())) {
+    return false;
+  }
+
   return true;
 }
 
@@ -104,6 +117,8 @@ serialization::CatalogRelationSchema CatalogRelationSchema::getProto() const {
     }
   }
 
+  proto.mutable_constraints()->CopyFrom(constraints_->getProto());
+
   return proto;
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/catalog/CatalogRelationSchema.hpp
----------------------------------------------------------------------
diff --git a/catalog/CatalogRelationSchema.hpp b/catalog/CatalogRelationSchema.hpp
index d8b6128..0c7e97b 100644
--- a/catalog/CatalogRelationSchema.hpp
+++ b/catalog/CatalogRelationSchema.hpp
@@ -21,12 +21,14 @@
 #define QUICKSTEP_CATALOG_CATALOG_RELATION_SCHEMA_HPP_
 
 #include <cstddef>
+#include <memory>
 #include <string>
 #include <unordered_map>
 #include <vector>
 
 #include "catalog/Catalog.pb.h"
 #include "catalog/CatalogAttribute.hpp"
+#include "catalog/CatalogRelationConstraints.hpp"
 #include "catalog/CatalogTypedefs.hpp"
 #include "utility/Macros.hpp"
 #include "utility/PtrVector.hpp"
@@ -427,6 +429,14 @@ class CatalogRelationSchema {
     return max_byte_lengths_;
   }
 
+  /** @brief Get a read-only reference to this relation's constraints
+   *         (e.g. its primary key). **/
+  const CatalogRelationConstraints& getConstraints() const { return *constraints_; }
+
+  /** @brief Get a mutable pointer to this relation's constraints; the object
+   *         remains owned by this schema. **/
+  CatalogRelationConstraints* getConstraintsMutable() { return constraints_.get(); }
+
  protected:
   /**
    * @brief Create a new relation.
@@ -456,7 +466,8 @@ class CatalogRelationSchema {
         min_variable_byte_length_excluding_nullable_(0),
         estimated_variable_byte_length_(0),
         current_nullable_attribute_index_(-1),
-        current_variable_length_attribute_index_(-1) {
+        current_variable_length_attribute_index_(-1),
+        constraints_(new CatalogRelationConstraints()) {
   }
 
   /**
@@ -532,6 +543,9 @@ class CatalogRelationSchema {
   std::vector<int> variable_length_attribute_indices_;
   int current_variable_length_attribute_index_;
 
+  // Primary key, foreign keys, etc.
+  std::unique_ptr<CatalogRelationConstraints> constraints_;
+
  private:
   friend class CatalogDatabase;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/cli/CommandExecutor.cpp
----------------------------------------------------------------------
diff --git a/cli/CommandExecutor.cpp b/cli/CommandExecutor.cpp
index 78fbe6f..f27f1ef 100644
--- a/cli/CommandExecutor.cpp
+++ b/cli/CommandExecutor.cpp
@@ -258,7 +258,8 @@ inline TypedValue executeQueryForSingleResult(
   return value;
 }
 
-void executeAnalyze(const tmb::client_id main_thread_client_id,
+void executeAnalyze(const PtrVector<ParseString> *arguments,
+                    const tmb::client_id main_thread_client_id,
                     const tmb::client_id foreman_client_id,
                     MessageBus *bus,
                     QueryProcessor *query_processor,
@@ -267,8 +268,19 @@ void executeAnalyze(const tmb::client_id main_thread_client_id,
   StorageManager *storage_manager = query_processor->getStorageManager();
 
   std::unique_ptr<SqlParserWrapper> parser_wrapper(new SqlParserWrapper());
-  std::vector<std::reference_wrapper<const CatalogRelation>> relations(
-      database.begin(), database.end());
+  std::vector<std::reference_wrapper<const CatalogRelation>> relations;
+  if (arguments->size() == 0) {
+    relations.insert(relations.begin(), database.begin(), database.end());
+  } else {
+    for (const auto &rel_name : *arguments) {
+      const CatalogRelation *rel = database.getRelationByName(rel_name.value());
+      if (rel == nullptr) {
+        THROW_SQL_ERROR_AT(&rel_name) << "Table does not exist";
+      } else {
+        relations.emplace_back(*rel);
+      }
+    }
+  }
 
   // Analyze each relation in the database.
   for (const CatalogRelation &relation : relations) {
@@ -348,8 +360,11 @@ void executeCommand(const ParseStatement &statement,
       executeDescribeTable(arguments, catalog_database, out);
     }
   } else if (command_str == C::kAnalyzeCommand) {
-    executeAnalyze(
-        main_thread_client_id, foreman_client_id, bus, query_processor, out);
+    executeAnalyze(arguments,
+                   main_thread_client_id,
+                   foreman_client_id,
+                   bus,
+                   query_processor, out);
   } else {
     THROW_SQL_ERROR_AT(command.command()) << "Invalid Command";
   }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/cli/QuickstepCli.cpp
----------------------------------------------------------------------
diff --git a/cli/QuickstepCli.cpp b/cli/QuickstepCli.cpp
index 8269197..a3dfae4 100644
--- a/cli/QuickstepCli.cpp
+++ b/cli/QuickstepCli.cpp
@@ -52,6 +52,9 @@ typedef quickstep::LineReaderDumb LineReaderImpl;
 #include <gperftools/profiler.h>
 #endif
 
+#include "catalog/CatalogDatabase.hpp"
+#include "catalog/CatalogRelation.hpp"
+#include "catalog/CatalogRelationConstraints.hpp"
 #include "cli/DefaultsConfigurator.hpp"
 #include "cli/InputParserUtil.hpp"
 #include "cli/PrintToScreen.hpp"
@@ -75,6 +78,7 @@ typedef quickstep::LineReaderDumb LineReaderImpl;
 
 #include "storage/PreloaderThread.hpp"
 #include "threading/ThreadIDBasedMap.hpp"
+#include "utility/EventProfiler.hpp"
 #include "utility/ExecutionDAGVisualizer.hpp"
 #include "utility/Macros.hpp"
 #include "utility/PtrVector.hpp"
@@ -90,6 +94,8 @@ typedef quickstep::LineReaderDumb LineReaderImpl;
 #include "tmb/message_bus.h"
 #include "tmb/message_style.h"
 
+#include "google/protobuf/text_format.h"
+
 namespace quickstep {
 class CatalogRelation;
 }
@@ -190,9 +196,48 @@ DEFINE_bool(visualize_execution_dag, false,
             "If true, visualize the execution plan DAG into a graph in DOT "
             "format (DOT is a plain text graph description language) which is "
             "then printed via stderr.");
+DEFINE_string(profile_output, "",
+              "Output file name for writing the profiled events.");
 
 }  // namespace quickstep
 
+// Marks the hard-coded TPC-H primary keys on the matching relations of database.
+// Aborts via CHECK_NOTNULL if a table or attribute lookup returns null.
+void addPrimaryKeyInfoForTPCHTables(quickstep::CatalogDatabase *database) {
+  const std::vector<std::pair<std::string, std::vector<std::string>>> rel_pkeys = {
+      { "region", { "r_regionkey" } }, { "nation", { "n_nationkey" } },
+      { "supplier", { "s_suppkey" } }, { "customer", { "c_custkey" } },
+      { "part", { "p_partkey" } }, { "partsupp", { "ps_partkey", "ps_suppkey" } },
+      { "orders", { "o_orderkey" } }
+  };
+  for (const auto &rel_pair : rel_pkeys) {
+    // Assumes the lookups return null for unknown names -- confirm against the catalog API.
+    quickstep::CatalogRelation *rel = CHECK_NOTNULL(database->getRelationByNameMutable(rel_pair.first));
+    std::vector<quickstep::attribute_id> attrs;
+    for (const auto &pkey : rel_pair.second) {
+      attrs.emplace_back(CHECK_NOTNULL(rel->getAttributeByName(pkey))->getID());
+    }
+    rel->getConstraintsMutable()->setPrimaryKey(attrs);
+  }
+}
+
+// Marks the hard-coded SSB primary keys on the matching relations of database;
+// aborts via CHECK_NOTNULL if a table or attribute lookup returns null.
+void addPrimaryKeyInfoForSSBTables(quickstep::CatalogDatabase *database) {
+  const std::vector<std::pair<std::string, std::vector<std::string>>> rel_pkeys = {
+      { "supplier", { "s_suppkey" } }, { "customer", { "c_custkey" } },
+      { "part", { "p_partkey" } }, { "ddate", { "d_datekey" } }
+  };
+  for (const auto &rel_pair : rel_pkeys) {
+    quickstep::CatalogRelation *rel = CHECK_NOTNULL(database->getRelationByNameMutable(rel_pair.first));
+    std::vector<quickstep::attribute_id> attrs;
+    for (const auto &pkey : rel_pair.second) {
+      attrs.emplace_back(CHECK_NOTNULL(rel->getAttributeByName(pkey))->getID());
+    }
+    rel->getConstraintsMutable()->setPrimaryKey(attrs);
+  }
+}
+
 int main(int argc, char* argv[]) {
   google::InitGoogleLogging(argv[0]);
   gflags::ParseCommandLineFlags(&argc, &argv, true);
@@ -300,6 +345,15 @@ int main(int argc, char* argv[]) {
     LOG(FATAL) << "NON-STANDARD EXCEPTION DURING STARTUP";
   }
 
+//  addPrimaryKeyInfoForTPCHTables(query_processor->getDefaultDatabase());
+//  addPrimaryKeyInfoForSSBTables(query_processor->getDefaultDatabase());
+//  std::string proto_str;
+//  google::protobuf::TextFormat::PrintToString(
+//      query_processor->getDefaultDatabase()->getProto(), &proto_str);
+//  std::cerr << proto_str << "\n";
+//  query_processor->markCatalogAltered();
+//  query_processor->saveCatalog();
+
   // Parse the CPU affinities for workers and the preloader thread, if enabled
   // to warm up the buffer pool.
   const vector<int> worker_cpu_affinities =
@@ -449,6 +503,7 @@ int main(int argc, char* argv[]) {
               new quickstep::ExecutionDAGVisualizer(*query_handle->getQueryPlanMutable()));
         }
 
+        quickstep::simple_profiler.clear();
         start = std::chrono::steady_clock::now();
         QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
             main_thread_client_id,
@@ -461,6 +516,11 @@ int main(int argc, char* argv[]) {
               main_thread_client_id, &bus);
           end = std::chrono::steady_clock::now();
 
+          if (quickstep::FLAGS_visualize_dag) {
+            quickstep::DAGVisualizer visualizer(*query_handle->getQueryPlanMutable());
+            std::cerr << "\n" << visualizer.toDOT() << "\n";
+          }
+
           const CatalogRelation *query_result_relation = query_handle->getQueryResultRelation();
           if (query_result_relation) {
             PrintToScreen::PrintRelation(*query_result_relation,
@@ -492,6 +552,11 @@ int main(int argc, char* argv[]) {
             dag_visualizer->bindProfilingStats(profiling_stats);
             std::cerr << "\n" << dag_visualizer->toDOT() << "\n";
           }
+          if (!quickstep::FLAGS_profile_output.empty()) {
+            std::ofstream ofs(quickstep::FLAGS_profile_output, std::ios::out);
+            quickstep::simple_profiler.writeToStream(ofs);
+            ofs.close();
+          }
         } catch (const std::exception &e) {
           fprintf(stderr, "QUERY EXECUTION ERROR: %s\n", e.what());
           break;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/compression/CompressionDictionaryLite.hpp
----------------------------------------------------------------------
diff --git a/compression/CompressionDictionaryLite.hpp b/compression/CompressionDictionaryLite.hpp
index c4c338e..7eb0e34 100644
--- a/compression/CompressionDictionaryLite.hpp
+++ b/compression/CompressionDictionaryLite.hpp
@@ -176,6 +176,15 @@ class CompressionDictionaryLite {
     }
   }
 
+  // Looks up the dictionary entry for code and returns {pointer, byte length};
+  // with check_null set, the null code is reported as {nullptr, 0}.
+  template <bool check_null = true>
+  inline std::pair<const void*, std::size_t> getUntypedValueAndByteLengthForCode(const std::uint32_t code) const {
+    return type_is_variable_length_
+        ? variableLengthGetUntypedValueAndByteLengthHelper<std::uint32_t, check_null>(code)
+        : fixedLengthGetUntypedValueAndByteLengthHelper<std::uint32_t, check_null>(code);
+  }
+
   /**
    * @brief Get the value represented by the specified code as a TypedValue.
    * @note This version is for codes of 8 bits or less. Also see
@@ -257,6 +266,39 @@ class CompressionDictionaryLite {
     return retval;
   }
 
+  // Fixed-length lookup: {pointer to the code's value, type_fixed_byte_length_},
+  // or {nullptr, 0} for the null code when check_null is set.
+  template <typename CodeType, bool check_null = true>
+  inline std::pair<const void*, std::size_t> fixedLengthGetUntypedValueAndByteLengthHelper(
+      const CodeType code) const {
+    if (check_null && (code == getNullCode())) {
+      return std::make_pair(nullptr, 0);
+    }
+    DCHECK_LT(code, numberOfCodes());
+    const char *values = static_cast<const char*>(dictionary_memory_) + 2 * sizeof(std::uint32_t);  // Skip header.
+    return std::make_pair(values + code * type_fixed_byte_length_, type_fixed_byte_length_);
+  }
+
+  // Variable-length lookup: returns {pointer, byte length} for code, or
+  // {nullptr, 0} for the null code when check_null is set.
+  template <typename CodeType, bool check_null = true>
+  inline std::pair<const void*, std::size_t> variableLengthGetUntypedValueAndByteLengthHelper(
+      const CodeType code) const {
+    if (check_null && (code == getNullCode())) {
+      return std::make_pair(nullptr, 0);
+    }
+    DCHECK_LT(code, numberOfCodes());
+    const std::uint32_t *words = static_cast<const std::uint32_t*>(dictionary_memory_);
+    const char *memory_end = static_cast<const char*>(dictionary_memory_) + dictionary_memory_size_;
+    const std::uint32_t value_offset = words[code + 2];
+    const void *data_ptr = variable_length_data_region_ + value_offset;
+    DCHECK_LT(data_ptr, memory_end);
+    // The last value runs to the end of the memory; others end at the next offset.
+    const std::size_t data_size = (code == words[0] - 1)
+        ? (memory_end - static_cast<const char*>(data_ptr)) : (words[code + 3] - value_offset);
+    return std::make_pair(data_ptr, data_size);
+  }
+
   template <typename CodeType>
   inline TypedValue fixedLengthGetTypedValueHelper(const CodeType code) const {
     if (code == getNullCode()) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 74fcafb..ff0fe08 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -282,6 +282,7 @@ target_link_libraries(quickstep_queryexecution_Worker
                       quickstep_threading_Thread
                       quickstep_threading_ThreadIDBasedMap
                       quickstep_threading_ThreadUtil
+                      quickstep_utility_EventProfiler
                       quickstep_utility_Macros
                       tmb)
 target_link_libraries(quickstep_queryexecution_WorkerDirectory

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/query_execution/QueryContext.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryContext.cpp b/query_execution/QueryContext.cpp
index 2572e18..65405a6 100644
--- a/query_execution/QueryContext.cpp
+++ b/query_execution/QueryContext.cpp
@@ -61,15 +61,16 @@ QueryContext::QueryContext(const serialization::QueryContext &proto,
       << "Attempted to create QueryContext from an invalid proto description:\n"
       << proto.DebugString();
 
+  for (int i = 0; i < proto.bloom_filters_size(); ++i) {
+    bloom_filters_.emplace_back(new BloomFilter(proto.bloom_filters(i)));
+  }
+
   for (int i = 0; i < proto.aggregation_states_size(); ++i) {
     aggregation_states_.emplace_back(
         AggregationOperationState::ReconstructFromProto(proto.aggregation_states(i),
                                                         database,
-                                                        storage_manager));
-  }
-
-  for (int i = 0; i < proto.bloom_filters_size(); ++i) {
-    bloom_filters_.emplace_back(new BloomFilter(proto.bloom_filters(i)));
+                                                        storage_manager,
+                                                        bloom_filters_));
   }
 
   for (int i = 0; i < proto.generator_functions_size(); ++i) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/query_execution/Worker.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Worker.cpp b/query_execution/Worker.cpp
index 0b1efba..a1d3685 100644
--- a/query_execution/Worker.cpp
+++ b/query_execution/Worker.cpp
@@ -32,6 +32,7 @@
 #include "relational_operators/WorkOrder.hpp"
 #include "threading/ThreadIDBasedMap.hpp"
 #include "threading/ThreadUtil.hpp"
+#include "utility/EventProfiler.hpp"
 
 #include "glog/logging.h"
 
@@ -119,8 +120,12 @@ void Worker::executeWorkOrderHelper(const TaggedMessage &tagged_message,
   const size_t query_id_for_workorder = worker_message.getWorkOrder()->getQueryID();
 
   // Start measuring the execution time.
+  auto *container = relop_profiler.getContainer();
+  auto *line = container->getEventLine(worker_message.getRelationalOpIndex());
   start = std::chrono::steady_clock::now();
+  line->emplace_back();
   worker_message.getWorkOrder()->execute();
+  line->back().endEvent();
   end = std::chrono::steady_clock::now();
   delete worker_message.getWorkOrder();
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/query_optimizer/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt
index c24ee89..aaad17c 100644
--- a/query_optimizer/CMakeLists.txt
+++ b/query_optimizer/CMakeLists.txt
@@ -79,6 +79,7 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
                       quickstep_queryoptimizer_QueryPlan
                       quickstep_queryoptimizer_costmodel_CostModel
                       quickstep_queryoptimizer_costmodel_SimpleCostModel
+                      quickstep_queryoptimizer_costmodel_StarSchemaSimpleCostModel
                       quickstep_queryoptimizer_expressions_AggregateFunction
                       quickstep_queryoptimizer_expressions_Alias
                       quickstep_queryoptimizer_expressions_AttributeReference
@@ -196,6 +197,7 @@ target_link_libraries(quickstep_queryoptimizer_PhysicalGenerator
                       quickstep_queryoptimizer_LogicalToPhysicalMapper
                       quickstep_queryoptimizer_logical_Logical
                       quickstep_queryoptimizer_physical_Physical
+                      quickstep_queryoptimizer_rules_AttachBloomFilters
                       quickstep_queryoptimizer_rules_PruneColumns
                       quickstep_queryoptimizer_rules_StarSchemaHashJoinOrderOptimization
                       quickstep_queryoptimizer_rules_SwapProbeBuild

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index a4b538e..6dbba51 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -59,6 +59,7 @@
 #include "query_optimizer/QueryHandle.hpp"
 #include "query_optimizer/QueryPlan.hpp"
 #include "query_optimizer/cost_model/SimpleCostModel.hpp"
+#include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp"
 #include "query_optimizer/expressions/AggregateFunction.hpp"
 #include "query_optimizer/expressions/Alias.hpp"
 #include "query_optimizer/expressions/AttributeReference.hpp"
@@ -167,6 +168,8 @@ void ExecutionGenerator::generatePlan(const P::PhysicalPtr &physical_plan) {
 
   cost_model_.reset(
       new cost::SimpleCostModel(top_level_physical_plan_->shared_subplans()));
+  star_schema_cost_model_.reset(
+      new cost::StarSchemaSimpleCostModel(top_level_physical_plan_->shared_subplans()));
 
   const CatalogRelation *result_relation = nullptr;
 
@@ -600,8 +603,10 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
   std::vector<attribute_id> probe_attribute_ids;
   std::vector<attribute_id> build_attribute_ids;
 
-  std::vector<attribute_id> probe_original_attribute_ids;
-  std::vector<attribute_id> build_original_attribute_ids;
+  const P::BloomFilterConfig &bloom_filter_config =
+      physical_plan->bloom_filter_config();
+  std::vector<attribute_id> probe_side_bloom_filter_attribute_ids;
+  std::vector<attribute_id> build_side_bloom_filter_attribute_ids;
 
   const CatalogRelation *referenced_stored_probe_relation = nullptr;
   const CatalogRelation *referenced_stored_build_relation = nullptr;
@@ -616,18 +621,6 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
   const std::vector<E::AttributeReferencePtr> &left_join_attributes =
       physical_plan->left_join_attributes();
   for (const E::AttributeReferencePtr &left_join_attribute : left_join_attributes) {
-    // Try to determine the original stored relation referenced in the Hash Join.
-    referenced_stored_probe_relation =
-        optimizer_context_->catalog_database()->getRelationByName(left_join_attribute->relation_name());
-    if (referenced_stored_probe_relation == nullptr) {
-      // Hash Join optimizations are not possible, if the referenced relation cannot be determined.
-      skip_hash_join_optimization = true;
-    } else {
-      const attribute_id probe_operator_attribute_id =
-          referenced_stored_probe_relation->getAttributeByName(left_join_attribute->attribute_name())->getID();
-      probe_original_attribute_ids.emplace_back(probe_operator_attribute_id);
-    }
-
     const CatalogAttribute *probe_catalog_attribute
         = attribute_substitution_map_[left_join_attribute->id()];
     probe_attribute_ids.emplace_back(probe_catalog_attribute->getID());
@@ -640,18 +633,6 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
   const std::vector<E::AttributeReferencePtr> &right_join_attributes =
       physical_plan->right_join_attributes();
   for (const E::AttributeReferencePtr &right_join_attribute : right_join_attributes) {
-    // Try to determine the original stored relation referenced in the Hash Join.
-    referenced_stored_build_relation =
-        optimizer_context_->catalog_database()->getRelationByName(right_join_attribute->relation_name());
-    if (referenced_stored_build_relation == nullptr) {
-      // Hash Join optimizations are not possible, if the referenced relation cannot be determined.
-      skip_hash_join_optimization = true;
-    } else {
-      const attribute_id build_operator_attribute_id =
-          referenced_stored_build_relation->getAttributeByName(right_join_attribute->attribute_name())->getID();
-      build_original_attribute_ids.emplace_back(build_operator_attribute_id);
-    }
-
     const CatalogAttribute *build_catalog_attribute
         = attribute_substitution_map_[right_join_attribute->id()];
     build_attribute_ids.emplace_back(build_catalog_attribute->getID());
@@ -661,6 +642,20 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
     }
   }
 
+  for (const auto &bf : bloom_filter_config.probe_side_bloom_filters) {
+    const CatalogAttribute *probe_bf_catalog_attribute
+        = attribute_substitution_map_[bf.attribute->id()];
+    probe_side_bloom_filter_attribute_ids.emplace_back(
+        probe_bf_catalog_attribute->getID());
+  }
+
+  for (const auto &bf : bloom_filter_config.build_side_bloom_filters) {
+    const CatalogAttribute *build_bf_catalog_attribute
+        = attribute_substitution_map_[bf.attribute->id()];
+    build_side_bloom_filter_attribute_ids.emplace_back(
+        build_bf_catalog_attribute->getID());
+  }
+
   // Remember key types for call to SimplifyHashTableImplTypeProto() below.
   std::vector<const Type*> key_types;
   for (std::vector<E::AttributeReferencePtr>::size_type attr_idx = 0;
@@ -675,6 +670,8 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
     key_types.push_back(&left_attribute_type);
   }
 
+  std::size_t build_cardinality = cost_model_->estimateCardinality(build_physical);
+
   // Convert the residual predicate proto.
   QueryContext::predicate_id residual_predicate_index = QueryContext::kInvalidPredicateId;
   if (physical_plan->residual_predicate()) {
@@ -835,9 +832,11 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
                                            join_operator_index,
                                            referenced_stored_build_relation,
                                            referenced_stored_probe_relation,
-                                           std::move(build_original_attribute_ids),
-                                           std::move(probe_original_attribute_ids),
-                                           join_hash_table_index);
+                                           bloom_filter_config,
+                                           std::move(build_side_bloom_filter_attribute_ids),
+                                           std::move(probe_side_bloom_filter_attribute_ids),
+                                           join_hash_table_index,
+                                           star_schema_cost_model_->estimateCardinality(build_physical));
   }
 }
 
@@ -1351,6 +1350,16 @@ void ExecutionGenerator::convertAggregate(
       findRelationInfoOutputByPhysical(physical_plan->input());
   aggr_state_proto->set_relation_id(input_relation_info->relation->getID());
 
+  const P::BloomFilterConfig &bloom_filter_config =
+      physical_plan->bloom_filter_config();
+  std::vector<attribute_id> bloom_filter_attribute_ids;
+
+  for (const auto &bf : bloom_filter_config.probe_side_bloom_filters) {
+    const CatalogAttribute *bf_catalog_attribute
+        = attribute_substitution_map_[bf.attribute->id()];
+    bloom_filter_attribute_ids.emplace_back(bf_catalog_attribute->getID());
+  }
+
   std::vector<const Type*> group_by_types;
   for (const E::NamedExpressionPtr &grouping_expression : physical_plan->grouping_expressions()) {
     unique_ptr<const Scalar> execution_group_by_expression;
@@ -1465,6 +1474,13 @@ void ExecutionGenerator::convertAggregate(
       std::forward_as_tuple(finalize_aggregation_operator_index, output_relation));
   temporary_relation_info_vec_.emplace_back(finalize_aggregation_operator_index,
                                             output_relation);
+
+  if (FLAGS_optimize_joins) {
+    execution_heuristics_->addAggregateInfo(aggregation_operator_index,
+                                            bloom_filter_config,
+                                            std::move(bloom_filter_attribute_ids),
+                                            aggr_state_index);
+  }
 }
 
 void ExecutionGenerator::convertSort(const P::SortPtr &physical_sort) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/query_optimizer/ExecutionGenerator.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.hpp b/query_optimizer/ExecutionGenerator.hpp
index d5c7b06..8358233 100644
--- a/query_optimizer/ExecutionGenerator.hpp
+++ b/query_optimizer/ExecutionGenerator.hpp
@@ -37,7 +37,6 @@
 #include "query_optimizer/QueryHandle.hpp"
 #include "query_optimizer/QueryPlan.hpp"
 #include "query_optimizer/cost_model/CostModel.hpp"
-#include "query_optimizer/cost_model/SimpleCostModel.hpp"
 #include "query_optimizer/expressions/ExprId.hpp"
 #include "query_optimizer/expressions/NamedExpression.hpp"
 #include "query_optimizer/expressions/Predicate.hpp"
@@ -423,6 +422,7 @@ class ExecutionGenerator {
    * @brief The cost model to use for creating the execution plan.
    */
   std::unique_ptr<cost::CostModel> cost_model_;
+  std::unique_ptr<cost::CostModel> star_schema_cost_model_;
 
   physical::TopLevelPlanPtr top_level_physical_plan_;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/query_optimizer/ExecutionHeuristics.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionHeuristics.cpp b/query_optimizer/ExecutionHeuristics.cpp
index 4fd7320..0bef716 100644
--- a/query_optimizer/ExecutionHeuristics.cpp
+++ b/query_optimizer/ExecutionHeuristics.cpp
@@ -27,6 +27,8 @@
 #include "catalog/CatalogTypedefs.hpp"
 #include "query_execution/QueryContext.pb.h"
 #include "query_optimizer/QueryPlan.hpp"
+#include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/physical/HashJoin.hpp"
 #include "utility/Macros.hpp"
 
 #include "glog/logging.h"
@@ -34,95 +36,106 @@
 namespace quickstep {
 namespace optimizer {
 
+namespace E = ::quickstep::optimizer::expressions;
+namespace P = ::quickstep::optimizer::physical;
+
+static const std::size_t kNumBitsPerByte = 8;
+DEFINE_double(bloom_num_bits_per_tuple, kNumBitsPerByte,
+    "Number of bits per tuple used to size the Bloom filter.");
+
+DEFINE_int32(bloom_num_hash_fns, 1,
+    "Number of hash functions used in the Bloom filter.");
+
 void ExecutionHeuristics::optimizeExecutionPlan(QueryPlan *query_plan,
                                                 serialization::QueryContext *query_context_proto) {
-  // Currently this only optimizes left deep joins using bloom filters.
-  // It uses a simple algorithm to discover the left deep joins.
-  // It starts with the first hash join in the plan and keeps on iterating
-  // over the next hash joins, till a probe on a different relation id is found.
-  // The set of hash joins found in this way forms a chain and can be recognized
-  // as a left deep join. It becomes a candidate for optimization.
-
-  // The optimization is done by modifying each of the build operators in the chain
-  // to generate a bloom filter on the build key during their hash table creation.
-  // The leaf-level probe operator is then modified to query all the bloom
-  // filters generated from all the build operators in the chain. These
-  // bloom filters are queried to test the membership of the probe key
-  // just prior to probing the hash table.
-
-  QueryPlan::DAGNodeIndex origin_node = 0;
-  while (origin_node < hash_joins_.size() - 1) {
-    std::vector<std::size_t> chained_nodes;
-    chained_nodes.push_back(origin_node);
-    for (std::size_t i = origin_node + 1; i < hash_joins_.size(); ++i) {
-      const relation_id checked_relation_id = hash_joins_[origin_node].referenced_stored_probe_relation_->getID();
-      const relation_id expected_relation_id = hash_joins_[i].referenced_stored_probe_relation_->getID();
-      if (checked_relation_id == expected_relation_id) {
-        chained_nodes.push_back(i);
-      } else {
-        break;
-      }
+  std::map<std::pair<E::ExprId, P::PhysicalPtr>,
+           std::pair<QueryContext::bloom_filter_id, QueryPlan::DAGNodeIndex>> bloom_filter_map;
+  for (const auto &info : hash_joins_) {
+    auto *hash_table_proto =
+        query_context_proto->mutable_join_hash_tables(info.join_hash_table_id_);
+    const auto &bloom_filter_config = info.bloom_filter_config_;
+
+    for (std::size_t i = 0; i < info.build_side_bloom_filter_ids_.size(); ++i) {
+      const QueryContext::bloom_filter_id bloom_filter_id = query_context_proto->bloom_filters_size();
+      serialization::BloomFilter *bloom_filter_proto = query_context_proto->add_bloom_filters();
+      setBloomFilterProperties(bloom_filter_proto, info.estimated_build_relation_cardinality_);
+
+      const auto &build_side_bf =
+          bloom_filter_config.build_side_bloom_filters[i];
+      bloom_filter_map.emplace(
+          std::make_pair(build_side_bf.attribute->id(),
+                         bloom_filter_config.builder),
+          std::make_pair(bloom_filter_id, info.build_operator_index_));
+
+      auto *build_side_bloom_filter = hash_table_proto->add_build_side_bloom_filters();
+      build_side_bloom_filter->set_bloom_filter_id(bloom_filter_id);
+      build_side_bloom_filter->set_attr_id(info.build_side_bloom_filter_ids_[i]);
+      std::cerr << "Build " << build_side_bf.attribute->toString()
+                << " @" << bloom_filter_config.builder << "\n";
     }
+  }
 
-    // Only chains of length greater than one are suitable candidates for semi-join optimization.
-    if (chained_nodes.size() > 1) {
-      std::unordered_map<QueryContext::bloom_filter_id, std::vector<attribute_id>> probe_bloom_filter_info;
-      for (const std::size_t node : chained_nodes) {
-        // Provision for a new bloom filter to be used by the build operator.
-        const QueryContext::bloom_filter_id bloom_filter_id =  query_context_proto->bloom_filters_size();
-        serialization::BloomFilter *bloom_filter_proto = query_context_proto->add_bloom_filters();
-
-        // Modify the bloom filter properties based on the statistics of the relation.
-        setBloomFilterProperties(bloom_filter_proto, hash_joins_[node].referenced_stored_build_relation_);
-
-        // Add build-side bloom filter information to the corresponding hash table proto.
-        query_context_proto->mutable_join_hash_tables(hash_joins_[node].join_hash_table_id_)
-            ->add_build_side_bloom_filter_id(bloom_filter_id);
-
-        probe_bloom_filter_info.insert(std::make_pair(bloom_filter_id, hash_joins_[node].probe_attributes_));
-      }
-
-      // Add probe-side bloom filter information to the corresponding hash table proto for each build-side bloom filter.
-      for (const std::pair<QueryContext::bloom_filter_id, std::vector<attribute_id>>
-               &bloom_filter_info : probe_bloom_filter_info) {
-        auto *probe_side_bloom_filter =
-            query_context_proto->mutable_join_hash_tables(hash_joins_[origin_node].join_hash_table_id_)
-                                  ->add_probe_side_bloom_filters();
-        probe_side_bloom_filter->set_probe_side_bloom_filter_id(bloom_filter_info.first);
-        for (const attribute_id &probe_attribute_id : bloom_filter_info.second) {
-          probe_side_bloom_filter->add_probe_side_attr_ids(probe_attribute_id);
-        }
-      }
-
-      // Add node dependencies from chained build nodes to origin node probe.
-      for (std::size_t i = 1; i < chained_nodes.size(); ++i) {  // Note: It starts from index 1.
-        query_plan->addDirectDependency(hash_joins_[origin_node].join_operator_index_,
-                                        hash_joins_[origin_node + i].build_operator_index_,
-                                        true /* is_pipeline_breaker */);
-      }
+  for (const auto &info : hash_joins_) {
+    auto *hash_table_proto =
+        query_context_proto->mutable_join_hash_tables(info.join_hash_table_id_);
+    const auto &bloom_filter_config = info.bloom_filter_config_;
+
+    for (std::size_t i = 0; i < info.probe_side_bloom_filter_ids_.size(); ++i) {
+      auto *probe_side_bloom_filter = hash_table_proto->add_probe_side_bloom_filters();
+      const auto &probe_side_bf =
+          bloom_filter_config.probe_side_bloom_filters[i];
+      std::cerr << "HashJoin probe " << probe_side_bf.attribute->toString()
+                << " @" << probe_side_bf.builder << "\n";
+
+      const auto &build_side_info =
+           bloom_filter_map.at(
+               std::make_pair(probe_side_bf.source_attribute->id(),
+                              probe_side_bf.builder));
+      probe_side_bloom_filter->set_bloom_filter_id(build_side_info.first);
+      probe_side_bloom_filter->set_attr_id(info.probe_side_bloom_filter_ids_[i]);
+//      std::cerr << "HashJoin probe attr_id = " << info.probe_side_bloom_filter_ids_[i] << "\n";
+
+      query_plan->addDirectDependency(info.join_operator_index_,
+                                      build_side_info.second,
+                                      true /* is_pipeline_breaker */);
     }
+  }
 
-    // Update the origin node.
-    origin_node = chained_nodes.back() + 1;
+  for (const auto &info : aggregates_) {
+    auto *aggregate_proto =
+        query_context_proto->mutable_aggregation_states(info.aggregate_state_id_);
+    const auto &bloom_filter_config = info.bloom_filter_config_;
+
+    for (std::size_t i = 0; i < info.bloom_filter_ids_.size(); ++i) {
+      auto *bloom_filter = aggregate_proto->add_bloom_filters();
+      const auto &bf =
+          bloom_filter_config.probe_side_bloom_filters[i];
+      std::cerr << "Aggregate probe " << bf.attribute->toString()
+                << " @" << bf.builder << "\n";
+
+      const auto &build_side_info =
+           bloom_filter_map.at(
+               std::make_pair(bf.source_attribute->id(),
+                              bf.builder));
+      bloom_filter->set_bloom_filter_id(build_side_info.first);
+      bloom_filter->set_attr_id(info.bloom_filter_ids_[i]);
+//      std::cerr << "Aggregate probe attr_id = "
+//                << info.bloom_filter_ids_[i] << "\n";
+
+      query_plan->addDirectDependency(info.aggregate_operator_index_,
+                                      build_side_info.second,
+                                      true /* is_pipeline_breaker */);
+    }
   }
 }
 
 void ExecutionHeuristics::setBloomFilterProperties(serialization::BloomFilter *bloom_filter_proto,
-                                                   const CatalogRelation *relation) {
-  const std::size_t cardinality = relation->estimateTupleCardinality();
-  if (cardinality < kOneThousand) {
-    bloom_filter_proto->set_bloom_filter_size(kOneThousand / kCompressionFactor);
-    bloom_filter_proto->set_number_of_hashes(kVeryLowSparsityHash);
-  } else if (cardinality < kTenThousand) {
-    bloom_filter_proto->set_bloom_filter_size(kTenThousand / kCompressionFactor);
-    bloom_filter_proto->set_number_of_hashes(kLowSparsityHash);
-  } else if (cardinality < kHundredThousand) {
-    bloom_filter_proto->set_bloom_filter_size(kHundredThousand / kCompressionFactor);
-    bloom_filter_proto->set_number_of_hashes(kMediumSparsityHash);
-  } else {
-    bloom_filter_proto->set_bloom_filter_size(kMillion / kCompressionFactor);
-    bloom_filter_proto->set_number_of_hashes(kHighSparsityHash);
-  }
+                                                   const std::size_t cardinality) {
+  bloom_filter_proto->set_bloom_filter_size(
+      BloomFilter::getNearestAllowedSize(
+          (FLAGS_bloom_num_bits_per_tuple * cardinality) / kNumBitsPerByte));
+//  std::cerr << "bf size = " << bloom_filter_proto->bloom_filter_size() << "\n";
+  bloom_filter_proto->set_number_of_hashes(FLAGS_bloom_num_hash_fns);
 }
 
 }  // namespace optimizer

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/query_optimizer/ExecutionHeuristics.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionHeuristics.hpp b/query_optimizer/ExecutionHeuristics.hpp
index 8ad3b7a..9e5efc5 100644
--- a/query_optimizer/ExecutionHeuristics.hpp
+++ b/query_optimizer/ExecutionHeuristics.hpp
@@ -27,6 +27,7 @@
 #include "query_execution/QueryContext.hpp"
 #include "query_execution/QueryContext.pb.h"
 #include "query_optimizer/QueryPlan.hpp"
+#include "query_optimizer/physical/HashJoin.hpp"
 #include "utility/Macros.hpp"
 
 #include "glog/logging.h"
@@ -51,7 +52,7 @@ class ExecutionHeuristics {
   static const std::size_t kHundredThousand = 100000;
   static const std::size_t kMillion = 1000000;
 
-  static const std::size_t kCompressionFactor = 10;
+  static const std::size_t kCompressionFactor = 1;
 
   static const std::size_t kVeryLowSparsityHash = 1;
   static const std::size_t kLowSparsityHash = 2;
@@ -67,25 +68,48 @@ class ExecutionHeuristics {
                  const QueryPlan::DAGNodeIndex join_operator_index,
                  const CatalogRelation *referenced_stored_build_relation,
                  const CatalogRelation *referenced_stored_probe_relation,
-                 std::vector<attribute_id> &&build_attributes,
-                 std::vector<attribute_id> &&probe_attributes,
-                 const QueryContext::join_hash_table_id join_hash_table_id)
+                 const physical::BloomFilterConfig &bloom_filter_config,
+                 std::vector<attribute_id> &&build_side_bloom_filter_ids,
+                 std::vector<attribute_id> &&probe_side_bloom_filter_ids,
+                 const QueryContext::join_hash_table_id join_hash_table_id,
+                 const std::size_t estimated_build_relation_cardinality)
         : build_operator_index_(build_operator_index),
           join_operator_index_(join_operator_index),
           referenced_stored_build_relation_(referenced_stored_build_relation),
           referenced_stored_probe_relation_(referenced_stored_probe_relation),
-          build_attributes_(std::move(build_attributes)),
-          probe_attributes_(std::move(probe_attributes)),
-          join_hash_table_id_(join_hash_table_id) {
+          bloom_filter_config_(bloom_filter_config),
+          build_side_bloom_filter_ids_(std::move(build_side_bloom_filter_ids)),
+          probe_side_bloom_filter_ids_(std::move(probe_side_bloom_filter_ids)),
+          join_hash_table_id_(join_hash_table_id),
+          estimated_build_relation_cardinality_(estimated_build_relation_cardinality) {
     }
 
     const QueryPlan::DAGNodeIndex build_operator_index_;
     const QueryPlan::DAGNodeIndex join_operator_index_;
     const CatalogRelation *referenced_stored_build_relation_;
     const CatalogRelation *referenced_stored_probe_relation_;
-    const std::vector<attribute_id> build_attributes_;
-    const std::vector<attribute_id> probe_attributes_;
+    const physical::BloomFilterConfig &bloom_filter_config_;
+    const std::vector<attribute_id> build_side_bloom_filter_ids_;
+    const std::vector<attribute_id> probe_side_bloom_filter_ids_;
     const QueryContext::join_hash_table_id join_hash_table_id_;
+    const std::size_t estimated_build_relation_cardinality_;
+  };
+
+  struct AggregateInfo {
+    AggregateInfo(const QueryPlan::DAGNodeIndex aggregate_operator_index,
+                  const physical::BloomFilterConfig &bloom_filter_config,
+                  std::vector<attribute_id> &&bloom_filter_ids,
+                  const QueryContext::aggregation_state_id aggregate_state_id)
+        : aggregate_operator_index_(aggregate_operator_index),
+          bloom_filter_config_(bloom_filter_config),
+          bloom_filter_ids_(bloom_filter_ids),
+          aggregate_state_id_(aggregate_state_id) {
+    }
+
+    const QueryPlan::DAGNodeIndex aggregate_operator_index_;
+    const physical::BloomFilterConfig &bloom_filter_config_;
+    const std::vector<attribute_id> bloom_filter_ids_;
+    const QueryContext::aggregation_state_id aggregate_state_id_;
   };
 
 
@@ -111,16 +135,30 @@ class ExecutionHeuristics {
                               const QueryPlan::DAGNodeIndex join_operator_index,
                               const CatalogRelation *referenced_stored_build_relation,
                               const CatalogRelation *referenced_stored_probe_relation,
-                              std::vector<attribute_id> &&build_attributes,
-                              std::vector<attribute_id> &&probe_attributes,
-                              const QueryContext::join_hash_table_id join_hash_table_id) {
-    hash_joins_.push_back(HashJoinInfo(build_operator_index,
-                                       join_operator_index,
-                                       referenced_stored_build_relation,
-                                       referenced_stored_probe_relation,
-                                       std::move(build_attributes),
-                                       std::move(probe_attributes),
-                                       join_hash_table_id));
+                              const physical::BloomFilterConfig &bloom_filter_config,
+                              std::vector<attribute_id> &&build_side_bloom_filter_ids,
+                              std::vector<attribute_id> &&probe_side_bloom_filter_ids,
+                              const QueryContext::join_hash_table_id join_hash_table_id,
+                              const std::size_t estimated_build_relation_cardinality) {
+    hash_joins_.emplace_back(build_operator_index,
+                             join_operator_index,
+                             referenced_stored_build_relation,
+                             referenced_stored_probe_relation,
+                             bloom_filter_config,
+                             std::move(build_side_bloom_filter_ids),
+                             std::move(probe_side_bloom_filter_ids),
+                             join_hash_table_id,
+                             estimated_build_relation_cardinality);
+  }
+
+  inline void addAggregateInfo(const QueryPlan::DAGNodeIndex aggregate_operator_index,
+                               const physical::BloomFilterConfig &bloom_filter_config,
+                               std::vector<attribute_id> &&bloom_filter_ids,
+                               const QueryContext::aggregation_state_id aggregate_state_id) {
+    aggregates_.emplace_back(aggregate_operator_index,
+                             bloom_filter_config,
+                             std::move(bloom_filter_ids),
+                             aggregate_state_id);
   }
 
   /**
@@ -141,10 +179,11 @@ class ExecutionHeuristics {
    * @param relation The catalog relation on which bloom filter is being built.
    **/
   void setBloomFilterProperties(serialization::BloomFilter *bloom_filter_proto,
-                                const CatalogRelation *relation);
+                                const std::size_t cardinality);
 
  private:
   std::vector<HashJoinInfo> hash_joins_;
+  std::vector<AggregateInfo> aggregates_;
 
   DISALLOW_COPY_AND_ASSIGN(ExecutionHeuristics);
 };

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/query_optimizer/PhysicalGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/PhysicalGenerator.cpp b/query_optimizer/PhysicalGenerator.cpp
index 8f19702..9ee685d 100644
--- a/query_optimizer/PhysicalGenerator.cpp
+++ b/query_optimizer/PhysicalGenerator.cpp
@@ -26,6 +26,7 @@
 #include "query_optimizer/Validator.hpp"
 #include "query_optimizer/logical/Logical.hpp"
 #include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/rules/AttachBloomFilters.hpp"
 #include "query_optimizer/rules/PruneColumns.hpp"
 #include "query_optimizer/rules/StarSchemaHashJoinOrderOptimization.hpp"
 #include "query_optimizer/rules/SwapProbeBuild.hpp"
@@ -96,10 +97,12 @@ P::PhysicalPtr PhysicalGenerator::generateInitialPlan(
 P::PhysicalPtr PhysicalGenerator::optimizePlan() {
   std::vector<std::unique_ptr<Rule<P::Physical>>> rules;
   if (FLAGS_reorder_hash_joins) {
+    rules.emplace_back(new PruneColumns());
     rules.emplace_back(new StarSchemaHashJoinOrderOptimization());
   }
   rules.emplace_back(new PruneColumns());
-  rules.emplace_back(new SwapProbeBuild());
+  // rules.emplace_back(new SwapProbeBuild());
+  rules.emplace_back(new AttachBloomFilters());
 
   for (std::unique_ptr<Rule<P::Physical>> &rule : rules) {
     physical_plan_ = rule->apply(physical_plan_);
@@ -110,7 +113,7 @@ P::PhysicalPtr PhysicalGenerator::optimizePlan() {
   DVLOG(4) << "Optimized physical plan:\n" << physical_plan_->toString();
 
   if (FLAGS_visualize_plan) {
-  quickstep::PlanVisualizer plan_visualizer;
+    quickstep::PlanVisualizer plan_visualizer;
     std::cerr << "\n" << plan_visualizer.visualize(physical_plan_) << "\n";
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/query_optimizer/cost_model/SimpleCostModel.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/SimpleCostModel.cpp b/query_optimizer/cost_model/SimpleCostModel.cpp
index f313c90..45e2f00 100644
--- a/query_optimizer/cost_model/SimpleCostModel.cpp
+++ b/query_optimizer/cost_model/SimpleCostModel.cpp
@@ -88,7 +88,7 @@ std::size_t SimpleCostModel::estimateCardinalityForTopLevelPlan(
 
 std::size_t SimpleCostModel::estimateCardinalityForTableReference(
     const P::TableReferencePtr &physical_plan) {
-  return physical_plan->relation()->estimateTupleCardinality();
+  return physical_plan->relation()->getStatistics().getNumTuples();
 }
 
 std::size_t SimpleCostModel::estimateCardinalityForSelection(
@@ -119,7 +119,7 @@ std::size_t SimpleCostModel::estimateCardinalityForAggregate(
     return 1;
   }
   return std::max(static_cast<std::size_t>(1),
-                  estimateCardinality(physical_plan->input()) / 10);
+                  estimateCardinality(physical_plan->input()));
 }
 
 std::size_t SimpleCostModel::estimateCardinalityForWindowAggregate(

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp b/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
index 911a765..9eea27c 100644
--- a/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
+++ b/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
@@ -123,12 +123,26 @@ std::size_t StarSchemaSimpleCostModel::estimateCardinalityForTableGenerator(
 
 std::size_t StarSchemaSimpleCostModel::estimateCardinalityForHashJoin(
     const P::HashJoinPtr &physical_plan) {
-  std::size_t left_cardinality = estimateCardinality(physical_plan->left());
-  std::size_t right_cardinality = estimateCardinality(physical_plan->right());
-  double left_selectivity = estimateSelectivity(physical_plan->left());
-  double right_selectivity = estimateSelectivity(physical_plan->right());
-  return std::max(static_cast<std::size_t>(left_cardinality * right_selectivity) + 1,
-                  static_cast<std::size_t>(right_cardinality * left_selectivity) + 1);
+  const P::PhysicalPtr &left_child = physical_plan->left();
+  const P::PhysicalPtr &right_child = physical_plan->right();
+  // Estimate both inputs up front; each value is used by both branches below.
+  std::size_t left_cardinality = estimateCardinality(left_child);
+  std::size_t right_cardinality = estimateCardinality(right_child);
+  // Start from the larger input, then tighten only when a child reports its join attributes as unique.
+  std::size_t estimated_cardinality = std::max(left_cardinality, right_cardinality);
+  if (left_child->impliesUniqueAttributes(physical_plan->left_join_attributes())) {
+    double left_selectivity = estimateSelectivity(left_child);  // Selectivity is computed only when this branch is taken.
+    estimated_cardinality =
+        std::min(estimated_cardinality,
+                 static_cast<std::size_t>(right_cardinality * left_selectivity));  // Cap: right rows scaled by left selectivity.
+  }
+  if (right_child->impliesUniqueAttributes(physical_plan->right_join_attributes())) {
+    double right_selectivity = estimateSelectivity(right_child);  // Mirror of the branch above.
+    estimated_cardinality =
+        std::min(estimated_cardinality,
+                 static_cast<std::size_t>(left_cardinality * right_selectivity));  // Cap: left rows scaled by right selectivity.
+  }
+  return estimated_cardinality;  // NOTE(review): the casts truncate, so this can be 0; the replaced code added 1 - confirm callers tolerate 0.
 }
 
 std::size_t StarSchemaSimpleCostModel::estimateCardinalityForNestedLoopsJoin(
@@ -143,7 +157,7 @@ std::size_t StarSchemaSimpleCostModel::estimateCardinalityForAggregate(
     return 1;
   }
   return std::max(static_cast<std::size_t>(1),
-                  estimateCardinality(physical_plan->input()) / 10);
+                  estimateCardinality(physical_plan->input()) / 100);
 }
 
 std::size_t StarSchemaSimpleCostModel::estimateCardinalityForWindowAggregate(
@@ -161,8 +175,14 @@ double StarSchemaSimpleCostModel::estimateSelectivity(
     case P::PhysicalType::kHashJoin: {
       const P::HashJoinPtr &hash_join =
           std::static_pointer_cast<const P::HashJoin>(physical_plan);
-      return std::min(estimateSelectivity(hash_join->left()),
-                      estimateSelectivity(hash_join->right()));
+      double left_selectivity = estimateSelectivity(hash_join->left());
+      double right_selectivity = estimateSelectivity(hash_join->right());
+      double min_sel = std::min(left_selectivity, right_selectivity);
+      double max_sel = std::max(left_selectivity, right_selectivity);
+      if (max_sel < 1) {
+        min_sel *= std::max(max_sel, 0.9);
+      }
+      return min_sel;
     }
     case P::PhysicalType::kNestedLoopsJoin: {
       const P::NestedLoopsJoinPtr &nested_loop_join =
@@ -215,7 +235,7 @@ double StarSchemaSimpleCostModel::estimateSelectivityForPredicate(
     case E::ExpressionType::kComparisonExpression: {
       // Case 1 - Number of distinct values statistics available
       //   Case 1.1 - Equality comparison: 1.0 / num_distinct_values
-      //   Case 1.2 - Otherwise: 5.0 / num_distinct_values
+      //   Case 1.2 - Otherwise: 0.5
       // Case 2 - Number of distinct values statistics not available
       //   Case 2.1 - Equality comparison: 0.1
       //   Case 2.2 - Otherwise: 0.5
@@ -231,7 +251,7 @@ double StarSchemaSimpleCostModel::estimateSelectivityForPredicate(
           double unit_selectivity = 1.0 / it->second;
           return comparison_expression->isEqualityComparisonPredicate()
                      ? unit_selectivity
-                     : std::min(0.5, unit_selectivity * 5.0);
+                     : 0.5;
         }
       }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/query_optimizer/expressions/ExpressionUtil.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/expressions/ExpressionUtil.hpp b/query_optimizer/expressions/ExpressionUtil.hpp
index e9a4067..16c219e 100644
--- a/query_optimizer/expressions/ExpressionUtil.hpp
+++ b/query_optimizer/expressions/ExpressionUtil.hpp
@@ -103,12 +103,12 @@ bool ContainsExpression(
  *              contain the other operand).
  * @return True if \p left is a subset of \p right.
  */
-template <class NamedExpressionType>
+template <class NamedExpressionType1, class NamedExpressionType2>
 bool SubsetOfExpressions(
-    const std::vector<std::shared_ptr<const NamedExpressionType>> &left,
-    const std::vector<std::shared_ptr<const NamedExpressionType>> &right) {
+    const std::vector<std::shared_ptr<const NamedExpressionType1>> &left,
+    const std::vector<std::shared_ptr<const NamedExpressionType2>> &right) {
   UnorderedNamedExpressionSet supset(right.begin(), right.end());
-  for (const std::shared_ptr<const NamedExpressionType> &expr : left) {
+  for (const std::shared_ptr<const NamedExpressionType1> &expr : left) {
     if (supset.find(expr) == supset.end()) {
       return false;
     }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/query_optimizer/physical/Aggregate.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/Aggregate.cpp b/query_optimizer/physical/Aggregate.cpp
index 35476af..6d07a52 100644
--- a/query_optimizer/physical/Aggregate.cpp
+++ b/query_optimizer/physical/Aggregate.cpp
@@ -87,6 +87,11 @@ std::vector<E::AttributeReferencePtr> Aggregate::getReferencedAttributes()
   return referenced_attributes;
 }
 
+bool Aggregate::impliesUniqueAttributes(
+    const std::vector<expressions::AttributeReferencePtr> &attributes) const {
+  return E::SubsetOfExpressions(grouping_expressions_, attributes);  // True iff every grouping expression is contained in \p attributes.
+}
+
 void Aggregate::getFieldStringItems(
     std::vector<std::string> *inline_field_names,
     std::vector<std::string> *inline_field_values,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/query_optimizer/physical/Aggregate.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/Aggregate.hpp b/query_optimizer/physical/Aggregate.hpp
index de36cc3..855d783 100644
--- a/query_optimizer/physical/Aggregate.hpp
+++ b/query_optimizer/physical/Aggregate.hpp
@@ -100,6 +100,13 @@ class Aggregate : public Physical {
     return false;
   }
 
+  bool impliesUniqueAttributes(
+      const std::vector<expressions::AttributeReferencePtr> &attributes) const override;
+
+  const BloomFilterConfig &bloom_filter_config() const {
+    return bloom_filter_config_;  // The config passed at construction; Create() defaults it to an empty BloomFilterConfig.
+  }
+
   /**
    * @brief Creates an Aggregate physical node.
    *
@@ -113,9 +120,14 @@ class Aggregate : public Physical {
       PhysicalPtr input,
       const std::vector<expressions::NamedExpressionPtr> &grouping_expressions,
       const std::vector<expressions::AliasPtr> &aggregate_expressions,
-      const expressions::PredicatePtr &filter_predicate) {
+      const expressions::PredicatePtr &filter_predicate,
+      const BloomFilterConfig bloom_filter_config = BloomFilterConfig()) {
     return AggregatePtr(
-        new Aggregate(input, grouping_expressions, aggregate_expressions, filter_predicate));
+        new Aggregate(input,
+                      grouping_expressions,
+                      aggregate_expressions,
+                      filter_predicate,
+                      bloom_filter_config));
   }
 
  protected:
@@ -132,11 +144,13 @@ class Aggregate : public Physical {
       PhysicalPtr input,
       const std::vector<expressions::NamedExpressionPtr> &grouping_expressions,
       const std::vector<expressions::AliasPtr> &aggregate_expressions,
-      const expressions::PredicatePtr &filter_predicate)
+      const expressions::PredicatePtr &filter_predicate,
+      const BloomFilterConfig &bloom_filter_config)
       : input_(input),
         grouping_expressions_(grouping_expressions),
         aggregate_expressions_(aggregate_expressions),
-        filter_predicate_(filter_predicate) {
+        filter_predicate_(filter_predicate),
+        bloom_filter_config_(bloom_filter_config) {
     addChild(input_);
   }
 
@@ -144,6 +158,7 @@ class Aggregate : public Physical {
   std::vector<expressions::NamedExpressionPtr> grouping_expressions_;
   std::vector<expressions::AliasPtr> aggregate_expressions_;
   expressions::PredicatePtr filter_predicate_;
+  BloomFilterConfig bloom_filter_config_;
 
   DISALLOW_COPY_AND_ASSIGN(Aggregate);
 };

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/query_optimizer/physical/HashJoin.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/HashJoin.cpp b/query_optimizer/physical/HashJoin.cpp
index e186072..883c87a 100644
--- a/query_optimizer/physical/HashJoin.cpp
+++ b/query_optimizer/physical/HashJoin.cpp
@@ -85,6 +85,15 @@ bool HashJoin::maybeCopyWithPrunedExpressions(
   return false;
 }
 
+bool HashJoin::impliesUniqueAttributes(
+    const std::vector<expressions::AttributeReferencePtr> &attributes) const {
+  return (left()->impliesUniqueAttributes(left_join_attributes_)  // Case 1: left join attributes unique in the left child
+              && right()->impliesUniqueAttributes(attributes))  //   and \p attributes unique in the right child.
+         || (right()->impliesUniqueAttributes(right_join_attributes_)  // Case 2: the mirror image, with the two sides swapped.
+                 && left()->impliesUniqueAttributes(attributes));  // NOTE(review): join_type_ is not consulted here - confirm this holds for every join type.
+
+}
+
 void HashJoin::getFieldStringItems(
     std::vector<std::string> *inline_field_names,
     std::vector<std::string> *inline_field_values,
@@ -106,6 +115,24 @@ void HashJoin::getFieldStringItems(
   container_child_fields->push_back(CastSharedPtrVector<OptimizerTreeBase>(left_join_attributes_));
   container_child_field_names->push_back("right_join_attributes");
   container_child_fields->push_back(CastSharedPtrVector<OptimizerTreeBase>(right_join_attributes_));
+
+  if (!bloom_filter_config_.build_side_bloom_filters.empty()) {
+    container_child_field_names->push_back("build_side_bloom_filters");
+    container_child_fields->emplace_back();
+    auto &container = container_child_fields->back();
+    for (const auto& bf : bloom_filter_config_.build_side_bloom_filters) {
+      container.emplace_back(bf.attribute);
+    }
+  }
+
+  if (!bloom_filter_config_.probe_side_bloom_filters.empty()) {
+    container_child_field_names->push_back("probe_side_bloom_filters");
+    container_child_fields->emplace_back();
+    auto &container = container_child_fields->back();
+    for (const auto& bf : bloom_filter_config_.probe_side_bloom_filters) {
+      container.emplace_back(bf.attribute);
+    }
+  }
 }
 
 }  // namespace physical

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/query_optimizer/physical/HashJoin.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/HashJoin.hpp b/query_optimizer/physical/HashJoin.hpp
index c513f77..a830d0b 100644
--- a/query_optimizer/physical/HashJoin.hpp
+++ b/query_optimizer/physical/HashJoin.hpp
@@ -116,7 +116,8 @@ class HashJoin : public BinaryJoin {
                   right_join_attributes_,
                   residual_predicate_,
                   project_expressions(),
-                  join_type_);
+                  join_type_,
+                  bloom_filter_config_);
   }
 
   std::vector<expressions::AttributeReferencePtr> getReferencedAttributes() const override;
@@ -125,6 +126,13 @@ class HashJoin : public BinaryJoin {
       const expressions::UnorderedNamedExpressionSet &referenced_expressions,
       PhysicalPtr *output) const override;
 
+  bool impliesUniqueAttributes(
+      const std::vector<expressions::AttributeReferencePtr> &attributes) const override;
+
+  const BloomFilterConfig &bloom_filter_config() const {
+    return bloom_filter_config_;  // The config passed at construction; Create() defaults it to an empty BloomFilterConfig.
+  }
+
   /**
    * @brief Creates a physical HashJoin. The left/right operand does not correspond to
    *        probe/build operand.
@@ -145,7 +153,8 @@ class HashJoin : public BinaryJoin {
       const std::vector<expressions::AttributeReferencePtr> &right_join_attributes,
       const expressions::PredicatePtr &residual_predicate,
       const std::vector<expressions::NamedExpressionPtr> &project_expressions,
-      const JoinType join_type) {
+      const JoinType join_type,
+      const BloomFilterConfig bloom_filter_config = BloomFilterConfig()) {
     return HashJoinPtr(
         new HashJoin(left,
                      right,
@@ -153,7 +162,8 @@ class HashJoin : public BinaryJoin {
                      right_join_attributes,
                      residual_predicate,
                      project_expressions,
-                     join_type));
+                     join_type,
+                     bloom_filter_config));
   }
 
  protected:
@@ -173,18 +183,21 @@ class HashJoin : public BinaryJoin {
       const std::vector<expressions::AttributeReferencePtr> &right_join_attributes,
       const expressions::PredicatePtr &residual_predicate,
       const std::vector<expressions::NamedExpressionPtr> &project_expressions,
-      const JoinType join_type)
+      const JoinType join_type,
+      const BloomFilterConfig &bloom_filter_config)
       : BinaryJoin(left, right, project_expressions),
         left_join_attributes_(left_join_attributes),
         right_join_attributes_(right_join_attributes),
         residual_predicate_(residual_predicate),
-        join_type_(join_type) {
+        join_type_(join_type),
+        bloom_filter_config_(bloom_filter_config) {
   }
 
   std::vector<expressions::AttributeReferencePtr> left_join_attributes_;
   std::vector<expressions::AttributeReferencePtr> right_join_attributes_;
   expressions::PredicatePtr residual_predicate_;
   JoinType join_type_;
+  BloomFilterConfig bloom_filter_config_;
 
   DISALLOW_COPY_AND_ASSIGN(HashJoin);
 };

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/query_optimizer/physical/Physical.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/Physical.hpp b/query_optimizer/physical/Physical.hpp
index 4bed593..f70d142 100644
--- a/query_optimizer/physical/Physical.hpp
+++ b/query_optimizer/physical/Physical.hpp
@@ -41,6 +41,56 @@ namespace physical {
 class Physical;
 typedef std::shared_ptr<const Physical> PhysicalPtr;
 
+struct BloomFilterConfig {  // Bloom-filter annotations: a builder node plus build-side and probe-side filter lists.
+  struct BuildSide {  // One build-side entry; holds only the attribute.
+    BuildSide(const expressions::AttributeReferencePtr &attribute_in)  // NOTE(review): not explicit, so an AttributeReferencePtr converts implicitly.
+        : attribute(attribute_in) {
+    }
+    expressions::AttributeReferencePtr attribute;
+  };
+  struct ProbeSide {  // One probe-side entry.
+    ProbeSide(const expressions::AttributeReferencePtr &attribute_in,
+              const expressions::AttributeReferencePtr &source_attribute_in,
+              const physical::PhysicalPtr &builder_in)
+        : attribute(attribute_in),
+          source_attribute(source_attribute_in),
+          builder(builder_in) {
+    }
+    expressions::AttributeReferencePtr attribute;  // presumably the probe-side attribute tested against the filter - TODO confirm
+    expressions::AttributeReferencePtr source_attribute;  // presumably the attribute the filter was built on - TODO confirm
+    PhysicalPtr builder;  // presumably the plan node that builds the filter - TODO confirm
+  };
+  BloomFilterConfig() {}  // Empty config: null builder and no filters on either side.
+  BloomFilterConfig(const PhysicalPtr &builder_in)  // NOTE(review): not explicit, so a PhysicalPtr converts implicitly to a config.
+      : builder(builder_in) {
+  }
+  BloomFilterConfig(const PhysicalPtr &builder_in,  // Copies both filter lists as given.
+                    const std::vector<BuildSide> &build_side_bloom_filters_in,
+                    const std::vector<ProbeSide> &probe_side_bloom_filters_in)
+      : builder(builder_in),
+        build_side_bloom_filters(build_side_bloom_filters_in),
+        probe_side_bloom_filters(probe_side_bloom_filters_in) {
+  }
+  void addBuildSideBloomFilter(const expressions::AttributeReferencePtr &attribute_in) {  // Appends unless an equal entry already exists.
+    for (const auto &build_bf : build_side_bloom_filters) {
+      if (attribute_in == build_bf.attribute) {  // NOTE(review): compares the pointer handles, presumably identity rather than ExprId - confirm.
+        return;
+      }
+    }
+    build_side_bloom_filters.emplace_back(attribute_in);
+  }
+  void addProbeSideBloomFilter(const expressions::AttributeReferencePtr &attribute_in,  // Always appends; no duplicate check, unlike the build side.
+                               const expressions::AttributeReferencePtr &source_attribute_in,
+                               const physical::PhysicalPtr &builder_in) {
+    probe_side_bloom_filters.emplace_back(attribute_in,
+                                          source_attribute_in,
+                                          builder_in);
+  }
+  PhysicalPtr builder;  // Null when default-constructed.
+  std::vector<BuildSide> build_side_bloom_filters;
+  std::vector<ProbeSide> probe_side_bloom_filters;
+};
+
 /**
  * @brief Base class for physical plan nodes.
  */
@@ -86,6 +136,11 @@ class Physical : public OptimizerTree<Physical> {
       const expressions::UnorderedNamedExpressionSet &referenced_expressions,
       PhysicalPtr *output) const = 0;
 
+  virtual bool impliesUniqueAttributes(
+      const std::vector<expressions::AttributeReferencePtr> &attributes) const {
+    return false;  // Base default: never claims uniqueness; node types that can decide override this.
+  }
+
  protected:
   /**
    * @brief Constructor.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/query_optimizer/physical/Selection.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/Selection.cpp b/query_optimizer/physical/Selection.cpp
index 36ade04..73af500 100644
--- a/query_optimizer/physical/Selection.cpp
+++ b/query_optimizer/physical/Selection.cpp
@@ -82,6 +82,12 @@ bool Selection::maybeCopyWithPrunedExpressions(
   return false;
 }
 
+bool Selection::impliesUniqueAttributes(
+    const std::vector<expressions::AttributeReferencePtr> &attributes) const {
+  return input()->impliesUniqueAttributes(attributes);  // Forwards the query unchanged to the input node.
+}
+
+
 void Selection::getFieldStringItems(
     std::vector<std::string> *inline_field_names,
     std::vector<std::string> *inline_field_values,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/query_optimizer/physical/Selection.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/Selection.hpp b/query_optimizer/physical/Selection.hpp
index b6874a1..f42fc71 100644
--- a/query_optimizer/physical/Selection.hpp
+++ b/query_optimizer/physical/Selection.hpp
@@ -86,6 +86,9 @@ class Selection : public Physical {
       const expressions::UnorderedNamedExpressionSet &referenced_attributes,
       PhysicalPtr *output) const override;
 
+  bool impliesUniqueAttributes(
+      const std::vector<expressions::AttributeReferencePtr> &attributes) const override;
+
   /**
    * @brief Creates a Selection.
    *

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/query_optimizer/physical/TableReference.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/TableReference.cpp b/query_optimizer/physical/TableReference.cpp
index bfd6464..399ee51 100644
--- a/query_optimizer/physical/TableReference.cpp
+++ b/query_optimizer/physical/TableReference.cpp
@@ -20,6 +20,7 @@
 #include "query_optimizer/physical/TableReference.hpp"
 
 #include <string>
+#include <set>
 #include <vector>
 
 #include "catalog/CatalogRelation.hpp"
@@ -32,6 +33,23 @@ namespace physical {
 
 namespace E = ::quickstep::optimizer::expressions;
 
+bool TableReference::impliesUniqueAttributes(
+    const std::vector<expressions::AttributeReferencePtr> &attributes) const {
+  std::set<E::ExprId> attr_ids;  // Expression ids of the queried attributes.
+  for (const auto &attr : attributes) {
+    attr_ids.emplace(attr->id());
+  }
+  // Map the queried ids to positions in attribute_list_; ids not found there are ignored.
+  std::set<attribute_id> rel_attr_ids;
+  for (std::size_t i = 0; i < attribute_list_.size(); ++i) {
+    if (attr_ids.find(attribute_list_[i]->id()) != attr_ids.end()) {
+      rel_attr_ids.emplace(i);  // NOTE(review): list position i stands in for the catalog attribute_id - confirm they coincide.
+    }
+  }
+  // The relation's constraints decide whether the collected attributes imply uniqueness.
+  return relation_->getConstraints().impliesUniqueAttributes(rel_attr_ids);
+}
+
 void TableReference::getFieldStringItems(
     std::vector<std::string> *inline_field_names,
     std::vector<std::string> *inline_field_values,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/query_optimizer/physical/TableReference.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/TableReference.hpp b/query_optimizer/physical/TableReference.hpp
index 638d73b..7643c07 100644
--- a/query_optimizer/physical/TableReference.hpp
+++ b/query_optimizer/physical/TableReference.hpp
@@ -90,6 +90,9 @@ class TableReference : public Physical {
     return false;
   }
 
+  bool impliesUniqueAttributes(
+      const std::vector<expressions::AttributeReferencePtr> &attributes) const override;
+
   /**
    * @brief Creates a TableReference.
    *