You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/06/14 23:55:06 UTC

incubator-quickstep git commit: Added RandomPartitionSchemeHeader.

Repository: incubator-quickstep
Updated Branches:
  refs/heads/master 7c5d76223 -> cbb84b4d6


Added RandomPartitionSchemeHeader.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/cbb84b4d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/cbb84b4d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/cbb84b4d

Branch: refs/heads/master
Commit: cbb84b4d619540f8ca7c586695fece0e46cfeaf7
Parents: 7c5d762
Author: Zuyu Zhang <zu...@apache.org>
Authored: Sat Jun 3 09:39:48 2017 -0700
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Wed Jun 14 18:53:43 2017 -0500

----------------------------------------------------------------------
 catalog/CMakeLists.txt                     |  2 ++
 catalog/Catalog.proto                      |  2 ++
 catalog/PartitionSchemeHeader.cpp          |  7 ++++
 catalog/PartitionSchemeHeader.hpp          | 43 ++++++++++++++++++++++++-
 catalog/tests/PartitionScheme_unittest.cpp | 41 +++++++++++++++++++++++
 5 files changed, 94 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbb84b4d/catalog/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/catalog/CMakeLists.txt b/catalog/CMakeLists.txt
index 9684cfe..0ca014e 100644
--- a/catalog/CMakeLists.txt
+++ b/catalog/CMakeLists.txt
@@ -168,6 +168,8 @@ target_link_libraries(quickstep_catalog_PartitionSchemeHeader
                       glog
                       quickstep_catalog_CatalogTypedefs
                       quickstep_catalog_Catalog_proto
+                      quickstep_storage_StorageConstants
+                      quickstep_threading_SpinMutex
                       quickstep_types_Type
                       quickstep_types_TypeFactory
                       quickstep_types_Type_proto

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbb84b4d/catalog/Catalog.proto
----------------------------------------------------------------------
diff --git a/catalog/Catalog.proto b/catalog/Catalog.proto
index 4e2fafe..6c6accf 100644
--- a/catalog/Catalog.proto
+++ b/catalog/Catalog.proto
@@ -31,8 +31,10 @@ message CatalogAttribute {
 
 // TODO(zuyu): Move PartitionScheme to a dedicate proto file.
 message PartitionSchemeHeader {
+  // Next enum value: 3. Values are intentionally out of numeric order: RANDOM
+  // was added after RANGE, and existing numbers must never be renumbered.
   enum PartitionType {
     HASH = 0;
+    // Tuples are spread across partitions at random; no partition attributes
+    // are used (only num_partitions is read when reconstructing).
+    RANDOM = 2;
     RANGE = 1;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbb84b4d/catalog/PartitionSchemeHeader.cpp
----------------------------------------------------------------------
diff --git a/catalog/PartitionSchemeHeader.cpp b/catalog/PartitionSchemeHeader.cpp
index 30fa58d..f7a0971 100644
--- a/catalog/PartitionSchemeHeader.cpp
+++ b/catalog/PartitionSchemeHeader.cpp
@@ -74,6 +74,7 @@ bool PartitionSchemeHeader::ProtoIsValid(
   // Check that the proto has a valid partition type.
   switch (proto.partition_type()) {
     case serialization::PartitionSchemeHeader::HASH:
+    case serialization::PartitionSchemeHeader::RANDOM:
       return true;
     case serialization::PartitionSchemeHeader::RANGE: {
       return proto.HasExtension(serialization::RangePartitionSchemeHeader::partition_range_boundaries) &&
@@ -104,6 +105,9 @@ PartitionSchemeHeader* PartitionSchemeHeader::ReconstructFromProto(
     case serialization::PartitionSchemeHeader::HASH: {
       return new HashPartitionSchemeHeader(proto.num_partitions(), move(partition_attribute_ids));
     }
+    case serialization::PartitionSchemeHeader::RANDOM: {
+      return new RandomPartitionSchemeHeader(proto.num_partitions());
+    }
     case serialization::PartitionSchemeHeader::RANGE: {
       std::vector<const Type*> attr_types;
       for (int i = 0; i < proto.ExtensionSize(serialization::RangePartitionSchemeHeader::partition_attr_types); ++i) {
@@ -142,6 +146,9 @@ serialization::PartitionSchemeHeader PartitionSchemeHeader::getProto() const {
     case PartitionType::kHash:
       proto.set_partition_type(serialization::PartitionSchemeHeader::HASH);
       break;
+    case PartitionType::kRandom:
+      proto.set_partition_type(serialization::PartitionSchemeHeader::RANDOM);
+      break;
     case PartitionType::kRange:
       proto.set_partition_type(serialization::PartitionSchemeHeader::RANGE);
       break;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbb84b4d/catalog/PartitionSchemeHeader.hpp
----------------------------------------------------------------------
diff --git a/catalog/PartitionSchemeHeader.hpp b/catalog/PartitionSchemeHeader.hpp
index a03b0e2..9bbbc0f 100644
--- a/catalog/PartitionSchemeHeader.hpp
+++ b/catalog/PartitionSchemeHeader.hpp
@@ -22,11 +22,14 @@
 
 #include <cstddef>
 #include <memory>
+#include <random>
 #include <utility>
 #include <vector>
 
 #include "catalog/Catalog.pb.h"
 #include "catalog/CatalogTypedefs.hpp"
+#include "storage/StorageConstants.hpp"
+#include "threading/SpinMutex.hpp"
 #include "types/TypedValue.hpp"
 #include "types/operations/comparisons/Comparison.hpp"
 #include "types/operations/comparisons/EqualComparison.hpp"
@@ -57,8 +60,9 @@ class PartitionSchemeHeader {
   // PartitionValues.size() should be equal to PartitionAttributeIds.size().
   typedef std::vector<TypedValue> PartitionValues;
 
-  enum PartitionType {
+  // Kind of partitioning a header implements. These enumerator values are
+  // in-memory only and do NOT match the serialized numbers in Catalog.proto
+  // (e.g. kRandom is 1 here while RANDOM = 2 in the proto). getProto() maps
+  // between the two with an explicit switch; do not static_cast between them.
+  enum class PartitionType {
     kHash = 0,
+    kRandom,
     kRange
   };
 
@@ -201,6 +205,43 @@ class HashPartitionSchemeHeader final : public PartitionSchemeHeader {
 
 /**
  * @brief Implementation of PartitionSchemeHeader that partitions the tuples in
+ *        a relation randomly.
+ *
+ *        The header has no partition attributes. getPartitionId() ignores its
+ *        argument and draws an id uniformly from [0, num_partitions - 1] using
+ *        std::mt19937_64 seeded with the fixed value 1729, so the sequence of
+ *        ids drawn by one instance is deterministic across runs (which caller
+ *        receives which id still depends on call order).
+**/
+class RandomPartitionSchemeHeader final : public PartitionSchemeHeader {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param num_partitions The number of partitions to be created.
+   *        NOTE(review): presumably must be >= 1 (confirm it is validated by
+   *        callers or the base class); with 0, num_partitions - 1 wraps around
+   *        in std::size_t arithmetic and the distribution range is meaningless.
+   **/
+  explicit RandomPartitionSchemeHeader(const std::size_t num_partitions)
+      : PartitionSchemeHeader(PartitionType::kRandom, num_partitions, {}),
+        mt_(1729),
+        dist_(0, num_partitions - 1) {
+  }
+
+  /**
+   * @brief Destructor.
+   **/
+  ~RandomPartitionSchemeHeader() override {}
+
+  /**
+   * @brief Pick the partition for a tuple.
+   *
+   * @param value_of_attributes Ignored; random partitioning does not depend on
+   *        attribute values.
+   * @return A pseudo-random partition id in [0, num_partitions - 1].
+   *
+   * Each draw advances the generator state, so concurrent callers are
+   * serialized by mutex_ (this is why the members are mutable in a const
+   * method).
+   **/
+  partition_id getPartitionId(
+      const PartitionValues &value_of_attributes) const override {
+    SpinMutexLock lock(mutex_);
+    return dist_(mt_);
+  }
+
+ private:
+  // Generator and the distribution that maps its output onto partition ids;
+  // both are used through non-const calls on every draw.
+  mutable std::mt19937_64 mt_;
+  mutable std::uniform_int_distribution<partition_id> dist_;
+
+  // Guards mt_ and dist_. NOTE(review): presumably cache-line aligned to reduce
+  // false sharing with neighboring data; confirm intent.
+  alignas(kCacheLineBytes) mutable SpinMutex mutex_;
+
+  DISALLOW_COPY_AND_ASSIGN(RandomPartitionSchemeHeader);
+};
+
+/**
+ * @brief Implementation of PartitionSchemeHeader that partitions the tuples in
  *        a relation based on a given value range on the partitioning attribute.
 **/
 class RangePartitionSchemeHeader final : public PartitionSchemeHeader {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbb84b4d/catalog/tests/PartitionScheme_unittest.cpp
----------------------------------------------------------------------
diff --git a/catalog/tests/PartitionScheme_unittest.cpp b/catalog/tests/PartitionScheme_unittest.cpp
index c4f7dec..190978f 100644
--- a/catalog/tests/PartitionScheme_unittest.cpp
+++ b/catalog/tests/PartitionScheme_unittest.cpp
@@ -590,6 +590,47 @@ TEST(PartitionSchemeTest, CheckHashPartitionSchemeSerialization) {
   }
 }
 
+// Round-trips a PartitionScheme backed by RandomPartitionSchemeHeader through
+// its protobuf form and checks that the header (type, partition count,
+// partition attribute ids) and the block-to-partition assignment survive.
+TEST(PartitionSchemeTest, CheckRandomPartitionSchemeSerialization) {
+  const std::size_t num_partitions = 4;
+  std::unique_ptr<PartitionScheme> part_scheme(
+      new PartitionScheme(new RandomPartitionSchemeHeader(num_partitions)));
+  // Add blocks 0..9 to the partitions round-robin (block i -> i % num_partitions).
+  for (int i = 0; i < 10; ++i) {
+    part_scheme->addBlockToPartition(i, i % num_partitions);
+  }
+  std::unique_ptr<PartitionScheme> part_scheme_from_proto;
+  part_scheme_from_proto.reset(
+      PartitionScheme::ReconstructFromProto(part_scheme->getProto()));
+
+  const PartitionSchemeHeader &header = part_scheme->getPartitionSchemeHeader();
+  const PartitionSchemeHeader &header_from_proto = part_scheme_from_proto->getPartitionSchemeHeader();
+
+  // Check the partition type.
+  EXPECT_EQ(header.getPartitionType(),
+            header_from_proto.getPartitionType());
+  // Check the number of partitions.
+  EXPECT_EQ(header.getNumPartitions(),
+            header_from_proto.getNumPartitions());
+  // Check the partition attribute ids (the random header is constructed with
+  // none, so both sides are expected to be empty).
+  EXPECT_EQ(header.getPartitionAttributeIds(),
+            header_from_proto.getPartitionAttributeIds());
+  // Check the blocks in each partition.
+  for (partition_id part_id = 0; part_id < num_partitions; ++part_id) {
+    // Collect the blocks from the original C++ PartitionScheme object.
+    std::vector<block_id> blocks_in_part_scheme =
+        part_scheme->getBlocksInPartition(part_id);
+    // Collect the blocks from the PartitionScheme rebuilt from its protocol buffer.
+    std::vector<block_id> blocks_in_part_scheme_from_proto =
+        part_scheme_from_proto->getBlocksInPartition(part_id);
+    // Sort both vectors of block ids so the comparison is order-independent.
+    std::sort(blocks_in_part_scheme.begin(), blocks_in_part_scheme.end());
+    std::sort(blocks_in_part_scheme_from_proto.begin(),
+              blocks_in_part_scheme_from_proto.end());
+    // Compare the two sorted lists to check if they are equal.
+    EXPECT_EQ(blocks_in_part_scheme, blocks_in_part_scheme_from_proto);
+  }
+}
+
 // TODO(quickstep-team): Add back CheckRangePartitionSchemeSerialization test
 // due to QUICKSTEP-86.