You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/06/14 23:55:06 UTC
incubator-quickstep git commit: Added RandomPartitionSchemeHeader.
Repository: incubator-quickstep
Updated Branches:
refs/heads/master 7c5d76223 -> cbb84b4d6
Added RandomPartitionSchemeHeader.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/cbb84b4d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/cbb84b4d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/cbb84b4d
Branch: refs/heads/master
Commit: cbb84b4d619540f8ca7c586695fece0e46cfeaf7
Parents: 7c5d762
Author: Zuyu Zhang <zu...@apache.org>
Authored: Sat Jun 3 09:39:48 2017 -0700
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Wed Jun 14 18:53:43 2017 -0500
----------------------------------------------------------------------
catalog/CMakeLists.txt | 2 ++
catalog/Catalog.proto | 2 ++
catalog/PartitionSchemeHeader.cpp | 7 ++++
catalog/PartitionSchemeHeader.hpp | 43 ++++++++++++++++++++++++-
catalog/tests/PartitionScheme_unittest.cpp | 41 +++++++++++++++++++++++
5 files changed, 94 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbb84b4d/catalog/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/catalog/CMakeLists.txt b/catalog/CMakeLists.txt
index 9684cfe..0ca014e 100644
--- a/catalog/CMakeLists.txt
+++ b/catalog/CMakeLists.txt
@@ -168,6 +168,8 @@ target_link_libraries(quickstep_catalog_PartitionSchemeHeader
glog
quickstep_catalog_CatalogTypedefs
quickstep_catalog_Catalog_proto
+ quickstep_storage_StorageConstants
+ quickstep_threading_SpinMutex
quickstep_types_Type
quickstep_types_TypeFactory
quickstep_types_Type_proto
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbb84b4d/catalog/Catalog.proto
----------------------------------------------------------------------
diff --git a/catalog/Catalog.proto b/catalog/Catalog.proto
index 4e2fafe..6c6accf 100644
--- a/catalog/Catalog.proto
+++ b/catalog/Catalog.proto
@@ -31,8 +31,10 @@ message CatalogAttribute {
// TODO(zuyu): Move PartitionScheme to a dedicate proto file.
message PartitionSchemeHeader {
+ // Next tag: 3.
enum PartitionType {
HASH = 0;
+ RANDOM = 2;
RANGE = 1;
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbb84b4d/catalog/PartitionSchemeHeader.cpp
----------------------------------------------------------------------
diff --git a/catalog/PartitionSchemeHeader.cpp b/catalog/PartitionSchemeHeader.cpp
index 30fa58d..f7a0971 100644
--- a/catalog/PartitionSchemeHeader.cpp
+++ b/catalog/PartitionSchemeHeader.cpp
@@ -74,6 +74,7 @@ bool PartitionSchemeHeader::ProtoIsValid(
// Check that the proto has a valid partition type.
switch (proto.partition_type()) {
case serialization::PartitionSchemeHeader::HASH:
+ case serialization::PartitionSchemeHeader::RANDOM:
return true;
case serialization::PartitionSchemeHeader::RANGE: {
return proto.HasExtension(serialization::RangePartitionSchemeHeader::partition_range_boundaries) &&
@@ -104,6 +105,9 @@ PartitionSchemeHeader* PartitionSchemeHeader::ReconstructFromProto(
case serialization::PartitionSchemeHeader::HASH: {
return new HashPartitionSchemeHeader(proto.num_partitions(), move(partition_attribute_ids));
}
+ case serialization::PartitionSchemeHeader::RANDOM: {
+ return new RandomPartitionSchemeHeader(proto.num_partitions());
+ }
case serialization::PartitionSchemeHeader::RANGE: {
std::vector<const Type*> attr_types;
for (int i = 0; i < proto.ExtensionSize(serialization::RangePartitionSchemeHeader::partition_attr_types); ++i) {
@@ -142,6 +146,9 @@ serialization::PartitionSchemeHeader PartitionSchemeHeader::getProto() const {
case PartitionType::kHash:
proto.set_partition_type(serialization::PartitionSchemeHeader::HASH);
break;
+ case PartitionType::kRandom:
+ proto.set_partition_type(serialization::PartitionSchemeHeader::RANDOM);
+ break;
case PartitionType::kRange:
proto.set_partition_type(serialization::PartitionSchemeHeader::RANGE);
break;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbb84b4d/catalog/PartitionSchemeHeader.hpp
----------------------------------------------------------------------
diff --git a/catalog/PartitionSchemeHeader.hpp b/catalog/PartitionSchemeHeader.hpp
index a03b0e2..9bbbc0f 100644
--- a/catalog/PartitionSchemeHeader.hpp
+++ b/catalog/PartitionSchemeHeader.hpp
@@ -22,11 +22,14 @@
#include <cstddef>
#include <memory>
+#include <random>
#include <utility>
#include <vector>
#include "catalog/Catalog.pb.h"
#include "catalog/CatalogTypedefs.hpp"
+#include "storage/StorageConstants.hpp"
+#include "threading/SpinMutex.hpp"
#include "types/TypedValue.hpp"
#include "types/operations/comparisons/Comparison.hpp"
#include "types/operations/comparisons/EqualComparison.hpp"
@@ -57,8 +60,9 @@ class PartitionSchemeHeader {
// PartitionValues.size() should be equal to PartitionAttributeIds.size().
typedef std::vector<TypedValue> PartitionValues;
- enum PartitionType {
+ enum class PartitionType {
kHash = 0,
+ kRandom,
kRange
};
@@ -201,6 +205,43 @@ class HashPartitionSchemeHeader final : public PartitionSchemeHeader {
/**
* @brief Implementation of PartitionSchemeHeader that partitions the tuples in
+ * a relation uniformly at random.
+**/
+class RandomPartitionSchemeHeader final : public PartitionSchemeHeader {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param num_partitions The number of partitions to be created. Must be
+   *        positive: num_partitions is a std::size_t, so with 0 the upper
+   *        bound (num_partitions - 1) would wrap around and the drawn ids
+   *        would not name valid partitions.
+   **/
+  explicit RandomPartitionSchemeHeader(const std::size_t num_partitions)
+      : PartitionSchemeHeader(PartitionType::kRandom, num_partitions, {}),
+        mt_(kRandomSeed),
+        dist_(0, num_partitions - 1) {
+  }
+
+  /**
+   * @brief Destructor.
+   **/
+  ~RandomPartitionSchemeHeader() override = default;
+
+  /**
+   * @brief Get a partition id for a tuple. The tuple's attribute values are
+   *        ignored; random partitioning does not depend on them.
+   *
+   * @note Each draw is taken under mutex_, so concurrent callers sharing this
+   *       header do not race on the generator state.
+   *
+   * @return A partition id drawn uniformly from [0, num_partitions - 1].
+   **/
+  partition_id getPartitionId(
+      const PartitionValues & /* value_of_attributes */) const override {
+    SpinMutexLock lock(mutex_);
+    return dist_(mt_);
+  }
+
+ private:
+  // Fixed seed: the generator produces the same sequence of ids on every run
+  // (the per-tuple assignment can still vary if callers interleave).
+  static constexpr std::mt19937_64::result_type kRandomSeed = 1729;
+
+  // Mutable because drawing a value advances the engine (and may touch the
+  // distribution's internal state) even though getPartitionId() is const.
+  mutable std::mt19937_64 mt_;
+  mutable std::uniform_int_distribution<partition_id> dist_;
+
+  // Guards mt_ and dist_. Cache-line aligned, presumably to avoid false
+  // sharing with neighboring members — NOTE(review): confirm intent.
+  alignas(kCacheLineBytes) mutable SpinMutex mutex_;
+
+  DISALLOW_COPY_AND_ASSIGN(RandomPartitionSchemeHeader);
+};
+
+/**
+ * @brief Implementation of PartitionSchemeHeader that partitions the tuples in
* a relation based on a given value range on the partitioning attribute.
**/
class RangePartitionSchemeHeader final : public PartitionSchemeHeader {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cbb84b4d/catalog/tests/PartitionScheme_unittest.cpp
----------------------------------------------------------------------
diff --git a/catalog/tests/PartitionScheme_unittest.cpp b/catalog/tests/PartitionScheme_unittest.cpp
index c4f7dec..190978f 100644
--- a/catalog/tests/PartitionScheme_unittest.cpp
+++ b/catalog/tests/PartitionScheme_unittest.cpp
@@ -590,6 +590,47 @@ TEST(PartitionSchemeTest, CheckHashPartitionSchemeSerialization) {
}
}
+TEST(PartitionSchemeTest, CheckRandomPartitionSchemeSerialization) {
+  const std::size_t num_partitions = 4;
+  std::unique_ptr<PartitionScheme> part_scheme(
+      new PartitionScheme(new RandomPartitionSchemeHeader(num_partitions)));
+  // Add some blocks to each partition, round-robin.
+  for (std::size_t i = 0; i < 10; ++i) {
+    part_scheme->addBlockToPartition(i, i % num_partitions);
+  }
+
+  std::unique_ptr<PartitionScheme> part_scheme_from_proto(
+      PartitionScheme::ReconstructFromProto(part_scheme->getProto()));
+
+  const PartitionSchemeHeader &header = part_scheme->getPartitionSchemeHeader();
+  const PartitionSchemeHeader &header_from_proto = part_scheme_from_proto->getPartitionSchemeHeader();
+
+  // Check the partition type, and that it is still RANDOM after the round trip.
+  EXPECT_EQ(header.getPartitionType(),
+            header_from_proto.getPartitionType());
+  EXPECT_EQ(PartitionSchemeHeader::PartitionType::kRandom,
+            header_from_proto.getPartitionType());
+  // Check number of partitions
+  EXPECT_EQ(header.getNumPartitions(),
+            header_from_proto.getNumPartitions());
+  // Check the partition attribute id
+  EXPECT_EQ(header.getPartitionAttributeIds(),
+            header_from_proto.getPartitionAttributeIds());
+  // The reconstructed header must still only hand out valid partition ids.
+  const PartitionSchemeHeader::PartitionValues no_values;
+  for (std::size_t i = 0; i < 100; ++i) {
+    EXPECT_LT(header_from_proto.getPartitionId(no_values), num_partitions);
+  }
+  // Check the block in each partition
+  for (partition_id part_id = 0; part_id < num_partitions; ++part_id) {
+    // Collect the blocks from C++ Partition Scheme object.
+    std::vector<block_id> blocks_in_part_scheme =
+        part_scheme->getBlocksInPartition(part_id);
+    // Collect the blocks from Partition Scheme's protocol buffer.
+    std::vector<block_id> blocks_in_part_scheme_from_proto =
+        part_scheme_from_proto->getBlocksInPartition(part_id);
+    // Sort both vectors of block ids so that we can compare them.
+    std::sort(blocks_in_part_scheme.begin(), blocks_in_part_scheme.end());
+    std::sort(blocks_in_part_scheme_from_proto.begin(),
+              blocks_in_part_scheme_from_proto.end());
+    // Compare the two sorted lists to check if they are equal.
+    EXPECT_EQ(blocks_in_part_scheme, blocks_in_part_scheme_from_proto);
+  }
+}
+
// TODO(quickstep-team): Add back CheckRangePartitionSchemeSerialization test
// due to QUICKSTEP-86.