You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2017/03/29 00:08:30 UTC

[26/40] incubator-quickstep git commit: QUICKSTEP-73: Multi-attribute PartitionSchemeHeader.

QUICKSTEP-73: Multi-attribute PartitionSchemeHeader.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/05b47b5a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/05b47b5a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/05b47b5a

Branch: refs/heads/new-op
Commit: 05b47b5a9fa7bd550c284846c705765c99d409ba
Parents: 42bf626
Author: Zuyu Zhang <zu...@apache.org>
Authored: Sat Mar 18 03:28:38 2017 -0700
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Sun Mar 26 15:55:35 2017 -0700

----------------------------------------------------------------------
 catalog/CMakeLists.txt                          |   5 +-
 catalog/Catalog.proto                           |  10 +-
 catalog/CatalogRelation.cpp                     |   6 +-
 catalog/PartitionSchemeHeader.cpp               |  97 ++++--
 catalog/PartitionSchemeHeader.hpp               | 170 ++++++----
 catalog/tests/NUMAPlacementScheme_unittest.cpp  |   2 +-
 catalog/tests/PartitionScheme_unittest.cpp      | 321 +++++++++----------
 parser/tests/Create.test                        |  38 +++
 query_optimizer/ExecutionGenerator.cpp          |   2 +-
 query_optimizer/resolver/Resolver.cpp           |  35 +-
 .../tests/execution_generator/Create.test       |  17 +
 .../tests/logical_generator/Create.test         |  21 ++
 .../tests/physical_generator/Create.test        |  39 +++
 query_optimizer/tests/resolver/Create.test      |  21 ++
 .../tests/HashJoinOperator_unittest.cpp         |   4 +-
 storage/CMakeLists.txt                          |   1 +
 storage/InsertDestination.cpp                   |  48 +--
 storage/InsertDestination.hpp                   |   6 +-
 storage/InsertDestinationInterface.hpp          |   9 +-
 storage/StorageBlock.cpp                        |   3 +-
 20 files changed, 536 insertions(+), 319 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/05b47b5a/catalog/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/catalog/CMakeLists.txt b/catalog/CMakeLists.txt
index 3c64e97..9684cfe 100644
--- a/catalog/CMakeLists.txt
+++ b/catalog/CMakeLists.txt
@@ -174,7 +174,9 @@ target_link_libraries(quickstep_catalog_PartitionSchemeHeader
                       quickstep_types_TypedValue
                       quickstep_types_TypedValue_proto
                       quickstep_types_operations_comparisons_Comparison
+                      quickstep_types_operations_comparisons_EqualComparison
                       quickstep_types_operations_comparisons_LessComparison
+                      quickstep_utility_CompositeHash
                       quickstep_utility_Macros)
 
 # Module all-in-one library:
@@ -276,7 +278,8 @@ target_link_libraries(PartitionScheme_unittest
                       quickstep_types_TypeID
                       quickstep_types_TypedValue
                       quickstep_types_operations_comparisons_Comparison
-                      quickstep_types_operations_comparisons_EqualComparison)
+                      quickstep_types_operations_comparisons_EqualComparison
+                      quickstep_utility_CompositeHash)
 add_test(PartitionScheme_unittest PartitionScheme_unittest)
 
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/catalog_relation_statistics_test_data)

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/05b47b5a/catalog/Catalog.proto
----------------------------------------------------------------------
diff --git a/catalog/Catalog.proto b/catalog/Catalog.proto
index 89cb7e5..4e2fafe 100644
--- a/catalog/Catalog.proto
+++ b/catalog/Catalog.proto
@@ -39,18 +39,22 @@ message PartitionSchemeHeader {
   required PartitionType partition_type = 1;
 
   required uint64 num_partitions = 2;
-  required uint32 partition_attribute_id = 3;
+  repeated uint32 partition_attribute_ids = 3;
 
   // The convention for extension numbering is that extensions for a particular
   // PartitionType should begin from (partition_type + 1) * 16.
   extensions 16 to max;
 }
 
