Posted to commits@quickstep.apache.org by hb...@apache.org on 2016/10/26 14:10:55 UTC

[1/3] incubator-quickstep git commit: Add backend support for LIPFilters. [Forced Update!]

Repository: incubator-quickstep
Updated Branches:
  refs/heads/partitioned-aggregate-new 4089a21c0 -> 1e1434ad4 (forced update)


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/393eba55/storage/StorageBlock.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageBlock.cpp b/storage/StorageBlock.cpp
index ec5990f..dd3e19d 100644
--- a/storage/StorageBlock.cpp
+++ b/storage/StorageBlock.cpp
@@ -340,7 +340,7 @@ void StorageBlock::sample(const bool is_block_sample,
 }
 
 void StorageBlock::select(const vector<unique_ptr<const Scalar>> &selection,
-                          const Predicate *predicate,
+                          const TupleIdSequence *filter,
                           InsertDestinationInterface *destination) const {
   ColumnVectorsValueAccessor temp_result;
   {
@@ -348,14 +348,8 @@ void StorageBlock::select(const vector<unique_ptr<const Scalar>> &selection,
                                       indices_,
                                       indices_consistent_);
 
-    std::unique_ptr<TupleIdSequence> matches;
-    std::unique_ptr<ValueAccessor> accessor;
-    if (predicate == nullptr) {
-      accessor.reset(tuple_store_->createValueAccessor());
-    } else {
-      matches.reset(getMatchesForPredicate(predicate));
-      accessor.reset(tuple_store_->createValueAccessor(matches.get()));
-    }
+    std::unique_ptr<ValueAccessor> accessor(
+        tuple_store_->createValueAccessor(filter));
 
     for (vector<unique_ptr<const Scalar>>::const_iterator selection_cit = selection.begin();
          selection_cit != selection.end();
@@ -370,16 +364,10 @@ void StorageBlock::select(const vector<unique_ptr<const Scalar>> &selection,
 }
 
 void StorageBlock::selectSimple(const std::vector<attribute_id> &selection,
-                                const Predicate *predicate,
+                                const TupleIdSequence *filter,
                                 InsertDestinationInterface *destination) const {
-  std::unique_ptr<ValueAccessor> accessor;
-  std::unique_ptr<TupleIdSequence> matches;
-  if (predicate == nullptr) {
-    accessor.reset(tuple_store_->createValueAccessor());
-  } else {
-    matches.reset(getMatchesForPredicate(predicate));
-    accessor.reset(tuple_store_->createValueAccessor(matches.get()));
-  }
+  std::unique_ptr<ValueAccessor> accessor(
+      tuple_store_->createValueAccessor(filter));
 
   destination->bulkInsertTuplesWithRemappedAttributes(selection,
                                                       accessor.get());
@@ -389,37 +377,28 @@ AggregationState* StorageBlock::aggregate(
     const AggregationHandle &handle,
     const std::vector<std::unique_ptr<const Scalar>> &arguments,
     const std::vector<attribute_id> *arguments_as_attributes,
-    const Predicate *predicate,
-    std::unique_ptr<TupleIdSequence> *reuse_matches) const {
-  // If there is a filter predicate that hasn't already been evaluated,
-  // evaluate it now and save the results for other aggregates on this same
-  // block.
-  if (predicate && !*reuse_matches) {
-    reuse_matches->reset(getMatchesForPredicate(predicate));
-  }
-
+    const TupleIdSequence *filter) const {
 #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
   // If all the arguments to this aggregate are plain relation attributes,
   // aggregate directly on a ValueAccessor from this block to avoid a copy.
   if ((arguments_as_attributes != nullptr) && (!arguments_as_attributes->empty())) {
     DCHECK_EQ(arguments.size(), arguments_as_attributes->size())
         << "Mismatch between number of arguments and number of attribute_ids";
-    return aggregateHelperValueAccessor(handle, *arguments_as_attributes, reuse_matches->get());
+    return aggregateHelperValueAccessor(handle, *arguments_as_attributes, filter);
   }
   // TODO(shoban): We may want to optimize for ScalarLiteral here.
 #endif
 
   // Call aggregateHelperColumnVector() to materialize each argument as a
   // ColumnVector, then aggregate over those.
-  return aggregateHelperColumnVector(handle, arguments, reuse_matches->get());
+  return aggregateHelperColumnVector(handle, arguments, filter);
 }
 
 void StorageBlock::aggregateGroupBy(
     const std::vector<std::vector<std::unique_ptr<const Scalar>>> &arguments,
     const std::vector<std::unique_ptr<const Scalar>> &group_by,
-    const Predicate *predicate,
+    const TupleIdSequence *filter,
     AggregationStateHashTableBase *hash_table,
-    std::unique_ptr<TupleIdSequence> *reuse_matches,
     std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const {
   DCHECK_GT(group_by.size(), 0u)
       << "Called aggregateGroupBy() with zero GROUP BY expressions";
@@ -438,23 +417,7 @@ void StorageBlock::aggregateGroupBy(
   // this aggregate, as well as the GROUP BY expression values.
   ColumnVectorsValueAccessor temp_result;
   {
-    std::unique_ptr<ValueAccessor> accessor;
-    if (predicate) {
-      if (!*reuse_matches) {
-        // If there is a filter predicate that hasn't already been evaluated,
-        // evaluate it now and save the results for other aggregates on this
-        // same block.
-        reuse_matches->reset(getMatchesForPredicate(predicate));
-      }
-
-      // Create a filtered ValueAccessor that only iterates over predicate
-      // matches.
-      accessor.reset(tuple_store_->createValueAccessor(reuse_matches->get()));
-    } else {
-      // Create a ValueAccessor that iterates over all tuples in this block
-      accessor.reset(tuple_store_->createValueAccessor());
-    }
-
+    std::unique_ptr<ValueAccessor> accessor(tuple_store_->createValueAccessor(filter));
     attribute_id attr_id = 0;
 
     // First, put GROUP BY keys into 'temp_result'.
@@ -503,9 +466,8 @@ void StorageBlock::aggregateDistinct(
     const std::vector<std::unique_ptr<const Scalar>> &arguments,
     const std::vector<attribute_id> *arguments_as_attributes,
     const std::vector<std::unique_ptr<const Scalar>> &group_by,
-    const Predicate *predicate,
+    const TupleIdSequence *filter,
     AggregationStateHashTableBase *distinctify_hash_table,
-    std::unique_ptr<TupleIdSequence> *reuse_matches,
     std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const {
   DCHECK_GT(arguments.size(), 0u)
       << "Called aggregateDistinct() with zero argument expressions";
@@ -517,22 +479,7 @@ void StorageBlock::aggregateDistinct(
   // this aggregate, as well as the GROUP BY expression values.
   ColumnVectorsValueAccessor temp_result;
   {
-    std::unique_ptr<ValueAccessor> accessor;
-    if (predicate) {
-      if (!*reuse_matches) {
-        // If there is a filter predicate that hasn't already been evaluated,
-        // evaluate it now and save the results for other aggregates on this
-        // same block.
-        reuse_matches->reset(getMatchesForPredicate(predicate));
-      }
-
-      // Create a filtered ValueAccessor that only iterates over predicate
-      // matches.
-      accessor.reset(tuple_store_->createValueAccessor(reuse_matches->get()));
-    } else {
-      // Create a ValueAccessor that iterates over all tuples in this block
-      accessor.reset(tuple_store_->createValueAccessor());
-    }
+    std::unique_ptr<ValueAccessor> accessor(tuple_store_->createValueAccessor(filter));
 
 #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
     // If all the arguments to this aggregate are plain relation attributes,
@@ -1246,23 +1193,36 @@ bool StorageBlock::rebuildIndexes(bool short_circuit) {
   return all_indices_consistent_;
 }
 
-TupleIdSequence* StorageBlock::getMatchesForPredicate(const Predicate *predicate) const {
+TupleIdSequence* StorageBlock::getMatchesForPredicate(const Predicate *predicate,
+                                                      const TupleIdSequence *filter) const {
   if (predicate == nullptr) {
-    return tuple_store_->getExistenceMap();
+    TupleIdSequence *matches = tuple_store_->getExistenceMap();
+    if (filter != nullptr) {
+      matches->intersectWith(*filter);
+    }
+    return matches;
   }
 
   std::unique_ptr<ValueAccessor> value_accessor(tuple_store_->createValueAccessor());
-  std::unique_ptr<TupleIdSequence> existence_map;
-  if (!tuple_store_->isPacked()) {
-    existence_map.reset(tuple_store_->getExistenceMap());
-  }
   SubBlocksReference sub_blocks_ref(*tuple_store_,
                                     indices_,
                                     indices_consistent_);
-  return predicate->getAllMatches(value_accessor.get(),
-                                  &sub_blocks_ref,
-                                  nullptr,
-                                  existence_map.get());
+
+  if (!tuple_store_->isPacked()) {
+    std::unique_ptr<TupleIdSequence> existence_map(tuple_store_->getExistenceMap());
+    if (filter != nullptr) {
+      existence_map->intersectWith(*filter);
+    }
+    return predicate->getAllMatches(value_accessor.get(),
+                                    &sub_blocks_ref,
+                                    nullptr,
+                                    existence_map.get());
+  } else {
+    return predicate->getAllMatches(value_accessor.get(),
+                                    &sub_blocks_ref,
+                                    nullptr,
+                                    filter);
+  }
 }
 
 std::unordered_map<attribute_id, TypedValue>* StorageBlock::generateUpdatedValues(

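With this change StorageBlock no longer evaluates selection predicates itself: the caller resolves the predicate (plus any LIP-filter bitmap) into a TupleIdSequence up front and passes that in. A minimal pseudo-C++ sketch of the new calling pattern, assuming getMatchesForPredicate() is visible to the caller (its declaration moves into StorageBlock.hpp below) and that block, predicate, lip_matches, selection and destination are supplied by the surrounding operator:

  // Evaluate the predicate once, restricted to the tuples that survived the
  // LIP filters ('lip_matches' may be nullptr, meaning no pre-filter).
  std::unique_ptr<TupleIdSequence> matches(
      block.getMatchesForPredicate(predicate, lip_matches));

  // select() and selectSimple() now simply consume the precomputed filter;
  // a nullptr filter means "all tuples".
  block.select(selection, matches.get(), destination);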
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/393eba55/storage/StorageBlock.hpp
----------------------------------------------------------------------
diff --git a/storage/StorageBlock.hpp b/storage/StorageBlock.hpp
index bab5bab..488efcc 100644
--- a/storage/StorageBlock.hpp
+++ b/storage/StorageBlock.hpp
@@ -313,6 +313,17 @@ class StorageBlock : public StorageBlockBase {
       ValueAccessor *accessor);
 
   /**
+   * @brief Get the IDs of tuples in this StorageBlock which match a given Predicate.
+   *
+   * @param predicate The predicate to match.
+   * @param filter If non-NULL, then only tuple IDs which are set in the
+   *        filter will be checked (all others will be assumed to be false).
+   * @return A TupleIdSequence which contains the matching tuple IDs for the predicate.
+   **/
+  TupleIdSequence* getMatchesForPredicate(const Predicate *predicate,
+                                          const TupleIdSequence *filter = nullptr) const;
+
+  /**
    * @brief Perform a random sampling of data on  the StorageBlock. The number
    *       of records sampled is determined by the sample percentage in case of
    *       tuple sample. For block sample all the records in a block are taken.
@@ -336,8 +347,8 @@ class StorageBlock : public StorageBlockBase {
    *
    * @param selection A list of scalars, which will be evaluated to obtain
    *        attribute values for each result tuple.
-   * @param predicate A predicate for selection. NULL indicates that all tuples
-   *        should be matched.
+   * @param filter If non-NULL, then only tuple IDs which are set in the
+   *        filter will be checked (all others will be assumed to be false).
    * @param destination Where to insert the tuples resulting from the SELECT
    *        query.
    * @exception TupleTooLargeForBlock A tuple produced by this selection was
@@ -348,18 +359,18 @@ class StorageBlock : public StorageBlockBase {
    *
    **/
   void select(const std::vector<std::unique_ptr<const Scalar>> &selection,
-              const Predicate *predicate,
+              const TupleIdSequence *filter,
               InsertDestinationInterface *destination) const;
 
   /**
    * @brief Perform a simple SELECT query on this StorageBlock which only
    *        projects attributes and does not evaluate expressions.
    *
+   * @param selection The attributes to project.
+   * @param filter If non-NULL, then only tuple IDs which are set in the
+   *        filter will be checked (all others will be assumed to be false).
    * @param destination Where to insert the tuples resulting from the SELECT
    *        query.
-   * @param selection The attributes to project.
-   * @param predicate A predicate for selection. NULL indicates that all tuples
-   *        should be matched.
    * @exception TupleTooLargeForBlock A tuple produced by this selection was
    *            too large to insert into an empty block provided by
    *            destination. Selection may be partially complete (with some
@@ -371,7 +382,7 @@ class StorageBlock : public StorageBlockBase {
    *         an inconsistent IndexSubBlock (see indicesAreConsistent()).
    **/
   void selectSimple(const std::vector<attribute_id> &selection,
-                    const Predicate *predicate,
+                    const TupleIdSequence *filter,
                     InsertDestinationInterface *destination) const;
 
   /**
@@ -384,23 +395,8 @@ class StorageBlock : public StorageBlockBase {
    * @param arguments_as_attributes If non-NULL, indicates a valid attribute_id
    *        for each of the elements in arguments, and is used to elide a copy.
    *        Has no effect if NULL, or if VECTOR_COPY_ELISION_LEVEL is NONE.
-   * @param predicate A predicate for selection. nullptr indicates that all
-   *        tuples should be aggregated on.
-   * @param reuse_matches This parameter is used to store and reuse tuple-id
-   *        sequence of matches pre-computed in an earlier invocations to
-   *        aggregate(). \c reuse_matches is never \c nullptr for ease of use.
-   *        Current invocation of aggregate() will reuse TupleIdSequence if
-   *        passed, otherwise compute a TupleIdSequence based on \c predicate
-   *        and store in \c reuse_matches. We use std::unique_ptr for each of
-   *        use, since the caller will not have to selective free.
-   *
-   * For example, see this relevant pseudo-C++ code:
-   * \code
-   * std::unique_ptr<TupleIdSequence> matches;
-   * for each aggregate {
-   *   block.aggregate(..., &matches);
-   * }
-   * \endcode
+   * @param filter If non-NULL, then only tuple IDs which are set in the
+   *        filter will be checked (all others will be assumed to be false).
    *
    * @return Aggregated state for this block in the form of an
    *         AggregationState. AggregationHandle::mergeStates() can be called
@@ -412,8 +408,7 @@ class StorageBlock : public StorageBlockBase {
       const AggregationHandle &handle,
       const std::vector<std::unique_ptr<const Scalar>> &arguments,
       const std::vector<attribute_id> *arguments_as_attributes,
-      const Predicate *predicate,
-      std::unique_ptr<TupleIdSequence> *reuse_matches) const;
+      const TupleIdSequence *filter) const;
 
   /**
    * @brief Perform GROUP BY aggregation on the tuples in the this storage
@@ -423,18 +418,10 @@ class StorageBlock : public StorageBlockBase {
    * @param group_by The list of GROUP BY attributes/expressions. The tuples in
    *        this storage block are grouped by these attributes before
    *        aggregation.
-   * @param predicate A predicate for selection. nullptr indicates that all
-   *        tuples should be aggregated on.
+   * @param filter If non-NULL, then only tuple IDs which are set in the
+   *        filter will be checked (all others will be assumed to be false).
    * @param hash_table Hash table to store aggregation state mapped based on
    *        GROUP BY value list (defined by \c group_by).
-   * @param reuse_matches This parameter is used to store and reuse tuple-id
-   *        sequence of matches pre-computed in an earlier invocations of
-   *        aggregateGroupBy(). \c reuse_matches is never \c nullptr for ease of
-   *        use.  Current invocation of aggregateGroupBy() will reuse
-   *        TupleIdSequence if passed, otherwise computes a TupleIdSequence based
-   *        on \c predicate and stores in \c reuse_matches. We use
-   *        std::unique_ptr for each of use, since the caller will not have to
-   *        selective free.
    * @param reuse_group_by_vectors This parameter is used to store and reuse
    *        GROUP BY attribute vectors pre-computed in an earlier invocation of
    *        aggregateGroupBy(). \c reuse_group_by_vectors is never \c nullptr
@@ -444,10 +431,9 @@ class StorageBlock : public StorageBlockBase {
    *
    * For sample usage of aggregateGroupBy, see this relevant pseudo-C++ code:
    * \code
-   * std::unique_ptr<TupleIdSequence> matches;
    * std::vector<std::unique_ptr<ColumnVector>> group_by_vectors;
    * for each aggregate {
-   *   block.aggregateGroupBy(..., &matches, &group_by_vectors);
+   *   block.aggregateGroupBy(..., &group_by_vectors);
    * }
    * \endcode
    **/
@@ -461,9 +447,8 @@ class StorageBlock : public StorageBlockBase {
   void aggregateGroupBy(
       const std::vector<std::vector<std::unique_ptr<const Scalar>>> &arguments,
       const std::vector<std::unique_ptr<const Scalar>> &group_by,
-      const Predicate *predicate,
+      const TupleIdSequence *filter,
       AggregationStateHashTableBase *hash_table,
-      std::unique_ptr<TupleIdSequence> *reuse_matches,
       std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const;
 
   /**
@@ -481,19 +466,11 @@ class StorageBlock : public StorageBlockBase {
    *        for each of the elements in arguments, and is used to elide a copy.
    *        Has no effect if NULL, or if VECTOR_COPY_ELISION_LEVEL is NONE.
    * @param group_by The list of GROUP BY attributes/expressions.
-   * @param predicate A predicate for selection. \c nullptr indicates that all
-   *        tuples should be aggregated on.
+   * @param filter If non-NULL, then only tuple IDs which are set in the
+   *        filter will be checked (all others will be assumed to be false).
    * @param distinctify_hash_table Hash table to store the arguments and GROUP
    *        BY expressions together as hash table key and a bool constant \c true
    *        as hash table value. (So the hash table actually serves as a hash set.)
-   * @param reuse_matches This parameter is used to store and reuse tuple-id
-   *        sequence of matches pre-computed in an earlier invocations of
-   *        aggregateGroupBy(). \c reuse_matches is never \c nullptr for ease of
-   *        use.  Current invocation of aggregateGroupBy() will reuse
-   *        TupleIdSequence if passed, otherwise computes a TupleIdSequence based
-   *        on \c predicate and stores in \c reuse_matches. We use
-   *        std::unique_ptr for each of use, since the caller will not have to
-   *        selective free.
    * @param reuse_group_by_vectors This parameter is used to store and reuse
    *        GROUP BY attribute vectors pre-computed in an earlier invocation of
    *        aggregateGroupBy(). \c reuse_group_by_vectors is never \c nullptr
@@ -505,9 +482,8 @@ class StorageBlock : public StorageBlockBase {
                          const std::vector<std::unique_ptr<const Scalar>> &arguments,
                          const std::vector<attribute_id> *arguments_as_attributes,
                          const std::vector<std::unique_ptr<const Scalar>> &group_by,
-                         const Predicate *predicate,
+                         const TupleIdSequence *filter,
                          AggregationStateHashTableBase *distinctify_hash_table,
-                         std::unique_ptr<TupleIdSequence> *reuse_matches,
                          std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const;
 
   /**
@@ -627,8 +603,6 @@ class StorageBlock : public StorageBlockBase {
   // StorageBlock's header.
   bool rebuildIndexes(bool short_circuit);
 
-  TupleIdSequence* getMatchesForPredicate(const Predicate *predicate) const;
-
   std::unordered_map<attribute_id, TypedValue>* generateUpdatedValues(
       const ValueAccessor &accessor,
       const tuple_id tuple,

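The removed reuse_matches protocol is replaced by the caller computing one TupleIdSequence per block and sharing it across every aggregate on that block. In the spirit of the pseudo-C++ examples above (handles, arguments, arguments_as_attributes and block_states are hypothetical stand-ins for whatever state AggregationOperationState actually keeps):

  std::unique_ptr<TupleIdSequence> matches(
      block->getMatchesForPredicate(predicate, lip_filter_matches));

  for (std::size_t i = 0; i < handles.size(); ++i) {
    // Every aggregate on this block reuses the same precomputed filter.
    block_states.emplace_back(
        block->aggregate(*handles[i],
                         arguments[i],
                         arguments_as_attributes[i],
                         matches.get()));
  }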
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/393eba55/utility/PlanVisualizer.cpp
----------------------------------------------------------------------
diff --git a/utility/PlanVisualizer.cpp b/utility/PlanVisualizer.cpp
index 2adf674..5d70c86 100644
--- a/utility/PlanVisualizer.cpp
+++ b/utility/PlanVisualizer.cpp
@@ -132,9 +132,7 @@ void PlanVisualizer::visit(const P::PhysicalPtr &input) {
 
     for (const auto &attr : child->getOutputAttributes()) {
       if (referenced_ids.find(attr->id()) != referenced_ids.end()) {
-        edge_info.labels.emplace_back(
-            attr->attribute_alias() + ", est # distinct = " +
-            std::to_string(cost_model_->estimateNumDistinctValues(attr->id(), child)));
+        edge_info.labels.emplace_back(attr->attribute_alias());
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/393eba55/utility/lip_filter/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/utility/lip_filter/CMakeLists.txt b/utility/lip_filter/CMakeLists.txt
index b7224d2..23b3763 100644
--- a/utility/lip_filter/CMakeLists.txt
+++ b/utility/lip_filter/CMakeLists.txt
@@ -25,6 +25,7 @@ add_library(quickstep_utility_lipfilter_LIPFilterAdaptiveProber ../../empty_src.
 add_library(quickstep_utility_lipfilter_LIPFilterBuilder ../../empty_src.cpp LIPFilterBuilder.hpp)
 add_library(quickstep_utility_lipfilter_LIPFilterDeployment LIPFilterDeployment.cpp LIPFilterDeployment.hpp)
 add_library(quickstep_utility_lipfilter_LIPFilterFactory LIPFilterFactory.cpp LIPFilterFactory.hpp)
+add_library(quickstep_utility_lipfilter_LIPFilterUtil ../../empty_src.cpp LIPFilterUtil.hpp)
 add_library(quickstep_utility_lipfilter_LIPFilter_proto
             ${utility_lipfilter_LIPFilter_proto_srcs})
 add_library(quickstep_utility_lipfilter_SingleIdentityHashFilter ../../empty_src.cpp SingleIdentityHashFilter.hpp)
@@ -58,6 +59,9 @@ target_link_libraries(quickstep_utility_lipfilter_LIPFilterFactory
                       quickstep_utility_lipfilter_LIPFilter_proto
                       quickstep_utility_lipfilter_SingleIdentityHashFilter
                       quickstep_utility_Macros)
+target_link_libraries(quickstep_utility_lipfilter_LIPFilterUtil
+                      quickstep_queryexecution_QueryContext
+                      quickstep_utility_lipfilter_LIPFilterDeployment)
 target_link_libraries(quickstep_utility_lipfilter_LIPFilter_proto
                       ${PROTOBUF_LIBRARY}
                       quickstep_types_Type_proto)

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/393eba55/utility/lip_filter/LIPFilterBuilder.hpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/LIPFilterBuilder.hpp b/utility/lip_filter/LIPFilterBuilder.hpp
index deb8f66..aa84a06 100644
--- a/utility/lip_filter/LIPFilterBuilder.hpp
+++ b/utility/lip_filter/LIPFilterBuilder.hpp
@@ -39,9 +39,6 @@ class ValueAccessor;
  *  @{
  */
 
-class LIPFilterBuilder;
-typedef std::shared_ptr<LIPFilterBuilder> LIPFilterBuilderPtr;
-
 /**
  * @brief Helper class for building LIPFilters from a relation (i.e. ValueAccessor).
  */

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/393eba55/utility/lip_filter/LIPFilterUtil.hpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/LIPFilterUtil.hpp b/utility/lip_filter/LIPFilterUtil.hpp
new file mode 100644
index 0000000..5d43c49
--- /dev/null
+++ b/utility/lip_filter/LIPFilterUtil.hpp
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_UTIL_HPP_
+#define QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_UTIL_HPP_
+
+#include "query_execution/QueryContext.hpp"
+#include "utility/lip_filter/LIPFilterDeployment.hpp"
+
+namespace quickstep {
+
+class LIPFilterBuilder;
+class LIPFilterAdaptiveProber;
+
+/** \addtogroup Utility
+ *  @{
+ */
+
+/**
+ * @brief Create a LIPFilterBuilder for the given LIPFilterDeployment in QueryContext.
+ *
+ * @param lip_deployment_index The id of the LIPFilterDeployment in QueryContext.
+ * @param query_context The QueryContext.
+ * @return A LIPFilterBuilder object, or nullptr if \p lip_deployment_index is invalid.
+ */
+inline LIPFilterBuilder* CreateLIPFilterBuilderHelper(
+    const QueryContext::lip_deployment_id lip_deployment_index,
+    const QueryContext *query_context) {
+  if (lip_deployment_index == QueryContext::kInvalidLIPDeploymentId) {
+    return nullptr;
+  } else {
+    const LIPFilterDeployment *lip_filter_deployment =
+        query_context->getLIPDeployment(lip_deployment_index);
+    return lip_filter_deployment->createLIPFilterBuilder();
+  }
+}
+
+/**
+ * @brief Create a LIPFilterAdaptiveProber for the given LIPFilterDeployment
+ *        in QueryContext.
+ *
+ * @param lip_deployment_index The id of the LIPFilterDeployment in QueryContext.
+ * @param query_context The QueryContext.
+ * @return A LIPFilterAdaptiveProber object, or nullptr if \p lip_deployment_index
+ *         is invalid.
+ */
+inline LIPFilterAdaptiveProber* CreateLIPFilterAdaptiveProberHelper(
+    const QueryContext::lip_deployment_id lip_deployment_index,
+    const QueryContext *query_context) {
+  if (lip_deployment_index == QueryContext::kInvalidLIPDeploymentId) {
+    return nullptr;
+  } else {
+    const LIPFilterDeployment *lip_filter_deployment =
+        query_context->getLIPDeployment(lip_deployment_index);
+    return lip_filter_deployment->createLIPFilterAdaptiveProber();
+  }
+}
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_UTILITY_LIP_FILTER_LIP_FILTER_UTIL_HPP_

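A minimal usage sketch for these helpers, assuming the calling operator holds a QueryContext::lip_deployment_id member named lip_deployment_index_ (the convention followed by the operators later in this commit):

  // Both helpers return nullptr when lip_deployment_index_ equals
  // QueryContext::kInvalidLIPDeploymentId, so their results can be forwarded
  // to work orders unconditionally.
  LIPFilterBuilder *builder =
      CreateLIPFilterBuilderHelper(lip_deployment_index_, query_context);
  LIPFilterAdaptiveProber *prober =
      CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context);
  // Build-side work orders take ownership of 'builder' and probe-side work
  // orders take ownership of 'prober', each via a std::unique_ptr member.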

[2/3] incubator-quickstep git commit: Add backend support for LIPFilters.

Posted by hb...@apache.org.
Add backend support for LIPFilters.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/393eba55
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/393eba55
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/393eba55

Branch: refs/heads/partitioned-aggregate-new
Commit: 393eba550efdabbdccb2bf166fe8deae8bd3ed77
Parents: 539a95a
Author: Jianqiao Zhu <ji...@cs.wisc.edu>
Authored: Wed Sep 7 13:20:43 2016 -0500
Committer: Jianqiao Zhu <ji...@cs.wisc.edu>
Committed: Mon Oct 24 17:00:55 2016 -0500

----------------------------------------------------------------------
 expressions/scalar/ScalarAttribute.cpp       |   2 +-
 query_execution/QueryContext.hpp             |   8 +-
 query_optimizer/PhysicalGenerator.cpp        |   2 +-
 query_optimizer/rules/AttachLIPFilters.cpp   |  17 ++-
 query_optimizer/rules/CMakeLists.txt         |   1 +
 query_optimizer/tests/OptimizerTextTest.cpp  |   6 +-
 relational_operators/AggregationOperator.cpp |  12 +-
 relational_operators/AggregationOperator.hpp |  10 +-
 relational_operators/BuildHashOperator.cpp   |  17 ++-
 relational_operators/BuildHashOperator.hpp   |  18 ++-
 relational_operators/CMakeLists.txt          |  11 ++
 relational_operators/HashJoinOperator.cpp    | 161 ++++++++++++++++------
 relational_operators/HashJoinOperator.hpp    |  64 ++++++---
 relational_operators/SelectOperator.cpp      |  87 +++++++++---
 relational_operators/SelectOperator.hpp      |  16 ++-
 relational_operators/WorkOrder.proto         |   7 +-
 relational_operators/WorkOrderFactory.cpp    |  73 +++++++++-
 storage/AggregationOperationState.cpp        |  49 ++++---
 storage/AggregationOperationState.hpp        |   9 +-
 storage/CMakeLists.txt                       |   5 +-
 storage/StorageBlock.cpp                     | 112 +++++----------
 storage/StorageBlock.hpp                     |  82 ++++-------
 utility/PlanVisualizer.cpp                   |   4 +-
 utility/lip_filter/CMakeLists.txt            |   4 +
 utility/lip_filter/LIPFilterBuilder.hpp      |   3 -
 utility/lip_filter/LIPFilterUtil.hpp         |  79 +++++++++++
 26 files changed, 588 insertions(+), 271 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/393eba55/expressions/scalar/ScalarAttribute.cpp
----------------------------------------------------------------------
diff --git a/expressions/scalar/ScalarAttribute.cpp b/expressions/scalar/ScalarAttribute.cpp
index b29286b..cc42084 100644
--- a/expressions/scalar/ScalarAttribute.cpp
+++ b/expressions/scalar/ScalarAttribute.cpp
@@ -168,7 +168,7 @@ ColumnVector* ScalarAttribute::getAllValuesForJoin(
   ValueAccessor *accessor = using_left_relation ? left_accessor
                                                 : right_accessor;
 
-  return InvokeOnValueAccessorNotAdapter(
+  return InvokeOnAnyValueAccessor(
       accessor,
       [&joined_tuple_ids,
        &attr_id,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/393eba55/query_execution/QueryContext.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryContext.hpp b/query_execution/QueryContext.hpp
index 4ebb042..7ad8fa1 100644
--- a/query_execution/QueryContext.hpp
+++ b/query_execution/QueryContext.hpp
@@ -88,7 +88,7 @@ class QueryContext {
   /**
    * @brief A unique identifier for a LIPFilterDeployment per query.
    **/
-  typedef std::uint32_t lip_deployment_id;
+  typedef std::int32_t lip_deployment_id;
   static constexpr lip_deployment_id kInvalidLIPDeploymentId = static_cast<lip_deployment_id>(-1);
 
   /**
@@ -315,7 +315,7 @@ class QueryContext {
    * @return True if valid, otherwise false.
    **/
   bool isValidLIPDeploymentId(const lip_deployment_id id) const {
-    return id < lip_deployments_.size();
+    return static_cast<std::size_t>(id) < lip_deployments_.size();
   }
 
   /**
@@ -328,7 +328,7 @@ class QueryContext {
    **/
   inline const LIPFilterDeployment* getLIPDeployment(
       const lip_deployment_id id) const {
-    DCHECK_LT(id, lip_deployments_.size());
+    DCHECK_LT(static_cast<std::size_t>(id), lip_deployments_.size());
     return lip_deployments_[id].get();
   }
 
@@ -338,7 +338,7 @@ class QueryContext {
    * @param id The id of the LIPFilterDeployment to destroy.
    **/
   inline void destroyLIPDeployment(const lip_deployment_id id) {
-    DCHECK_LT(id, lip_deployments_.size());
+    DCHECK_LT(static_cast<std::size_t>(id), lip_deployments_.size());
     lip_deployments_[id].reset();
   }
 

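With lip_deployment_id now signed, kInvalidLIPDeploymentId is literally -1, which is why the comparisons above cast to std::size_t before checking against the deployment vector. A small self-contained illustration of why the range check still rejects the sentinel (plain C++, no Quickstep state needed):

  #include <cassert>
  #include <cstddef>
  #include <cstdint>

  int main() {
    typedef std::int32_t lip_deployment_id;
    constexpr lip_deployment_id kInvalidLIPDeploymentId =
        static_cast<lip_deployment_id>(-1);
    const std::size_t num_deployments = 3;
    // The sentinel converts to a huge unsigned value, so
    // static_cast<std::size_t>(id) < size() is false and
    // isValidLIPDeploymentId() correctly reports the id as invalid.
    assert(!(static_cast<std::size_t>(kInvalidLIPDeploymentId) < num_deployments));
    return 0;
  }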
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/393eba55/query_optimizer/PhysicalGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/PhysicalGenerator.cpp b/query_optimizer/PhysicalGenerator.cpp
index 9db4037..7cb97dc 100644
--- a/query_optimizer/PhysicalGenerator.cpp
+++ b/query_optimizer/PhysicalGenerator.cpp
@@ -50,7 +50,7 @@ DEFINE_bool(reorder_hash_joins, true,
             "cardinality and selective tables to be joined first, which is suitable "
             "for queries on star-schema tables.");
 
-DEFINE_bool(use_lip_filters, false,
+DEFINE_bool(use_lip_filters, true,
             "If true, use LIP (Lookahead Information Passing) filters to accelerate "
             "query processing. LIP filters are effective for queries on star schema "
             "tables (e.g. the SSB benchmark) and snowflake schema tables (e.g. the "

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/393eba55/query_optimizer/rules/AttachLIPFilters.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/AttachLIPFilters.cpp b/query_optimizer/rules/AttachLIPFilters.cpp
index 090fb8c..b3c57ab 100644
--- a/query_optimizer/rules/AttachLIPFilters.cpp
+++ b/query_optimizer/rules/AttachLIPFilters.cpp
@@ -36,6 +36,7 @@
 #include "query_optimizer/physical/PhysicalType.hpp"
 #include "query_optimizer/physical/Selection.hpp"
 #include "query_optimizer/physical/TopLevelPlan.hpp"
+#include "types/TypedValue.hpp"
 #include "utility/lip_filter/LIPFilter.hpp"
 
 #include "glog/logging.h"
@@ -174,12 +175,16 @@ const std::vector<AttachLIPFilters::LIPFilterInfoPtr>& AttachLIPFilters
       if (selectivity < 1.0) {
         std::size_t cardinality = cost_model_->estimateCardinality(build_node);
         for (const auto &attr : hash_join->right_join_attributes()) {
-          lip_filters.emplace_back(
-              std::make_shared<LIPFilterInfo>(attr,
-                                              path.cdr()->node,
-                                              path.depth,
-                                              selectivity,
-                                              cardinality));
+          // NOTE(jianqiao): currently we only consider attributes of primitive
+          // fixed-length types.
+          if (TypedValue::HashIsReversible(attr->getValueType().getTypeID())) {
+            lip_filters.emplace_back(
+                std::make_shared<LIPFilterInfo>(attr,
+                                                path.cdr()->node,
+                                                path.depth,
+                                                selectivity,
+                                                cardinality));
+          }
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/393eba55/query_optimizer/rules/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/CMakeLists.txt b/query_optimizer/rules/CMakeLists.txt
index 29875f6..7fffadc 100644
--- a/query_optimizer/rules/CMakeLists.txt
+++ b/query_optimizer/rules/CMakeLists.txt
@@ -50,6 +50,7 @@ target_link_libraries(quickstep_queryoptimizer_rules_AttachLIPFilters
                       quickstep_queryoptimizer_physical_Selection
                       quickstep_queryoptimizer_physical_TopLevelPlan
                       quickstep_queryoptimizer_rules_Rule
+                      quickstep_types_TypedValue
                       quickstep_utility_Macros
                       quickstep_utility_lipfilter_LIPFilter)
 target_link_libraries(quickstep_queryoptimizer_rules_BottomUpRule

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/393eba55/query_optimizer/tests/OptimizerTextTest.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/OptimizerTextTest.cpp b/query_optimizer/tests/OptimizerTextTest.cpp
index b6be739..759c173 100644
--- a/query_optimizer/tests/OptimizerTextTest.cpp
+++ b/query_optimizer/tests/OptimizerTextTest.cpp
@@ -32,6 +32,7 @@ namespace quickstep {
 namespace optimizer {
 
 DECLARE_bool(reorder_hash_joins);
+DECLARE_bool(use_lip_filters);
 
 }
 }
@@ -57,9 +58,10 @@ int main(int argc, char** argv) {
   test_driver->registerOptions(
       quickstep::optimizer::OptimizerTextTestRunner::kTestOptions);
 
-  // Turn off join order optimization for optimizer test since it is up to change
-  // and affects a large number of test cases.
+  // Turn off join order optimization and LIPFilters for the optimizer test,
+  // since they are subject to change and affect a large number of test cases.
   quickstep::optimizer::FLAGS_reorder_hash_joins = false;
+  quickstep::optimizer::FLAGS_use_lip_filters = false;
 
   ::testing::InitGoogleTest(&argc, argv);
   int success = RUN_ALL_TESTS();

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/393eba55/relational_operators/AggregationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/AggregationOperator.cpp b/relational_operators/AggregationOperator.cpp
index 056e76d..e111f5b 100644
--- a/relational_operators/AggregationOperator.cpp
+++ b/relational_operators/AggregationOperator.cpp
@@ -27,6 +27,8 @@
 #include "relational_operators/WorkOrder.pb.h"
 #include "storage/AggregationOperationState.hpp"
 #include "storage/StorageBlockInfo.hpp"
+#include "utility/lip_filter/LIPFilterAdaptiveProber.hpp"
+#include "utility/lip_filter/LIPFilterUtil.hpp"
 
 #include "tmb/id_typedefs.h"
 
@@ -45,7 +47,8 @@ bool AggregationOperator::getAllWorkOrders(
             new AggregationWorkOrder(
                 query_id_,
                 input_block_id,
-                query_context->getAggregationState(aggr_state_index_)),
+                query_context->getAggregationState(aggr_state_index_),
+                CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context)),
             op_index_);
       }
       started_ = true;
@@ -57,7 +60,8 @@ bool AggregationOperator::getAllWorkOrders(
           new AggregationWorkOrder(
               query_id_,
               input_relation_block_ids_[num_workorders_generated_],
-              query_context->getAggregationState(aggr_state_index_)),
+              query_context->getAggregationState(aggr_state_index_),
+              CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context)),
           op_index_);
       ++num_workorders_generated_;
     }
@@ -92,13 +96,13 @@ serialization::WorkOrder* AggregationOperator::createWorkOrderProto(const block_
 
   proto->SetExtension(serialization::AggregationWorkOrder::block_id, block);
   proto->SetExtension(serialization::AggregationWorkOrder::aggr_state_index, aggr_state_index_);
+  proto->SetExtension(serialization::AggregationWorkOrder::lip_deployment_index, lip_deployment_index_);
 
   return proto;
 }
 
-
 void AggregationWorkOrder::execute() {
-  state_->aggregateBlock(input_block_id_);
+  state_->aggregateBlock(input_block_id_, lip_filter_adaptive_prober_.get());
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/393eba55/relational_operators/AggregationOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/AggregationOperator.hpp b/relational_operators/AggregationOperator.hpp
index 31c1da4..b5ed977 100644
--- a/relational_operators/AggregationOperator.hpp
+++ b/relational_operators/AggregationOperator.hpp
@@ -30,6 +30,7 @@
 #include "relational_operators/WorkOrder.hpp"
 #include "storage/StorageBlockInfo.hpp"
 #include "utility/Macros.hpp"
+#include "utility/lip_filter/LIPFilterAdaptiveProber.hpp"
 
 #include "glog/logging.h"
 
@@ -137,13 +138,16 @@ class AggregationWorkOrder : public WorkOrder {
    * @param query_id The ID of this query.
    * @param input_block_id The block id.
    * @param state The AggregationState to use.
+   * @param lip_filter_adaptive_prober The attached LIP filter prober.
    **/
   AggregationWorkOrder(const std::size_t query_id,
                        const block_id input_block_id,
-                       AggregationOperationState *state)
+                       AggregationOperationState *state,
+                       LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr)
       : WorkOrder(query_id),
         input_block_id_(input_block_id),
-        state_(DCHECK_NOTNULL(state)) {}
+        state_(DCHECK_NOTNULL(state)),
+        lip_filter_adaptive_prober_(lip_filter_adaptive_prober) {}
 
   ~AggregationWorkOrder() override {}
 
@@ -153,6 +157,8 @@ class AggregationWorkOrder : public WorkOrder {
   const block_id input_block_id_;
   AggregationOperationState *state_;
 
+  std::unique_ptr<LIPFilterAdaptiveProber> lip_filter_adaptive_prober_;
+
   DISALLOW_COPY_AND_ASSIGN(AggregationWorkOrder);
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/393eba55/relational_operators/BuildHashOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/BuildHashOperator.cpp b/relational_operators/BuildHashOperator.cpp
index 465621c..60e091f 100644
--- a/relational_operators/BuildHashOperator.cpp
+++ b/relational_operators/BuildHashOperator.cpp
@@ -34,6 +34,8 @@
 #include "storage/TupleReference.hpp"
 #include "storage/TupleStorageSubBlock.hpp"
 #include "storage/ValueAccessor.hpp"
+#include "utility/lip_filter/LIPFilterBuilder.hpp"
+#include "utility/lip_filter/LIPFilterUtil.hpp"
 
 #include "glog/logging.h"
 
@@ -79,7 +81,8 @@ bool BuildHashOperator::getAllWorkOrders(
                                    any_join_key_attributes_nullable_,
                                    input_block_id,
                                    hash_table,
-                                   storage_manager),
+                                   storage_manager,
+                                   CreateLIPFilterBuilderHelper(lip_deployment_index_, query_context)),
             op_index_);
       }
       started_ = true;
@@ -95,7 +98,8 @@ bool BuildHashOperator::getAllWorkOrders(
               any_join_key_attributes_nullable_,
               input_relation_block_ids_[num_workorders_generated_],
               hash_table,
-              storage_manager),
+              storage_manager,
+              CreateLIPFilterBuilderHelper(lip_deployment_index_, query_context)),
           op_index_);
       ++num_workorders_generated_;
     }
@@ -136,17 +140,24 @@ serialization::WorkOrder* BuildHashOperator::createWorkOrderProto(const block_id
                       any_join_key_attributes_nullable_);
   proto->SetExtension(serialization::BuildHashWorkOrder::join_hash_table_index, hash_table_index_);
   proto->SetExtension(serialization::BuildHashWorkOrder::block_id, block);
+  proto->SetExtension(serialization::BuildHashWorkOrder::lip_deployment_index, lip_deployment_index_);
 
   return proto;
 }
 
-
 void BuildHashWorkOrder::execute() {
   BlockReference block(
       storage_manager_->getBlock(build_block_id_, input_relation_));
 
   TupleReferenceGenerator generator(build_block_id_);
   std::unique_ptr<ValueAccessor> accessor(block->getTupleStorageSubBlock().createValueAccessor());
+
+  // Build LIPFilters if enabled.
+  if (lip_filter_builder_ != nullptr) {
+    lip_filter_builder_->insertValueAccessor(accessor.get());
+    accessor->beginIterationVirtual();
+  }
+
   HashTablePutResult result;
   if (join_key_attributes_.size() == 1) {
     result = hash_table_->putValueAccessor(accessor.get(),

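The builder above consumes the same ValueAccessor that subsequently feeds the hash table, hence the rewind via beginIterationVirtual(). The probe-side counterpart appears in HashJoinOperator.cpp further down; a condensed sketch of both halves, assuming build_accessor and probe_accessor are ValueAccessors over the build and probe blocks and that the builder/prober may be nullptr when no LIP filters are deployed:

  // Build side: feed every build-side tuple into the deployed LIP filters,
  // then rewind so the accessor can be reused for the hash-table insertion.
  if (lip_filter_builder != nullptr) {
    lip_filter_builder->insertValueAccessor(build_accessor.get());
    build_accessor->beginIterationVirtual();
  }

  // Probe side: compute a surviving-tuple bitmap and wrap the accessor in a
  // TupleIdSequence adapter. The original accessor is kept alive as
  // 'base_accessor' because the adapter refers back to it.
  std::unique_ptr<TupleIdSequence> existence_map;
  std::unique_ptr<ValueAccessor> base_accessor;
  if (lip_filter_adaptive_prober != nullptr) {
    base_accessor.reset(probe_accessor.release());
    existence_map.reset(
        lip_filter_adaptive_prober->filterValueAccessor(base_accessor.get()));
    probe_accessor.reset(
        base_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map));
  }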
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/393eba55/relational_operators/BuildHashOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/BuildHashOperator.hpp b/relational_operators/BuildHashOperator.hpp
index 4a80a8a..0f96ef2 100644
--- a/relational_operators/BuildHashOperator.hpp
+++ b/relational_operators/BuildHashOperator.hpp
@@ -20,6 +20,7 @@
 #ifndef QUICKSTEP_RELATIONAL_OPERATORS_BUILD_HASH_OPERATOR_HPP_
 #define QUICKSTEP_RELATIONAL_OPERATORS_BUILD_HASH_OPERATOR_HPP_
 
+#include <memory>
 #include <string>
 #include <utility>
 #include <vector>
@@ -31,6 +32,7 @@
 #include "relational_operators/WorkOrder.hpp"
 #include "storage/StorageBlockInfo.hpp"
 #include "utility/Macros.hpp"
+#include "utility/lip_filter/LIPFilterBuilder.hpp"
 
 #include "glog/logging.h"
 
@@ -162,6 +164,7 @@ class BuildHashWorkOrder : public WorkOrder {
    * @param build_block_id The block id.
    * @param hash_table The JoinHashTable to use.
    * @param storage_manager The StorageManager to use.
+   * @param lip_filter_builder The attached LIP filter builder.
    **/
   BuildHashWorkOrder(const std::size_t query_id,
                      const CatalogRelationSchema &input_relation,
@@ -169,14 +172,16 @@ class BuildHashWorkOrder : public WorkOrder {
                      const bool any_join_key_attributes_nullable,
                      const block_id build_block_id,
                      JoinHashTable *hash_table,
-                     StorageManager *storage_manager)
+                     StorageManager *storage_manager,
+                     LIPFilterBuilder *lip_filter_builder = nullptr)
       : WorkOrder(query_id),
         input_relation_(input_relation),
         join_key_attributes_(join_key_attributes),
         any_join_key_attributes_nullable_(any_join_key_attributes_nullable),
         build_block_id_(build_block_id),
         hash_table_(DCHECK_NOTNULL(hash_table)),
-        storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+        storage_manager_(DCHECK_NOTNULL(storage_manager)),
+        lip_filter_builder_(lip_filter_builder) {}
 
   /**
    * @brief Constructor for the distributed version.
@@ -189,6 +194,7 @@ class BuildHashWorkOrder : public WorkOrder {
    * @param build_block_id The block id.
    * @param hash_table The JoinHashTable to use.
    * @param storage_manager The StorageManager to use.
+   * @param lip_filter_builder The attached LIP filter builder.
    **/
   BuildHashWorkOrder(const std::size_t query_id,
                      const CatalogRelationSchema &input_relation,
@@ -196,14 +202,16 @@ class BuildHashWorkOrder : public WorkOrder {
                      const bool any_join_key_attributes_nullable,
                      const block_id build_block_id,
                      JoinHashTable *hash_table,
-                     StorageManager *storage_manager)
+                     StorageManager *storage_manager,
+                     LIPFilterBuilder *lip_filter_builder = nullptr)
       : WorkOrder(query_id),
         input_relation_(input_relation),
         join_key_attributes_(std::move(join_key_attributes)),
         any_join_key_attributes_nullable_(any_join_key_attributes_nullable),
         build_block_id_(build_block_id),
         hash_table_(DCHECK_NOTNULL(hash_table)),
-        storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+        storage_manager_(DCHECK_NOTNULL(storage_manager)),
+        lip_filter_builder_(lip_filter_builder) {}
 
   ~BuildHashWorkOrder() override {}
 
@@ -222,6 +230,8 @@ class BuildHashWorkOrder : public WorkOrder {
   JoinHashTable *hash_table_;
   StorageManager *storage_manager_;
 
+  std::unique_ptr<LIPFilterBuilder> lip_filter_builder_;
+
   DISALLOW_COPY_AND_ASSIGN(BuildHashWorkOrder);
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/393eba55/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index a9645b4..8dd65d0 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -92,6 +92,8 @@ target_link_libraries(quickstep_relationaloperators_AggregationOperator
                       quickstep_storage_AggregationOperationState
                       quickstep_storage_StorageBlockInfo
                       quickstep_utility_Macros
+                      quickstep_utility_lipfilter_LIPFilterAdaptiveProber
+                      quickstep_utility_lipfilter_LIPFilterUtil
                       tmb)
 target_link_libraries(quickstep_relationaloperators_BuildHashOperator
                       glog
@@ -111,6 +113,8 @@ target_link_libraries(quickstep_relationaloperators_BuildHashOperator
                       quickstep_storage_TupleStorageSubBlock
                       quickstep_storage_ValueAccessor
                       quickstep_utility_Macros
+                      quickstep_utility_lipfilter_LIPFilterBuilder
+                      quickstep_utility_lipfilter_LIPFilterUtil
                       tmb)
 target_link_libraries(quickstep_relationaloperators_CreateIndexOperator
                       glog
@@ -223,6 +227,8 @@ target_link_libraries(quickstep_relationaloperators_HashJoinOperator
                       quickstep_types_containers_ColumnVector
                       quickstep_types_containers_ColumnVectorsValueAccessor
                       quickstep_utility_Macros
+                      quickstep_utility_lipfilter_LIPFilterAdaptiveProber
+                      quickstep_utility_lipfilter_LIPFilterUtil
                       tmb)
 target_link_libraries(quickstep_relationaloperators_InsertOperator
                       glog
@@ -322,7 +328,11 @@ target_link_libraries(quickstep_relationaloperators_SelectOperator
                       quickstep_storage_StorageBlock
                       quickstep_storage_StorageBlockInfo
                       quickstep_storage_StorageManager
+                      quickstep_storage_TupleIdSequence
+                      quickstep_storage_ValueAccessor
                       quickstep_utility_Macros
+                      quickstep_utility_lipfilter_LIPFilterAdaptiveProber
+                      quickstep_utility_lipfilter_LIPFilterUtil
                       tmb)
 if(QUICKSTEP_HAVE_LIBNUMA)
 target_link_libraries(quickstep_relationaloperators_SelectOperator
@@ -492,6 +502,7 @@ target_link_libraries(quickstep_relationaloperators_WorkOrderFactory
                       quickstep_relationaloperators_WorkOrder_proto
                       quickstep_storage_StorageBlockInfo
                       quickstep_utility_Macros
+                      quickstep_utility_lipfilter_LIPFilterUtil
                       tmb)
 target_link_libraries(quickstep_relationaloperators_WorkOrder_proto
                       quickstep_relationaloperators_SortMergeRunOperator_proto

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/393eba55/relational_operators/HashJoinOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.cpp b/relational_operators/HashJoinOperator.cpp
index 779c0fe..4a91f86 100644
--- a/relational_operators/HashJoinOperator.cpp
+++ b/relational_operators/HashJoinOperator.cpp
@@ -48,6 +48,8 @@
 #include "types/TypedValue.hpp"
 #include "types/containers/ColumnVector.hpp"
 #include "types/containers/ColumnVectorsValueAccessor.hpp"
+#include "utility/lip_filter/LIPFilterAdaptiveProber.hpp"
+#include "utility/lip_filter/LIPFilterUtil.hpp"
 
 #include "gflags/gflags.h"
 #include "glog/logging.h"
@@ -95,28 +97,22 @@ class MapBasedJoinedTupleCollector {
 
 class SemiAntiJoinTupleCollector {
  public:
-  explicit SemiAntiJoinTupleCollector(const TupleStorageSubBlock &tuple_store) {
-    filter_.reset(tuple_store.getExistenceMap());
-  }
+  explicit SemiAntiJoinTupleCollector(TupleIdSequence *filter)
+      : filter_(filter) {}
 
   template <typename ValueAccessorT>
   inline void operator()(const ValueAccessorT &accessor) {
     filter_->set(accessor.getCurrentPosition(), false);
   }
 
-  const TupleIdSequence* filter() const {
-    return filter_.get();
-  }
-
  private:
-  std::unique_ptr<TupleIdSequence> filter_;
+  TupleIdSequence *filter_;
 };
 
 class OuterJoinTupleCollector {
  public:
-  explicit OuterJoinTupleCollector(const TupleStorageSubBlock &tuple_store) {
-    filter_.reset(tuple_store.getExistenceMap());
-  }
+  explicit OuterJoinTupleCollector(TupleIdSequence *filter)
+      : filter_(filter) {}
 
   template <typename ValueAccessorT>
   inline void operator()(const ValueAccessorT &accessor,
@@ -134,14 +130,10 @@ class OuterJoinTupleCollector {
     return &joined_tuples_;
   }
 
-  const TupleIdSequence* filter() const {
-    return filter_.get();
-  }
-
  private:
   std::unordered_map<block_id, std::vector<std::pair<tuple_id, tuple_id>>> joined_tuples_;
   // BitVector on the probe relation. 1 if the corresponding tuple has no match.
-  std::unique_ptr<TupleIdSequence> filter_;
+  TupleIdSequence *filter_;
 };
 
 }  // namespace
@@ -203,7 +195,8 @@ bool HashJoinOperator::getAllNonOuterJoinWorkOrders(
                                      selection,
                                      hash_table,
                                      output_destination,
-                                     storage_manager),
+                                     storage_manager,
+                                     CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context)),
               op_index_);
         }
         started_ = true;
@@ -223,7 +216,8 @@ bool HashJoinOperator::getAllNonOuterJoinWorkOrders(
                 selection,
                 hash_table,
                 output_destination,
-                storage_manager),
+                storage_manager,
+                CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context)),
             op_index_);
         ++num_workorders_generated_;
       }  // end while
@@ -264,7 +258,8 @@ bool HashJoinOperator::getAllOuterJoinWorkOrders(
                   is_selection_on_build_,
                   hash_table,
                   output_destination,
-                  storage_manager),
+                  storage_manager,
+                  CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context)),
               op_index_);
         }
         started_ = true;
@@ -284,7 +279,8 @@ bool HashJoinOperator::getAllOuterJoinWorkOrders(
                 is_selection_on_build_,
                 hash_table,
                 output_destination,
-                storage_manager),
+                storage_manager,
+                CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context)),
             op_index_);
         ++num_workorders_generated_;
       }
@@ -360,6 +356,7 @@ serialization::WorkOrder* HashJoinOperator::createNonOuterJoinWorkOrderProto(
   proto->SetExtension(serialization::HashJoinWorkOrder::selection_index, selection_index_);
   proto->SetExtension(serialization::HashJoinWorkOrder::block_id, block);
   proto->SetExtension(serialization::HashJoinWorkOrder::residual_predicate_index, residual_predicate_index_);
+  proto->SetExtension(serialization::HashJoinWorkOrder::lip_deployment_index, lip_deployment_index_);
 
   return proto;
 }
@@ -408,6 +405,7 @@ serialization::WorkOrder* HashJoinOperator::createOuterJoinWorkOrderProto(const
   proto->SetExtension(serialization::HashJoinWorkOrder::join_hash_table_index, hash_table_index_);
   proto->SetExtension(serialization::HashJoinWorkOrder::selection_index, selection_index_);
   proto->SetExtension(serialization::HashJoinWorkOrder::block_id, block);
+  proto->SetExtension(serialization::HashJoinWorkOrder::lip_deployment_index, lip_deployment_index_);
 
   for (const bool is_attribute_on_build : is_selection_on_build_) {
     proto->AddExtension(serialization::HashJoinWorkOrder::is_selection_on_build, is_attribute_on_build);
@@ -421,8 +419,19 @@ void HashInnerJoinWorkOrder::execute() {
   BlockReference probe_block(
       storage_manager_->getBlock(block_id_, probe_relation_));
   const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock();
-
   std::unique_ptr<ValueAccessor> probe_accessor(probe_store.createValueAccessor());
+
+  // Probe the LIPFilters to generate an existence bitmap for probe_accessor, if enabled.
+  std::unique_ptr<TupleIdSequence> existence_map;
+  std::unique_ptr<ValueAccessor> base_accessor;
+  if (lip_filter_adaptive_prober_ != nullptr) {
+    base_accessor.reset(probe_accessor.release());
+    existence_map.reset(
+        lip_filter_adaptive_prober_->filterValueAccessor(base_accessor.get()));
+    probe_accessor.reset(
+        base_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map));
+  }
+
   MapBasedJoinedTupleCollector collector;
   if (join_key_attributes_.size() == 1) {
     hash_table_.getAllFromValueAccessor(
@@ -526,9 +535,19 @@ void HashSemiJoinWorkOrder::executeWithResidualPredicate() {
   BlockReference probe_block = storage_manager_->getBlock(block_id_,
                                                           probe_relation_);
   const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock();
-
   std::unique_ptr<ValueAccessor> probe_accessor(probe_store.createValueAccessor());
 
+  // Probe the LIPFilters to generate an existence bitmap for probe_accessor, if enabled.
+  std::unique_ptr<TupleIdSequence> existence_map;
+  std::unique_ptr<ValueAccessor> base_accessor;
+  if (lip_filter_adaptive_prober_ != nullptr) {
+    base_accessor.reset(probe_accessor.release());
+    existence_map.reset(
+        lip_filter_adaptive_prober_->filterValueAccessor(base_accessor.get()));
+    probe_accessor.reset(
+        base_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map));
+  }
+
   // We collect all the matching probe relation tuples, as there's a residual
   // predicate that needs to be applied after collecting these matches.
   MapBasedJoinedTupleCollector collector;
@@ -548,7 +567,6 @@ void HashSemiJoinWorkOrder::executeWithResidualPredicate() {
 
   // Get a filter for tuples in the given probe block.
   TupleIdSequence filter(probe_store.getMaxTupleID() + 1);
-  filter.setRange(0, filter.length(), false);
   for (const std::pair<const block_id,
                        std::vector<std::pair<tuple_id, tuple_id>>>
            &build_block_entry : *collector.getJoinedTuples()) {
@@ -607,9 +625,24 @@ void HashSemiJoinWorkOrder::executeWithoutResidualPredicate() {
   BlockReference probe_block = storage_manager_->getBlock(block_id_,
                                                           probe_relation_);
   const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock();
-
   std::unique_ptr<ValueAccessor> probe_accessor(probe_store.createValueAccessor());
-  SemiAntiJoinTupleCollector collector(probe_store);
+
+  // Probe the LIPFilters to generate an existence bitmap for probe_accessor, if enabled.
+  std::unique_ptr<TupleIdSequence> existence_map;
+  std::unique_ptr<ValueAccessor> base_accessor;
+  if (lip_filter_adaptive_prober_ != nullptr) {
+    base_accessor.reset(probe_accessor.release());
+    existence_map.reset(
+        lip_filter_adaptive_prober_->filterValueAccessor(base_accessor.get()));
+    probe_accessor.reset(
+        base_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map));
+  }
+
+  if (existence_map == nullptr) {
+    existence_map.reset(probe_store.getExistenceMap());
+  }
+
+  SemiAntiJoinTupleCollector collector(existence_map.get());
   // We collect all the probe relation tuples which have at least one matching
   // tuple in the build relation. As a performance optimization, the hash table
   // just looks for the existence of the probing key in the hash table and sets
@@ -637,7 +670,7 @@ void HashSemiJoinWorkOrder::executeWithoutResidualPredicate() {
                                     probe_block->getIndicesConsistent());
 
   std::unique_ptr<ValueAccessor> probe_accessor_with_filter(
-      probe_store.createValueAccessor(collector.filter()));
+      probe_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map));
   ColumnVectorsValueAccessor temp_result;
   for (vector<unique_ptr<const Scalar>>::const_iterator selection_it = selection_.begin();
        selection_it != selection_.end(); ++selection_it) {
@@ -654,9 +687,24 @@ void HashAntiJoinWorkOrder::executeWithoutResidualPredicate() {
   BlockReference probe_block = storage_manager_->getBlock(block_id_,
                                                           probe_relation_);
   const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock();
-
   std::unique_ptr<ValueAccessor> probe_accessor(probe_store.createValueAccessor());
-  SemiAntiJoinTupleCollector collector(probe_store);
+
+  // Probe the LIPFilters to generate an existence bitmap for probe_accessor, if enabled.
+  std::unique_ptr<TupleIdSequence> existence_map;
+  std::unique_ptr<ValueAccessor> base_accessor;
+  if (lip_filter_adaptive_prober_ != nullptr) {
+    base_accessor.reset(probe_accessor.release());
+    existence_map.reset(
+        lip_filter_adaptive_prober_->filterValueAccessor(base_accessor.get()));
+    probe_accessor.reset(
+        base_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map));
+  }
+
+  if (existence_map == nullptr) {
+    existence_map.reset(probe_store.getExistenceMap());
+  }
+
+  SemiAntiJoinTupleCollector collector(existence_map.get());
   // We probe the hash table to find the keys which have an entry in the
   // hash table.
   if (join_key_attributes_.size() == 1) {
@@ -680,7 +728,7 @@ void HashAntiJoinWorkOrder::executeWithoutResidualPredicate() {
                                     probe_block->getIndicesConsistent());
 
   std::unique_ptr<ValueAccessor> probe_accessor_with_filter(
-      probe_store.createValueAccessor(collector.filter()));
+      probe_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map));
   ColumnVectorsValueAccessor temp_result;
   for (vector<unique_ptr<const Scalar>>::const_iterator selection_it = selection_.begin();
        selection_it != selection_.end(); ++selection_it) {
@@ -698,8 +746,19 @@ void HashAntiJoinWorkOrder::executeWithResidualPredicate() {
   BlockReference probe_block = storage_manager_->getBlock(block_id_,
                                                           probe_relation_);
   const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock();
-
   std::unique_ptr<ValueAccessor> probe_accessor(probe_store.createValueAccessor());
+
+  // Probe the LIPFilters to generate an existence bitmap for probe_accessor, if enabled.
+  std::unique_ptr<TupleIdSequence> existence_map;
+  std::unique_ptr<ValueAccessor> base_accessor;
+  if (lip_filter_adaptive_prober_ != nullptr) {
+    base_accessor.reset(probe_accessor.release());
+    existence_map.reset(
+        lip_filter_adaptive_prober_->filterValueAccessor(base_accessor.get()));
+    probe_accessor.reset(
+        base_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map));
+  }
+
   MapBasedJoinedTupleCollector collector;
   // We probe the hash table and get all the matches. Unlike
   // executeWithoutResidualPredicate(), we have to collect all the matching
@@ -719,8 +778,12 @@ void HashAntiJoinWorkOrder::executeWithResidualPredicate() {
         &collector);
   }
 
-  // Create a filter for all the tuples from the given probe block.
-  std::unique_ptr<TupleIdSequence> filter(probe_store.getExistenceMap());
+  // If the existence map has not been initialized by the pre-filtering LIPFilters,
+  // then create it for all the tuples from the given probe block.
+  if (existence_map == nullptr) {
+    existence_map.reset(probe_store.getExistenceMap());
+  }
+
   for (const std::pair<const block_id, std::vector<std::pair<tuple_id, tuple_id>>>
            &build_block_entry : *collector.getJoinedTuples()) {
     // First element of the pair build_block_entry is the build block ID
@@ -733,7 +796,7 @@ void HashAntiJoinWorkOrder::executeWithResidualPredicate() {
     std::unique_ptr<ValueAccessor> build_accessor(build_store.createValueAccessor());
     for (const std::pair<tuple_id, tuple_id> &hash_match
          : build_block_entry.second) {
-      if (!filter->get(hash_match.second)) {
+      if (!existence_map->get(hash_match.second)) {
         // We have already seen this tuple, skip it.
         continue;
       }
@@ -743,9 +806,9 @@ void HashAntiJoinWorkOrder::executeWithResidualPredicate() {
                                                       *probe_accessor,
                                                       probe_relation_id,
                                                       hash_match.second)) {
-        // Note that the filter marks a match as false, as needed by the anti
-        // join definition.
-        filter->set(hash_match.second, false);
+        // Note that the existence map marks a match as false, as needed by the
+        // anti join definition.
+        existence_map->set(hash_match.second, false);
       }
     }
   }
@@ -755,7 +818,7 @@ void HashAntiJoinWorkOrder::executeWithResidualPredicate() {
                                     probe_block->getIndicesConsistent());
 
   std::unique_ptr<ValueAccessor> probe_accessor_with_filter(
-      probe_store.createValueAccessor(filter.get()));
+      probe_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map));
   ColumnVectorsValueAccessor temp_result;
   for (vector<unique_ptr<const Scalar>>::const_iterator selection_it = selection_.begin();
        selection_it != selection_.end();
@@ -775,9 +838,24 @@ void HashOuterJoinWorkOrder::execute() {
   const BlockReference probe_block = storage_manager_->getBlock(block_id_,
                                                                 probe_relation_);
   const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock();
-
   std::unique_ptr<ValueAccessor> probe_accessor(probe_store.createValueAccessor());
-  OuterJoinTupleCollector collector(probe_store);
+
+  // Probe the LIPFilters to generate an existence bitmap for probe_accessor, if enabled.
+  std::unique_ptr<TupleIdSequence> existence_map;
+  std::unique_ptr<ValueAccessor> base_accessor;
+  if (lip_filter_adaptive_prober_ != nullptr) {
+    base_accessor.reset(probe_accessor.release());
+    existence_map.reset(
+        lip_filter_adaptive_prober_->filterValueAccessor(base_accessor.get()));
+    probe_accessor.reset(
+        base_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map));
+  }
+
+  if (existence_map == nullptr) {
+    existence_map.reset(probe_store.getExistenceMap());
+  }
+
+  OuterJoinTupleCollector collector(existence_map.get());
   if (join_key_attributes_.size() == 1) {
     hash_table_.getAllFromValueAccessorWithExtraWorkForFirstMatch(
         probe_accessor.get(),
@@ -822,11 +900,10 @@ void HashOuterJoinWorkOrder::execute() {
                                     probe_block->getIndicesConsistent());
 
   // Populate the output tuples for non-matches.
-  const TupleIdSequence *filter = collector.filter();
-  const TupleIdSequence::size_type num_tuples_without_matches = filter->size();
+  const TupleIdSequence::size_type num_tuples_without_matches = existence_map->size();
   if (num_tuples_without_matches > 0) {
     std::unique_ptr<ValueAccessor> probe_accessor_with_filter(
-        probe_store.createValueAccessor(filter));
+        probe_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map));
     ColumnVectorsValueAccessor temp_result;
 
     for (std::size_t i = 0; i < selection_.size(); ++i) {

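Each of the work-order hunks above repeats the same probe-side pattern: run the LIPFilterAdaptiveProber over the block's ValueAccessor, take the resulting TupleIdSequence as an existence map, and re-wrap the accessor so that hash-table probing only visits surviving tuples. The fragment below is a minimal standalone model of that control flow; plain vectors and a lambda stand in for the Quickstep TupleIdSequence, ValueAccessor and prober types, so it illustrates the pattern rather than the actual API.

#include <cstddef>
#include <functional>
#include <iostream>
#include <vector>

// Stand-in types: std::vector<bool> plays the role of TupleIdSequence and a
// std::function the role of the LIPFilterAdaptiveProber built from the hash
// join's build side.
using ExistenceMap = std::vector<bool>;

ExistenceMap FilterValueAccessor(const std::vector<int> &probe_keys,
                                 const std::function<bool(int)> &prober) {
  ExistenceMap existence_map(probe_keys.size(), false);
  for (std::size_t tid = 0; tid < probe_keys.size(); ++tid) {
    existence_map[tid] = prober(probe_keys[tid]);  // tuple survives the LIP filters
  }
  return existence_map;
}

int main() {
  const std::vector<int> probe_keys = {3, 7, 42, 7, 9};
  // Pretend the build side only contained join keys 7 and 9.
  const auto prober = [](int key) { return key == 7 || key == 9; };

  // Steps 1-2: probe the LIP filters over the accessor, producing an existence map.
  const ExistenceMap existence_map = FilterValueAccessor(probe_keys, prober);

  // Step 3: the re-wrapped accessor (createSharedTupleIdSequenceAdapterVirtual in
  // the diff) only exposes surviving tuple ids to the hash-table probe.
  for (std::size_t tid = 0; tid < existence_map.size(); ++tid) {
    if (existence_map[tid]) {
      std::cout << "probe hash table with tuple " << tid << "\n";
    }
  }
  return 0;
}
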
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/393eba55/relational_operators/HashJoinOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.hpp b/relational_operators/HashJoinOperator.hpp
index fa393b6..0ed1eeb 100644
--- a/relational_operators/HashJoinOperator.hpp
+++ b/relational_operators/HashJoinOperator.hpp
@@ -35,6 +35,7 @@
 #include "storage/HashTable.hpp"
 #include "storage/StorageBlockInfo.hpp"
 #include "utility/Macros.hpp"
+#include "utility/lip_filter/LIPFilterAdaptiveProber.hpp"
 
 #include "glog/logging.h"
 
@@ -295,6 +296,7 @@ class HashInnerJoinWorkOrder : public WorkOrder {
    * @param hash_table The JoinHashTable to use.
    * @param output_destination The InsertDestination to insert the join results.
    * @param storage_manager The StorageManager to use.
+   * @param lip_filter_adaptive_prober The attached LIP filter prober.
    **/
   HashInnerJoinWorkOrder(
       const std::size_t query_id,
@@ -307,7 +309,8 @@ class HashInnerJoinWorkOrder : public WorkOrder {
       const std::vector<std::unique_ptr<const Scalar>> &selection,
       const JoinHashTable &hash_table,
       InsertDestination *output_destination,
-      StorageManager *storage_manager)
+      StorageManager *storage_manager,
+      LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr)
       : WorkOrder(query_id),
         build_relation_(build_relation),
         probe_relation_(probe_relation),
@@ -318,7 +321,8 @@ class HashInnerJoinWorkOrder : public WorkOrder {
         selection_(selection),
         hash_table_(hash_table),
         output_destination_(DCHECK_NOTNULL(output_destination)),
-        storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+        storage_manager_(DCHECK_NOTNULL(storage_manager)),
+        lip_filter_adaptive_prober_(lip_filter_adaptive_prober) {}
 
   /**
    * @brief Constructor for the distributed version.
@@ -342,6 +346,7 @@ class HashInnerJoinWorkOrder : public WorkOrder {
    * @param hash_table The JoinHashTable to use.
    * @param output_destination The InsertDestination to insert the join results.
    * @param storage_manager The StorageManager to use.
+   * @param lip_filter_adaptive_prober The attached LIP filter prober.
    **/
   HashInnerJoinWorkOrder(
       const std::size_t query_id,
@@ -354,7 +359,8 @@ class HashInnerJoinWorkOrder : public WorkOrder {
       const std::vector<std::unique_ptr<const Scalar>> &selection,
       const JoinHashTable &hash_table,
       InsertDestination *output_destination,
-      StorageManager *storage_manager)
+      StorageManager *storage_manager,
+      LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr)
       : WorkOrder(query_id),
         build_relation_(build_relation),
         probe_relation_(probe_relation),
@@ -365,7 +371,8 @@ class HashInnerJoinWorkOrder : public WorkOrder {
         selection_(selection),
         hash_table_(hash_table),
         output_destination_(DCHECK_NOTNULL(output_destination)),
-        storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+        storage_manager_(DCHECK_NOTNULL(storage_manager)),
+        lip_filter_adaptive_prober_(lip_filter_adaptive_prober) {}
 
   ~HashInnerJoinWorkOrder() override {}
 
@@ -392,6 +399,8 @@ class HashInnerJoinWorkOrder : public WorkOrder {
   InsertDestination *output_destination_;
   StorageManager *storage_manager_;
 
+  std::unique_ptr<LIPFilterAdaptiveProber> lip_filter_adaptive_prober_;
+
   DISALLOW_COPY_AND_ASSIGN(HashInnerJoinWorkOrder);
 };
 
@@ -423,6 +432,7 @@ class HashSemiJoinWorkOrder : public WorkOrder {
    * @param hash_table The JoinHashTable to use.
    * @param output_destination The InsertDestination to insert the join results.
    * @param storage_manager The StorageManager to use.
+   * @param lip_filter_adaptive_prober The attached LIP filter prober.
    **/
   HashSemiJoinWorkOrder(
       const std::size_t query_id,
@@ -435,7 +445,8 @@ class HashSemiJoinWorkOrder : public WorkOrder {
       const std::vector<std::unique_ptr<const Scalar>> &selection,
       const JoinHashTable &hash_table,
       InsertDestination *output_destination,
-      StorageManager *storage_manager)
+      StorageManager *storage_manager,
+      LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr)
       : WorkOrder(query_id),
         build_relation_(build_relation),
         probe_relation_(probe_relation),
@@ -446,7 +457,8 @@ class HashSemiJoinWorkOrder : public WorkOrder {
         selection_(selection),
         hash_table_(hash_table),
         output_destination_(DCHECK_NOTNULL(output_destination)),
-        storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+        storage_manager_(DCHECK_NOTNULL(storage_manager)),
+        lip_filter_adaptive_prober_(lip_filter_adaptive_prober) {}
 
   /**
    * @brief Constructor for the distributed version.
@@ -470,6 +482,7 @@ class HashSemiJoinWorkOrder : public WorkOrder {
    * @param hash_table The JoinHashTable to use.
    * @param output_destination The InsertDestination to insert the join results.
    * @param storage_manager The StorageManager to use.
+   * @param lip_filter_adaptive_prober The attached LIP filter prober.
    **/
   HashSemiJoinWorkOrder(
       const std::size_t query_id,
@@ -482,7 +495,8 @@ class HashSemiJoinWorkOrder : public WorkOrder {
       const std::vector<std::unique_ptr<const Scalar>> &selection,
       const JoinHashTable &hash_table,
       InsertDestination *output_destination,
-      StorageManager *storage_manager)
+      StorageManager *storage_manager,
+      LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr)
       : WorkOrder(query_id),
         build_relation_(build_relation),
         probe_relation_(probe_relation),
@@ -493,7 +507,8 @@ class HashSemiJoinWorkOrder : public WorkOrder {
         selection_(selection),
         hash_table_(hash_table),
         output_destination_(DCHECK_NOTNULL(output_destination)),
-        storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+        storage_manager_(DCHECK_NOTNULL(storage_manager)),
+        lip_filter_adaptive_prober_(lip_filter_adaptive_prober) {}
 
   ~HashSemiJoinWorkOrder() override {}
 
@@ -516,6 +531,8 @@ class HashSemiJoinWorkOrder : public WorkOrder {
   InsertDestination *output_destination_;
   StorageManager *storage_manager_;
 
+  std::unique_ptr<LIPFilterAdaptiveProber> lip_filter_adaptive_prober_;
+
   DISALLOW_COPY_AND_ASSIGN(HashSemiJoinWorkOrder);
 };
 
@@ -547,6 +564,7 @@ class HashAntiJoinWorkOrder : public WorkOrder {
    * @param hash_table The JoinHashTable to use.
    * @param output_destination The InsertDestination to insert the join results.
    * @param storage_manager The StorageManager to use.
+   * @param lip_filter_adaptive_prober The attached LIP filter prober.
    **/
   HashAntiJoinWorkOrder(
       const std::size_t query_id,
@@ -559,7 +577,8 @@ class HashAntiJoinWorkOrder : public WorkOrder {
       const std::vector<std::unique_ptr<const Scalar>> &selection,
       const JoinHashTable &hash_table,
       InsertDestination *output_destination,
-      StorageManager *storage_manager)
+      StorageManager *storage_manager,
+      LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr)
       : WorkOrder(query_id),
         build_relation_(build_relation),
         probe_relation_(probe_relation),
@@ -570,7 +589,8 @@ class HashAntiJoinWorkOrder : public WorkOrder {
         selection_(selection),
         hash_table_(hash_table),
         output_destination_(DCHECK_NOTNULL(output_destination)),
-        storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+        storage_manager_(DCHECK_NOTNULL(storage_manager)),
+        lip_filter_adaptive_prober_(lip_filter_adaptive_prober) {}
 
   /**
    * @brief Constructor for the distributed version.
@@ -594,6 +614,7 @@ class HashAntiJoinWorkOrder : public WorkOrder {
    * @param hash_table The JoinHashTable to use.
    * @param output_destination The InsertDestination to insert the join results.
    * @param storage_manager The StorageManager to use.
+   * @param lip_filter_adaptive_prober The attached LIP filter prober.
    **/
   HashAntiJoinWorkOrder(
       const std::size_t query_id,
@@ -606,7 +627,8 @@ class HashAntiJoinWorkOrder : public WorkOrder {
       const std::vector<std::unique_ptr<const Scalar>> &selection,
       const JoinHashTable &hash_table,
       InsertDestination *output_destination,
-      StorageManager *storage_manager)
+      StorageManager *storage_manager,
+      LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr)
       : WorkOrder(query_id),
         build_relation_(build_relation),
         probe_relation_(probe_relation),
@@ -617,7 +639,8 @@ class HashAntiJoinWorkOrder : public WorkOrder {
         selection_(selection),
         hash_table_(hash_table),
         output_destination_(DCHECK_NOTNULL(output_destination)),
-        storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+        storage_manager_(DCHECK_NOTNULL(storage_manager)),
+        lip_filter_adaptive_prober_(lip_filter_adaptive_prober) {}
 
   ~HashAntiJoinWorkOrder() override {}
 
@@ -646,6 +669,8 @@ class HashAntiJoinWorkOrder : public WorkOrder {
   InsertDestination *output_destination_;
   StorageManager *storage_manager_;
 
+  std::unique_ptr<LIPFilterAdaptiveProber> lip_filter_adaptive_prober_;
+
   DISALLOW_COPY_AND_ASSIGN(HashAntiJoinWorkOrder);
 };
 
@@ -675,6 +700,7 @@ class HashOuterJoinWorkOrder : public WorkOrder {
    * @param hash_table The JoinHashTable to use.
    * @param output_destination The InsertDestination to insert the join results.
    * @param storage_manager The StorageManager to use.
+   * @param lip_filter_adaptive_prober The attached LIP filter prober.
    **/
   HashOuterJoinWorkOrder(
       const std::size_t query_id,
@@ -687,7 +713,8 @@ class HashOuterJoinWorkOrder : public WorkOrder {
       const std::vector<bool> &is_selection_on_build,
       const JoinHashTable &hash_table,
       InsertDestination *output_destination,
-      StorageManager *storage_manager)
+      StorageManager *storage_manager,
+      LIPFilterAdaptiveProber *lip_filter_adaptive_prober)
       : WorkOrder(query_id),
         build_relation_(build_relation),
         probe_relation_(probe_relation),
@@ -698,7 +725,8 @@ class HashOuterJoinWorkOrder : public WorkOrder {
         is_selection_on_build_(is_selection_on_build),
         hash_table_(hash_table),
         output_destination_(output_destination),
-        storage_manager_(storage_manager) {}
+        storage_manager_(storage_manager),
+        lip_filter_adaptive_prober_(lip_filter_adaptive_prober) {}
 
   /**
    * @brief Constructor for the distributed version.
@@ -733,7 +761,8 @@ class HashOuterJoinWorkOrder : public WorkOrder {
       std::vector<bool> &&is_selection_on_build,
       const JoinHashTable &hash_table,
       InsertDestination *output_destination,
-      StorageManager *storage_manager)
+      StorageManager *storage_manager,
+      LIPFilterAdaptiveProber *lip_filter_adaptive_prober)
       : WorkOrder(query_id),
         build_relation_(build_relation),
         probe_relation_(probe_relation),
@@ -744,7 +773,8 @@ class HashOuterJoinWorkOrder : public WorkOrder {
         is_selection_on_build_(std::move(is_selection_on_build)),
         hash_table_(hash_table),
         output_destination_(output_destination),
-        storage_manager_(storage_manager) {}
+        storage_manager_(storage_manager),
+        lip_filter_adaptive_prober_(lip_filter_adaptive_prober) {}
 
   ~HashOuterJoinWorkOrder() override {}
 
@@ -763,6 +793,8 @@ class HashOuterJoinWorkOrder : public WorkOrder {
   InsertDestination *output_destination_;
   StorageManager *storage_manager_;
 
+  std::unique_ptr<LIPFilterAdaptiveProber> lip_filter_adaptive_prober_;
+
   DISALLOW_COPY_AND_ASSIGN(HashOuterJoinWorkOrder);
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/393eba55/relational_operators/SelectOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/SelectOperator.cpp b/relational_operators/SelectOperator.cpp
index d56326e..236ee7c 100644
--- a/relational_operators/SelectOperator.cpp
+++ b/relational_operators/SelectOperator.cpp
@@ -30,6 +30,10 @@
 #include "storage/StorageBlock.hpp"
 #include "storage/StorageBlockInfo.hpp"
 #include "storage/StorageManager.hpp"
+#include "storage/TupleIdSequence.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "utility/lip_filter/LIPFilterAdaptiveProber.hpp"
+#include "utility/lip_filter/LIPFilterUtil.hpp"
 
 #include "glog/logging.h"
 
@@ -40,22 +44,26 @@ namespace quickstep {
 class Predicate;
 
 void SelectOperator::addWorkOrders(WorkOrdersContainer *container,
+                                   QueryContext *query_context,
                                    StorageManager *storage_manager,
                                    const Predicate *predicate,
                                    const std::vector<std::unique_ptr<const Scalar>> *selection,
                                    InsertDestination *output_destination) {
   if (input_relation_is_stored_) {
     for (const block_id input_block_id : input_relation_block_ids_) {
-      container->addNormalWorkOrder(new SelectWorkOrder(query_id_,
-                                                        input_relation_,
-                                                        input_block_id,
-                                                        predicate,
-                                                        simple_projection_,
-                                                        simple_selection_,
-                                                        selection,
-                                                        output_destination,
-                                                        storage_manager),
-                                    op_index_);
+      container->addNormalWorkOrder(
+          new SelectWorkOrder(
+              query_id_,
+              input_relation_,
+              input_block_id,
+              predicate,
+              simple_projection_,
+              simple_selection_,
+              selection,
+              output_destination,
+              storage_manager,
+              CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context)),
+          op_index_);
     }
   } else {
     while (num_workorders_generated_ < input_relation_block_ids_.size()) {
@@ -69,7 +77,8 @@ void SelectOperator::addWorkOrders(WorkOrdersContainer *container,
               simple_selection_,
               selection,
               output_destination,
-              storage_manager),
+              storage_manager,
+              CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context)),
           op_index_);
       ++num_workorders_generated_;
     }
@@ -78,6 +87,7 @@ void SelectOperator::addWorkOrders(WorkOrdersContainer *container,
 
 #ifdef QUICKSTEP_HAVE_LIBNUMA
 void SelectOperator::addPartitionAwareWorkOrders(WorkOrdersContainer *container,
+                                                 QueryContext *query_context,
                                                  StorageManager *storage_manager,
                                                  const Predicate *predicate,
                                                  const std::vector<std::unique_ptr<const Scalar>> *selection,
@@ -99,6 +109,7 @@ void SelectOperator::addPartitionAwareWorkOrders(WorkOrdersContainer *container,
                 selection,
                 output_destination,
                 storage_manager,
+                CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context),
                 placement_scheme_->getNUMANodeForBlock(input_block_id)),
             op_index_);
       }
@@ -120,6 +131,7 @@ void SelectOperator::addPartitionAwareWorkOrders(WorkOrdersContainer *container,
                 selection,
                 output_destination,
                 storage_manager,
+                CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context),
                 placement_scheme_->getNUMANodeForBlock(block_in_partition)),
             op_index_);
         ++num_workorders_generated_in_partition_[part_id];
@@ -151,11 +163,21 @@ bool SelectOperator::getAllWorkOrders(
       if (input_relation_.hasPartitionScheme()) {
 #ifdef QUICKSTEP_HAVE_LIBNUMA
         if (input_relation_.hasNUMAPlacementScheme()) {
-          addPartitionAwareWorkOrders(container, storage_manager, predicate, selection, output_destination);
+          addPartitionAwareWorkOrders(container,
+                                      query_context,
+                                      storage_manager,
+                                      predicate,
+                                      selection,
+                                      output_destination);
         }
 #endif
       } else {
-        addWorkOrders(container, storage_manager, predicate, selection, output_destination);
+        addWorkOrders(container,
+                      query_context,
+                      storage_manager,
+                      predicate,
+                      selection,
+                      output_destination);
       }
       started_ = true;
     }
@@ -164,11 +186,21 @@ bool SelectOperator::getAllWorkOrders(
     if (input_relation_.hasPartitionScheme()) {
 #ifdef QUICKSTEP_HAVE_LIBNUMA
         if (input_relation_.hasNUMAPlacementScheme()) {
-          addPartitionAwareWorkOrders(container, storage_manager, predicate, selection, output_destination);
+          addPartitionAwareWorkOrders(container,
+                                      query_context,
+                                      storage_manager,
+                                      predicate,
+                                      selection,
+                                      output_destination);
         }
 #endif
     } else {
-        addWorkOrders(container, storage_manager, predicate, selection, output_destination);
+        addWorkOrders(container,
+                      query_context,
+                      storage_manager,
+                      predicate,
+                      selection,
+                      output_destination);
     }
     return done_feeding_input_relation_;
   }
@@ -210,22 +242,41 @@ serialization::WorkOrder* SelectOperator::createWorkOrderProto(const block_id bl
     }
   }
   proto->SetExtension(serialization::SelectWorkOrder::selection_index, selection_index_);
+  proto->SetExtension(serialization::SelectWorkOrder::lip_deployment_index, lip_deployment_index_);
 
   return proto;
 }
 
-
 void SelectWorkOrder::execute() {
   BlockReference block(
       storage_manager_->getBlock(input_block_id_, input_relation_, getPreferredNUMANodes()[0]));
 
+  // NOTE(jianqiao): For SSB and TPCH queries, experiments show that it is almost
+  // always better to apply the predicate BEFORE the LIPFilters. As future work,
+  // this ordering could even be made adaptive.
+  std::unique_ptr<TupleIdSequence> predicate_matches;
+  if (predicate_ != nullptr) {
+    predicate_matches.reset(
+        block->getMatchesForPredicate(predicate_));
+  }
+
+  std::unique_ptr<TupleIdSequence> matches;
+  if (lip_filter_adaptive_prober_ != nullptr) {
+    std::unique_ptr<ValueAccessor> accessor(
+        block->getTupleStorageSubBlock().createValueAccessor(predicate_matches.get()));
+    matches.reset(
+        lip_filter_adaptive_prober_->filterValueAccessor(accessor.get()));
+  } else {
+    matches.reset(predicate_matches.release());
+  }
+
   if (simple_projection_) {
     block->selectSimple(simple_selection_,
-                        predicate_,
+                        matches.get(),
                         output_destination_);
   } else {
     block->select(*DCHECK_NOTNULL(selection_),
-                  predicate_,
+                  matches.get(),
                   output_destination_);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/393eba55/relational_operators/SelectOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/SelectOperator.hpp b/relational_operators/SelectOperator.hpp
index 0f5c712..2ace458 100644
--- a/relational_operators/SelectOperator.hpp
+++ b/relational_operators/SelectOperator.hpp
@@ -38,6 +38,7 @@
 #include "relational_operators/WorkOrder.hpp"
 #include "storage/StorageBlockInfo.hpp"
 #include "utility/Macros.hpp"
+#include "utility/lip_filter/LIPFilterAdaptiveProber.hpp"
 
 #include "glog/logging.h"
 
@@ -49,6 +50,7 @@ namespace quickstep {
 
 class CatalogRelationSchema;
 class InsertDestination;
+class LIPFilterDeployment;
 class Predicate;
 class Scalar;
 class StorageManager;
@@ -247,12 +249,14 @@ class SelectOperator : public RelationalOperator {
   }
 
   void addWorkOrders(WorkOrdersContainer *container,
+                     QueryContext *query_context,
                      StorageManager *storage_manager,
                      const Predicate *predicate,
                      const std::vector<std::unique_ptr<const Scalar>> *selection,
                      InsertDestination *output_destination);
 
   void addPartitionAwareWorkOrders(WorkOrdersContainer *container,
+                                   QueryContext *query_context,
                                    StorageManager *storage_manager,
                                    const Predicate *predicate,
                                    const std::vector<std::unique_ptr<const Scalar>> *selection,
@@ -318,6 +322,7 @@ class SelectWorkOrder : public WorkOrder {
    * @param output_destination The InsertDestination to insert the selection
    *        results.
    * @param storage_manager The StorageManager to use.
+   * @param lip_filter_adaptive_prober The attached LIP filter prober.
    **/
   SelectWorkOrder(const std::size_t query_id,
                   const CatalogRelationSchema &input_relation,
@@ -328,6 +333,7 @@ class SelectWorkOrder : public WorkOrder {
                   const std::vector<std::unique_ptr<const Scalar>> *selection,
                   InsertDestination *output_destination,
                   StorageManager *storage_manager,
+                  LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr,
                   const numa_node_id numa_node = 0)
       : WorkOrder(query_id),
         input_relation_(input_relation),
@@ -337,7 +343,8 @@ class SelectWorkOrder : public WorkOrder {
         simple_selection_(simple_selection),
         selection_(selection),
         output_destination_(DCHECK_NOTNULL(output_destination)),
-        storage_manager_(DCHECK_NOTNULL(storage_manager)) {
+        storage_manager_(DCHECK_NOTNULL(storage_manager)),
+        lip_filter_adaptive_prober_(lip_filter_adaptive_prober) {
     preferred_numa_nodes_.push_back(numa_node);
   }
 
@@ -360,6 +367,7 @@ class SelectWorkOrder : public WorkOrder {
    * @param output_destination The InsertDestination to insert the selection
    *        results.
    * @param storage_manager The StorageManager to use.
+   * @param lip_filter_adaptive_prober The attached LIP filter prober.
    **/
   SelectWorkOrder(const std::size_t query_id,
                   const CatalogRelationSchema &input_relation,
@@ -370,6 +378,7 @@ class SelectWorkOrder : public WorkOrder {
                   const std::vector<std::unique_ptr<const Scalar>> *selection,
                   InsertDestination *output_destination,
                   StorageManager *storage_manager,
+                  LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr,
                   const numa_node_id numa_node = 0)
       : WorkOrder(query_id),
         input_relation_(input_relation),
@@ -379,7 +388,8 @@ class SelectWorkOrder : public WorkOrder {
         simple_selection_(std::move(simple_selection)),
         selection_(selection),
         output_destination_(DCHECK_NOTNULL(output_destination)),
-        storage_manager_(DCHECK_NOTNULL(storage_manager)) {
+        storage_manager_(DCHECK_NOTNULL(storage_manager)),
+        lip_filter_adaptive_prober_(lip_filter_adaptive_prober) {
     preferred_numa_nodes_.push_back(numa_node);
   }
 
@@ -407,6 +417,8 @@ class SelectWorkOrder : public WorkOrder {
   InsertDestination *output_destination_;
   StorageManager *storage_manager_;
 
+  std::unique_ptr<LIPFilterAdaptiveProber> lip_filter_adaptive_prober_;
+
   DISALLOW_COPY_AND_ASSIGN(SelectWorkOrder);
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/393eba55/relational_operators/WorkOrder.proto
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrder.proto b/relational_operators/WorkOrder.proto
index 3eed379..86f34b8 100644
--- a/relational_operators/WorkOrder.proto
+++ b/relational_operators/WorkOrder.proto
@@ -59,6 +59,7 @@ message AggregationWorkOrder {
     // All required.
     optional uint32 aggr_state_index = 16;
     optional fixed64 block_id = 17;
+    optional int32 lip_deployment_index = 18;
   }
 }
 
@@ -70,6 +71,7 @@ message BuildHashWorkOrder {
     optional bool any_join_key_attributes_nullable = 34;
     optional uint32 join_hash_table_index = 35;
     optional fixed64 block_id = 36;
+    optional int32 lip_deployment_index = 37;
   }
 }
 
@@ -131,6 +133,8 @@ message HashJoinWorkOrder {
     optional int32 residual_predicate_index = 169;
     // Used by HashOuterJoinWorkOrder only.
     repeated bool is_selection_on_build = 170;
+
+    optional int32 lip_deployment_index = 171;
   }
 }
 
@@ -185,9 +189,10 @@ message SelectWorkOrder {
 
     // When 'simple_projection' is true.
     repeated int32 simple_selection = 245;
-
     // Otherwise.
     optional int32 selection_index = 246;
+
+    optional int32 lip_deployment_index = 247;
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/393eba55/relational_operators/WorkOrderFactory.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp
index 2356bab..91a717f 100644
--- a/relational_operators/WorkOrderFactory.cpp
+++ b/relational_operators/WorkOrderFactory.cpp
@@ -50,6 +50,7 @@
 #include "relational_operators/WindowAggregationOperator.hpp"
 #include "relational_operators/WorkOrder.pb.h"
 #include "storage/StorageBlockInfo.hpp"
+#include "utility/lip_filter/LIPFilterUtil.hpp"
 
 #include "glog/logging.h"
 
@@ -61,6 +62,7 @@ using std::vector;
 namespace quickstep {
 
 class InsertDestination;
+class LIPFilterAdaptiveProber;
 class Predicate;
 class Scalar;
 
@@ -82,7 +84,9 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
           proto.query_id(),
           proto.GetExtension(serialization::AggregationWorkOrder::block_id),
           query_context->getAggregationState(
-              proto.GetExtension(serialization::AggregationWorkOrder::aggr_state_index)));
+              proto.GetExtension(serialization::AggregationWorkOrder::aggr_state_index)),
+          CreateLIPFilterAdaptiveProberHelper(
+              proto.GetExtension(serialization::AggregationWorkOrder::lip_deployment_index), query_context));
     }
     case serialization::BUILD_HASH: {
       LOG(INFO) << "Creating BuildHashWorkOrder";
@@ -101,7 +105,9 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
           proto.GetExtension(serialization::BuildHashWorkOrder::block_id),
           query_context->getJoinHashTable(
               proto.GetExtension(serialization::BuildHashWorkOrder::join_hash_table_index)),
-          storage_manager);
+          storage_manager,
+          CreateLIPFilterBuilderHelper(
+              proto.GetExtension(serialization::BuildHashWorkOrder::lip_deployment_index), query_context));
     }
     case serialization::DELETE: {
       LOG(INFO) << "Creating DeleteWorkOrder";
@@ -200,6 +206,9 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
       InsertDestination *output_destination =
           query_context->getInsertDestination(
               proto.GetExtension(serialization::HashJoinWorkOrder::insert_destination_index));
+      LIPFilterAdaptiveProber *lip_filter_adaptive_prober =
+          CreateLIPFilterAdaptiveProberHelper(
+              proto.GetExtension(serialization::HashJoinWorkOrder::lip_deployment_index), query_context);
 
       switch (hash_join_work_order_type) {
         case serialization::HashJoinWorkOrder::HASH_ANTI_JOIN: {
@@ -215,7 +224,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
               selection,
               hash_table,
               output_destination,
-              storage_manager);
+              storage_manager,
+              lip_filter_adaptive_prober);
         }
         case serialization::HashJoinWorkOrder::HASH_INNER_JOIN: {
           LOG(INFO) << "Creating HashInnerJoinWorkOrder";
@@ -230,7 +240,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
               selection,
               hash_table,
               output_destination,
-              storage_manager);
+              storage_manager,
+              lip_filter_adaptive_prober);
         }
         case serialization::HashJoinWorkOrder::HASH_OUTER_JOIN: {
           vector<bool> is_selection_on_build;
@@ -253,7 +264,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
               move(is_selection_on_build),
               hash_table,
               output_destination,
-              storage_manager);
+              storage_manager,
+              lip_filter_adaptive_prober);
         }
         case serialization::HashJoinWorkOrder::HASH_SEMI_JOIN: {
           LOG(INFO) << "Creating HashSemiJoinWorkOrder";
@@ -268,7 +280,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
               selection,
               hash_table,
               output_destination,
-              storage_manager);
+              storage_manager,
+              lip_filter_adaptive_prober);
         }
         default:
           LOG(FATAL) << "Unknown HashJoinWorkOrder Type in WorkOrderFactory::ReconstructFromProto";
@@ -346,7 +359,9 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
                                   proto.GetExtension(serialization::SelectWorkOrder::selection_index)),
           query_context->getInsertDestination(
               proto.GetExtension(serialization::SelectWorkOrder::insert_destination_index)),
-          storage_manager);
+          storage_manager,
+          CreateLIPFilterAdaptiveProberHelper(
+              proto.GetExtension(serialization::SelectWorkOrder::lip_deployment_index), query_context));
     }
     case serialization::SORT_MERGE_RUN: {
       LOG(INFO) << "Creating SortMergeRunWorkOrder";
@@ -459,6 +474,17 @@ bool WorkOrderFactory::ProtoIsValid(const serialization::WorkOrder &proto,
 
   switch (proto.work_order_type()) {
     case serialization::AGGREGATION: {
+      if (!proto.HasExtension(serialization::AggregationWorkOrder::lip_deployment_index)) {
+        return false;
+      } else {
+        const QueryContext::lip_deployment_id lip_deployment_index =
+            proto.GetExtension(serialization::AggregationWorkOrder::lip_deployment_index);
+        if (lip_deployment_index != QueryContext::kInvalidLIPDeploymentId &&
+            !query_context.isValidLIPDeploymentId(lip_deployment_index)) {
+          return false;
+        }
+      }
+
       return proto.HasExtension(serialization::AggregationWorkOrder::block_id) &&
              proto.HasExtension(serialization::AggregationWorkOrder::aggr_state_index) &&
              query_context.isValidAggregationStateId(
@@ -482,6 +508,17 @@ bool WorkOrderFactory::ProtoIsValid(const serialization::WorkOrder &proto,
         }
       }
 
+      if (!proto.HasExtension(serialization::BuildHashWorkOrder::lip_deployment_index)) {
+        return false;
+      } else {
+        const QueryContext::lip_deployment_id lip_deployment_index =
+            proto.GetExtension(serialization::BuildHashWorkOrder::lip_deployment_index);
+        if (lip_deployment_index != QueryContext::kInvalidLIPDeploymentId &&
+            !query_context.isValidLIPDeploymentId(lip_deployment_index)) {
+          return false;
+        }
+      }
+
       return proto.HasExtension(serialization::BuildHashWorkOrder::any_join_key_attributes_nullable) &&
              proto.HasExtension(serialization::BuildHashWorkOrder::block_id) &&
              proto.HasExtension(serialization::BuildHashWorkOrder::join_hash_table_index) &&
@@ -556,6 +593,17 @@ bool WorkOrderFactory::ProtoIsValid(const serialization::WorkOrder &proto,
         }
       }
 
+      if (!proto.HasExtension(serialization::HashJoinWorkOrder::lip_deployment_index)) {
+        return false;
+      } else {
+        const QueryContext::lip_deployment_id lip_deployment_index =
+            proto.GetExtension(serialization::HashJoinWorkOrder::lip_deployment_index);
+        if (lip_deployment_index != QueryContext::kInvalidLIPDeploymentId &&
+            !query_context.isValidLIPDeploymentId(lip_deployment_index)) {
+          return false;
+        }
+      }
+
       if (hash_join_work_order_type == serialization::HashJoinWorkOrder::HASH_OUTER_JOIN) {
         if (!proto.HasExtension(serialization::HashJoinWorkOrder::is_selection_on_build)) {
           return false;
@@ -655,6 +703,17 @@ bool WorkOrderFactory::ProtoIsValid(const serialization::WorkOrder &proto,
         return false;
       }
 
+      if (!proto.HasExtension(serialization::SelectWorkOrder::lip_deployment_index)) {
+        return false;
+      } else {
+        const QueryContext::lip_deployment_id lip_deployment_index =
+            proto.GetExtension(serialization::SelectWorkOrder::lip_deployment_index);
+        if (lip_deployment_index != QueryContext::kInvalidLIPDeploymentId &&
+            !query_context.isValidLIPDeploymentId(lip_deployment_index)) {
+          return false;
+        }
+      }
+
       return proto.HasExtension(serialization::SelectWorkOrder::insert_destination_index) &&
              query_context.isValidInsertDestinationId(
                  proto.GetExtension(serialization::SelectWorkOrder::insert_destination_index)) &&

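The ProtoIsValid() changes above repeat the same lip_deployment_index check for the Aggregation, BuildHash, HashJoin and Select work orders. One way to express the check once is sketched below; the helper name is hypothetical and not part of this patch, and it only assumes the QueryContext pieces already used in those hunks (lip_deployment_id, kInvalidLIPDeploymentId, isValidLIPDeploymentId).

// Hypothetical helper, not part of this patch: callers read the
// lip_deployment_index extension for their work-order type and the validity
// rule lives in one place. kInvalidLIPDeploymentId means "no LIP filters
// attached", which is a valid configuration.
inline bool IsValidOrAbsentLIPDeployment(
    const QueryContext &query_context,
    const QueryContext::lip_deployment_id lip_deployment_index) {
  return lip_deployment_index == QueryContext::kInvalidLIPDeploymentId ||
         query_context.isValidLIPDeploymentId(lip_deployment_index);
}
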
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/393eba55/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index 249026d..eb7ca79 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -46,10 +46,13 @@
 #include "storage/StorageBlock.hpp"
 #include "storage/StorageBlockInfo.hpp"
 #include "storage/StorageManager.hpp"
+#include "storage/TupleIdSequence.hpp"
+#include "storage/ValueAccessor.hpp"
 #include "types/TypedValue.hpp"
 #include "types/containers/ColumnVector.hpp"
 #include "types/containers/ColumnVectorsValueAccessor.hpp"
 #include "types/containers/Tuple.hpp"
+#include "utility/lip_filter/LIPFilterAdaptiveProber.hpp"
 
 #include "glog/logging.h"
 
@@ -331,11 +334,12 @@ bool AggregationOperationState::ProtoIsValid(
   return true;
 }
 
-void AggregationOperationState::aggregateBlock(const block_id input_block) {
+void AggregationOperationState::aggregateBlock(const block_id input_block,
+                                               LIPFilterAdaptiveProber *lip_filter_adaptive_prober) {
   if (group_by_list_.empty()) {
     aggregateBlockSingleState(input_block);
   } else {
-    aggregateBlockHashTable(input_block);
+    aggregateBlockHashTable(input_block, lip_filter_adaptive_prober);
   }
 }
 
@@ -367,10 +371,13 @@ void AggregationOperationState::aggregateBlockSingleState(
   BlockReference block(
       storage_manager_->getBlock(input_block, input_relation_));
 
-  // If there is a filter predicate, 'reuse_matches' holds the set of matching
-  // tuples so that it can be reused across multiple aggregates (i.e. we only
-  // pay the cost of evaluating the predicate once).
-  std::unique_ptr<TupleIdSequence> reuse_matches;
+  std::unique_ptr<TupleIdSequence> matches;
+  if (predicate_ != nullptr) {
+    matches.reset(block->getMatchesForPredicate(predicate_.get()));
+  }
+
   for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
     const std::vector<attribute_id> *local_arguments_as_attributes = nullptr;
 #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
@@ -387,9 +394,8 @@ void AggregationOperationState::aggregateBlockSingleState(
                                arguments_[agg_idx],
                                local_arguments_as_attributes,
                                {}, /* group_by */
-                               predicate_.get(),
+                               matches.get(),
                                distinctify_hashtables_[agg_idx].get(),
-                               &reuse_matches,
                                nullptr /* reuse_group_by_vectors */);
       local_state.emplace_back(nullptr);
     } else {
@@ -397,8 +403,7 @@ void AggregationOperationState::aggregateBlockSingleState(
       local_state.emplace_back(block->aggregate(*handles_[agg_idx],
                                                 arguments_[agg_idx],
                                                 local_arguments_as_attributes,
-                                                predicate_.get(),
-                                                &reuse_matches));
+                                                matches.get()));
     }
   }
 
@@ -407,14 +412,22 @@ void AggregationOperationState::aggregateBlockSingleState(
 }
 
 void AggregationOperationState::aggregateBlockHashTable(
-    const block_id input_block) {
+    const block_id input_block,
+    LIPFilterAdaptiveProber *lip_filter_adaptive_prober) {
   BlockReference block(
       storage_manager_->getBlock(input_block, input_relation_));
 
-  // If there is a filter predicate, 'reuse_matches' holds the set of matching
-  // tuples so that it can be reused across multiple aggregates (i.e. we only
-  // pay the cost of evaluating the predicate once).
-  std::unique_ptr<TupleIdSequence> reuse_matches;
+  // Apply the predicate first, then the LIPFilters, to generate a TupleIdSequence
+  // as the existence map for the tuples.
+  std::unique_ptr<TupleIdSequence> matches;
+  if (predicate_ != nullptr) {
+    matches.reset(block->getMatchesForPredicate(predicate_.get()));
+  }
+  if (lip_filter_adaptive_prober != nullptr) {
+    std::unique_ptr<ValueAccessor> accessor(
+        block->getTupleStorageSubBlock().createValueAccessor(matches.get()));
+    matches.reset(lip_filter_adaptive_prober->filterValueAccessor(accessor.get()));
+  }
 
   // This holds values of all the GROUP BY attributes so that they can be reused
   // across multiple aggregates (i.e. we only pay the cost of evaluating the
@@ -431,9 +444,8 @@ void AggregationOperationState::aggregateBlockHashTable(
                                arguments_[agg_idx],
                                nullptr, /* arguments_as_attributes */
                                group_by_list_,
-                               predicate_.get(),
+                               matches.get(),
                                distinctify_hashtables_[agg_idx].get(),
-                               &reuse_matches,
                                &reuse_group_by_vectors);
     }
   }
@@ -447,9 +459,8 @@ void AggregationOperationState::aggregateBlockHashTable(
   DCHECK(agg_hash_table != nullptr);
   block->aggregateGroupBy(arguments_,
                           group_by_list_,
-                          predicate_.get(),
+                          matches.get(),
                           agg_hash_table,
-                          &reuse_matches,
                           &reuse_group_by_vectors);
   group_by_hashtable_pool_->returnHashTable(agg_hash_table);
 }

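The aggregateBlockSingleState() and aggregateBlockHashTable() changes above replace the per-aggregate reuse_matches out-parameter with a single matches sequence: the predicate is evaluated once per block, optionally narrowed by the LIP filters, and the same match set is then handed to every aggregate. Below is a small standalone model of that idea, with vectors and lambdas standing in for TupleIdSequence, the scan predicate and the prober.

#include <cstddef>
#include <iostream>
#include <vector>

// The point is the ordering and the reuse: filter once per block, aggregate many.
int main() {
  const std::vector<int> column = {2, 5, 8, 11, 14};
  const auto predicate = [](int v) { return v > 4; };    // scan predicate, applied first
  const auto lip_prober = [](int v) { return v < 12; };  // LIP filters, applied to survivors

  std::vector<bool> matches(column.size());
  for (std::size_t tid = 0; tid < column.size(); ++tid) {
    matches[tid] = predicate(column[tid]) && lip_prober(column[tid]);
  }

  // Every aggregate consumes the same match set; the filtering cost is paid once.
  long sum = 0;
  long count = 0;
  for (std::size_t tid = 0; tid < column.size(); ++tid) {
    if (matches[tid]) {
      sum += column[tid];
      ++count;
    }
  }
  std::cout << "SUM=" << sum << " COUNT=" << count << "\n";
  return 0;
}
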
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/393eba55/storage/AggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index 3b0f286..c251983 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -41,6 +41,7 @@ class AggregateFunction;
 class CatalogDatabaseLite;
 class CatalogRelationSchema;
 class InsertDestination;
+class LIPFilterAdaptiveProber;
 class StorageManager;
 
 /** \addtogroup Storage
@@ -155,8 +156,11 @@ class AggregationOperationState {
    *
    * @param input_block The block ID of the storage block where the aggregates
    *        are going to be computed.
+   * @param lip_filter_adaptive_prober The LIPFilter prober for pre-filtering
+   *        the block.
    **/
-  void aggregateBlock(const block_id input_block);
+  void aggregateBlock(const block_id input_block,
+                      LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr);
 
   /**
    * @brief Generate the final results for the aggregates managed by this
@@ -185,7 +189,8 @@ class AggregationOperationState {
 
   // Aggregate on input block.
   void aggregateBlockSingleState(const block_id input_block);
-  void aggregateBlockHashTable(const block_id input_block);
+  void aggregateBlockHashTable(const block_id input_block,
+                               LIPFilterAdaptiveProber *lip_filter_adaptive_prober);
 
   void finalizeSingleState(InsertDestination *output_destination);
   void finalizeHashTable(InsertDestination *output_destination);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/393eba55/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index 325a7cb..0e32cc1 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -292,11 +292,14 @@ target_link_libraries(quickstep_storage_AggregationOperationState
                       quickstep_storage_StorageBlock
                       quickstep_storage_StorageBlockInfo
                       quickstep_storage_StorageManager
+                      quickstep_storage_TupleIdSequence
+                      quickstep_storage_ValueAccessor
                       quickstep_types_TypedValue
                       quickstep_types_containers_ColumnVector
                       quickstep_types_containers_ColumnVectorsValueAccessor
                       quickstep_types_containers_Tuple
-                      quickstep_utility_Macros)
+                      quickstep_utility_Macros
+                      quickstep_utility_lipfilter_LIPFilterAdaptiveProber)
 target_link_libraries(quickstep_storage_AggregationOperationState_proto
                       quickstep_expressions_Expressions_proto
                       quickstep_expressions_aggregation_AggregateFunction_proto


[3/3] incubator-quickstep git commit: Support for performing partitioned aggregation.

Posted by hb...@apache.org.
Support for performing partitioned aggregation.

- Added a PartitionedHashTablePool for creating a pool of hash tables such
  that each hash table belongs to a unique partition.
- The partitioning is done on the group-by keys.
- Wrote a utility function to compute composite hash of a group of
  TypedValues.
- Added a check for whether the aggregation is partitioned or not.
- The conditions for whether the aggregation can be partitioned
  are as follows:
  1. The query has a GROUP BY clause.
  2. There are no aggregations with a DISTINCT clause.
  3. The estimated number of groups is greater than a pre-defined
  threshold.
  4. The query has at least one aggregation function.
- Method for partitioned aggregation with GROUP BY
- StorageBlock now provides a method for performing GROUP BY aggregation
  in a partitioned way.
- The Tuple class now supports a method to compute the hash of the entire
  tuple (i.e. hash key is the composite key made up of all the
  attributes in the tuple).
- AggregationOperationState calls appropriate method (i.e.
  aggregateGroupBy or aggregateGroupByPartitioned) based on the way in
  which aggregation is being performed.


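The partitioning described above hinges on two pieces: a composite hash over the group-by key (the new CompositeHash.hpp utility) and a pool with one hash table per partition (PartitionedHashTablePool.hpp), so that work orders for different partitions never touch the same table. The standalone sketch below illustrates only the routing idea; it hashes strings with std::hash rather than TypedValues, and the mixing constant is a generic Boost-style combiner, not the one used by the commit.

#include <cstddef>
#include <functional>
#include <iostream>
#include <string>
#include <vector>

// Illustration only: the real CompositeHash utility combines
// TypedValue::getHash() results; std::hash over strings is a stand-in.
std::size_t CompositeHash(const std::vector<std::string> &group_by_key) {
  std::size_t hash = 0;
  for (const std::string &value : group_by_key) {
    hash ^= std::hash<std::string>()(value) + 0x9e3779b9 + (hash << 6) + (hash >> 2);
  }
  return hash;
}

int main() {
  const std::size_t num_partitions = 4;
  const std::vector<std::vector<std::string>> group_by_keys = {
      {"WI", "2016"}, {"CA", "2016"}, {"WI", "2015"}};
  for (const auto &key : group_by_keys) {
    // Each partition owns one hash table in the pool, so work orders that
    // aggregate different partitions never touch the same table.
    std::cout << key[0] << "," << key[1] << " -> partition "
              << CompositeHash(key) % num_partitions << "\n";
  }
  return 0;
}
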
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/1e1434ad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/1e1434ad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/1e1434ad

Branch: refs/heads/partitioned-aggregate-new
Commit: 1e1434ad41db59009b4f30578f3832ff62799b35
Parents: 393eba5
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Wed Oct 26 09:09:52 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Wed Oct 26 09:10:09 2016 -0500

----------------------------------------------------------------------
 .../FinalizeAggregationOperator.cpp             |  30 ++-
 .../FinalizeAggregationOperator.hpp             |   9 +-
 storage/AggregationOperationState.cpp           | 195 +++++++++++++++---
 storage/AggregationOperationState.hpp           |  74 ++++++-
 storage/CMakeLists.txt                          |  12 ++
 storage/HashTablePool.hpp                       |   2 +-
 storage/PartitionedHashTablePool.hpp            | 204 +++++++++++++++++++
 storage/StorageBlock.cpp                        |  55 +++++
 storage/StorageBlock.hpp                        |  50 +++++
 types/containers/CMakeLists.txt                 |   1 +
 types/containers/Tuple.hpp                      |   8 +
 utility/CMakeLists.txt                          |   6 +
 utility/CompositeHash.hpp                       |  52 +++++
 13 files changed, 657 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e1434ad/relational_operators/FinalizeAggregationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/FinalizeAggregationOperator.cpp b/relational_operators/FinalizeAggregationOperator.cpp
index 7e337de..0cbf635 100644
--- a/relational_operators/FinalizeAggregationOperator.cpp
+++ b/relational_operators/FinalizeAggregationOperator.cpp
@@ -41,16 +41,27 @@ bool FinalizeAggregationOperator::getAllWorkOrders(
 
   if (blocking_dependencies_met_ && !started_) {
     started_ = true;
-    container->addNormalWorkOrder(
-        new FinalizeAggregationWorkOrder(
-            query_id_,
-            query_context->getAggregationState(aggr_state_index_),
-            query_context->getInsertDestination(output_destination_index_)),
-        op_index_);
+    AggregationOperationState *agg_state =
+        query_context->getAggregationState(aggr_state_index_);
+    DCHECK(agg_state != nullptr);
+    for (int part_id = 0;
+         part_id < static_cast<int>(agg_state->getNumPartitions());
+         ++part_id) {
+      container->addNormalWorkOrder(
+          new FinalizeAggregationWorkOrder(
+              query_id_,
+              agg_state,
+              query_context->getInsertDestination(output_destination_index_),
+              part_id),
+          op_index_);
+    }
   }
   return started_;
 }
 
+// TODO(quickstep-team): Think about how the number of partitions could be
+// accessed in this function. Until then, we can't use partitioned aggregation
+// with the distributed version.
 bool FinalizeAggregationOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
   if (blocking_dependencies_met_ && !started_) {
     started_ = true;
@@ -68,9 +79,12 @@ bool FinalizeAggregationOperator::getAllWorkOrderProtos(WorkOrderProtosContainer
   return started_;
 }
 
-
 void FinalizeAggregationWorkOrder::execute() {
-  state_->finalizeAggregate(output_destination_);
+  if (state_->isAggregatePartitioned()) {
+    state_->finalizeAggregatePartitioned(part_id_, output_destination_);
+  } else {
+    state_->finalizeAggregate(output_destination_);
+  }
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e1434ad/relational_operators/FinalizeAggregationOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/FinalizeAggregationOperator.hpp b/relational_operators/FinalizeAggregationOperator.hpp
index 0aeac2a..ae7127a 100644
--- a/relational_operators/FinalizeAggregationOperator.hpp
+++ b/relational_operators/FinalizeAggregationOperator.hpp
@@ -119,13 +119,17 @@ class FinalizeAggregationWorkOrder : public WorkOrder {
    * @param state The AggregationState to use.
    * @param output_destination The InsertDestination to insert aggregation
    *        results.
+   * @param part_id The partition ID for which the Finalize aggregation work
+   *        order is issued. Ignored if the aggregation is not partitioned.
    */
   FinalizeAggregationWorkOrder(const std::size_t query_id,
                                AggregationOperationState *state,
-                               InsertDestination *output_destination)
+                               InsertDestination *output_destination,
+                               const int part_id = -1)
       : WorkOrder(query_id),
         state_(DCHECK_NOTNULL(state)),
-        output_destination_(DCHECK_NOTNULL(output_destination)) {}
+        output_destination_(DCHECK_NOTNULL(output_destination)),
+        part_id_(part_id) {}
 
   ~FinalizeAggregationWorkOrder() override {}
 
@@ -134,6 +138,7 @@ class FinalizeAggregationWorkOrder : public WorkOrder {
  private:
   AggregationOperationState *state_;
   InsertDestination *output_destination_;
+  const int part_id_;
 
   DISALLOW_COPY_AND_ASSIGN(FinalizeAggregationWorkOrder);
 };

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e1434ad/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index eb7ca79..b942c1b 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -60,6 +60,14 @@ using std::unique_ptr;
 
 namespace quickstep {
 
+DEFINE_int32(num_aggregation_partitions,
+             41,
+             "The number of partitions used for performing the aggregation");
+DEFINE_int32(partition_aggregation_num_groups_threshold,
+             500000,
+             "The threshold used for deciding whether the aggregation is done "
+             "in a partitioned way or not");
+
 AggregationOperationState::AggregationOperationState(
     const CatalogRelationSchema &input_relation,
     const std::vector<const AggregateFunction *> &aggregate_functions,
@@ -72,6 +80,8 @@ AggregationOperationState::AggregationOperationState(
     const std::vector<HashTableImplType> &distinctify_hash_table_impl_types,
     StorageManager *storage_manager)
     : input_relation_(input_relation),
+      is_aggregate_partitioned_(checkAggregatePartitioned(
+          estimated_num_entries, is_distinct, group_by, aggregate_functions)),
       predicate_(predicate),
       group_by_list_(std::move(group_by)),
       arguments_(std::move(arguments)),
@@ -175,11 +185,10 @@ AggregationOperationState::AggregationOperationState(
         key_types.insert(
             key_types.end(), argument_types.begin(), argument_types.end());
         // TODO(jianqiao): estimated_num_entries is quite inaccurate for
-        // estimating
-        // the number of entries in the distinctify hash table. We may estimate
-        // for each distinct aggregation an estimated_num_distinct_keys value
-        // during
-        // query optimization, if it worths.
+        // estimating the number of entries in the distinctify hash table.
+        // We may estimate for each distinct aggregation an
+        // estimated_num_distinct_keys value during query optimization, if it's
+        // worthwhile.
         distinctify_hashtables_.emplace_back(
             AggregationStateFastHashTableFactory::CreateResizable(
                 *distinctify_hash_table_impl_types_it,
@@ -195,14 +204,24 @@ AggregationOperationState::AggregationOperationState(
     }
 
     if (!group_by_handles.empty()) {
-      // Aggregation with GROUP BY: create a HashTable pool for per-group
-      // states.
-      group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries,
-                                                       hash_table_impl_type,
-                                                       group_by_types,
-                                                       payload_sizes,
-                                                       group_by_handles,
-                                                       storage_manager));
+      // Aggregation with GROUP BY: create a HashTable pool.
+      if (!is_aggregate_partitioned_) {
+        group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries,
+                                                         hash_table_impl_type,
+                                                         group_by_types,
+                                                         payload_sizes,
+                                                         group_by_handles,
+                                                         storage_manager));
+      } else {
+        partitioned_group_by_hashtable_pool_.reset(
+            new PartitionedHashTablePool(estimated_num_entries,
+                                         FLAGS_num_aggregation_partitions,
+                                         hash_table_impl_type,
+                                         group_by_types,
+                                         payload_sizes,
+                                         group_by_handles,
+                                         storage_manager));
+      }
     }
   }
 }
@@ -450,19 +469,71 @@ void AggregationOperationState::aggregateBlockHashTable(
     }
   }
 
-  // Call StorageBlock::aggregateGroupBy() to aggregate this block's values
-  // directly into the (threadsafe) shared global HashTable for this
-  // aggregate.
-  DCHECK(group_by_hashtable_pool_ != nullptr);
-  AggregationStateHashTableBase *agg_hash_table =
+  if (!is_aggregate_partitioned_) {
+    // Call StorageBlock::aggregateGroupBy() to aggregate this block's values
+    // directly into the (threadsafe) shared global HashTable for this
+    // aggregate.
+    DCHECK(group_by_hashtable_pool_ != nullptr);
+    AggregationStateHashTableBase *agg_hash_table =
       group_by_hashtable_pool_->getHashTableFast();
-  DCHECK(agg_hash_table != nullptr);
-  block->aggregateGroupBy(arguments_,
-                          group_by_list_,
-                          matches.get(),
-                          agg_hash_table,
-                          &reuse_group_by_vectors);
-  group_by_hashtable_pool_->returnHashTable(agg_hash_table);
+    DCHECK(agg_hash_table != nullptr);
+    block->aggregateGroupBy(arguments_,
+                            group_by_list_,
+                            matches.get(),
+                            agg_hash_table,
+                            &reuse_group_by_vectors);
+    group_by_hashtable_pool_->returnHashTable(agg_hash_table);
+  } else {
+    ColumnVectorsValueAccessor temp_result;
+    // IDs of 'arguments' as attributes in the ValueAccessor we create below.
+    std::vector<attribute_id> argument_ids;
+
+    // IDs of GROUP BY key element(s) in the ValueAccessor we create below.
+    std::vector<attribute_id> key_ids;
+    const std::size_t num_partitions = partitioned_group_by_hashtable_pool_->getNumPartitions();
+    block->aggregateGroupByPartitioned(
+        arguments_,
+        group_by_list_,
+        matches.get(),
+        num_partitions,
+        &temp_result,
+        &argument_ids,
+        &key_ids,
+        &reuse_group_by_vectors);
+    // Compute the partition for each tuple formed by the GROUP BY values.
+    std::vector<std::unique_ptr<TupleIdSequence>> partition_membership;
+    partition_membership.resize(num_partitions);
+
+    // Create a tuple-id sequence for each partition.
+    for (std::size_t partition = 0; partition < num_partitions; ++partition) {
+      partition_membership[partition].reset(
+          new TupleIdSequence(temp_result.getEndPosition()));
+    }
+
+    // Iterate over the ValueAccessor and, for each tuple, set a bit in the
+    // appropriate TupleIdSequence.
+    temp_result.beginIteration();
+    while (temp_result.next()) {
+      // We need a unique_ptr because getTupleWithAttributes() uses "new".
+      std::unique_ptr<Tuple> curr_tuple(temp_result.getTupleWithAttributes(key_ids));
+      const std::size_t curr_tuple_partition_id =
+          curr_tuple->getTupleHash() % num_partitions;
+      partition_membership[curr_tuple_partition_id]->set(
+          temp_result.getCurrentPosition(), true);
+    }
+    // For each partition, create an adapter around the ValueAccessor and
+    // TupleIdSequence.
+    std::vector<std::unique_ptr<
+        TupleIdSequenceAdapterValueAccessor<ColumnVectorsValueAccessor>>> adapter;
+    adapter.resize(num_partitions);
+    for (std::size_t partition = 0; partition < num_partitions; ++partition) {
+      adapter[partition].reset(temp_result.createSharedTupleIdSequenceAdapter(
+          *(partition_membership)[partition]));
+      partitioned_group_by_hashtable_pool_->getHashTable(partition)
+          ->upsertValueAccessorCompositeKeyFast(
+              argument_ids, adapter[partition].get(), key_ids, true);
+    }
+  }
 }
 
 void AggregationOperationState::finalizeSingleState(
@@ -606,13 +677,81 @@ void AggregationOperationState::finalizeHashTable(
 }
 
 void AggregationOperationState::destroyAggregationHashTablePayload() {
-  if (group_by_hashtable_pool_ != nullptr) {
-    auto all_hash_tables = group_by_hashtable_pool_->getAllHashTables();
-    DCHECK(all_hash_tables != nullptr);
+  std::vector<std::unique_ptr<AggregationStateHashTableBase>> *all_hash_tables =
+      nullptr;
+  if (!is_aggregate_partitioned_) {
+    if (group_by_hashtable_pool_ != nullptr) {
+      all_hash_tables = group_by_hashtable_pool_->getAllHashTables();
+    }
+  } else {
+    if (partitioned_group_by_hashtable_pool_ != nullptr) {
+      all_hash_tables = partitioned_group_by_hashtable_pool_->getAllHashTables();
+    }
+  }
+  if (all_hash_tables != nullptr) {
     for (std::size_t ht_index = 0; ht_index < all_hash_tables->size(); ++ht_index) {
       (*all_hash_tables)[ht_index]->destroyPayload();
     }
   }
 }
 
+void AggregationOperationState::finalizeAggregatePartitioned(
+    const std::size_t partition_id, InsertDestination *output_destination) {
+  // Each element of 'group_by_keys' is a vector of values for a particular
+  // group (which is also the prefix of the finalized Tuple for that group).
+  std::vector<std::vector<TypedValue>> group_by_keys;
+
+  // Collect per-aggregate finalized values.
+  std::vector<std::unique_ptr<ColumnVector>> final_values;
+  for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
+    AggregationStateHashTableBase *hash_table =
+        partitioned_group_by_hashtable_pool_->getHashTable(partition_id);
+    ColumnVector *agg_result_col = handles_[agg_idx]->finalizeHashTable(
+        *hash_table, &group_by_keys, agg_idx);
+    if (agg_result_col != nullptr) {
+      final_values.emplace_back(agg_result_col);
+    }
+  }
+
+  // Reorganize 'group_by_keys' in column-major order so that we can make a
+  // ColumnVectorsValueAccessor to bulk-insert results.
+  //
+  // TODO(chasseur): Shuffling around the GROUP BY keys like this is suboptimal
+  // if there is only one aggregate. The need to do this should hopefully go
+  // away when we work out storing composite structures for multiple aggregates
+  // in a single HashTable.
+  std::vector<std::unique_ptr<ColumnVector>> group_by_cvs;
+  std::size_t group_by_element_idx = 0;
+  for (const std::unique_ptr<const Scalar> &group_by_element : group_by_list_) {
+    const Type &group_by_type = group_by_element->getType();
+    if (NativeColumnVector::UsableForType(group_by_type)) {
+      NativeColumnVector *element_cv = new NativeColumnVector(group_by_type, group_by_keys.size());
+      group_by_cvs.emplace_back(element_cv);
+      for (std::vector<TypedValue> &group_key : group_by_keys) {
+        element_cv->appendTypedValue(std::move(group_key[group_by_element_idx]));
+      }
+    } else {
+      IndirectColumnVector *element_cv = new IndirectColumnVector(group_by_type, group_by_keys.size());
+      group_by_cvs.emplace_back(element_cv);
+      for (std::vector<TypedValue> &group_key : group_by_keys) {
+        element_cv->appendTypedValue(std::move(group_key[group_by_element_idx]));
+      }
+    }
+    ++group_by_element_idx;
+  }
+
+  // Stitch together a ColumnVectorsValueAccessor combining the GROUP BY keys
+  // and the finalized aggregates.
+  ColumnVectorsValueAccessor complete_result;
+  for (std::unique_ptr<ColumnVector> &group_by_cv : group_by_cvs) {
+    complete_result.addColumn(group_by_cv.release());
+  }
+  for (std::unique_ptr<ColumnVector> &final_value_cv : final_values) {
+    complete_result.addColumn(final_value_cv.release());
+  }
+
+  // Bulk-insert the complete result.
+  output_destination->bulkInsertTuples(&complete_result);
+}
+
 }  // namespace quickstep

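Usage note (not part of the commit message above): both knobs introduced in this
file are plain gflags, so they can presumably be overridden with standard gflags
command-line syntax, e.g. --num_aggregation_partitions=64
--partition_aggregation_num_groups_threshold=1000000. The defaults shipped in
this diff are 41 partitions and a 500,000-group threshold.
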
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e1434ad/storage/AggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index c251983..e0826b0 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -32,9 +32,12 @@
 #include "storage/AggregationOperationState.pb.h"
 #include "storage/HashTableBase.hpp"
 #include "storage/HashTablePool.hpp"
+#include "storage/PartitionedHashTablePool.hpp"
 #include "storage/StorageBlockInfo.hpp"
 #include "utility/Macros.hpp"
 
+#include "gflags/gflags.h"
+
 namespace quickstep {
 
 class AggregateFunction;
@@ -44,6 +47,9 @@ class InsertDestination;
 class LIPFilterAdaptiveProber;
 class StorageManager;
 
+DECLARE_int32(num_aggregation_partitions);
+DECLARE_int32(partition_aggregation_num_groups_threshold);
+
 /** \addtogroup Storage
  *  @{
  */
@@ -176,9 +182,39 @@ class AggregationOperationState {
    **/
   void destroyAggregationHashTablePayload();
 
+  /**
+   * @brief Generate the final results for the aggregates managed by this
+   *        AggregationOperationState and write them out to StorageBlock(s).
+   *        In this implementation, each thread picks a hash table belonging to
+   *        a partition and writes its values to StorageBlock(s). There is no
+   *        need to merge multiple hash tables into one, because there is no
+   *        overlap in the keys across any two hash tables.
+   *
+   * @param partition_id The ID of the partition for which finalize is being
+   *        performed.
+   * @param output_destination An InsertDestination where the finalized output
+   *        tuple(s) from this aggregate are to be written.
+   **/
+  void finalizeAggregatePartitioned(
+      const std::size_t partition_id, InsertDestination *output_destination);
+
   static void mergeGroupByHashTables(AggregationStateHashTableBase *src,
                                      AggregationStateHashTableBase *dst);
 
+  bool isAggregatePartitioned() const {
+    return is_aggregate_partitioned_;
+  }
+
+  /**
+   * @brief Get the number of partitions to be used for the aggregation.
+   *        For non-partitioned aggregations, we return 1.
+   **/
+  std::size_t getNumPartitions() const {
+    return is_aggregate_partitioned_
+               ? partitioned_group_by_hashtable_pool_->getNumPartitions()
+               : 1;
+  }
+
   int dflag;
 
  private:
@@ -195,12 +231,41 @@ class AggregationOperationState {
   void finalizeSingleState(InsertDestination *output_destination);
   void finalizeHashTable(InsertDestination *output_destination);
 
-  // A vector of group by hash table pools.
-  std::unique_ptr<HashTablePool> group_by_hashtable_pool_;
+  bool checkAggregatePartitioned(
+      const std::size_t estimated_num_groups,
+      const std::vector<bool> &is_distinct,
+      const std::vector<std::unique_ptr<const Scalar>> &group_by,
+      const std::vector<const AggregateFunction *> &aggregate_functions) const {
+    // If there's no aggregation, return false.
+    if (aggregate_functions.empty()) {
+      return false;
+    }
+    // Check if there's a distinct operation involved in any aggregate; if so,
+    // the aggregate can't be partitioned.
+    for (auto distinct : is_distinct) {
+      if (distinct) {
+        return false;
+      }
+    }
+    // There's no distinct aggregation involved. Check if there's at least one
+    // GROUP BY operation.
+    if (group_by.empty()) {
+      return false;
+    }
+    // There are GROUP BYs without DISTINCT. Check if the estimated number of
+    // groups is large enough to warrant a partitioned aggregation.
+    return estimated_num_groups >
+           static_cast<std::size_t>(
+               FLAGS_partition_aggregation_num_groups_threshold);
+  }
 
   // Common state for all aggregates in this operation: the input relation, the
   // filter predicate (if any), and the list of GROUP BY expressions (if any).
   const CatalogRelationSchema &input_relation_;
+
+  // Whether the aggregation is partitioned or not.
+  const bool is_aggregate_partitioned_;
+
   std::unique_ptr<const Predicate> predicate_;
   std::vector<std::unique_ptr<const Scalar>> group_by_list_;
 
@@ -233,6 +298,11 @@ class AggregationOperationState {
   std::vector<std::unique_ptr<AggregationStateHashTableBase>>
       group_by_hashtables_;
 
+  // A pool of hash tables for GROUP BY aggregation.
+  std::unique_ptr<HashTablePool> group_by_hashtable_pool_;
+
+  std::unique_ptr<PartitionedHashTablePool> partitioned_group_by_hashtable_pool_;
+
   StorageManager *storage_manager_;
 
   DISALLOW_COPY_AND_ASSIGN(AggregationOperationState);

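A quick worked example of checkAggregatePartitioned() and the pool sizing (the
numbers are made up for illustration): a GROUP BY query with no DISTINCT
aggregates and an estimated 2,050,000 groups exceeds the default threshold of
500,000, so it is partitioned. With the default of 41 partitions, each
partition's hash table is then sized at max(2,050,000 / 41, 100) = 50,000
entries (see setHashTableSize() in PartitionedHashTablePool.hpp below).
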
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e1434ad/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index 0e32cc1..be60662 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -235,6 +235,7 @@ add_library(quickstep_storage_PackedRowStoreTupleStorageSubBlock
 add_library(quickstep_storage_PackedRowStoreValueAccessor
             ../empty_src.cpp
             PackedRowStoreValueAccessor.hpp)
+add_library(quickstep_storage_PartitionedHashTablePool ../empty_src.cpp PartitionedHashTablePool.hpp)
 add_library(quickstep_storage_PreloaderThread PreloaderThread.cpp PreloaderThread.hpp)
 add_library(quickstep_storage_SMAIndexSubBlock SMAIndexSubBlock.cpp SMAIndexSubBlock.hpp)
 add_library(quickstep_storage_SeparateChainingHashTable ../empty_src.cpp SeparateChainingHashTable.hpp)
@@ -270,6 +271,7 @@ add_library(quickstep_storage_WindowAggregationOperationState_proto ${storage_Wi
 
 # Link dependencies:
 target_link_libraries(quickstep_storage_AggregationOperationState
+                      ${GFLAGS_LIB_NAME}
                       glog
                       quickstep_catalog_CatalogDatabaseLite
                       quickstep_catalog_CatalogRelationSchema
@@ -289,6 +291,7 @@ target_link_libraries(quickstep_storage_AggregationOperationState
                       quickstep_storage_HashTableFactory
                       quickstep_storage_HashTablePool
                       quickstep_storage_InsertDestination
+                      quickstep_storage_PartitionedHashTablePool
                       quickstep_storage_StorageBlock
                       quickstep_storage_StorageBlockInfo
                       quickstep_storage_StorageManager
@@ -852,6 +855,14 @@ target_link_libraries(quickstep_storage_PackedRowStoreValueAccessor
                       quickstep_types_TypedValue
                       quickstep_utility_BitVector
                       quickstep_utility_Macros)
+target_link_libraries(quickstep_storage_PartitionedHashTablePool
+                      glog
+                      quickstep_expressions_aggregation_AggregationHandle
+                      quickstep_storage_FastHashTable
+                      quickstep_storage_FastHashTableFactory
+                      quickstep_storage_HashTableBase
+                      quickstep_utility_Macros
+                      quickstep_utility_StringUtil)                    
 target_link_libraries(quickstep_storage_PreloaderThread
                       glog
                       quickstep_catalog_CatalogDatabase
@@ -1169,6 +1180,7 @@ target_link_libraries(quickstep_storage
                       quickstep_storage_LinearOpenAddressingHashTable
                       quickstep_storage_PackedRowStoreTupleStorageSubBlock
                       quickstep_storage_PackedRowStoreValueAccessor
+                      quickstep_storage_PartitionedHashTablePool
                       quickstep_storage_PreloaderThread
                       quickstep_storage_SMAIndexSubBlock
                       quickstep_storage_SeparateChainingHashTable

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e1434ad/storage/HashTablePool.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTablePool.hpp b/storage/HashTablePool.hpp
index 3cdfcb3..96cf849 100644
--- a/storage/HashTablePool.hpp
+++ b/storage/HashTablePool.hpp
@@ -173,7 +173,7 @@ class HashTablePool {
    * @param All the hash tables in the pool.
    *
    **/
-  const std::vector<std::unique_ptr<AggregationStateHashTableBase>>*
+  std::vector<std::unique_ptr<AggregationStateHashTableBase>>*
       getAllHashTables() {
     return &hash_tables_;
   }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e1434ad/storage/PartitionedHashTablePool.hpp
----------------------------------------------------------------------
diff --git a/storage/PartitionedHashTablePool.hpp b/storage/PartitionedHashTablePool.hpp
new file mode 100644
index 0000000..bcc4d23
--- /dev/null
+++ b/storage/PartitionedHashTablePool.hpp
@@ -0,0 +1,204 @@
+/**
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin-Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_STORAGE_PARTITIONED_HASH_TABLE_POOL_HPP_
+#define QUICKSTEP_STORAGE_PARTITIONED_HASH_TABLE_POOL_HPP_
+
+#include <algorithm>
+#include <chrono>
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include "expressions/aggregation/AggregationHandle.hpp"
+#include "storage/HashTableBase.hpp"
+#include "storage/FastHashTable.hpp"
+#include "storage/FastHashTableFactory.hpp"
+#include "utility/Macros.hpp"
+#include "utility/StringUtil.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+class StorageManager;
+class Type;
+
+/** \addtogroup Storage
+ *  @{
+ */
+
+/**
+ * @brief A pool of HashTables used for a single aggregation handle. Each
+ *        HashTable represents values from a given partition, which is
+ *        determined by the keys in the group by clause.
+ **/
+class PartitionedHashTablePool {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param estimated_num_entries The maximum number of entries in a hash table.
+   * @param num_partitions The number of partitions (i.e. the number of HashTables).
+   * @param hash_table_impl_type The type of hash table implementation.
+   * @param group_by_types A vector of pointers to the Types that form the
+   *        group by key.
+   * @param agg_handle The aggregation handle.
+   * @param storage_manager A pointer to the storage manager.
+   **/
+  PartitionedHashTablePool(const std::size_t estimated_num_entries,
+                           const std::size_t num_partitions,
+                           const HashTableImplType hash_table_impl_type,
+                           const std::vector<const Type *> &group_by_types,
+                           AggregationHandle *agg_handle,
+                           StorageManager *storage_manager)
+      : estimated_num_entries_(
+            setHashTableSize(estimated_num_entries, num_partitions)),
+        num_partitions_(num_partitions),
+        hash_table_impl_type_(hash_table_impl_type),
+        group_by_types_(group_by_types),
+        agg_handle_(DCHECK_NOTNULL(agg_handle)),
+        storage_manager_(DCHECK_NOTNULL(storage_manager)) {
+    initializeAllHashTables();
+  }
+
+  /**
+   * @brief Constructor.
+   *
+   * @note This constructor is relevant for the HashTable specialized for
+   *       aggregation.
+   *
+   * @param estimated_num_entries The maximum number of entries in a hash table.
+   * @param num_partitions The number of partitions (i.e. the number of HashTables).
+   * @param hash_table_impl_type The type of hash table implementation.
+   * @param group_by_types A vector of pointers to the Types that form the
+   *        group by key.
+   * @param payload_sizes The sizes of the payload elements (i.e.
+   *        AggregationStates).
+   * @param handles The aggregation handles.
+   * @param storage_manager A pointer to the storage manager.
+   **/
+  PartitionedHashTablePool(const std::size_t estimated_num_entries,
+                           const std::size_t num_partitions,
+                           const HashTableImplType hash_table_impl_type,
+                           const std::vector<const Type *> &group_by_types,
+                           const std::vector<std::size_t> &payload_sizes,
+                           const std::vector<AggregationHandle *> &handles,
+                           StorageManager *storage_manager)
+      : estimated_num_entries_(
+            setHashTableSize(estimated_num_entries, num_partitions)),
+        num_partitions_(num_partitions),
+        hash_table_impl_type_(hash_table_impl_type),
+        group_by_types_(group_by_types),
+        payload_sizes_(payload_sizes),
+        handles_(handles),
+        storage_manager_(DCHECK_NOTNULL(storage_manager)) {
+    initializeAllHashTables();
+  }
+
+  /**
+   * @brief Get the hash table meant for the given partition.
+   *
+   * @param partition_id The ID of the partition whose HashTable is requested.
+   *
+   * @return A pointer to the HashTable for the given partition.
+   **/
+  AggregationStateHashTableBase* getHashTable(const std::size_t partition_id) {
+    DCHECK_LT(partition_id, num_partitions_);
+    DCHECK_LT(partition_id, hash_tables_.size());
+    return hash_tables_[partition_id].get();
+  }
+
+  /**
+   * @brief Get all the hash tables from the pool.
+   *
+   * @warning The caller should ensure that this call is made when no hash table
+   *          is being checked in or checked out from the pool. In other words,
+   *          the hash table pool is in a read-only state.
+   *
+   * @return All the hash tables in the pool.
+   *
+   **/
+  std::vector<std::unique_ptr<AggregationStateHashTableBase>>*
+      getAllHashTables() {
+    return &hash_tables_;
+  }
+
+  /**
+   * @brief Get the number of partitions used for the aggregation.
+   **/
+  inline std::size_t getNumPartitions() const {
+    return num_partitions_;
+  }
+
+ private:
+  void initializeAllHashTables() {
+    for (std::size_t part_num = 0; part_num < num_partitions_; ++part_num) {
+      AggregationStateHashTableBase *part_hash_table = createNewHashTableFast();
+      hash_tables_.push_back(
+          std::unique_ptr<AggregationStateHashTableBase>(part_hash_table));
+    }
+  }
+
+  AggregationStateHashTableBase* createNewHashTable() {
+    return agg_handle_->createGroupByHashTable(hash_table_impl_type_,
+                                               group_by_types_,
+                                               estimated_num_entries_,
+                                               storage_manager_);
+  }
+
+  AggregationStateHashTableBase* createNewHashTableFast() {
+    return AggregationStateFastHashTableFactory::CreateResizable(
+                hash_table_impl_type_,
+                group_by_types_,
+                estimated_num_entries_,
+                payload_sizes_,
+                handles_,
+                storage_manager_);
+  }
+
+  inline std::size_t setHashTableSize(const std::size_t overall_estimate,
+                                      const std::size_t num_partitions) const {
+    CHECK_NE(num_partitions, 0Lu);
+    // The minimum size of the hash table is set to 100.
+    return std::max(static_cast<std::size_t>(overall_estimate / num_partitions),
+                    100Lu);
+  }
+
+  std::vector<std::unique_ptr<AggregationStateHashTableBase>> hash_tables_;
+
+  const std::size_t estimated_num_entries_;
+  const std::size_t num_partitions_;
+
+  const HashTableImplType hash_table_impl_type_;
+
+  const std::vector<const Type *> group_by_types_;
+
+  std::vector<std::size_t> payload_sizes_;
+
+  AggregationHandle *agg_handle_;
+  const std::vector<AggregationHandle *> handles_;
+  StorageManager *storage_manager_;
+
+  DISALLOW_COPY_AND_ASSIGN(PartitionedHashTablePool);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_STORAGE_PARTITIONED_HASH_TABLE_POOL_HPP_

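A minimal caller-side sketch of how the pool is typically driven (this is not
part of the commit; it simply mirrors the loops in AggregationOperationState
above). Keys never overlap across partitions, so each table can be processed
independently:

#include <cstddef>

#include "storage/HashTableBase.hpp"
#include "storage/PartitionedHashTablePool.hpp"

namespace quickstep {

// Destroy the aggregation payload in every partition's hash table; the same
// destroyPayload() call is used by destroyAggregationHashTablePayload().
void DestroyAllPartitionPayloads(PartitionedHashTablePool *pool) {
  for (std::size_t part = 0; part < pool->getNumPartitions(); ++part) {
    AggregationStateHashTableBase *table = pool->getHashTable(part);
    table->destroyPayload();
  }
}

}  // namespace quickstep
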
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e1434ad/storage/StorageBlock.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageBlock.cpp b/storage/StorageBlock.cpp
index dd3e19d..ea74ee6 100644
--- a/storage/StorageBlock.cpp
+++ b/storage/StorageBlock.cpp
@@ -1329,4 +1329,59 @@ const std::size_t StorageBlock::getNumTuples() const {
   return tuple_store_->numTuples();
 }
 
+void StorageBlock::aggregateGroupByPartitioned(
+    const std::vector<std::vector<std::unique_ptr<const Scalar>>> &arguments,
+    const std::vector<std::unique_ptr<const Scalar>> &group_by,
+    const TupleIdSequence *filter,
+    const std::size_t num_partitions,
+    ColumnVectorsValueAccessor *temp_result,
+    std::vector<attribute_id> *argument_ids,
+    std::vector<attribute_id> *key_ids,
+    std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const {
+  DCHECK(!group_by.empty())
+      << "Called aggregateGroupByPartitioned() with zero GROUP BY expressions";
+
+  SubBlocksReference sub_blocks_ref(*tuple_store_,
+                                    indices_,
+                                    indices_consistent_);
+
+  std::unique_ptr<ValueAccessor> accessor(
+      tuple_store_->createValueAccessor(filter));
+
+  attribute_id attr_id = 0;
+
+  // First, put GROUP BY keys into 'temp_result'.
+  if (reuse_group_by_vectors->empty()) {
+    // Compute GROUP BY values from group_by Scalars, and store them in
+    // reuse_group_by_vectors for reuse by other aggregates on this same
+    // block.
+    reuse_group_by_vectors->reserve(group_by.size());
+    for (const std::unique_ptr<const Scalar> &group_by_element : group_by) {
+      reuse_group_by_vectors->emplace_back(
+          group_by_element->getAllValues(accessor.get(), &sub_blocks_ref));
+      temp_result->addColumn(reuse_group_by_vectors->back().get(), false);
+      key_ids->push_back(attr_id++);
+    }
+  } else {
+    // Reuse precomputed GROUP BY values from reuse_group_by_vectors.
+    DCHECK_EQ(group_by.size(), reuse_group_by_vectors->size())
+        << "Wrong number of reuse_group_by_vectors";
+    for (const std::unique_ptr<ColumnVector> &reuse_cv : *reuse_group_by_vectors) {
+      temp_result->addColumn(reuse_cv.get(), false);
+      key_ids->push_back(attr_id++);
+    }
+  }
+
+  // Compute argument vectors and add them to 'temp_result'.
+  for (const std::vector<std::unique_ptr<const Scalar>> &argument : arguments) {
+    for (const std::unique_ptr<const Scalar> &args : argument) {
+      temp_result->addColumn(args->getAllValues(accessor.get(), &sub_blocks_ref));
+      argument_ids->push_back(attr_id++);
+    }
+    if (argument.empty()) {
+      argument_ids->push_back(kInvalidAttributeID);
+    }
+  }
+}
+
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e1434ad/storage/StorageBlock.hpp
----------------------------------------------------------------------
diff --git a/storage/StorageBlock.hpp b/storage/StorageBlock.hpp
index 488efcc..56b3bdc 100644
--- a/storage/StorageBlock.hpp
+++ b/storage/StorageBlock.hpp
@@ -43,6 +43,7 @@ class AggregationHandle;
 class AggregationState;
 class CatalogRelationSchema;
 class ColumnVector;
+class ColumnVectorsValueAccessor;
 class InsertDestinationInterface;
 class Predicate;
 class Scalar;
@@ -451,6 +452,55 @@ class StorageBlock : public StorageBlockBase {
       AggregationStateHashTableBase *hash_table,
       std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const;
 
+
+  /**
+   * @brief Perform the GROUP BY aggregation for the case when aggregation is
+   *        partitioned.
+   *
+   * TODO(harshad) - Refactor this class to use only one function
+   *       aggregateGroupBy.
+   * @note The difference between this method and the aggregateGroupBy method
+   *       is that in this method, the tuples are routed to different HashTables
+   *       based on the partition to which they belong. The partition is
+   *       determined by the GROUP BY attributes. Currently, hash-based
+   *       partitioning is performed.
+   *
+   * @note This function only creates the ColumnVectorsValueAccessor needed for
+   *       the insertion in the hash table. The actual insertion in respective
+   *       hash tables should be handled by the caller. See
+   *       AggregationOperationState::aggregateBlockHashTable() for one such
+   *       implementation.
+   *
+   * @param arguments The arguments to the aggregation function as Scalars.
+   * @param group_by The list of GROUP BY attributes/expressions. The tuples in
+   *        this storage block are grouped by these attributes before
+   *        aggregation.
+   * @param filter If non-NULL, then only tuple IDs which are set in the
+   *        filter will be checked (all others will be assumed to be false).
+   * @param num_partitions The number of partitions used for the aggregation.
+   * @param temp_result The ColumnVectorsValueAccessor used for collecting
+   *        the attribute values from this StorageBlock.
+   * @param argument_ids The attribute IDs used for the aggregation, which
+   *        come from the arguments vector. For any aggregate with an empty
+   *        argument list, an invalid attribute ID is appended instead.
+   * @param key_ids The attribute IDs of the group by attributes.
+   * @param reuse_group_by_vectors This parameter is used to store and reuse
+   *        GROUP BY attribute vectors pre-computed in an earlier invocation of
+   *        aggregateGroupBy(). \c reuse_group_by_vectors is never \c nullptr
+   *        for ease of use. Current invocation of aggregateGroupBy() will reuse
+   *        ColumnVectors if non-empty, otherwise computes ColumnVectors based
+   *        on \c group_by and stores them in \c reuse_group_by_vectors.
+   **/
+  void aggregateGroupByPartitioned(
+      const std::vector<std::vector<std::unique_ptr<const Scalar>>> &arguments,
+      const std::vector<std::unique_ptr<const Scalar>> &group_by,
+      const TupleIdSequence *filter,
+      const std::size_t num_partitions,
+      ColumnVectorsValueAccessor *temp_result,
+      std::vector<attribute_id> *argument_ids,
+      std::vector<attribute_id> *key_ids,
+      std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const;
+
   /**
    * @brief Inserts the GROUP BY expressions and aggregation arguments together
    *        as keys into the distinctify hash table.

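A concrete (illustrative) layout example for aggregateGroupByPartitioned(): for
a query like SELECT a, b, SUM(x), COUNT(*) FROM r GROUP BY a, b, the GROUP BY
columns are added to temp_result first, so key_ids becomes {0, 1}; SUM(x)'s
single argument is added next and gets id 2, while COUNT(*) has no arguments and
contributes kInvalidAttributeID, giving argument_ids = {2, kInvalidAttributeID}.
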
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e1434ad/types/containers/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/types/containers/CMakeLists.txt b/types/containers/CMakeLists.txt
index aacb63a..c2a6623 100644
--- a/types/containers/CMakeLists.txt
+++ b/types/containers/CMakeLists.txt
@@ -49,6 +49,7 @@ target_link_libraries(quickstep_types_containers_Tuple
                       quickstep_catalog_CatalogTypedefs
                       quickstep_types_TypedValue
                       quickstep_types_containers_Tuple_proto
+                      quickstep_utility_CompositeHash
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_types_containers_Tuple_proto
                       quickstep_types_TypedValue_proto

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e1434ad/types/containers/Tuple.hpp
----------------------------------------------------------------------
diff --git a/types/containers/Tuple.hpp b/types/containers/Tuple.hpp
index 60f832c..6237d54 100644
--- a/types/containers/Tuple.hpp
+++ b/types/containers/Tuple.hpp
@@ -28,6 +28,7 @@
 #include "catalog/CatalogTypedefs.hpp"
 #include "types/TypedValue.hpp"
 #include "types/containers/Tuple.pb.h"
+#include "utility/CompositeHash.hpp"
 #include "utility/Macros.hpp"
 
 #include "glog/logging.h"
@@ -218,6 +219,13 @@ class Tuple {
     return attribute_values_.size();
   }
 
+  /**
+   * @brief Get the hash value of the tuple.
+   **/
+  std::size_t getTupleHash() const {
+    return HashCompositeKey(attribute_values_);
+  }
+
  private:
   /**
    * @brief Constructor which does not create any attributes, nor pre-reserve

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e1434ad/utility/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/utility/CMakeLists.txt b/utility/CMakeLists.txt
index 395e264..e9be2ec 100644
--- a/utility/CMakeLists.txt
+++ b/utility/CMakeLists.txt
@@ -169,6 +169,7 @@ add_library(quickstep_utility_BloomFilter_proto
 add_library(quickstep_utility_CalculateInstalledMemory CalculateInstalledMemory.cpp CalculateInstalledMemory.hpp)
 add_library(quickstep_utility_Cast ../empty_src.cpp Cast.hpp)
 add_library(quickstep_utility_CheckSnprintf ../empty_src.cpp CheckSnprintf.hpp)
+add_library(quickstep_utility_CompositeHash ../empty_src.cpp CompositeHash.hpp)
 add_library(quickstep_utility_DAG ../empty_src.cpp DAG.hpp)
 add_library(quickstep_utility_DisjointTreeForest ../empty_src.cpp DisjointTreeForest.hpp)
 add_library(quickstep_utility_EqualsAnyConstant ../empty_src.cpp EqualsAnyConstant.hpp)
@@ -230,6 +231,10 @@ target_link_libraries(quickstep_utility_CalculateInstalledMemory
                       glog)
 target_link_libraries(quickstep_utility_CheckSnprintf
                       glog)
+target_link_libraries(quickstep_utility_CompositeHash
+                      quickstep_types_TypedValue
+                      quickstep_utility_HashPair
+                      glog)
 target_link_libraries(quickstep_utility_DAG
                       glog
                       quickstep_utility_Macros)
@@ -325,6 +330,7 @@ target_link_libraries(quickstep_utility
                       quickstep_utility_CalculateInstalledMemory
                       quickstep_utility_Cast
                       quickstep_utility_CheckSnprintf
+                      quickstep_utility_CompositeHash
                       quickstep_utility_DAG
                       quickstep_utility_DisjointTreeForest
                       quickstep_utility_EqualsAnyConstant

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e1434ad/utility/CompositeHash.hpp
----------------------------------------------------------------------
diff --git a/utility/CompositeHash.hpp b/utility/CompositeHash.hpp
new file mode 100644
index 0000000..517bc96
--- /dev/null
+++ b/utility/CompositeHash.hpp
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_UTILITY_COMPOSITE_HASH_HPP_
+#define QUICKSTEP_UTILITY_COMPOSITE_HASH_HPP_
+
+#include <cstddef>
+#include <vector>
+
+#include "types/TypedValue.hpp"
+#include "utility/HashPair.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+/**
+ * @brief Compute the hash value of a composite key.
+ *
+ * @param key A vector of TypedValues which together form the composite key.
+ * @return The hash value.
+ **/
+static std::size_t HashCompositeKey(const std::vector<TypedValue> &key) {
+  DCHECK(!key.empty());
+  std::size_t hash = key.front().getHash();
+  for (std::vector<TypedValue>::const_iterator key_it = key.begin() + 1;
+       key_it != key.end();
+       ++key_it) {
+    hash = CombineHashes(hash, key_it->getHash());
+  }
+  return hash;
+}
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_UTILITY_COMPOSITE_HASH_HPP_
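
For reference, the pieces above fit together elsewhere in this commit as
follows: Tuple::getTupleHash() simply forwards to HashCompositeKey() over all of
the tuple's attribute values, and AggregationOperationState routes each group to
a partition with

  curr_tuple->getTupleHash() % num_partitions

so all rows sharing a GROUP BY key end up in the same partition's hash table.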