You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by hb...@apache.org on 2016/10/19 19:34:12 UTC

[1/7] incubator-quickstep git commit: Update travis to run only 1 thread while building release version [Forced Update!]

Repository: incubator-quickstep
Updated Branches:
  refs/heads/partitioned-aggregate-new aaba94624 -> 7f0269bfd (forced update)


Update travis to run only 1 thread while building release version


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/160276c7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/160276c7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/160276c7

Branch: refs/heads/partitioned-aggregate-new
Commit: 160276c7d790883ad667105cf1ec3e43a487855c
Parents: 17ffbb0
Author: Saket Saurabh <ss...@cs.wisc.edu>
Authored: Mon Oct 17 23:18:30 2016 -0500
Committer: Saket Saurabh <ss...@cs.wisc.edu>
Committed: Tue Oct 18 09:52:17 2016 -0500

----------------------------------------------------------------------
 .travis.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/160276c7/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 6895c0d..784a46f 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -21,7 +21,7 @@ env:
   - BUILD_TYPE=Release VECTOR_COPY_ELISION_LEVEL=selection
 
 install:
-  - if [ "$CC" = "gcc" ] || [[ "$BUILD_TYPE" = "Release" &&  "$VECTOR_COPY_ELISION_LEVEL" = "selection" ]]; then
+  - if [ "$CC" = "gcc" ] || [[ "$BUILD_TYPE" = "Release" ]]; then
       export MAKE_JOBS=1;
     else
       export MAKE_JOBS=2;


[7/7] incubator-quickstep git commit: Support for performing partitioned aggregation.

Posted by hb...@apache.org.
Support for performing partitioned aggregation.

- Used for creating a pool of hash tables such that each hash table
  belongs to a unique partition.
- The partitioning is done on the group-by keys.
- Wrote a utility function to compute composite hash of a group of
  TypedValues.
- Added a check for whether the aggregation is partitioned or not.
- The conditions for whether the aggregation can be partitioned
  are as follows:
  1. The query has a GROUP BY clause.
  2. There are no aggregations with a DISTINCT clause.
  3. The estimated number of groups is greater than a pre-defined
  threshold.
  4. The query has at least one aggregation function.
- Method for partitioned aggregation with GROUP BY
- StorageBlock now provides a method for performing GROUP BY aggregation
  in a partitioned way.
- The Tuple class now supports a method to compute the hash of the entire
  tuple (i.e. hash key is the composite key made up of all the
  attributes in the tuple).
- AggregationOperationState calls appropriate method (i.e.
  aggregateGroupBy or aggregateGroupByPartitioned) based on the way in
  which aggregation is being performed.
- Set each hash table's size estimate to be overall estimate divided by
  the number of partitions.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/7f0269bf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/7f0269bf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/7f0269bf

Branch: refs/heads/partitioned-aggregate-new
Commit: 7f0269bfd5fb3a5056290d3c5ab1dbe203dadaec
Parents: d3a0920
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Wed Sep 21 11:43:39 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Wed Oct 19 14:31:14 2016 -0500

----------------------------------------------------------------------
 .../FinalizeAggregationOperator.cpp             |  35 ++-
 .../FinalizeAggregationOperator.hpp             |   9 +-
 storage/AggregationOperationState.cpp           | 135 ++++++++---
 storage/AggregationOperationState.hpp           |  52 +++++
 storage/CMakeLists.txt                          |  12 +
 storage/HashTablePool.hpp                       |  17 +-
 storage/PartitionedHashTablePool.hpp            | 224 +++++++++++++++++++
 storage/StorageBlock.cpp                        | 115 ++++++++++
 storage/StorageBlock.hpp                        |  43 ++++
 types/containers/CMakeLists.txt                 |   1 +
 types/containers/Tuple.hpp                      |   8 +
 utility/CMakeLists.txt                          |   6 +
 utility/CompositeHash.hpp                       |  52 +++++
 13 files changed, 661 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7f0269bf/relational_operators/FinalizeAggregationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/FinalizeAggregationOperator.cpp b/relational_operators/FinalizeAggregationOperator.cpp
index 7e337de..55d1357 100644
--- a/relational_operators/FinalizeAggregationOperator.cpp
+++ b/relational_operators/FinalizeAggregationOperator.cpp
@@ -41,12 +41,29 @@ bool FinalizeAggregationOperator::getAllWorkOrders(
 
   if (blocking_dependencies_met_ && !started_) {
     started_ = true;
-    container->addNormalWorkOrder(
-        new FinalizeAggregationWorkOrder(
-            query_id_,
-            query_context->getAggregationState(aggr_state_index_),
-            query_context->getInsertDestination(output_destination_index_)),
-        op_index_);
+    DCHECK(query_context->getAggregationState(aggr_state_index_) != nullptr);
+    if (query_context->getAggregationState(aggr_state_index_)->isAggregatePartitioned()) {
+      // The same AggregationState is shared across all the WorkOrders.
+      for (std::size_t part_id = 0;
+           part_id < query_context->getAggregationState(aggr_state_index_)
+                         ->getNumPartitions();
+           ++part_id) {
+        container->addNormalWorkOrder(
+            new FinalizeAggregationWorkOrder(
+                query_id_,
+                query_context->getAggregationState(aggr_state_index_),
+                query_context->getInsertDestination(output_destination_index_),
+                static_cast<int>(part_id)),
+            op_index_);
+      }
+    } else {
+      container->addNormalWorkOrder(
+          new FinalizeAggregationWorkOrder(
+              query_id_,
+              query_context->getAggregationState(aggr_state_index_),
+              query_context->getInsertDestination(output_destination_index_)),
+          op_index_);
+    }
   }
   return started_;
 }
@@ -70,7 +87,11 @@ bool FinalizeAggregationOperator::getAllWorkOrderProtos(WorkOrderProtosContainer
 
 
 void FinalizeAggregationWorkOrder::execute() {
-  state_->finalizeAggregate(output_destination_);
+  // Finalize either a single partition or the whole aggregation state,
+  // depending on how the aggregation was performed.
+  if (state_->isAggregatePartitioned()) {
+    // part_id_ is a signed int that defaults to -1 when no partition was
+    // supplied. Reject that here instead of letting it wrap around silently
+    // when converted to the unsigned partition index.
+    DCHECK_GE(part_id_, 0);
+    state_->finalizeAggregatePartitioned(static_cast<std::size_t>(part_id_),
+                                         output_destination_);
+  } else {
+    state_->finalizeAggregate(output_destination_);
+  }
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7f0269bf/relational_operators/FinalizeAggregationOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/FinalizeAggregationOperator.hpp b/relational_operators/FinalizeAggregationOperator.hpp
index 0aeac2a..ae7127a 100644
--- a/relational_operators/FinalizeAggregationOperator.hpp
+++ b/relational_operators/FinalizeAggregationOperator.hpp
@@ -119,13 +119,17 @@ class FinalizeAggregationWorkOrder : public WorkOrder {
    * @param state The AggregationState to use.
    * @param output_destination The InsertDestination to insert aggregation
    *        results.
+   * @param part_id The partition ID for which the Finalize aggregation work
+   *        order is issued. Ignored if aggregation is not partitioned.
    */
   FinalizeAggregationWorkOrder(const std::size_t query_id,
                                AggregationOperationState *state,
-                               InsertDestination *output_destination)
+                               InsertDestination *output_destination,
+                               const int part_id = -1)
       : WorkOrder(query_id),
         state_(DCHECK_NOTNULL(state)),
-        output_destination_(DCHECK_NOTNULL(output_destination)) {}
+        output_destination_(DCHECK_NOTNULL(output_destination)),
+        part_id_(part_id) {}
 
   ~FinalizeAggregationWorkOrder() override {}
 
@@ -134,6 +138,7 @@ class FinalizeAggregationWorkOrder : public WorkOrder {
  private:
   AggregationOperationState *state_;
   InsertDestination *output_destination_;
+  const int part_id_;
 
   DISALLOW_COPY_AND_ASSIGN(FinalizeAggregationWorkOrder);
 };

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7f0269bf/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index 7908db1..beb1899 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -69,6 +69,8 @@ AggregationOperationState::AggregationOperationState(
     const std::vector<HashTableImplType> &distinctify_hash_table_impl_types,
     StorageManager *storage_manager)
     : input_relation_(input_relation),
+      is_aggregate_partitioned_(checkAggregatePartitioned(
+          estimated_num_entries, is_distinct, group_by, aggregate_functions)),
       predicate_(predicate),
       group_by_list_(std::move(group_by)),
       arguments_(std::move(arguments)),
@@ -166,18 +168,16 @@ AggregationOperationState::AggregationOperationState(
       }
 
       // Initialize the corresponding distinctify hash table if this is a
-      // DISTINCT
-      // aggregation.
+      // DISTINCT aggregation.
       if (*is_distinct_it) {
         std::vector<const Type *> key_types(group_by_types);
         key_types.insert(
             key_types.end(), argument_types.begin(), argument_types.end());
         // TODO(jianqiao): estimated_num_entries is quite inaccurate for
-        // estimating
-        // the number of entries in the distinctify hash table. We may estimate
-        // for each distinct aggregation an estimated_num_distinct_keys value
-        // during
-        // query optimization, if it worths.
+        // estimating the number of entries in the distinctify hash table.
+        // We may estimate for each distinct aggregation an
+        // estimated_num_distinct_keys value during query optimization, if it's
+        // worth it.
         distinctify_hashtables_.emplace_back(
             AggregationStateFastHashTableFactory::CreateResizable(
                 *distinctify_hash_table_impl_types_it,
@@ -193,14 +193,24 @@ AggregationOperationState::AggregationOperationState(
     }
 
     if (!group_by_handles.empty()) {
-      // Aggregation with GROUP BY: create a HashTable pool for per-group
-      // states.
-      group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries,
-                                                       hash_table_impl_type,
-                                                       group_by_types,
-                                                       payload_sizes,
-                                                       group_by_handles,
-                                                       storage_manager));
+      // Aggregation with GROUP BY: create a HashTable pool.
+      if (!is_aggregate_partitioned_) {
+        group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries,
+                                                         hash_table_impl_type,
+                                                         group_by_types,
+                                                         payload_sizes,
+                                                         group_by_handles,
+                                                         storage_manager));
+      } else {
+        partitioned_group_by_hashtable_pool_.reset(
+            new PartitionedHashTablePool(estimated_num_entries,
+                                         kNumPartitionsForAggregate,
+                                         hash_table_impl_type,
+                                         group_by_types,
+                                         payload_sizes,
+                                         group_by_handles,
+                                         storage_manager));
+      }
     }
   }
 }
@@ -439,20 +449,30 @@ void AggregationOperationState::aggregateBlockHashTable(
     }
   }
 
-  // Call StorageBlock::aggregateGroupBy() to aggregate this block's values
-  // directly into the (threadsafe) shared global HashTable for this
-  // aggregate.
-  DCHECK(group_by_hashtable_pool_ != nullptr);
-  AggregationStateHashTableBase *agg_hash_table =
+  if (!is_aggregate_partitioned_) {
+    // Call StorageBlock::aggregateGroupBy() to aggregate this block's values
+    // directly into the (threadsafe) shared global HashTable for this
+    // aggregate.
+    DCHECK(group_by_hashtable_pool_ != nullptr);
+    AggregationStateHashTableBase *agg_hash_table =
       group_by_hashtable_pool_->getHashTableFast();
-  DCHECK(agg_hash_table != nullptr);
-  block->aggregateGroupBy(arguments_,
-                          group_by_list_,
-                          predicate_.get(),
-                          agg_hash_table,
-                          &reuse_matches,
-                          &reuse_group_by_vectors);
-  group_by_hashtable_pool_->returnHashTable(agg_hash_table);
+    DCHECK(agg_hash_table != nullptr);
+    block->aggregateGroupBy(arguments_,
+        group_by_list_,
+        predicate_.get(),
+        agg_hash_table,
+        &reuse_matches,
+        &reuse_group_by_vectors);
+    group_by_hashtable_pool_->returnHashTable(agg_hash_table);
+  } else {
+    block->aggregateGroupByPartitioned(
+        arguments_,
+        group_by_list_,
+        predicate_.get(),
+        &reuse_matches,
+        &reuse_group_by_vectors,
+        partitioned_group_by_hashtable_pool_.get());
+  }
 }
 
 void AggregationOperationState::finalizeSingleState(
@@ -595,4 +615,63 @@ void AggregationOperationState::finalizeHashTable(
   output_destination->bulkInsertTuples(&complete_result);
 }
 
+// Finalizes the aggregation results held in the hash table of partition
+// 'partition_id' and bulk-inserts them (GROUP BY key columns first, then one
+// column per finalized aggregate) into 'output_destination'.
+void AggregationOperationState::finalizeAggregatePartitioned(
+    const std::size_t partition_id, InsertDestination *output_destination) {
+  DCHECK(partitioned_group_by_hashtable_pool_ != nullptr);
+
+  // Every aggregate of this partition reads the same hash table, so look it
+  // up once instead of once per aggregate.
+  AggregationStateHashTableBase *hash_table =
+      partitioned_group_by_hashtable_pool_->getHashTable(partition_id);
+  DCHECK(hash_table != nullptr);
+
+  // Each element of 'group_by_keys' is a vector of values for a particular
+  // group (which is also the prefix of the finalized Tuple for that group).
+  std::vector<std::vector<TypedValue>> group_by_keys;
+
+  // Collect per-aggregate finalized values. A handle may return nullptr, in
+  // which case it contributes no output column.
+  std::vector<std::unique_ptr<ColumnVector>> final_values;
+  for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
+    ColumnVector *agg_result_col = handles_[agg_idx]->finalizeHashTable(
+        *hash_table, &group_by_keys, agg_idx);
+    if (agg_result_col != nullptr) {
+      final_values.emplace_back(agg_result_col);
+    }
+  }
+
+  // Reorganize 'group_by_keys' in column-major order so that we can make a
+  // ColumnVectorsValueAccessor to bulk-insert results.
+  //
+  // TODO(chasseur): Shuffling around the GROUP BY keys like this is suboptimal
+  // if there is only one aggregate. The need to do this should hopefully go
+  // away when we work out storing composite structures for multiple aggregates
+  // in a single HashTable.
+  std::vector<std::unique_ptr<ColumnVector>> group_by_cvs;
+  std::size_t group_by_element_idx = 0;
+  for (const std::unique_ptr<const Scalar> &group_by_element : group_by_list_) {
+    const Type &group_by_type = group_by_element->getType();
+    if (NativeColumnVector::UsableForType(group_by_type)) {
+      NativeColumnVector *element_cv =
+          new NativeColumnVector(group_by_type, group_by_keys.size());
+      // Ownership passes to 'group_by_cvs' right away; 'element_cv' is only
+      // kept to reach the concrete append method.
+      group_by_cvs.emplace_back(element_cv);
+      for (std::vector<TypedValue> &group_key : group_by_keys) {
+        element_cv->appendTypedValue(std::move(group_key[group_by_element_idx]));
+      }
+    } else {
+      IndirectColumnVector *element_cv =
+          new IndirectColumnVector(group_by_type, group_by_keys.size());
+      group_by_cvs.emplace_back(element_cv);
+      for (std::vector<TypedValue> &group_key : group_by_keys) {
+        element_cv->appendTypedValue(std::move(group_key[group_by_element_idx]));
+      }
+    }
+    ++group_by_element_idx;
+  }
+
+  // Stitch together a ColumnVectorsValueAccessor combining the GROUP BY keys
+  // and the finalized aggregates.
+  ColumnVectorsValueAccessor complete_result;
+  for (std::unique_ptr<ColumnVector> &group_by_cv : group_by_cvs) {
+    complete_result.addColumn(group_by_cv.release());
+  }
+  for (std::unique_ptr<ColumnVector> &final_value_cv : final_values) {
+    complete_result.addColumn(final_value_cv.release());
+  }
+
+  // Bulk-insert the complete result.
+  output_destination->bulkInsertTuples(&complete_result);
+}
+
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7f0269bf/storage/AggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index cbbfc22..4dae8c7 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -32,6 +32,7 @@
 #include "storage/AggregationOperationState.pb.h"
 #include "storage/HashTableBase.hpp"
 #include "storage/HashTablePool.hpp"
+#include "storage/PartitionedHashTablePool.hpp"
 #include "storage/StorageBlockInfo.hpp"
 #include "utility/Macros.hpp"
 
