You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself is not preserved in this plain-text copy).
Posted to commits@quickstep.apache.org by ji...@apache.org on 2016/08/22 18:56:31 UTC
[22/22] incubator-quickstep git commit: Merged with fuse-select-join
Merged with fuse-select-join
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/e2f49814
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/e2f49814
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/e2f49814
Branch: refs/heads/LIP-for-tpch-merged
Commit: e2f498144e892105cf4ea742e8d7bca41cee2117
Parents: 109a12f
Author: Jianqiao Zhu <ji...@cs.wisc.edu>
Authored: Thu Aug 18 21:17:56 2016 -0500
Committer: Jianqiao Zhu <ji...@cs.wisc.edu>
Committed: Mon Aug 22 13:55:56 2016 -0500
----------------------------------------------------------------------
expressions/scalar/ScalarAttribute.cpp | 2 +-
query_optimizer/ExecutionGenerator.cpp | 2 +-
.../cost_model/StarSchemaSimpleCostModel.cpp | 4 +-
query_optimizer/physical/HashJoin.hpp | 4 +-
query_optimizer/physical/Selection.cpp | 15 +++-
query_optimizer/physical/Selection.hpp | 2 +
query_optimizer/rules/CMakeLists.txt | 3 +-
query_optimizer/rules/FuseJoinSelect.cpp | 9 +--
relational_operators/CMakeLists.txt | 1 +
relational_operators/HashJoinOperator.cpp | 42 +++++++---
relational_operators/HashJoinOperator.hpp | 8 ++
storage/AggregationOperationState.cpp | 3 +-
storage/CMakeLists.txt | 1 +
storage/CountedReference.hpp | 4 +-
storage/HashTable.hpp | 8 +-
storage/StorageBlock.cpp | 6 +-
storage/StorageManager.cpp | 55 +++-----------
storage/StorageManager.hpp | 14 ++--
utility/ConcurrentHashMap.hpp | 0
utility/EventProfiler.cpp | 2 +-
utility/EventProfiler.hpp | 2 +-
utility/PlanVisualizer.cpp | 4 +
utility/ShardedLockManager.hpp | 80 ++++++++++----------
23 files changed, 143 insertions(+), 128 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e2f49814/expressions/scalar/ScalarAttribute.cpp
----------------------------------------------------------------------
diff --git a/expressions/scalar/ScalarAttribute.cpp b/expressions/scalar/ScalarAttribute.cpp
index 08dc9dd..1a54d16 100644
--- a/expressions/scalar/ScalarAttribute.cpp
+++ b/expressions/scalar/ScalarAttribute.cpp
@@ -143,7 +143,7 @@ ColumnVector* ScalarAttribute::getAllValuesForJoin(
ValueAccessor *accessor = using_left_relation ? left_accessor
: right_accessor;
- return InvokeOnValueAccessorNotAdapter(
+ return InvokeOnAnyValueAccessor(
accessor,
[&joined_tuple_ids,
&attr_id,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e2f49814/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index d2092c4..886ba75 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -693,7 +693,7 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
// Convert the left filter predicate proto.
QueryContext::predicate_id left_filter_predicate_index = QueryContext::kInvalidPredicateId;
- if (physical_plan->residual_predicate()) {
+ if (physical_plan->left_filter_predicate()) {
left_filter_predicate_index = query_context_proto_->predicates_size();
unique_ptr<const Predicate> left_filter_predicate(convertPredicate(physical_plan->left_filter_predicate()));
query_context_proto_->add_predicates()->CopyFrom(left_filter_predicate->getProto());
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e2f49814/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp b/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
index ba7a3c6..d0205b9 100644
--- a/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
+++ b/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
@@ -253,11 +253,11 @@ double StarSchemaSimpleCostModel::estimateSelectivityForPredicate(
double unit_selectivity = 1.0 / it->second;
return comparison_expression->isEqualityComparisonPredicate()
? unit_selectivity
- : 0.5;
+ : 0.1;
}
}
- return comparison_expression->isEqualityComparisonPredicate() ? 0.1 : 0.5;
+ return 0.1;
}
case E::ExpressionType::kLogicalAnd: {
const E::LogicalAndPtr &logical_and =
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e2f49814/query_optimizer/physical/HashJoin.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/HashJoin.hpp b/query_optimizer/physical/HashJoin.hpp
index e24dbeb..32b4f21 100644
--- a/query_optimizer/physical/HashJoin.hpp
+++ b/query_optimizer/physical/HashJoin.hpp
@@ -121,8 +121,8 @@ class HashJoin : public BinaryJoin {
residual_predicate_,
project_expressions(),
join_type_,
- bloom_filter_config_,
- left_filter_predicate_);
+ left_filter_predicate_,
+ bloom_filter_config_);
}
std::vector<expressions::AttributeReferencePtr> getReferencedAttributes() const override;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e2f49814/query_optimizer/physical/Selection.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/Selection.cpp b/query_optimizer/physical/Selection.cpp
index 73af500..22323f4 100644
--- a/query_optimizer/physical/Selection.cpp
+++ b/query_optimizer/physical/Selection.cpp
@@ -20,6 +20,7 @@
#include "query_optimizer/physical/Selection.hpp"
#include <string>
+#include <unordered_set>
#include <vector>
#include "query_optimizer/OptimizerTree.hpp"
@@ -36,6 +37,19 @@ namespace physical {
namespace E = ::quickstep::optimizer::expressions;
+bool Selection::isSimpleSelection() const {
+ std::unordered_set<E::ExprId> input_attr_ids;
+ for (const auto &attr : input()->getOutputAttributes()) {
+ input_attr_ids.emplace(attr->id());
+ }
+ for (const auto &attr : getOutputAttributes()) {
+ if (input_attr_ids.find(attr->id()) == input_attr_ids.end()) {
+ return false;
+ }
+ }
+ return true;
+}
+
PhysicalPtr Selection::copyWithNewChildren(
const std::vector<PhysicalPtr> &new_children) const {
DCHECK_EQ(children().size(), new_children.size());
@@ -87,7 +101,6 @@ bool Selection::impliesUniqueAttributes(
return input()->impliesUniqueAttributes(attributes);
}
-
void Selection::getFieldStringItems(
std::vector<std::string> *inline_field_names,
std::vector<std::string> *inline_field_values,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e2f49814/query_optimizer/physical/Selection.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/Selection.hpp b/query_optimizer/physical/Selection.hpp
index bb50314..bc0f072 100644
--- a/query_optimizer/physical/Selection.hpp
+++ b/query_optimizer/physical/Selection.hpp
@@ -75,6 +75,8 @@ class Selection : public Physical {
*/
inline const PhysicalPtr& input() const { return children()[0]; }
+ bool isSimpleSelection() const;
+
PhysicalPtr copyWithNewChildren(
const std::vector<PhysicalPtr> &new_children) const override;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e2f49814/query_optimizer/rules/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/CMakeLists.txt b/query_optimizer/rules/CMakeLists.txt
index 9990a4d..b675bc6 100644
--- a/query_optimizer/rules/CMakeLists.txt
+++ b/query_optimizer/rules/CMakeLists.txt
@@ -74,7 +74,8 @@ target_link_libraries(quickstep_queryoptimizer_rules_FuseJoinSelect
quickstep_queryoptimizer_physical_Selection
quickstep_queryoptimizer_physical_TableReference
quickstep_queryoptimizer_rules_BottomUpRule
- quickstep_queryoptimizer_rules_Rule)
+ quickstep_queryoptimizer_rules_Rule
+ quickstep_utility_Macros)
target_link_libraries(quickstep_queryoptimizer_rules_GenerateJoins
glog
quickstep_queryoptimizer_expressions_AttributeReference
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e2f49814/query_optimizer/rules/FuseJoinSelect.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/FuseJoinSelect.cpp b/query_optimizer/rules/FuseJoinSelect.cpp
index e40acfc..87dbc95 100644
--- a/query_optimizer/rules/FuseJoinSelect.cpp
+++ b/query_optimizer/rules/FuseJoinSelect.cpp
@@ -16,21 +16,18 @@ namespace E = ::quickstep::optimizer::expressions;
P::PhysicalPtr FuseJoinSelect::applyToNode(const P::PhysicalPtr &input) {
P::HashJoinPtr hash_join;
P::SelectionPtr selection;
- P::TableReferencePtr table_reference;
if (P::SomeHashJoin::MatchesWithConditionalCast(input, &hash_join)
- && hash_join->join_type() == P::HashJoin::JoinType::kInnerJoin
&& P::SomeSelection::MatchesWithConditionalCast(hash_join->left(), &selection)
- && P::SomeTableReference::MatchesWithConditionalCast(selection->input(), &table_reference)) {
- const E::PredicatePtr filter_predicate = selection->filter_predicate();
- P::PhysicalPtr output = P::HashJoin::Create(table_reference,
+ && selection->isSimpleSelection()) {
+ P::PhysicalPtr output = P::HashJoin::Create(selection->input(),
hash_join->right(),
hash_join->left_join_attributes(),
hash_join->right_join_attributes(),
hash_join->residual_predicate(),
hash_join->project_expressions(),
hash_join->join_type(),
- filter_predicate);
+ selection->filter_predicate());
LOG_APPLYING_RULE(input, output);
return output;
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e2f49814/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index 6d71794..af411ea 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -202,6 +202,7 @@ target_link_libraries(quickstep_relationaloperators_HashJoinOperator
quickstep_types_TypedValue
quickstep_types_containers_ColumnVector
quickstep_types_containers_ColumnVectorsValueAccessor
+ quickstep_utility_EventProfiler
quickstep_utility_Macros
tmb)
target_link_libraries(quickstep_relationaloperators_InsertOperator
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e2f49814/relational_operators/HashJoinOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.cpp b/relational_operators/HashJoinOperator.cpp
index e95bb1c..70fbb1a 100644
--- a/relational_operators/HashJoinOperator.cpp
+++ b/relational_operators/HashJoinOperator.cpp
@@ -48,6 +48,7 @@
#include "types/TypedValue.hpp"
#include "types/containers/ColumnVector.hpp"
#include "types/containers/ColumnVectorsValueAccessor.hpp"
+#include "utility/EventProfiler.hpp"
#include "gflags/gflags.h"
#include "glog/logging.h"
@@ -216,7 +217,8 @@ bool HashJoinOperator::getAllNonOuterJoinWorkOrders(
selection,
hash_table,
output_destination,
- storage_manager),
+ storage_manager,
+ getOperatorIndex()),
op_index_);
}
started_ = true;
@@ -237,7 +239,8 @@ bool HashJoinOperator::getAllNonOuterJoinWorkOrders(
selection,
hash_table,
output_destination,
- storage_manager),
+ storage_manager,
+ getOperatorIndex()),
op_index_);
++num_workorders_generated_;
} // end while
@@ -436,6 +439,7 @@ void HashInnerJoinWorkOrder::execute() {
BlockReference probe_block(
storage_manager_->getBlock(block_id_, probe_relation_));
const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock();
+// auto *container = simple_profiler.getContainer();
std::unique_ptr<ValueAccessor> probe_accessor(
probe_store.createValueAccessor(
@@ -443,7 +447,6 @@ void HashInnerJoinWorkOrder::execute() {
? nullptr
: probe_block->getMatchesForPredicate(left_filter_predicate_)));
-
MapBasedJoinedTupleCollector collector;
if (join_key_attributes_.size() == 1) {
hash_table_.getAllFromValueAccessor(
@@ -462,11 +465,23 @@ void HashInnerJoinWorkOrder::execute() {
const relation_id build_relation_id = build_relation_.getID();
const relation_id probe_relation_id = probe_relation_.getID();
+// auto *materialize_line = container->getEventLine("materialize");
+// auto *iterate_line = container->getEventLine("iterate_blocks");
+// auto *get_block_line = container->getEventLine("get_block");
+
+// materialize_line->emplace_back();
+// iterate_line->emplace_back();
ColumnVectorsValueAccessor temp_result;
for (std::pair<const block_id, std::vector<std::pair<tuple_id, tuple_id>>>
&build_block_entry : *collector.getJoinedTuples()) {
+// iterate_line->back().endEvent();
+// iterate_line->back().setPayload(getOperatorIndex(), build_block_entry.second.size());
+
+// get_block_line->emplace_back();
BlockReference build_block =
storage_manager_->getBlock(build_block_entry.first, build_relation_);
+// get_block_line->back().endEvent();
+// get_block_line->back().setPayload(getOperatorIndex(), 0);
const TupleStorageSubBlock &build_store = build_block->getTupleStorageSubBlock();
std::unique_ptr<ValueAccessor> build_accessor(build_store.createValueAccessor());
@@ -514,7 +529,6 @@ void HashInnerJoinWorkOrder::execute() {
// benefit (probably only a real performance win when there are very few
// matching tuples in each individual inner block but very many inner
// blocks with at least one match).
- //ColumnVectorsValueAccessor temp_result;
std::size_t i = 0;
for (vector<unique_ptr<const Scalar>>::const_iterator selection_cit = selection_.begin();
selection_cit != selection_.end();
@@ -527,13 +541,23 @@ void HashInnerJoinWorkOrder::execute() {
i);
}
- // NOTE(chasseur): calling the bulk-insert method of InsertDestination once
- // for each pair of joined blocks incurs some extra overhead that could be
- // avoided by keeping checked-out MutableBlockReferences across iterations
- // of this loop, but that would get messy when combined with partitioning.
- // output_destination_->bulkInsertTuples(&temp_result);
+// iterate_line->emplace_back();
}
+// iterate_line->back().endEvent();
+// iterate_line->back().setPayload(getOperatorIndex(), 0);
+
output_destination_->bulkInsertTuples(&temp_result);
+
+// materialize_line->back().endEvent();
+// materialize_line->back().setPayload(getOperatorIndex(), collector.getJoinedTuples()->size());
+
+ if (build_relation_id == 0 &&
+ probe_relation_id == 0 &&
+ residual_predicate_ == nullptr &&
+ output_destination_ == nullptr &&
+ selection_.empty()) {
+ return;
+ }
}
void HashSemiJoinWorkOrder::execute() {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e2f49814/relational_operators/HashJoinOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.hpp b/relational_operators/HashJoinOperator.hpp
index 05e16a4..4f53daa 100644
--- a/relational_operators/HashJoinOperator.hpp
+++ b/relational_operators/HashJoinOperator.hpp
@@ -511,6 +511,10 @@ class HashSemiJoinWorkOrder : public WorkOrder {
void execute() override;
+ const Predicate *left_filter_predicate() const {
+ return left_filter_predicate_;
+ }
+
private:
void executeWithoutResidualPredicate();
@@ -645,6 +649,10 @@ class HashAntiJoinWorkOrder : public WorkOrder {
}
}
+ const Predicate *left_filter_predicate() const {
+ return left_filter_predicate_;
+ }
+
private:
void executeWithoutResidualPredicate();
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e2f49814/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index d85b5c4..1e8ddfd 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -460,7 +460,7 @@ void AggregationOperationState::aggregateBlockHashTable(const block_id input_blo
batch_size_try < num_tuples_left ? batch_size_try : num_tuples_left;
for (std::size_t i = 0; i < batch_size; ++i) {
accessor->next();
- batch.push_back(accessor->getCurrentPosition());
+ batch[i] = accessor->getCurrentPosition();
}
std::size_t num_hits =
@@ -469,7 +469,6 @@ void AggregationOperationState::aggregateBlockHashTable(const block_id input_blo
filtered->set(batch[t], true);
}
- batch.clear();
num_tuples_left -= batch_size;
batch_size_try = batch_size * 2;
} while (num_tuples_left > 0);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e2f49814/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index 11e7f40..fad66ef 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -994,6 +994,7 @@ target_link_libraries(quickstep_storage_StorageManager
quickstep_threading_SpinSharedMutex
quickstep_utility_Alignment
quickstep_utility_CalculateInstalledMemory
+ quickstep_utility_EventProfiler
quickstep_utility_Macros
quickstep_utility_ShardedLockManager
tmb)
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e2f49814/storage/CountedReference.hpp
----------------------------------------------------------------------
diff --git a/storage/CountedReference.hpp b/storage/CountedReference.hpp
index 2d9cec3..49d0f73 100644
--- a/storage/CountedReference.hpp
+++ b/storage/CountedReference.hpp
@@ -66,7 +66,7 @@ class CountedReference {
**/
CountedReference(T *block, EvictionPolicy *eviction_policy)
: block_(block), eviction_policy_(eviction_policy) {
- eviction_policy_->blockReferenced(block_->getID());
+// eviction_policy_->blockReferenced(block_->getID());
#ifdef QUICKSTEP_DEBUG
block_->ref();
#endif
@@ -111,7 +111,7 @@ class CountedReference {
#ifdef QUICKSTEP_DEBUG
block_->unref();
#endif
- eviction_policy_->blockUnreferenced(block_->getID());
+// eviction_policy_->blockUnreferenced(block_->getID());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e2f49814/storage/HashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTable.hpp b/storage/HashTable.hpp
index 9c50fc7..82205a1 100644
--- a/storage/HashTable.hpp
+++ b/storage/HashTable.hpp
@@ -2314,7 +2314,7 @@ void HashTable<ValueT, resizable, serializable, force_key_copy, allow_duplicate_
batch_size_try < num_tuples_left ? batch_size_try : num_tuples_left;
for (std::size_t i = 0; i < batch_size; ++i) {
accessor->next();
- batch.push_back(accessor->getCurrentPosition());
+ batch[i] = accessor->getCurrentPosition();
}
std::size_t num_hits =
@@ -2338,13 +2338,11 @@ void HashTable<ValueT, resizable, serializable, force_key_copy, allow_duplicate_
break;
}
}
- batch.clear();
+
num_tuples_left -= batch_size;
batch_size_try = batch_size * 2;
} while (!accessor->iterationFinished());
- }
-
- else { // no Bloom filters to probe
+ } else { // no Bloom filters to probe
while(accessor->next()) {
TypedValue key = accessor->getTypedValue(key_attr_id);
if (check_for_null_keys && key.isNull()) {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e2f49814/storage/StorageBlock.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageBlock.cpp b/storage/StorageBlock.cpp
index a115af9..4dc1fc7 100644
--- a/storage/StorageBlock.cpp
+++ b/storage/StorageBlock.cpp
@@ -389,7 +389,7 @@ void StorageBlock::select(const vector<unique_ptr<const Scalar>> &selection,
batch_size_try < num_tuples_left ? batch_size_try : num_tuples_left;
for (std::size_t i = 0; i < batch_size; ++i) {
accessor->next();
- batch.push_back(accessor->getCurrentPosition());
+ batch[i] = accessor->getCurrentPosition();
}
std::size_t num_hits =
@@ -398,7 +398,6 @@ void StorageBlock::select(const vector<unique_ptr<const Scalar>> &selection,
matches->set(batch[t], true);
}
- batch.clear();
num_tuples_left -= batch_size;
batch_size_try = batch_size * 2;
} while (num_tuples_left > 0);
@@ -465,7 +464,7 @@ void StorageBlock::selectSimple(const std::vector<attribute_id> &selection,
batch_size_try < num_tuples_left ? batch_size_try : num_tuples_left;
for (std::size_t i = 0; i < batch_size; ++i) {
accessor->next();
- batch.push_back(accessor->getCurrentPosition());
+ batch[i] = accessor->getCurrentPosition();
}
std::size_t num_hits =
@@ -474,7 +473,6 @@ void StorageBlock::selectSimple(const std::vector<attribute_id> &selection,
matches->set(batch[t], true);
}
- batch.clear();
num_tuples_left -= batch_size;
batch_size_try = batch_size * 2;
} while (num_tuples_left > 0);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e2f49814/storage/StorageManager.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.cpp b/storage/StorageManager.cpp
index 56ca323..4fb9291 100644
--- a/storage/StorageManager.cpp
+++ b/storage/StorageManager.cpp
@@ -86,6 +86,7 @@
#include "storage/StorageErrors.hpp"
#include "threading/SpinSharedMutex.hpp"
#include "utility/Alignment.hpp"
+#include "utility/EventProfiler.hpp"
#include "utility/CalculateInstalledMemory.hpp"
#ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS
@@ -689,7 +690,7 @@ void* StorageManager::allocateSlots(const std::size_t num_slots,
= MAP_PRIVATE | MAP_ANONYMOUS | MAP_ALIGNED_SUPER;
#endif
- makeRoomForBlockOrBlob(num_slots);
+// makeRoomForBlockOrBlob(num_slots);
void *slots = nullptr;
#if defined(QUICKSTEP_HAVE_MMAP_LINUX_HUGETLB) || defined(QUICKSTEP_HAVE_MMAP_BSD_SUPERPAGE)
@@ -784,7 +785,7 @@ MutableBlockReference StorageManager::getBlockInternal(
}
}
// To be safe, release the block's shard after 'eviction_lock' destructs.
- lock_manager_.release(block);
+// lock_manager_.release(block);
if (ret.valid()) {
return ret;
@@ -795,7 +796,7 @@ MutableBlockReference StorageManager::getBlockInternal(
// MutableBlockReference's constructor; this is because EvictionPolicy
// doesn't know about the block until blockReferenced is called, so
// chooseBlockToEvict shouldn't return the block.
- do {
+ {
SpinSharedMutexExclusiveLock<false> io_lock(*lock_manager_.get(block));
{
// Check one more time if the block got loaded in memory by someone else.
@@ -803,17 +804,14 @@ MutableBlockReference StorageManager::getBlockInternal(
std::unordered_map<block_id, BlockHandle>::iterator it = blocks_.find(block);
if (it != blocks_.end()) {
DEBUG_ASSERT(!it->second.block->isBlob());
- ret = MutableBlockReference(static_cast<StorageBlock*>(it->second.block), eviction_policy_.get());
- break;
+ return MutableBlockReference(static_cast<StorageBlock*>(it->second.block), eviction_policy_.get());
}
}
// No other thread loaded the block before us.
- ret = MutableBlockReference(loadBlock(block, relation, numa_node), eviction_policy_.get());
- } while (false);
+ return MutableBlockReference(loadBlock(block, relation, numa_node), eviction_policy_.get());
+ }
// To be safe, release the block's shard after 'io_lock' destructs.
- lock_manager_.release(block);
-
- return ret;
+// lock_manager_.release(block);
}
MutableBlobReference StorageManager::getBlobInternal(const block_id blob,
@@ -829,7 +827,7 @@ MutableBlobReference StorageManager::getBlobInternal(const block_id blob,
}
}
// To be safe, release the blob's shard after 'eviction_lock' destructs.
- lock_manager_.release(blob);
+// lock_manager_.release(blob);
if (ret.valid()) {
return ret;
@@ -855,7 +853,7 @@ MutableBlobReference StorageManager::getBlobInternal(const block_id blob,
ret = MutableBlobReference(loadBlob(blob, numa_node), eviction_policy_.get());
} while (false);
// To be safe, release the blob's shard after 'io_lock' destructs.
- lock_manager_.release(blob);
+// lock_manager_.release(blob);
return ret;
}
@@ -871,51 +869,20 @@ void StorageManager::makeRoomForBlockOrBlob(const size_t slots) {
break;
}
- bool has_collision = false;
- SpinSharedMutexExclusiveLock<false> eviction_lock(*lock_manager_.get(block_to_evict, &has_collision));
- if (has_collision) {
- // We have a collision in the shared lock manager, where some callers
- // of this function (i.e., getBlockInternal or getBlobInternal) has
- // acquired an exclusive lock, and we are trying to evict a block that
- // hashes to the same location. This will cause a deadlock.
-
- // For now simply treat this situation as the case where there is not
- // enough memory and we temporarily go over the memory limit.
- break;
- }
-
+ SpinSharedMutexExclusiveLock<false> eviction_lock(*lock_manager_.get(block_to_evict));
{
SpinSharedMutexSharedLock<false> read_lock(blocks_shared_mutex_);
if (blocks_.find(block_to_evict) == blocks_.end()) {
- // another thread must have jumped in and evicted it before us
-
- // NOTE(zuyu): It is ok to release the shard for a block or blob,
- // before 'eviction_lock' destructs, because we will never encounter a
- // self-deadlock in a single thread, and in multiple-thread case some
- // thread will block but not deadlock if there is a shard collision.
- lock_manager_.release(block_to_evict);
continue;
}
}
if (eviction_policy_->getRefCount(block_to_evict) > 0) {
// Someone sneaked in and referenced the block before we could evict it.
-
- // NOTE(zuyu): It is ok to release the shard for a block or blob, before
- // before 'eviction_lock' destructs, because we will never encounter a
- // self-deadlock in a single thread, and in multiple-thread case some
- // thread will block but not deadlock if there is a shard collision.
- lock_manager_.release(block_to_evict);
continue;
}
if (saveBlockOrBlob(block_to_evict)) {
evictBlockOrBlob(block_to_evict);
} // else : Someone sneaked in and evicted the block before we could.
-
- // NOTE(zuyu): It is ok to release the shard for a block or blob, before
- // before 'eviction_lock' destructs, because we will never encounter a
- // self-deadlock in a single thread, and in multiple-thread case some
- // thread will block but not deadlock if there is a shard collision.
- lock_manager_.release(block_to_evict);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e2f49814/storage/StorageManager.hpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.hpp b/storage/StorageManager.hpp
index 066953b..da9a1d2 100644
--- a/storage/StorageManager.hpp
+++ b/storage/StorageManager.hpp
@@ -96,9 +96,10 @@ class StorageManager {
: StorageManager(storage_path,
FLAGS_block_domain,
FLAGS_buffer_pool_slots,
- LRUKEvictionPolicyFactory::ConstructLRUKEvictionPolicy(
- 2,
- std::chrono::milliseconds(200))) {
+// LRUKEvictionPolicyFactory::ConstructLRUKEvictionPolicy(
+// 2,
+// std::chrono::milliseconds(200))) {
+ new EvictAnyBlockEvictionPolicy()) {
}
/**
@@ -122,9 +123,10 @@ class StorageManager {
: StorageManager(storage_path,
FLAGS_block_domain,
max_memory_usage,
- LRUKEvictionPolicyFactory::ConstructLRUKEvictionPolicy(
- 2,
- std::chrono::milliseconds(200))) {
+// LRUKEvictionPolicyFactory::ConstructLRUKEvictionPolicy(
+// 2,
+// std::chrono::milliseconds(200))) {
+ new EvictAnyBlockEvictionPolicy()) {
}
#ifdef QUICKSTEP_DISTRIBUTED
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e2f49814/utility/ConcurrentHashMap.hpp
----------------------------------------------------------------------
diff --git a/utility/ConcurrentHashMap.hpp b/utility/ConcurrentHashMap.hpp
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e2f49814/utility/EventProfiler.cpp
----------------------------------------------------------------------
diff --git a/utility/EventProfiler.cpp b/utility/EventProfiler.cpp
index e345993..9ab917b 100644
--- a/utility/EventProfiler.cpp
+++ b/utility/EventProfiler.cpp
@@ -23,7 +23,7 @@
namespace quickstep {
-EventProfiler<int, std::size_t, std::size_t> simple_profiler;
+EventProfiler<std::string, std::size_t, std::size_t> simple_profiler;
EventProfiler<std::size_t> relop_profiler;
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e2f49814/utility/EventProfiler.hpp
----------------------------------------------------------------------
diff --git a/utility/EventProfiler.hpp b/utility/EventProfiler.hpp
index f7fa598..1cbd830 100644
--- a/utility/EventProfiler.hpp
+++ b/utility/EventProfiler.hpp
@@ -178,7 +178,7 @@ class EventProfiler {
Mutex mutex_;
};
-extern EventProfiler<int, std::size_t, std::size_t> simple_profiler;
+extern EventProfiler<std::string, std::size_t, std::size_t> simple_profiler;
extern EventProfiler<std::size_t> relop_profiler;
/** @} */
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e2f49814/utility/PlanVisualizer.cpp
----------------------------------------------------------------------
diff --git a/utility/PlanVisualizer.cpp b/utility/PlanVisualizer.cpp
index e4df69c..0e662b9 100644
--- a/utility/PlanVisualizer.cpp
+++ b/utility/PlanVisualizer.cpp
@@ -189,6 +189,10 @@ void PlanVisualizer::visit(const P::PhysicalPtr &input) {
node_info.labels.emplace_back("RIGHT join attrs unique");
}
+ if (hash_join->left_filter_predicate()) {
+ node_info.labels.emplace_back("has left filter predicate");
+ }
+
const auto &bf_config = hash_join->bloom_filter_config();
for (const auto &bf : bf_config.build_side_bloom_filters) {
node_info.labels.emplace_back(
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e2f49814/utility/ShardedLockManager.hpp
----------------------------------------------------------------------
diff --git a/utility/ShardedLockManager.hpp b/utility/ShardedLockManager.hpp
index 520f879..e7a5273 100644
--- a/utility/ShardedLockManager.hpp
+++ b/utility/ShardedLockManager.hpp
@@ -63,36 +63,36 @@ class ShardedLockManager {
* @return The corresponding SharedMutex if there is no collision; otherwise,
* the collision SharedMutex.
*/
- SharedMutexT* get(const T key, bool *has_collision = nullptr) {
+ inline SharedMutexT* get(const T key) {
const std::size_t shard = hash_(key) % N;
-
- if (has_collision != nullptr) {
- // In StorageManager::makeRoomForBlock, check whether the evicting block
- // or blob has a shard collision with existing referenced shards.
- SpinSharedMutexSharedLock<false> read_lock(shard_count_mutex_);
- if (shard_count_.find(shard) != shard_count_.end()) {
- *has_collision = true;
- return &collision_mutex_;
- }
- }
-
- {
- SpinSharedMutexExclusiveLock<false> write_lock(shard_count_mutex_);
-
- // Check one more time for the evicting block or blob if there is a shard
- // collision.
- auto it = shard_count_.find(shard);
- if (it != shard_count_.end()) {
- if (has_collision != nullptr) {
- *has_collision = true;
- return &collision_mutex_;
- }
-
- ++it->second;
- } else {
- shard_count_.emplace(shard, 1);
- }
- }
+//
+// if (has_collision != nullptr) {
+// // In StorageManager::makeRoomForBlock, check whether the evicting block
+// // or blob has a shard collision with existing referenced shards.
+// SpinSharedMutexSharedLock<false> read_lock(shard_count_mutex_);
+// if (shard_count_.find(shard) != shard_count_.end()) {
+// *has_collision = true;
+// return &collision_mutex_;
+// }
+// }
+//
+// {
+// SpinSharedMutexExclusiveLock<false> write_lock(shard_count_mutex_);
+//
+// // Check one more time for the evicting block or blob if there is a shard
+// // collision.
+// auto it = shard_count_.find(shard);
+// if (it != shard_count_.end()) {
+// if (has_collision != nullptr) {
+// *has_collision = true;
+// return &collision_mutex_;
+// }
+//
+// ++it->second;
+// } else {
+// shard_count_.emplace(shard, 1);
+// }
+// }
return &sharded_mutexes_[shard];
}
@@ -100,15 +100,15 @@ class ShardedLockManager {
* @brief Release the shard corresponding to the provided key.
* @param key The key to compute the shard.
*/
- void release(const T key) {
- SpinSharedMutexExclusiveLock<false> write_lock(shard_count_mutex_);
- auto it = shard_count_.find(hash_(key) % N);
- DCHECK(it != shard_count_.end());
-
- if (--it->second == 0) {
- shard_count_.erase(it);
- }
- }
+// void release(const T key) {
+// SpinSharedMutexExclusiveLock<false> write_lock(shard_count_mutex_);
+// auto it = shard_count_.find(hash_(key) % N);
+// DCHECK(it != shard_count_.end());
+//
+// if (--it->second == 0) {
+// shard_count_.erase(it);
+// }
+// }
private:
std::hash<T> hash_;
@@ -120,8 +120,8 @@ class ShardedLockManager {
// Count all shards referenced by StorageManager in multiple threads.
// The key is the shard, while the value is the count. If the count equals to
// zero, we delete the shard entry.
- std::unordered_map<std::size_t, std::size_t> shard_count_;
- alignas(kCacheLineBytes) mutable SpinSharedMutex<false> shard_count_mutex_;
+// std::unordered_map<std::size_t, std::size_t> shard_count_;
+// alignas(kCacheLineBytes) mutable SpinSharedMutex<false> shard_count_mutex_;
DISALLOW_COPY_AND_ASSIGN(ShardedLockManager);
};