+message PartitionValues {
+  repeated TypedValue partition_values = 1;
+}
+
 message RangePartitionSchemeHeader {
   extend PartitionSchemeHeader {
     // All required.
-    optional Type partition_attr_type = 32;
-    repeated TypedValue partition_range_boundaries = 33;
+    repeated Type partition_attr_types = 32;
+    repeated PartitionValues partition_range_boundaries = 33;
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/05b47b5a/catalog/CatalogRelation.cpp
----------------------------------------------------------------------
diff --git a/catalog/CatalogRelation.cpp b/catalog/CatalogRelation.cpp
index 41c503c..793fc2d 100644
--- a/catalog/CatalogRelation.cpp
+++ b/catalog/CatalogRelation.cpp
@@ -88,7 +88,11 @@ CatalogRelation::CatalogRelation(const serialization::CatalogRelationSchema &pro
     const serialization::PartitionScheme &proto_partition_scheme =
         proto.GetExtension(serialization::CatalogRelation::partition_scheme);
 
-    DCHECK(hasAttributeWithId(proto_partition_scheme.header().partition_attribute_id()));
+#ifdef QUICKSTEP_DEBUG
+    for (int i = 0; i < proto_partition_scheme.header().partition_attribute_ids_size(); ++i) {
+      DCHECK(hasAttributeWithId(proto_partition_scheme.header().partition_attribute_ids(i)));
+    }
+#endif
 
     setPartitionScheme(PartitionScheme::ReconstructFromProto(proto_partition_scheme));
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/05b47b5a/catalog/PartitionSchemeHeader.cpp
----------------------------------------------------------------------
diff --git a/catalog/PartitionSchemeHeader.cpp b/catalog/PartitionSchemeHeader.cpp
index 0472fe6..30fa58d 100644
--- a/catalog/PartitionSchemeHeader.cpp
+++ b/catalog/PartitionSchemeHeader.cpp
@@ -20,10 +20,12 @@
 #include "catalog/PartitionSchemeHeader.hpp"
 
 #include <cstddef>
+#include <unordered_set>
 #include <utility>
 #include <vector>
 
 #include "catalog/Catalog.pb.h"
+#include "catalog/CatalogTypedefs.hpp"
 #include "types/Type.hpp"
 #include "types/Type.pb.h"
 #include "types/TypeFactory.hpp"
@@ -32,16 +34,30 @@
 
 #include "glog/logging.h"
 
+using std::move;
+using std::size_t;
+using std::vector;
+
 namespace quickstep {
 
 PartitionSchemeHeader::PartitionSchemeHeader(const PartitionType type,
                                              const std::size_t num_partitions,
-                                             const attribute_id attr_id)
+                                             PartitionAttributeIds &&attr_ids)  // NOLINT(whitespace/operators)
     : partition_type_(type),
       num_partitions_(num_partitions),
-      partition_attribute_id_(attr_id) {
+      partition_attribute_ids_(move(attr_ids)) {
   DCHECK_GT(num_partitions, 0u);
-  DCHECK_GE(attr_id, 0);
+
+#ifdef QUICKSTEP_DEBUG
+  // Ensure that no duplicated partition attributes exist.
+  std::unordered_set<attribute_id> partition_attribute_ids;
+  for (const attribute_id attr_id : partition_attribute_ids_) {
+    DCHECK_NE(attr_id, kInvalidCatalogId);
+
+    CHECK_EQ(0u, partition_attribute_ids.count(attr_id));
+    partition_attribute_ids.insert(attr_id);
+  }
+#endif
 }
 
 bool PartitionSchemeHeader::ProtoIsValid(
@@ -60,10 +76,12 @@ bool PartitionSchemeHeader::ProtoIsValid(
     case serialization::PartitionSchemeHeader::HASH:
       return true;
     case serialization::PartitionSchemeHeader::RANGE: {
-      const std::size_t num_ranges =
-          proto.ExtensionSize(serialization::RangePartitionSchemeHeader::partition_range_boundaries);
-      return num_ranges == proto.num_partitions() - 1 &&
-             proto.HasExtension(serialization::RangePartitionSchemeHeader::partition_attr_type);
+      return proto.HasExtension(serialization::RangePartitionSchemeHeader::partition_range_boundaries) &&
+             proto.ExtensionSize(serialization::RangePartitionSchemeHeader::partition_range_boundaries) ==
+                 static_cast<int>(proto.num_partitions() - 1) &&
+             proto.HasExtension(serialization::RangePartitionSchemeHeader::partition_attr_types) &&
+             proto.ExtensionSize(serialization::RangePartitionSchemeHeader::partition_attr_types) ==
+                 proto.partition_attribute_ids_size();
     }
     default:
       // Partition type is unknown.
@@ -77,31 +95,43 @@ PartitionSchemeHeader* PartitionSchemeHeader::ReconstructFromProto(
       << "Attempted to create PartitionSchemeHeader from an invalid proto description:\n"
       << proto.DebugString();
 
+  PartitionAttributeIds partition_attribute_ids;
+  for (int i = 0; i < proto.partition_attribute_ids_size(); ++i) {
+    partition_attribute_ids.push_back(proto.partition_attribute_ids(i));
+  }
+
   switch (proto.partition_type()) {
     case serialization::PartitionSchemeHeader::HASH: {
-      return new HashPartitionSchemeHeader(proto.num_partitions(), proto.partition_attribute_id());
+      return new HashPartitionSchemeHeader(proto.num_partitions(), move(partition_attribute_ids));
     }
     case serialization::PartitionSchemeHeader::RANGE: {
-      const Type &attr_type =
-          TypeFactory::ReconstructFromProto(proto.GetExtension(
-              serialization::RangePartitionSchemeHeader::partition_attr_type));
-
-      std::vector<TypedValue> partition_ranges;
-      for (int i = 0;
-           i < proto.ExtensionSize(serialization::RangePartitionSchemeHeader::partition_range_boundaries);
-           ++i) {
-        partition_ranges.emplace_back(
-            TypedValue::ReconstructFromProto(
-                proto.GetExtension(serialization::RangePartitionSchemeHeader::partition_range_boundaries, i)));
+      std::vector<const Type*> attr_types;
+      for (int i = 0; i < proto.ExtensionSize(serialization::RangePartitionSchemeHeader::partition_attr_types); ++i) {
+        attr_types.push_back(&TypeFactory::ReconstructFromProto(
+            proto.GetExtension(serialization::RangePartitionSchemeHeader::partition_attr_types, i)));
       }
 
-      return new RangePartitionSchemeHeader(attr_type,
-                                            proto.num_partitions(),
-                                            proto.partition_attribute_id(),
-                                            std::move(partition_ranges));
+      const int partition_ranges_size =
+          proto.ExtensionSize(serialization::RangePartitionSchemeHeader::partition_range_boundaries);
+      std::vector<PartitionValues> partition_ranges(partition_ranges_size);
+      for (int i = 0; i < partition_ranges_size; ++i) {
+        const auto &proto_partition_values =
+            proto.GetExtension(serialization::RangePartitionSchemeHeader::partition_range_boundaries, i);
+        for (int j = 0; j < proto_partition_values.partition_values_size(); ++j) {
+          partition_ranges[i].emplace_back(TypedValue::ReconstructFromProto(
+              proto_partition_values.partition_values(j)));
+        }
+      }
+
+      return new RangePartitionSchemeHeader(proto.num_partitions(),
+                                            move(partition_attribute_ids),
+                                            move(attr_types),
+                                            move(partition_ranges));
     }
     default:
       LOG(FATAL) << "Invalid partition scheme header.";
+      // Avoid '-Werror=return-type' using GCC.
+      return nullptr;
   }
 }
 
@@ -120,7 +150,10 @@ serialization::PartitionSchemeHeader PartitionSchemeHeader::getProto() const {
   }
 
   proto.set_num_partitions(num_partitions_);
-  proto.set_partition_attribute_id(partition_attribute_id_);
+
+  for (const attribute_id attr_id : partition_attribute_ids_) {
+    proto.add_partition_attribute_ids(attr_id);
+  }
 
   return proto;
 }
@@ -128,12 +161,18 @@ serialization::PartitionSchemeHeader PartitionSchemeHeader::getProto() const {
 serialization::PartitionSchemeHeader RangePartitionSchemeHeader::getProto() const {
   serialization::PartitionSchemeHeader proto = PartitionSchemeHeader::getProto();
 
-  proto.MutableExtension(serialization::RangePartitionSchemeHeader::partition_attr_type)
-      ->MergeFrom(partition_attr_type_->getProto());
+  for (const Type *type : partition_attr_types_) {
+    proto.AddExtension(serialization::RangePartitionSchemeHeader::partition_attr_types)
+        ->MergeFrom(type->getProto());
+  }
+
+  for (const PartitionValues &partition_range_boundary : partition_range_boundaries_) {
+    serialization::PartitionValues *proto_range_boundary =
+        proto.AddExtension(serialization::RangePartitionSchemeHeader::partition_range_boundaries);
 
-  for (std::size_t i = 0; i < partition_range_boundaries_.size(); ++i) {
-    proto.AddExtension(serialization::RangePartitionSchemeHeader::partition_range_boundaries)
-        ->MergeFrom(partition_range_boundaries_[i].getProto());
+    for (const TypedValue &value : partition_range_boundary) {
+      proto_range_boundary->add_partition_values()->MergeFrom(value.getProto());
+    }
   }
 
   return proto;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/05b47b5a/catalog/PartitionSchemeHeader.hpp
----------------------------------------------------------------------
diff --git a/catalog/PartitionSchemeHeader.hpp b/catalog/PartitionSchemeHeader.hpp
index c1d65d1..a03b0e2 100644
--- a/catalog/PartitionSchemeHeader.hpp
+++ b/catalog/PartitionSchemeHeader.hpp
@@ -29,7 +29,9 @@
 #include "catalog/CatalogTypedefs.hpp"
 #include "types/TypedValue.hpp"
 #include "types/operations/comparisons/Comparison.hpp"
+#include "types/operations/comparisons/EqualComparison.hpp"
 #include "types/operations/comparisons/LessComparison.hpp"
+#include "utility/CompositeHash.hpp"
 #include "utility/Macros.hpp"
 
 #include "glog/logging.h"
@@ -48,6 +50,13 @@ class Type;
  **/
 class PartitionSchemeHeader {
  public:
+  // A vector of the ids of the catalog attributes used for partitioning.
+  typedef std::vector<attribute_id> PartitionAttributeIds;
+
+  // The values for partition attributes.
+  // PartitionValues.size() should be equal to PartitionAttributeIds.size().
+  typedef std::vector<TypedValue> PartitionValues;
+
   enum PartitionType {
     kHash = 0,
     kRange
@@ -84,8 +93,8 @@ class PartitionSchemeHeader {
    * @brief Calculate the partition id into which the attribute value should
    *        be inserted.
    *
-   * @param value_of_attribute The attribute value for which the
-   *                           partition id is to be determined.
+   * @param value_of_attributes A vector of attribute values for which the
+   *        partition id is to be determined.
    * @return The partition id of the partition for the attribute value.
    **/
   // TODO(gerald): Make this method more efficient since currently this is
@@ -93,7 +102,7 @@ class PartitionSchemeHeader {
   // once using a value accessor and create bitmaps for each partition with
   // tuples that correspond to those partitions.
   virtual partition_id getPartitionId(
-      const TypedValue &value_of_attribute) const = 0;
+      const PartitionValues &value_of_attributes) const = 0;
 
   /**
    * @brief Serialize the Partition Scheme as Protocol Buffer.
@@ -121,13 +130,13 @@ class PartitionSchemeHeader {
   }
 
   /**
-   * @brief Get the partitioning attribute for the relation.
+   * @brief Get the partitioning attributes for the relation.
    *
-   * @return The partitioning attribute with which the relation
+   * @return The partitioning attributes with which the relation
    *         is partitioned into.
    **/
-  inline attribute_id getPartitionAttributeId() const {
-    return partition_attribute_id_;
+  inline const PartitionAttributeIds& getPartitionAttributeIds() const {
+    return partition_attribute_ids_;
   }
 
  protected:
@@ -137,18 +146,18 @@ class PartitionSchemeHeader {
    * @param type The type of partitioning to be used to partition the
    *             relation.
    * @param num_partitions The number of partitions to be created.
-   * @param attr_id The attribute on which the partitioning happens.
+   * @param attr_ids The attributes on which the partitioning happens.
    **/
   PartitionSchemeHeader(const PartitionType type,
                         const std::size_t num_partitions,
-                        const attribute_id attr_id);
+                        PartitionAttributeIds &&attr_ids);  // NOLINT(whitespace/operators)
 
   // The type of partitioning: Hash or Range.
   const PartitionType partition_type_;
   // The number of partitions.
   const std::size_t num_partitions_;
   // The attribute of partioning.
-  const attribute_id partition_attribute_id_;
+  const PartitionAttributeIds partition_attribute_ids_;
 
  private:
   DISALLOW_COPY_AND_ASSIGN(PartitionSchemeHeader);
@@ -158,16 +167,17 @@ class PartitionSchemeHeader {
  * @brief Implementation of PartitionSchemeHeader that partitions the tuples in
  *        a relation based on a hash function on the partitioning attribute.
 **/
-class HashPartitionSchemeHeader : public PartitionSchemeHeader {
+class HashPartitionSchemeHeader final : public PartitionSchemeHeader {
  public:
   /**
    * @brief Constructor.
    *
    * @param num_partitions The number of partitions to be created.
-   * @param attribute The attribute on which the partitioning happens.
+   * @param attributes A vector of attributes on which the partitioning happens.
    **/
-  HashPartitionSchemeHeader(const std::size_t num_partitions, const attribute_id attribute)
-      : PartitionSchemeHeader(PartitionType::kHash, num_partitions, attribute) {
+  HashPartitionSchemeHeader(const std::size_t num_partitions,
+                            PartitionAttributeIds &&attributes)  // NOLINT(whitespace/operators)
+      : PartitionSchemeHeader(PartitionType::kHash, num_partitions, std::move(attributes)) {
   }
 
   /**
@@ -176,20 +186,13 @@ class HashPartitionSchemeHeader : public PartitionSchemeHeader {
   ~HashPartitionSchemeHeader() override {
   }
 
-  /**
-   * @brief Calulate the partition id into which the attribute value
-   *        should be inserted.
-   *
-   * @param value_of_attribute The attribute value for which the
-   *                           partition id is to be determined.
-   * @return The partition id of the partition for the attribute value.
-   **/
   partition_id getPartitionId(
-      const TypedValue &value_of_attribute) const override {
+      const PartitionValues &value_of_attributes) const override {
+    DCHECK_EQ(partition_attribute_ids_.size(), value_of_attributes.size());
     // TODO(gerald): Optimize for the case where the number of partitions is a
     // power of 2. We can just mask out the lower-order hash bits rather than
     // doing a division operation.
-    return value_of_attribute.getHash() % num_partitions_;
+    return HashCompositeKey(value_of_attributes) % num_partitions_;
   }
 
  private:
@@ -200,39 +203,48 @@ class HashPartitionSchemeHeader : public PartitionSchemeHeader {
  * @brief Implementation of PartitionSchemeHeader that partitions the tuples in
  *        a relation based on a given value range on the partitioning attribute.
 **/
-class RangePartitionSchemeHeader : public PartitionSchemeHeader {
+class RangePartitionSchemeHeader final : public PartitionSchemeHeader {
  public:
   /**
    * @brief Constructor.
    *
-   * @param partition_attribute_type The type of CatalogAttribute that is used
-   *                                 for partitioning.
    * @param num_partitions The number of partitions to be created.
-   * @param attribute The attribute_id on which the partitioning happens.
-   * @param partition_range The mapping between the partition ids and the upper
-   *                        bound of the range boundaries. If two ranges R1 and
-   *                        R2 are separated by a boundary value V, then V
-   *                        would fall into range R2. For creating a range
-   *                        partition scheme with n partitions, you need to
-   *                        specify n-1 boundary values. The first partition
-   *                        will have all the values less than the first
-   *                        boundary and the last partition would have all
-   *                        values greater than or equal to the last boundary
-   *                        value.
+   * @param attributes A vector of attribute_ids on which the partitioning
+   *        happens.
+   * @param partition_attribute_types The types of CatalogAttributes used for
+   *        partitioning.
+   * @param partition_ranges The mapping between the partition ids and the upper
+   *        bound of the range boundaries. If two ranges R1 and R2 are separated
+   *        by a vector of boundary values V, then V would fall into range R2.
+   *        For creating a range partition scheme with n partitions, you need to
+   *        specify n-1 range boundaries. The first partition will have all the
+   *        values less than the first item of range boundaries and the last
+   *        partition would have all values greater than or equal to the last
+   *        item of range boundaries.
    **/
-  RangePartitionSchemeHeader(const Type &partition_attribute_type,
-                             const std::size_t num_partitions,
-                             const attribute_id attribute,
-                             std::vector<TypedValue> &&partition_range)
-      : PartitionSchemeHeader(PartitionType::kRange, num_partitions, attribute),
-        partition_attr_type_(&partition_attribute_type),
-        partition_range_boundaries_(std::move(partition_range)) {
+  RangePartitionSchemeHeader(const std::size_t num_partitions,
+                             PartitionAttributeIds &&attributes,  // NOLINT(whitespace/operators)
+                             std::vector<const Type*> &&partition_attribute_types,
+                             std::vector<PartitionValues> &&partition_ranges)
+      : PartitionSchemeHeader(PartitionType::kRange, num_partitions, std::move(attributes)),
+        partition_attr_types_(std::move(partition_attribute_types)),
+        partition_range_boundaries_(std::move(partition_ranges)) {
+    DCHECK_EQ(partition_attribute_ids_.size(), partition_attr_types_.size());
     DCHECK_EQ(num_partitions - 1, partition_range_boundaries_.size());
 
     const Comparison &less_comparison_op(LessComparison::Instance());
-    less_unchecked_comparator_.reset(
-        less_comparison_op.makeUncheckedComparatorForTypes(
-            partition_attribute_type, partition_attribute_type));
+    for (const Type *type : partition_attr_types_) {
+      std::unique_ptr<UncheckedComparator> less_unchecked_comparator(
+          less_comparison_op.makeUncheckedComparatorForTypes(*type, *type));
+      less_unchecked_comparators_.emplace_back(std::move(less_unchecked_comparator));
+    }
+
+    const Comparison &equal_comparison_op = EqualComparison::Instance();
+    for (const Type *type : partition_attr_types_) {
+      std::unique_ptr<UncheckedComparator> equal_unchecked_comparator(
+          equal_comparison_op.makeUncheckedComparatorForTypes(*type, *type));
+      equal_unchecked_comparators_.emplace_back(std::move(equal_unchecked_comparator));
+    }
 
 #ifdef QUICKSTEP_DEBUG
     checkPartitionRangeBoundaries();
@@ -245,24 +257,18 @@ class RangePartitionSchemeHeader : public PartitionSchemeHeader {
   ~RangePartitionSchemeHeader() override {
   }
 
-  /**
-   * @brief Calulate the partition id into which the attribute value
-   *        should be inserted.
-   *
-   * @param value_of_attribute The attribute value for which the
-   *                           partition id is to be determined.
-   * @return The partition id of the partition for the attribute value.
-   **/
   partition_id getPartitionId(
-      const TypedValue &value_of_attribute) const override {
+      const PartitionValues &value_of_attributes) const override {
+    DCHECK_EQ(partition_attribute_ids_.size(), value_of_attributes.size());
+
     partition_id start = 0, end = partition_range_boundaries_.size() - 1;
-    if (!less_unchecked_comparator_->compareTypedValues(value_of_attribute, partition_range_boundaries_[end])) {
+    if (!lessThan(value_of_attributes, partition_range_boundaries_[end])) {
       return num_partitions_ - 1;
     }
 
     while (start < end) {
       const partition_id mid = start + ((end - start) >> 1);
-      if (less_unchecked_comparator_->compareTypedValues(value_of_attribute, partition_range_boundaries_[mid])) {
+      if (lessThan(value_of_attributes, partition_range_boundaries_[mid])) {
         end = mid;
       } else {
         start = mid + 1;
@@ -279,7 +285,7 @@ class RangePartitionSchemeHeader : public PartitionSchemeHeader {
    *
    * @return The vector of range boundaries for partitions.
    **/
-  inline const std::vector<TypedValue>& getPartitionRangeBoundaries() const {
+  inline const std::vector<PartitionValues>& getPartitionRangeBoundaries() const {
     return partition_range_boundaries_;
   }
 
@@ -288,22 +294,52 @@ class RangePartitionSchemeHeader : public PartitionSchemeHeader {
    * @brief Check if the partition range boundaries are in ascending order.
    **/
   void checkPartitionRangeBoundaries() {
+    for (const PartitionValues &partition_range_boundary : partition_range_boundaries_) {
+      CHECK_EQ(partition_attribute_ids_.size(), partition_range_boundary.size())
+          << "A partition boundary has different size than that of partition attributes.";
+    }
+
     for (partition_id part_id = 1; part_id < partition_range_boundaries_.size(); ++part_id) {
-      if (less_unchecked_comparator_->compareTypedValues(
-              partition_range_boundaries_[part_id],
-              partition_range_boundaries_[part_id - 1])) {
-        LOG(FATAL) << "Partition boundaries are not in ascending order.";
+      CHECK(lessThan(partition_range_boundaries_[part_id - 1], partition_range_boundaries_[part_id]))
+          << "Partition boundaries are not in ascending order.";
+    }
+  }
+
+  /**
+   * @brief Check if 'lhs' is strictly less than 'rhs' in the lexicographical order.
+   *
+   * @note (l_0, l_1, ..., l_n) < (r_0, r_1, ..., r_n) iff l_0 < r_0, or
+   *       (l_0 == r_0) && (l_1, ..., l_n) < (r_1, ..., r_n).
+   **/
+  bool lessThan(const PartitionValues &lhs, const PartitionValues &rhs) const {
+    DCHECK_EQ(partition_attribute_ids_.size(), lhs.size());
+    DCHECK_EQ(partition_attribute_ids_.size(), rhs.size());
+
+    for (std::size_t attr_index = 0; attr_index < partition_attribute_ids_.size(); ++attr_index) {
+      if (less_unchecked_comparators_[attr_index]->compareTypedValues(lhs[attr_index], rhs[attr_index])) {
+        break;
+      } else if (equal_unchecked_comparators_[attr_index]->compareTypedValues(lhs[attr_index], rhs[attr_index])) {
+        if (attr_index == partition_attribute_ids_.size() - 1) {
+          return false;
+        }
+      } else {
+        return false;
       }
     }
+
+    return true;
   }
 
-  const Type* partition_attr_type_;
+  // The size is equal to 'partition_attribute_ids_.size()'.
+  const std::vector<const Type*> partition_attr_types_;
 
   // The boundaries for each range in the RangePartitionSchemeHeader.
   // The upper bound of the range is stored here.
-  const std::vector<TypedValue> partition_range_boundaries_;
+  const std::vector<PartitionValues> partition_range_boundaries_;
 
-  std::unique_ptr<UncheckedComparator> less_unchecked_comparator_;
+  // Both sizes are equal to 'partition_attr_types_.size()'.
+  std::vector<std::unique_ptr<UncheckedComparator>> less_unchecked_comparators_;
+  std::vector<std::unique_ptr<UncheckedComparator>> equal_unchecked_comparators_;
 
   DISALLOW_COPY_AND_ASSIGN(RangePartitionSchemeHeader);
 };

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/05b47b5a/catalog/tests/NUMAPlacementScheme_unittest.cpp
----------------------------------------------------------------------
diff --git a/catalog/tests/NUMAPlacementScheme_unittest.cpp b/catalog/tests/NUMAPlacementScheme_unittest.cpp
index 6a3b32f..7d13941 100644
--- a/catalog/tests/NUMAPlacementScheme_unittest.cpp
+++ b/catalog/tests/NUMAPlacementScheme_unittest.cpp
@@ -82,7 +82,7 @@ TEST(NUMAPlacementSchemeTest, NUMAPlacementSchemeSerializationTest) {
   // Create a HashPartitionSchemeHeader object with 64 partitions and attribute
   //  0 as the partitioning attribute.
   std::unique_ptr<PartitionSchemeHeader> partition_scheme_header(
-      new HashPartitionSchemeHeader(num_partitions, 0));
+      new HashPartitionSchemeHeader(num_partitions, { 0 }));
 
   // Create a NUMAPlacementScheme object with the num_partitions.
   std::unique_ptr<NUMAPlacementScheme> placement_scheme(

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/05b47b5a/catalog/tests/PartitionScheme_unittest.cpp
----------------------------------------------------------------------
diff --git a/catalog/tests/PartitionScheme_unittest.cpp b/catalog/tests/PartitionScheme_unittest.cpp
index 97a9092..c4f7dec 100644
--- a/catalog/tests/PartitionScheme_unittest.cpp
+++ b/catalog/tests/PartitionScheme_unittest.cpp
@@ -35,22 +35,22 @@
 #include "types/TypedValue.hpp"
 #include "types/operations/comparisons/Comparison.hpp"
 #include "types/operations/comparisons/EqualComparison.hpp"
+#include "utility/CompositeHash.hpp"
 
 #include "gtest/gtest.h"
 
 using std::move;
 using std::size_t;
+using std::vector;
 
 namespace quickstep {
 
-class Type;
-
 TEST(PartitionSchemeHeaderTest, IntegerHashPartitionSchemeHeaderTest) {
   const std::size_t num_partitions = 4;
   std::unique_ptr<PartitionSchemeHeader> partition_scheme_header(
-      new HashPartitionSchemeHeader(num_partitions, 0));
+      new HashPartitionSchemeHeader(num_partitions, { 0 }));
   EXPECT_EQ(num_partitions, partition_scheme_header->getNumPartitions());
-  EXPECT_EQ(0, partition_scheme_header->getPartitionAttributeId());
+  EXPECT_EQ(0, partition_scheme_header->getPartitionAttributeIds().front());
   const int kSampleInts[] = {
       0, 1, 2, 3, 400, 501, 64783970, 78437883, -2784627};
   const size_t num_ints = sizeof(kSampleInts) / sizeof(kSampleInts[0]);
@@ -59,16 +59,16 @@ TEST(PartitionSchemeHeaderTest, IntegerHashPartitionSchemeHeaderTest) {
     // an integer is the same as the hash of the integer modulus the number
     // of partitions.
     EXPECT_EQ(TypedValue(kSampleInts[i]).getHash() % num_partitions,
-              partition_scheme_header->getPartitionId(TypedValue(kSampleInts[i])));
+              partition_scheme_header->getPartitionId({ TypedValue(kSampleInts[i]) }));
   }
 }
 
 TEST(PartitionSchemeHeaderTest, LongHashPartitionSchemeHeaderTest) {
   const std::size_t num_partitions = 8;
   std::unique_ptr<PartitionSchemeHeader> partition_scheme_header(
-      new HashPartitionSchemeHeader(num_partitions, 0));
+      new HashPartitionSchemeHeader(num_partitions, { 0 }));
   EXPECT_EQ(num_partitions, partition_scheme_header->getNumPartitions());
-  EXPECT_EQ(0, partition_scheme_header->getPartitionAttributeId());
+  EXPECT_EQ(0, partition_scheme_header->getPartitionAttributeIds().front());
   const std::int64_t kSampleLongs[] = {INT64_C(10),
                                   INT64_C(100),
                                   INT64_C(1025),
@@ -80,16 +80,16 @@ TEST(PartitionSchemeHeaderTest, LongHashPartitionSchemeHeaderTest) {
   // of partitions.
   for (size_t i = 0; i < num_longs; ++i) {
     EXPECT_EQ(TypedValue(kSampleLongs[i]).getHash() % num_partitions,
-              partition_scheme_header->getPartitionId(TypedValue(kSampleLongs[i])));
+              partition_scheme_header->getPartitionId({ TypedValue(kSampleLongs[i]) }));
   }
 }
 
 TEST(PartitionSchemeHeaderTest, FloatHashPartitionSchemeHeaderTest) {
   const std::size_t num_partitions = 5;
   std::unique_ptr<PartitionSchemeHeader> partition_scheme_header(
-      new HashPartitionSchemeHeader(num_partitions, 0));
+      new HashPartitionSchemeHeader(num_partitions, { 0 }));
   EXPECT_EQ(num_partitions, partition_scheme_header->getNumPartitions());
-  EXPECT_EQ(0, partition_scheme_header->getPartitionAttributeId());
+  EXPECT_EQ(0, partition_scheme_header->getPartitionAttributeIds().front());
   const float kSampleFloats[] = {
       285728.895680f, 924005.4989f, -8973494.37438f};
   const size_t num_floats = sizeof(kSampleFloats) / sizeof(kSampleFloats[0]);
@@ -98,16 +98,16 @@ TEST(PartitionSchemeHeaderTest, FloatHashPartitionSchemeHeaderTest) {
   // the number of partitions.
   for (size_t i = 0; i < num_floats; ++i) {
     EXPECT_EQ(TypedValue(kSampleFloats[i]).getHash() % num_partitions,
-              partition_scheme_header->getPartitionId(TypedValue(kSampleFloats[i])));
+              partition_scheme_header->getPartitionId({ TypedValue(kSampleFloats[i]) }));
   }
 }
 
 TEST(PartitionSchemeHeaderTest, DoubleHashPartitionSchemeHeaderTest) {
   const std::size_t num_partitions = 6;
   std::unique_ptr<PartitionSchemeHeader> partition_scheme_header(
-      new HashPartitionSchemeHeader(num_partitions, 0));
+      new HashPartitionSchemeHeader(num_partitions, { 0 }));
   EXPECT_EQ(num_partitions, partition_scheme_header->getNumPartitions());
-  EXPECT_EQ(0, partition_scheme_header->getPartitionAttributeId());
+  EXPECT_EQ(0, partition_scheme_header->getPartitionAttributeIds().front());
   const double kSampleDoubles[] = {
       1.0378, 763624.46343453, -87238497384.3187431894713};
   const size_t num_doubles = sizeof(kSampleDoubles) / sizeof(kSampleDoubles[0]);
@@ -117,16 +117,16 @@ TEST(PartitionSchemeHeaderTest, DoubleHashPartitionSchemeHeaderTest) {
   for (size_t i = 0; i < num_doubles; ++i) {
     EXPECT_EQ(
         TypedValue(kSampleDoubles[i]).getHash() % num_partitions,
-        partition_scheme_header->getPartitionId(TypedValue(kSampleDoubles[i])));
+        partition_scheme_header->getPartitionId({ TypedValue(kSampleDoubles[i]) }));
   }
 }
 
 TEST(PartitionSchemeHeaderTest, CharacterHashPartitionSchemeHeaderTest) {
   const std::size_t num_partitions = 7;
   std::unique_ptr<PartitionSchemeHeader> partition_scheme_header(
-      new HashPartitionSchemeHeader(num_partitions, 0));
+      new HashPartitionSchemeHeader(num_partitions, { 0 }));
   EXPECT_EQ(num_partitions, partition_scheme_header->getNumPartitions());
-  EXPECT_EQ(0, partition_scheme_header->getPartitionAttributeId());
+  EXPECT_EQ(0, partition_scheme_header->getPartitionAttributeIds().front());
   const char *kSampleStrings[] = {
       "a", "gerald", "ram", "3081289", "=42?", "+-/*&^%", "hello_world"};
   const size_t num_strings = sizeof(kSampleStrings) / sizeof(kSampleStrings[0]);
@@ -139,17 +139,17 @@ TEST(PartitionSchemeHeaderTest, CharacterHashPartitionSchemeHeaderTest) {
             kChar, kSampleStrings[i], std::strlen(kSampleStrings[i]) + 1)
                 .getHash() %
             num_partitions,
-        partition_scheme_header->getPartitionId(TypedValue(
-            kChar, kSampleStrings[i], std::strlen(kSampleStrings[i]) + 1)));
+        partition_scheme_header->getPartitionId({ TypedValue(
+            kChar, kSampleStrings[i], std::strlen(kSampleStrings[i]) + 1) }));
   }
 }
 
 TEST(PartitionSchemeHeaderTest, VarCharHashPartitionSchemeHeaderTest) {
   const std::size_t num_partitions = 7;
   std::unique_ptr<PartitionSchemeHeader> partition_scheme_header(
-      new HashPartitionSchemeHeader(num_partitions, 0));
+      new HashPartitionSchemeHeader(num_partitions, { 0 }));
   EXPECT_EQ(num_partitions, partition_scheme_header->getNumPartitions());
-  EXPECT_EQ(0, partition_scheme_header->getPartitionAttributeId());
+  EXPECT_EQ(0, partition_scheme_header->getPartitionAttributeIds().front());
   const char *kSampleStrings[] = {
       "hello", "world", "1234567", "!@#$^&*", "pa345+="};
   const size_t num_strings = sizeof(kSampleStrings) / sizeof(kSampleStrings[0]);
@@ -163,131 +163,153 @@ TEST(PartitionSchemeHeaderTest, VarCharHashPartitionSchemeHeaderTest) {
                 .getHash() %
             num_partitions,
         partition_scheme_header->getPartitionId(
-            TypedValue(kVarChar,
-                       kSampleStrings[i],
-                       std::strlen(kSampleStrings[i]) + 1)));
+            { TypedValue(kVarChar, kSampleStrings[i], std::strlen(kSampleStrings[i]) + 1) }));
+  }
+}
+
+TEST(PartitionSchemeHeaderTest, MultiAttributeHashPartitionSchemeHeaderTest) {
+  const std::size_t num_partitions = 4;
+  constexpr attribute_id kPartitioningFirstAttribute = 0;
+  constexpr attribute_id kPartitioningLastAttribute = 2;
+  std::unique_ptr<PartitionSchemeHeader> partition_scheme_header(
+      new HashPartitionSchemeHeader(num_partitions, { kPartitioningFirstAttribute, kPartitioningLastAttribute }));
+  EXPECT_EQ(num_partitions, partition_scheme_header->getNumPartitions());
+  EXPECT_EQ(kPartitioningFirstAttribute, partition_scheme_header->getPartitionAttributeIds().front());
+  EXPECT_EQ(kPartitioningLastAttribute, partition_scheme_header->getPartitionAttributeIds().back());
+  const int kSampleInts[] = {
+      0, 78437883, -2784627};
+  const double kSampleDoubles[] = {
+      1.0378, 763624.46343453, -87238497384.3187431894713};
+  const size_t num_ints = sizeof(kSampleInts) / sizeof(kSampleInts[0]);
+  for (size_t i = 0; i < num_ints; ++i) {
+    const PartitionSchemeHeader::PartitionValues values =
+        { TypedValue(kSampleInts[i]), TypedValue(kSampleDoubles[i]) };
+    // Check that the partition id returned by the partition scheme for an
+    // { int, double } pair equals the composite hash of the pair modulo the
+    // number of partitions.
+    EXPECT_EQ(HashCompositeKey(values) % num_partitions,
+              partition_scheme_header->getPartitionId(values));
   }
 }
 
 TEST(PartitionSchemeHeaderTest, IntegerRangePartitionSchemeHeaderTest) {
-  std::vector<TypedValue> partition_range;
+  vector<PartitionSchemeHeader::PartitionValues> partition_ranges;
   // Partition boundaries are 0, 10, 20.
   // Last partition can hold upto infinity.
   // First partition can hold from -infinity to -1.
   for (int i = 0; i < 3; ++i) {
-    partition_range.push_back(TypedValue(i * 10));
+    partition_ranges.push_back({ TypedValue(i * 10) });
   }
   std::unique_ptr<PartitionSchemeHeader> partition_scheme_header(
-      new RangePartitionSchemeHeader(TypeFactory::GetType(kInt), 4, 0, move(partition_range)));
+      new RangePartitionSchemeHeader(4, { 0 }, { &TypeFactory::GetType(kInt) }, move(partition_ranges)));
   EXPECT_EQ(4u, partition_scheme_header->getNumPartitions());
   // Check if the partition id returned by the Range Partition Scheme for
   // integers is the same as the partition id into which it is supposed to
   // be based on the partition boundaries that we have defined.
-  EXPECT_EQ(0, partition_scheme_header->getPartitionAttributeId());
-  EXPECT_EQ(1u, partition_scheme_header->getPartitionId(TypedValue(0)));
-  EXPECT_EQ(2u, partition_scheme_header->getPartitionId(TypedValue(10)));
-  EXPECT_EQ(3u, partition_scheme_header->getPartitionId(TypedValue(20)));
-  EXPECT_EQ(3u, partition_scheme_header->getPartitionId(TypedValue(30)));
-  EXPECT_EQ(0u, partition_scheme_header->getPartitionId(TypedValue(-4)));
-  EXPECT_EQ(2u, partition_scheme_header->getPartitionId(TypedValue(15)));
-  EXPECT_EQ(1u, partition_scheme_header->getPartitionId(TypedValue(6)));
-  EXPECT_EQ(0u, partition_scheme_header->getPartitionId(TypedValue(-70)));
-  EXPECT_EQ(3u, partition_scheme_header->getPartitionId(TypedValue(1000)));
-  EXPECT_EQ(3u, partition_scheme_header->getPartitionId(TypedValue(20000)));
+  EXPECT_EQ(0, partition_scheme_header->getPartitionAttributeIds().front());
+
+  EXPECT_EQ(1u, partition_scheme_header->getPartitionId({ TypedValue(0) }));
+  EXPECT_EQ(2u, partition_scheme_header->getPartitionId({ TypedValue(10) }));
+  EXPECT_EQ(3u, partition_scheme_header->getPartitionId({ TypedValue(20) }));
+  EXPECT_EQ(3u, partition_scheme_header->getPartitionId({ TypedValue(30) }));
+  EXPECT_EQ(0u, partition_scheme_header->getPartitionId({ TypedValue(-4) }));
+  EXPECT_EQ(2u, partition_scheme_header->getPartitionId({ TypedValue(15) }));
+  EXPECT_EQ(1u, partition_scheme_header->getPartitionId({ TypedValue(6) }));
+  EXPECT_EQ(0u, partition_scheme_header->getPartitionId({ TypedValue(-70) }));
+  EXPECT_EQ(3u, partition_scheme_header->getPartitionId({ TypedValue(1000) }));
+  EXPECT_EQ(3u, partition_scheme_header->getPartitionId({ TypedValue(20000) }));
 }
 
 TEST(PartitionSchemeHeaderTest, LongRangePartitionSchemeHeaderTest) {
-  std::vector<TypedValue> partition_range;
+  vector<PartitionSchemeHeader::PartitionValues> partition_ranges;
   // Partition boundaries are 0, 10000, 20000, 30000
   for (int i = 0; i < 3; ++i) {
-    partition_range.push_back(TypedValue(i * INT64_C(10000)));
+    partition_ranges.push_back({ TypedValue(i * INT64_C(10000)) });
   }
   std::unique_ptr<PartitionSchemeHeader> partition_scheme_header(
-      new RangePartitionSchemeHeader(TypeFactory::GetType(kLong), 4, 0, move(partition_range)));
+      new RangePartitionSchemeHeader(4, { 0 }, { &TypeFactory::GetType(kLong) }, move(partition_ranges)));
 
   EXPECT_EQ(4u, partition_scheme_header->getNumPartitions());
-  EXPECT_EQ(0, partition_scheme_header->getPartitionAttributeId());
+  EXPECT_EQ(0, partition_scheme_header->getPartitionAttributeIds().front());
   // Check if the partition id returned by the Range Partition Scheme for
   // long numbers is the same as the partition id into which it is supposed to
   // be based on the partition boundaries that we have defined.
-  EXPECT_EQ(1u, partition_scheme_header->getPartitionId(TypedValue(INT64_C(0))));
-  EXPECT_EQ(2u, partition_scheme_header->getPartitionId(TypedValue(INT64_C(13456))));
-  EXPECT_EQ(3u, partition_scheme_header->getPartitionId(TypedValue(INT64_C(20000))));
-  EXPECT_EQ(3u, partition_scheme_header->getPartitionId(TypedValue(INT64_C(300123))));
+  EXPECT_EQ(1u, partition_scheme_header->getPartitionId({ TypedValue(INT64_C(0)) }));
+  EXPECT_EQ(2u, partition_scheme_header->getPartitionId({ TypedValue(INT64_C(13456)) }));
+  EXPECT_EQ(3u, partition_scheme_header->getPartitionId({ TypedValue(INT64_C(20000)) }));
+  EXPECT_EQ(3u, partition_scheme_header->getPartitionId({ TypedValue(INT64_C(300123)) }));
   EXPECT_EQ(0u,
-            partition_scheme_header->getPartitionId(TypedValue(INT64_C(-400000))));
-  EXPECT_EQ(2u, partition_scheme_header->getPartitionId(TypedValue(INT64_C(15123))));
-  EXPECT_EQ(1u, partition_scheme_header->getPartitionId(TypedValue(INT64_C(6012))));
+            partition_scheme_header->getPartitionId({ TypedValue(INT64_C(-400000)) }));
+  EXPECT_EQ(2u, partition_scheme_header->getPartitionId({ TypedValue(INT64_C(15123)) }));
+  EXPECT_EQ(1u, partition_scheme_header->getPartitionId({ TypedValue(INT64_C(6012)) }));
   EXPECT_EQ(0u,
-            partition_scheme_header->getPartitionId(TypedValue(INT64_C(-7000000))));
+            partition_scheme_header->getPartitionId({ TypedValue(INT64_C(-7000000)) }));
 }
 
 TEST(PartitionSchemeHeaderTest, FloatRangePartitionSchemeHeaderTest) {
-  std::vector<TypedValue> partition_range;
+  vector<PartitionSchemeHeader::PartitionValues> partition_ranges;
   // Partition boundaries are 0.0, 10.0, 20.0
   for (int i = 0; i < 3; ++i) {
-    partition_range.push_back(TypedValue(i * 10.0f));
+    partition_ranges.push_back({ TypedValue(i * 10.0f) });
   }
   std::unique_ptr<PartitionSchemeHeader> partition_scheme_header(
-      new RangePartitionSchemeHeader(TypeFactory::GetType(kFloat), 4, 0, move(partition_range)));
+      new RangePartitionSchemeHeader(4, { 0 }, { &TypeFactory::GetType(kFloat) }, move(partition_ranges)));
   EXPECT_EQ(4u, partition_scheme_header->getNumPartitions());
-  EXPECT_EQ(0, partition_scheme_header->getPartitionAttributeId());
+  EXPECT_EQ(0, partition_scheme_header->getPartitionAttributeIds().front());
   // Check if the partition id returned by the Range Partition Scheme for
   // floats is the same as the partition id into which it is supposed to
   // be based on the partition boundaries that we have defined.
-  EXPECT_EQ(1u, partition_scheme_header->getPartitionId(TypedValue(0.1f)));
-  EXPECT_EQ(2u, partition_scheme_header->getPartitionId(TypedValue(10.00000000f)));
-  EXPECT_EQ(3u, partition_scheme_header->getPartitionId(TypedValue(20.23f)));
-  EXPECT_EQ(3u, partition_scheme_header->getPartitionId(TypedValue(30.56f)));
-  EXPECT_EQ(0u, partition_scheme_header->getPartitionId(TypedValue(-4.5f)));
-  EXPECT_EQ(2u, partition_scheme_header->getPartitionId(TypedValue(15.034f)));
-  EXPECT_EQ(1u, partition_scheme_header->getPartitionId(TypedValue(6.987f)));
-  EXPECT_EQ(0u, partition_scheme_header->getPartitionId(TypedValue(-70.384f)));
+  EXPECT_EQ(1u, partition_scheme_header->getPartitionId({ TypedValue(0.1f) }));
+  EXPECT_EQ(2u, partition_scheme_header->getPartitionId({ TypedValue(10.00000000f) }));
+  EXPECT_EQ(3u, partition_scheme_header->getPartitionId({ TypedValue(20.23f) }));
+  EXPECT_EQ(3u, partition_scheme_header->getPartitionId({ TypedValue(30.56f) }));
+  EXPECT_EQ(0u, partition_scheme_header->getPartitionId({ TypedValue(-4.5f) }));
+  EXPECT_EQ(2u, partition_scheme_header->getPartitionId({ TypedValue(15.034f) }));
+  EXPECT_EQ(1u, partition_scheme_header->getPartitionId({ TypedValue(6.987f) }));
+  EXPECT_EQ(0u, partition_scheme_header->getPartitionId({ TypedValue(-70.384f) }));
 }
 
 TEST(PartitionSchemeHeaderTest, DoubleRangePartitionSchemeHeaderTest) {
-  std::vector<TypedValue> partition_range;
+  vector<PartitionSchemeHeader::PartitionValues> partition_ranges;
   // Partition boundaries are 0.00000, 10.00000, 20.00000
   for (int i = 0; i < 3; ++i) {
-    partition_range.push_back(TypedValue(i * 10.00000));
+    partition_ranges.push_back({ TypedValue(i * 10.00000) });
   }
   std::unique_ptr<PartitionSchemeHeader> partition_scheme_header(
-      new RangePartitionSchemeHeader(TypeFactory::GetType(kDouble), 4, 0, move(partition_range)));
+      new RangePartitionSchemeHeader(4, { 0 }, { &TypeFactory::GetType(kDouble) }, move(partition_ranges)));
   EXPECT_EQ(4u, partition_scheme_header->getNumPartitions());
-  EXPECT_EQ(0, partition_scheme_header->getPartitionAttributeId());
+  EXPECT_EQ(0, partition_scheme_header->getPartitionAttributeIds().front());
   // Check if the partition id returned by the Range Partition Scheme for
   // doubles is the same as the partition id into which it is supposed to
   // be based on the partition boundaries that we have defined.
-  EXPECT_EQ(1u, partition_scheme_header->getPartitionId(TypedValue(0.1897438974)));
+  EXPECT_EQ(1u, partition_scheme_header->getPartitionId({ TypedValue(0.1897438974) }));
   EXPECT_EQ(2u,
-            partition_scheme_header->getPartitionId(TypedValue(10.00000000287489)));
+            partition_scheme_header->getPartitionId({ TypedValue(10.00000000287489) }));
   EXPECT_EQ(3u,
-            partition_scheme_header->getPartitionId(TypedValue(20.23249859403750)));
-  EXPECT_EQ(3u, partition_scheme_header->getPartitionId(TypedValue(30.567866347563)));
+            partition_scheme_header->getPartitionId({ TypedValue(20.23249859403750) }));
+  EXPECT_EQ(3u, partition_scheme_header->getPartitionId({ TypedValue(30.567866347563) }));
   EXPECT_EQ(0u,
-            partition_scheme_header->getPartitionId(TypedValue(-4.57583978935689)));
+            partition_scheme_header->getPartitionId({ TypedValue(-4.57583978935689) }));
   EXPECT_EQ(2u,
-            partition_scheme_header->getPartitionId(TypedValue(15.034248758978936)));
-  EXPECT_EQ(1u, partition_scheme_header->getPartitionId(TypedValue(6.98792489)));
+            partition_scheme_header->getPartitionId({ TypedValue(15.034248758978936) }));
+  EXPECT_EQ(1u, partition_scheme_header->getPartitionId({ TypedValue(6.98792489) }));
   EXPECT_EQ(
-      0u, partition_scheme_header->getPartitionId(TypedValue(-70.38454985893768738)));
+      0u, partition_scheme_header->getPartitionId({ TypedValue(-70.38454985893768738) }));
 }
 
 TEST(PartitionSchemeHeaderTest, CharacterRangePartitionSchemeHeaderTest) {
-  std::vector<TypedValue> partition_range;
+  vector<PartitionSchemeHeader::PartitionValues> partition_ranges;
   // Partition boundaries are the following 3 characters.
   const char *kRangeBoundaryStrings[] = {"don", "hippo", "pattasu"};
   const size_t num_boundaries = sizeof(kRangeBoundaryStrings) / sizeof(kRangeBoundaryStrings[0]);
   for (size_t i = 0; i < num_boundaries; ++i) {
-    partition_range.push_back(
-        TypedValue(kChar,
-                   kRangeBoundaryStrings[i],
-                   std::strlen(kRangeBoundaryStrings[i]) + 1));
+    partition_ranges.push_back(
+        { TypedValue(kChar, kRangeBoundaryStrings[i], std::strlen(kRangeBoundaryStrings[i]) + 1) });
   }
   std::unique_ptr<PartitionSchemeHeader> partition_scheme_header(
-      new RangePartitionSchemeHeader(TypeFactory::GetType(kChar, 20, false), 4, 0, move(partition_range)));
+      new RangePartitionSchemeHeader(4, { 0 }, { &TypeFactory::GetType(kChar, 20, false) }, move(partition_ranges)));
   EXPECT_EQ(4u, partition_scheme_header->getNumPartitions());
-  EXPECT_EQ(0, partition_scheme_header->getPartitionAttributeId());
+  EXPECT_EQ(0, partition_scheme_header->getPartitionAttributeIds().front());
   const char *kSampleStrings[] = {"amma",
                                   "ganesh",
                                   "e",
@@ -306,27 +328,25 @@ TEST(PartitionSchemeHeaderTest, CharacterRangePartitionSchemeHeaderTest) {
   for (size_t i = 0; i < num_strings; ++i) {
     EXPECT_EQ(
         kExpectedPartitions[i],
-        partition_scheme_header->getPartitionId(TypedValue(
-            kChar, kSampleStrings[i], std::strlen(kSampleStrings[i]) + 1)));
+        partition_scheme_header->getPartitionId({ TypedValue(
+            kChar, kSampleStrings[i], std::strlen(kSampleStrings[i]) + 1) }));
   }
 }
 
 TEST(PartitionSchemeHeaderTest, VarCharRangePartitionSchemeHeaderTest) {
-  std::vector<TypedValue> partition_range;
+  vector<PartitionSchemeHeader::PartitionValues> partition_ranges;
   // Partition boundaries are the following 3 strings.
   const char *kRangeBoundaryStrings[] = { "elephant", "jamaica", "zorgonz"};
   const size_t num_boundaries = sizeof(kRangeBoundaryStrings) / sizeof(kRangeBoundaryStrings[0]);
   for (size_t i = 0; i < num_boundaries; ++i) {
-    partition_range.push_back(
-        TypedValue(kVarChar,
-                   kRangeBoundaryStrings[i],
-                   std::strlen(kRangeBoundaryStrings[i]) + 1));
+    partition_ranges.push_back(
+        { TypedValue(kVarChar, kRangeBoundaryStrings[i], std::strlen(kRangeBoundaryStrings[i]) + 1) });
   }
 
   std::unique_ptr<PartitionSchemeHeader> partition_scheme_header(
-      new RangePartitionSchemeHeader(TypeFactory::GetType(kVarChar, 20, false), 4, 0, move(partition_range)));
+      new RangePartitionSchemeHeader(4, { 0 }, { &TypeFactory::GetType(kVarChar, 20, false) }, move(partition_ranges)));
   EXPECT_EQ(4u, partition_scheme_header->getNumPartitions());
-  EXPECT_EQ(0, partition_scheme_header->getPartitionAttributeId());
+  EXPECT_EQ(0, partition_scheme_header->getPartitionAttributeIds().front());
   const char *kSampleStrings[] = {"apple",
                                   "halloween",
                                   "mango",
@@ -343,15 +363,43 @@ TEST(PartitionSchemeHeaderTest, VarCharRangePartitionSchemeHeaderTest) {
   for (size_t i = 0; i < num_strings; ++i) {
     EXPECT_EQ(kExpectedPartitions[i],
               partition_scheme_header->getPartitionId(
-                  TypedValue(kVarChar,
-                             kSampleStrings[i],
-                             std::strlen(kSampleStrings[i]) + 1)));
+                  { TypedValue(kVarChar, kSampleStrings[i], std::strlen(kSampleStrings[i]) + 1) }));
+  }
+}
+
+TEST(PartitionSchemeHeaderTest, MultiAttributeRangePartitionSchemeHeaderTest) {
+  vector<PartitionSchemeHeader::PartitionValues> partition_ranges;
+  // Partition boundaries are { 0, 0.00000 }, { 10, 10.00000 }, { 20, 20.00000 }.
+  // Last partition can hold up to infinity.
+  // First partition can hold from { -infinity, -infinity } to { 0, -2^(-1074) }.
+  for (int i = 0; i < 3; ++i) {
+    partition_ranges.push_back({ TypedValue(i * 10), TypedValue(i * 10.00000) });
   }
+
+  constexpr attribute_id kPartitioningFirstAttribute = 0;
+  constexpr attribute_id kPartitioningLastAttribute = 2;
+  std::unique_ptr<PartitionSchemeHeader> partition_scheme_header(
+      new RangePartitionSchemeHeader(4, { kPartitioningFirstAttribute, kPartitioningLastAttribute },
+                                     { &TypeFactory::GetType(kInt), &TypeFactory::GetType(kDouble) },
+                                     move(partition_ranges)));
+  EXPECT_EQ(4u, partition_scheme_header->getNumPartitions());
+  EXPECT_EQ(kPartitioningFirstAttribute, partition_scheme_header->getPartitionAttributeIds().front());
+  EXPECT_EQ(kPartitioningLastAttribute, partition_scheme_header->getPartitionAttributeIds().back());
+
+  // Check if the partition id returned by the Range Partition Scheme for
+  // { int, double } is the same as the partition id into which it is supposed
+  // to be based on the partition boundaries that we have defined.
+  EXPECT_EQ(0u, partition_scheme_header->getPartitionId({ TypedValue(-70), TypedValue(30.567866347563) }));
+  EXPECT_EQ(0u, partition_scheme_header->getPartitionId({ TypedValue(0),   TypedValue(-4.57583978935689) }));
+  EXPECT_EQ(1u, partition_scheme_header->getPartitionId({ TypedValue(6),   TypedValue(15.034248758978936) }));
+  EXPECT_EQ(2u, partition_scheme_header->getPartitionId({ TypedValue(10),  TypedValue(10.00000000287489) }));
+  EXPECT_EQ(3u, partition_scheme_header->getPartitionId({ TypedValue(20),  TypedValue(20.23249859403750) }));
+  EXPECT_EQ(3u, partition_scheme_header->getPartitionId({ TypedValue(300), TypedValue(-70.38454985893768738) }));
 }
 
 TEST(PartitionSchemeTest, AddBlocksToPartitionTest) {
   std::unique_ptr<PartitionScheme> partition_scheme(
-      new PartitionScheme(new HashPartitionSchemeHeader(4, 0)));
+      new PartitionScheme(new HashPartitionSchemeHeader(4, { 0 })));
   for (int i = 0; i < 10; ++i) {
     partition_scheme->addBlockToPartition(i, i % 4);
   }
@@ -367,7 +415,7 @@ TEST(PartitionSchemeTest, AddBlocksToPartitionTest) {
       partition_scheme->getBlocksInPartition(3);
 
   EXPECT_EQ(4u, partition_scheme->getPartitionSchemeHeader().getNumPartitions());
-  EXPECT_EQ(0, partition_scheme->getPartitionSchemeHeader().getPartitionAttributeId());
+  EXPECT_EQ(0, partition_scheme->getPartitionSchemeHeader().getPartitionAttributeIds().front());
 
   // Check if the blocks are present in the partitions that we
   // expect them to be based on where we inserted them.
@@ -420,13 +468,13 @@ TEST(PartitionSchemeTest, AddBlocksToPartitionTest) {
 
 TEST(PartitionSchemeTest, RemoveBlocksFromPartitionTest) {
   std::unique_ptr<PartitionScheme> partition_scheme(
-      new PartitionScheme(new HashPartitionSchemeHeader(4, 0)));
+      new PartitionScheme(new HashPartitionSchemeHeader(4, { 0 })));
   for (int i = 0; i < 10; ++i) {
     partition_scheme->addBlockToPartition(i, i % 4);
   }
 
   EXPECT_EQ(4u, partition_scheme->getPartitionSchemeHeader().getNumPartitions());
-  EXPECT_EQ(0, partition_scheme->getPartitionSchemeHeader().getPartitionAttributeId());
+  EXPECT_EQ(0, partition_scheme->getPartitionSchemeHeader().getPartitionAttributeIds().front());
   // remove block 0 from partition 0
   partition_scheme->removeBlockFromPartition(0, 0);
   const std::vector<block_id> blocks_in_partition_zero =
@@ -504,7 +552,7 @@ TEST(PartitionSchemeTest, RemoveBlocksFromPartitionTest) {
 TEST(PartitionSchemeTest, CheckHashPartitionSchemeSerialization) {
   const std::size_t num_partitions = 4;
   std::unique_ptr<PartitionScheme> part_scheme(
-      new PartitionScheme(new HashPartitionSchemeHeader(num_partitions, 0)));
+      new PartitionScheme(new HashPartitionSchemeHeader(num_partitions, { 0 })));
   // Add some blocks to each partition.
   for (int i = 0; i < 10; ++i) {
     part_scheme->addBlockToPartition(i, i % num_partitions);
@@ -523,8 +571,8 @@ TEST(PartitionSchemeTest, CheckHashPartitionSchemeSerialization) {
   EXPECT_EQ(header.getNumPartitions(),
             header_from_proto.getNumPartitions());
   // Check the partition attribute id
-  EXPECT_EQ(header.getPartitionAttributeId(),
-            header_from_proto.getPartitionAttributeId());
+  EXPECT_EQ(header.getPartitionAttributeIds(),
+            header_from_proto.getPartitionAttributeIds());
   // Check the block in each partition
   for (partition_id part_id = 0; part_id < num_partitions; ++part_id) {
     // Collect the blocks from C++ Partition Scheme object.
@@ -542,75 +590,8 @@ TEST(PartitionSchemeTest, CheckHashPartitionSchemeSerialization) {
   }
 }
 
-TEST(PartitionSchemeTest, CheckRangePartitionSchemeSerialization) {
-  const Type &type = TypeFactory::GetType(kInt);
-  const std::size_t num_partitions = 4;
-  std::vector<TypedValue> partition_range;
-  // Partition boundaries are 0, 10, 20.
-  // Last partition can hold upto infinity.
-  // First partition can hold from -infinity to -1.
-  for (std::size_t i = 0; i < num_partitions - 1; ++i) {
-    partition_range.push_back(TypedValue(static_cast<int>(i * 10)));
-  }
-  std::unique_ptr<PartitionScheme> part_scheme(
-      new PartitionScheme(
-          new RangePartitionSchemeHeader(type, num_partitions, 0, move(partition_range))));
-  for (int i = 0; i < 10; ++i) {
-    part_scheme->addBlockToPartition(i * 5, i % num_partitions);
-  }
-  std::unique_ptr<PartitionScheme> part_scheme_from_proto;
-
-  part_scheme_from_proto.reset(
-      PartitionScheme::ReconstructFromProto(part_scheme->getProto()));
-
-  const PartitionSchemeHeader &header = part_scheme->getPartitionSchemeHeader();
-  const PartitionSchemeHeader &header_from_proto = part_scheme_from_proto->getPartitionSchemeHeader();
-
-  // Check the partition type
-  EXPECT_EQ(header.getPartitionType(),
-            header_from_proto.getPartitionType());
-
-  // Check number of partitions
-  EXPECT_EQ(header.getNumPartitions(),
-            header_from_proto.getNumPartitions());
-
-  // Check the partition attribute id
-  EXPECT_EQ(header.getPartitionAttributeId(),
-            header_from_proto.getPartitionAttributeId());
-
-  // Check the partition range boundaries' size.
-  const std::vector<TypedValue> &range_boundaries_part_scheme =
-      static_cast<const RangePartitionSchemeHeader&>(header).getPartitionRangeBoundaries();
-  const std::vector<TypedValue> &range_boundaries_part_scheme_from_proto =
-      static_cast<const RangePartitionSchemeHeader&>(header_from_proto).getPartitionRangeBoundaries();
-  EXPECT_EQ(range_boundaries_part_scheme.size(),
-            range_boundaries_part_scheme_from_proto.size());
-
-  // Check the partition range boundaries' values.
-  const Comparison &equal_comparison_op(EqualComparison::Instance());
-  std::unique_ptr<UncheckedComparator> equal_unchecked_comparator;
-  equal_unchecked_comparator.reset(
-      equal_comparison_op.makeUncheckedComparatorForTypes(
-          TypeFactory::GetType(kInt), TypeFactory::GetType(kInt)));
-  for (std::size_t i = 0; i < range_boundaries_part_scheme.size(); ++i) {
-    EXPECT_TRUE(equal_unchecked_comparator->compareTypedValues(
-        range_boundaries_part_scheme[i],
-        range_boundaries_part_scheme_from_proto[i]));
-  }
-
-  // Check the blocks in each partition from both the Partition Scheme's
-  // C++ object and protocol buffer.
-  for (partition_id part_id = 0; part_id < num_partitions; ++part_id) {
-    std::vector<block_id> blocks_in_part_scheme =
-        part_scheme->getBlocksInPartition(part_id);
-    std::vector<block_id> blocks_in_part_scheme_from_proto =
-        part_scheme_from_proto->getBlocksInPartition(part_id);
-    std::sort(blocks_in_part_scheme.begin(), blocks_in_part_scheme.end());
-    std::sort(blocks_in_part_scheme_from_proto.begin(),
-              blocks_in_part_scheme_from_proto.end());
-    EXPECT_EQ(blocks_in_part_scheme, blocks_in_part_scheme_from_proto);
-  }
-}
+// TODO(quickstep-team): Add back the CheckRangePartitionSchemeSerialization
+// test, which was removed due to QUICKSTEP-86.
 
 TEST(PartitionSchemeTest, CheckBlocksInPartitionTest) {
   std::unique_ptr<PartitionScheme> partition_scheme;
@@ -619,7 +600,7 @@ TEST(PartitionSchemeTest, CheckBlocksInPartitionTest) {
   constexpr attribute_id kPartitioningAttribute = 0;
   // Create a partition scheme object.
   partition_scheme.reset(
-      new PartitionScheme(new HashPartitionSchemeHeader(kNumPartitions, kPartitioningAttribute)));
+      new PartitionScheme(new HashPartitionSchemeHeader(kNumPartitions, { kPartitioningAttribute })));
   // Add blocks to different partitions.
   for (std::size_t block_id = 0; block_id < kNumBlocks; ++block_id) {
     partition_scheme->addBlockToPartition(block_id,
@@ -630,7 +611,7 @@ TEST(PartitionSchemeTest, CheckBlocksInPartitionTest) {
 
   // Check the number of partitions and the partitioning attribute.
   EXPECT_EQ(kNumPartitions, header.getNumPartitions());
-  EXPECT_EQ(kPartitioningAttribute, header.getPartitionAttributeId());
+  EXPECT_EQ(kPartitioningAttribute, header.getPartitionAttributeIds().front());
 
   // Check if the blocks are correctly assigned to its partitions.
   EXPECT_EQ(0u, partition_scheme->getPartitionForBlock(0));

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/05b47b5a/parser/tests/Create.test
----------------------------------------------------------------------
diff --git a/parser/tests/Create.test b/parser/tests/Create.test
index eee44a6..8c11054 100644
--- a/parser/tests/Create.test
+++ b/parser/tests/Create.test
@@ -209,6 +209,44 @@ CREATE TABLE test (attr INT) PARTITIONS 4
                              ^
 ==
 
+# Duplicate partition attributes.
+CREATE TABLE test (attr INT) PARTITION BY HASH(attr, attr) PARTITIONS 4
+--
+CreateTableStatement[relation_name=test]
++-attribute_list=
+| +-AttributeDefinition[name=attr,type=Int]
++-partition_clause=
+  +-PartitionClause[partition_type=hash]
+    +-Number of Partitions=NumericLiteral[numeric_string=4,float_like=false]
+    +-attribute_name_list=
+      +-String[value=attr]
+      +-String[value=attr]
+==
+
+# Multiple partition attributes.
+CREATE TABLE test (attr1 INT, attr2 LONG, attr3 FLOAT, attr4 DOUBLE, attr5 CHAR(5), attr6 VARCHAR(4))
+PARTITION BY HASH(attr1, attr2, attr3, attr4, attr5, attr6) PARTITIONS 4
+--
+CreateTableStatement[relation_name=test]
++-attribute_list=
+| +-AttributeDefinition[name=attr1,type=Int]
+| +-AttributeDefinition[name=attr2,type=Long]
+| +-AttributeDefinition[name=attr3,type=Float]
+| +-AttributeDefinition[name=attr4,type=Double]
+| +-AttributeDefinition[name=attr5,type=Char(5)]
+| +-AttributeDefinition[name=attr6,type=VarChar(4)]
++-partition_clause=
+  +-PartitionClause[partition_type=hash]
+    +-Number of Partitions=NumericLiteral[numeric_string=4,float_like=false]
+    +-attribute_name_list=
+      +-String[value=attr1]
+      +-String[value=attr2]
+      +-String[value=attr3]
+      +-String[value=attr4]
+      +-String[value=attr5]
+      +-String[value=attr6]
+==
+
 CREATE TABLE test (attr INT) WITH BLOCKPROPERTIES
 (TYPE rowstore)
 --

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/05b47b5a/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 70b69e0..6fec85b 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -791,7 +791,10 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
       build_attribute_ids.size() == 1) {
     const PartitionSchemeHeader &partition_scheme_header =
         build_relation->getPartitionScheme()->getPartitionSchemeHeader();
-    if (build_attribute_ids[0] == partition_scheme_header.getPartitionAttributeId()) {
+    // A one-attribute join key matches the partitioning only when the build
+    // relation is partitioned on exactly that single attribute.
+    const auto &partition_attribute_ids = partition_scheme_header.getPartitionAttributeIds();
+    if (partition_attribute_ids.size() == 1u && build_attribute_ids[0] == partition_attribute_ids.front()) {
       // TODO(zuyu): add optimizer support for partitioned hash joins.
       hash_table_context_proto->set_num_partitions(num_partitions);
     }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/05b47b5a/query_optimizer/resolver/Resolver.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/resolver/Resolver.cpp b/query_optimizer/resolver/Resolver.cpp
index df589fd..ed465e5 100644
--- a/query_optimizer/resolver/Resolver.cpp
+++ b/query_optimizer/resolver/Resolver.cpp
@@ -695,23 +695,7 @@ const S::PartitionSchemeHeader* Resolver::resolvePartitionClause(
         << "Partition type must be specified and be a string.";
   }
 
-  const PtrList<ParseString> &attribute_name_list = partition_clause->attribute_name_list();
-  if (attribute_name_list.size() != 1) {
-    THROW_SQL_ERROR_AT(partition_clause)
-        << "Partition is supported on only one attribute.";
-  }
-
-  const ParseString &partition_attribute_name = attribute_name_list.front();
-  const attribute_id attr_id = GetAttributeIdFromName(create_table_statement.attribute_definition_list(),
-                                                      partition_attribute_name.value());
-  if (attr_id == kInvalidAttributeID) {
-    THROW_SQL_ERROR_AT(&partition_attribute_name)
-        << "The given attribute was not found.";
-  }
-
   auto proto = make_unique<S::PartitionSchemeHeader>();
-  proto->set_num_partitions(partition_clause->num_partitions()->long_value());
-  proto->set_partition_attribute_id(attr_id);
 
   const std::string partition_type = ToLower(partition_type_string->value());
   if (partition_type == kHashPartitionType) {
@@ -724,6 +708,25 @@ const S::PartitionSchemeHeader* Resolver::resolvePartitionClause(
     THROW_SQL_ERROR_AT(partition_type_string) << "Unrecognized partition type: " << partition_type;
   }
 
+  proto->set_num_partitions(partition_clause->num_partitions()->long_value());
+
+  std::unordered_set<attribute_id> unique_partition_attrs;
+  for (const ParseString &partition_attribute_name : partition_clause->attribute_name_list()) {
+    const attribute_id attr_id = GetAttributeIdFromName(create_table_statement.attribute_definition_list(),
+                                                        partition_attribute_name.value());
+    if (attr_id == kInvalidAttributeID) {
+      THROW_SQL_ERROR_AT(&partition_attribute_name)
+          << "The given attribute was not found.";
+    } else if (unique_partition_attrs.find(attr_id) != unique_partition_attrs.end()) {
+      THROW_SQL_ERROR_AT(&partition_attribute_name)
+          << "A duplicate partition attribute was found.";
+    }
+
+    unique_partition_attrs.insert(attr_id);
+
+    proto->add_partition_attribute_ids(attr_id);
+  }
+
   return proto.release();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/05b47b5a/query_optimizer/tests/execution_generator/Create.test
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/execution_generator/Create.test b/query_optimizer/tests/execution_generator/Create.test
index 4ffa665..b3ef724 100644
--- a/query_optimizer/tests/execution_generator/Create.test
+++ b/query_optimizer/tests/execution_generator/Create.test
@@ -49,3 +49,20 @@ SELECT * FROM foo3;
 |attr       |
 +-----------+
 +-----------+
+==
+
+CREATE TABLE foo4 (attr INT) PARTITION BY HASH(attr, attr) PARTITIONS 4;
+--
+ERROR: A duplicate partition attribute was found. (1 : 54)
+...foo4 (attr INT) PARTITION BY HASH(attr, attr) PARTITIONS 4;
+                                           ^
+==
+
+CREATE TABLE foo5 (attr1 INT, attr2 LONG, attr3 FLOAT, attr4 DOUBLE, attr5 CHAR(5), attr6 VARCHAR(4))
+PARTITION BY HASH(attr1, attr2, attr3, attr4, attr5, attr6) PARTITIONS 4;
+SELECT * FROM foo5;
+--
++-----------+--------------------+---------------+------------------------+-----+-----+
+|attr1      |attr2               |attr3          |attr4                   |attr5|attr6|
++-----------+--------------------+---------------+------------------------+-----+-----+
++-----------+--------------------+---------------+------------------------+-----+-----+

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/05b47b5a/query_optimizer/tests/logical_generator/Create.test
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/logical_generator/Create.test b/query_optimizer/tests/logical_generator/Create.test
index 04134f9..aac49fb 100644
--- a/query_optimizer/tests/logical_generator/Create.test
+++ b/query_optimizer/tests/logical_generator/Create.test
@@ -56,3 +56,24 @@ TopLevelPlan
 |   +-AttributeReference[id=0,name=attr,relation=foo,type=Int]
 +-output_attributes=
   +-AttributeReference[id=0,name=attr,relation=foo,type=Int]
+==
+
+CREATE TABLE foo (attr1 INT, attr2 LONG, attr3 FLOAT, attr4 DOUBLE, attr5 CHAR(5), attr6 VARCHAR(4))
+PARTITION BY HASH(attr1, attr2, attr3, attr4, attr5, attr6) PARTITIONS 4;
+--
+TopLevelPlan
++-plan=CreateTable[relation=foo]
+| +-attributes=
+|   +-AttributeReference[id=0,name=attr1,relation=foo,type=Int]
+|   +-AttributeReference[id=1,name=attr2,relation=foo,type=Long]
+|   +-AttributeReference[id=2,name=attr3,relation=foo,type=Float]
+|   +-AttributeReference[id=3,name=attr4,relation=foo,type=Double]
+|   +-AttributeReference[id=4,name=attr5,relation=foo,type=Char(5)]
+|   +-AttributeReference[id=5,name=attr6,relation=foo,type=VarChar(4)]
++-output_attributes=
+  +-AttributeReference[id=0,name=attr1,relation=foo,type=Int]
+  +-AttributeReference[id=1,name=attr2,relation=foo,type=Long]
+  +-AttributeReference[id=2,name=attr3,relation=foo,type=Float]
+  +-AttributeReference[id=3,name=attr4,relation=foo,type=Double]
+  +-AttributeReference[id=4,name=attr5,relation=foo,type=Char(5)]
+  +-AttributeReference[id=5,name=attr6,relation=foo,type=VarChar(4)]

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/05b47b5a/query_optimizer/tests/physical_generator/Create.test
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/physical_generator/Create.test b/query_optimizer/tests/physical_generator/Create.test
index c555371..161cc00 100644
--- a/query_optimizer/tests/physical_generator/Create.test
+++ b/query_optimizer/tests/physical_generator/Create.test
@@ -133,3 +133,42 @@ TopLevelPlan
 |   +-AttributeReference[id=0,name=attr,relation=foo,type=Int]
 +-output_attributes=
   +-AttributeReference[id=0,name=attr,relation=foo,type=Int]
+==
+
+CREATE TABLE foo (attr1 INT, attr2 LONG, attr3 FLOAT, attr4 DOUBLE, attr5 CHAR(5), attr6 VARCHAR(4))
+PARTITION BY HASH(attr1, attr2, attr3, attr4, attr5, attr6) PARTITIONS 4;
+--
+[Optimized Logical Plan]
+TopLevelPlan
++-plan=CreateTable[relation=foo]
+| +-attributes=
+|   +-AttributeReference[id=0,name=attr1,relation=foo,type=Int]
+|   +-AttributeReference[id=1,name=attr2,relation=foo,type=Long]
+|   +-AttributeReference[id=2,name=attr3,relation=foo,type=Float]
+|   +-AttributeReference[id=3,name=attr4,relation=foo,type=Double]
+|   +-AttributeReference[id=4,name=attr5,relation=foo,type=Char(5)]
+|   +-AttributeReference[id=5,name=attr6,relation=foo,type=VarChar(4)]
++-output_attributes=
+  +-AttributeReference[id=0,name=attr1,relation=foo,type=Int]
+  +-AttributeReference[id=1,name=attr2,relation=foo,type=Long]
+  +-AttributeReference[id=2,name=attr3,relation=foo,type=Float]
+  +-AttributeReference[id=3,name=attr4,relation=foo,type=Double]
+  +-AttributeReference[id=4,name=attr5,relation=foo,type=Char(5)]
+  +-AttributeReference[id=5,name=attr6,relation=foo,type=VarChar(4)]
+[Physical Plan]
+TopLevelPlan
++-plan=CreateTable[relation=foo]
+| +-attributes=
+|   +-AttributeReference[id=0,name=attr1,relation=foo,type=Int]
+|   +-AttributeReference[id=1,name=attr2,relation=foo,type=Long]
+|   +-AttributeReference[id=2,name=attr3,relation=foo,type=Float]
+|   +-AttributeReference[id=3,name=attr4,relation=foo,type=Double]
+|   +-AttributeReference[id=4,name=attr5,relation=foo,type=Char(5)]
+|   +-AttributeReference[id=5,name=attr6,relation=foo,type=VarChar(4)]
++-output_attributes=
+  +-AttributeReference[id=0,name=attr1,relation=foo,type=Int]
+  +-AttributeReference[id=1,name=attr2,relation=foo,type=Long]
+  +-AttributeReference[id=2,name=attr3,relation=foo,type=Float]
+  +-AttributeReference[id=3,name=attr4,relation=foo,type=Double]
+  +-AttributeReference[id=4,name=attr5,relation=foo,type=Char(5)]
+  +-AttributeReference[id=5,name=attr6,relation=foo,type=VarChar(4)]

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/05b47b5a/query_optimizer/tests/resolver/Create.test
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/resolver/Create.test b/query_optimizer/tests/resolver/Create.test
index 28bd4f5..1372cf4 100644
--- a/query_optimizer/tests/resolver/Create.test
+++ b/query_optimizer/tests/resolver/Create.test
@@ -239,3 +239,24 @@ TopLevelPlan
 |   +-AttributeReference[id=0,name=attr,relation=foo,type=Int]
 +-output_attributes=
   +-AttributeReference[id=0,name=attr,relation=foo,type=Int]
+==
+
+CREATE TABLE foo (attr1 INT, attr2 LONG, attr3 FLOAT, attr4 DOUBLE, attr5 CHAR(5), attr6 VARCHAR(4))
+PARTITION BY HASH(attr1, attr2, attr3, attr4, attr5, attr6) PARTITIONS 4;
+--
+TopLevelPlan
++-plan=CreateTable[relation=foo]
+| +-attributes=
+|   +-AttributeReference[id=0,name=attr1,relation=foo,type=Int]
+|   +-AttributeReference[id=1,name=attr2,relation=foo,type=Long]
+|   +-AttributeReference[id=2,name=attr3,relation=foo,type=Float]
+|   +-AttributeReference[id=3,name=attr4,relation=foo,type=Double]
+|   +-AttributeReference[id=4,name=attr5,relation=foo,type=Char(5)]
+|   +-AttributeReference[id=5,name=attr6,relation=foo,type=VarChar(4)]
++-output_attributes=
+  +-AttributeReference[id=0,name=attr1,relation=foo,type=Int]
+  +-AttributeReference[id=1,name=attr2,relation=foo,type=Long]
+  +-AttributeReference[id=2,name=attr3,relation=foo,type=Float]
+  +-AttributeReference[id=3,name=attr4,relation=foo,type=Double]
+  +-AttributeReference[id=4,name=attr5,relation=foo,type=Char(5)]
+  +-AttributeReference[id=5,name=attr6,relation=foo,type=VarChar(4)]

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/05b47b5a/relational_operators/tests/HashJoinOperator_unittest.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/tests/HashJoinOperator_unittest.cpp b/relational_operators/tests/HashJoinOperator_unittest.cpp
index 03350d4..8338872 100644
--- a/relational_operators/tests/HashJoinOperator_unittest.cpp
+++ b/relational_operators/tests/HashJoinOperator_unittest.cpp
@@ -269,11 +269,11 @@ class HashJoinOperatorTest : public ::testing::TestWithParam<HashTableImplType>
   void insertTuplesWithSingleAttributePartitions() {
     // Set PartitionScheme.
     dim_part_scheme_ = new PartitionScheme(
-        new HashPartitionSchemeHeader(kMultiplePartitions, dim_table_->getAttributeByName("long")->getID()));
+        new HashPartitionSchemeHeader(kMultiplePartitions, { dim_table_->getAttributeByName("long")->getID() }));
     dim_table_->setPartitionScheme(dim_part_scheme_);
 
     fact_part_scheme_ = new PartitionScheme(
-        new HashPartitionSchemeHeader(kMultiplePartitions, fact_table_->getAttributeByName("long")->getID()));
+        new HashPartitionSchemeHeader(kMultiplePartitions, { fact_table_->getAttributeByName("long")->getID() }));
     fact_table_->setPartitionScheme(fact_part_scheme_);
 
     // Create StorageLayout

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/05b47b5a/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index 6fe6436..cb1f098 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -779,6 +779,7 @@ target_link_libraries(quickstep_storage_InsertDestination
                       tmb)
 target_link_libraries(quickstep_storage_InsertDestinationInterface
                       quickstep_catalog_CatalogTypedefs
+                      quickstep_catalog_PartitionSchemeHeader
                       quickstep_types_containers_Tuple)
 target_link_libraries(quickstep_storage_InsertDestination_proto
                       quickstep_catalog_Catalog_proto

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/05b47b5a/storage/InsertDestination.cpp
----------------------------------------------------------------------
diff --git a/storage/InsertDestination.cpp b/storage/InsertDestination.cpp
index 75e1217..b6a9e3a 100644
--- a/storage/InsertDestination.cpp
+++ b/storage/InsertDestination.cpp
@@ -560,14 +560,16 @@ const std::vector<block_id>& PartitionAwareInsertDestination::getTouchedBlocksIn
   return done_block_ids_[part_id];
 }
 
-attribute_id PartitionAwareInsertDestination::getPartitioningAttribute() const {
-  return partition_scheme_header_->getPartitionAttributeId();
+PartitionSchemeHeader::PartitionAttributeIds PartitionAwareInsertDestination::getPartitioningAttributes() const {
+  return partition_scheme_header_->getPartitionAttributeIds();
 }
 
 void PartitionAwareInsertDestination::insertTuple(const Tuple &tuple) {
-  const partition_id part_id =
-      partition_scheme_header_->getPartitionId(
-          tuple.getAttributeValue(partition_scheme_header_->getPartitionAttributeId()));
+  PartitionSchemeHeader::PartitionValues values;
+  for (const attribute_id attr_id : partition_scheme_header_->getPartitionAttributeIds()) {
+    values.push_back(tuple.getAttributeValue(attr_id));
+  }
+  const partition_id part_id = partition_scheme_header_->getPartitionId(values);
 
   MutableBlockReference output_block = getBlockForInsertionInPartition(part_id);
 
@@ -586,9 +588,11 @@ void PartitionAwareInsertDestination::insertTuple(const Tuple &tuple) {
 }
 
 void PartitionAwareInsertDestination::insertTupleInBatch(const Tuple &tuple) {
-  const partition_id part_id =
-      partition_scheme_header_->getPartitionId(
-          tuple.getAttributeValue(partition_scheme_header_->getPartitionAttributeId()));
+  PartitionSchemeHeader::PartitionValues values;
+  for (const attribute_id attr_id : partition_scheme_header_->getPartitionAttributeIds()) {
+    values.push_back(tuple.getAttributeValue(attr_id));
+  }
+  const partition_id part_id = partition_scheme_header_->getPartitionId(values);
 
   MutableBlockReference output_block = getBlockForInsertionInPartition(part_id);
 
@@ -608,12 +612,10 @@ void PartitionAwareInsertDestination::insertTupleInBatch(const Tuple &tuple) {
 
 void PartitionAwareInsertDestination::bulkInsertTuples(ValueAccessor *accessor, bool always_mark_full) {
   const std::size_t num_partitions = partition_scheme_header_->getNumPartitions();
-  const attribute_id partition_attribute_id = partition_scheme_header_->getPartitionAttributeId();
 
   InvokeOnAnyValueAccessor(
       accessor,
       [this,
-       &partition_attribute_id,
        &always_mark_full,
        &num_partitions](auto *accessor) -> void {  // NOLINT(build/c++11)
     std::vector<std::unique_ptr<TupleIdSequence>> partition_membership;
@@ -627,8 +629,11 @@ void PartitionAwareInsertDestination::bulkInsertTuples(ValueAccessor *accessor,
     // set a bit in the appropriate TupleIdSequence.
     accessor->beginIteration();
     while (accessor->next()) {
-      TypedValue attr_val = accessor->getTypedValue(partition_attribute_id);
-      partition_membership[partition_scheme_header_->getPartitionId(attr_val)]
+      PartitionSchemeHeader::PartitionValues values;
+      for (const attribute_id attr_id : partition_scheme_header_->getPartitionAttributeIds()) {
+        values.push_back(accessor->getTypedValue(attr_id));
+      }
+      partition_membership[partition_scheme_header_->getPartitionId(values)]
           ->set(accessor->getCurrentPosition(), true);
     }
 
@@ -662,12 +667,10 @@ void PartitionAwareInsertDestination::bulkInsertTuples(ValueAccessor *accessor,
 void PartitionAwareInsertDestination::bulkInsertTuplesWithRemappedAttributes(
     const std::vector<attribute_id> &attribute_map, ValueAccessor *accessor, bool always_mark_full) {
   const std::size_t num_partitions = partition_scheme_header_->getNumPartitions();
-  const attribute_id partition_attribute_id = partition_scheme_header_->getPartitionAttributeId();
 
   InvokeOnAnyValueAccessor(
       accessor,
       [this,
-       &partition_attribute_id,
        &attribute_map,
        &always_mark_full,
        &num_partitions](auto *accessor) -> void {  // NOLINT(build/c++11)
@@ -682,8 +685,11 @@ void PartitionAwareInsertDestination::bulkInsertTuplesWithRemappedAttributes(
     // set a bit in the appropriate TupleIdSequence.
     accessor->beginIteration();
     while (accessor->next()) {
-      TypedValue attr_val = accessor->getTypedValue(attribute_map[partition_attribute_id]);
-      partition_membership[partition_scheme_header_->getPartitionId(attr_val)]
+      PartitionSchemeHeader::PartitionValues values;
+      for (const attribute_id attr_id : partition_scheme_header_->getPartitionAttributeIds()) {
+        values.push_back(accessor->getTypedValue(attribute_map[attr_id]));  // Remapped lookup, as in the removed line.
+      }
+      partition_membership[partition_scheme_header_->getPartitionId(values)]
           ->set(accessor->getCurrentPosition(), true);
     }
 
@@ -720,10 +726,14 @@ void PartitionAwareInsertDestination::insertTuplesFromVector(std::vector<Tuple>:
     return;
   }
 
-  const attribute_id partition_attribute_id = partition_scheme_header_->getPartitionAttributeId();
   for (; begin != end; ++begin) {
-    const partition_id part_id =
-        partition_scheme_header_->getPartitionId(begin->getAttributeValue(partition_attribute_id));
+    PartitionSchemeHeader::PartitionValues values;
+    for (const attribute_id attr_id : partition_scheme_header_->getPartitionAttributeIds()) {
+      values.push_back(begin->getAttributeValue(attr_id));
+    }
+
+    const partition_id part_id = partition_scheme_header_->getPartitionId(values);
+
     MutableBlockReference dest_block = getBlockForInsertionInPartition(part_id);
     // FIXME(chasseur): Deal with TupleTooLargeForBlock exception.
     while (!dest_block->insertTupleInBatch(*begin)) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/05b47b5a/storage/InsertDestination.hpp
----------------------------------------------------------------------
diff --git a/storage/InsertDestination.hpp b/storage/InsertDestination.hpp
index e9335ce..dc5a093 100644
--- a/storage/InsertDestination.hpp
+++ b/storage/InsertDestination.hpp
@@ -137,8 +137,8 @@ class InsertDestination : public InsertDestinationInterface {
     return relation_;
   }
 
-  attribute_id getPartitioningAttribute() const override {
-    return -1;
+  PartitionSchemeHeader::PartitionAttributeIds getPartitioningAttributes() const override {
+    return {};
   }
 
   void insertTuple(const Tuple &tuple) override;
@@ -515,7 +515,7 @@ class PartitionAwareInsertDestination : public InsertDestination {
     available_block_refs_[part_id].clear();
   }
 
-  attribute_id getPartitioningAttribute() const override;
+  PartitionSchemeHeader::PartitionAttributeIds getPartitioningAttributes() const override;
 
   void insertTuple(const Tuple &tuple) override;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/05b47b5a/storage/InsertDestinationInterface.hpp
----------------------------------------------------------------------
diff --git a/storage/InsertDestinationInterface.hpp b/storage/InsertDestinationInterface.hpp
index be6b0c2..b8c584b 100644
--- a/storage/InsertDestinationInterface.hpp
+++ b/storage/InsertDestinationInterface.hpp
@@ -24,6 +24,7 @@
 #include <vector>
 
 #include "catalog/CatalogTypedefs.hpp"
+#include "catalog/PartitionSchemeHeader.hpp"
 #include "types/containers/Tuple.hpp"
 
 namespace quickstep {
@@ -58,15 +59,15 @@ class InsertDestinationInterface {
   virtual const CatalogRelationSchema& getRelation() const = 0;
 
   /**
-   * @brief Get the attribute ID used for partitioning, if any.
+   * @brief Get the attribute IDs used for partitioning, if any.
    * @note This is intended only for use by StorageBlock::update(), to
    *       determine whether it is safe to do in-place updates or whether all
    *       updated Tuples must be relocated to land in the correct partition.
    *
-   * @return The ID of the attribute used for partitioning, or -1 if there is
-   *         no partitioning.
+   * @return The IDs of the attributes used for partitioning, or empty if there
+   *         is no partitioning.
    **/
-  virtual attribute_id getPartitioningAttribute() const = 0;
+  virtual PartitionSchemeHeader::PartitionAttributeIds getPartitioningAttributes() const = 0;
 
   /**
    * @brief Insert a single tuple into a block managed by this

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/05b47b5a/storage/StorageBlock.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageBlock.cpp b/storage/StorageBlock.cpp
index 0cc7735..e91c1ac 100644
--- a/storage/StorageBlock.cpp
+++ b/storage/StorageBlock.cpp
@@ -429,8 +429,7 @@ StorageBlock::UpdateResult StorageBlock::update(
 
   // To be safe, relocate ALL tuples if the relation is partitioned and we are
   // updating the partitioning attribute.
-  const bool relocate_all =
-      assignments.find(relocation_destination->getPartitioningAttribute()) != assignments.end();
+  const bool relocate_all = !relocation_destination->getPartitioningAttributes().empty();
 
   // IDs of tuples which should be re-added to indices.
   TupleIdSequence in_place_ids(tuple_store_->getMaxTupleID() + 1);