@@ -167,12 +168,31 @@ class AggregationOperationState {
    **/
   void finalizeAggregate(InsertDestination *output_destination);
 
+  void finalizeAggregatePartitioned(
+      const std::size_t partition_id, InsertDestination *output_destination);
+
   static void mergeGroupByHashTables(AggregationStateHashTableBase *src,
                                      AggregationStateHashTableBase *dst);
 
+  bool isAggregatePartitioned() const {
+    return is_aggregate_partitioned_;
+  }
+
+  /**
+   * @return The number of partitions if the aggregate is partitioned, else 1.
+   **/
+  std::size_t getNumPartitions() const {
+    return is_aggregate_partitioned_
+               ? partitioned_group_by_hashtable_pool_->getNumPartitions()
+               : 1;
+  }
+
   int dflag;
 
  private:
+  static constexpr std::size_t kPartitionedAggregateThreshold = 100;
+  static constexpr std::size_t kNumPartitionsForAggregate = 40;
+
   // Merge locally (per storage block) aggregated states with global aggregation
   // states.
   void mergeSingleState(
@@ -185,9 +205,39 @@ class AggregationOperationState {
   void finalizeSingleState(InsertDestination *output_destination);
   void finalizeHashTable(InsertDestination *output_destination);
 
+  /**
+   * @brief Decide whether this aggregation should be performed in a
+   *        partitioned way.
+   *
+   * @param estimated_num_groups The estimated number of groups.
+   * @param is_distinct One flag per aggregate, true if it has a DISTINCT
+   *        clause.
+   * @param group_by The GROUP BY expressions.
+   * @param aggregate_functions The aggregate functions of the query.
+   *
+   * @return True only if there is at least one aggregate function, at least
+   *         one GROUP BY expression, no DISTINCT aggregate, and the estimated
+   *         number of groups exceeds kPartitionedAggregateThreshold.
+   **/
+  bool checkAggregatePartitioned(
+      const std::size_t estimated_num_groups,
+      const std::vector<bool> &is_distinct,
+      const std::vector<std::unique_ptr<const Scalar>> &group_by,
+      const std::vector<const AggregateFunction *> &aggregate_functions) const {
+    // Partitioning needs something to aggregate and a key to partition on.
+    if (aggregate_functions.empty() || group_by.empty()) {
+      return false;
+    }
+    // A single DISTINCT aggregate is enough to rule partitioning out.
+    for (const bool has_distinct : is_distinct) {
+      if (has_distinct) {
+        return false;
+      }
+    }
+    // GROUP BY without DISTINCT: partition only when enough groups are
+    // expected to warrant it.
+    return kPartitionedAggregateThreshold < estimated_num_groups;
+  }
+
   // Common state for all aggregates in this operation: the input relation, the
   // filter predicate (if any), and the list of GROUP BY expressions (if any).
   const CatalogRelationSchema &input_relation_;
+
+  // Whether the aggregation is partitioned or not.
+  const bool is_aggregate_partitioned_;
+
   std::unique_ptr<const Predicate> predicate_;
   std::vector<std::unique_ptr<const Scalar>> group_by_list_;
 
@@ -224,6 +274,8 @@ class AggregationOperationState {
   // A vector of group by hash table pools.
   std::unique_ptr<HashTablePool> group_by_hashtable_pool_;
 
+  std::unique_ptr<PartitionedHashTablePool> partitioned_group_by_hashtable_pool_;
+
   StorageManager *storage_manager_;
 
   DISALLOW_COPY_AND_ASSIGN(AggregationOperationState);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7f0269bf/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index e85e005..acc1791 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -235,6 +235,7 @@ add_library(quickstep_storage_PackedRowStoreTupleStorageSubBlock
 add_library(quickstep_storage_PackedRowStoreValueAccessor
             ../empty_src.cpp
             PackedRowStoreValueAccessor.hpp)
+add_library(quickstep_storage_PartitionedHashTablePool ../empty_src.cpp PartitionedHashTablePool.hpp)
 add_library(quickstep_storage_PreloaderThread PreloaderThread.cpp PreloaderThread.hpp)
 add_library(quickstep_storage_SMAIndexSubBlock SMAIndexSubBlock.cpp SMAIndexSubBlock.hpp)
 add_library(quickstep_storage_SeparateChainingHashTable ../empty_src.cpp SeparateChainingHashTable.hpp)
@@ -289,6 +290,7 @@ target_link_libraries(quickstep_storage_AggregationOperationState
                       quickstep_storage_HashTableFactory
                       quickstep_storage_HashTablePool
                       quickstep_storage_InsertDestination
+                      quickstep_storage_PartitionedHashTablePool
                       quickstep_storage_StorageBlock
                       quickstep_storage_StorageBlockInfo
                       quickstep_storage_StorageManager
@@ -848,6 +850,14 @@ target_link_libraries(quickstep_storage_PackedRowStoreValueAccessor
                       quickstep_types_TypedValue
                       quickstep_utility_BitVector
                       quickstep_utility_Macros)
+target_link_libraries(quickstep_storage_PartitionedHashTablePool
+                      glog
+                      quickstep_expressions_aggregation_AggregationHandle
+                      quickstep_storage_FastHashTable
+                      quickstep_storage_FastHashTableFactory
+                      quickstep_storage_HashTableBase
+                      quickstep_utility_Macros
+                      quickstep_utility_StringUtil)                    
 target_link_libraries(quickstep_storage_PreloaderThread
                       glog
                       quickstep_catalog_CatalogDatabase
@@ -969,6 +979,7 @@ target_link_libraries(quickstep_storage_StorageBlock
                       quickstep_storage_IndexSubBlock
                       quickstep_storage_InsertDestinationInterface
                       quickstep_storage_PackedRowStoreTupleStorageSubBlock
+                      quickstep_storage_PartitionedHashTablePool
                       quickstep_storage_SMAIndexSubBlock
                       quickstep_storage_SplitRowStoreTupleStorageSubBlock
                       quickstep_storage_StorageBlockBase
@@ -1165,6 +1176,7 @@ target_link_libraries(quickstep_storage
                       quickstep_storage_LinearOpenAddressingHashTable
                       quickstep_storage_PackedRowStoreTupleStorageSubBlock
                       quickstep_storage_PackedRowStoreValueAccessor
+                      quickstep_storage_PartitionedHashTablePool
                       quickstep_storage_PreloaderThread
                       quickstep_storage_SMAIndexSubBlock
                       quickstep_storage_SeparateChainingHashTable

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7f0269bf/storage/HashTablePool.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTablePool.hpp b/storage/HashTablePool.hpp
index 3cdfcb3..c34a435 100644
--- a/storage/HashTablePool.hpp
+++ b/storage/HashTablePool.hpp
@@ -77,7 +77,7 @@ class HashTablePool {
                 const std::vector<const Type *> &group_by_types,
                 AggregationHandle *agg_handle,
                 StorageManager *storage_manager)
-      : estimated_num_entries_(reduceEstimatedCardinality(estimated_num_entries)),
+      : estimated_num_entries_(setHashTableSize()),
         hash_table_impl_type_(hash_table_impl_type),
         group_by_types_(group_by_types),
         agg_handle_(DCHECK_NOTNULL(agg_handle)),
@@ -104,7 +104,7 @@ class HashTablePool {
                 const std::vector<std::size_t> &payload_sizes,
                 const std::vector<AggregationHandle *> &handles,
                 StorageManager *storage_manager)
-      : estimated_num_entries_(reduceEstimatedCardinality(estimated_num_entries)),
+      : estimated_num_entries_(setHashTableSize()),
         hash_table_impl_type_(hash_table_impl_type),
         group_by_types_(group_by_types),
         payload_sizes_(payload_sizes),
@@ -196,17 +196,12 @@ class HashTablePool {
                 storage_manager_);
   }
 
-  inline std::size_t reduceEstimatedCardinality(
-      const std::size_t original_estimate) const {
-    if (original_estimate < kEstimateReductionFactor) {
-      return original_estimate;
-    } else {
-      DCHECK_GT(kEstimateReductionFactor, 0u);
-      return original_estimate / kEstimateReductionFactor;
-    }
+  inline std::size_t setHashTableSize() const {
+    return kHashTableSize;
   }
 
-  static constexpr std::size_t kEstimateReductionFactor = 100;
+  // L2 cache size.
+  static constexpr std::size_t kHashTableSize = 262144;
 
   std::vector<std::unique_ptr<AggregationStateHashTableBase>> hash_tables_;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7f0269bf/storage/PartitionedHashTablePool.hpp
----------------------------------------------------------------------
diff --git a/storage/PartitionedHashTablePool.hpp b/storage/PartitionedHashTablePool.hpp
new file mode 100644
index 0000000..be37f78
--- /dev/null
+++ b/storage/PartitionedHashTablePool.hpp
@@ -0,0 +1,224 @@
+/**
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_STORAGE_PARTITIONED_HASH_TABLE_POOL_HPP_
+#define QUICKSTEP_STORAGE_PARTITIONED_HASH_TABLE_POOL_HPP_
+
+#include <algorithm>
+#include <chrono>
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include "expressions/aggregation/AggregationHandle.hpp"
+#include "storage/HashTableBase.hpp"
+#include "storage/FastHashTable.hpp"
+#include "storage/FastHashTableFactory.hpp"
+#include "utility/Macros.hpp"
+#include "utility/StringUtil.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+class StorageManager;
+class Type;
+
+/** \addtogroup Storage
+ *  @{
+ */
+
+/**
+ * @brief A pool of HashTables used for a single aggregation handle. Each
+ *        HashTable represents values from a given partition, which is
+ *        determined by the keys in the group by clause.
+ **/
+class PartitionedHashTablePool {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param estimated_num_entries The estimated number of entries over all
+   *        partitions. Each HashTable is sized for an equal share of it.
+   * @param num_partitions The number of partitions (i.e. number of
+   *        HashTables). Must not be zero.
+   * @param hash_table_impl_type The type of hash table implementation.
+   * @param group_by_types A vector of pointer of types which form the group by
+   *        key.
+   * @param agg_handle The aggregation handle.
+   * @param storage_manager A pointer to the storage manager.
+   *
+   * @note The estimate of number of entries is quite inaccurate at this time.
+   *       Each hash table is created with the overall estimate divided by the
+   *       number of partitions (but at least one entry). This does not affect
+   *       correctness, however it may allocate some hash tables smaller space
+   *       than their requirement, causing them to be resized during build
+   *       phase, which has a performance penalty.
+   *
+   * @note NOTE(review): the tables are built by createNewHashTableFast(),
+   *       which passes payload_sizes_ and handles_ (both left empty by this
+   *       constructor) rather than agg_handle. Confirm that this constructor
+   *       is meant to be usable as is.
+   **/
+  PartitionedHashTablePool(const std::size_t estimated_num_entries,
+                           const std::size_t num_partitions,
+                           const HashTableImplType hash_table_impl_type,
+                           const std::vector<const Type *> &group_by_types,
+                           AggregationHandle *agg_handle,
+                           StorageManager *storage_manager)
+      : estimated_num_entries_(
+            setHashTableSize(estimated_num_entries, num_partitions)),
+        num_partitions_(num_partitions),
+        hash_table_impl_type_(hash_table_impl_type),
+        group_by_types_(group_by_types),
+        agg_handle_(DCHECK_NOTNULL(agg_handle)),
+        storage_manager_(DCHECK_NOTNULL(storage_manager)) {
+    initializeAllHashTables();
+  }
+
+  /**
+   * @brief Constructor.
+   *
+   * @note This constructor is relevant for the HashTable specialized for
+   *       aggregation.
+   *
+   * @param estimated_num_entries The estimated number of entries over all
+   *        partitions. Each HashTable is sized for an equal share of it.
+   * @param num_partitions The number of partitions (i.e. number of
+   *        HashTables). Must not be zero.
+   * @param hash_table_impl_type The type of hash table implementation.
+   * @param group_by_types A vector of pointer of types which form the group by
+   *        key.
+   * @param payload_sizes The sizes of the payload elements (i.e.
+   *        AggregationStates).
+   * @param handles The aggregation handles.
+   * @param storage_manager A pointer to the storage manager.
+   **/
+  PartitionedHashTablePool(const std::size_t estimated_num_entries,
+                           const std::size_t num_partitions,
+                           const HashTableImplType hash_table_impl_type,
+                           const std::vector<const Type *> &group_by_types,
+                           const std::vector<std::size_t> &payload_sizes,
+                           const std::vector<AggregationHandle *> &handles,
+                           StorageManager *storage_manager)
+      : estimated_num_entries_(
+            setHashTableSize(estimated_num_entries, num_partitions)),
+        num_partitions_(num_partitions),
+        hash_table_impl_type_(hash_table_impl_type),
+        group_by_types_(group_by_types),
+        payload_sizes_(payload_sizes),
+        // No single handle in this mode; never leave the pointer
+        // indeterminate.
+        agg_handle_(nullptr),
+        handles_(handles),
+        storage_manager_(DCHECK_NOTNULL(storage_manager)) {
+    initializeAllHashTables();
+  }
+
+  /**
+   * @brief Get the hash table of a given partition.
+   *
+   * @param partition_id The ID of the partitioned HashTable. Must be less than
+   *        the number of partitions.
+   *
+   * @return A pointer to the hash table of the given partition. The pool
+   *         retains ownership.
+   **/
+  AggregationStateHashTableBase* getHashTable(const std::size_t partition_id) {
+    DCHECK_LT(partition_id, num_partitions_);
+    DCHECK_LT(partition_id, hash_tables_.size());
+    return hash_tables_[partition_id].get();
+  }
+
+  /**
+   * @brief Get the hash table of a given partition.
+   *
+   * @note Equivalent to getHashTable(): all tables are created up front by
+   *       the same factory, whichever accessor is used.
+   *
+   * @param partition_id The ID of the partitioned HashTable. Must be less than
+   *        the number of partitions.
+   *
+   * @return A pointer to the hash table of the given partition. The pool
+   *         retains ownership.
+   **/
+  AggregationStateHashTableBase* getHashTableFast(const std::size_t partition_id) {
+    return getHashTable(partition_id);
+  }
+
+  /**
+   * @brief Get all the hash tables from the pool.
+   *
+   * @warning The caller should ensure that this call is made when no hash table
+   *          is being checked in or checked out from the pool. In other words
+   *          the hash table pool is in read-only state.
+   *
+   * @return A pointer to the vector of all the hash tables in the pool.
+   **/
+  std::vector<std::unique_ptr<AggregationStateHashTableBase>>*
+      getAllHashTables() {
+    return &hash_tables_;
+  }
+
+  /**
+   * @brief Get the number of partitions used for the aggregation.
+   **/
+  inline std::size_t getNumPartitions() const {
+    return num_partitions_;
+  }
+
+ private:
+  // Eagerly creates one hash table per partition, so that hash_tables_ can be
+  // indexed directly by partition ID.
+  void initializeAllHashTables() {
+    hash_tables_.reserve(num_partitions_);
+    for (std::size_t part_num = 0; part_num < num_partitions_; ++part_num) {
+      hash_tables_.emplace_back(createNewHashTableFast());
+    }
+  }
+
+  // Creates a hash table through the single aggregation handle.
+  AggregationStateHashTableBase* createNewHashTable() {
+    return agg_handle_->createGroupByHashTable(hash_table_impl_type_,
+                                               group_by_types_,
+                                               estimated_num_entries_,
+                                               storage_manager_);
+  }
+
+  // Creates a resizable hash table specialized for aggregation from the
+  // payload sizes and handles.
+  AggregationStateHashTableBase* createNewHashTableFast() {
+    return AggregationStateFastHashTableFactory::CreateResizable(
+                hash_table_impl_type_,
+                group_by_types_,
+                estimated_num_entries_,
+                payload_sizes_,
+                handles_,
+                storage_manager_);
+  }
+
+  // Computes the size estimate for one partition's hash table: an equal share
+  // of the overall estimate, but never less than one entry. (Taking the
+  // minimum with 1 here would cap every table's estimate at 0 or 1.)
+  inline std::size_t setHashTableSize(const std::size_t overall_estimate,
+                                      const std::size_t num_partitions) const {
+    CHECK_NE(num_partitions, 0u);
+    return std::max<std::size_t>(overall_estimate / num_partitions, 1u);
+  }
+
+  // One hash table per partition, indexed by partition ID.
+  std::vector<std::unique_ptr<AggregationStateHashTableBase>> hash_tables_;
+
+  // Size estimate for each individual hash table (not the overall estimate).
+  const std::size_t estimated_num_entries_;
+  const std::size_t num_partitions_;
+
+  const HashTableImplType hash_table_impl_type_;
+
+  const std::vector<const Type *> group_by_types_;
+
+  std::vector<std::size_t> payload_sizes_;
+
+  // Set only by the single-handle constructor; nullptr otherwise.
+  AggregationHandle *agg_handle_;
+  const std::vector<AggregationHandle *> handles_;
+  StorageManager *storage_manager_;
+
+  DISALLOW_COPY_AND_ASSIGN(PartitionedHashTablePool);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_STORAGE_PARTITIONED_HASH_TABLE_POOL_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7f0269bf/storage/StorageBlock.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageBlock.cpp b/storage/StorageBlock.cpp
index ec5990f..94c46a8 100644
--- a/storage/StorageBlock.cpp
+++ b/storage/StorageBlock.cpp
@@ -41,6 +41,7 @@
 #include "storage/IndexSubBlock.hpp"
 #include "storage/InsertDestinationInterface.hpp"
 #include "storage/PackedRowStoreTupleStorageSubBlock.hpp"
+#include "storage/PartitionedHashTablePool.hpp"
 #include "storage/SMAIndexSubBlock.hpp"
 #include "storage/SplitRowStoreTupleStorageSubBlock.hpp"
 #include "storage/StorageBlockBase.hpp"
@@ -1369,4 +1370,118 @@ const std::size_t StorageBlock::getNumTuples() const {
   return tuple_store_->numTuples();
 }
 
+void StorageBlock::aggregateGroupByPartitioned(
+    const std::vector<std::vector<std::unique_ptr<const Scalar>>> &arguments,
+    const std::vector<std::unique_ptr<const Scalar>> &group_by,
+    const Predicate *predicate,
+    std::unique_ptr<TupleIdSequence> *reuse_matches,
+    std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors,
+    PartitionedHashTablePool *hashtable_pool) const {
+  DCHECK(!group_by.empty())
+      << "Called aggregateGroupByPartitioned() with zero GROUP BY expressions";
+
+  SubBlocksReference sub_blocks_ref(*tuple_store_,
+                                    indices_,
+                                    indices_consistent_);
+
+  // Query the pool for the partition count once; every loop below reuses it.
+  const std::size_t num_partitions = hashtable_pool->getNumPartitions();
+
+  // IDs of 'arguments' as attributes in the ValueAccessor we create below.
+  std::vector<attribute_id> argument_ids;
+
+  // IDs of GROUP BY key element(s) in the ValueAccessor we create below.
+  std::vector<attribute_id> key_ids;
+
+  // An intermediate ValueAccessor that stores the materialized 'arguments' for
+  // this aggregate, as well as the GROUP BY expression values.
+  ColumnVectorsValueAccessor temp_result;
+  std::unique_ptr<ValueAccessor> accessor;
+  if (predicate) {
+    if (!*reuse_matches) {
+      // If there is a filter predicate that hasn't already been evaluated,
+      // evaluate it now and save the results for other aggregates on this
+      // same block.
+      reuse_matches->reset(getMatchesForPredicate(predicate));
+    }
+
+    // Create a filtered ValueAccessor that only iterates over predicate
+    // matches.
+    accessor.reset(tuple_store_->createValueAccessor(reuse_matches->get()));
+  } else {
+    // Create a ValueAccessor that iterates over all tuples in this block
+    accessor.reset(tuple_store_->createValueAccessor());
+  }
+
+  attribute_id attr_id = 0;
+
+  // First, put GROUP BY keys into 'temp_result'.
+  if (reuse_group_by_vectors->empty()) {
+    // Compute GROUP BY values from group_by Scalars, and store them in
+    // reuse_group_by_vectors for reuse by other aggregates on this same
+    // block.
+    reuse_group_by_vectors->reserve(group_by.size());
+    for (const std::unique_ptr<const Scalar> &group_by_element : group_by) {
+      reuse_group_by_vectors->emplace_back(
+          group_by_element->getAllValues(accessor.get(), &sub_blocks_ref));
+      temp_result.addColumn(reuse_group_by_vectors->back().get(), false);
+      key_ids.push_back(attr_id++);
+    }
+  } else {
+    // Reuse precomputed GROUP BY values from reuse_group_by_vectors.
+    DCHECK_EQ(group_by.size(), reuse_group_by_vectors->size())
+        << "Wrong number of reuse_group_by_vectors";
+    for (const std::unique_ptr<ColumnVector> &reuse_cv : *reuse_group_by_vectors) {
+      temp_result.addColumn(reuse_cv.get(), false);
+      key_ids.push_back(attr_id++);
+    }
+  }
+
+  // Compute argument vectors and add them to 'temp_result'.
+  for (const std::vector<std::unique_ptr<const Scalar>> &argument : arguments) {
+    for (const std::unique_ptr<const Scalar> &args : argument) {
+      temp_result.addColumn(args->getAllValues(accessor.get(), &sub_blocks_ref));
+      argument_ids.push_back(attr_id++);
+    }
+    if (argument.empty()) {
+      argument_ids.push_back(kInvalidAttributeID);
+    }
+  }
+
+  // Compute the partitions for the tuple formed by group by values.
+  std::vector<std::unique_ptr<TupleIdSequence>> partition_membership;
+  partition_membership.resize(num_partitions);
+
+  // Create a tuple-id sequence for each partition.
+  for (std::size_t partition = 0; partition < num_partitions; ++partition) {
+    partition_membership[partition].reset(new TupleIdSequence(temp_result.getEndPosition()));
+  }
+
+  // Iterate over ValueAccessor for each tuple,
+  // set a bit in the appropriate TupleIdSequence.
+  temp_result.beginIteration();
+  while (temp_result.next()) {
+    // getTupleWithAttributes() hands back a heap-allocated Tuple; own it here
+    // so it is released after every row instead of leaking one Tuple per row.
+    std::unique_ptr<Tuple> curr_tuple(temp_result.getTupleWithAttributes(key_ids));
+    const std::size_t curr_tuple_partition_id =
+        curr_tuple->getTupleHash() % num_partitions;
+    partition_membership[curr_tuple_partition_id]->set(
+        temp_result.getCurrentPosition(), true);
+  }
+
+  // For each partition, create an adapter around Value Accessor and
+  // TupleIdSequence.
+  std::vector<std::unique_ptr<
+      TupleIdSequenceAdapterValueAccessor<ColumnVectorsValueAccessor>>> adapter;
+  adapter.resize(num_partitions);
+  for (std::size_t partition = 0; partition < num_partitions; ++partition) {
+    adapter[partition].reset(temp_result.createSharedTupleIdSequenceAdapter(
+        *partition_membership[partition]));
+    hashtable_pool->getHashTable(partition)
+        ->upsertValueAccessorCompositeKeyFast(
+            argument_ids, adapter[partition].get(), key_ids, true);
+  }
+}
+
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7f0269bf/storage/StorageBlock.hpp
----------------------------------------------------------------------
diff --git a/storage/StorageBlock.hpp b/storage/StorageBlock.hpp
index bab5bab..3fcaddf 100644
--- a/storage/StorageBlock.hpp
+++ b/storage/StorageBlock.hpp
@@ -44,6 +44,7 @@ class AggregationState;
 class CatalogRelationSchema;
 class ColumnVector;
 class InsertDestinationInterface;
+class PartitionedHashTablePool;
 class Predicate;
 class Scalar;
 class StorageBlockLayout;
@@ -466,6 +467,48 @@ class StorageBlock : public StorageBlockBase {
       std::unique_ptr<TupleIdSequence> *reuse_matches,
       std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const;
 
+
+  /**
+   * @brief Perform the GROUP BY aggregation for the case when aggregation is
+   *        partitioned.
+   *
+   * @note The difference between this method and the aggregateGroupBy method
+   *       is that in this method, the tuples are routed to different HashTables
+   *       based on the partition to which they belong. The partition is
+   *       determined by the GROUP BY attributes. Right now hash-based
+   *       partitioning is performed.
+   *
+   * @param arguments The arguments to the aggregation function as Scalars.
+   * @param group_by The list of GROUP BY attributes/expressions. The tuples in
+   *        this storage block are grouped by these attributes before
+   *        aggregation.
+   * @param predicate A predicate for selection. nullptr indicates that all
+   *        tuples should be aggregated on.
+   * @param reuse_matches This parameter is used to store and reuse the tuple-id
+   *        sequence of matches pre-computed in an earlier invocation on this
+   *        block. \c reuse_matches is never \c nullptr for ease of
+   *        use.  The current invocation will reuse the
+   *        TupleIdSequence if passed, otherwise it computes a TupleIdSequence
+   *        based on \c predicate and stores it in \c reuse_matches. We use
+   *        std::unique_ptr for ease of use, since the caller will not have to
+   *        selectively free it.
+   * @param reuse_group_by_vectors This parameter is used to store and reuse
+   *        GROUP BY attribute vectors pre-computed in an earlier invocation on
+   *        this block. \c reuse_group_by_vectors is never \c nullptr
+   *        for ease of use. The current invocation will reuse the
+   *        ColumnVectors if non-empty, otherwise it computes ColumnVectors
+   *        based on \c group_by and stores them in \c reuse_group_by_vectors.
+   * @param hashtable_pool The pool of aggregation HashTables. Each hash table
+   *        in this pool belongs to a unique partition.
+   **/
+  void aggregateGroupByPartitioned(
+      const std::vector<std::vector<std::unique_ptr<const Scalar>>> &arguments,
+      const std::vector<std::unique_ptr<const Scalar>> &group_by,
+      const Predicate *predicate,
+      std::unique_ptr<TupleIdSequence> *reuse_matches,
+      std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors,
+      PartitionedHashTablePool *hashtable_pool) const;
+
   /**
    * @brief Inserts the GROUP BY expressions and aggregation arguments together
    *        as keys into the distinctify hash table.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7f0269bf/types/containers/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/types/containers/CMakeLists.txt b/types/containers/CMakeLists.txt
index aacb63a..c2a6623 100644
--- a/types/containers/CMakeLists.txt
+++ b/types/containers/CMakeLists.txt
@@ -49,6 +49,7 @@ target_link_libraries(quickstep_types_containers_Tuple
                       quickstep_catalog_CatalogTypedefs
                       quickstep_types_TypedValue
                       quickstep_types_containers_Tuple_proto
+                      quickstep_utility_CompositeHash
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_types_containers_Tuple_proto
                       quickstep_types_TypedValue_proto

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7f0269bf/types/containers/Tuple.hpp
----------------------------------------------------------------------
diff --git a/types/containers/Tuple.hpp b/types/containers/Tuple.hpp
index 60f832c..6237d54 100644
--- a/types/containers/Tuple.hpp
+++ b/types/containers/Tuple.hpp
@@ -28,6 +28,7 @@
 #include "catalog/CatalogTypedefs.hpp"
 #include "types/TypedValue.hpp"
 #include "types/containers/Tuple.pb.h"
+#include "utility/CompositeHash.hpp"
 #include "utility/Macros.hpp"
 
 #include "glog/logging.h"
@@ -218,6 +219,13 @@ class Tuple {
     return attribute_values_.size();
   }
 
+  /**
+   * @brief Hash this tuple's attribute values together as one composite key.
+   *
+   * @return The value HashCompositeKey() computes for attribute_values_.
+   **/
+  std::size_t getTupleHash() const {
+    const std::size_t composite_hash = HashCompositeKey(attribute_values_);
+    return composite_hash;
+  }
+
  private:
   /**
    * @brief Constructor which does not create any attributes, nor pre-reserve

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7f0269bf/utility/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/utility/CMakeLists.txt b/utility/CMakeLists.txt
index 395e264..e9be2ec 100644
--- a/utility/CMakeLists.txt
+++ b/utility/CMakeLists.txt
@@ -169,6 +169,7 @@ add_library(quickstep_utility_BloomFilter_proto
 add_library(quickstep_utility_CalculateInstalledMemory CalculateInstalledMemory.cpp CalculateInstalledMemory.hpp)
 add_library(quickstep_utility_Cast ../empty_src.cpp Cast.hpp)
 add_library(quickstep_utility_CheckSnprintf ../empty_src.cpp CheckSnprintf.hpp)
+add_library(quickstep_utility_CompositeHash ../empty_src.cpp CompositeHash.hpp)
 add_library(quickstep_utility_DAG ../empty_src.cpp DAG.hpp)
 add_library(quickstep_utility_DisjointTreeForest ../empty_src.cpp DisjointTreeForest.hpp)
 add_library(quickstep_utility_EqualsAnyConstant ../empty_src.cpp EqualsAnyConstant.hpp)
@@ -230,6 +231,10 @@ target_link_libraries(quickstep_utility_CalculateInstalledMemory
                       glog)
 target_link_libraries(quickstep_utility_CheckSnprintf
                       glog)
+target_link_libraries(quickstep_utility_CompositeHash
+                      quickstep_types_TypedValue
+                      quickstep_utility_HashPair
+                      glog)
 target_link_libraries(quickstep_utility_DAG
                       glog
                       quickstep_utility_Macros)
@@ -325,6 +330,7 @@ target_link_libraries(quickstep_utility
                       quickstep_utility_CalculateInstalledMemory
                       quickstep_utility_Cast
                       quickstep_utility_CheckSnprintf
+                      quickstep_utility_CompositeHash
                       quickstep_utility_DAG
                       quickstep_utility_DisjointTreeForest
                       quickstep_utility_EqualsAnyConstant

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7f0269bf/utility/CompositeHash.hpp
----------------------------------------------------------------------
diff --git a/utility/CompositeHash.hpp b/utility/CompositeHash.hpp
new file mode 100644
index 0000000..517bc96
--- /dev/null
+++ b/utility/CompositeHash.hpp
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_UTILITY_COMPOSITE_HASH_HPP_
+#define QUICKSTEP_UTILITY_COMPOSITE_HASH_HPP_
+
+#include <cstddef>
+#include <vector>
+
+#include "types/TypedValue.hpp"
+#include "utility/HashPair.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+/**
+ * @brief Compute the hash value of a composite key.
+ *
+ * @note Declared inline rather than static: this function is defined in a
+ *       header, and a non-inline static definition would give every
+ *       including translation unit its own private copy.
+ *
+ * @param key A vector of TypedValues which together form the composite key.
+ *        Must not be empty (checked with DCHECK).
+ * @return The hash of the first component, folded in order with the hash of
+ *         each subsequent component via CombineHashes().
+ **/
+inline std::size_t HashCompositeKey(const std::vector<TypedValue> &key) {
+  DCHECK(!key.empty());
+  // Seed with the first component, then fold in the rest in order.
+  std::size_t hash = key.front().getHash();
+  for (auto key_it = key.begin() + 1; key_it != key.end(); ++key_it) {
+    hash = CombineHashes(hash, key_it->getHash());
+  }
+  return hash;
+}
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_UTILITY_COMPOSITE_HASH_HPP_



[2/7] incubator-quickstep git commit: Optimizer changes for the LIPFilter feature.

Posted by hb...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7a464434/query_optimizer/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/CMakeLists.txt b/query_optimizer/tests/CMakeLists.txt
index 597dbe0..ac4548a 100644
--- a/query_optimizer/tests/CMakeLists.txt
+++ b/query_optimizer/tests/CMakeLists.txt
@@ -94,22 +94,6 @@ add_executable(quickstep_queryoptimizer_tests_ExecutionGeneratorTest
                ExecutionGeneratorTestRunner.hpp
                "${PROJECT_SOURCE_DIR}/utility/textbased_test/TextBasedTest.cpp"
                "${PROJECT_SOURCE_DIR}/utility/textbased_test/TextBasedTest.hpp")
-add_executable(ExecutionHeuristics_unittest ExecutionHeuristics_unittest.cpp)
-target_link_libraries(ExecutionHeuristics_unittest
-                      gtest
-                      gtest_main
-                      quickstep_catalog_Catalog
-                      quickstep_catalog_CatalogDatabase
-                      quickstep_catalog_CatalogTypedefs
-                      quickstep_queryexecution_QueryContext
-                      quickstep_queryexecution_QueryContext_proto
-                      quickstep_queryoptimizer_ExecutionHeuristics
-                      quickstep_queryoptimizer_QueryPlan
-                      quickstep_relationaloperators_BuildHashOperator
-                      quickstep_relationaloperators_HashJoinOperator
-                      quickstep_utility_Macros)
-add_test(ExecutionHeuristics_unittest ExecutionHeuristics_unittest)
-
 add_executable(quickstep_queryoptimizer_tests_OptimizerTextTest
                OptimizerTextTest.cpp
                OptimizerTextTestRunner.cpp

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7a464434/query_optimizer/tests/ExecutionHeuristics_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/ExecutionHeuristics_unittest.cpp b/query_optimizer/tests/ExecutionHeuristics_unittest.cpp
deleted file mode 100644
index 73b3e84..0000000
--- a/query_optimizer/tests/ExecutionHeuristics_unittest.cpp
+++ /dev/null
@@ -1,311 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#include <cstddef>
-#include <memory>
-#include <string>
-#include <vector>
-
-#include "catalog/Catalog.hpp"
-#include "catalog/CatalogDatabase.hpp"
-#include "catalog/CatalogTypedefs.hpp"
-#include "query_execution/QueryContext.hpp"
-#include "query_execution/QueryContext.pb.h"
-#include "query_optimizer/ExecutionHeuristics.hpp"
-#include "query_optimizer/QueryPlan.hpp"
-#include "relational_operators/BuildHashOperator.hpp"
-#include "relational_operators/HashJoinOperator.hpp"
-#include "utility/Macros.hpp"
-
-#include "glog/logging.h"
-#include "gtest/gtest.h"
-
-namespace quickstep {
-namespace optimizer {
-
-namespace {
-constexpr std::size_t kQueryId = 0;
-}
-
-class ExecutionHeuristicsTest : public ::testing::Test {
- protected:
-  virtual void SetUp() {
-    db_ = cat_.getDatabaseByIdMutable(cat_.addDatabase(new CatalogDatabase(nullptr, "db")));
-    execution_heuristics_.reset(new ExecutionHeuristics());
-    query_plan_.reset(new QueryPlan());
-    query_context_proto_.reset(new serialization::QueryContext());
-  }
-
-  CatalogRelation* createCatalogRelation(const std::string &name, bool temporary = false) {
-    return db_->getRelationByIdMutable(db_->addRelation(new CatalogRelation(nullptr, name, -1, temporary)));
-  }
-
-  void addDummyHashJoinInfo(ExecutionHeuristics *execution_heuristics,
-                            const QueryPlan::DAGNodeIndex build_operator_index,
-                            const QueryPlan::DAGNodeIndex join_operator_index,
-                            const CatalogRelation *build_relation,
-                            const CatalogRelation *probe_relation,
-                            const attribute_id build_attribute_id,
-                            const attribute_id probe_attribute_id,
-                            const QueryContext::join_hash_table_id join_hash_table_id) {
-    std::vector<attribute_id> build_attribute_ids(1, build_attribute_id);
-    std::vector<attribute_id> probe_attribute_ids(1, probe_attribute_id);
-    execution_heuristics->addHashJoinInfo(build_operator_index,
-                                          join_operator_index,
-                                          build_relation,
-                                          probe_relation,
-                                          std::move(build_attribute_ids),
-                                          std::move(probe_attribute_ids),
-                                          join_hash_table_id);
-  }
-
-  QueryPlan::DAGNodeIndex createDummyBuildHashOperator(QueryPlan *query_plan,
-                                                       const CatalogRelation *build_relation,
-                                                       const attribute_id build_attribute_id,
-                                                       const QueryContext::join_hash_table_id join_hash_table_index) {
-    std::vector<attribute_id> build_attribute_ids;
-    build_attribute_ids.push_back(build_attribute_id);
-    QueryPlan::DAGNodeIndex build_operator_index =
-        query_plan->addRelationalOperator(new BuildHashOperator(kQueryId,
-                                                                *build_relation,
-                                                                true,
-                                                                build_attribute_ids,
-                                                                false,
-                                                                join_hash_table_index));
-    return build_operator_index;
-  }
-
-  QueryPlan::DAGNodeIndex createDummyHashJoinOperator(QueryPlan *query_plan,
-                                                      const CatalogRelation *build_relation,
-                                                      const CatalogRelation *probe_relation,
-                                                      const attribute_id probe_attribute_id,
-                                                      const QueryContext::join_hash_table_id join_hash_table_index) {
-    std::vector<attribute_id> probe_attribute_ids;
-    probe_attribute_ids.push_back(probe_attribute_id);
-    QueryPlan::DAGNodeIndex join_operator_index =
-        query_plan->addRelationalOperator(
-            new HashJoinOperator(kQueryId,
-                                 *build_relation,
-                                 *probe_relation,
-                                 true,
-                                 probe_attribute_ids,
-                                 false,
-                                 *probe_relation,
-                                 0,
-                                 join_hash_table_index,
-                                 0,
-                                 0));
-    return join_operator_index;
-  }
-
-  Catalog cat_;
-  CatalogDatabase *db_;  // db_ is owned by cat_.
-  std::unique_ptr<QueryPlan> query_plan_;
-  std::unique_ptr<serialization::QueryContext> query_context_proto_;
-  std::unique_ptr<ExecutionHeuristics> execution_heuristics_;
-};
-
-TEST_F(ExecutionHeuristicsTest, HashJoinOptimizedTest) {
-  // This test case creates three hash joins, all of which are being probed on the same relation.
-  // Since the probe are being made on the same relation, ExecutionHeuristics should optimize
-  // these hash joins using bloom filters.
-
-  const CatalogRelation *build_relation_1 = createCatalogRelation("build_relation_1");
-  const CatalogRelation *build_relation_2 = createCatalogRelation("build_relation_2");
-  const CatalogRelation *build_relation_3 = createCatalogRelation("build_relation_3");
-  const CatalogRelation *probe_relation_1 = createCatalogRelation("probe_relation_1");
-
-  const attribute_id build_attribute_id_1 = 0;
-  const attribute_id build_attribute_id_2 = 0;
-  const attribute_id build_attribute_id_3 = 0;
-  const attribute_id probe_attribute_id_1 = 1;
-  const attribute_id probe_attribute_id_2 = 2;
-  const attribute_id probe_attribute_id_3 = 3;
-
-  const QueryContext::join_hash_table_id join_hash_table_index_1 = 0;
-  const QueryContext::join_hash_table_id join_hash_table_index_2 = 1;
-  const QueryContext::join_hash_table_id join_hash_table_index_3 = 2;
-  query_context_proto_->add_join_hash_tables();
-  query_context_proto_->add_join_hash_tables();
-  query_context_proto_->add_join_hash_tables();
-
-  const QueryPlan::DAGNodeIndex build_operator_index_1 = createDummyBuildHashOperator(query_plan_.get(),
-                                                                                      build_relation_1,
-                                                                                      build_attribute_id_1,
-                                                                                      join_hash_table_index_1);
-  const QueryPlan::DAGNodeIndex probe_operator_index_1 = createDummyHashJoinOperator(query_plan_.get(),
-                                                                                     build_relation_1,
-                                                                                     probe_relation_1,
-                                                                                     probe_attribute_id_1,
-                                                                                     join_hash_table_index_1);
-  const QueryPlan::DAGNodeIndex build_operator_index_2 = createDummyBuildHashOperator(query_plan_.get(),
-                                                                                      build_relation_2,
-                                                                                      build_attribute_id_2,
-                                                                                      join_hash_table_index_2);
-  const QueryPlan::DAGNodeIndex probe_operator_index_2 = createDummyHashJoinOperator(query_plan_.get(),
-                                                                                     build_relation_2,
-                                                                                     probe_relation_1,
-                                                                                     probe_attribute_id_2,
-                                                                                     join_hash_table_index_2);
-  const QueryPlan::DAGNodeIndex build_operator_index_3 = createDummyBuildHashOperator(query_plan_.get(),
-                                                                                      build_relation_3,
-                                                                                      build_attribute_id_3,
-                                                                                      join_hash_table_index_3);
-  const QueryPlan::DAGNodeIndex probe_operator_index_3 = createDummyHashJoinOperator(query_plan_.get(),
-                                                                                     build_relation_3,
-                                                                                     probe_relation_1,
-                                                                                     probe_attribute_id_3,
-                                                                                     join_hash_table_index_3);
-
-  addDummyHashJoinInfo(execution_heuristics_.get(),
-                       build_operator_index_1,
-                       probe_operator_index_1,
-                       build_relation_1,
-                       probe_relation_1,
-                       build_attribute_id_1,
-                       probe_attribute_id_1,
-                       join_hash_table_index_1);
-  addDummyHashJoinInfo(execution_heuristics_.get(),
-                       build_operator_index_2,
-                       probe_operator_index_2,
-                       build_relation_2,
-                       probe_relation_1,
-                       build_attribute_id_2,
-                       probe_attribute_id_2,
-                       join_hash_table_index_2);
-  addDummyHashJoinInfo(execution_heuristics_.get(),
-                       build_operator_index_3,
-                       probe_operator_index_3,
-                       build_relation_3,
-                       probe_relation_1,
-                       build_attribute_id_3,
-                       probe_attribute_id_3,
-                       join_hash_table_index_3);
-
-  execution_heuristics_->optimizeExecutionPlan(query_plan_.get(), query_context_proto_.get());
-
-  // Test whether correct number of bloom filters were added.
-  EXPECT_EQ(1, query_context_proto_->join_hash_tables(0).build_side_bloom_filter_id_size());
-  EXPECT_EQ(1, query_context_proto_->join_hash_tables(1).build_side_bloom_filter_id_size());
-  EXPECT_EQ(1, query_context_proto_->join_hash_tables(2).build_side_bloom_filter_id_size());
-  EXPECT_EQ(3, query_context_proto_->join_hash_tables(0).probe_side_bloom_filters_size());
-
-  // Test that the DAG was modified correctly or not.
-  // Probe operator 1 should have now build operator 1 and build operator 2 added as dependencies.
-  auto const probe_node_dependencies = query_plan_->getQueryPlanDAG().getDependencies(probe_operator_index_1);
-  EXPECT_EQ(1u, probe_node_dependencies.count(build_operator_index_2));
-  EXPECT_EQ(1u, probe_node_dependencies.count(build_operator_index_3));
-}
-
-TEST_F(ExecutionHeuristicsTest, HashJoinNotOptimizedTest) {
-  // This test case creates three hash joins, all of which are being probed on different relations.
-  // Since the probe are being made on the different relations, ExecutionHeuristics should optimize
-  // these hash joins using bloom filters.
-
-  const CatalogRelation *build_relation_1 = createCatalogRelation("build_relation_1");
-  const CatalogRelation *build_relation_2 = createCatalogRelation("build_relation_2");
-  const CatalogRelation *build_relation_3 = createCatalogRelation("build_relation_3");
-  const CatalogRelation *probe_relation_1 = createCatalogRelation("probe_relation_1");
-  const CatalogRelation *probe_relation_2 = createCatalogRelation("probe_relation_2");
-  const CatalogRelation *probe_relation_3 = createCatalogRelation("probe_relation_3");
-
-  const attribute_id build_attribute_id_1 = 0;
-  const attribute_id build_attribute_id_2 = 0;
-  const attribute_id build_attribute_id_3 = 0;
-  const attribute_id probe_attribute_id_1 = 1;
-  const attribute_id probe_attribute_id_2 = 2;
-  const attribute_id probe_attribute_id_3 = 3;
-
-  const QueryContext::join_hash_table_id join_hash_table_index_1 = 0;
-  const QueryContext::join_hash_table_id join_hash_table_index_2 = 1;
-  const QueryContext::join_hash_table_id join_hash_table_index_3 = 2;
-  query_context_proto_->add_join_hash_tables();
-  query_context_proto_->add_join_hash_tables();
-  query_context_proto_->add_join_hash_tables();
-
-  const QueryPlan::DAGNodeIndex build_operator_index_1 = createDummyBuildHashOperator(query_plan_.get(),
-                                                                                      build_relation_1,
-                                                                                      build_attribute_id_1,
-                                                                                      join_hash_table_index_1);
-  const QueryPlan::DAGNodeIndex probe_operator_index_1 = createDummyHashJoinOperator(query_plan_.get(),
-                                                                                     build_relation_1,
-                                                                                     probe_relation_1,
-                                                                                     probe_attribute_id_1,
-                                                                                     join_hash_table_index_1);
-  const QueryPlan::DAGNodeIndex build_operator_index_2 = createDummyBuildHashOperator(query_plan_.get(),
-                                                                                      build_relation_2,
-                                                                                      build_attribute_id_2,
-                                                                                      join_hash_table_index_2);
-  const QueryPlan::DAGNodeIndex probe_operator_index_2 = createDummyHashJoinOperator(query_plan_.get(),
-                                                                                     build_relation_2,
-                                                                                     probe_relation_2,
-                                                                                     probe_attribute_id_2,
-                                                                                     join_hash_table_index_2);
-  const QueryPlan::DAGNodeIndex build_operator_index_3 = createDummyBuildHashOperator(query_plan_.get(),
-                                                                                      build_relation_3,
-                                                                                      build_attribute_id_3,
-                                                                                      join_hash_table_index_3);
-  const QueryPlan::DAGNodeIndex probe_operator_index_3 = createDummyHashJoinOperator(query_plan_.get(),
-                                                                                     build_relation_3,
-                                                                                     probe_relation_3,
-                                                                                     probe_attribute_id_3,
-                                                                                     join_hash_table_index_3);
-
-  addDummyHashJoinInfo(execution_heuristics_.get(),
-                       build_operator_index_1,
-                       probe_operator_index_1,
-                       build_relation_1,
-                       probe_relation_1,
-                       build_attribute_id_1,
-                       probe_attribute_id_1,
-                       join_hash_table_index_1);
-  addDummyHashJoinInfo(execution_heuristics_.get(),
-                       build_operator_index_2,
-                       probe_operator_index_2,
-                       build_relation_2,
-                       probe_relation_2,
-                       build_attribute_id_2,
-                       probe_attribute_id_2,
-                       join_hash_table_index_2);
-  addDummyHashJoinInfo(execution_heuristics_.get(),
-                       build_operator_index_3,
-                       probe_operator_index_3,
-                       build_relation_3,
-                       probe_relation_3,
-                       build_attribute_id_3,
-                       probe_attribute_id_3,
-                       join_hash_table_index_3);
-
-  execution_heuristics_->optimizeExecutionPlan(query_plan_.get(), query_context_proto_.get());
-
-  // Test that no bloom filters were added.
-  EXPECT_EQ(0, query_context_proto_->join_hash_tables(0).build_side_bloom_filter_id_size());
-  EXPECT_EQ(0, query_context_proto_->join_hash_tables(1).build_side_bloom_filter_id_size());
-  EXPECT_EQ(0, query_context_proto_->join_hash_tables(2).build_side_bloom_filter_id_size());
-  EXPECT_EQ(0, query_context_proto_->join_hash_tables(0).probe_side_bloom_filters_size());
-
-  // Test that the DAG was not modified at all.
-  // Probe operator 1 should not have build operator 1 and build operator 2 added as dependencies.
-  auto probe_node_dependencies = query_plan_->getQueryPlanDAG().getDependencies(probe_operator_index_1);
-  EXPECT_EQ(0u, probe_node_dependencies.count(build_operator_index_2));
-  EXPECT_EQ(0u, probe_node_dependencies.count(build_operator_index_3));
-}
-
-}  // namespace optimizer
-}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7a464434/utility/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/utility/CMakeLists.txt b/utility/CMakeLists.txt
index ddaae45..395e264 100644
--- a/utility/CMakeLists.txt
+++ b/utility/CMakeLists.txt
@@ -156,6 +156,8 @@ QS_PROTOBUF_GENERATE_CPP(quickstep_utility_SortConfiguration_proto_srcs
                          quickstep_utility_SortConfiguration_proto_hdrs
                          SortConfiguration.proto)
 
+add_subdirectory(lip_filter)
+
 # Declare micro-libs:
 add_library(quickstep_utility_Alignment ../empty_src.cpp Alignment.hpp)
 add_library(quickstep_utility_BitManipulation ../empty_src.cpp BitManipulation.hpp)
@@ -168,6 +170,7 @@ add_library(quickstep_utility_CalculateInstalledMemory CalculateInstalledMemory.
 add_library(quickstep_utility_Cast ../empty_src.cpp Cast.hpp)
 add_library(quickstep_utility_CheckSnprintf ../empty_src.cpp CheckSnprintf.hpp)
 add_library(quickstep_utility_DAG ../empty_src.cpp DAG.hpp)
+add_library(quickstep_utility_DisjointTreeForest ../empty_src.cpp DisjointTreeForest.hpp)
 add_library(quickstep_utility_EqualsAnyConstant ../empty_src.cpp EqualsAnyConstant.hpp)
 add_library(quickstep_utility_ExecutionDAGVisualizer
             ExecutionDAGVisualizer.cpp
@@ -230,6 +233,8 @@ target_link_libraries(quickstep_utility_CheckSnprintf
 target_link_libraries(quickstep_utility_DAG
                       glog
                       quickstep_utility_Macros)
+target_link_libraries(quickstep_utility_DisjointTreeForest
+                      glog)
 target_link_libraries(quickstep_utility_ExecutionDAGVisualizer
                       quickstep_catalog_CatalogRelationSchema
                       quickstep_queryexecution_QueryExecutionTypedefs
@@ -253,7 +258,9 @@ target_link_libraries(quickstep_utility_PlanVisualizer
                       quickstep_catalog_CatalogRelation
                       quickstep_queryoptimizer_costmodel_StarSchemaSimpleCostModel
                       quickstep_queryoptimizer_expressions_AttributeReference
+                      quickstep_queryoptimizer_expressions_ExprId
                       quickstep_queryoptimizer_physical_HashJoin
+                      quickstep_queryoptimizer_physical_LIPFilterConfiguration
                       quickstep_queryoptimizer_physical_Physical
                       quickstep_queryoptimizer_physical_PhysicalType
                       quickstep_queryoptimizer_physical_TableReference
@@ -319,6 +326,7 @@ target_link_libraries(quickstep_utility
                       quickstep_utility_Cast
                       quickstep_utility_CheckSnprintf
                       quickstep_utility_DAG
+                      quickstep_utility_DisjointTreeForest
                       quickstep_utility_EqualsAnyConstant
                       quickstep_utility_ExecutionDAGVisualizer
                       quickstep_utility_Glob
@@ -375,6 +383,13 @@ target_link_libraries(DAG_unittest
                       ${LIBS})
 add_test(DAG_unittest DAG_unittest)
 
+add_executable(DisjointTreeForest_unittest "${CMAKE_CURRENT_SOURCE_DIR}/tests/DisjointTreeForest_unittest.cpp")
+target_link_libraries(DisjointTreeForest_unittest
+                      gtest
+                      gtest_main
+                      quickstep_utility_DisjointTreeForest)
+add_test(DisjointTreeForest_unittest DisjointTreeForest_unittest)
+
 add_executable(EqualsAnyConstant_unittest "${CMAKE_CURRENT_SOURCE_DIR}/tests/EqualsAnyConstant_unittest.cpp")
 target_link_libraries(EqualsAnyConstant_unittest
                       gtest

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7a464434/utility/DisjointTreeForest.hpp
----------------------------------------------------------------------
diff --git a/utility/DisjointTreeForest.hpp b/utility/DisjointTreeForest.hpp
new file mode 100644
index 0000000..971ba10
--- /dev/null
+++ b/utility/DisjointTreeForest.hpp
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_UTILITY_DISJOINT_TREE_FOREST_HPP_
+#define QUICKSTEP_UTILITY_DISJOINT_TREE_FOREST_HPP_
+
+#include <algorithm>
+#include <cstddef>
+#include <unordered_map>
+#include <vector>
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+/** \addtogroup Utility
+ *  @{
+ */
+
+/**
+ * @brief Disjoint sets implemented with tree data structures so that the
+ *        union/find operations have nearly O(1) time complexity.
+ */
+template <typename ElementT,
+          class MapperT = std::unordered_map<ElementT, std::size_t>>
+class DisjointTreeForest {
+ public:
+  /**
+   * @brief Whether the given element is in a subset.
+   *
+   * @param element The element.
+   * @return True if the element is in a subset.
+   */
+  bool hasElement(const ElementT &element) const {
+    return elements_map_.find(element) != elements_map_.end();
+  }
+
+  /**
+   * @brief If the given element is not in any subset yet, make a singleton
+   *        subset for it. Otherwise do nothing.
+   *
+   * @param element The element.
+   */
+  void makeSet(const ElementT &element) {
+    if (!hasElement(element)) {
+      std::size_t loc = nodes_.size();
+      nodes_.emplace_back(0, loc);
+      elements_map_.emplace(element, loc);
+    }
+  }
+
+  /**
+   * @brief Find the subset id for the given element.
+   *
+   * @param element The element.
+   */
+  std::size_t find(const ElementT &element) {
+    DCHECK(hasElement(element));
+
+    const std::size_t node_id = elements_map_.at(element);
+    std::size_t root_id = node_id;
+    std::size_t parent_id;
+    while ((parent_id = nodes_[root_id].parent) != root_id) {
+      root_id = parent_id;
+    }
+    compress_path(node_id, root_id);
+    return root_id;
+  }
+
+  /**
+   * @brief Union the two subsets that the two given elements belong to.
+   *
+   * @param element1 The first element.
+   * @param element2 The second element.
+   */
+  void merge(const ElementT &element1, const ElementT &element2) {
+    std::size_t root_id1 = find(element1);
+    std::size_t root_id2 = find(element2);
+    if (root_id1 != root_id2) {
+      Node &n1 = nodes_[root_id1];
+      Node &n2 = nodes_[root_id2];
+      if (n1.rank > n2.rank) {
+        n2.parent = root_id1;
+      } else if (n1.rank < n2.rank) {
+        n1.parent = root_id2;
+      } else {
+        n1.parent = root_id2;
+        n2.rank += 1;
+      }
+    }
+  }
+
+  /**
+   * @brief Whether the two given elements are in the same subset.
+   *
+   * @param element1 The first element.
+   * @param element2 The second element.
+   * @return True if the two elements are in the same subset, false otherwise.
+   */
+  bool isConnected(const ElementT &element1, const ElementT &element2) {
+    return find(element1) == find(element2);
+  }
+
+ private:
+  struct Node {
+    Node(const std::size_t rank_in, const std::size_t parent_in)
+        : rank(rank_in), parent(parent_in) {
+    }
+    std::size_t rank;
+    std::size_t parent;
+  };
+
+  inline void compress_path(const std::size_t leaf_node_id,
+                            const std::size_t root_node_id) {
+    std::size_t node_id = leaf_node_id;
+    std::size_t max_rank = 0;
+    while (node_id != root_node_id) {
+      const Node &node = nodes_[node_id];
+      max_rank = std::max(max_rank, node.rank);
+
+      const std::size_t parent_id = node.parent;
+      nodes_[node_id].parent = root_node_id;
+      node_id = parent_id;
+    }
+    nodes_[root_node_id].rank = max_rank + 1;
+  }
+
+  std::vector<Node> nodes_;
+  MapperT elements_map_;
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_UTILITY_DISJOINT_TREE_FOREST_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7a464434/utility/PlanVisualizer.cpp
----------------------------------------------------------------------
diff --git a/utility/PlanVisualizer.cpp b/utility/PlanVisualizer.cpp
index 50cf7f0..2adf674 100644
--- a/utility/PlanVisualizer.cpp
+++ b/utility/PlanVisualizer.cpp
@@ -21,15 +21,16 @@
 
 #include <cstddef>
 #include <memory>
+#include <set>
 #include <sstream>
 #include <string>
 #include <unordered_map>
 #include <vector>
 
 #include "catalog/CatalogRelation.hpp"
-
 #include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp"
 #include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/expressions/ExprId.hpp"
 #include "query_optimizer/physical/HashJoin.hpp"
 #include "query_optimizer/physical/Physical.hpp"
 #include "query_optimizer/physical/PhysicalType.hpp"
@@ -47,9 +48,12 @@ namespace C = ::quickstep::optimizer::cost;
 
 std::string PlanVisualizer::visualize(const P::PhysicalPtr &input) {
   DCHECK(input->getPhysicalType() == P::PhysicalType::kTopLevelPlan);
+  const P::TopLevelPlanPtr top_level_plan =
+      std::static_pointer_cast<const P::TopLevelPlan>(input);
   cost_model_.reset(
       new C::StarSchemaSimpleCostModel(
-          std::static_pointer_cast<const P::TopLevelPlan>(input)->shared_subplans()));
+          top_level_plan->shared_subplans()));
+  lip_filter_conf_ = top_level_plan->lip_filter_configuration();
 
   color_map_["TableReference"] = "skyblue";
   color_map_["Selection"] = "#90EE90";
@@ -86,6 +90,9 @@ std::string PlanVisualizer::visualize(const P::PhysicalPtr &input) {
   for (const EdgeInfo &edge_info : edges_) {
     graph_oss << "  " << edge_info.src_node_id << " -> "
               << edge_info.dst_node_id << " [";
+    if (edge_info.dashed) {
+      graph_oss << "style=dashed ";
+    }
     if (!edge_info.labels.empty()) {
       graph_oss << "label=\""
                 << EscapeSpecialChars(JoinToString(edge_info.labels, "&#10;"))
@@ -103,6 +110,10 @@ void PlanVisualizer::visit(const P::PhysicalPtr &input) {
   int node_id = ++id_counter_;
   node_id_map_.emplace(input, node_id);
 
+  std::set<E::ExprId> referenced_ids;
+  for (const auto &attr : input->getReferencedAttributes()) {
+    referenced_ids.emplace(attr->id());
+  }
   for (const auto &child : input->children()) {
     visit(child);
 
@@ -112,12 +123,18 @@ void PlanVisualizer::visit(const P::PhysicalPtr &input) {
     EdgeInfo &edge_info = edges_.back();
     edge_info.src_node_id = child_id;
     edge_info.dst_node_id = node_id;
+    edge_info.dashed = false;
 
-    // Print output attributes except for TableReference -- there are just too many
-    // attributes out of TableReference.
-    if (child->getPhysicalType() != P::PhysicalType::kTableReference) {
-      for (const auto &attr : child->getOutputAttributes()) {
-        edge_info.labels.emplace_back(attr->attribute_alias());
+    if (input->getPhysicalType() == P::PhysicalType::kHashJoin &&
+        child == input->children()[1]) {
+      edge_info.dashed = true;
+    }
+
+    for (const auto &attr : child->getOutputAttributes()) {
+      if (referenced_ids.find(attr->id()) != referenced_ids.end()) {
+        edge_info.labels.emplace_back(
+            attr->attribute_alias() + ", est # distinct = " +
+            std::to_string(cost_model_->estimateNumDistinctValues(attr->id(), child)));
       }
     }
   }
@@ -154,6 +171,26 @@ void PlanVisualizer::visit(const P::PhysicalPtr &input) {
       break;
     }
   }
+
+  if (lip_filter_conf_ != nullptr) {
+    const auto &build_filters = lip_filter_conf_->getBuildInfoMap();
+    const auto build_it = build_filters.find(input);
+    if (build_it != build_filters.end()) {
+      for (const auto &build_info : build_it->second) {
+        node_info.labels.emplace_back(
+            std::string("[LIP build] ") + build_info.build_attribute->attribute_alias());
+      }
+    }
+    const auto &probe_filters = lip_filter_conf_->getProbeInfoMap();
+    const auto probe_it = probe_filters.find(input);
+    if (probe_it != probe_filters.end()) {
+      for (const auto &probe_info : probe_it->second) {
+        node_info.labels.emplace_back(
+            std::string("[LIP probe] ") + probe_info.probe_attribute->attribute_alias());
+      }
+    }
+  }
+
   node_info.labels.emplace_back(
       "est. # = " + std::to_string(cost_model_->estimateCardinality(input)));
   node_info.labels.emplace_back(

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7a464434/utility/PlanVisualizer.hpp
----------------------------------------------------------------------
diff --git a/utility/PlanVisualizer.hpp b/utility/PlanVisualizer.hpp
index 1c0df77..9b8b0db 100644
--- a/utility/PlanVisualizer.hpp
+++ b/utility/PlanVisualizer.hpp
@@ -26,6 +26,7 @@
 #include <vector>
 
 #include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp"
+#include "query_optimizer/physical/LIPFilterConfiguration.hpp"
 #include "query_optimizer/physical/Physical.hpp"
 #include "utility/Macros.hpp"
 
@@ -73,6 +74,7 @@ class PlanVisualizer {
     int src_node_id;
     int dst_node_id;
     std::vector<std::string> labels;
+    bool dashed;
   };
 
   void visit(const optimizer::physical::PhysicalPtr &input);
@@ -85,6 +87,7 @@ class PlanVisualizer {
   std::vector<EdgeInfo> edges_;
 
   std::unique_ptr<optimizer::cost::StarSchemaSimpleCostModel> cost_model_;
+  optimizer::physical::LIPFilterConfigurationPtr lip_filter_conf_;
 
   DISALLOW_COPY_AND_ASSIGN(PlanVisualizer);
 };

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7a464434/utility/lip_filter/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/utility/lip_filter/CMakeLists.txt b/utility/lip_filter/CMakeLists.txt
new file mode 100644
index 0000000..2232abe
--- /dev/null
+++ b/utility/lip_filter/CMakeLists.txt
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Declare micro-libs:
+add_library(quickstep_utility_lipfilter_LIPFilter ../../empty_src.cpp LIPFilter.hpp)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7a464434/utility/lip_filter/LIPFilter.hpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/LIPFilter.hpp b/utility/lip_filter/LIPFilter.hpp
new file mode 100644
index 0000000..33165ed
--- /dev/null
+++ b/utility/lip_filter/LIPFilter.hpp
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_HPP_
+#define QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_HPP_
+
+namespace quickstep {
+
+/** \addtogroup Utility
+ *  @{
+ */
+
+enum class LIPFilterType {
+  kBloomFilter,
+  kExactFilter,
+  kSingleIdentityHashFilter
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7a464434/utility/tests/DisjointTreeForest_unittest.cpp
----------------------------------------------------------------------
diff --git a/utility/tests/DisjointTreeForest_unittest.cpp b/utility/tests/DisjointTreeForest_unittest.cpp
new file mode 100644
index 0000000..2e12fad
--- /dev/null
+++ b/utility/tests/DisjointTreeForest_unittest.cpp
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "utility/DisjointTreeForest.hpp"
+
+#include <cstddef>
+#include <string>
+#include <vector>
+
+#include "gtest/gtest.h"
+
+namespace quickstep {
+
+TEST(DisjointTreeForestTest, IntTest) {
+  DisjointTreeForest<int> forest;
+  for (int i = 10; i < 20; ++i) {
+    forest.makeSet(i);
+  }
+
+  for (int i = 10; i < 20; i += 2) {
+    EXPECT_NE(forest.find(i), forest.find(i+1));
+    EXPECT_FALSE(forest.isConnected(i, i+1));
+
+    forest.merge(i, i+1);
+    EXPECT_EQ(forest.find(i), forest.find(i+1));
+    EXPECT_TRUE(forest.isConnected(i, i+1));
+
+    forest.merge(i+1, i);
+    EXPECT_EQ(forest.find(i), forest.find(i+1));
+    EXPECT_TRUE(forest.isConnected(i, i+1));
+  }
+
+  for (int i = 12; i < 20; i += 2) {
+    EXPECT_NE(forest.find(i), forest.find(i-1));
+    EXPECT_FALSE(forest.isConnected(i, i-1));
+  }
+
+  forest.merge(10, 17);
+  forest.merge(11, 18);
+  EXPECT_EQ(forest.find(11), forest.find(16));
+  EXPECT_EQ(forest.find(10), forest.find(19));
+  EXPECT_NE(forest.find(10), forest.find(12));
+  EXPECT_NE(forest.find(15), forest.find(17));
+
+  forest.merge(12, 14);
+  forest.merge(15, 16);
+  const std::size_t id = forest.find(10);
+  for (int i = 10; i < 20; ++i) {
+    EXPECT_EQ(forest.find(i), id);
+  }
+}
+
+TEST(DisjointTreeForestTest, StringTest) {
+  DisjointTreeForest<std::string> forest;
+  const std::vector<std::string> elements = { "aaa", "bbb", "ccc", "ddd" };
+  for (const std::string &element : elements) {
+    forest.makeSet(element);
+  }
+
+  EXPECT_NE(forest.find("aaa"), forest.find("bbb"));
+  forest.merge("aaa", "bbb");
+  EXPECT_EQ(forest.find("aaa"), forest.find("bbb"));
+
+  EXPECT_NE(forest.find("ccc"), forest.find("ddd"));
+  forest.merge("ccc", "ddd");
+  EXPECT_EQ(forest.find("ccc"), forest.find("ddd"));
+
+  EXPECT_NE(forest.find("aaa"), forest.find("ccc"));
+  EXPECT_NE(forest.find("aaa"), forest.find("ddd"));
+  EXPECT_NE(forest.find("bbb"), forest.find("ccc"));
+  EXPECT_NE(forest.find("bbb"), forest.find("ddd"));
+
+  forest.merge("aaa", "ddd");
+  for (const std::string &e1 : elements) {
+    for (const std::string &e2 : elements) {
+      EXPECT_EQ(forest.find(e1), forest.find(e2));
+      EXPECT_TRUE(forest.isConnected(e1, e2));
+    }
+  }
+}
+
+}  // namespace quickstep



[4/7] incubator-quickstep git commit: Use SimpleCostModel for estimating JoinHashTable size

Posted by hb...@apache.org.
Use SimpleCostModel for estimating JoinHashTable size


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/2ee5c1cc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/2ee5c1cc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/2ee5c1cc

Branch: refs/heads/partitioned-aggregate-new
Commit: 2ee5c1cc656ef71c99714380b1a78bbd16a74925
Parents: 7a46443
Author: Jianqiao Zhu <ji...@cs.wisc.edu>
Authored: Tue Oct 18 11:07:51 2016 -0500
Committer: Jianqiao Zhu <ji...@cs.wisc.edu>
Committed: Tue Oct 18 11:36:55 2016 -0500

----------------------------------------------------------------------
 query_optimizer/CMakeLists.txt                 |  1 +
 query_optimizer/ExecutionGenerator.cpp         | 10 +++++++---
 query_optimizer/ExecutionGenerator.hpp         |  9 +++++++--
 query_optimizer/cost_model/CMakeLists.txt      |  1 +
 query_optimizer/cost_model/SimpleCostModel.cpp | 11 +++++++++++
 query_optimizer/cost_model/SimpleCostModel.hpp |  6 ++++++
 6 files changed, 33 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ee5c1cc/query_optimizer/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt
index fa9141c..8333d4b 100644
--- a/query_optimizer/CMakeLists.txt
+++ b/query_optimizer/CMakeLists.txt
@@ -76,6 +76,7 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
                       quickstep_queryoptimizer_QueryHandle
                       quickstep_queryoptimizer_QueryPlan
                       quickstep_queryoptimizer_costmodel_CostModel
+                      quickstep_queryoptimizer_costmodel_SimpleCostModel
                       quickstep_queryoptimizer_costmodel_StarSchemaSimpleCostModel
                       quickstep_queryoptimizer_expressions_AggregateFunction
                       quickstep_queryoptimizer_expressions_Alias

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ee5c1cc/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 09ef9e0..5a701b7 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -57,6 +57,7 @@
 #include "query_optimizer/OptimizerContext.hpp"
 #include "query_optimizer/QueryHandle.hpp"
 #include "query_optimizer/QueryPlan.hpp"
+#include "query_optimizer/cost_model/SimpleCostModel.hpp"
 #include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp"
 #include "query_optimizer/expressions/AggregateFunction.hpp"
 #include "query_optimizer/expressions/Alias.hpp"
@@ -165,8 +166,10 @@ void ExecutionGenerator::generatePlan(const P::PhysicalPtr &physical_plan) {
   CHECK(P::SomeTopLevelPlan::MatchesWithConditionalCast(physical_plan, &top_level_physical_plan_))
       << "The physical plan must be rooted by a TopLevelPlan";
 
-  cost_model_.reset(
+  cost_model_for_aggregation_.reset(
       new cost::StarSchemaSimpleCostModel(top_level_physical_plan_->shared_subplans()));
+  cost_model_for_hash_join_.reset(
+      new cost::SimpleCostModel(top_level_physical_plan_->shared_subplans()));
 
   const CatalogRelation *result_relation = nullptr;
 
@@ -594,7 +597,8 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
   std::vector<attribute_id> probe_attribute_ids;
   std::vector<attribute_id> build_attribute_ids;
 
-  std::size_t build_cardinality = cost_model_->estimateCardinality(build_physical);
+  std::size_t build_cardinality =
+      cost_model_for_hash_join_->estimateCardinality(build_physical);
 
   bool any_probe_attributes_nullable = false;
   bool any_build_attributes_nullable = false;
@@ -1363,7 +1367,7 @@ void ExecutionGenerator::convertAggregate(
   }
 
   const std::size_t estimated_num_groups =
-      cost_model_->estimateNumGroupsForAggregate(physical_plan);
+      cost_model_for_aggregation_->estimateNumGroupsForAggregate(physical_plan);
   aggr_state_proto->set_estimated_num_entries(std::max(16uL, estimated_num_groups));
 
   const QueryPlan::DAGNodeIndex aggregation_operator_index =

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ee5c1cc/query_optimizer/ExecutionGenerator.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.hpp b/query_optimizer/ExecutionGenerator.hpp
index 495955e..b7d8ef9 100644
--- a/query_optimizer/ExecutionGenerator.hpp
+++ b/query_optimizer/ExecutionGenerator.hpp
@@ -416,9 +416,14 @@ class ExecutionGenerator {
   std::vector<CatalogRelationInfo> temporary_relation_info_vec_;
 
   /**
-   * @brief The cost model to use for creating the execution plan.
+   * @brief The cost model to use for estimating aggregation hash table size.
    */
-  std::unique_ptr<cost::CostModel> cost_model_;
+  std::unique_ptr<cost::CostModel> cost_model_for_aggregation_;
+
+  /**
+   * @brief The cost model to use for estimating join hash table size.
+   */
+  std::unique_ptr<cost::CostModel> cost_model_for_hash_join_;
 
   physical::TopLevelPlanPtr top_level_physical_plan_;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ee5c1cc/query_optimizer/cost_model/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/CMakeLists.txt b/query_optimizer/cost_model/CMakeLists.txt
index ba99de3..d616696 100644
--- a/query_optimizer/cost_model/CMakeLists.txt
+++ b/query_optimizer/cost_model/CMakeLists.txt
@@ -38,6 +38,7 @@ target_link_libraries(quickstep_queryoptimizer_costmodel_SimpleCostModel
                       quickstep_queryoptimizer_physical_PhysicalType
                       quickstep_queryoptimizer_physical_Selection
                       quickstep_queryoptimizer_physical_SharedSubplanReference
+                      quickstep_queryoptimizer_physical_Sort
                       quickstep_queryoptimizer_physical_TableGenerator
                       quickstep_queryoptimizer_physical_TableReference
                       quickstep_queryoptimizer_physical_TopLevelPlan

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ee5c1cc/query_optimizer/cost_model/SimpleCostModel.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/SimpleCostModel.cpp b/query_optimizer/cost_model/SimpleCostModel.cpp
index f313c90..74b62d6 100644
--- a/query_optimizer/cost_model/SimpleCostModel.cpp
+++ b/query_optimizer/cost_model/SimpleCostModel.cpp
@@ -30,6 +30,7 @@
 #include "query_optimizer/physical/PhysicalType.hpp"
 #include "query_optimizer/physical/Selection.hpp"
 #include "query_optimizer/physical/SharedSubplanReference.hpp"
+#include "query_optimizer/physical/Sort.hpp"
 #include "query_optimizer/physical/TableGenerator.hpp"
 #include "query_optimizer/physical/TableReference.hpp"
 #include "query_optimizer/physical/TopLevelPlan.hpp"
@@ -73,6 +74,9 @@ std::size_t SimpleCostModel::estimateCardinality(
       return estimateCardinality(
           shared_subplans_[shared_subplan_reference->subplan_id()]);
     }
+    case P::PhysicalType::kSort:
+      return estimateCardinalityForSort(
+          std::static_pointer_cast<const P::Sort>(physical_plan));
     case P::PhysicalType::kWindowAggregate:
       return estimateCardinalityForWindowAggregate(
           std::static_pointer_cast<const P::WindowAggregate>(physical_plan));
@@ -96,6 +100,13 @@ std::size_t SimpleCostModel::estimateCardinalityForSelection(
   return estimateCardinality(physical_plan->input());
 }
 
+std::size_t SimpleCostModel::estimateCardinalityForSort(
+    const physical::SortPtr &physical_plan) {
+  std::size_t cardinality = estimateCardinality(
+      std::static_pointer_cast<const P::Sort>(physical_plan)->input());
+  return std::min(cardinality, static_cast<std::size_t>(physical_plan->limit()));
+}
+
 std::size_t SimpleCostModel::estimateCardinalityForTableGenerator(
     const P::TableGeneratorPtr &physical_plan) {
   return physical_plan->generator_function_handle()->getEstimatedCardinality();

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ee5c1cc/query_optimizer/cost_model/SimpleCostModel.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/SimpleCostModel.hpp b/query_optimizer/cost_model/SimpleCostModel.hpp
index 25ff8fe..16366cd 100644
--- a/query_optimizer/cost_model/SimpleCostModel.hpp
+++ b/query_optimizer/cost_model/SimpleCostModel.hpp
@@ -29,6 +29,7 @@
 #include "query_optimizer/physical/HashJoin.hpp"
 #include "query_optimizer/physical/Physical.hpp"
 #include "query_optimizer/physical/Selection.hpp"
+#include "query_optimizer/physical/Sort.hpp"
 #include "query_optimizer/physical/TableGenerator.hpp"
 #include "query_optimizer/physical/TableReference.hpp"
 #include "query_optimizer/physical/TopLevelPlan.hpp"
@@ -74,6 +75,11 @@ class SimpleCostModel : public CostModel {
   std::size_t estimateCardinalityForTableGenerator(
       const physical::TableGeneratorPtr &physical_plan);
 
+  // Returns min(K, estimated cardinality of the input plan) if there is a
+  // LIMIT K clause, otherwise the estimated cardinality of the input plan.
+  std::size_t estimateCardinalityForSort(
+      const physical::SortPtr &physical_plan);
+
   // Returns the larger value of the estimated cardinalities of two
   // input plans.
   std::size_t estimateCardinalityForHashJoin(


[3/7] incubator-quickstep git commit: Optimizer changes for the LIPFilter feature.

Posted by hb...@apache.org.
Optimizer changes for the LIPFilter feature.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/7a464434
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/7a464434
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/7a464434

Branch: refs/heads/partitioned-aggregate-new
Commit: 7a46443491b1c25af3d7aaf738d6e9b096ed52d0
Parents: 160276c
Author: Jianqiao Zhu <ji...@cs.wisc.edu>
Authored: Wed Sep 7 13:20:43 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Tue Oct 18 11:26:02 2016 -0500

----------------------------------------------------------------------
 query_optimizer/CMakeLists.txt                  |  12 +-
 query_optimizer/ExecutionGenerator.cpp          |  49 ---
 query_optimizer/ExecutionGenerator.hpp          |   5 +-
 query_optimizer/ExecutionHeuristics.cpp         | 129 --------
 query_optimizer/ExecutionHeuristics.hpp         | 157 ----------
 query_optimizer/PhysicalGenerator.cpp           |  18 +-
 .../cost_model/StarSchemaSimpleCostModel.cpp    |   2 +-
 query_optimizer/physical/CMakeLists.txt         |   7 +
 .../physical/LIPFilterConfiguration.hpp         | 171 ++++++++++
 query_optimizer/physical/TopLevelPlan.hpp       |  43 ++-
 query_optimizer/rules/AttachLIPFilters.cpp      | 248 +++++++++++++++
 query_optimizer/rules/AttachLIPFilters.hpp      | 151 +++++++++
 query_optimizer/rules/CMakeLists.txt            |  19 ++
 .../StarSchemaHashJoinOrderOptimization.cpp     | 273 ++++++++++------
 .../StarSchemaHashJoinOrderOptimization.hpp     | 118 +++++--
 query_optimizer/tests/CMakeLists.txt            |  16 -
 .../tests/ExecutionHeuristics_unittest.cpp      | 311 -------------------
 utility/CMakeLists.txt                          |  15 +
 utility/DisjointTreeForest.hpp                  | 152 +++++++++
 utility/PlanVisualizer.cpp                      |  51 ++-
 utility/PlanVisualizer.hpp                      |   3 +
 utility/lip_filter/CMakeLists.txt               |  19 ++
 utility/lip_filter/LIPFilter.hpp                |  39 +++
 utility/tests/DisjointTreeForest_unittest.cpp   |  98 ++++++
 24 files changed, 1277 insertions(+), 829 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7a464434/query_optimizer/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt
index 988ffd8..fa9141c 100644
--- a/query_optimizer/CMakeLists.txt
+++ b/query_optimizer/CMakeLists.txt
@@ -41,7 +41,6 @@ add_subdirectory(tests)
 
 # Declare micro-libs:
 add_library(quickstep_queryoptimizer_ExecutionGenerator ExecutionGenerator.cpp ExecutionGenerator.hpp)
-add_library(quickstep_queryoptimizer_ExecutionHeuristics ExecutionHeuristics.cpp ExecutionHeuristics.hpp)
 add_library(quickstep_queryoptimizer_LogicalGenerator LogicalGenerator.cpp LogicalGenerator.hpp)
 add_library(quickstep_queryoptimizer_LogicalToPhysicalMapper
             ../empty_src.cpp
@@ -73,7 +72,6 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
                       quickstep_expressions_windowaggregation_WindowAggregateFunction_proto
                       quickstep_queryexecution_QueryContext
                       quickstep_queryexecution_QueryContext_proto
-                      quickstep_queryoptimizer_ExecutionHeuristics
                       quickstep_queryoptimizer_OptimizerContext
                       quickstep_queryoptimizer_QueryHandle
                       quickstep_queryoptimizer_QueryPlan
@@ -153,14 +151,6 @@ if (ENABLE_DISTRIBUTED)
   target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
                         quickstep_catalog_Catalog_proto)
 endif()
-target_link_libraries(quickstep_queryoptimizer_ExecutionHeuristics
-                      glog
-                      quickstep_catalog_CatalogRelation
-                      quickstep_catalog_CatalogTypedefs
-                      quickstep_queryexecution_QueryContext
-                      quickstep_queryexecution_QueryContext_proto
-                      quickstep_queryoptimizer_QueryPlan
-                      quickstep_utility_Macros)
 target_link_libraries(quickstep_queryoptimizer_LogicalGenerator
                       glog
                       quickstep_parser_ParseStatement
@@ -196,6 +186,7 @@ target_link_libraries(quickstep_queryoptimizer_PhysicalGenerator
                       quickstep_queryoptimizer_LogicalToPhysicalMapper
                       quickstep_queryoptimizer_logical_Logical
                       quickstep_queryoptimizer_physical_Physical
+                      quickstep_queryoptimizer_rules_AttachLIPFilters
                       quickstep_queryoptimizer_rules_PruneColumns
                       quickstep_queryoptimizer_rules_StarSchemaHashJoinOrderOptimization
                       quickstep_queryoptimizer_rules_SwapProbeBuild
@@ -233,7 +224,6 @@ target_link_libraries(quickstep_queryoptimizer_Validator
 add_library(quickstep_queryoptimizer ../empty_src.cpp QueryOptimizerModule.hpp)
 target_link_libraries(quickstep_queryoptimizer
                       quickstep_queryoptimizer_ExecutionGenerator
-                      quickstep_queryoptimizer_ExecutionHeuristics
                       quickstep_queryoptimizer_LogicalGenerator
                       quickstep_queryoptimizer_LogicalToPhysicalMapper
                       quickstep_queryoptimizer_Optimizer

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7a464434/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 9347c9c..09ef9e0 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -54,7 +54,6 @@
 #include "expressions/window_aggregation/WindowAggregateFunction.pb.h"
 #include "query_execution/QueryContext.hpp"
 #include "query_execution/QueryContext.pb.h"
-#include "query_optimizer/ExecutionHeuristics.hpp"
 #include "query_optimizer/OptimizerContext.hpp"
 #include "query_optimizer/QueryHandle.hpp"
 #include "query_optimizer/QueryPlan.hpp"
@@ -211,11 +210,6 @@ void ExecutionGenerator::generatePlan(const P::PhysicalPtr &physical_plan) {
         temporary_relation_info.producer_operator_index);
   }
 
-  // Optimize execution plan based on heuristics captured during execution plan generation, if enabled.
-  if (FLAGS_optimize_joins) {
-    execution_heuristics_->optimizeExecutionPlan(execution_plan_, query_context_proto_);
-  }
-
 #ifdef QUICKSTEP_DISTRIBUTED
   catalog_database_cache_proto_->set_name(catalog_database_->getName());
 
@@ -600,34 +594,14 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
   std::vector<attribute_id> probe_attribute_ids;
   std::vector<attribute_id> build_attribute_ids;
 
-  std::vector<attribute_id> probe_original_attribute_ids;
-  std::vector<attribute_id> build_original_attribute_ids;
-
-  const CatalogRelation *referenced_stored_probe_relation = nullptr;
-  const CatalogRelation *referenced_stored_build_relation = nullptr;
-
   std::size_t build_cardinality = cost_model_->estimateCardinality(build_physical);
 
   bool any_probe_attributes_nullable = false;
   bool any_build_attributes_nullable = false;
 
-  bool skip_hash_join_optimization = false;
-
   const std::vector<E::AttributeReferencePtr> &left_join_attributes =
       physical_plan->left_join_attributes();
   for (const E::AttributeReferencePtr &left_join_attribute : left_join_attributes) {
-    // Try to determine the original stored relation referenced in the Hash Join.
-    referenced_stored_probe_relation =
-        catalog_database_->getRelationByName(left_join_attribute->relation_name());
-    if (referenced_stored_probe_relation == nullptr) {
-      // Hash Join optimizations are not possible, if the referenced relation cannot be determined.
-      skip_hash_join_optimization = true;
-    } else {
-      const attribute_id probe_operator_attribute_id =
-          referenced_stored_probe_relation->getAttributeByName(left_join_attribute->attribute_name())->getID();
-      probe_original_attribute_ids.emplace_back(probe_operator_attribute_id);
-    }
-
     const CatalogAttribute *probe_catalog_attribute
         = attribute_substitution_map_[left_join_attribute->id()];
     probe_attribute_ids.emplace_back(probe_catalog_attribute->getID());
@@ -640,18 +614,6 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
   const std::vector<E::AttributeReferencePtr> &right_join_attributes =
       physical_plan->right_join_attributes();
   for (const E::AttributeReferencePtr &right_join_attribute : right_join_attributes) {
-    // Try to determine the original stored relation referenced in the Hash Join.
-    referenced_stored_build_relation =
-        catalog_database_->getRelationByName(right_join_attribute->relation_name());
-    if (referenced_stored_build_relation == nullptr) {
-      // Hash Join optimizations are not possible, if the referenced relation cannot be determined.
-      skip_hash_join_optimization = true;
-    } else {
-      const attribute_id build_operator_attribute_id =
-          referenced_stored_build_relation->getAttributeByName(right_join_attribute->attribute_name())->getID();
-      build_original_attribute_ids.emplace_back(build_operator_attribute_id);
-    }
-
     const CatalogAttribute *build_catalog_attribute
         = attribute_substitution_map_[right_join_attribute->id()];
     build_attribute_ids.emplace_back(build_catalog_attribute->getID());
@@ -828,17 +790,6 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
       std::forward_as_tuple(join_operator_index,
                             output_relation));
   temporary_relation_info_vec_.emplace_back(join_operator_index, output_relation);
-
-  // Add heuristics for the Hash Join, if enabled.
-  if (FLAGS_optimize_joins && !skip_hash_join_optimization) {
-    execution_heuristics_->addHashJoinInfo(build_operator_index,
-                                           join_operator_index,
-                                           referenced_stored_build_relation,
-                                           referenced_stored_probe_relation,
-                                           std::move(build_original_attribute_ids),
-                                           std::move(probe_original_attribute_ids),
-                                           join_hash_table_index);
-  }
 }
 
 void ExecutionGenerator::convertNestedLoopsJoin(

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7a464434/query_optimizer/ExecutionGenerator.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.hpp b/query_optimizer/ExecutionGenerator.hpp
index 2aaf5ab..495955e 100644
--- a/query_optimizer/ExecutionGenerator.hpp
+++ b/query_optimizer/ExecutionGenerator.hpp
@@ -33,7 +33,6 @@
 #include "catalog/CatalogTypedefs.hpp"
 #include "query_execution/QueryContext.hpp"
 #include "query_execution/QueryContext.pb.h"
-#include "query_optimizer/ExecutionHeuristics.hpp"
 #include "query_optimizer/QueryHandle.hpp"
 #include "query_optimizer/QueryPlan.hpp"
 #include "query_optimizer/cost_model/CostModel.hpp"
@@ -102,8 +101,7 @@ class ExecutionGenerator {
       : catalog_database_(DCHECK_NOTNULL(catalog_database)),
         query_handle_(DCHECK_NOTNULL(query_handle)),
         execution_plan_(DCHECK_NOTNULL(query_handle->getQueryPlanMutable())),
-        query_context_proto_(DCHECK_NOTNULL(query_handle->getQueryContextProtoMutable())),
-        execution_heuristics_(new ExecutionHeuristics()) {
+        query_context_proto_(DCHECK_NOTNULL(query_handle->getQueryContextProtoMutable())) {
     query_context_proto_->set_query_id(query_handle_->query_id());
 #ifdef QUICKSTEP_DISTRIBUTED
     catalog_database_cache_proto_ = DCHECK_NOTNULL(query_handle->getCatalogDatabaseCacheProtoMutable());
@@ -386,7 +384,6 @@ class ExecutionGenerator {
   QueryHandle *query_handle_;
   QueryPlan *execution_plan_;  // A part of QueryHandle.
   serialization::QueryContext *query_context_proto_;  // A part of QueryHandle.
-  std::unique_ptr<ExecutionHeuristics> execution_heuristics_;
 
 #ifdef QUICKSTEP_DISTRIBUTED
   serialization::CatalogDatabase *catalog_database_cache_proto_;  // A part of QueryHandle.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7a464434/query_optimizer/ExecutionHeuristics.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionHeuristics.cpp b/query_optimizer/ExecutionHeuristics.cpp
deleted file mode 100644
index 4fd7320..0000000
--- a/query_optimizer/ExecutionHeuristics.cpp
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#include "query_optimizer/ExecutionHeuristics.hpp"
-
-#include <cstddef>
-#include <utility>
-#include <unordered_map>
-#include <vector>
-
-#include "catalog/CatalogTypedefs.hpp"
-#include "query_execution/QueryContext.pb.h"
-#include "query_optimizer/QueryPlan.hpp"
-#include "utility/Macros.hpp"
-
-#include "glog/logging.h"
-
-namespace quickstep {
-namespace optimizer {
-
-void ExecutionHeuristics::optimizeExecutionPlan(QueryPlan *query_plan,
-                                                serialization::QueryContext *query_context_proto) {
-  // Currently this only optimizes left deep joins using bloom filters.
-  // It uses a simple algorithm to discover the left deep joins.
-  // It starts with the first hash join in the plan and keeps on iterating
-  // over the next hash joins, till a probe on a different relation id is found.
-  // The set of hash joins found in this way forms a chain and can be recognized
-  // as a left deep join. It becomes a candidate for optimization.
-
-  // The optimization is done by modifying each of the build operators in the chain
-  // to generate a bloom filter on the build key during their hash table creation.
-  // The leaf-level probe operator is then modified to query all the bloom
-  // filters generated from all the build operators in the chain. These
-  // bloom filters are queried to test the membership of the probe key
-  // just prior to probing the hash table.
-
-  QueryPlan::DAGNodeIndex origin_node = 0;
-  while (origin_node < hash_joins_.size() - 1) {
-    std::vector<std::size_t> chained_nodes;
-    chained_nodes.push_back(origin_node);
-    for (std::size_t i = origin_node + 1; i < hash_joins_.size(); ++i) {
-      const relation_id checked_relation_id = hash_joins_[origin_node].referenced_stored_probe_relation_->getID();
-      const relation_id expected_relation_id = hash_joins_[i].referenced_stored_probe_relation_->getID();
-      if (checked_relation_id == expected_relation_id) {
-        chained_nodes.push_back(i);
-      } else {
-        break;
-      }
-    }
-
-    // Only chains of length greater than one are suitable candidates for semi-join optimization.
-    if (chained_nodes.size() > 1) {
-      std::unordered_map<QueryContext::bloom_filter_id, std::vector<attribute_id>> probe_bloom_filter_info;
-      for (const std::size_t node : chained_nodes) {
-        // Provision for a new bloom filter to be used by the build operator.
-        const QueryContext::bloom_filter_id bloom_filter_id =  query_context_proto->bloom_filters_size();
-        serialization::BloomFilter *bloom_filter_proto = query_context_proto->add_bloom_filters();
-
-        // Modify the bloom filter properties based on the statistics of the relation.
-        setBloomFilterProperties(bloom_filter_proto, hash_joins_[node].referenced_stored_build_relation_);
-
-        // Add build-side bloom filter information to the corresponding hash table proto.
-        query_context_proto->mutable_join_hash_tables(hash_joins_[node].join_hash_table_id_)
-            ->add_build_side_bloom_filter_id(bloom_filter_id);
-
-        probe_bloom_filter_info.insert(std::make_pair(bloom_filter_id, hash_joins_[node].probe_attributes_));
-      }
-
-      // Add probe-side bloom filter information to the corresponding hash table proto for each build-side bloom filter.
-      for (const std::pair<QueryContext::bloom_filter_id, std::vector<attribute_id>>
-               &bloom_filter_info : probe_bloom_filter_info) {
-        auto *probe_side_bloom_filter =
-            query_context_proto->mutable_join_hash_tables(hash_joins_[origin_node].join_hash_table_id_)
-                                  ->add_probe_side_bloom_filters();
-        probe_side_bloom_filter->set_probe_side_bloom_filter_id(bloom_filter_info.first);
-        for (const attribute_id &probe_attribute_id : bloom_filter_info.second) {
-          probe_side_bloom_filter->add_probe_side_attr_ids(probe_attribute_id);
-        }
-      }
-
-      // Add node dependencies from chained build nodes to origin node probe.
-      for (std::size_t i = 1; i < chained_nodes.size(); ++i) {  // Note: It starts from index 1.
-        query_plan->addDirectDependency(hash_joins_[origin_node].join_operator_index_,
-                                        hash_joins_[origin_node + i].build_operator_index_,
-                                        true /* is_pipeline_breaker */);
-      }
-    }
-
-    // Update the origin node.
-    origin_node = chained_nodes.back() + 1;
-  }
-}
-
-void ExecutionHeuristics::setBloomFilterProperties(serialization::BloomFilter *bloom_filter_proto,
-                                                   const CatalogRelation *relation) {
-  const std::size_t cardinality = relation->estimateTupleCardinality();
-  if (cardinality < kOneThousand) {
-    bloom_filter_proto->set_bloom_filter_size(kOneThousand / kCompressionFactor);
-    bloom_filter_proto->set_number_of_hashes(kVeryLowSparsityHash);
-  } else if (cardinality < kTenThousand) {
-    bloom_filter_proto->set_bloom_filter_size(kTenThousand / kCompressionFactor);
-    bloom_filter_proto->set_number_of_hashes(kLowSparsityHash);
-  } else if (cardinality < kHundredThousand) {
-    bloom_filter_proto->set_bloom_filter_size(kHundredThousand / kCompressionFactor);
-    bloom_filter_proto->set_number_of_hashes(kMediumSparsityHash);
-  } else {
-    bloom_filter_proto->set_bloom_filter_size(kMillion / kCompressionFactor);
-    bloom_filter_proto->set_number_of_hashes(kHighSparsityHash);
-  }
-}
-
-}  // namespace optimizer
-}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7a464434/query_optimizer/ExecutionHeuristics.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionHeuristics.hpp b/query_optimizer/ExecutionHeuristics.hpp
deleted file mode 100644
index 8ad3b7a..0000000
--- a/query_optimizer/ExecutionHeuristics.hpp
+++ /dev/null
@@ -1,157 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#ifndef QUICKSTEP_QUERY_OPTIMIZER_EXECUTION_HEURISTICS_HPP_
-#define QUICKSTEP_QUERY_OPTIMIZER_EXECUTION_HEURISTICS_HPP_
-
-#include <vector>
-
-#include "catalog/CatalogRelation.hpp"
-#include "catalog/CatalogTypedefs.hpp"
-#include "query_execution/QueryContext.hpp"
-#include "query_execution/QueryContext.pb.h"
-#include "query_optimizer/QueryPlan.hpp"
-#include "utility/Macros.hpp"
-
-#include "glog/logging.h"
-
-namespace quickstep {
-namespace optimizer {
-
-/** \addtogroup QueryOptimizer
- *  @{
- */
-
-/**
- * @brief The ExecutionHeuristics compiles certain heuristics for an execution plan
- *        as it is being converted to a physical plan. These heuristics can then be
- *        used to optimize the execution plan after it has been generated.
- **/
-class ExecutionHeuristics {
- public:
-  static const std::size_t kOneHundred = 100;
-  static const std::size_t kOneThousand = 1000;
-  static const std::size_t kTenThousand = 10000;
-  static const std::size_t kHundredThousand = 100000;
-  static const std::size_t kMillion = 1000000;
-
-  static const std::size_t kCompressionFactor = 10;
-
-  static const std::size_t kVeryLowSparsityHash = 1;
-  static const std::size_t kLowSparsityHash = 2;
-  static const std::size_t kMediumSparsityHash = 5;
-  static const std::size_t kHighSparsityHash = 10;
-
-  /**
-   * @brief A simple internal class that holds information about various
-   *        hash joins within the execution plan for a query.
-   **/
-  struct HashJoinInfo {
-    HashJoinInfo(const QueryPlan::DAGNodeIndex build_operator_index,
-                 const QueryPlan::DAGNodeIndex join_operator_index,
-                 const CatalogRelation *referenced_stored_build_relation,
-                 const CatalogRelation *referenced_stored_probe_relation,
-                 std::vector<attribute_id> &&build_attributes,
-                 std::vector<attribute_id> &&probe_attributes,
-                 const QueryContext::join_hash_table_id join_hash_table_id)
-        : build_operator_index_(build_operator_index),
-          join_operator_index_(join_operator_index),
-          referenced_stored_build_relation_(referenced_stored_build_relation),
-          referenced_stored_probe_relation_(referenced_stored_probe_relation),
-          build_attributes_(std::move(build_attributes)),
-          probe_attributes_(std::move(probe_attributes)),
-          join_hash_table_id_(join_hash_table_id) {
-    }
-
-    const QueryPlan::DAGNodeIndex build_operator_index_;
-    const QueryPlan::DAGNodeIndex join_operator_index_;
-    const CatalogRelation *referenced_stored_build_relation_;
-    const CatalogRelation *referenced_stored_probe_relation_;
-    const std::vector<attribute_id> build_attributes_;
-    const std::vector<attribute_id> probe_attributes_;
-    const QueryContext::join_hash_table_id join_hash_table_id_;
-  };
-
-
-  /**
-   * @brief Constructor.
-   **/
-  ExecutionHeuristics() {}
-
-  /**
-   * @brief Saves information about a hash join used within the execution plan
-   *        for a query.
-   *
-   * @param build_operator_index Index of the build operator of the hash join.
-   * @param join_operator_index Index of the join operator of the hash join.
-   * @param build_relation_id Id of the relation on which hash table is being built.
-   * @param probe_relation_id Id of the relation on which hash table is being probed.
-   * @param build_attributes List of attributes on which hash table is being built.
-   * @param probe_attributes List of attributes on which hash table is being probed.
-   * @param join_hash_table_id Id of the hash table which refers to the actual hash
-   *        table within the query context.
-   **/
-  inline void addHashJoinInfo(const QueryPlan::DAGNodeIndex build_operator_index,
-                              const QueryPlan::DAGNodeIndex join_operator_index,
-                              const CatalogRelation *referenced_stored_build_relation,
-                              const CatalogRelation *referenced_stored_probe_relation,
-                              std::vector<attribute_id> &&build_attributes,
-                              std::vector<attribute_id> &&probe_attributes,
-                              const QueryContext::join_hash_table_id join_hash_table_id) {
-    hash_joins_.push_back(HashJoinInfo(build_operator_index,
-                                       join_operator_index,
-                                       referenced_stored_build_relation,
-                                       referenced_stored_probe_relation,
-                                       std::move(build_attributes),
-                                       std::move(probe_attributes),
-                                       join_hash_table_id));
-  }
-
-  /**
-   * @brief Optimize the execution plan based on heuristics generated
-   *        during physical plan to execution plan conversion.
-   *
-   * @param query_plan A mutable reference to the query execution plan.
-   * @param query_context_proto A mutable reference to the protobuf representation
-   *        of the query context.
-   **/
-  void optimizeExecutionPlan(QueryPlan *query_plan, serialization::QueryContext *query_context_proto);
-
-  /**
-   * @brief Set the properties of the bloom filter proto based on the statistics
-   *        of the given relation.
-   *
-   * @param bloom_filter_proto A mutable reference to the bloom filter protobuf representation.
-   * @param relation The catalog relation on which bloom filter is being built.
-   **/
-  void setBloomFilterProperties(serialization::BloomFilter *bloom_filter_proto,
-                                const CatalogRelation *relation);
-
- private:
-  std::vector<HashJoinInfo> hash_joins_;
-
-  DISALLOW_COPY_AND_ASSIGN(ExecutionHeuristics);
-};
-
-/** @} */
-
-}  // namespace optimizer
-}  // namespace quickstep
-
-#endif /* QUICKSTEP_QUERY_OPTIMIZER_EXECUTION_HEURISTICS_HPP_ */

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7a464434/query_optimizer/PhysicalGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/PhysicalGenerator.cpp b/query_optimizer/PhysicalGenerator.cpp
index 8f19702..9db4037 100644
--- a/query_optimizer/PhysicalGenerator.cpp
+++ b/query_optimizer/PhysicalGenerator.cpp
@@ -26,6 +26,7 @@
 #include "query_optimizer/Validator.hpp"
 #include "query_optimizer/logical/Logical.hpp"
 #include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/rules/AttachLIPFilters.hpp"
 #include "query_optimizer/rules/PruneColumns.hpp"
 #include "query_optimizer/rules/StarSchemaHashJoinOrderOptimization.hpp"
 #include "query_optimizer/rules/SwapProbeBuild.hpp"
@@ -49,6 +50,12 @@ DEFINE_bool(reorder_hash_joins, true,
             "cardinality and selective tables to be joined first, which is suitable "
             "for queries on star-schema tables.");
 
+DEFINE_bool(use_lip_filters, false,
+            "If true, use LIP (Lookahead Information Passing) filters to accelerate "
+            "query processing. LIP filters are effective for queries on star schema "
+            "tables (e.g. the SSB benchmark) and snowflake schema tables (e.g. the "
+            "TPC-H benchmark).");
+
 DEFINE_bool(visualize_plan, false,
             "If true, visualize the final physical plan into a graph in DOT format "
             "(DOT is a plain text graph description language). Then print the "
@@ -95,11 +102,16 @@ P::PhysicalPtr PhysicalGenerator::generateInitialPlan(
 
 P::PhysicalPtr PhysicalGenerator::optimizePlan() {
   std::vector<std::unique_ptr<Rule<P::Physical>>> rules;
+  rules.emplace_back(new PruneColumns());
   if (FLAGS_reorder_hash_joins) {
     rules.emplace_back(new StarSchemaHashJoinOrderOptimization());
+    rules.emplace_back(new PruneColumns());
+  } else {
+    rules.emplace_back(new SwapProbeBuild());
+  }
+  if (FLAGS_use_lip_filters) {
+    rules.emplace_back(new AttachLIPFilters());
   }
-  rules.emplace_back(new PruneColumns());
-  rules.emplace_back(new SwapProbeBuild());
 
   for (std::unique_ptr<Rule<P::Physical>> &rule : rules) {
     physical_plan_ = rule->apply(physical_plan_);
@@ -110,7 +122,7 @@ P::PhysicalPtr PhysicalGenerator::optimizePlan() {
   DVLOG(4) << "Optimized physical plan:\n" << physical_plan_->toString();
 
   if (FLAGS_visualize_plan) {
-  quickstep::PlanVisualizer plan_visualizer;
+    quickstep::PlanVisualizer plan_visualizer;
     std::cerr << "\n" << plan_visualizer.visualize(physical_plan_) << "\n";
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7a464434/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp b/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
index 8d254fa..1075739 100644
--- a/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
+++ b/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
@@ -358,7 +358,7 @@ double StarSchemaSimpleCostModel::estimateSelectivityForPredicate(
           std::static_pointer_cast<const E::LogicalAnd>(filter_predicate);
       double selectivity = 1.0;
       for (const auto &predicate : logical_and->operands()) {
-        selectivity = selectivity * estimateSelectivityForPredicate(predicate, physical_plan);
+        selectivity = std::min(selectivity, estimateSelectivityForPredicate(predicate, physical_plan));
       }
       return selectivity;
     }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7a464434/query_optimizer/physical/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/CMakeLists.txt b/query_optimizer/physical/CMakeLists.txt
index 3b7d3f0..5c2cd0b 100644
--- a/query_optimizer/physical/CMakeLists.txt
+++ b/query_optimizer/physical/CMakeLists.txt
@@ -27,6 +27,7 @@ add_library(quickstep_queryoptimizer_physical_HashJoin HashJoin.cpp HashJoin.hpp
 add_library(quickstep_queryoptimizer_physical_InsertSelection InsertSelection.cpp InsertSelection.hpp)
 add_library(quickstep_queryoptimizer_physical_InsertTuple InsertTuple.cpp InsertTuple.hpp)
 add_library(quickstep_queryoptimizer_physical_Join ../../empty_src.cpp Join.hpp)
+add_library(quickstep_queryoptimizer_physical_LIPFilterConfiguration ../../empty_src.cpp LIPFilterConfiguration.hpp)
 add_library(quickstep_queryoptimizer_physical_NestedLoopsJoin NestedLoopsJoin.cpp NestedLoopsJoin.hpp)
 add_library(quickstep_queryoptimizer_physical_PatternMatcher ../../empty_src.cpp PatternMatcher.hpp)
 add_library(quickstep_queryoptimizer_physical_Physical ../../empty_src.cpp Physical.hpp)
@@ -150,6 +151,10 @@ target_link_libraries(quickstep_queryoptimizer_physical_Join
                       quickstep_queryoptimizer_expressions_NamedExpression
                       quickstep_queryoptimizer_physical_Physical
                       quickstep_utility_Macros)
+target_link_libraries(quickstep_queryoptimizer_physical_LIPFilterConfiguration
+                      quickstep_queryoptimizer_expressions_AttributeReference
+                      quickstep_utility_Macros
+                      quickstep_utility_lipfilter_LIPFilter)
 target_link_libraries(quickstep_queryoptimizer_physical_NestedLoopsJoin
                       glog
                       quickstep_queryoptimizer_OptimizerTree
@@ -237,6 +242,7 @@ target_link_libraries(quickstep_queryoptimizer_physical_TopLevelPlan
                       quickstep_queryoptimizer_expressions_AttributeReference
                       quickstep_queryoptimizer_expressions_ExprId
                       quickstep_queryoptimizer_expressions_ExpressionUtil
+                      quickstep_queryoptimizer_physical_LIPFilterConfiguration
                       quickstep_queryoptimizer_physical_Physical
                       quickstep_queryoptimizer_physical_PhysicalType
                       quickstep_utility_Cast
@@ -279,6 +285,7 @@ target_link_libraries(quickstep_queryoptimizer_physical
                       quickstep_queryoptimizer_physical_InsertSelection
                       quickstep_queryoptimizer_physical_InsertTuple
                       quickstep_queryoptimizer_physical_Join
+                      quickstep_queryoptimizer_physical_LIPFilterConfiguration
                       quickstep_queryoptimizer_physical_NestedLoopsJoin
                       quickstep_queryoptimizer_physical_PatternMatcher
                       quickstep_queryoptimizer_physical_Physical

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7a464434/query_optimizer/physical/LIPFilterConfiguration.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/LIPFilterConfiguration.hpp b/query_optimizer/physical/LIPFilterConfiguration.hpp
new file mode 100644
index 0000000..62a6149
--- /dev/null
+++ b/query_optimizer/physical/LIPFilterConfiguration.hpp
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_OPTIMIZER_PHYSICAL_LIP_FILTER_CONFIGURATION_HPP_
+#define QUICKSTEP_QUERY_OPTIMIZER_PHYSICAL_LIP_FILTER_CONFIGURATION_HPP_
+
+#include <cstddef>
+#include <map>
+#include <memory>
+#include <vector>
+
+#include "query_optimizer/expressions/AttributeReference.hpp"
+#include "utility/Macros.hpp"
+#include "utility/lip_filter/LIPFilter.hpp"
+
+namespace quickstep {
+namespace optimizer {
+namespace physical {
+
+/** \addtogroup OptimizerPhysical
+ *  @{
+ */
+
+class Physical;
+typedef std::shared_ptr<const Physical> PhysicalPtr;
+
+/**
+ * @brief Optimizer information for a LIP filter builder.
+ */
+struct LIPFilterBuildInfo {
+  /**
+   * @brief Constructor. Each argument is stored as-is in the matching member.
+   *
+   * @param build_attribute_in The attribute to build the LIP filter with.
+   * @param filter_cardinality_in The LIP filter's cardinality.
+   * @param filter_type_in The LIP filter's type.
+   */
+  LIPFilterBuildInfo(const expressions::AttributeReferencePtr &build_attribute_in,
+                     const std::size_t filter_cardinality_in,
+                     const LIPFilterType &filter_type_in)
+      : build_attribute(build_attribute_in),
+        filter_cardinality(filter_cardinality_in),
+        filter_type(filter_type_in) {
+  }
+  const expressions::AttributeReferencePtr build_attribute;  // Attribute the filter is built with.
+  const std::size_t filter_cardinality;  // Cardinality the filter is configured with.
+  const LIPFilterType filter_type;  // Kind of LIP filter to create.
+};
+
+/**
+ * @brief Optimizer information for a LIP filter prober.
+ */
+struct LIPFilterProbeInfo {
+  /**
+   * @brief Constructor. Each argument is stored as-is in the matching member.
+   *
+   * @param probe_attribute_in The attribute to probe the LIP filter with.
+   * @param build_attribute_in The attribute that the LIP filter is built with.
+   * @param builder_in The physical node that the LIP filter's builder is attached to.
+   */
+  LIPFilterProbeInfo(const expressions::AttributeReferencePtr &probe_attribute_in,
+                     const expressions::AttributeReferencePtr &build_attribute_in,
+                     const PhysicalPtr &builder_in)
+      : probe_attribute(probe_attribute_in),
+        build_attribute(build_attribute_in),
+        builder(builder_in) {
+  }
+  const expressions::AttributeReferencePtr probe_attribute;  // Attribute probed against the filter.
+  const expressions::AttributeReferencePtr build_attribute;  // Attribute the filter was built with.
+  const PhysicalPtr builder;  // Node the filter's builder is attached to.
+};
+
+
+class LIPFilterConfiguration;
+typedef std::shared_ptr<const LIPFilterConfiguration> LIPFilterConfigurationPtr;
+
+/**
+ * @brief Configuration information of all the LIP filters in a query plan.
+ */
+class LIPFilterConfiguration {
+ public:
+  /**
+   * @brief Constructor. The configuration starts with no builders or probers.
+   */
+  LIPFilterConfiguration() {
+  }
+
+  /**
+   * @brief Append a LIP filter builder to the list kept for \p builder.
+   *
+   * @param build_attribute The attribute to build the LIP filter with.
+   * @param builder The physical node to attach the LIP filter builder to.
+   * @param filter_size The LIP filter's cardinality.
+   * @param filter_type The LIP filter's type.
+   */
+  void addBuildInfo(const expressions::AttributeReferencePtr &build_attribute,
+                    const PhysicalPtr &builder,
+                    const std::size_t filter_size,
+                    const LIPFilterType &filter_type) {
+    build_info_map_[builder].emplace_back(
+        build_attribute, filter_size, filter_type);
+  }
+
+  /**
+   * @brief Append a LIP filter prober to the list kept for \p prober.
+   *
+   * @param probe_attribute The attribute to probe the LIP filter with.
+   * @param prober The physical node to attach the LIP filter prober to.
+   * @param build_attribute The attribute that the LIP filter is built with.
+   * @param builder The physical node that the LIP filter's builder is attached to.
+   */
+  void addProbeInfo(const expressions::AttributeReferencePtr &probe_attribute,
+                    const PhysicalPtr &prober,
+                    const expressions::AttributeReferencePtr &build_attribute,
+                    const PhysicalPtr &builder) {
+    probe_info_map_[prober].emplace_back(
+        probe_attribute, build_attribute, builder);
+  }
+
+  /**
+   * @brief Get all the LIP filter builders.
+   *
+   * @return A map where each key is a physical node and each mapped value is
+   *         a vector of all the LIP filter builders that are attached to the
+   *         physical node, in the order they were added.
+   */
+  const std::map<PhysicalPtr, std::vector<LIPFilterBuildInfo>>& getBuildInfoMap() const {
+    return build_info_map_;
+  }
+
+  /**
+   * @brief Get all the LIP filter probers.
+   *
+   * @return A map where each key is a physical node and each mapped value is
+   *         a vector of all the LIP filter probers that are attached to the
+   *         physical node, in the order they were added.
+   */
+  const std::map<PhysicalPtr, std::vector<LIPFilterProbeInfo>>& getProbeInfoMap() const {
+    return probe_info_map_;
+  }
+
+ private:
+  std::map<PhysicalPtr, std::vector<LIPFilterBuildInfo>> build_info_map_;  // Builders, keyed by their node.
+  std::map<PhysicalPtr, std::vector<LIPFilterProbeInfo>> probe_info_map_;  // Probers, keyed by their node.
+
+  DISALLOW_COPY_AND_ASSIGN(LIPFilterConfiguration);
+};
+
+/** @} */
+
+}  // namespace physical
+}  // namespace optimizer
+}  // namespace quickstep
+
+#endif /* QUICKSTEP_QUERY_OPTIMIZER_PHYSICAL_LIP_FILTER_CONFIGURATION_HPP_ */

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7a464434/query_optimizer/physical/TopLevelPlan.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/TopLevelPlan.hpp b/query_optimizer/physical/TopLevelPlan.hpp
index 8f07dec..7dfc2b6 100644
--- a/query_optimizer/physical/TopLevelPlan.hpp
+++ b/query_optimizer/physical/TopLevelPlan.hpp
@@ -29,6 +29,7 @@
 #include "query_optimizer/expressions/AttributeReference.hpp"
 #include "query_optimizer/expressions/ExprId.hpp"
 #include "query_optimizer/expressions/ExpressionUtil.hpp"
+#include "query_optimizer/physical/LIPFilterConfiguration.hpp"
 #include "query_optimizer/physical/Physical.hpp"
 #include "query_optimizer/physical/PhysicalType.hpp"
 #include "utility/Macros.hpp"
@@ -89,6 +90,29 @@ class TopLevelPlan : public Physical {
     return shared_subplans_[index];
   }
 
+  /**
+   * @brief Creates a copy of the TopLevelPlan with lip_filter_configuration_
+   *        replaced by \p new_lip_filter_configuration.
+   *
+   * @param new_lip_filter_configuration The new lip_filter_configuration to be
+   *        substituted for the existing one.
+   * @return A copy of this TopLevelPlan with the new lip_filter_configuration.
+   */
+  TopLevelPlanPtr copyWithLIPFilterConfiguration(
+      const LIPFilterConfigurationPtr &new_lip_filter_configuration) const {
+    return TopLevelPlan::Create(plan_,
+                                shared_subplans_,
+                                uncorrelated_subquery_map_,
+                                new_lip_filter_configuration);
+  }
+
+  /**
+   * @return The plan-wide LIPFilter configuration, or nullptr if the plan was created without one.
+   */
+  const LIPFilterConfigurationPtr& lip_filter_configuration() const {
+    return lip_filter_configuration_;
+  }
+
   PhysicalPtr copyWithNewChildren(
       const std::vector<PhysicalPtr> &new_children) const override {
     DCHECK_EQ(getNumChildren(), new_children.size());
@@ -125,18 +149,22 @@ class TopLevelPlan : public Physical {
    *
    * @param plan The query plan.
    * @param shared_subplans The subplans referenced in the main input plan.
-   * @param Map from the expression ID of an attribute reference to the
-   *        uncorrelated subquery that produces the attribute.
+   * @param uncorrelated_subquery_map Map from the expression ID of an attribute
+   *        reference to the uncorrelated subquery that produces the attribute.
+   * @param lip_filter_configuration The LIPFilter configuration information
+   *        for the overall query plan.
    * @return An immutable TopLevelPlan.
    */
   static TopLevelPlanPtr Create(
       const PhysicalPtr &plan,
       const std::vector<PhysicalPtr> &shared_subplans = {},
       const std::unordered_map<expressions::ExprId, int> &uncorrelated_subquery_map
-          = std::unordered_map<expressions::ExprId, int>()) {
+          = std::unordered_map<expressions::ExprId, int>(),
+      const LIPFilterConfigurationPtr &lip_filter_configuration = nullptr) {
     return TopLevelPlanPtr(new TopLevelPlan(plan,
                                             shared_subplans,
-                                            uncorrelated_subquery_map));
+                                            uncorrelated_subquery_map,
+                                            lip_filter_configuration));
   }
 
  protected:
@@ -151,10 +179,12 @@ class TopLevelPlan : public Physical {
  private:
   TopLevelPlan(const PhysicalPtr &plan,
                const std::vector<PhysicalPtr> &shared_subplans,
-               const std::unordered_map<expressions::ExprId, int> &uncorrelated_subquery_map)
+               const std::unordered_map<expressions::ExprId, int> &uncorrelated_subquery_map,
+               const LIPFilterConfigurationPtr &lip_filter_configuration)
       : plan_(plan),
         shared_subplans_(shared_subplans),
-        uncorrelated_subquery_map_(uncorrelated_subquery_map) {
+        uncorrelated_subquery_map_(uncorrelated_subquery_map),
+        lip_filter_configuration_(lip_filter_configuration) {
     addChild(plan);
     for (const PhysicalPtr &shared_subplan : shared_subplans) {
       addChild(shared_subplan);
@@ -165,6 +195,7 @@ class TopLevelPlan : public Physical {
   // Stored in the topological ordering based on dependencies.
   std::vector<PhysicalPtr> shared_subplans_;
   std::unordered_map<expressions::ExprId, int> uncorrelated_subquery_map_;
+  LIPFilterConfigurationPtr lip_filter_configuration_;
 
   DISALLOW_COPY_AND_ASSIGN(TopLevelPlan);
 };

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7a464434/query_optimizer/rules/AttachLIPFilters.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/AttachLIPFilters.cpp b/query_optimizer/rules/AttachLIPFilters.cpp
new file mode 100644
index 0000000..090fb8c
--- /dev/null
+++ b/query_optimizer/rules/AttachLIPFilters.cpp
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "query_optimizer/rules/AttachLIPFilters.hpp"
+
+#include <map>
+#include <set>
+#include <unordered_set>
+#include <unordered_map>
+#include <vector>
+#include <utility>
+
+#include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp"
+#include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/physical/LIPFilterConfiguration.hpp"
+#include "query_optimizer/physical/Aggregate.hpp"
+#include "query_optimizer/physical/HashJoin.hpp"
+#include "query_optimizer/physical/PatternMatcher.hpp"
+#include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/physical/PhysicalType.hpp"
+#include "query_optimizer/physical/Selection.hpp"
+#include "query_optimizer/physical/TopLevelPlan.hpp"
+#include "utility/lip_filter/LIPFilter.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+namespace optimizer {
+
+namespace E = ::quickstep::optimizer::expressions;
+namespace P = ::quickstep::optimizer::physical;
+
+P::PhysicalPtr AttachLIPFilters::apply(const P::PhysicalPtr &input) {
+  // Decides the LIP filters for a whole TopLevelPlan and returns the plan carrying that configuration.
+  DCHECK(input->getPhysicalType() == P::PhysicalType::kTopLevelPlan);
+
+  const P::TopLevelPlanPtr top_level_plan =
+     std::static_pointer_cast<const P::TopLevelPlan>(input);
+  cost_model_.reset(
+      new cost::StarSchemaSimpleCostModel(
+          top_level_plan->shared_subplans()));
+  lip_filter_configuration_.reset(new P::LIPFilterConfiguration());
+  // Start from empty caches: entries memoized by an earlier apply() belong to another plan.
+  build_side_info_.clear();
+  probe_side_info_.clear();
+
+  std::set<E::ExprId> already_filtered_attributes;
+  attachLIPFilters(NodeList(input), &already_filtered_attributes);
+  if (lip_filter_configuration_->getBuildInfoMap().empty() &&
+      lip_filter_configuration_->getProbeInfoMap().empty()) {
+    return input;  // No LIP filter was attached; hand back the plan unchanged.
+  }
+  return top_level_plan->copyWithLIPFilterConfiguration(
+      P::LIPFilterConfigurationPtr(lip_filter_configuration_.release()));
+}
+
+void AttachLIPFilters::attachLIPFilters(
+    const NodeList &path,
+    std::set<expressions::ExprId> *already_filtered_attributes) {
+  const P::PhysicalPtr &node = path.node;
+  // Post-order walk: the children of path.node are handled before path.node itself.
+  // First process child nodes; the attribute ids they filtered are merged into the caller's set.
+  for (const auto &child : node->children()) {
+    std::set<E::ExprId> child_filtered_attributes;
+    attachLIPFilters(path.cons(child), &child_filtered_attributes);
+    already_filtered_attributes->insert(child_filtered_attributes.begin(),
+                                        child_filtered_attributes.end());
+  }
+
+  // Only HashJoin/Selection/Aggregate nodes get LIP filter probers; pick the child they probe from.
+  P::PhysicalPtr probe_child = nullptr;
+  switch (node->getPhysicalType()) {
+    case P::PhysicalType::kHashJoin:
+      probe_child = std::static_pointer_cast<const P::HashJoin>(node)->left();
+      break;
+    case P::PhysicalType::kSelection:
+      probe_child = std::static_pointer_cast<const P::Selection>(node)->input();
+      break;
+    case P::PhysicalType::kAggregate:
+      probe_child = std::static_pointer_cast<const P::Aggregate>(node)->input();
+      break;
+    default:
+      break;
+  }
+
+  if (probe_child != nullptr &&
+      cost_model_->estimateCardinality(probe_child) > 10000000) {  // Hard-coded cardinality threshold.
+    const auto &candidate_lip_filters = getProbeSideInfo(path.cons(probe_child));
+    if (!candidate_lip_filters.empty()) {
+      std::map<E::AttributeReferencePtr, LIPFilterInfoPtr> selected_filters;
+      for (const auto &info : candidate_lip_filters) {
+        auto it = selected_filters.find(info->attribute);
+        if (it == selected_filters.end()) {
+          selected_filters.emplace(info->attribute, info);
+        } else if (LIPFilterInfo::isBetterThan(*info, *it->second)) {
+          it->second = info;  // Keep only the preferred candidate per key.
+        }
+      }
+      // NOTE(review): selected_filters is keyed by pointer value rather than ExprId - confirm intended.
+      for (const auto &pair : selected_filters) {
+        const E::ExprId source_attr_id = pair.second->source_attribute->id();
+        if (already_filtered_attributes->find(source_attr_id)
+                == already_filtered_attributes->end()) {
+          lip_filter_configuration_->addBuildInfo(
+              pair.second->source_attribute,
+              pair.second->source,
+              pair.second->estimated_cardinality * 8,  // Filter size: 8x the estimated cardinality.
+              LIPFilterType::kSingleIdentityHashFilter);
+          lip_filter_configuration_->addProbeInfo(
+              pair.first,
+              node,
+              pair.second->source_attribute,
+              pair.second->source);
+          already_filtered_attributes->emplace(source_attr_id);
+        }
+      }
+    }
+  }
+}
+
+const std::vector<AttachLIPFilters::LIPFilterInfoPtr>& AttachLIPFilters
+    ::getBuildSideInfo(const NodeList &path) {
+  // Returns the candidate LIP filters collected for path.node: those passed up
+  // from its children plus, if its parent is a HashJoin, that join's build-side
+  // join attributes. Results are memoized per node in build_side_info_.
+  const P::PhysicalPtr &node = path.node;
+  if (build_side_info_.find(node) == build_side_info_.end()) {
+    std::vector<LIPFilterInfoPtr> lip_filters;
+
+    // 1. Gather candidate LIP filters propagated from descendant nodes.
+    // Only Aggregate, Selection and HashJoin nodes pass candidates upward.
+    switch (node->getPhysicalType()) {
+      case P::PhysicalType::kAggregate:
+      case P::PhysicalType::kSelection:
+      case P::PhysicalType::kHashJoin: {
+        for (const P::PhysicalPtr &child : node->children()) {
+          for (const LIPFilterInfoPtr &info : getBuildSideInfo(path.cons(child))) {
+            lip_filters.emplace_back(info);
+          }
+        }
+        break;
+      }
+      default:
+        break;
+    }
+
+    // 2. Consider the parent physical node. If it is a HashJoin,
+    // then each build-side join attribute is a candidate LIP filter
+    // which can be built by the BuildHashOperator that corresponds
+    // to the parent HashJoin node.
+    P::HashJoinPtr hash_join;
+    if (path.cdr() != nullptr &&
+        P::SomeHashJoin::MatchesWithConditionalCast(path.cdr()->node, &hash_join)) {
+      const P::PhysicalPtr &build_node = hash_join->right();
+      // TODO(jianqiao): consider probe-side info to allow cascading propagation.
+      double selectivity = cost_model_->estimateSelectivity(build_node);
+      // Only consider attributes that are selective.
+      if (selectivity < 1.0) {
+        std::size_t cardinality = cost_model_->estimateCardinality(build_node);
+        for (const auto &attr : hash_join->right_join_attributes()) {
+          lip_filters.emplace_back(
+              std::make_shared<LIPFilterInfo>(attr,
+                                              path.cdr()->node,
+                                              path.depth,
+                                              selectivity,
+                                              cardinality));
+        }
+      }
+    }
+    build_side_info_.emplace(node, std::move(lip_filters));
+  }
+  return build_side_info_.at(node);
+}
+
+const std::vector<AttachLIPFilters::LIPFilterInfoPtr>& AttachLIPFilters
+    ::getProbeSideInfo(const NodeList &path) {
+  const P::PhysicalPtr &node = path.node;
+  const auto cached_it = probe_side_info_.find(node);
+  if (cached_it != probe_side_info_.end()) {
+    return cached_it->second;  // Already computed for this node.
+  }
+
+  std::vector<LIPFilterInfoPtr> candidates;
+  const NodeList *parent_path = path.cdr();
+  if (parent_path != nullptr) {
+    // 1. Inherit the ancestors' candidates whose attribute this node outputs.
+    const auto &inherited = getProbeSideInfo(*parent_path);
+    if (!inherited.empty()) {
+      std::unordered_set<E::ExprId> visible_ids;
+      for (const auto &attr : node->getOutputAttributes()) {
+        visible_ids.emplace(attr->id());
+      }
+      for (const auto &info : inherited) {
+        if (visible_ids.count(info->attribute->id()) != 0) {
+          candidates.emplace_back(info);
+        }
+      }
+    }
+
+    // 2. Under an inner or left-semi HashJoin parent, the build-side candidates
+    // carry over, re-expressed with the matching probe-side join attribute.
+    P::HashJoinPtr hash_join;
+    if (P::SomeHashJoin::MatchesWithConditionalCast(parent_path->node, &hash_join) &&
+        (hash_join->join_type() == P::HashJoin::JoinType::kInnerJoin ||
+         hash_join->join_type() == P::HashJoin::JoinType::kLeftSemiJoin)) {
+      // Maps each build-side join attribute id to its probe-side partner.
+      std::unordered_map<E::ExprId, E::AttributeReferencePtr> probe_attr_of;
+      const auto &probe_attrs = hash_join->left_join_attributes();
+      const auto &build_attrs = hash_join->right_join_attributes();
+      for (std::size_t i = 0; i < probe_attrs.size(); ++i) {
+        probe_attr_of.emplace(build_attrs[i]->id(), probe_attrs[i]);
+      }
+      for (const auto &info : getBuildSideInfo(parent_path->cons(hash_join->right()))) {
+        const auto match = probe_attr_of.find(info->attribute->id());
+        if (match == probe_attr_of.end()) {
+          continue;
+        }
+        candidates.emplace_back(
+            std::make_shared<LIPFilterInfo>(match->second,
+                                            info->source,
+                                            info->depth,
+                                            info->estimated_selectivity,
+                                            info->estimated_cardinality,
+                                            info->attribute));
+      }
+    }
+  }
+  return probe_side_info_.emplace(node, std::move(candidates)).first->second;
+}
+
+}  // namespace optimizer
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7a464434/query_optimizer/rules/AttachLIPFilters.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/AttachLIPFilters.hpp b/query_optimizer/rules/AttachLIPFilters.hpp
new file mode 100644
index 0000000..b8cfc39
--- /dev/null
+++ b/query_optimizer/rules/AttachLIPFilters.hpp
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_OPTIMIZER_RULES_ATTACH_LIP_FILTERS_HPP_
+#define QUICKSTEP_QUERY_OPTIMIZER_RULES_ATTACH_LIP_FILTERS_HPP_
+
+#include <cstddef>
+#include <map>
+#include <memory>
+#include <set>
+#include <string>
+#include <vector>
+
+#include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp"
+#include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/expressions/ExprId.hpp"
+#include "query_optimizer/physical/LIPFilterConfiguration.hpp"
+#include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/rules/Rule.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+namespace optimizer {
+
+/** \addtogroup OptimizerRules
+ *  @{
+ */
+
+/**
+ * @brief Rule that applies to a physical plan to attach LIPFilters.
+ */
+class AttachLIPFilters : public Rule<physical::Physical> {
+ public:
+  /**
+   * @brief Constructor.
+   */
+  AttachLIPFilters() {}
+
+  ~AttachLIPFilters() override {}
+
+  std::string getName() const override {
+    return "AttachLIPFilters";
+  }
+
+  physical::PhysicalPtr apply(const physical::PhysicalPtr &input) override;
+
+ private:
+  /**
+   * @brief Internal record describing one candidate LIP filter.
+   */
+  struct LIPFilterInfo {
+    LIPFilterInfo(const expressions::AttributeReferencePtr &attribute_in,
+                  const physical::PhysicalPtr &source_in,
+                  const int depth_in,
+                  const double estimated_selectivity_in,
+                  const std::size_t estimated_cardinality_in,
+                  const expressions::AttributeReferencePtr &source_attribute_in = nullptr)
+        : attribute(attribute_in),
+          source(source_in),
+          depth(depth_in),
+          estimated_selectivity(estimated_selectivity_in),
+          estimated_cardinality(estimated_cardinality_in),
+          source_attribute(
+              source_attribute_in == nullptr
+                  ? attribute_in
+                  : source_attribute_in) {}
+
+    static bool isBetterThan(const LIPFilterInfo &a, const LIPFilterInfo &b) {
+      if (a.estimated_selectivity == b.estimated_selectivity) {
+        return a.depth > b.depth;  // Tie on selectivity: the larger depth wins.
+      } else {
+        return a.estimated_selectivity < b.estimated_selectivity;  // Lower selectivity wins.
+      }
+    }
+
+    expressions::AttributeReferencePtr attribute;  // Attribute this candidate is expressed on.
+    physical::PhysicalPtr source;  // Node passed as the builder when the filter is attached.
+    int depth;  // Path depth recorded when the candidate was created.
+    double estimated_selectivity;  // Cost-model selectivity estimate of the build side.
+    std::size_t estimated_cardinality;  // Cost-model cardinality estimate of the build side.
+    expressions::AttributeReferencePtr source_attribute;  // Build attribute; defaults to `attribute`.
+  };
+
+  typedef std::shared_ptr<const LIPFilterInfo> LIPFilterInfoPtr;
+
+  /**
+   * @brief Functional (cons-style) list: a node plus a raw pointer to the rest of the list.
+   */
+  struct NodeList {
+    explicit NodeList(const physical::PhysicalPtr &node_in)
+        : node(node_in),
+          next(nullptr),
+          depth(0) {}
+
+    NodeList(const physical::PhysicalPtr &node_in,
+             const NodeList *next_in,
+             const int depth_in)
+        : node(node_in),
+          next(next_in),
+          depth(depth_in) {}
+
+    inline const NodeList *cdr() const {
+      return next;  // nullptr for a list built by the single-argument constructor.
+    }
+
+    inline const NodeList cons(const physical::PhysicalPtr &new_node) const {
+      return NodeList(new_node, this, depth+1);  // The result points at *this; keep *this alive.
+    }
+
+    const physical::PhysicalPtr node;  // Head element of the list.
+    const NodeList *next;  // Remainder of the list; not deleted by this struct.
+    const int depth;  // 0 for a single-element list, +1 per cons().
+  };
+
+  void attachLIPFilters(const NodeList &path,
+                        std::set<expressions::ExprId> *already_filtered_attributes);
+
+  const std::vector<LIPFilterInfoPtr>& getBuildSideInfo(const NodeList &path);  // Memoized per node.
+
+  const std::vector<LIPFilterInfoPtr>& getProbeSideInfo(const NodeList &path);  // Memoized per node.
+
+  std::unique_ptr<cost::StarSchemaSimpleCostModel> cost_model_;  // Re-created by every apply().
+  std::map<physical::PhysicalPtr, std::vector<LIPFilterInfoPtr>> build_side_info_;  // getBuildSideInfo() memo.
+  std::map<physical::PhysicalPtr, std::vector<LIPFilterInfoPtr>> probe_side_info_;  // getProbeSideInfo() memo.
+  std::unique_ptr<physical::LIPFilterConfiguration> lip_filter_configuration_;  // Filled during apply().
+
+  DISALLOW_COPY_AND_ASSIGN(AttachLIPFilters);
+};
+
+/** @} */
+
+}  // namespace optimizer
+}  // namespace quickstep
+
+#endif /* QUICKSTEP_QUERY_OPTIMIZER_RULES_ATTACH_LIP_FILTERS_HPP_ */

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7a464434/query_optimizer/rules/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/CMakeLists.txt b/query_optimizer/rules/CMakeLists.txt
index d9709ce..29875f6 100644
--- a/query_optimizer/rules/CMakeLists.txt
+++ b/query_optimizer/rules/CMakeLists.txt
@@ -18,6 +18,7 @@
 add_subdirectory(tests)
 
 # Declare micro-libs:
+add_library(quickstep_queryoptimizer_rules_AttachLIPFilters AttachLIPFilters.cpp AttachLIPFilters.hpp)
 add_library(quickstep_queryoptimizer_rules_BottomUpRule ../../empty_src.cpp BottomUpRule.hpp)
 add_library(quickstep_queryoptimizer_rules_CollapseProject CollapseProject.cpp CollapseProject.hpp)
 add_library(quickstep_queryoptimizer_rules_GenerateJoins GenerateJoins.cpp GenerateJoins.hpp)
@@ -36,6 +37,21 @@ add_library(quickstep_queryoptimizer_rules_UnnestSubqueries UnnestSubqueries.cpp
 
 
 # Link dependencies:
+target_link_libraries(quickstep_queryoptimizer_rules_AttachLIPFilters
+                      quickstep_queryoptimizer_costmodel_StarSchemaSimpleCostModel
+                      quickstep_queryoptimizer_expressions_AttributeReference
+                      quickstep_queryoptimizer_expressions_ExprId
+                      quickstep_queryoptimizer_physical_Aggregate
+                      quickstep_queryoptimizer_physical_HashJoin
+                      quickstep_queryoptimizer_physical_LIPFilterConfiguration
+                      quickstep_queryoptimizer_physical_PatternMatcher
+                      quickstep_queryoptimizer_physical_Physical
+                      quickstep_queryoptimizer_physical_PhysicalType
+                      quickstep_queryoptimizer_physical_Selection
+                      quickstep_queryoptimizer_physical_TopLevelPlan
+                      quickstep_queryoptimizer_rules_Rule
+                      quickstep_utility_Macros
+                      quickstep_utility_lipfilter_LIPFilter)
 target_link_libraries(quickstep_queryoptimizer_rules_BottomUpRule
                       glog
                       quickstep_queryoptimizer_rules_Rule
@@ -121,12 +137,14 @@ target_link_libraries(quickstep_queryoptimizer_rules_StarSchemaHashJoinOrderOpti
                       quickstep_queryoptimizer_expressions_NamedExpression
                       quickstep_queryoptimizer_expressions_PatternMatcher
                       quickstep_queryoptimizer_expressions_Predicate
+                      quickstep_queryoptimizer_physical_Aggregate
                       quickstep_queryoptimizer_physical_HashJoin
                       quickstep_queryoptimizer_physical_PatternMatcher
                       quickstep_queryoptimizer_physical_Physical
                       quickstep_queryoptimizer_physical_PhysicalType
                       quickstep_queryoptimizer_physical_TopLevelPlan
                       quickstep_queryoptimizer_rules_Rule
+                      quickstep_utility_DisjointTreeForest
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_queryoptimizer_rules_SwapProbeBuild
                       quickstep_queryoptimizer_costmodel_SimpleCostModel
@@ -187,6 +205,7 @@ target_link_libraries(quickstep_queryoptimizer_rules_UpdateExpression
 # Module all-in-one library:
 add_library(quickstep_queryoptimizer_rules ../../empty_src.cpp OptimizerRulesModule.hpp)
 target_link_libraries(quickstep_queryoptimizer_rules
+                      quickstep_queryoptimizer_rules_AttachLIPFilters
                       quickstep_queryoptimizer_rules_BottomUpRule
                       quickstep_queryoptimizer_rules_CollapseProject
                       quickstep_queryoptimizer_rules_GenerateJoins

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7a464434/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.cpp b/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.cpp
index 946d316..5906b98 100644
--- a/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.cpp
+++ b/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.cpp
@@ -19,6 +19,8 @@
 
 #include "query_optimizer/rules/StarSchemaHashJoinOrderOptimization.hpp"
 
+#include <algorithm>
+#include <map>
 #include <memory>
 #include <set>
 #include <unordered_map>
@@ -28,11 +30,13 @@
 #include "query_optimizer/expressions/AttributeReference.hpp"
 #include "query_optimizer/expressions/NamedExpression.hpp"
 #include "query_optimizer/expressions/PatternMatcher.hpp"
+#include "query_optimizer/physical/Aggregate.hpp"
 #include "query_optimizer/physical/HashJoin.hpp"
 #include "query_optimizer/physical/PatternMatcher.hpp"
 #include "query_optimizer/physical/Physical.hpp"
 #include "query_optimizer/physical/PhysicalType.hpp"
 #include "query_optimizer/physical/TopLevelPlan.hpp"
+#include "utility/DisjointTreeForest.hpp"
 
 #include "glog/logging.h"
 
@@ -74,6 +78,9 @@ P::PhysicalPtr StarSchemaHashJoinOrderOptimization::applyInternal(const P::Physi
     JoinGroupInfo *join_group = nullptr;
     if (parent_join_group == nullptr || !is_valid_cascading_hash_join) {
       new_join_group.reset(new JoinGroupInfo());
+      for (const auto &attr : input->getOutputAttributes()) {
+        new_join_group->referenced_attributes.emplace(attr->id());
+      }
       join_group = new_join_group.get();
     } else {
       join_group = parent_join_group;
@@ -146,7 +153,10 @@ physical::PhysicalPtr StarSchemaHashJoinOrderOptimization::generatePlan(
         i,
         tables[i],
         cost_model_->estimateCardinality(tables[i]),
-        cost_model_->estimateSelectivity(tables[i]));
+        cost_model_->estimateSelectivity(tables[i]),
+        CountSharedAttributes(join_group.referenced_attributes,
+                              tables[i]->getOutputAttributes()),
+        tables[i]->getPhysicalType() == physical::PhysicalType::kAggregate);
   }
 
   // Auxiliary mapping info.
@@ -163,9 +173,25 @@ physical::PhysicalPtr StarSchemaHashJoinOrderOptimization::generatePlan(
     }
   }
 
-  // Create a join graph where tables are vertices, and add an edge between vertices
-  // t1 and t2 for each join predicate t1.x = t2.y
-  std::vector<std::unordered_set<std::size_t>> join_graph(table_info_storage.size());
+  // The pool of tables.
+  std::set<TableInfo*> remaining_tables;
+  for (auto &table_info : table_info_storage) {
+    remaining_tables.emplace(&table_info);
+  }
+
+  // The equal-join (e.g. =) operator defines an equivalence relation on the
+  // set of all the attributes. The disjoint set data structure is used to keep
+  // track of the equivalence classes that each attribute belongs to.
+  DisjointTreeForest<E::ExprId> join_attribute_forest;
+  for (const auto &attr_id_pair : join_group.join_attribute_pairs) {
+    join_attribute_forest.makeSet(attr_id_pair.first);
+    join_attribute_forest.makeSet(attr_id_pair.second);
+    join_attribute_forest.merge(attr_id_pair.first, attr_id_pair.second);
+  }
+
+  // Map each equivalence class id to the members (e.g. <table id, attribute id>
+  // pairs) in that equivalence class.
+  std::map<std::size_t, std::map<std::size_t, E::ExprId>> join_attribute_groups;
   for (const auto &attr_id_pair : join_group.join_attribute_pairs) {
     DCHECK(attribute_id_to_table_info_index_map.find(attr_id_pair.first)
                != attribute_id_to_table_info_index_map.end());
@@ -178,128 +204,148 @@ physical::PhysicalPtr StarSchemaHashJoinOrderOptimization::generatePlan(
         attribute_id_to_table_info_index_map[attr_id_pair.second];
     DCHECK_NE(first_table_idx, second_table_idx);
 
-    table_info_storage[first_table_idx].join_attribute_pairs.emplace(
-        attr_id_pair.first, attr_id_pair.second);
-    table_info_storage[second_table_idx].join_attribute_pairs.emplace(
-        attr_id_pair.second, attr_id_pair.first);
-
-    join_graph[first_table_idx].emplace(second_table_idx);
-    join_graph[second_table_idx].emplace(first_table_idx);
-  }
-
-  std::set<TableInfo*, TableInfoPtrLessComparator> table_info_ordered_by_priority;
-  for (std::size_t i = 0; i < table_info_storage.size(); ++i) {
-    table_info_ordered_by_priority.emplace(&table_info_storage[i]);
+    DCHECK_EQ(join_attribute_forest.find(attr_id_pair.first),
+              join_attribute_forest.find(attr_id_pair.second));
+    const std::size_t attr_group_id = join_attribute_forest.find(attr_id_pair.first);
+    auto &attr_group = join_attribute_groups[attr_group_id];
+    attr_group.emplace(first_table_idx, attr_id_pair.first);
+    attr_group.emplace(second_table_idx, attr_id_pair.second);
   }
 
-  // Contruct hash join tree.
   while (true) {
-    TableInfo *first_table_info = *table_info_ordered_by_priority.begin();
-    table_info_ordered_by_priority.erase(
-        table_info_ordered_by_priority.begin());
-    const std::size_t first_table_info_id = first_table_info->table_info_id;
-
-    TableInfo *second_table_info = nullptr;
-    std::set<TableInfo*, TableInfoPtrLessComparator>::iterator second_table_info_it;
-    for (auto candidate_table_info_it = table_info_ordered_by_priority.begin();
-         candidate_table_info_it != table_info_ordered_by_priority.end();
-         ++candidate_table_info_it) {
-      TableInfo *candidate_table_info = *candidate_table_info_it;
-      const std::size_t candidate_table_info_id = candidate_table_info->table_info_id;
-
-      if (join_graph[first_table_info_id].find(candidate_table_info_id)
-              == join_graph[first_table_info_id].end() &&
-          join_graph[candidate_table_info_id].find(first_table_info_id)
-              == join_graph[candidate_table_info_id].end()) {
-        continue;
-      } else if (second_table_info == nullptr) {
-        second_table_info = candidate_table_info;
-        second_table_info_it = candidate_table_info_it;
-      }
-
-      bool is_likely_many_to_many_join = false;
-      for (const auto join_attr_pair : first_table_info->join_attribute_pairs) {
-        if (candidate_table_info->joined_attribute_set.find(join_attr_pair.second)
-                != candidate_table_info->joined_attribute_set.end()) {
-          is_likely_many_to_many_join = true;
-          break;
+    // Find the best probe/build pair out of the remaining tables.
+    // TODO(jianqiao): design better data structure to improve efficiency here.
+    std::unique_ptr<JoinPair> best_join = nullptr;
+    for (TableInfo *probe_table_info : remaining_tables) {
+      for (TableInfo *build_table_info : remaining_tables) {
+        if (probe_table_info != build_table_info) {
+          const std::size_t probe_table_id = probe_table_info->table_info_id;
+          const std::size_t build_table_id = build_table_info->table_info_id;
+          std::size_t num_join_attributes = 0;
+          double build_side_uniqueness = 1.0;
+          for (const auto &attr_group_pair : join_attribute_groups) {
+            const auto &attr_group = attr_group_pair.second;
+            auto probe_it = attr_group.find(probe_table_id);
+            auto build_it = attr_group.find(build_table_id);
+            if (probe_it != attr_group.end() && build_it != attr_group.end()) {
+              ++num_join_attributes;
+              build_side_uniqueness *= std::max(
+                  1uL,
+                  cost_model_->estimateNumDistinctValues(
+                      build_it->second, build_table_info->table));
+            }
+          }
+          build_side_uniqueness /= build_table_info->estimated_cardinality;
+
+          if (num_join_attributes > 0) {
+            std::unique_ptr<JoinPair> new_join(
+                new JoinPair(probe_table_info,
+                             build_table_info,
+                             build_side_uniqueness >= 0.9,
+                             num_join_attributes));
+            if (best_join == nullptr || new_join->isBetterThan(*best_join)) {
+              best_join.reset(new_join.release());
+            }
+          }
         }
       }
-      for (const auto join_attr_pair : candidate_table_info->join_attribute_pairs) {
-        if (first_table_info->joined_attribute_set.find(join_attr_pair.second)
-                != first_table_info->joined_attribute_set.end()) {
-          is_likely_many_to_many_join = true;
-          break;
-        }
-      }
-      if (!is_likely_many_to_many_join) {
-        second_table_info = candidate_table_info;
-        second_table_info_it = candidate_table_info_it;
-        break;
+    }
+
+    CHECK(best_join != nullptr);
+
+    TableInfo *selected_probe_table_info = best_join->probe;
+    TableInfo *selected_build_table_info = best_join->build;
+
+    // Swap probe/build sides if:
+    // (1) Build side is an aggregation with large number of groups, so that
+    //     there is a chance to push LIPFilters down the aggregation.
+    // (2) Build side's join attributes are not unique, and it has larger
+    //     cardinality than the probe side.
+    const std::size_t probe_num_groups_as_agg =
+        getEstimatedNumGroups(selected_probe_table_info->table);
+    const std::size_t build_num_groups_as_agg =
+        getEstimatedNumGroups(selected_build_table_info->table);
+    if (build_num_groups_as_agg > 1000000 || probe_num_groups_as_agg > 1000000) {
+      if (build_num_groups_as_agg > probe_num_groups_as_agg) {
+        std::swap(selected_probe_table_info, selected_build_table_info);
       }
+    } else if ((!best_join->build_side_unique || best_join->num_join_attributes > 1) &&
+        selected_probe_table_info->estimated_cardinality < selected_build_table_info->estimated_cardinality) {
+      std::swap(selected_probe_table_info, selected_build_table_info);
     }
-    DCHECK(second_table_info != nullptr);
-    table_info_ordered_by_priority.erase(second_table_info_it);
 
-    const P::PhysicalPtr &left_child = first_table_info->table;
-    const P::PhysicalPtr &right_child = second_table_info->table;
+    remaining_tables.erase(selected_probe_table_info);
+    remaining_tables.erase(selected_build_table_info);
+
+    // Figure out the output attributes.
+    const P::PhysicalPtr &probe_child = selected_probe_table_info->table;
+    const P::PhysicalPtr &build_child = selected_build_table_info->table;
     std::vector<E::NamedExpressionPtr> output_attributes;
-    for (const E::AttributeReferencePtr &left_attr : left_child->getOutputAttributes()) {
-      output_attributes.emplace_back(left_attr);
+    for (const E::AttributeReferencePtr &probe_attr : probe_child->getOutputAttributes()) {
+      output_attributes.emplace_back(probe_attr);
     }
-    for (const E::AttributeReferencePtr &right_attr : right_child->getOutputAttributes()) {
-      output_attributes.emplace_back(right_attr);
+    for (const E::AttributeReferencePtr &build_attr : build_child->getOutputAttributes()) {
+      output_attributes.emplace_back(build_attr);
     }
 
-    std::vector<E::AttributeReferencePtr> left_join_attributes;
-    std::vector<E::AttributeReferencePtr> right_join_attributes;
-    std::unordered_set<expressions::ExprId> new_joined_attribute_set;
-    for (const auto &join_attr_pair : first_table_info->join_attribute_pairs) {
-      if (second_table_info->join_attribute_pairs.find(join_attr_pair.second)
-              != second_table_info->join_attribute_pairs.end()) {
-        left_join_attributes.emplace_back(
-            attribute_id_to_reference_map[join_attr_pair.first]);
-        right_join_attributes.emplace_back(
-            attribute_id_to_reference_map[join_attr_pair.second]);
-
-        new_joined_attribute_set.emplace(join_attr_pair.first);
-        new_joined_attribute_set.emplace(join_attr_pair.second);
+    // Figure out the join attributes.
+    std::vector<E::AttributeReferencePtr> probe_attributes;
+    std::vector<E::AttributeReferencePtr> build_attributes;
+    const std::size_t probe_table_id = selected_probe_table_info->table_info_id;
+    const std::size_t build_table_id = selected_build_table_info->table_info_id;
+    for (const auto &attr_group_pair : join_attribute_groups) {
+      const auto &attr_group = attr_group_pair.second;
+      auto probe_it = attr_group.find(probe_table_id);
+      auto build_it = attr_group.find(build_table_id);
+      if (probe_it != attr_group.end() && build_it != attr_group.end()) {
+        probe_attributes.emplace_back(
+            attribute_id_to_reference_map.at(probe_it->second));
+        build_attributes.emplace_back(
+            attribute_id_to_reference_map.at(build_it->second));
       }
     }
-    DCHECK_GE(left_join_attributes.size(), static_cast<std::size_t>(1));
 
-    if (table_info_ordered_by_priority.size() > 0) {
+    // Create a hash join from the chosen probe/build pair and put it back to
+    // the table pool. Return the last table in the table pool if there is only
+    // one table left.
+    if (remaining_tables.size() > 0) {
       P::PhysicalPtr output =
-          P::HashJoin::Create(left_child,
-                              right_child,
-                              left_join_attributes,
-                              right_join_attributes,
+          P::HashJoin::Create(probe_child,
+                              build_child,
+                              probe_attributes,
+                              build_attributes,
                               nullptr,
                               output_attributes,
                               P::HashJoin::JoinType::kInnerJoin);
 
-      second_table_info->table = output;
+      selected_probe_table_info->table = output;
 
       // TODO(jianqiao): Cache the estimated cardinality for each plan in cost
       // model to avoid duplicated estimation.
-      second_table_info->estimated_cardinality = cost_model_->estimateCardinality(output);
-
-      second_table_info->join_attribute_pairs.insert(first_table_info->join_attribute_pairs.begin(),
-                                                     first_table_info->join_attribute_pairs.end());
-      second_table_info->joined_attribute_set.insert(first_table_info->joined_attribute_set.begin(),
-                                                     first_table_info->joined_attribute_set.end());
-      second_table_info->joined_attribute_set.insert(new_joined_attribute_set.begin(),
-                                                     new_joined_attribute_set.end());
-      table_info_ordered_by_priority.emplace(second_table_info);
-
-      join_graph[second_table_info->table_info_id].insert(join_graph[first_table_info_id].begin(),
-                                                          join_graph[first_table_info_id].end());
-
+      selected_probe_table_info->estimated_cardinality = cost_model_->estimateCardinality(output);
+      selected_probe_table_info->estimated_selectivity = cost_model_->estimateSelectivity(output);
+
+      selected_probe_table_info->estimated_num_output_attributes =
+          CountSharedAttributes(join_group.referenced_attributes,
+                                output->getOutputAttributes());
+
+      remaining_tables.emplace(selected_probe_table_info);
+
+      // Update join attribute groups.
+      for (auto &attr_group_pair : join_attribute_groups) {
+        auto &attr_group = attr_group_pair.second;
+        auto build_it = attr_group.find(build_table_id);
+        if (build_it != attr_group.end()) {
+          const E::ExprId attr_id = build_it->second;
+          attr_group.erase(build_it);
+          attr_group.emplace(probe_table_id, attr_id);
+        }
+      }
     } else {
-      return P::HashJoin::Create(left_child,
-                                 right_child,
-                                 left_join_attributes,
-                                 right_join_attributes,
+      return P::HashJoin::Create(probe_child,
+                                 build_child,
+                                 probe_attributes,
+                                 build_attributes,
                                  residual_predicate,
                                  project_expressions,
                                  P::HashJoin::JoinType::kInnerJoin);
@@ -307,5 +353,28 @@ physical::PhysicalPtr StarSchemaHashJoinOrderOptimization::generatePlan(
   }
 }
 
+std::size_t StarSchemaHashJoinOrderOptimization::CountSharedAttributes(
+    const std::unordered_set<expressions::ExprId> &attr_set1,
+    const std::vector<expressions::AttributeReferencePtr> &attr_set2) {
+  std::size_t cnt = 0;
+  for (const auto &attr : attr_set2) {
+    if (attr_set1.find(attr->id()) != attr_set1.end()) {
+      ++cnt;
+    }
+  }
+  return cnt;
+}
+
+std::size_t StarSchemaHashJoinOrderOptimization::getEstimatedNumGroups(
+    const physical::PhysicalPtr &input) {
+  P::AggregatePtr aggregate;
+  if (P::SomeAggregate::MatchesWithConditionalCast(input, &aggregate)) {
+    return cost_model_->estimateNumGroupsForAggregate(aggregate);
+  } else {
+    return 0;
+  }
+}
+
+
 }  // namespace optimizer
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7a464434/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.hpp b/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.hpp
index c1a7bae..64e2478 100644
--- a/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.hpp
+++ b/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.hpp
@@ -20,16 +20,15 @@
 #ifndef QUICKSTEP_QUERY_OPTIMIZER_RULES_STAR_SCHEMA_HASH_JOIN_ORDER_OPTIMIZATION_HPP_
 #define QUICKSTEP_QUERY_OPTIMIZER_RULES_STAR_SCHEMA_HASH_JOIN_ORDER_OPTIMIZATION_HPP_
 
-#include <algorithm>
 #include <cstddef>
 #include <memory>
 #include <string>
-#include <unordered_map>
 #include <unordered_set>
 #include <utility>
 #include <vector>
 
 #include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp"
+#include "query_optimizer/expressions/AttributeReference.hpp"
 #include "query_optimizer/expressions/ExprId.hpp"
 #include "query_optimizer/expressions/NamedExpression.hpp"
 #include "query_optimizer/expressions/Predicate.hpp"
@@ -45,7 +44,11 @@ namespace optimizer {
  */
 
 /**
- * @brief TODO
+ * @brief Rule that applies to a physical plan to optimize hash join orders.
+ *
+ * This optimization applies a greedy algorithm to favor smaller cardinality
+ * and selective tables to be joined first, which is suitable for queries on
+ * star-schema or snowflake-schema tables.
  */
 class StarSchemaHashJoinOrderOptimization : public Rule<physical::Physical> {
  public:
@@ -64,6 +67,7 @@ class StarSchemaHashJoinOrderOptimization : public Rule<physical::Physical> {
    * @brief A group of tables to form a hash join tree.
    */
   struct JoinGroupInfo {
+    std::unordered_set<expressions::ExprId> referenced_attributes;
     std::vector<physical::PhysicalPtr> tables;
     std::vector<std::pair<expressions::ExprId, expressions::ExprId>> join_attribute_pairs;
   };
@@ -72,49 +76,91 @@ class StarSchemaHashJoinOrderOptimization : public Rule<physical::Physical> {
    * @brief Auxiliary information of a table for the optimizer.
    */
   struct TableInfo {
-    TableInfo(const std::size_t in_table_info_id,
-              const physical::PhysicalPtr &in_table,
-              const std::size_t in_estimated_cardinality,
-              const double in_estimated_selectivity)
-        : table_info_id(in_table_info_id),
-          table(in_table),
-          estimated_cardinality(in_estimated_cardinality),
-          estimated_selectivity(in_estimated_selectivity) {
+    TableInfo(const std::size_t table_info_id_in,
+              const physical::PhysicalPtr &table_in,
+              const std::size_t estimated_cardinality_in,
+              const double estimated_selectivity_in,
+              const std::size_t estimated_num_output_attributes_in,
+              const bool is_aggregation_in)
+        : table_info_id(table_info_id_in),
+          table(table_in),
+          estimated_cardinality(estimated_cardinality_in),
+          estimated_selectivity(estimated_selectivity_in),
+          estimated_num_output_attributes(estimated_num_output_attributes_in) {
     }
 
     const std::size_t table_info_id;
     physical::PhysicalPtr table;
     std::size_t estimated_cardinality;
     double estimated_selectivity;
-    std::unordered_multimap<expressions::ExprId, expressions::ExprId> join_attribute_pairs;
-    std::unordered_set<expressions::ExprId> joined_attribute_set;
+    std::size_t estimated_num_output_attributes;
   };
 
-  /**
-   * @brief Comparator that compares the join priorities between two tables.
-   */
-  struct TableInfoPtrLessComparator {
-    inline bool operator() (const TableInfo *lhs, const TableInfo *rhs) {
-      bool swapped = false;
-      if (lhs->estimated_cardinality > rhs->estimated_cardinality) {
-        std::swap(lhs, rhs);
-        swapped = true;
+  struct JoinPair {
+    JoinPair(TableInfo *probe_in,
+             TableInfo *build_in,
+             const bool build_side_unique_in,
+             const std::size_t num_join_attributes_in)
+        : probe(probe_in),
+          build(build_in),
+          build_side_unique(build_side_unique_in),
+          num_join_attributes(num_join_attributes_in) {
+    }
+
+    inline bool isBetterThan(const JoinPair &rhs) const {
+      const auto &lhs = *this;
+
+      // Avoid carrying too many output attributes all the way through a long
+      // chain of hash joins.
+      const bool lhs_has_large_output =
+          lhs.build->estimated_num_output_attributes
+              + lhs.probe->estimated_num_output_attributes > 5;
+      const bool rhs_has_large_output =
+          rhs.build->estimated_num_output_attributes
+              + rhs.probe->estimated_num_output_attributes > 5;
+      if (lhs_has_large_output != rhs_has_large_output) {
+        return rhs_has_large_output;
+      }
+
+      // Prefer foreign-key primary-key style hash joins.
+      if (lhs.build_side_unique != rhs.build_side_unique) {
+        return lhs.build_side_unique;
+      }
+
+      // Prefer hash joins where the build side table is small.
+      const bool lhs_has_small_build = lhs.build->estimated_cardinality < 0x100;
+      const bool rhs_has_small_build = rhs.build->estimated_cardinality < 0x100;
+      if (lhs_has_small_build != rhs_has_small_build) {
+        return lhs_has_small_build;
       }
 
-      if (lhs->estimated_selectivity < rhs->estimated_selectivity) {
-        return !swapped;
-      } else if (lhs->estimated_cardinality < 100u &&
-                 rhs->estimated_cardinality > 10000u &&
-                 lhs->estimated_selectivity < rhs->estimated_selectivity * 1.5) {
-        return !swapped;
-      } else if (lhs->estimated_selectivity > rhs->estimated_selectivity) {
-        return swapped;
-      } else if (lhs->estimated_cardinality != rhs->estimated_cardinality) {
-        return !swapped;
+      // Prefer hash joins where the probe side table is small. This is effective
+      // for TPCH style (snowflake schema) queries, with the help of LIPFilters.
+      if (lhs.probe->estimated_cardinality != rhs.probe->estimated_cardinality) {
+        return lhs.probe->estimated_cardinality < rhs.probe->estimated_cardinality;
+      }
+
+      // Prefer build side tables with better selectivity. This is effective
+      // for SSB style queries.
+      if (lhs.build->estimated_selectivity != rhs.build->estimated_selectivity) {
+        return lhs.build->estimated_selectivity < rhs.build->estimated_selectivity;
+      }
+
+      // Residual rules that help provide a total order.
+      if (lhs.build->estimated_cardinality != rhs.build->estimated_cardinality) {
+        return lhs.build->estimated_cardinality < rhs.build->estimated_cardinality;
+      }
+      if (lhs.probe->table != rhs.probe->table) {
+        return lhs.probe->table < rhs.probe->table;
       } else {
-        return swapped ^ (lhs->table < rhs->table);
+        return lhs.build->table < rhs.build->table;
       }
     }
+
+    TableInfo *probe;
+    TableInfo *build;
+    const bool build_side_unique;
+    const std::size_t num_join_attributes;
   };
 
   physical::PhysicalPtr applyInternal(const physical::PhysicalPtr &input,
@@ -125,6 +171,12 @@ class StarSchemaHashJoinOrderOptimization : public Rule<physical::Physical> {
       const expressions::PredicatePtr &residual_predicate,
       const std::vector<expressions::NamedExpressionPtr> &project_expressions);
 
+  std::size_t getEstimatedNumGroups(const physical::PhysicalPtr &input);
+
+  static std::size_t CountSharedAttributes(
+      const std::unordered_set<expressions::ExprId> &attr_set1,
+      const std::vector<expressions::AttributeReferencePtr> &attr_set2);
+
   std::unique_ptr<cost::StarSchemaSimpleCostModel> cost_model_;
 
   DISALLOW_COPY_AND_ASSIGN(StarSchemaHashJoinOrderOptimization);



[6/7] incubator-quickstep git commit: Refactored bulk insertion to the SplitRow store

Posted by hb...@apache.org.
Refactored bulk insertion to the SplitRow store

The inner loop of the insert algorithm has been changed to reduce
function calls to only those that are absolutely necessary. Also, we
merge copies which come from other rowstore sources, speeding up
insertion time.

Also adds support for the idea of 'partial inserts'. Partial
inserts are when you are only inserting a subset of the columns at a
time. Partial inserts will be used in a later commit.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/d3a09205
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/d3a09205
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/d3a09205

Branch: refs/heads/partitioned-aggregate-new
Commit: d3a0920595e4c40ba5e86907359b6e254b5b4958
Parents: 55480d8
Author: cramja <ma...@gmail.com>
Authored: Wed Oct 5 16:40:30 2016 -0500
Committer: jianqiao <ji...@node-2.jianqiao.quickstep-pg0.wisc.cloudlab.us>
Committed: Wed Oct 19 00:32:08 2016 -0500

----------------------------------------------------------------------
 storage/SplitRowStoreTupleStorageSubBlock.cpp   | 691 +++++++++----------
 storage/SplitRowStoreTupleStorageSubBlock.hpp   | 186 +++++
 ...litRowStoreTupleStorageSubBlock_unittest.cpp | 449 ++++++++++--
 utility/BitVector.hpp                           |  14 +
 4 files changed, 906 insertions(+), 434 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d3a09205/storage/SplitRowStoreTupleStorageSubBlock.cpp
----------------------------------------------------------------------
diff --git a/storage/SplitRowStoreTupleStorageSubBlock.cpp b/storage/SplitRowStoreTupleStorageSubBlock.cpp
index f955c99..068e975 100644
--- a/storage/SplitRowStoreTupleStorageSubBlock.cpp
+++ b/storage/SplitRowStoreTupleStorageSubBlock.cpp
@@ -41,54 +41,61 @@ namespace quickstep {
 
 QUICKSTEP_REGISTER_TUPLE_STORE(SplitRowStoreTupleStorageSubBlock, SPLIT_ROW_STORE);
 
+using splitrow_internal::CopyGroupList;
+using splitrow_internal::ContiguousAttrs;
+using splitrow_internal::NullableAttr;
+using splitrow_internal::VarLenAttr;
+
+const std::size_t SplitRowStoreTupleStorageSubBlock::kVarLenSlotSize = sizeof(std::uint32_t) * 2;
+
 namespace {
 
-template <typename ValueAccessorT, bool nullable_attrs>
-inline std::size_t CalculateVariableSize(
+  template<typename ValueAccessorT, bool nullable_attrs>
+  inline std::size_t CalculateVariableSize(
     const CatalogRelationSchema &relation,
     const ValueAccessorT &accessor) {
-  std::size_t total_size = 0;
-  attribute_id accessor_attr_id = 0;
-  for (CatalogRelationSchema::const_iterator attr_it = relation.begin();
-       attr_it != relation.end();
-       ++attr_it, ++accessor_attr_id) {
-    if (!attr_it->getType().isVariableLength()) {
-      continue;
-    }
+    std::size_t total_size = 0;
+    attribute_id accessor_attr_id = 0;
+    for (CatalogRelationSchema::const_iterator attr_it = relation.begin();
+         attr_it != relation.end();
+         ++attr_it, ++accessor_attr_id) {
+      if (!attr_it->getType().isVariableLength()) {
+        continue;
+      }
 
-    TypedValue value(accessor.getTypedValue(accessor_attr_id));
-    if (nullable_attrs && value.isNull()) {
-      continue;
+      TypedValue value(accessor.getTypedValue(accessor_attr_id));
+      if (nullable_attrs && value.isNull()) {
+        continue;
+      }
+      total_size += value.getDataSize();
     }
-    total_size += value.getDataSize();
+    return total_size;
   }
-  return total_size;
-}
 
-template <typename ValueAccessorT, bool nullable_attrs>
-inline std::size_t CalculateVariableSizeWithRemappedAttributes(
+  template<typename ValueAccessorT, bool nullable_attrs>
+  inline std::size_t CalculateVariableSizeWithRemappedAttributes(
     const CatalogRelationSchema &relation,
     const ValueAccessorT &accessor,
     const std::vector<attribute_id> &attribute_map) {
-  std::size_t total_size = 0;
-  std::vector<attribute_id>::const_iterator attr_map_it = attribute_map.begin();
-  for (CatalogRelationSchema::const_iterator attr_it = relation.begin();
-       attr_it != relation.end();
-       ++attr_it, ++attr_map_it) {
-    if (!attr_it->getType().isVariableLength()) {
-      continue;
-    }
+    std::size_t total_size = 0;
+    std::vector<attribute_id>::const_iterator attr_map_it = attribute_map.begin();
+    for (CatalogRelationSchema::const_iterator attr_it = relation.begin();
+         attr_it != relation.end();
+         ++attr_it, ++attr_map_it) {
+      if (!attr_it->getType().isVariableLength()) {
+        continue;
+      }
 
-    TypedValue value(accessor.getTypedValue(*attr_map_it));
-    if (nullable_attrs && value.isNull()) {
-      continue;
+      TypedValue value(accessor.getTypedValue(*attr_map_it));
+      if (nullable_attrs && value.isNull()) {
+        continue;
+      }
+      total_size += value.getDataSize();
     }
-    total_size += value.getDataSize();
+    return total_size;
   }
-  return total_size;
-}
 
-}  // namespace
+}  // anonymous namespace
 
 SplitRowStoreTupleStorageSubBlock::SplitRowStoreTupleStorageSubBlock(
     const CatalogRelationSchema &relation,
@@ -101,7 +108,10 @@ SplitRowStoreTupleStorageSubBlock::SplitRowStoreTupleStorageSubBlock(
                            new_block,
                            sub_block_memory,
                            sub_block_memory_size),
-      header_(static_cast<Header*>(sub_block_memory)) {
+      header_(static_cast<Header*>(sub_block_memory)),
+      num_null_attrs_(0),
+      num_fixed_attrs_(0),
+      num_var_attrs_(0) {
   if (!DescriptionIsValid(relation_, description_)) {
     FATAL_ERROR("Attempted to construct a SplitRowStoreTupleStorageSubBlock from an invalid description.");
   }
@@ -143,6 +153,21 @@ SplitRowStoreTupleStorageSubBlock::SplitRowStoreTupleStorageSubBlock(
                    + sizeof(Header) + occupancy_bitmap_bytes_;
   tuple_storage_bytes_ = sub_block_memory_size_ - (sizeof(Header) + occupancy_bitmap_bytes_);
 
+  // Some accounting information for bulk inserts.
+  for (attribute_id attr_id = 0;
+       attr_id < static_cast<attribute_id>(relation.size());
+       ++attr_id) {
+    const Type& attr_type = relation.getAttributeById(attr_id)->getType();
+    if (attr_type.isVariableLength()) {
+      fixed_len_attr_sizes_.push_back(kInvalidAttributeID);
+      num_var_attrs_++;
+    } else {
+      fixed_len_attr_sizes_.push_back(attr_type.maximumByteLength());
+      num_fixed_attrs_++;
+    }
+    num_null_attrs_ += attr_type.isNullable();
+  }
+
   if (new_block) {
     // Only need to initialize these fields, the rest of the block will be
     // zeroed-out by the StorageManager.
@@ -194,380 +219,217 @@ TupleStorageSubBlock::InsertResult SplitRowStoreTupleStorageSubBlock::insertTupl
 }
 
 tuple_id SplitRowStoreTupleStorageSubBlock::bulkInsertTuples(ValueAccessor *accessor) {
-  const tuple_id original_num_tuples = header_->num_tuples;
-  tuple_id pos = 0;
+  std::vector<attribute_id> simple_remap;
+  for (attribute_id attr_id = 0;
+      attr_id < static_cast<attribute_id>(relation_.size());
+      ++attr_id) {
+    simple_remap.push_back(attr_id);
+  }
+  return bulkInsertDispatcher(simple_remap, accessor, kCatalogMaxID, true);
+}
 
-  InvokeOnAnyValueAccessor(
-      accessor,
-      [&](auto *accessor) -> void {  // NOLINT(build/c++11)
+tuple_id SplitRowStoreTupleStorageSubBlock::bulkInsertPartialTuples(
+  const std::vector<attribute_id> &attribute_map,
+  ValueAccessor *accessor,
+  const tuple_id max_num_tuples_to_insert) {
+  return bulkInsertDispatcher(attribute_map, accessor, max_num_tuples_to_insert, false);
+}
+
+tuple_id SplitRowStoreTupleStorageSubBlock::bulkInsertDispatcher(
+  const std::vector<attribute_id> &attribute_map,
+  ValueAccessor *accessor,
+  tuple_id max_num_tuples_to_insert,
+  bool finalize) {
+  const bool fill_to_capacity = max_num_tuples_to_insert == kCatalogMaxID;
+
+  CopyGroupList copy_groups;
+  getCopyGroupsForAttributeMap(attribute_map, &copy_groups);
+  auto impl = accessor->getImplementationType();
+  const bool is_rowstore_source =
+    (impl == ValueAccessor::Implementation::kPackedRowStore ||
+     impl == ValueAccessor::Implementation::kSplitRowStore);
+  if (is_rowstore_source) {
+    copy_groups.merge_contiguous();
+  }
+
+  const bool copy_nulls = copy_groups.nullable_attrs_.size() > 0;
+  const bool copy_varlen = copy_groups.varlen_attrs_.size() > 0;
+
+  if (fill_to_capacity) {
     if (relation_.hasNullableAttributes()) {
-      if (relation_.isVariableLength()) {
-        while (accessor->next()) {
-          // If packed, insert at the end of the slot array, otherwise find the
-          // first hole.
-          pos = this->isPacked() ? header_->num_tuples
-                                 : occupancy_bitmap_->firstZero(pos);
-          const std::size_t tuple_variable_bytes
-              = CalculateVariableSize<decltype(*accessor), true>(relation_, *accessor);
-          if (!this->spaceToInsert(pos, tuple_variable_bytes)) {
-            accessor->previous();
-            break;
-          }
-          // Allocate variable-length storage.
-          header_->variable_length_bytes_allocated += tuple_variable_bytes;
-
-          // Find the slot and locate its sub-structures.
-          void *tuple_slot = static_cast<char*>(tuple_storage_) + pos * tuple_slot_bytes_;
-          BitVector<true> tuple_null_bitmap(tuple_slot,
-                                            relation_.numNullableAttributes());
-          tuple_null_bitmap.clear();
-          char *fixed_length_attr_storage = static_cast<char*>(tuple_slot) + per_tuple_null_bitmap_bytes_;
-          std::uint32_t *variable_length_info_array = reinterpret_cast<std::uint32_t*>(
-              fixed_length_attr_storage + relation_.getFixedByteLength());
-          // Start writing variable-length data at the beginning of the newly
-          // allocated range.
-          std::uint32_t current_variable_position
-              = tuple_storage_bytes_ - header_->variable_length_bytes_allocated;
-
-          attribute_id accessor_attr_id = 0;
-          for (CatalogRelationSchema::const_iterator attr_it = relation_.begin();
-               attr_it != relation_.end();
-               ++attr_it, ++accessor_attr_id) {
-            const int nullable_idx = relation_.getNullableAttributeIndex(attr_it->getID());
-            const int variable_idx = relation_.getVariableLengthAttributeIndex(attr_it->getID());
-            TypedValue attr_value(accessor->getTypedValue(accessor_attr_id));
-            if ((nullable_idx != -1) && (attr_value.isNull())) {
-              // Set null bit and move on.
-              tuple_null_bitmap.setBit(nullable_idx, true);
-              continue;
-            }
-            if (variable_idx != -1) {
-              // Write offset and size into the slot, then copy the actual
-              // value into the variable-length storage region.
-              const std::size_t attr_size = attr_value.getDataSize();
-              variable_length_info_array[variable_idx << 1] = current_variable_position;
-              variable_length_info_array[(variable_idx << 1) + 1] = attr_size;
-              attr_value.copyInto(static_cast<char*>(tuple_storage_) + current_variable_position);
-              current_variable_position += attr_size;
-            } else {
-              // Copy fixed-length value directly into the slot.
-              attr_value.copyInto(fixed_length_attr_storage
-                                  + relation_.getFixedLengthAttributeOffset(attr_it->getID()));
-            }
-          }
-          // Update occupancy bitmap and header.
-          occupancy_bitmap_->setBit(pos, true);
-          ++(header_->num_tuples);
-          if (pos > header_->max_tid) {
-            header_->max_tid = pos;
-          }
-        }
+      // TODO(marc) This is an annoying gotcha: the insertion loop assumes the null
+      // bitmaps are zeroed for a fresh insert. We could clear the bitmap on each tuple
+      // iteration, but that'd be costlier.
+      std::int64_t remaining_bytes = tuple_storage_bytes_ -
+                                     (header_->variable_length_bytes_allocated +
+                                      (header_->num_tuples * tuple_slot_bytes_));
+      memset(static_cast<char *>(tuple_storage_) + header_->num_tuples * tuple_slot_bytes_, 0x0, remaining_bytes);
+    }
+  }
+
+  tuple_id num_inserted = 0;
+  if (max_num_tuples_to_insert == kCatalogMaxID) {
+    max_num_tuples_to_insert = getInsertLowerBound();
+  }
+  if (copy_varlen) {
+    if (copy_nulls) {
+      if (fill_to_capacity) {
+        num_inserted = bulkInsertPartialTuplesImpl<true, true, true>(copy_groups, accessor,
+                                                                     max_num_tuples_to_insert);
       } else {
-        // Same as above, but skip variable-length checks.
-        while (accessor->next()) {
-          pos = this->isPacked() ? header_->num_tuples
-                                 : occupancy_bitmap_->firstZero(pos);
-          if (!this->spaceToInsert(pos, 0)) {
-            accessor->previous();
-            break;
-          }
-          void *tuple_slot = static_cast<char*>(tuple_storage_) + pos * tuple_slot_bytes_;
-          BitVector<true> tuple_null_bitmap(tuple_slot,
-                                            relation_.numNullableAttributes());
-          tuple_null_bitmap.clear();
-          char *fixed_length_attr_storage = static_cast<char*>(tuple_slot) + per_tuple_null_bitmap_bytes_;
-
-          attribute_id accessor_attr_id = 0;
-          for (CatalogRelationSchema::const_iterator attr_it = relation_.begin();
-               attr_it != relation_.end();
-               ++attr_it, ++accessor_attr_id) {
-            const int nullable_idx = relation_.getNullableAttributeIndex(attr_it->getID());
-            if (nullable_idx != -1) {
-              const void *attr_value = accessor->template getUntypedValue<true>(accessor_attr_id);
-              if (attr_value == nullptr) {
-                tuple_null_bitmap.setBit(nullable_idx, true);
-              } else {
-                std::memcpy(fixed_length_attr_storage
-                                + relation_.getFixedLengthAttributeOffset(attr_it->getID()),
-                            attr_value,
-                            attr_it->getType().maximumByteLength());
-              }
-            } else {
-              const void *attr_value = accessor->template getUntypedValue<false>(accessor_attr_id);
-              std::memcpy(fixed_length_attr_storage
-                              + relation_.getFixedLengthAttributeOffset(attr_it->getID()),
-                          attr_value,
-                          attr_it->getType().maximumByteLength());
-            }
-          }
-          occupancy_bitmap_->setBit(pos, true);
-          ++(header_->num_tuples);
-          if (pos > header_->max_tid) {
-            header_->max_tid = pos;
-          }
-        }
+        num_inserted = bulkInsertPartialTuplesImpl<true, true, false>(copy_groups, accessor,
+                                                                      max_num_tuples_to_insert);
       }
     } else {
-      if (relation_.isVariableLength()) {
-        // Same as most general case above, but skip null checks.
-        while (accessor->next()) {
-          pos = this->isPacked() ? header_->num_tuples
-                                 : occupancy_bitmap_->firstZero(pos);
-          const std::size_t tuple_variable_bytes
-              = CalculateVariableSize<decltype(*accessor), false>(relation_, *accessor);
-          if (!this->spaceToInsert(pos, tuple_variable_bytes)) {
-            accessor->previous();
-            break;
-          }
-          header_->variable_length_bytes_allocated += tuple_variable_bytes;
-
-          void *tuple_slot = static_cast<char*>(tuple_storage_) + pos * tuple_slot_bytes_;
-          char *fixed_length_attr_storage = static_cast<char*>(tuple_slot) + per_tuple_null_bitmap_bytes_;
-          std::uint32_t *variable_length_info_array = reinterpret_cast<std::uint32_t*>(
-              fixed_length_attr_storage + relation_.getFixedByteLength());
-          std::uint32_t current_variable_position
-              = tuple_storage_bytes_ - header_->variable_length_bytes_allocated;
-
-          attribute_id accessor_attr_id = 0;
-          for (CatalogRelationSchema::const_iterator attr_it = relation_.begin();
-               attr_it != relation_.end();
-               ++attr_it, ++accessor_attr_id) {
-            const int variable_idx = relation_.getVariableLengthAttributeIndex(attr_it->getID());
-            TypedValue attr_value(accessor->getTypedValue(accessor_attr_id));
-            if (variable_idx != -1) {
-              const std::size_t attr_size = attr_value.getDataSize();
-              variable_length_info_array[variable_idx << 1] = current_variable_position;
-              variable_length_info_array[(variable_idx << 1) + 1] = attr_size;
-              attr_value.copyInto(static_cast<char*>(tuple_storage_) + current_variable_position);
-              current_variable_position += attr_size;
-            } else {
-              attr_value.copyInto(fixed_length_attr_storage
-                                  + relation_.getFixedLengthAttributeOffset(attr_it->getID()));
-            }
-          }
-          occupancy_bitmap_->setBit(pos, true);
-          ++(header_->num_tuples);
-          if (pos > header_->max_tid) {
-            header_->max_tid = pos;
-          }
-        }
+      if (fill_to_capacity) {
+        num_inserted = bulkInsertPartialTuplesImpl<false, true, true>(copy_groups, accessor,
+                                                                      max_num_tuples_to_insert);
       } else {
-        // Simplest case: skip both null and variable-length checks.
-        while (accessor->next()) {
-          pos = this->isPacked() ? header_->num_tuples
-                                 : occupancy_bitmap_->firstZero(pos);
-          if (!this->spaceToInsert(pos, 0)) {
-            accessor->previous();
-            break;
-          }
-          void *tuple_slot = static_cast<char*>(tuple_storage_) + pos * tuple_slot_bytes_;
-          char *fixed_length_attr_storage = static_cast<char*>(tuple_slot) + per_tuple_null_bitmap_bytes_;
-
-          attribute_id accessor_attr_id = 0;
-          for (CatalogRelationSchema::const_iterator attr_it = relation_.begin();
-               attr_it != relation_.end();
-               ++attr_it, ++accessor_attr_id) {
-            const void *attr_value = accessor->template getUntypedValue<false>(accessor_attr_id);
-            std::memcpy(fixed_length_attr_storage
-                            + relation_.getFixedLengthAttributeOffset(attr_it->getID()),
-                        attr_value,
-                        attr_it->getType().maximumByteLength());
-          }
-          occupancy_bitmap_->setBit(pos, true);
-          ++(header_->num_tuples);
-          if (pos > header_->max_tid) {
-            header_->max_tid = pos;
-          }
-        }
+        num_inserted = bulkInsertPartialTuplesImpl<false, true, false>(copy_groups, accessor,
+                                                                       max_num_tuples_to_insert);
       }
     }
-  });
+  } else {
+    if (copy_nulls) {
+      num_inserted = bulkInsertPartialTuplesImpl<true, false, false>(copy_groups, accessor, max_num_tuples_to_insert);
+    } else {
+      num_inserted = bulkInsertPartialTuplesImpl<false, false, false>(copy_groups, accessor, max_num_tuples_to_insert);
+    }
+  }
 
-  return header_->num_tuples - original_num_tuples;
+  if (finalize) {
+    bulkInsertPartialTuplesFinalize(num_inserted);
+  }
+  return num_inserted;
 }
 
-tuple_id SplitRowStoreTupleStorageSubBlock::bulkInsertTuplesWithRemappedAttributes(
-    const std::vector<attribute_id> &attribute_map,
-    ValueAccessor *accessor) {
-  DEBUG_ASSERT(attribute_map.size() == relation_.size());
-  const tuple_id original_num_tuples = header_->num_tuples;
-  tuple_id pos = 0;
+// copy_nulls is true if the incoming attributes include at least one nullable attribute
+// copy_varlen is true if the incoming attributes include at least one varlen attribute
+template<bool copy_nulls, bool copy_varlen, bool fill_to_capacity>
+tuple_id SplitRowStoreTupleStorageSubBlock::bulkInsertPartialTuplesImpl(
+  const CopyGroupList &copy_groups,
+  ValueAccessor *accessor,
+  std::size_t max_num_tuples_to_insert) {
+  std::size_t num_tuples_inserted = 0;
+
+  // We only append to the end of the block to cut down on complexity.
+  char *tuple_slot = static_cast<char *>(tuple_storage_) +  header_->num_tuples * tuple_slot_bytes_;
+
+  std::uint32_t varlen_heap_offset = tuple_storage_bytes_ - header_->variable_length_bytes_allocated;
+  std::uint32_t varlen_heap_offset_orig = varlen_heap_offset;
+
+  BitVector<true> tuple_null_bitmap(tuple_slot, num_null_attrs_);
+  char *fixed_len_cursor = tuple_slot + BitVector<true>::BytesNeeded(num_null_attrs_);
+
+
+
+  std::size_t storage_available = tuple_storage_bytes_ -
+                                    (header_->variable_length_bytes_allocated +
+                                     header_->num_tuples * tuple_slot_bytes_);
+
+  // The number of bytes that must be reserved per tuple inserted due to gaps.
+  std::size_t varlen_reserve = relation_.getMaximumVariableByteLength();
+  if (fill_to_capacity) {
+    for (std::size_t vattr_idx = 0; vattr_idx < copy_groups.varlen_attrs_.size(); vattr_idx++) {
+      varlen_reserve -= relation_.getAttributeById(
+        copy_groups.varlen_attrs_[vattr_idx].dst_attr_id_)->getType().maximumByteLength();
+    }
+  }
 
   InvokeOnAnyValueAccessor(
-      accessor,
-      [&](auto *accessor) -> void {  // NOLINT(build/c++11)
-    if (relation_.hasNullableAttributes()) {
-      if (relation_.isVariableLength()) {
-        while (accessor->next()) {
-          pos = this->isPacked() ? header_->num_tuples
-                                 : occupancy_bitmap_->firstZero(pos);
-          const std::size_t tuple_variable_bytes
-              = CalculateVariableSizeWithRemappedAttributes<decltype(*accessor), true>(
-                  relation_, *accessor, attribute_map);
-          if (!this->spaceToInsert(pos, tuple_variable_bytes)) {
-            accessor->previous();
-            break;
+    accessor,
+    [&](auto *accessor) -> void {  // NOLINT(build/c++11
+      do {
+        const std::size_t num_c_attr = copy_groups.contiguous_attrs_.size();
+        const std::size_t num_n_attr = copy_groups.nullable_attrs_.size();
+        const std::size_t num_v_attr = copy_groups.varlen_attrs_.size();
+
+        const std::size_t nullmap_size = BitVector<true>::BytesNeeded(num_null_attrs_);
+
+        while (num_tuples_inserted < max_num_tuples_to_insert && accessor->next()) {
+          for (std::size_t cattr_idx = 0; cattr_idx < num_c_attr; cattr_idx++) {
+            const ContiguousAttrs &cattr = copy_groups.contiguous_attrs_[cattr_idx];
+            fixed_len_cursor += cattr.bytes_to_advance_;
+            const void *attr_value = accessor->template getUntypedValue<false>(cattr.src_attr_id_);
+            std::memcpy(fixed_len_cursor, attr_value, cattr.bytes_to_copy_);
           }
-          header_->variable_length_bytes_allocated += tuple_variable_bytes;
-
-          void *tuple_slot = static_cast<char*>(tuple_storage_) + pos * tuple_slot_bytes_;
-          BitVector<true> tuple_null_bitmap(tuple_slot,
-                                            relation_.numNullableAttributes());
-          tuple_null_bitmap.clear();
-          char *fixed_length_attr_storage = static_cast<char*>(tuple_slot) + per_tuple_null_bitmap_bytes_;
-          std::uint32_t *variable_length_info_array = reinterpret_cast<std::uint32_t*>(
-              fixed_length_attr_storage + relation_.getFixedByteLength());
-          std::uint32_t current_variable_position
-              = tuple_storage_bytes_ - header_->variable_length_bytes_allocated;
-
-          std::vector<attribute_id>::const_iterator attr_map_it = attribute_map.begin();
-          for (CatalogRelationSchema::const_iterator attr_it = relation_.begin();
-               attr_it != relation_.end();
-               ++attr_it, ++attr_map_it) {
-            const int nullable_idx = relation_.getNullableAttributeIndex(attr_it->getID());
-            const int variable_idx = relation_.getVariableLengthAttributeIndex(attr_it->getID());
-            TypedValue attr_value(accessor->getTypedValue(*attr_map_it));
-            if ((nullable_idx != -1) && (attr_value.isNull())) {
-              tuple_null_bitmap.setBit(nullable_idx, true);
-              continue;
-            }
-            if (variable_idx != -1) {
-              const std::size_t attr_size = attr_value.getDataSize();
-              variable_length_info_array[variable_idx << 1] = current_variable_position;
-              variable_length_info_array[(variable_idx << 1) + 1] = attr_size;
-              attr_value.copyInto(static_cast<char*>(tuple_storage_) + current_variable_position);
-              current_variable_position += attr_size;
-            } else {
-              attr_value.copyInto(fixed_length_attr_storage
-                                  + relation_.getFixedLengthAttributeOffset(attr_it->getID()));
-            }
-          }
-          occupancy_bitmap_->setBit(pos, true);
-          ++(header_->num_tuples);
-          if (pos > header_->max_tid) {
-            header_->max_tid = pos;
-          }
-        }
-      } else {
-        while (accessor->next()) {
-          pos = this->isPacked() ? header_->num_tuples
-                                 : occupancy_bitmap_->firstZero(pos);
-          if (!this->spaceToInsert(pos, 0)) {
-            accessor->previous();
-            break;
-          }
-          void *tuple_slot = static_cast<char*>(tuple_storage_) + pos * tuple_slot_bytes_;
-          BitVector<true> tuple_null_bitmap(tuple_slot,
-                                            relation_.numNullableAttributes());
-          tuple_null_bitmap.clear();
-          char *fixed_length_attr_storage = static_cast<char*>(tuple_slot) + per_tuple_null_bitmap_bytes_;
-
-          std::vector<attribute_id>::const_iterator attr_map_it = attribute_map.begin();
-          for (CatalogRelationSchema::const_iterator attr_it = relation_.begin();
-               attr_it != relation_.end();
-               ++attr_it, ++attr_map_it) {
-            const int nullable_idx = relation_.getNullableAttributeIndex(attr_it->getID());
-            if (nullable_idx != -1) {
-              const void *attr_value = accessor->template getUntypedValue<true>(*attr_map_it);
+
+          if (copy_nulls) {
+            tuple_null_bitmap.setMemory(tuple_slot);
+            for (std::size_t nattr_idx = 0; nattr_idx < num_n_attr; nattr_idx++) {
+              const NullableAttr &nattr = copy_groups.nullable_attrs_[nattr_idx];
+              const void *attr_value = accessor->template getUntypedValue<true>(nattr.src_attr_id_);
               if (attr_value == nullptr) {
-                tuple_null_bitmap.setBit(nullable_idx, true);
-              } else {
-                std::memcpy(fixed_length_attr_storage
-                                + relation_.getFixedLengthAttributeOffset(attr_it->getID()),
-                            attr_value,
-                            attr_it->getType().maximumByteLength());
+                tuple_null_bitmap.setBit(nattr.nullable_attr_idx_, true);
               }
-            } else {
-              const void *attr_value = accessor->template getUntypedValue<false>(*attr_map_it);
-              std::memcpy(fixed_length_attr_storage
-                              + relation_.getFixedLengthAttributeOffset(attr_it->getID()),
-                          attr_value,
-                          attr_it->getType().maximumByteLength());
             }
           }
-          occupancy_bitmap_->setBit(pos, true);
-          ++(header_->num_tuples);
-          if (pos > header_->max_tid) {
-            header_->max_tid = pos;
-          }
-        }
-      }
-    } else {
-      if (relation_.isVariableLength()) {
-        while (accessor->next()) {
-          pos = this->isPacked() ? header_->num_tuples
-                                 : occupancy_bitmap_->firstZero(pos);
-          const std::size_t tuple_variable_bytes
-              = CalculateVariableSizeWithRemappedAttributes<decltype(*accessor), false>(
-                  relation_, *accessor, attribute_map);
-          if (!this->spaceToInsert(pos, tuple_variable_bytes)) {
-            accessor->previous();
-            break;
-          }
-          header_->variable_length_bytes_allocated += tuple_variable_bytes;
-
-          void *tuple_slot = static_cast<char*>(tuple_storage_) + pos * tuple_slot_bytes_;
-          char *fixed_length_attr_storage = static_cast<char*>(tuple_slot) + per_tuple_null_bitmap_bytes_;
-          std::uint32_t *variable_length_info_array = reinterpret_cast<std::uint32_t*>(
-              fixed_length_attr_storage + relation_.getFixedByteLength());
-          std::uint32_t current_variable_position
-              = tuple_storage_bytes_ - header_->variable_length_bytes_allocated;
-
-          std::vector<attribute_id>::const_iterator attr_map_it = attribute_map.begin();
-          for (CatalogRelationSchema::const_iterator attr_it = relation_.begin();
-               attr_it != relation_.end();
-               ++attr_it, ++attr_map_it) {
-            const int variable_idx = relation_.getVariableLengthAttributeIndex(attr_it->getID());
-            TypedValue attr_value(accessor->getTypedValue(*attr_map_it));
-            if (variable_idx != -1) {
+
+          if (copy_varlen) {
+            for (std::size_t vattr_idx = 0; vattr_idx < num_v_attr; vattr_idx++) {
+              const VarLenAttr &vattr = copy_groups.varlen_attrs_[vattr_idx];
+              fixed_len_cursor += vattr.bytes_to_advance_;
+              // Typed value is necessary as we need the length.
+              const TypedValue &attr_value = accessor->template getTypedValue(vattr.src_attr_id_);
+              if (attr_value.isNull()) {
+                continue;
+              }
               const std::size_t attr_size = attr_value.getDataSize();
-              variable_length_info_array[variable_idx << 1] = current_variable_position;
-              variable_length_info_array[(variable_idx << 1) + 1] = attr_size;
-              attr_value.copyInto(static_cast<char*>(tuple_storage_) + current_variable_position);
-              current_variable_position += attr_size;
-            } else {
-              attr_value.copyInto(fixed_length_attr_storage
-                                  + relation_.getFixedLengthAttributeOffset(attr_it->getID()));
+              varlen_heap_offset -= attr_size;
+              std::memcpy(static_cast<char *>(tuple_storage_) + varlen_heap_offset, attr_value.getDataPtr(),
+                          attr_size);
+              reinterpret_cast<std::uint32_t *>(fixed_len_cursor)[0] = varlen_heap_offset;
+              reinterpret_cast<std::uint32_t *>(fixed_len_cursor)[1] = static_cast<std::uint32_t>(attr_size);
             }
           }
-          occupancy_bitmap_->setBit(pos, true);
-          ++(header_->num_tuples);
-          if (pos > header_->max_tid) {
-            header_->max_tid = pos;
-          }
+          tuple_slot += tuple_slot_bytes_;
+          fixed_len_cursor = tuple_slot + nullmap_size;
+          num_tuples_inserted++;
         }
-      } else {
-        while (accessor->next()) {
-          pos = this->isPacked() ? header_->num_tuples
-                                 : occupancy_bitmap_->firstZero(pos);
-          if (!this->spaceToInsert(pos, 0)) {
-            accessor->previous();
-            break;
-          }
-          void *tuple_slot = static_cast<char*>(tuple_storage_) + pos * tuple_slot_bytes_;
-          char *fixed_length_attr_storage = static_cast<char*>(tuple_slot) + per_tuple_null_bitmap_bytes_;
-
-          std::vector<attribute_id>::const_iterator attr_map_it = attribute_map.begin();
-          for (CatalogRelationSchema::const_iterator attr_it = relation_.begin();
-               attr_it != relation_.end();
-               ++attr_it, ++attr_map_it) {
-            const void *attr_value = accessor->template getUntypedValue<false>(*attr_map_it);
-            std::memcpy(fixed_length_attr_storage
-                            + relation_.getFixedLengthAttributeOffset(attr_it->getID()),
-                        attr_value,
-                        attr_it->getType().maximumByteLength());
-          }
-          occupancy_bitmap_->setBit(pos, true);
-          ++(header_->num_tuples);
-          if (pos > header_->max_tid) {
-            header_->max_tid = pos;
+        if (fill_to_capacity) {
+          std::int64_t remaining_storage_after_inserts = storage_available -
+                                                         (num_tuples_inserted * (tuple_slot_bytes_ + varlen_reserve) +
+                                                          (varlen_heap_offset_orig - varlen_heap_offset));
+          DCHECK_LE(0, remaining_storage_after_inserts);
+          std::size_t additional_tuples_insert =
+            remaining_storage_after_inserts / this->relation_.getMaximumByteLength();
+          // We want to avoid a situation where we have several short insert iterations
+          // near the end of an insertion cycle.
+          if (additional_tuples_insert > this->getInsertLowerBoundThreshold()) {
+            max_num_tuples_to_insert += additional_tuples_insert;
           }
         }
-      }
-    }
-  });
+      } while (fill_to_capacity && !accessor->iterationFinishedVirtual() &&
+               num_tuples_inserted < max_num_tuples_to_insert);
+    });
+
+  if (copy_varlen) {
+    header_->variable_length_bytes_allocated += (varlen_heap_offset_orig - varlen_heap_offset);
+  }
 
-  return header_->num_tuples - original_num_tuples;
+  return num_tuples_inserted;
+}
+
+void SplitRowStoreTupleStorageSubBlock::bulkInsertPartialTuplesFinalize(
+    const tuple_id num_tuples_inserted) {
+  occupancy_bitmap_->setBitRange(header_->max_tid + 1, num_tuples_inserted, true);
+  header_->num_tuples += num_tuples_inserted;
+  header_->max_tid += num_tuples_inserted;
+}
+
+std::size_t SplitRowStoreTupleStorageSubBlock::getInsertLowerBound() const {
+  const std::size_t remaining_storage_bytes = tuple_storage_bytes_ -
+                                              (header_->variable_length_bytes_allocated +
+                                               ((header_->max_tid + 1) * tuple_slot_bytes_));
+  const std::size_t tuple_max_size = tuple_slot_bytes_ + relation_.getMaximumVariableByteLength();
+  return remaining_storage_bytes / tuple_max_size;
+}
+
+tuple_id SplitRowStoreTupleStorageSubBlock::bulkInsertTuplesWithRemappedAttributes(
+    const std::vector<attribute_id> &attribute_map,
+    ValueAccessor *accessor) {
+  DCHECK_EQ(relation_.size(), attribute_map.size());
+  return bulkInsertDispatcher(attribute_map, accessor, kCatalogMaxID, true);
 }
 
 const void* SplitRowStoreTupleStorageSubBlock::getAttributeValue(
@@ -1002,4 +864,67 @@ TupleStorageSubBlock::InsertResult SplitRowStoreTupleStorageSubBlock::insertTupl
   return InsertResult(pos, false);
 }
 
+// Copy groups are used by insert algorithms to efficiently copy attributes from a
+// variety of source schemas with some matching attributes in the destination (this) store.
+// SplitRow has 3 distinct zones which define a physical tuple:
+//    [null_bitmap] [fixed_length_zone] [var_len_pairs]
+// When we do our insert algorithm, we first copy over fixed length attributes. Since there
+// can be gaps, and reorderings in the source schema, we need to know:
+//    * Where to copy the src attr into (ie offset from start of fixed_len_zone)
+//    * How many bytes to copy
+//    * Which src attr we are copying
+// When copying fixed length attributes, we calculate the offset into our tuple, do a memcpy for
+// the length of the data with the src attribute.
+//
+// Copying variable length attribute pairs is similar. Note that there is a heap at the end of
+// the SplitRow for actual data and the tuple contains pairs of (heap offset, length). Having to
+// copy varlen into the heap is the main difference from copying fixed length.
+void SplitRowStoreTupleStorageSubBlock::getCopyGroupsForAttributeMap(
+  const std::vector<attribute_id> &attribute_map,
+  CopyGroupList *copy_groups) {
+  DCHECK_EQ(attribute_map.size(), relation_.size());
+
+  attribute_id num_attrs = attribute_map.size();
+
+  std::size_t contig_adv = 0;
+  std::size_t varlen_adv = 0;
+  for (attribute_id attr_id = 0; attr_id < num_attrs; ++attr_id) {
+    attribute_id src_attr = attribute_map[attr_id];
+
+    // Attribute doesn't exist in src.
+    if (src_attr == kInvalidCatalogId) {
+      // create a placeholder for now
+      if (relation_.getVariableLengthAttributeIndex(attr_id) == -1) {
+        // fixed len
+        contig_adv += fixed_len_attr_sizes_[attr_id];
+      } else {
+        // var len
+        varlen_adv += kVarLenSlotSize;
+      }
+      continue;
+    }
+
+    // Attribute exists in src.
+    if (relation_.getVariableLengthAttributeIndex(attr_id) == -1) {
+      // fixed len
+      copy_groups->contiguous_attrs_.push_back(
+        ContiguousAttrs(src_attr, fixed_len_attr_sizes_[attr_id], contig_adv));
+      contig_adv = fixed_len_attr_sizes_[attr_id];
+    } else {
+      // var len
+      copy_groups->varlen_attrs_.push_back(VarLenAttr(src_attr, attr_id, varlen_adv));
+      varlen_adv = SplitRowStoreTupleStorageSubBlock::kVarLenSlotSize;
+    }
+
+    if (relation_.getNullableAttributeIndex(attr_id) != -1) {
+      copy_groups->nullable_attrs_.push_back(
+        NullableAttr(src_attr, relation_.getNullableAttributeIndex(attr_id)));
+    }
+  }
+  // This will point us to the beginning of the varlen zone.
+  if (copy_groups->varlen_attrs_.size() > 0) {
+    copy_groups->varlen_attrs_[0].bytes_to_advance_ += contig_adv;
+  }
+}
+
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d3a09205/storage/SplitRowStoreTupleStorageSubBlock.hpp
----------------------------------------------------------------------
diff --git a/storage/SplitRowStoreTupleStorageSubBlock.hpp b/storage/SplitRowStoreTupleStorageSubBlock.hpp
index a930103..681001e 100644
--- a/storage/SplitRowStoreTupleStorageSubBlock.hpp
+++ b/storage/SplitRowStoreTupleStorageSubBlock.hpp
@@ -45,6 +45,150 @@ class ValueAccessor;
 
 QUICKSTEP_DECLARE_SUB_BLOCK_TYPE_REGISTERED(SplitRowStoreTupleStorageSubBlock);
 
+namespace splitrow_internal {
+// A CopyGroup contains information about one run of attributes in the source
+// ValueAccessor that can be copied into the output block. The
+// getCopyGroupsForAttributeMap function below takes an attribute map for a source
+// and converts it into a sequence of runs. The goal is to minimize the number
+// of memcpy calls and address calculations that occur during bulk insertion.
+// Contiguous attributes from a rowstore source can be merged into a single copy group.
+//
+// A single ContiguousAttrs CopyGroup consists of contiguous attributes, nullable
+// or not. "Contiguous" here means that their attribute IDs are successive in both
+// the source and destination relations.
+//
+// A NullableAttr refers to exactly one nullable attribute. Nullable columns are
+// represented using fixed length inline data as well as a null bitmap.
+// In a particular tuple, if the attribute has a null value, the inline data
+// has no meaning. So it is safe to copy it or not. We use this fact to merge
+// runs together aggressively, i.e., a ContiguousAttrs group may include a
+// nullable attribute. However, we also create a NullableAttr in that case in
+// order to check the null bitmap.
+//
+// A gap is a run of destination (output) attributes that don't come from a
+// particular source. This occurs during bulkInsertPartialTuples. They must be
+// skipped during the insert (not copied over). They are indicated by a
+// kInvalidCatalogId in the attribute map. For efficiency, the gap size
+// is merged into the bytes_to_advance_ of the next ContiguousAttrs copy group
+// (or the first VarLenAttr if no fixed-length attribute follows), so a gap at
+// the start of the attribute map does not need a copy group of its own.
+//
+// eg. For 4B integer attrs, from a row store source,
+// if the input attribute_map is {-1,0,5,6,7,-1,2,4,9,10,-1}
+// with input/output attributes 4 and 7 being nullable,
+// we will create the following ContiguousAttrs copy groups
+//
+//  ----------------------------------------------------
+//  |src_id_      |bytes_to_advance_| bytes_to_copy_   |
+//  |-------------|-----------------|------------------|
+//  |            0|                4|                 4|
+//  |            5|                4|                12|
+//  |            2|               16|                 4|
+//  |            4|                4|                 4|
+//  |            9|                4|                 8|
+//  ----------------------------------------------------
+// and two NullableAttrs with src_attr_id_ set to 4 and 7.
+//
+// In this example, we do 5 memcpy calls and 5 address calculations
+// as well as 2 bitvector lookups for each tuple. A naive copy algorithm
+// would do 11 memcpy calls and address calculations, along with the
+// bitvector lookups, not to mention the schema lookups,
+// all interspersed in a complex loop with lots of branches.
+//
+// If the source was a column store, then we can't merge contiguous
+// attributes (or gaps). So we would have 11 ContiguousAttrs copy groups with
+// three of them having bytes_to_copy_ = 0 (corresponding to the gaps) and
+// the rest having bytes_to_copy_ = 4.
+//
+// SplitRowStore supports variable length attributes. Since the layout of the
+// tuple is like: [null bitmap][fixed length attributes][variable length offsets]
+// we do all the variable length copies after the fixed length copies.
+//
+struct CopyGroup {
+  attribute_id src_attr_id_;  // The attr_id of starting input attribute for run.
+
+  explicit CopyGroup(const attribute_id source_attr_id)
+    : src_attr_id_(source_attr_id) {}
+};
+
+struct ContiguousAttrs : public CopyGroup {
+  std::size_t bytes_to_advance_;  // Number of bytes to advance destination ptr
+                                  // to get to the location where we copy THIS attribute.
+  std::size_t bytes_to_copy_;     // Number of bytes to copy from source.
+
+  ContiguousAttrs(
+    const attribute_id source_attr_id,
+    const std::size_t bytes_to_copy,
+    const std::size_t bytes_to_advance)
+    : CopyGroup(source_attr_id),
+      bytes_to_advance_(bytes_to_advance),
+      bytes_to_copy_(bytes_to_copy) { }
+};
+
+struct VarLenAttr : public CopyGroup {
+  std::size_t bytes_to_advance_;
+  attribute_id dst_attr_id_;
+  VarLenAttr(const attribute_id source_attr_id,
+             const attribute_id dst_attr_id,
+             const std::size_t bytes_to_advance)
+    : CopyGroup(source_attr_id),
+      bytes_to_advance_(bytes_to_advance),
+      dst_attr_id_(dst_attr_id) {}
+};
+
+struct NullableAttr : public CopyGroup {
+  int nullable_attr_idx_;  // index into null bitmap
+
+  NullableAttr(attribute_id source_attr_id_,
+               int nullable_attr_idx)
+    : CopyGroup(source_attr_id_),
+      nullable_attr_idx_(nullable_attr_idx) {}
+};
+
+struct CopyGroupList {
+  CopyGroupList()
+    : contiguous_attrs_(),
+      nullable_attrs_(),
+      varlen_attrs_() {}
+
+  /**
+   * @brief Attributes which are exactly sequential are merged to a single copy.
+   */
+  void merge_contiguous() {
+    if (contiguous_attrs_.size() < 2) {
+      return;
+    }
+
+    int add_to_advance = 0;
+    for (std::size_t idx = 1; idx < contiguous_attrs_.size(); ++idx) {
+      ContiguousAttrs *current_attr = &contiguous_attrs_[idx];
+      ContiguousAttrs *previous_attr = &contiguous_attrs_[idx - 1];
+      if (add_to_advance > 0) {
+        current_attr->bytes_to_advance_ += add_to_advance;
+        add_to_advance = 0;
+      }
+      // The merge step:
+      if (previous_attr->src_attr_id_ + 1 == current_attr->src_attr_id_ &&
+            previous_attr->bytes_to_copy_ == current_attr->bytes_to_advance_) {
+        previous_attr->bytes_to_copy_ += current_attr->bytes_to_copy_;
+        add_to_advance += current_attr->bytes_to_advance_;
+        contiguous_attrs_.erase(contiguous_attrs_.begin() + idx);
+        idx--;
+      }
+    }
+
+    if (varlen_attrs_.size() > 0) {
+      varlen_attrs_[0].bytes_to_advance_ += add_to_advance;
+    }
+  }
+
+  std::vector<ContiguousAttrs> contiguous_attrs_;
+  std::vector<NullableAttr> nullable_attrs_;
+  std::vector<VarLenAttr> varlen_attrs_;
+};
+
+}  // namespace splitrow_internal
+
 /** \addtogroup Storage
  *  @{
  */
@@ -60,6 +204,8 @@ QUICKSTEP_DECLARE_SUB_BLOCK_TYPE_REGISTERED(SplitRowStoreTupleStorageSubBlock);
  *       storage can be reclaimed by calling rebuild().
  **/
 class SplitRowStoreTupleStorageSubBlock: public TupleStorageSubBlock {
+  static const std::size_t kVarLenSlotSize;
+
  public:
   SplitRowStoreTupleStorageSubBlock(const CatalogRelationSchema &relation,
                                     const TupleStorageSubBlockDescription &description,
@@ -155,6 +301,13 @@ class SplitRowStoreTupleStorageSubBlock: public TupleStorageSubBlock {
       const std::vector<attribute_id> &attribute_map,
       ValueAccessor *accessor) override;
 
+  tuple_id bulkInsertPartialTuples(
+    const std::vector<attribute_id> &attribute_map,
+    ValueAccessor *accessor,
+    const tuple_id max_num_tuples_to_insert);
+
+  void bulkInsertPartialTuplesFinalize(const tuple_id num_tuples_inserted);
+
   const void* getAttributeValue(const tuple_id tuple,
                                 const attribute_id attr) const override;
 
@@ -213,6 +366,33 @@ class SplitRowStoreTupleStorageSubBlock: public TupleStorageSubBlock {
   template <bool nullable_attrs, bool variable_length_attrs>
   InsertResult insertTupleImpl(const Tuple &tuple);
 
+  template<bool copy_nulls, bool copy_varlen, bool fill_to_capacity>
+  tuple_id bulkInsertPartialTuplesImpl(
+    const splitrow_internal::CopyGroupList &copy_groups,
+    ValueAccessor *accessor,
+    std::size_t max_num_tuples_to_insert);
+
+  tuple_id bulkInsertDispatcher(
+    const std::vector<attribute_id> &attribute_map,
+    ValueAccessor *accessor,
+    tuple_id max_num_tuples_to_insert,
+    bool finalize);
+
+  void getCopyGroupsForAttributeMap(
+    const std::vector<attribute_id> &attribute_map,
+    splitrow_internal::CopyGroupList *copy_groups);
+
+  std::size_t getInsertLowerBound() const;
+
+  // When varlen attributes are bulk inserted, the difference between the maximum
+  // possible size and the actual size of the tuples will cause an underestimate of
+  // the number of tuples we can insert. This threshold puts a limit on the number
+  // of tuples to attempt to insert. A smaller number will give more rounds of insertion
+  // and a more-packed block, but at the cost of insertion speed.
+  std::size_t getInsertLowerBoundThreshold() const {
+    return 10;
+  }
+
   Header *header_;
 
   std::unique_ptr<BitVector<false>> occupancy_bitmap_;
@@ -221,12 +401,18 @@ class SplitRowStoreTupleStorageSubBlock: public TupleStorageSubBlock {
   void *tuple_storage_;
   std::size_t tuple_storage_bytes_;
   std::size_t tuple_slot_bytes_;
+  std::vector<std::size_t> fixed_len_attr_sizes_;
+
+  std::size_t num_null_attrs_;
+  std::size_t num_fixed_attrs_;
+  std::size_t num_var_attrs_;
 
   std::size_t per_tuple_null_bitmap_bytes_;
 
   friend class SplitRowStoreTupleStorageSubBlockTest;
   friend class SplitRowStoreValueAccessor;
   FRIEND_TEST(SplitRowStoreTupleStorageSubBlockTest, InitializeTest);
+  FRIEND_TEST(SplitRowStoreTupleStorageSubBlockTest, GetCopyGroupsForAttributeMapTest);
 
   DISALLOW_COPY_AND_ASSIGN(SplitRowStoreTupleStorageSubBlock);
 };

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d3a09205/storage/tests/SplitRowStoreTupleStorageSubBlock_unittest.cpp
----------------------------------------------------------------------
diff --git a/storage/tests/SplitRowStoreTupleStorageSubBlock_unittest.cpp b/storage/tests/SplitRowStoreTupleStorageSubBlock_unittest.cpp
index 2943343..b953854 100644
--- a/storage/tests/SplitRowStoreTupleStorageSubBlock_unittest.cpp
+++ b/storage/tests/SplitRowStoreTupleStorageSubBlock_unittest.cpp
@@ -22,6 +22,7 @@
 #include <cstdio>
 #include <cstring>
 #include <memory>
+#include <string>
 #include <unordered_map>
 #include <utility>
 #include <vector>
@@ -61,6 +62,11 @@ using std::snprintf;
 
 namespace quickstep {
 
+using splitrow_internal::CopyGroupList;
+using splitrow_internal::ContiguousAttrs;
+using splitrow_internal::NullableAttr;
+using splitrow_internal::VarLenAttr;
+
 namespace {
 
 // Used to set up a value-parameterized test with certain features for
@@ -76,9 +82,11 @@ enum class AttributeTypeFeatures {
 
 class SplitRowStoreTupleStorageSubBlockTest
     : public ::testing::TestWithParam<AttributeTypeFeatures> {
- protected:
+ public:
   static const std::size_t kSubBlockSize = 0x100000;  // 1 MB
+  static const std::size_t kVarLenSize = 26;
 
+ protected:
   virtual void SetUp() {
     // Create a sample relation with a variety of attribute types.
     relation_.reset(new CatalogRelation(nullptr, "TestRelation"));
@@ -102,7 +110,7 @@ class SplitRowStoreTupleStorageSubBlockTest
         relation_.get(),
         "string_attr",
         TypeFactory::GetType(testVariableLength() ? kVarChar : kChar,
-                             26,
+                             kVarLenSize,
                              testNullable()));
     ASSERT_EQ(2, relation_->addAttribute(current_attr));
 
@@ -147,6 +155,14 @@ class SplitRowStoreTupleStorageSubBlockTest
     return tuple_store_->tuple_storage_bytes_;
   }
 
+  std::size_t getTupleInsertLowerBound() const {
+    return tuple_store_->getInsertLowerBound();
+  }
+
+  std::size_t getInsertLowerBoundThreshold() const {
+    return tuple_store_->getInsertLowerBoundThreshold();
+  }
+
   Tuple createSampleTuple(const int base_value) const {
     std::vector<TypedValue> attribute_values;
 
@@ -174,10 +190,10 @@ class SplitRowStoreTupleStorageSubBlockTest
       char string_buffer[13];
       int written = snprintf(string_buffer, sizeof(string_buffer), "%d", base_value);
       if (testVariableLength()) {
-        attribute_values.emplace_back((VarCharType::InstanceNonNullable(26).makeValue(string_buffer,
+        attribute_values.emplace_back((VarCharType::InstanceNonNullable(kVarLenSize).makeValue(string_buffer,
                                                                                       written + 1)));
       } else {
-        attribute_values.emplace_back((CharType::InstanceNonNullable(26).makeValue(string_buffer,
+        attribute_values.emplace_back((CharType::InstanceNonNullable(kVarLenSize).makeValue(string_buffer,
                                                                                    written + 1)));
       }
       attribute_values.back().ensureNotReference();
@@ -197,6 +213,11 @@ class SplitRowStoreTupleStorageSubBlockTest
     tuple_store_->rebuild();
   }
 
+  void getCopyGroupsForAttributeMap(const std::vector<attribute_id> &attribute_map,
+                                    CopyGroupList *copy_groups) {
+    tuple_store_->getCopyGroupsForAttributeMap(attribute_map, copy_groups);
+  }
+
   void checkTupleValuesUntyped(const tuple_id tid,
                                const int base_value) {
     ASSERT_TRUE(tuple_store_->hasTupleWithID(tid));
@@ -269,6 +290,135 @@ class SplitRowStoreTupleStorageSubBlockTest
 };
 typedef SplitRowStoreTupleStorageSubBlockTest SplitRowStoreTupleStorageSubBlockDeathTest;
 
+class SplitRowWrapper {
+ public:
+  enum AttrType {
+    kInt = 0,
+    kDouble,
+    kString,
+    kNumAttrTypes
+  };
+
+  /**
+   * Builds a catalog relation given a list of attributes.
+   *
+   * @param attribute_ordering The ordering of the attributes in the represented relation, given as AttrType
+   *                values: kInt is an integer attribute, kDouble is a double, and kString is a string.
+   * @param contains_nullable If the relation contains nullable attributes.
+   * @param contains_varlen If the relation contains variable length attributes.
+   * @return A caller-owned catalog relation.
+   */
+  static CatalogRelation *
+  GetRelationFromAttributeList(const std::vector<attribute_id> &attribute_ordering, bool contains_nullable,
+                               bool contains_varlen) {
+    // Create a unique name.
+    std::string rel_name("TempRelation");
+    for (auto attr_itr = attribute_ordering.begin();
+         attr_itr != attribute_ordering.end();
+         ++attr_itr) {
+      rel_name += "_" + std::to_string(*attr_itr);
+    }
+    CatalogRelation *relation = new CatalogRelation(nullptr, rel_name.c_str());
+
+    std::vector<int> attr_counts(AttrType::kNumAttrTypes);
+    std::string attr_name;
+    for (auto attr_itr = attribute_ordering.begin();
+         attr_itr != attribute_ordering.end();
+         ++attr_itr) {
+      switch (*attr_itr) {
+        case AttrType::kInt:
+          // An integer.
+          attr_name = "int_attr_" + std::to_string(attr_counts[AttrType::kInt]);
+          relation->addAttribute(new CatalogAttribute(
+            relation,
+            attr_name.c_str(),
+            TypeFactory::GetType(TypeID::kInt, contains_nullable)));
+          attr_counts[AttrType::kInt]++;
+          break;
+        case AttrType::kDouble:
+          // A double.
+          attr_name = "double_attr_" + std::to_string(attr_counts[AttrType::kDouble]);
+          relation->addAttribute(new CatalogAttribute(
+            relation,
+            attr_name.c_str(),
+            TypeFactory::GetType(TypeID::kDouble, contains_nullable)));
+          attr_counts[AttrType::kDouble]++;
+          break;
+        case AttrType::kString:
+          // A (possibly variable-length) string.
+          attr_name = "string_attr_" + std::to_string(attr_counts[AttrType::kString]);
+          relation->addAttribute(new CatalogAttribute(
+            relation,
+            attr_name.c_str(),
+            TypeFactory::GetType(contains_varlen ? TypeID::kVarChar : TypeID::kChar,
+                                 SplitRowStoreTupleStorageSubBlockTest::kVarLenSize,
+                                 contains_nullable)));
+          attr_counts[AttrType::kString]++;
+          break;
+        default:
+          LOG(FATAL) << "Unknown type was specified in SplitRowWrapper.";
+          break;
+      }
+    }
+    return relation;
+  }
+
+  /**
+   * A wrapper for an empty SplitRowstore.
+   *
+   * @param attribute_ordering The ordering of the attributes in the represented relation, given as AttrType
+   *                values: kInt is an integer attribute, kDouble is a double, and kString is a string.
+   * @param contains_nullable If the relation contains nullable attributes.
+   * @param contains_varlen If the relation contains variable length attributes.
+   */
+  SplitRowWrapper(const std::vector<attribute_id> &attribute_ordering, bool contains_nullable, bool contains_varlen)
+    : contains_nullable_(contains_nullable),
+      contains_varlen_(contains_varlen) {
+    initialize(attribute_ordering);
+  }
+
+  SplitRowWrapper(bool contains_nullable, bool contains_varlen)
+    : contains_nullable_(contains_nullable),
+      contains_varlen_(contains_varlen) {
+    // Make a clone of the Test Block type using the 3 basic attributes.
+    std::vector<attribute_id> attrs;
+    for (attribute_id attr = 0; attr < 3; ++attr) {
+      attrs.push_back(attr);
+    }
+    initialize(attrs);
+  }
+
+  SplitRowStoreTupleStorageSubBlock *operator->() {
+    return tuple_store_.get();
+  }
+
+  const bool contains_nullable_;
+  const bool contains_varlen_;
+
+  std::unique_ptr<CatalogRelation> relation_;
+  std::unique_ptr<TupleStorageSubBlockDescription> tuple_store_description_;
+  ScopedBuffer tuple_store_memory_;
+  std::unique_ptr<SplitRowStoreTupleStorageSubBlock> tuple_store_;
+
+ private:
+  void initialize(const std::vector<attribute_id> &attribute_ordering) {
+    // Create a sample relation with a variety of attribute types.
+    relation_.reset(GetRelationFromAttributeList(attribute_ordering, contains_nullable_, contains_varlen_));
+
+    tuple_store_description_.reset(new TupleStorageSubBlockDescription());
+    tuple_store_description_->set_sub_block_type(TupleStorageSubBlockDescription::SPLIT_ROW_STORE);
+
+    // Initialize the actual block.
+    tuple_store_memory_.reset(SplitRowStoreTupleStorageSubBlockTest::kSubBlockSize);
+    std::memset(tuple_store_memory_.get(), 0x0, SplitRowStoreTupleStorageSubBlockTest::kSubBlockSize);
+    tuple_store_.reset(new SplitRowStoreTupleStorageSubBlock(*relation_,
+                                                             *tuple_store_description_,
+                                                             true,
+                                                             tuple_store_memory_.get(),
+                                                             SplitRowStoreTupleStorageSubBlockTest::kSubBlockSize));
+  }
+};
+
 TEST_P(SplitRowStoreTupleStorageSubBlockTest, DescriptionIsValidTest) {
   // The descriptions we use for the other tests (which includes nullable and
   // variable-length attributes) should be valid.
@@ -458,37 +608,37 @@ TEST_P(SplitRowStoreTupleStorageSubBlockTest, BulkInsertTest) {
   const std::size_t max_tuple_capacity = getTupleStorageSize() / getTupleSlotSize();
 
   NativeColumnVector *int_vector = new NativeColumnVector(
-      relation_->getAttributeById(0)->getType(),
-      max_tuple_capacity);
+    relation_->getAttributeById(0)->getType(),
+    max_tuple_capacity);
   NativeColumnVector *double_vector = new NativeColumnVector(
-      relation_->getAttributeById(1)->getType(),
-      max_tuple_capacity);
+    relation_->getAttributeById(1)->getType(),
+    max_tuple_capacity);
   ColumnVector *string_vector = testVariableLength() ?
-      static_cast<ColumnVector*>(new IndirectColumnVector(
-          relation_->getAttributeById(2)->getType(),
-          max_tuple_capacity))
-      : static_cast<ColumnVector*>(new NativeColumnVector(
-          relation_->getAttributeById(2)->getType(),
-          max_tuple_capacity));
+                                static_cast<ColumnVector*>(new IndirectColumnVector(
+                                  relation_->getAttributeById(2)->getType(),
+                                  max_tuple_capacity))
+                                                     : static_cast<ColumnVector*>(new NativeColumnVector(
+      relation_->getAttributeById(2)->getType(),
+      max_tuple_capacity));
 
   std::size_t storage_used = 0;
   int current_tuple_idx = 0;
   for (;;) {
     Tuple current_tuple(createSampleTuple(current_tuple_idx));
     const std::size_t current_tuple_storage_bytes
-        = getTupleSlotSize()
-          + (testVariableLength() ? (current_tuple.getAttributeValue(2).isNull() ?
-                                     0 : current_tuple.getAttributeValue(2).getDataSize())
-                                  : 0);
+      = getTupleSlotSize()
+        + (testVariableLength() ? (current_tuple.getAttributeValue(2).isNull() ?
+                                   0 : current_tuple.getAttributeValue(2).getDataSize())
+                                : 0);
     if (storage_used + current_tuple_storage_bytes <= getTupleStorageSize()) {
       int_vector->appendTypedValue(current_tuple.getAttributeValue(0));
       double_vector->appendTypedValue(current_tuple.getAttributeValue(1));
       if (testVariableLength()) {
         static_cast<IndirectColumnVector*>(string_vector)
-            ->appendTypedValue(current_tuple.getAttributeValue(2));
+          ->appendTypedValue(current_tuple.getAttributeValue(2));
       } else {
         static_cast<NativeColumnVector*>(string_vector)
-            ->appendTypedValue(current_tuple.getAttributeValue(2));
+          ->appendTypedValue(current_tuple.getAttributeValue(2));
       }
 
       storage_used += current_tuple_storage_bytes;
@@ -505,16 +655,21 @@ TEST_P(SplitRowStoreTupleStorageSubBlockTest, BulkInsertTest) {
 
   // Actually do the bulk-insert.
   accessor.beginIteration();
-  EXPECT_EQ(current_tuple_idx, tuple_store_->bulkInsertTuples(&accessor));
-  EXPECT_TRUE(accessor.iterationFinished());
-
-  // Shouldn't be able to insert any more tuples.
-  accessor.beginIteration();
-  EXPECT_EQ(0, tuple_store_->bulkInsertTuples(&accessor));
+  tuple_id num_inserted = tuple_store_->bulkInsertTuples(&accessor);
+  if (testVariableLength()) {
+    EXPECT_LE(current_tuple_idx - num_inserted, getInsertLowerBoundThreshold());
+  } else {
+    EXPECT_EQ(current_tuple_idx, num_inserted);
+    ASSERT_TRUE(accessor.iterationFinished());
+    // Shouldn't be able to insert any more tuples.
+    accessor.beginIteration();
+    tuple_id num_inserted_second_round = tuple_store_->bulkInsertTuples(&accessor);
+    ASSERT_EQ(0, num_inserted_second_round);
+  }
 
   tuple_store_->rebuild();
-  EXPECT_EQ(current_tuple_idx, tuple_store_->numTuples());
-  EXPECT_EQ(current_tuple_idx - 1, tuple_store_->getMaxTupleID());
+  EXPECT_EQ(num_inserted, tuple_store_->numTuples());
+  EXPECT_EQ(num_inserted - 1, tuple_store_->getMaxTupleID());
 
   // Check the inserted values.
   ASSERT_TRUE(tuple_store_->isPacked());
@@ -525,6 +680,146 @@ TEST_P(SplitRowStoreTupleStorageSubBlockTest, BulkInsertTest) {
   }
 }
 
+TEST_P(SplitRowStoreTupleStorageSubBlockTest, PartialBulkInsertTest) {
+  // Build up a ColumnVectorsValueAccessor to bulk-insert from. We'll reserve
+  // enough space for the maximum possible number of tuples in the block, even
+  // though we won't use all of it if testVariableLength() is true.
+  const std::size_t max_tuple_capacity = getTupleStorageSize() / getTupleSlotSize();
+
+  NativeColumnVector *int_vector = new NativeColumnVector(
+    relation_->getAttributeById(0)->getType(),
+    max_tuple_capacity);
+  NativeColumnVector *double_vector = new NativeColumnVector(
+    relation_->getAttributeById(1)->getType(),
+    max_tuple_capacity);
+  ColumnVector *string_vector = testVariableLength() ?
+                                static_cast<ColumnVector *>(new IndirectColumnVector(
+                                  relation_->getAttributeById(2)->getType(),
+                                  max_tuple_capacity))
+                                                     : static_cast<ColumnVector *>(new NativeColumnVector(
+      relation_->getAttributeById(2)->getType(),
+      max_tuple_capacity));
+
+  const int max_tuples_insert = 1000;
+  for (int tuple_idx = 0; tuple_idx < max_tuples_insert; ++tuple_idx) {
+    Tuple current_tuple(createSampleTuple(tuple_idx));
+    int_vector->appendTypedValue(current_tuple.getAttributeValue(0));
+    double_vector->appendTypedValue(current_tuple.getAttributeValue(1));
+    if (testVariableLength()) {
+      static_cast<IndirectColumnVector *>(string_vector)
+        ->appendTypedValue(current_tuple.getAttributeValue(2));
+    } else {
+      static_cast<NativeColumnVector *>(string_vector)
+        ->appendTypedValue(current_tuple.getAttributeValue(2));
+    }
+  }
+
+  std::vector<attribute_id> attr_map_pt1 = {kInvalidCatalogId, 0, kInvalidCatalogId};
+  std::vector<attribute_id> attr_map_pt2 = {0, kInvalidCatalogId, 1};
+
+  ColumnVectorsValueAccessor accessor_pt1;
+  accessor_pt1.addColumn(double_vector);
+
+  ColumnVectorsValueAccessor accessor_pt2;
+  accessor_pt2.addColumn(int_vector);
+  accessor_pt2.addColumn(string_vector);
+
+
+  // Actually do the bulk-insert.
+  accessor_pt1.beginIteration();
+  const tuple_id num_inserted_pt1 = tuple_store_->bulkInsertPartialTuples(attr_map_pt1, &accessor_pt1, kCatalogMaxID);
+  ASSERT_GT(num_inserted_pt1, 0);
+  const tuple_id num_inserted_pt2 = tuple_store_->bulkInsertPartialTuples(attr_map_pt2, &accessor_pt2,
+                                                                          num_inserted_pt1);
+  ASSERT_EQ(num_inserted_pt1, num_inserted_pt2);
+
+  tuple_store_->bulkInsertPartialTuplesFinalize(num_inserted_pt1);
+  ASSERT_EQ(max_tuples_insert, tuple_store_->getMaxTupleID() + 1);
+  ASSERT_EQ(num_inserted_pt1, tuple_store_->getMaxTupleID() + 1);
+  EXPECT_TRUE(accessor_pt2.iterationFinished());
+
+  tuple_store_->rebuild();
+
+  // Should be the same order as if we inserted them serially.
+  ASSERT_TRUE(tuple_store_->isPacked());
+  for (tuple_id tid = 0;
+       tid <= tuple_store_->getMaxTupleID();
+       ++tid) {
+    checkTupleValuesUntyped(tid, tid);
+  }
+}
+
+TEST_P(SplitRowStoreTupleStorageSubBlockTest, GetCopyGroupsForAttributeMapTest) {
+  const bool nullable_attrs = testNullable();
+  std::vector<attribute_id> relation_attrs = {
+    SplitRowWrapper::AttrType::kInt,
+    SplitRowWrapper::AttrType::kInt,
+    SplitRowWrapper::AttrType::kInt,
+    SplitRowWrapper::AttrType::kString,
+    SplitRowWrapper::AttrType::kString,
+    SplitRowWrapper::AttrType::kString};
+  SplitRowWrapper dst_store(relation_attrs, nullable_attrs, testVariableLength());
+  std::vector<attribute_id> attr_map = { kInvalidCatalogId, 0, 1, kInvalidCatalogId, 2, 1 };
+  CopyGroupList copy_groups;
+  dst_store->getCopyGroupsForAttributeMap(attr_map, &copy_groups);
+
+  std::vector<ContiguousAttrs>& contiguous_attrs = copy_groups.contiguous_attrs_;
+  std::vector<VarLenAttr>& varlen_attrs = copy_groups.varlen_attrs_;
+
+  const std::size_t size_of_string = dst_store->getRelation().getAttributeById(3)->getType().maximumByteLength();
+
+  // Fixed length attributes.
+  EXPECT_EQ(0, contiguous_attrs[0].src_attr_id_);
+  EXPECT_EQ(4, contiguous_attrs[0].bytes_to_advance_);
+  EXPECT_EQ(4, contiguous_attrs[0].bytes_to_copy_);
+
+  EXPECT_EQ(1, contiguous_attrs[1].src_attr_id_);
+  EXPECT_EQ(4, contiguous_attrs[1].bytes_to_advance_);
+  EXPECT_EQ(4, contiguous_attrs[1].bytes_to_copy_);
+
+  if (testVariableLength()) {
+    ASSERT_EQ(2, contiguous_attrs.size());
+    ASSERT_EQ(2, varlen_attrs.size());
+
+    EXPECT_EQ(2, varlen_attrs[0].src_attr_id_);
+    EXPECT_EQ(sizeof(int) + SplitRowStoreTupleStorageSubBlock::kVarLenSlotSize, varlen_attrs[0].bytes_to_advance_);
+
+    EXPECT_EQ(1, varlen_attrs[1].src_attr_id_);
+    EXPECT_EQ(SplitRowStoreTupleStorageSubBlock::kVarLenSlotSize, varlen_attrs[1].bytes_to_advance_);
+
+  } else {
+    ASSERT_EQ(4, copy_groups.contiguous_attrs_.size());
+    ASSERT_EQ(0, copy_groups.varlen_attrs_.size());
+
+    EXPECT_EQ(2, contiguous_attrs[2].src_attr_id_);
+    EXPECT_EQ(4 + size_of_string, contiguous_attrs[2].bytes_to_advance_);
+    EXPECT_EQ(size_of_string, contiguous_attrs[2].bytes_to_copy_);
+  }
+
+  int null_count =  copy_groups.nullable_attrs_.size();
+  if (testNullable()) {
+    // The relation contains 6 nullable attributes, but the attribute map inserts only 4 of them.
+    EXPECT_EQ(4, null_count);
+  } else {
+    EXPECT_EQ(0, null_count);
+  }
+
+  // test that merging works.
+  copy_groups.merge_contiguous();
+  EXPECT_EQ(0, contiguous_attrs[0].src_attr_id_);
+  EXPECT_EQ(4, contiguous_attrs[0].bytes_to_advance_);
+
+  if (testVariableLength()) {
+    EXPECT_EQ(1, contiguous_attrs.size());
+    EXPECT_EQ(sizeof(int) * 2 + SplitRowStoreTupleStorageSubBlock::kVarLenSlotSize,
+              varlen_attrs[0].bytes_to_advance_);
+  } else {
+    EXPECT_EQ(3, contiguous_attrs.size());
+    EXPECT_EQ(8, contiguous_attrs[0].bytes_to_copy_);
+    EXPECT_EQ(8 + size_of_string, contiguous_attrs[1].bytes_to_advance_);
+  }
+}
+
 TEST_P(SplitRowStoreTupleStorageSubBlockTest, BulkInsertWithRemappedAttributesTest) {
   // This is similar to the above test, but we will reverse the order of the
   // ColumnVectors in the ColumnVectorsValueAccessor and remap them back to the
@@ -551,25 +846,26 @@ TEST_P(SplitRowStoreTupleStorageSubBlockTest, BulkInsertWithRemappedAttributesTe
 
   std::size_t storage_used = 0;
   int current_tuple_idx = 0;
+  std::size_t tuple_max_size = relation_->getMaximumByteLength();
+  std::size_t tuple_slot_size = getTupleSlotSize();
   for (;;) {
     Tuple current_tuple(createSampleTuple(current_tuple_idx));
-    const std::size_t current_tuple_storage_bytes
-        = getTupleSlotSize()
-          + (testVariableLength() ? (current_tuple.getAttributeValue(2).isNull() ?
-                                     0 : current_tuple.getAttributeValue(2).getDataSize())
-                                  : 0);
-    if (storage_used + current_tuple_storage_bytes <= getTupleStorageSize()) {
+    if ((getTupleStorageSize() - storage_used) / tuple_max_size > 0) {
       int_vector->appendTypedValue(current_tuple.getAttributeValue(0));
       double_vector->appendTypedValue(current_tuple.getAttributeValue(1));
       if (testVariableLength()) {
         static_cast<IndirectColumnVector*>(string_vector)
-            ->appendTypedValue(current_tuple.getAttributeValue(2));
+          ->appendTypedValue(current_tuple.getAttributeValue(2));
       } else {
         static_cast<NativeColumnVector*>(string_vector)
-            ->appendTypedValue(current_tuple.getAttributeValue(2));
+          ->appendTypedValue(current_tuple.getAttributeValue(2));
+      }
+
+      storage_used += tuple_slot_size;
+      if (testVariableLength() && !current_tuple.getAttributeValue(2).isNull()) {
+        storage_used += current_tuple.getAttributeValue(2).getDataSize();
       }
 
-      storage_used += current_tuple_storage_bytes;
       ++current_tuple_idx;
     } else {
       break;
@@ -588,18 +884,21 @@ TEST_P(SplitRowStoreTupleStorageSubBlockTest, BulkInsertWithRemappedAttributesTe
 
   // Actually do the bulk-insert.
   accessor.beginIteration();
-  EXPECT_EQ(current_tuple_idx,
-            tuple_store_->bulkInsertTuplesWithRemappedAttributes(attribute_map, &accessor));
-  EXPECT_TRUE(accessor.iterationFinished());
-
-  // Shouldn't be able to insert any more tuples.
-  accessor.beginIteration();
-  EXPECT_EQ(0,
-            tuple_store_->bulkInsertTuplesWithRemappedAttributes(attribute_map, &accessor));
+  tuple_id num_inserted = tuple_store_->bulkInsertTuplesWithRemappedAttributes(attribute_map, &accessor);
+  if (testVariableLength()) {
+    EXPECT_LE(current_tuple_idx - num_inserted, getInsertLowerBoundThreshold());
+  } else {
+    EXPECT_EQ(current_tuple_idx, num_inserted);
+    ASSERT_TRUE(accessor.iterationFinished());
+    // Shouldn't be able to insert any more tuples.
+    accessor.beginIteration();
+    tuple_id num_inserted_second_round = tuple_store_->bulkInsertTuplesWithRemappedAttributes(attribute_map, &accessor);
+    ASSERT_EQ(0, num_inserted_second_round);
+  }
 
   tuple_store_->rebuild();
-  EXPECT_EQ(current_tuple_idx, tuple_store_->numTuples());
-  EXPECT_EQ(current_tuple_idx - 1, tuple_store_->getMaxTupleID());
+  EXPECT_EQ(num_inserted, tuple_store_->numTuples());
+  EXPECT_EQ(num_inserted - 1, tuple_store_->getMaxTupleID());
 
   // Check the inserted values.
   ASSERT_TRUE(tuple_store_->isPacked());
@@ -632,6 +931,53 @@ TEST_P(SplitRowStoreTupleStorageSubBlockTest, GetAttributeValueTypedTest) {
   }
 }
 
+TEST_P(SplitRowStoreTupleStorageSubBlockTest, SplitRowToSplitRowTest) {
+  // Copy SplitRow -> SplitRow in two partial passes, then verify every copied value.
+  fillBlockWithSampleData();
+  std::vector<attribute_id> relation_attrs = {
+    SplitRowWrapper::AttrType::kInt, SplitRowWrapper::AttrType::kDouble,
+    SplitRowWrapper::AttrType::kDouble, SplitRowWrapper::AttrType::kInt,
+    SplitRowWrapper::AttrType::kString, SplitRowWrapper::AttrType::kString,
+    SplitRowWrapper::AttrType::kString};
+  SplitRowWrapper dst_store(relation_attrs, testNullable(), testVariableLength());
+
+  // First pass. NOTE(review): map position looks like dst attribute, value like src attribute.
+  std::vector<attribute_id> attribute_map = {0, kInvalidCatalogId, 1, 0, 2, kInvalidCatalogId, 2};
+
+  std::unique_ptr<ValueAccessor> accessor(tuple_store_->createValueAccessor());
+  ASSERT_EQ(ValueAccessor::Implementation::kSplitRowStore, accessor->getImplementationType());
+  ASSERT_FALSE(accessor->isTupleIdSequenceAdapter());
+
+  SplitRowStoreValueAccessor &cast_accessor = static_cast<SplitRowStoreValueAccessor &>(*accessor);
+  std::size_t num_inserted = dst_store->bulkInsertPartialTuples(attribute_map, &cast_accessor, kCatalogMaxID);
+  // Second pass, capped at num_inserted tuples, maps the two positions (1 and 5) skipped above.
+  attribute_map = {kInvalidCatalogId, 1, kInvalidCatalogId, kInvalidCatalogId, kInvalidCatalogId, 2, kInvalidCatalogId};
+  cast_accessor.beginIteration();
+  dst_store->bulkInsertPartialTuples(attribute_map, &cast_accessor, num_inserted);
+  dst_store->bulkInsertPartialTuplesFinalize(num_inserted);
+
+  EXPECT_EQ(num_inserted - 1, dst_store->getMaxTupleID());
+  // The inserted relation should hold roughly 1/3 the tuples of the src. The more varlen
+  // attributes, the fewer the relation will accept due to how it estimates.
+  EXPECT_LT(0.15 * tuple_store_->getMaxTupleID(), dst_store->getMaxTupleID());
+  EXPECT_GT(0.5 * tuple_store_->getMaxTupleID(), dst_store->getMaxTupleID());
+
+  // attribute_map[aid] is the destination attribute compared against source attribute aid.
+  // The max tuple ID is a valid ID (inclusive bound); every attribute_map entry is checked.
+  attribute_map = {0, 1, 4};
+  for (tuple_id tid = 0; tid <= dst_store->getMaxTupleID(); ++tid) {
+    for (attribute_id aid = 0; aid < static_cast<attribute_id>(attribute_map.size()); ++aid) {
+      const TypedValue &dst_value = dst_store->getAttributeValueTyped(tid, attribute_map[aid]);
+      const TypedValue &src_value = tuple_store_->getAttributeValueTyped(tid, aid);
+      if (src_value.isNull() || dst_value.isNull()) {
+        EXPECT_TRUE(src_value.isNull() && dst_value.isNull());
+      } else {
+        EXPECT_TRUE(src_value.fastEqualCheck(dst_value));
+      }
+    }
+  }
+}
+
 TEST_P(SplitRowStoreTupleStorageSubBlockTest, ValueAccessorTest) {
   fillBlockWithSampleData();
 
@@ -721,7 +1067,7 @@ TEST_P(SplitRowStoreTupleStorageSubBlockTest, SetAttributeValueTypedTest) {
     // It's also OK to replace a variable-length value with a shorter value, or
     // with null.
     std::unordered_map<attribute_id, TypedValue> variable_new_values;
-    variable_new_values.emplace(2, VarCharType::InstanceNonNullable(26).makeValue("x", 2));
+    variable_new_values.emplace(2, VarCharType::InstanceNonNullable(kVarLenSize).makeValue("x", 2));
     ASSERT_TRUE(tuple_store_->canSetAttributeValuesInPlaceTyped(33, variable_new_values));
     tuple_store_->setAttributeValueInPlaceTyped(33, 2, variable_new_values[2]);
     EXPECT_STREQ("x", static_cast<const char*>(tuple_store_->getAttributeValue(33, 2)));
@@ -747,13 +1093,14 @@ TEST_P(SplitRowStoreTupleStorageSubBlockTest, SetAttributeValueTypedTest) {
     EXPECT_TRUE(tuple_store_->insertTupleInBatch(createSampleTuple(0)));
     tuple_store_->rebuild();
 
-    variable_new_values[2] = VarCharType::InstanceNonNullable(26).makeValue("hello world", 12);
+    variable_new_values[2] = VarCharType::InstanceNonNullable(kVarLenSize).makeValue("hello world", 12);
     ASSERT_TRUE(tuple_store_->canSetAttributeValuesInPlaceTyped(0, variable_new_values));
     tuple_store_->setAttributeValueInPlaceTyped(0, 2, variable_new_values[2]);
     EXPECT_STREQ("hello world", static_cast<const char*>(tuple_store_->getAttributeValue(0, 2)));
   }
 }
 
+
 TEST_P(SplitRowStoreTupleStorageSubBlockTest, DeleteAndRebuildTest) {
   fillBlockWithSampleData();
   ASSERT_TRUE(tuple_store_->isPacked());
@@ -806,7 +1153,7 @@ TEST_P(SplitRowStoreTupleStorageSubBlockTest, DeleteAndRebuildTest) {
       reinsert_attr_values.emplace_back(testVariableLength() ? kVarChar : kChar);
     } else {
       reinsert_attr_values.emplace_back(
-          CharType::InstanceNonNullable(26).makeValue("foo", 4));
+          CharType::InstanceNonNullable(kVarLenSize).makeValue("foo", 4));
       reinsert_attr_values.back().ensureNotReference();
     }
     Tuple reinsert_tuple(std::move(reinsert_attr_values));
@@ -831,7 +1178,7 @@ TEST_P(SplitRowStoreTupleStorageSubBlockTest, DeleteAndRebuildTest) {
     std::vector<TypedValue> extra_variable_attr_values;
     extra_variable_attr_values.emplace_back(-123);
     extra_variable_attr_values.emplace_back(static_cast<double>(-100.5));
-    extra_variable_attr_values.emplace_back((VarCharType::InstanceNonNullable(26).makeValue(
+    extra_variable_attr_values.emplace_back((VarCharType::InstanceNonNullable(kVarLenSize).makeValue(
         kExtraVarCharValue,
         27)));
     extra_variable_tuple = Tuple(std::move(extra_variable_attr_values));

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d3a09205/utility/BitVector.hpp
----------------------------------------------------------------------
diff --git a/utility/BitVector.hpp b/utility/BitVector.hpp
index bb76315..2b56b8c 100644
--- a/utility/BitVector.hpp
+++ b/utility/BitVector.hpp
@@ -183,6 +183,20 @@ class BitVector {
   }
 
   /**
+   * @brief Rebind this BitVector to use the pointed-to memory as its data array.
+   *
+   * @warning Only valid for a BitVector that does not own its memory (DCHECKed).
+   *          No data is copied; the caller must ensure the memory is large enough.
+   *
+   * @param ptr Pointer to data representing a BitVector with the same parameters
+   *            as this BitVector.
+   **/
+  inline void setMemory(void *ptr) {
+    DCHECK(!owned_);
+    this->data_array_ = static_cast<std::size_t*>(ptr);
+  }
+
+  /**
    * @brief Similar to assignFrom(), but the other BitVector to assign from is
    *        allowed to be longer than this one.
    * @warning Only available when enable_short_version is false.


[5/7] incubator-quickstep git commit: Clean up the old bloom-filter implementation which are attached to HashTables.

Posted by hb...@apache.org.
Clean up the old bloom-filter implementation which was attached to HashTables.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/55480d8d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/55480d8d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/55480d8d

Branch: refs/heads/partitioned-aggregate-new
Commit: 55480d8d1719b5ff97b9562d53ddc63ba1c4d93d
Parents: 2ee5c1c
Author: Jianqiao Zhu <ji...@cs.wisc.edu>
Authored: Wed Sep 7 13:20:43 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Tue Oct 18 15:18:48 2016 -0500

----------------------------------------------------------------------
 query_execution/CMakeLists.txt   |   1 -
 query_execution/QueryContext.cpp |   8 +-
 query_execution/QueryContext.hpp |  53 ------------
 storage/CMakeLists.txt           |   2 -
 storage/FastHashTable.hpp        | 152 +++++-----------------------------
 storage/FastHashTableFactory.hpp |  35 +-------
 storage/HashTable.hpp            | 103 -----------------------
 storage/HashTable.proto          |   6 --
 storage/HashTableFactory.hpp     |  34 +-------
 9 files changed, 23 insertions(+), 371 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/55480d8d/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 6a84be1..dafdea4 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -189,7 +189,6 @@ target_link_libraries(quickstep_queryexecution_QueryContext
                       quickstep_storage_WindowAggregationOperationState
                       quickstep_types_TypedValue
                       quickstep_types_containers_Tuple
-                      quickstep_utility_BloomFilter
                       quickstep_utility_Macros
                       quickstep_utility_SortConfiguration)
 target_link_libraries(quickstep_queryexecution_QueryContext_proto

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/55480d8d/query_execution/QueryContext.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryContext.cpp b/query_execution/QueryContext.cpp
index 2572e18..6612611 100644
--- a/query_execution/QueryContext.cpp
+++ b/query_execution/QueryContext.cpp
@@ -39,7 +39,6 @@
 #include "storage/InsertDestination.pb.h"
 #include "types/TypedValue.hpp"
 #include "types/containers/Tuple.hpp"
-#include "utility/BloomFilter.hpp"
 #include "utility/SortConfiguration.hpp"
 
 #include "glog/logging.h"
@@ -68,10 +67,6 @@ QueryContext::QueryContext(const serialization::QueryContext &proto,
                                                         storage_manager));
   }
 
-  for (int i = 0; i < proto.bloom_filters_size(); ++i) {
-    bloom_filters_.emplace_back(new BloomFilter(proto.bloom_filters(i)));
-  }
-
   for (int i = 0; i < proto.generator_functions_size(); ++i) {
     const GeneratorFunctionHandle *func_handle =
         GeneratorFunctionFactory::Instance().reconstructFromProto(proto.generator_functions(i));
@@ -83,8 +78,7 @@ QueryContext::QueryContext(const serialization::QueryContext &proto,
   for (int i = 0; i < proto.join_hash_tables_size(); ++i) {
     join_hash_tables_.emplace_back(
         JoinHashTableFactory::CreateResizableFromProto(proto.join_hash_tables(i),
-                                                       storage_manager,
-                                                       bloom_filters_));
+                                                       storage_manager));
   }
 
   for (int i = 0; i < proto.insert_destinations_size(); ++i) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/55480d8d/query_execution/QueryContext.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryContext.hpp b/query_execution/QueryContext.hpp
index 393b55e..78794f1 100644
--- a/query_execution/QueryContext.hpp
+++ b/query_execution/QueryContext.hpp
@@ -35,7 +35,6 @@
 #include "storage/InsertDestination.hpp"
 #include "storage/WindowAggregationOperationState.hpp"
 #include "types/containers/Tuple.hpp"
-#include "utility/BloomFilter.hpp"
 #include "utility/Macros.hpp"
 #include "utility/SortConfiguration.hpp"
 
@@ -67,11 +66,6 @@ class QueryContext {
   typedef std::uint32_t aggregation_state_id;
 
   /**
-   * @brief A unique identifier for a BloomFilter per query.
-   **/
-  typedef std::uint32_t bloom_filter_id;
-
-  /**
    * @brief A unique identifier for a GeneratorFunctionHandle per query.
    **/
   typedef std::uint32_t generator_function_id;
@@ -193,52 +187,6 @@ class QueryContext {
   }
 
   /**
-   * @brief Whether the given BloomFilter id is valid.
-   *
-   * @param id The BloomFilter id.
-   *
-   * @return True if valid, otherwise false.
-   **/
-  bool isValidBloomFilterId(const bloom_filter_id id) const {
-    return id < bloom_filters_.size();
-  }
-
-  /**
-   * @brief Get a mutable reference to the BloomFilter.
-   *
-   * @param id The BloomFilter id.
-   *
-   * @return The BloomFilter, already created in the constructor.
-   **/
-  inline BloomFilter* getBloomFilterMutable(const bloom_filter_id id) {
-    DCHECK_LT(id, bloom_filters_.size());
-    return bloom_filters_[id].get();
-  }
-
-  /**
-   * @brief Get a constant pointer to the BloomFilter.
-   *
-   * @param id The BloomFilter id.
-   *
-   * @return The constant pointer to BloomFilter that is
-   *         already created in the constructor.
-   **/
-  inline const BloomFilter* getBloomFilter(const bloom_filter_id id) const {
-    DCHECK_LT(id, bloom_filters_.size());
-    return bloom_filters_[id].get();
-  }
-
-  /**
-   * @brief Destory the given BloomFilter.
-   *
-   * @param id The id of the BloomFilter to destroy.
-   **/
-  inline void destroyBloomFilter(const bloom_filter_id id) {
-    DCHECK_LT(id, bloom_filters_.size());
-    bloom_filters_[id].reset();
-  }
-
-  /**
    * @brief Whether the given GeneratorFunctionHandle id is valid.
    *
    * @param id The GeneratorFunctionHandle id.
@@ -507,7 +455,6 @@ class QueryContext {
 
  private:
   std::vector<std::unique_ptr<AggregationOperationState>> aggregation_states_;
-  std::vector<std::unique_ptr<BloomFilter>> bloom_filters_;
   std::vector<std::unique_ptr<const GeneratorFunctionHandle>> generator_functions_;
   std::vector<std::unique_ptr<InsertDestination>> insert_destinations_;
   std::vector<std::unique_ptr<JoinHashTable>> join_hash_tables_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/55480d8d/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index f05cc46..e85e005 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -643,7 +643,6 @@ target_link_libraries(quickstep_storage_FastHashTable
                       quickstep_threading_SpinSharedMutex
                       quickstep_types_Type
                       quickstep_types_TypedValue
-                      quickstep_utility_BloomFilter
                       quickstep_utility_HashPair
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_storage_FastHashTableFactory
@@ -659,7 +658,6 @@ target_link_libraries(quickstep_storage_FastHashTableFactory
                       quickstep_storage_SimpleScalarSeparateChainingHashTable
                       quickstep_storage_TupleReference
                       quickstep_types_TypeFactory
-                      quickstep_utility_BloomFilter
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_storage_FastSeparateChainingHashTable
                       quickstep_storage_FastHashTable

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/55480d8d/storage/FastHashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/FastHashTable.hpp b/storage/FastHashTable.hpp
index 4a95cd9..74d9ee3 100644
--- a/storage/FastHashTable.hpp
+++ b/storage/FastHashTable.hpp
@@ -39,7 +39,6 @@
 #include "threading/SpinSharedMutex.hpp"
 #include "types/Type.hpp"
 #include "types/TypedValue.hpp"
-#include "utility/BloomFilter.hpp"
 #include "utility/HashPair.hpp"
 #include "utility/Macros.hpp"
 
@@ -958,62 +957,6 @@ class FastHashTable : public HashTableBase<resizable,
   template <typename FunctorT>
   std::size_t forEachCompositeKeyFast(FunctorT *functor, int index) const;
 
-  /**
-   * @brief A call to this function will cause a bloom filter to be built
-   *        during the build phase of this hash table.
-   **/
-  inline void enableBuildSideBloomFilter() {
-    has_build_side_bloom_filter_ = true;
-  }
-
-  /**
-   * @brief A call to this function will cause a set of bloom filters to be
-   *        probed during the probe phase of this hash table.
-   **/
-  inline void enableProbeSideBloomFilter() {
-    has_probe_side_bloom_filter_ = true;
-  }
-
-  /**
-   * @brief This function sets the pointer to the bloom filter to be
-   *        used during the build phase of this hash table.
-   * @warning Should call enable_build_side_bloom_filter() first to enable
-   *          bloom filter usage during build phase.
-   * @note The ownership of the bloom filter lies with the caller.
-   *
-   * @param bloom_filter The pointer to the bloom filter.
-   **/
-  inline void setBuildSideBloomFilter(BloomFilter *bloom_filter) {
-    build_bloom_filter_ = bloom_filter;
-  }
-
-  /**
-   * @brief This function adds a pointer to the list of bloom filters to be
-   *        used during the probe phase of this hash table.
-   * @warning Should call enable_probe_side_bloom_filter() first to enable
-   *          bloom filter usage during probe phase.
-   * @note The ownership of the bloom filter lies with the caller.
-   *
-   * @param bloom_filter The pointer to the bloom filter.
-   **/
-  inline void addProbeSideBloomFilter(const BloomFilter *bloom_filter) {
-    probe_bloom_filters_.emplace_back(bloom_filter);
-  }
-
-  /**
-   * @brief This function adds a vector of attribute ids corresponding to a
-   *        bloom filter used during the probe phase of this hash table.
-   * @warning Should call enable_probe_side_bloom_filter() first to enable
-   *          bloom filter usage during probe phase.
-   *
-   * @param probe_attribute_ids The vector of attribute ids to use for probing
-   *        the bloom filter.
-   **/
-  inline void addProbeSideAttributeIds(
-      std::vector<attribute_id> &&probe_attribute_ids) {
-    probe_attribute_ids_.push_back(probe_attribute_ids);
-  }
-
  protected:
   /**
    * @brief Constructor for new resizable hash table.
@@ -1318,12 +1261,6 @@ class FastHashTable : public HashTableBase<resizable,
                                    const attribute_id key_attr_id,
                                    FunctorT *functor) const;
 
-  // Data structures used for bloom filter optimized semi-joins.
-  bool has_build_side_bloom_filter_ = false;
-  bool has_probe_side_bloom_filter_ = false;
-  BloomFilter *build_bloom_filter_;
-  std::vector<const BloomFilter *> probe_bloom_filters_;
-  std::vector<std::vector<attribute_id>> probe_attribute_ids_;
   DISALLOW_COPY_AND_ASSIGN(FastHashTable);
 };
 
@@ -1449,13 +1386,6 @@ FastHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>::
                 total_entries, total_variable_key_size, &prealloc_state);
           }
         }
-        std::unique_ptr<BloomFilter> thread_local_bloom_filter;
-        if (has_build_side_bloom_filter_) {
-          thread_local_bloom_filter.reset(
-              new BloomFilter(build_bloom_filter_->getRandomSeed(),
-                              build_bloom_filter_->getNumberOfHashes(),
-                              build_bloom_filter_->getBitArraySize()));
-        }
         if (resizable) {
           while (result == HashTablePutResult::kOutOfSpace) {
             {
@@ -1474,12 +1404,6 @@ FastHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>::
                     variable_size,
                     (*functor)(*accessor),
                     using_prealloc ? &prealloc_state : nullptr);
-                // Insert into bloom filter, if enabled.
-                if (has_build_side_bloom_filter_) {
-                  thread_local_bloom_filter->insertUnSafe(
-                      static_cast<const std::uint8_t *>(key.getDataPtr()),
-                      key.getDataSize());
-                }
                 if (result == HashTablePutResult::kDuplicateKey) {
                   DEBUG_ASSERT(!using_prealloc);
                   return result;
@@ -1507,22 +1431,11 @@ FastHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>::
                                   variable_size,
                                   (*functor)(*accessor),
                                   using_prealloc ? &prealloc_state : nullptr);
-            // Insert into bloom filter, if enabled.
-            if (has_build_side_bloom_filter_) {
-              thread_local_bloom_filter->insertUnSafe(
-                  static_cast<const std::uint8_t *>(key.getDataPtr()),
-                  key.getDataSize());
-            }
             if (result != HashTablePutResult::kOK) {
               return result;
             }
           }
         }
-        // Update the build side bloom filter with thread local copy, if
-        // available.
-        if (has_build_side_bloom_filter_) {
-          build_bloom_filter_->bitwiseOr(thread_local_bloom_filter.get());
-        }
 
         return HashTablePutResult::kOK;
       });
@@ -2462,52 +2375,27 @@ void FastHashTable<resizable,
   InvokeOnAnyValueAccessor(
       accessor,
       [&](auto *accessor) -> void {  // NOLINT(build/c++11)
-        while (accessor->next()) {
-          // Probe any bloom filters, if enabled.
-          if (has_probe_side_bloom_filter_) {
-            DCHECK_EQ(probe_bloom_filters_.size(), probe_attribute_ids_.size());
-            // Check if the key is contained in the BloomFilters or not.
-            bool bloom_miss = false;
-            for (std::size_t i = 0;
-                 i < probe_bloom_filters_.size() && !bloom_miss;
-                 ++i) {
-              const BloomFilter *bloom_filter = probe_bloom_filters_[i];
-              for (const attribute_id &attr_id : probe_attribute_ids_[i]) {
-                TypedValue bloom_key = accessor->getTypedValue(attr_id);
-                if (!bloom_filter->contains(static_cast<const std::uint8_t *>(
-                                                bloom_key.getDataPtr()),
-                                            bloom_key.getDataSize())) {
-                  bloom_miss = true;
-                  break;
-                }
-              }
-            }
-            if (bloom_miss) {
-              continue;  // On a bloom filter miss, probing the hash table can
-                         // be skipped.
-            }
-          }
-
-          TypedValue key = accessor->getTypedValue(key_attr_id);
-          if (check_for_null_keys && key.isNull()) {
-            continue;
-          }
-          const std::size_t true_hash = use_scalar_literal_hash_template
-                                            ? key.getHashScalarLiteral()
-                                            : key.getHash();
-          const std::size_t adjusted_hash =
-              adjust_hashes_template ? this->AdjustHash(true_hash) : true_hash;
-          std::size_t entry_num = 0;
-          const std::uint8_t *value;
-          while (this->getNextEntryForKey(
-              key, adjusted_hash, &value, &entry_num)) {
-            (*functor)(*accessor, *value);
-            if (!allow_duplicate_keys) {
-              break;
-            }
-          }
+    while (accessor->next()) {
+      TypedValue key = accessor->getTypedValue(key_attr_id);
+      if (check_for_null_keys && key.isNull()) {
+        continue;
+      }
+      const std::size_t true_hash = use_scalar_literal_hash_template
+                                        ? key.getHashScalarLiteral()
+                                        : key.getHash();
+      const std::size_t adjusted_hash =
+          adjust_hashes_template ? this->AdjustHash(true_hash) : true_hash;
+      std::size_t entry_num = 0;
+      const std::uint8_t *value;
+      while (this->getNextEntryForKey(
+          key, adjusted_hash, &value, &entry_num)) {
+        (*functor)(*accessor, *value);
+        if (!allow_duplicate_keys) {
+          break;
         }
-      });
+      }
+    }
+  });
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/55480d8d/storage/FastHashTableFactory.hpp
----------------------------------------------------------------------
diff --git a/storage/FastHashTableFactory.hpp b/storage/FastHashTableFactory.hpp
index 6d0b693..682cc2a 100644
--- a/storage/FastHashTableFactory.hpp
+++ b/storage/FastHashTableFactory.hpp
@@ -32,7 +32,6 @@
 #include "storage/SimpleScalarSeparateChainingHashTable.hpp"
 #include "storage/TupleReference.hpp"
 #include "types/TypeFactory.hpp"
-#include "utility/BloomFilter.hpp"
 #include "utility/Macros.hpp"
 
 #include "glog/logging.h"
@@ -183,14 +182,11 @@ class FastHashTableFactory {
    * @param proto A protobuf description of a resizable HashTable.
    * @param storage_manager The StorageManager to use (a StorageBlob will be
    *        allocated to hold the HashTable's contents).
-   * @param bloom_filters A vector of pointers to bloom filters that may be used
-   *        during hash table construction in build/probe phase.
    * @return A new resizable HashTable with parameters specified by proto.
    **/
   static FastHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys>*
       CreateResizableFromProto(const serialization::HashTable &proto,
-                               StorageManager *storage_manager,
-                               const std::vector<std::unique_ptr<BloomFilter>> &bloom_filters) {
+                               StorageManager *storage_manager) {
     DCHECK(ProtoIsValid(proto))
         << "Attempted to create HashTable from invalid proto description:\n"
         << proto.DebugString();
@@ -204,35 +200,6 @@ class FastHashTableFactory {
                                       key_types,
                                       proto.estimated_num_entries(),
                                       storage_manager);
-
-    // TODO(ssaurabh): These lazy initializations can be moved from here and pushed to the
-    //                 individual implementations of the hash table constructors.
-
-    // Check if there are any build side bloom filter defined on the hash table.
-    if (proto.build_side_bloom_filter_id_size() > 0) {
-      hash_table->enableBuildSideBloomFilter();
-      hash_table->setBuildSideBloomFilter(bloom_filters[proto.build_side_bloom_filter_id(0)].get());
-    }
-
-    // Check if there are any probe side bloom filters defined on the hash table.
-    if (proto.probe_side_bloom_filters_size() > 0) {
-      hash_table->enableProbeSideBloomFilter();
-      // Add as many probe bloom filters as defined by the proto.
-      for (int j = 0; j < proto.probe_side_bloom_filters_size(); ++j) {
-        // Add the pointer to the probe bloom filter within the list of probe bloom filters to use.
-        const auto probe_side_bloom_filter = proto.probe_side_bloom_filters(j);
-        hash_table->addProbeSideBloomFilter(bloom_filters[probe_side_bloom_filter.probe_side_bloom_filter_id()].get());
-
-        // Add the attribute ids corresponding to this probe bloom filter.
-        std::vector<attribute_id> probe_attribute_ids;
-        for (int k = 0; k < probe_side_bloom_filter.probe_side_attr_ids_size(); ++k) {
-          const attribute_id probe_attribute_id = probe_side_bloom_filter.probe_side_attr_ids(k);
-          probe_attribute_ids.push_back(probe_attribute_id);
-        }
-        hash_table->addProbeSideAttributeIds(std::move(probe_attribute_ids));
-      }
-    }
-
     return hash_table;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/55480d8d/storage/HashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTable.hpp b/storage/HashTable.hpp
index f2dcb03..786a9bb 100644
--- a/storage/HashTable.hpp
+++ b/storage/HashTable.hpp
@@ -981,61 +981,6 @@ class HashTable : public HashTableBase<resizable,
   template <typename FunctorT>
   std::size_t forEachCompositeKey(FunctorT *functor) const;
 
-  /**
-   * @brief A call to this function will cause a bloom filter to be built
-   *        during the build phase of this hash table.
-   **/
-  inline void enableBuildSideBloomFilter() {
-    has_build_side_bloom_filter_ = true;
-  }
-
-  /**
-   * @brief A call to this function will cause a set of bloom filters to be
-   *        probed during the probe phase of this hash table.
-   **/
-  inline void enableProbeSideBloomFilter() {
-    has_probe_side_bloom_filter_ = true;
-  }
-
-  /**
-   * @brief This function sets the pointer to the bloom filter to be
-   *        used during the build phase of this hash table.
-   * @warning Should call enable_build_side_bloom_filter() first to enable
-   *          bloom filter usage during build phase.
-   * @note The ownership of the bloom filter lies with the caller.
-   *
-   * @param bloom_filter The pointer to the bloom filter.
-   **/
-  inline void setBuildSideBloomFilter(BloomFilter *bloom_filter) {
-    build_bloom_filter_ = bloom_filter;
-  }
-
-  /**
-   * @brief This function adds a pointer to the list of bloom filters to be
-   *        used during the probe phase of this hash table.
-   * @warning Should call enable_probe_side_bloom_filter() first to enable
-   *          bloom filter usage during probe phase.
-   * @note The ownership of the bloom filter lies with the caller.
-   *
-   * @param bloom_filter The pointer to the bloom filter.
-   **/
-  inline void addProbeSideBloomFilter(const BloomFilter *bloom_filter) {
-    probe_bloom_filters_.emplace_back(bloom_filter);
-  }
-
-  /**
-   * @brief This function adds a vector of attribute ids corresponding to a
-   *        bloom filter used during the probe phase of this hash table.
-   * @warning Should call enable_probe_side_bloom_filter() first to enable
-   *          bloom filter usage during probe phase.
-   *
-   * @param probe_attribute_ids The vector of attribute ids to use for probing
-   *        the bloom filter.
-   **/
-  inline void addProbeSideAttributeIds(std::vector<attribute_id> &&probe_attribute_ids) {
-    probe_attribute_ids_.push_back(probe_attribute_ids);
-  }
-
  protected:
   /**
    * @brief Constructor for new resizable hash table.
@@ -1316,13 +1261,6 @@ class HashTable : public HashTableBase<resizable,
                                    const attribute_id key_attr_id,
                                    FunctorT *functor) const;
 
-  // Data structures used for bloom filter optimized semi-joins.
-  bool has_build_side_bloom_filter_ = false;
-  bool has_probe_side_bloom_filter_ = false;
-  BloomFilter *build_bloom_filter_;
-  std::vector<const BloomFilter*> probe_bloom_filters_;
-  std::vector<std::vector<attribute_id>> probe_attribute_ids_;
-
   DISALLOW_COPY_AND_ASSIGN(HashTable);
 };
 
@@ -1467,12 +1405,6 @@ HashTablePutResult HashTable<ValueT, resizable, serializable, force_key_copy, al
                                                         &prealloc_state);
       }
     }
-    std::unique_ptr<BloomFilter> thread_local_bloom_filter;
-    if (has_build_side_bloom_filter_) {
-      thread_local_bloom_filter.reset(new BloomFilter(build_bloom_filter_->getRandomSeed(),
-                                                      build_bloom_filter_->getNumberOfHashes(),
-                                                      build_bloom_filter_->getBitArraySize()));
-    }
     if (resizable) {
       while (result == HashTablePutResult::kOutOfSpace) {
         {
@@ -1488,11 +1420,6 @@ HashTablePutResult HashTable<ValueT, resizable, serializable, force_key_copy, al
                                        variable_size,
                                        (*functor)(*accessor),
                                        using_prealloc ? &prealloc_state : nullptr);
-            // Insert into bloom filter, if enabled.
-            if (has_build_side_bloom_filter_) {
-              thread_local_bloom_filter->insertUnSafe(static_cast<const std::uint8_t *>(key.getDataPtr()),
-                                                      key.getDataSize());
-            }
             if (result == HashTablePutResult::kDuplicateKey) {
               DEBUG_ASSERT(!using_prealloc);
               return result;
@@ -1518,20 +1445,11 @@ HashTablePutResult HashTable<ValueT, resizable, serializable, force_key_copy, al
                                    variable_size,
                                    (*functor)(*accessor),
                                    using_prealloc ? &prealloc_state : nullptr);
-        // Insert into bloom filter, if enabled.
-        if (has_build_side_bloom_filter_) {
-          thread_local_bloom_filter->insertUnSafe(static_cast<const std::uint8_t *>(key.getDataPtr()),
-                                                  key.getDataSize());
-        }
         if (result != HashTablePutResult::kOK) {
           return result;
         }
       }
     }
-    // Update the build side bloom filter with thread local copy, if available.
-    if (has_build_side_bloom_filter_) {
-      build_bloom_filter_->bitwiseOr(thread_local_bloom_filter.get());
-    }
 
     return HashTablePutResult::kOK;
   });
@@ -2237,27 +2155,6 @@ void HashTable<ValueT, resizable, serializable, force_key_copy, allow_duplicate_
       accessor,
       [&](auto *accessor) -> void {  // NOLINT(build/c++11)
     while (accessor->next()) {
-      // Probe any bloom filters, if enabled.
-      if (has_probe_side_bloom_filter_) {
-        DCHECK_EQ(probe_bloom_filters_.size(), probe_attribute_ids_.size());
-        // Check if the key is contained in the BloomFilters or not.
-        bool bloom_miss = false;
-        for (std::size_t i = 0; i < probe_bloom_filters_.size() && !bloom_miss; ++i) {
-          const BloomFilter *bloom_filter = probe_bloom_filters_[i];
-          for (const attribute_id &attr_id : probe_attribute_ids_[i]) {
-            TypedValue bloom_key = accessor->getTypedValue(attr_id);
-            if (!bloom_filter->contains(static_cast<const std::uint8_t*>(bloom_key.getDataPtr()),
-                                        bloom_key.getDataSize())) {
-              bloom_miss = true;
-              break;
-            }
-          }
-        }
-        if (bloom_miss) {
-          continue;  // On a bloom filter miss, probing the hash table can be skipped.
-        }
-      }
-
       TypedValue key = accessor->getTypedValue(key_attr_id);
       if (check_for_null_keys && key.isNull()) {
         continue;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/55480d8d/storage/HashTable.proto
----------------------------------------------------------------------
diff --git a/storage/HashTable.proto b/storage/HashTable.proto
index ade30d8..1d4ccb0 100644
--- a/storage/HashTable.proto
+++ b/storage/HashTable.proto
@@ -34,10 +34,4 @@ message HashTable {
   required HashTableImplType hash_table_impl_type = 1;
   repeated Type key_types = 2;
   required uint64 estimated_num_entries = 3;
-  repeated uint32 build_side_bloom_filter_id = 4;
-  message ProbeSideBloomFilter {
-    required uint32 probe_side_bloom_filter_id = 1;
-    repeated uint32 probe_side_attr_ids = 2;
-  }
-  repeated ProbeSideBloomFilter probe_side_bloom_filters = 6;
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/55480d8d/storage/HashTableFactory.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTableFactory.hpp b/storage/HashTableFactory.hpp
index 40b39de..d690557 100644
--- a/storage/HashTableFactory.hpp
+++ b/storage/HashTableFactory.hpp
@@ -295,14 +295,11 @@ class HashTableFactory {
    * @param proto A protobuf description of a resizable HashTable.
    * @param storage_manager The StorageManager to use (a StorageBlob will be
    *        allocated to hold the HashTable's contents).
-   * @param bloom_filters A vector of pointers to bloom filters that may be used
-   *        during hash table construction in build/probe phase.
    * @return A new resizable HashTable with parameters specified by proto.
    **/
   static HashTable<ValueT, resizable, serializable, force_key_copy, allow_duplicate_keys>*
       CreateResizableFromProto(const serialization::HashTable &proto,
-                               StorageManager *storage_manager,
-                               const std::vector<std::unique_ptr<BloomFilter>> &bloom_filters) {
+                               StorageManager *storage_manager) {
     DCHECK(ProtoIsValid(proto))
         << "Attempted to create HashTable from invalid proto description:\n"
         << proto.DebugString();
@@ -316,35 +313,6 @@ class HashTableFactory {
                                       key_types,
                                       proto.estimated_num_entries(),
                                       storage_manager);
-
-    // TODO(ssaurabh): These lazy initializations can be moved from here and pushed to the
-    //                 individual implementations of the hash table constructors.
-
-    // Check if there are any build side bloom filter defined on the hash table.
-    if (proto.build_side_bloom_filter_id_size() > 0) {
-      hash_table->enableBuildSideBloomFilter();
-      hash_table->setBuildSideBloomFilter(bloom_filters[proto.build_side_bloom_filter_id(0)].get());
-    }
-
-    // Check if there are any probe side bloom filters defined on the hash table.
-    if (proto.probe_side_bloom_filters_size() > 0) {
-      hash_table->enableProbeSideBloomFilter();
-      // Add as many probe bloom filters as defined by the proto.
-      for (int j = 0; j < proto.probe_side_bloom_filters_size(); ++j) {
-        // Add the pointer to the probe bloom filter within the list of probe bloom filters to use.
-        const auto probe_side_bloom_filter = proto.probe_side_bloom_filters(j);
-        hash_table->addProbeSideBloomFilter(bloom_filters[probe_side_bloom_filter.probe_side_bloom_filter_id()].get());
-
-        // Add the attribute ids corresponding to this probe bloom filter.
-        std::vector<attribute_id> probe_attribute_ids;
-        for (int k = 0; k < probe_side_bloom_filter.probe_side_attr_ids_size(); ++k) {
-          const attribute_id probe_attribute_id = probe_side_bloom_filter.probe_side_attr_ids(k);
-          probe_attribute_ids.push_back(probe_attribute_id);
-        }
-        hash_table->addProbeSideAttributeIds(std::move(probe_attribute_ids));
-      }
-    }
-
     return hash_table;
   }