Posted to commits@quickstep.apache.org by ji...@apache.org on 2018/05/16 19:44:34 UTC

[2/3] incubator-quickstep git commit: Updates for grail

Updates for grail


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/a0024a2e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/a0024a2e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/a0024a2e

Branch: refs/heads/trace
Commit: a0024a2e9ec76d14013fcdee0e8d714d8776c1a3
Parents: 56e0a4e
Author: Jianqiao Zhu <ji...@cs.wisc.edu>
Authored: Tue May 8 17:53:57 2018 -0500
Committer: Jianqiao Zhu <ji...@cs.wisc.edu>
Committed: Tue May 8 17:53:57 2018 -0500

----------------------------------------------------------------------
 .../aggregation/AggregationHandleMax.hpp        |  2 +
 .../aggregation/AggregationHandleMin.hpp        |  2 +
 query_execution/WorkOrdersContainer.hpp         |  8 ++-
 query_optimizer/ExecutionGenerator.cpp          |  6 +--
 .../cost_model/StarSchemaSimpleCostModel.cpp    |  6 +--
 relational_operators/UnionAllOperator.cpp       |  6 ++-
 storage/AggregationOperationState.cpp           |  2 +-
 storage/PackedPayloadHashTable.cpp              |  8 +--
 storage/StorageManager.cpp                      | 42 ++--------------
 utility/ShardedLockManager.hpp                  | 52 +-------------------
 10 files changed, 29 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a0024a2e/expressions/aggregation/AggregationHandleMax.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMax.hpp b/expressions/aggregation/AggregationHandleMax.hpp
index 8f8c0d8..d6fdd12 100644
--- a/expressions/aggregation/AggregationHandleMax.hpp
+++ b/expressions/aggregation/AggregationHandleMax.hpp
@@ -191,6 +191,7 @@ class AggregationHandleMax : public AggregationConcreteHandle {
     if (state->max_.isNull() ||
         fast_comparator_->compareTypedValues(value, state->max_)) {
       state->max_ = value;
+      state->max_.ensureNotReference();
     }
   }
 
@@ -200,6 +201,7 @@ class AggregationHandleMax : public AggregationConcreteHandle {
     if (max_ptr->isNull() ||
         fast_comparator_->compareTypedValues(value, *max_ptr)) {
       *max_ptr = value;
+      max_ptr->ensureNotReference();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a0024a2e/expressions/aggregation/AggregationHandleMin.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMin.hpp b/expressions/aggregation/AggregationHandleMin.hpp
index 0e62be5..441af52 100644
--- a/expressions/aggregation/AggregationHandleMin.hpp
+++ b/expressions/aggregation/AggregationHandleMin.hpp
@@ -191,6 +191,7 @@ class AggregationHandleMin : public AggregationConcreteHandle {
     if (state->min_.isNull() ||
         fast_comparator_->compareTypedValues(value, state->min_)) {
       state->min_ = value;
+      state->min_.ensureNotReference();
     }
   }
 
@@ -200,6 +201,7 @@ class AggregationHandleMin : public AggregationConcreteHandle {
     if (min_ptr->isNull() ||
         fast_comparator_->compareTypedValues(value, *min_ptr)) {
       *min_ptr = value;
+      min_ptr->ensureNotReference();
     }
   }
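
For context, the ensureNotReference() calls added to both handles address the same issue: the MIN/MAX running state holds a TypedValue that may merely reference memory inside a source storage block, and that memory can go away once the block is evicted. Copying the value into owned storage at update time avoids the dangling reference. Below is a minimal, self-contained sketch of the idea using a toy Value class (not Quickstep's actual TypedValue):

// Toy illustration only: shows why an aggregation state must own its value
// rather than reference memory inside a (possibly evicted) storage block.
#include <cassert>
#include <cstring>
#include <string>
#include <vector>

class Value {
 public:
  // A "reference" value points into externally owned storage.
  static Value MakeReference(const char *data, std::size_t size) {
    Value v;
    v.ref_data_ = data;
    v.ref_size_ = size;
    return v;
  }

  // Copy the referenced bytes so this Value no longer depends on the block.
  void ensureNotReference() {
    if (ref_data_ != nullptr) {
      owned_.assign(ref_data_, ref_size_);
      ref_data_ = nullptr;
    }
  }

  std::string str() const {
    return ref_data_ ? std::string(ref_data_, ref_size_) : owned_;
  }

 private:
  const char *ref_data_ = nullptr;
  std::size_t ref_size_ = 0;
  std::string owned_;
};

int main() {
  std::vector<char> block = {'z', 'e', 'b', 'r', 'a'};  // pretend storage block
  Value max_state = Value::MakeReference(block.data(), block.size());
  max_state.ensureNotReference();              // what the patch adds after each update
  std::memset(block.data(), 0, block.size());  // block gets evicted / reused
  assert(max_state.str() == "zebra");          // state still holds the right value
  return 0;
}
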
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a0024a2e/query_execution/WorkOrdersContainer.hpp
----------------------------------------------------------------------
diff --git a/query_execution/WorkOrdersContainer.hpp b/query_execution/WorkOrdersContainer.hpp
index 431a270..55ec185 100644
--- a/query_execution/WorkOrdersContainer.hpp
+++ b/query_execution/WorkOrdersContainer.hpp
@@ -362,7 +362,13 @@ class WorkOrdersContainer {
         return nullptr;
       }
 
-      std::cerr << "# work orders: " << workorders_.size() << "        \r";
+      /*
+      const std::size_t num_remaining_workorders = workorders_.size();
+      if (num_remaining_workorders % 1000 == 0) {
+        std::cerr << "# work orders: " << num_remaining_workorders
+                  << "        \r";
+      }
+      */
 
       WorkOrder *work_order = workorders_.front().release();
       workorders_.pop();

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a0024a2e/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 8ca367b..75f15ac 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -190,7 +190,7 @@ static const volatile bool num_aggregation_partitions_dummy
     = gflags::RegisterFlagValidator(&FLAGS_num_aggregation_partitions, &ValidateNumAggregationPartitions);
 
 DEFINE_uint64(partition_aggregation_num_groups_threshold,
-              100000,
+              10000,
               "The threshold used for deciding whether the aggregation is done "
               "in a partitioned way or not");
 
@@ -204,10 +204,6 @@ bool CheckAggregatePartitioned(const std::size_t num_aggregate_functions,
                                const std::vector<bool> &is_distincts,
                                const std::vector<attribute_id> &group_by_attrs,
                                const std::size_t estimated_num_groups) {
-  // If there's no aggregation, return false.
-  if (num_aggregate_functions == 0) {
-    return false;
-  }
   // If there is only one aggregate function, we allow distinct aggregation.
   // Otherwise it can't be partitioned with distinct aggregation.
   if (num_aggregate_functions > 1) {
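
For context, this hunk lowers the default group-count threshold from 100000 to 10000 and drops the early return, so a query with zero aggregate functions (a pure GROUP BY) can now also be planned as a partitioned aggregation. A hypothetical, simplified sketch of the decision shape; the real CheckAggregatePartitioned also inspects the DISTINCT flags and group-by attributes in more detail:

// Hedged sketch of the partitioning decision this hunk tweaks (simplified).
#include <cstddef>
#include <iostream>

constexpr std::size_t kNumGroupsThreshold = 10000;  // new default in the patch

bool ShouldPartitionAggregation(std::size_t num_aggregate_functions,
                                bool has_distinct,
                                std::size_t num_group_by_attrs,
                                std::size_t estimated_num_groups) {
  // After the patch, num_aggregate_functions == 0 (pure GROUP BY) no longer
  // forces a non-partitioned plan.
  if (num_aggregate_functions > 1 && has_distinct) {
    return false;  // multiple aggregates plus DISTINCT cannot be partitioned
  }
  if (num_group_by_attrs == 0) {
    return false;  // nothing to hash-partition on
  }
  return estimated_num_groups > kNumGroupsThreshold;
}

int main() {
  std::cout << ShouldPartitionAggregation(0, false, 2, 50000) << "\n";  // 1
  std::cout << ShouldPartitionAggregation(1, false, 2, 500) << "\n";    // 0
  return 0;
}
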

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a0024a2e/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp b/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
index 729a563..2f4902f 100644
--- a/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
+++ b/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
@@ -170,7 +170,7 @@ std::size_t StarSchemaSimpleCostModel::estimateCardinalityForFilterJoin(
   std::size_t left_cardinality = estimateCardinality(physical_plan->left());
   double right_selectivity = estimateSelectivity(physical_plan->right());
   return static_cast<std::size_t>(
-      left_cardinality * build_side_filter_selectivity * right_selectivity + 0.5);
+      left_cardinality * build_side_filter_selectivity * std::min(right_selectivity, 0.01) + 0.5);
 }
 
 std::size_t StarSchemaSimpleCostModel::estimateCardinalityForHashJoin(
@@ -179,7 +179,7 @@ std::size_t StarSchemaSimpleCostModel::estimateCardinalityForHashJoin(
   std::size_t right_cardinality = estimateCardinality(physical_plan->right());
   double left_selectivity = estimateSelectivity(physical_plan->left());
   double right_selectivity = estimateSelectivity(physical_plan->right());
-  return std::max(static_cast<std::size_t>(left_cardinality * right_selectivity + 0.5),
+  return std::min(static_cast<std::size_t>(left_cardinality * right_selectivity + 0.5),
                   static_cast<std::size_t>(right_cardinality * left_selectivity + 0.5));
 }
 
@@ -344,7 +344,7 @@ double StarSchemaSimpleCostModel::estimateSelectivity(
       double build_side_filter_selectivity =
           estimateSelectivityForPredicate(filter_join->build_side_filter_predicate(),
                                           filter_join->right());
-      return left_selectivity * right_selectivity * build_side_filter_selectivity;
+      return std::min(left_selectivity * right_selectivity, 0.01) * build_side_filter_selectivity;
     }
     case P::PhysicalType::kHashJoin: {
       const P::HashJoinPtr &hash_join =
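
These cost-model edits make join estimates more conservative: the hash-join cardinality now takes the smaller of the two single-sided estimates (min instead of max), and the filter-join estimates cap the relevant selectivity term at 0.01. A small numeric illustration of the arithmetic, with made-up inputs:

// Numeric illustration of the estimator changes; all inputs are invented.
#include <algorithm>
#include <cstddef>
#include <iostream>

int main() {
  const std::size_t left_cardinality = 1000000;
  const std::size_t right_cardinality = 2000;
  const double left_selectivity = 0.5;
  const double right_selectivity = 0.2;

  // Hash join: the patch switches max() to min(), preferring the smaller of
  // the two single-sided estimates.
  const auto est_a = static_cast<std::size_t>(left_cardinality * right_selectivity + 0.5);  // 200000
  const auto est_b = static_cast<std::size_t>(right_cardinality * left_selectivity + 0.5);  // 1000
  std::cout << "hash join old: " << std::max(est_a, est_b)
            << "  new: " << std::min(est_a, est_b) << "\n";  // old: 200000  new: 1000

  // Filter join: the right-side selectivity is now capped at 0.01.
  const double build_side_filter_selectivity = 0.1;
  std::cout << "filter join old: "
            << left_cardinality * build_side_filter_selectivity * right_selectivity
            << "  new: "
            << left_cardinality * build_side_filter_selectivity
                 * std::min(right_selectivity, 0.01) << "\n";  // old: 20000  new: 1000
  return 0;
}
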

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a0024a2e/relational_operators/UnionAllOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/UnionAllOperator.cpp b/relational_operators/UnionAllOperator.cpp
index 1d82aef..bffacb6 100644
--- a/relational_operators/UnionAllOperator.cpp
+++ b/relational_operators/UnionAllOperator.cpp
@@ -40,8 +40,10 @@ namespace quickstep {
 void UnionAllOperator::feedInputBlock(const block_id input_block_id,
                                       const relation_id input_relation_id,
                                       const partition_id part_id) {
-  std::size_t index = relation_id_to_index_.at(input_relation_id);
-  input_relations_block_ids_[index].push_back(input_block_id);
+  const auto it = relation_id_to_index_.find(input_relation_id);
+  if (it != relation_id_to_index_.end()) {
+    input_relations_block_ids_[it->second].push_back(input_block_id);
+  }
 }
 
 void UnionAllOperator::doneFeedingInputBlocks(const relation_id rel_id) {
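
The feedInputBlock change swaps the .at() lookup, which throws std::out_of_range for a missing key, for a find() lookup that silently ignores blocks fed for a relation this operator does not know about. A standalone sketch of the difference in behavior:

// at() would throw for an unknown key; find() lets the caller skip it.
#include <cstddef>
#include <iostream>
#include <unordered_map>
#include <vector>

int main() {
  std::unordered_map<int, std::size_t> relation_id_to_index = {{7, 0}, {9, 1}};
  std::vector<std::vector<int>> input_blocks(2);

  const int unknown_relation = 42;
  const auto it = relation_id_to_index.find(unknown_relation);
  if (it != relation_id_to_index.end()) {
    input_blocks[it->second].push_back(123);  // would record the fed block id
  } else {
    std::cout << "ignored block for relation " << unknown_relation << "\n";
  }
  return 0;
}
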

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a0024a2e/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index 92798d8..b88290b 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -113,7 +113,7 @@ AggregationOperationState::AggregationOperationState(
     }
   }
 
-  std::vector<AggregationHandle *> group_by_handles;
+  std::vector<AggregationHandle*> group_by_handles;
 
   // Set up each individual aggregate in this operation.
   std::vector<const AggregateFunction *>::const_iterator agg_func_it =

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a0024a2e/storage/PackedPayloadHashTable.cpp
----------------------------------------------------------------------
diff --git a/storage/PackedPayloadHashTable.cpp b/storage/PackedPayloadHashTable.cpp
index 1df20d0..c76869c 100644
--- a/storage/PackedPayloadHashTable.cpp
+++ b/storage/PackedPayloadHashTable.cpp
@@ -440,10 +440,10 @@ void PackedPayloadHashTable::resize(const std::size_t extra_buckets,
 
   // Copy over variable-length key components, if any.
   if (original_variable_storage_used > 0) {
-    DEBUG_ASSERT(original_variable_storage_used ==
-                 key_manager_.getNextVariableLengthKeyOffset());
-    DEBUG_ASSERT(original_variable_storage_used <=
-                 resized_variable_length_key_storage_size);
+//    DEBUG_ASSERT(original_variable_storage_used ==
+//                 key_manager_.getNextVariableLengthKeyOffset());
+//    DEBUG_ASSERT(original_variable_storage_used <=
+//                 resized_variable_length_key_storage_size);
     std::memcpy(resized_variable_length_key_storage,
                 key_manager_.getVariableLengthKeyStorage(),
                 original_variable_storage_used);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a0024a2e/storage/StorageManager.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.cpp b/storage/StorageManager.cpp
index 5924607..006d6a7 100644
--- a/storage/StorageManager.cpp
+++ b/storage/StorageManager.cpp
@@ -816,13 +816,13 @@ MutableBlockReference StorageManager::getBlockInternal(
       ret = MutableBlockReference(static_cast<StorageBlock*>(it->second.block), eviction_policy_.get());
     }
   }
-  // To be safe, release the block's shard after 'eviction_lock' destructs.
-  lock_manager_.release(block);
 
   if (ret.valid()) {
     return ret;
   }
 
+  makeRoomForBlockOrBlob(0);
+
   // Note that there is no way for the block to be evicted between the call to
   // loadBlock and the call to EvictionPolicy::blockReferenced from
   // MutableBlockReference's constructor; this is because EvictionPolicy
@@ -843,8 +843,6 @@ MutableBlockReference StorageManager::getBlockInternal(
     // No other thread loaded the block before us.
     ret = MutableBlockReference(loadBlock(block, relation, numa_node), eviction_policy_.get());
   } while (false);
-  // To be safe, release the block's shard after 'io_lock' destructs.
-  lock_manager_.release(block);
 
   return ret;
 }
@@ -861,8 +859,6 @@ MutableBlobReference StorageManager::getBlobInternal(const block_id blob,
       ret = MutableBlobReference(static_cast<StorageBlob*>(it->second.block), eviction_policy_.get());
     }
   }
-  // To be safe, release the blob's shard after 'eviction_lock' destructs.
-  lock_manager_.release(blob);
 
   if (ret.valid()) {
     return ret;
@@ -887,8 +883,6 @@ MutableBlobReference StorageManager::getBlobInternal(const block_id blob,
     // No other thread loaded the blob before us.
     ret = MutableBlobReference(loadBlob(blob, numa_node), eviction_policy_.get());
   } while (false);
-  // To be safe, release the blob's shard after 'io_lock' destructs.
-  lock_manager_.release(blob);
 
   return ret;
 }
@@ -904,51 +898,21 @@ void StorageManager::makeRoomForBlockOrBlob(const size_t slots) {
       break;
     }
 
-    bool has_collision = false;
-    SpinSharedMutexExclusiveLock<false> eviction_lock(*lock_manager_.get(block_to_evict, &has_collision));
-    if (has_collision) {
-      // We have a collision in the shared lock manager, where some callers
-      // of this function (i.e., getBlockInternal or getBlobInternal) has
-      // acquired an exclusive lock, and we are trying to evict a block that
-      // hashes to the same location. This will cause a deadlock.
-
-      // For now simply treat this situation as the case where there is not
-      // enough memory and we temporarily go over the memory limit.
-      break;
-    }
-
+    SpinSharedMutexExclusiveLock<false> eviction_lock(*lock_manager_.get(block_to_evict));
     {
       SpinSharedMutexSharedLock<false> read_lock(blocks_shared_mutex_);
       if (blocks_.find(block_to_evict) == blocks_.end()) {
         // another thread must have jumped in and evicted it before us
-
-        // NOTE(zuyu): It is ok to release the shard for a block or blob,
-        // before 'eviction_lock' destructs, because we will never encounter a
-        // self-deadlock in a single thread, and in multiple-thread case some
-        // thread will block but not deadlock if there is a shard collision.
-        lock_manager_.release(block_to_evict);
         continue;
       }
     }
     if (eviction_policy_->getRefCount(block_to_evict) > 0) {
       // Someone sneaked in and referenced the block before we could evict it.
-
-      // NOTE(zuyu): It is ok to release the shard for a block or blob, before
-      // before 'eviction_lock' destructs, because we will never encounter a
-      // self-deadlock in a single thread, and in multiple-thread case some
-      // thread will block but not deadlock if there is a shard collision.
-      lock_manager_.release(block_to_evict);
       continue;
     }
     if (saveBlockOrBlob(block_to_evict)) {
       evictBlockOrBlob(block_to_evict);
     }  // else : Someone sneaked in and evicted the block before we could.
-
-    // NOTE(zuyu): It is ok to release the shard for a block or blob, before
-    // before 'eviction_lock' destructs, because we will never encounter a
-    // self-deadlock in a single thread, and in multiple-thread case some
-    // thread will block but not deadlock if there is a shard collision.
-    lock_manager_.release(block_to_evict);
   }
 }
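
With the collision machinery removed from ShardedLockManager (the last file in this diff), get() becomes a pure hash-to-shard mapping, so every manual lock_manager_.release() call here becomes unnecessary: the shard mutex is held only through scoped RAII guards, and makeRoomForBlockOrBlob(0) is now invoked up front before a missing block is loaded. A hedged, self-contained sketch of the resulting pattern, using C++17 std::shared_mutex as a stand-in for the real SpinSharedMutex (toy code, not the actual StorageManager):

#include <array>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <mutex>
#include <set>
#include <shared_mutex>

// Hash a key to one of N mutexes; no reference counts, no collision mutex.
template <typename T, std::size_t N>
class ToyShardedLockManager {
 public:
  std::shared_mutex* get(const T key) {
    return &sharded_mutexes_[hash_(key) % N];
  }
 private:
  std::hash<T> hash_;
  std::array<std::shared_mutex, N> sharded_mutexes_;
};

ToyShardedLockManager<std::uint64_t, 256> lock_manager;
std::set<std::uint64_t> resident_blocks;  // stand-in for blocks_

std::uint64_t chooseBlockToEvict() { return *resident_blocks.begin(); }
bool enoughMemory() { return resident_blocks.size() <= 1; }
int getRefCount(std::uint64_t) { return 0; }
bool saveBlockOrBlob(std::uint64_t) { return true; }
void evictBlockOrBlob(std::uint64_t b) { resident_blocks.erase(b); }

void makeRoom() {
  while (!enoughMemory()) {
    const std::uint64_t block_to_evict = chooseBlockToEvict();
    // Exclusive shard lock, released automatically on every exit path; the
    // old code had to pair each early 'continue' or 'break' with release().
    std::unique_lock<std::shared_mutex> eviction_lock(
        *lock_manager.get(block_to_evict));
    if (resident_blocks.find(block_to_evict) == resident_blocks.end()) {
      continue;  // another thread evicted it first
    }
    if (getRefCount(block_to_evict) > 0) {
      continue;  // referenced again before we could evict it
    }
    if (saveBlockOrBlob(block_to_evict)) {
      evictBlockOrBlob(block_to_evict);
    }
  }
}

int main() {
  resident_blocks = {10, 20, 30};
  makeRoom();  // evicts blocks until only one remains
  return 0;
}
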
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a0024a2e/utility/ShardedLockManager.hpp
----------------------------------------------------------------------
diff --git a/utility/ShardedLockManager.hpp b/utility/ShardedLockManager.hpp
index 520f879..03f3e3c 100644
--- a/utility/ShardedLockManager.hpp
+++ b/utility/ShardedLockManager.hpp
@@ -58,65 +58,17 @@ class ShardedLockManager {
   /**
    * @brief Get the SharedMutex corresponding to the provided key.
    * @param key The key to map to a SharedMutex.
-   * @param has_collision Whether accessing the given key would result in a
-   *        hash collision. Used in StorageManager::makeRoomForBlock only.
-   * @return The corresponding SharedMutex if there is no collision; otherwise,
-   *         the collision SharedMutex.
+   * @return The corresponding SharedMutex.
    */
-  SharedMutexT* get(const T key, bool *has_collision = nullptr) {
+  SharedMutexT* get(const T key) {
     const std::size_t shard = hash_(key) % N;
-
-    if (has_collision != nullptr) {
-      // In StorageManager::makeRoomForBlock, check whether the evicting block
-      // or blob has a shard collision with existing referenced shards.
-      SpinSharedMutexSharedLock<false> read_lock(shard_count_mutex_);
-      if (shard_count_.find(shard) != shard_count_.end()) {
-        *has_collision = true;
-        return &collision_mutex_;
-      }
-    }
-
-    {
-      SpinSharedMutexExclusiveLock<false> write_lock(shard_count_mutex_);
-
-      // Check one more time for the evicting block or blob if there is a shard
-      // collision.
-      auto it = shard_count_.find(shard);
-      if (it != shard_count_.end()) {
-        if (has_collision != nullptr) {
-          *has_collision = true;
-          return &collision_mutex_;
-        }
-
-        ++it->second;
-      } else {
-        shard_count_.emplace(shard, 1);
-      }
-    }
     return &sharded_mutexes_[shard];
   }
 
-  /**
-   * @brief Release the shard corresponding to the provided key.
-   * @param key The key to compute the shard.
-   */
-  void release(const T key) {
-    SpinSharedMutexExclusiveLock<false> write_lock(shard_count_mutex_);
-    auto it = shard_count_.find(hash_(key) % N);
-    DCHECK(it != shard_count_.end());
-
-    if (--it->second == 0) {
-      shard_count_.erase(it);
-    }
-  }
-
  private:
   std::hash<T> hash_;
   std::array<SharedMutexT, N> sharded_mutexes_;
 
-  // The placeholder mutex used whenever there is a hash collision.
-  SharedMutexT collision_mutex_;
-
   // Count all shards referenced by StorageManager in multiple threads.
   // The key is the shard, while the value is the count. If the count equals to
   // zero, we delete the shard entry.