You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2018/05/16 19:44:34 UTC
[2/3] incubator-quickstep git commit: Updates for grail
Updates for grail
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/a0024a2e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/a0024a2e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/a0024a2e
Branch: refs/heads/trace
Commit: a0024a2e9ec76d14013fcdee0e8d714d8776c1a3
Parents: 56e0a4e
Author: Jianqiao Zhu <ji...@cs.wisc.edu>
Authored: Tue May 8 17:53:57 2018 -0500
Committer: Jianqiao Zhu <ji...@cs.wisc.edu>
Committed: Tue May 8 17:53:57 2018 -0500
----------------------------------------------------------------------
.../aggregation/AggregationHandleMax.hpp | 2 +
.../aggregation/AggregationHandleMin.hpp | 2 +
query_execution/WorkOrdersContainer.hpp | 8 ++-
query_optimizer/ExecutionGenerator.cpp | 6 +--
.../cost_model/StarSchemaSimpleCostModel.cpp | 6 +--
relational_operators/UnionAllOperator.cpp | 6 ++-
storage/AggregationOperationState.cpp | 2 +-
storage/PackedPayloadHashTable.cpp | 8 +--
storage/StorageManager.cpp | 42 ++--------------
utility/ShardedLockManager.hpp | 52 +-------------------
10 files changed, 29 insertions(+), 105 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a0024a2e/expressions/aggregation/AggregationHandleMax.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMax.hpp b/expressions/aggregation/AggregationHandleMax.hpp
index 8f8c0d8..d6fdd12 100644
--- a/expressions/aggregation/AggregationHandleMax.hpp
+++ b/expressions/aggregation/AggregationHandleMax.hpp
@@ -191,6 +191,7 @@ class AggregationHandleMax : public AggregationConcreteHandle {
if (state->max_.isNull() ||
fast_comparator_->compareTypedValues(value, state->max_)) {
state->max_ = value;
+ state->max_.ensureNotReference();
}
}
@@ -200,6 +201,7 @@ class AggregationHandleMax : public AggregationConcreteHandle {
if (max_ptr->isNull() ||
fast_comparator_->compareTypedValues(value, *max_ptr)) {
*max_ptr = value;
+ max_ptr->ensureNotReference();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a0024a2e/expressions/aggregation/AggregationHandleMin.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMin.hpp b/expressions/aggregation/AggregationHandleMin.hpp
index 0e62be5..441af52 100644
--- a/expressions/aggregation/AggregationHandleMin.hpp
+++ b/expressions/aggregation/AggregationHandleMin.hpp
@@ -191,6 +191,7 @@ class AggregationHandleMin : public AggregationConcreteHandle {
if (state->min_.isNull() ||
fast_comparator_->compareTypedValues(value, state->min_)) {
state->min_ = value;
+ state->min_.ensureNotReference();
}
}
@@ -200,6 +201,7 @@ class AggregationHandleMin : public AggregationConcreteHandle {
if (min_ptr->isNull() ||
fast_comparator_->compareTypedValues(value, *min_ptr)) {
*min_ptr = value;
+ min_ptr->ensureNotReference();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a0024a2e/query_execution/WorkOrdersContainer.hpp
----------------------------------------------------------------------
diff --git a/query_execution/WorkOrdersContainer.hpp b/query_execution/WorkOrdersContainer.hpp
index 431a270..55ec185 100644
--- a/query_execution/WorkOrdersContainer.hpp
+++ b/query_execution/WorkOrdersContainer.hpp
@@ -362,7 +362,13 @@ class WorkOrdersContainer {
return nullptr;
}
- std::cerr << "# work orders: " << workorders_.size() << " \r";
+ /*
+ const std::size_t num_remaining_workorders = workorders_.size();
+ if (num_remaining_workorders % 1000 == 0) {
+ std::cerr << "# work orders: " << num_remaining_workorders
+ << " \r";
+ }
+ */
WorkOrder *work_order = workorders_.front().release();
workorders_.pop();
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a0024a2e/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 8ca367b..75f15ac 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -190,7 +190,7 @@ static const volatile bool num_aggregation_partitions_dummy
= gflags::RegisterFlagValidator(&FLAGS_num_aggregation_partitions, &ValidateNumAggregationPartitions);
DEFINE_uint64(partition_aggregation_num_groups_threshold,
- 100000,
+ 10000,
"The threshold used for deciding whether the aggregation is done "
"in a partitioned way or not");
@@ -204,10 +204,6 @@ bool CheckAggregatePartitioned(const std::size_t num_aggregate_functions,
const std::vector<bool> &is_distincts,
const std::vector<attribute_id> &group_by_attrs,
const std::size_t estimated_num_groups) {
- // If there's no aggregation, return false.
- if (num_aggregate_functions == 0) {
- return false;
- }
// If there is only one aggregate function, we allow distinct aggregation.
// Otherwise it can't be partitioned with distinct aggregation.
if (num_aggregate_functions > 1) {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a0024a2e/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp b/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
index 729a563..2f4902f 100644
--- a/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
+++ b/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
@@ -170,7 +170,7 @@ std::size_t StarSchemaSimpleCostModel::estimateCardinalityForFilterJoin(
std::size_t left_cardinality = estimateCardinality(physical_plan->left());
double right_selectivity = estimateSelectivity(physical_plan->right());
return static_cast<std::size_t>(
- left_cardinality * build_side_filter_selectivity * right_selectivity + 0.5);
+ left_cardinality * build_side_filter_selectivity * std::min(right_selectivity, 0.01) + 0.5);
}
std::size_t StarSchemaSimpleCostModel::estimateCardinalityForHashJoin(
@@ -179,7 +179,7 @@ std::size_t StarSchemaSimpleCostModel::estimateCardinalityForHashJoin(
std::size_t right_cardinality = estimateCardinality(physical_plan->right());
double left_selectivity = estimateSelectivity(physical_plan->left());
double right_selectivity = estimateSelectivity(physical_plan->right());
- return std::max(static_cast<std::size_t>(left_cardinality * right_selectivity + 0.5),
+ return std::min(static_cast<std::size_t>(left_cardinality * right_selectivity + 0.5),
static_cast<std::size_t>(right_cardinality * left_selectivity + 0.5));
}
@@ -344,7 +344,7 @@ double StarSchemaSimpleCostModel::estimateSelectivity(
double build_side_filter_selectivity =
estimateSelectivityForPredicate(filter_join->build_side_filter_predicate(),
filter_join->right());
- return left_selectivity * right_selectivity * build_side_filter_selectivity;
+ return std::min(left_selectivity * right_selectivity, 0.01) * build_side_filter_selectivity;
}
case P::PhysicalType::kHashJoin: {
const P::HashJoinPtr &hash_join =
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a0024a2e/relational_operators/UnionAllOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/UnionAllOperator.cpp b/relational_operators/UnionAllOperator.cpp
index 1d82aef..bffacb6 100644
--- a/relational_operators/UnionAllOperator.cpp
+++ b/relational_operators/UnionAllOperator.cpp
@@ -40,8 +40,10 @@ namespace quickstep {
void UnionAllOperator::feedInputBlock(const block_id input_block_id,
const relation_id input_relation_id,
const partition_id part_id) {
- std::size_t index = relation_id_to_index_.at(input_relation_id);
- input_relations_block_ids_[index].push_back(input_block_id);
+ const auto it = relation_id_to_index_.find(input_relation_id);
+ if (it != relation_id_to_index_.end()) {
+ input_relations_block_ids_[it->second].push_back(input_block_id);
+ }
}
void UnionAllOperator::doneFeedingInputBlocks(const relation_id rel_id) {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a0024a2e/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index 92798d8..b88290b 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -113,7 +113,7 @@ AggregationOperationState::AggregationOperationState(
}
}
- std::vector<AggregationHandle *> group_by_handles;
+ std::vector<AggregationHandle*> group_by_handles;
// Set up each individual aggregate in this operation.
std::vector<const AggregateFunction *>::const_iterator agg_func_it =
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a0024a2e/storage/PackedPayloadHashTable.cpp
----------------------------------------------------------------------
diff --git a/storage/PackedPayloadHashTable.cpp b/storage/PackedPayloadHashTable.cpp
index 1df20d0..c76869c 100644
--- a/storage/PackedPayloadHashTable.cpp
+++ b/storage/PackedPayloadHashTable.cpp
@@ -440,10 +440,10 @@ void PackedPayloadHashTable::resize(const std::size_t extra_buckets,
// Copy over variable-length key components, if any.
if (original_variable_storage_used > 0) {
- DEBUG_ASSERT(original_variable_storage_used ==
- key_manager_.getNextVariableLengthKeyOffset());
- DEBUG_ASSERT(original_variable_storage_used <=
- resized_variable_length_key_storage_size);
+// DEBUG_ASSERT(original_variable_storage_used ==
+// key_manager_.getNextVariableLengthKeyOffset());
+// DEBUG_ASSERT(original_variable_storage_used <=
+// resized_variable_length_key_storage_size);
std::memcpy(resized_variable_length_key_storage,
key_manager_.getVariableLengthKeyStorage(),
original_variable_storage_used);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a0024a2e/storage/StorageManager.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.cpp b/storage/StorageManager.cpp
index 5924607..006d6a7 100644
--- a/storage/StorageManager.cpp
+++ b/storage/StorageManager.cpp
@@ -816,13 +816,13 @@ MutableBlockReference StorageManager::getBlockInternal(
ret = MutableBlockReference(static_cast<StorageBlock*>(it->second.block), eviction_policy_.get());
}
}
- // To be safe, release the block's shard after 'eviction_lock' destructs.
- lock_manager_.release(block);
if (ret.valid()) {
return ret;
}
+ makeRoomForBlockOrBlob(0);
+
// Note that there is no way for the block to be evicted between the call to
// loadBlock and the call to EvictionPolicy::blockReferenced from
// MutableBlockReference's constructor; this is because EvictionPolicy
@@ -843,8 +843,6 @@ MutableBlockReference StorageManager::getBlockInternal(
// No other thread loaded the block before us.
ret = MutableBlockReference(loadBlock(block, relation, numa_node), eviction_policy_.get());
} while (false);
- // To be safe, release the block's shard after 'io_lock' destructs.
- lock_manager_.release(block);
return ret;
}
@@ -861,8 +859,6 @@ MutableBlobReference StorageManager::getBlobInternal(const block_id blob,
ret = MutableBlobReference(static_cast<StorageBlob*>(it->second.block), eviction_policy_.get());
}
}
- // To be safe, release the blob's shard after 'eviction_lock' destructs.
- lock_manager_.release(blob);
if (ret.valid()) {
return ret;
@@ -887,8 +883,6 @@ MutableBlobReference StorageManager::getBlobInternal(const block_id blob,
// No other thread loaded the blob before us.
ret = MutableBlobReference(loadBlob(blob, numa_node), eviction_policy_.get());
} while (false);
- // To be safe, release the blob's shard after 'io_lock' destructs.
- lock_manager_.release(blob);
return ret;
}
@@ -904,51 +898,21 @@ void StorageManager::makeRoomForBlockOrBlob(const size_t slots) {
break;
}
- bool has_collision = false;
- SpinSharedMutexExclusiveLock<false> eviction_lock(*lock_manager_.get(block_to_evict, &has_collision));
- if (has_collision) {
- // We have a collision in the shared lock manager, where some callers
- // of this function (i.e., getBlockInternal or getBlobInternal) has
- // acquired an exclusive lock, and we are trying to evict a block that
- // hashes to the same location. This will cause a deadlock.
-
- // For now simply treat this situation as the case where there is not
- // enough memory and we temporarily go over the memory limit.
- break;
- }
-
+ SpinSharedMutexExclusiveLock<false> eviction_lock(*lock_manager_.get(block_to_evict));
{
SpinSharedMutexSharedLock<false> read_lock(blocks_shared_mutex_);
if (blocks_.find(block_to_evict) == blocks_.end()) {
// another thread must have jumped in and evicted it before us
-
- // NOTE(zuyu): It is ok to release the shard for a block or blob,
- // before 'eviction_lock' destructs, because we will never encounter a
- // self-deadlock in a single thread, and in multiple-thread case some
- // thread will block but not deadlock if there is a shard collision.
- lock_manager_.release(block_to_evict);
continue;
}
}
if (eviction_policy_->getRefCount(block_to_evict) > 0) {
// Someone sneaked in and referenced the block before we could evict it.
-
- // NOTE(zuyu): It is ok to release the shard for a block or blob, before
- // before 'eviction_lock' destructs, because we will never encounter a
- // self-deadlock in a single thread, and in multiple-thread case some
- // thread will block but not deadlock if there is a shard collision.
- lock_manager_.release(block_to_evict);
continue;
}
if (saveBlockOrBlob(block_to_evict)) {
evictBlockOrBlob(block_to_evict);
} // else : Someone sneaked in and evicted the block before we could.
-
- // NOTE(zuyu): It is ok to release the shard for a block or blob, before
- // before 'eviction_lock' destructs, because we will never encounter a
- // self-deadlock in a single thread, and in multiple-thread case some
- // thread will block but not deadlock if there is a shard collision.
- lock_manager_.release(block_to_evict);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a0024a2e/utility/ShardedLockManager.hpp
----------------------------------------------------------------------
diff --git a/utility/ShardedLockManager.hpp b/utility/ShardedLockManager.hpp
index 520f879..03f3e3c 100644
--- a/utility/ShardedLockManager.hpp
+++ b/utility/ShardedLockManager.hpp
@@ -58,65 +58,17 @@ class ShardedLockManager {
/**
* @brief Get the SharedMutex corresponding to the provided key.
* @param key The key to map to a SharedMutex.
- * @param has_collision Whether accessing the given key would result in a
- * hash collision. Used in StorageManager::makeRoomForBlock only.
- * @return The corresponding SharedMutex if there is no collision; otherwise,
- * the collision SharedMutex.
+ * @return The corresponding SharedMutex.
*/
- SharedMutexT* get(const T key, bool *has_collision = nullptr) {
+ SharedMutexT* get(const T key) {
const std::size_t shard = hash_(key) % N;
-
- if (has_collision != nullptr) {
- // In StorageManager::makeRoomForBlock, check whether the evicting block
- // or blob has a shard collision with existing referenced shards.
- SpinSharedMutexSharedLock<false> read_lock(shard_count_mutex_);
- if (shard_count_.find(shard) != shard_count_.end()) {
- *has_collision = true;
- return &collision_mutex_;
- }
- }
-
- {
- SpinSharedMutexExclusiveLock<false> write_lock(shard_count_mutex_);
-
- // Check one more time for the evicting block or blob if there is a shard
- // collision.
- auto it = shard_count_.find(shard);
- if (it != shard_count_.end()) {
- if (has_collision != nullptr) {
- *has_collision = true;
- return &collision_mutex_;
- }
-
- ++it->second;
- } else {
- shard_count_.emplace(shard, 1);
- }
- }
return &sharded_mutexes_[shard];
}
- /**
- * @brief Release the shard corresponding to the provided key.
- * @param key The key to compute the shard.
- */
- void release(const T key) {
- SpinSharedMutexExclusiveLock<false> write_lock(shard_count_mutex_);
- auto it = shard_count_.find(hash_(key) % N);
- DCHECK(it != shard_count_.end());
-
- if (--it->second == 0) {
- shard_count_.erase(it);
- }
- }
-
private:
std::hash<T> hash_;
std::array<SharedMutexT, N> sharded_mutexes_;
- // The placeholder mutex used whenever there is a hash collision.
- SharedMutexT collision_mutex_;
-
// Count all shards referenced by StorageManager in multiple threads.
// The key is the shard, while the value is the count. If the count equals
// zero, we delete the shard entry.