You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafodion.apache.org by se...@apache.org on 2017/05/08 23:01:29 UTC
[1/3] incubator-trafodion git commit: [TRAFODION-2604] sort merge
phase memory pool improvements
Repository: incubator-trafodion
Updated Branches:
refs/heads/master 33a9005d2 -> bf3e8d083
[TRAFODION-2604] sort merge phase memory pool improvements
Project: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/commit/8a7deed6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/tree/8a7deed6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/diff/8a7deed6
Branch: refs/heads/master
Commit: 8a7deed640202084169091f89d320115f5a55aba
Parents: a9ec249
Author: Prashant Vasudev <pr...@esgyn.com>
Authored: Tue May 2 22:04:16 2017 +0000
Committer: Prashant Vasudev <pr...@esgyn.com>
Committed: Tue May 2 22:04:16 2017 +0000
----------------------------------------------------------------------
core/sql/executor/ex_sort.cpp | 283 +++++++++++++++++++++++------------
core/sql/executor/ex_sort.h | 35 ++++-
core/sql/sort/SortUtil.cpp | 98 +++++++-----
core/sql/sort/SortUtil.h | 6 +-
core/sql/sqlcomp/nadefaults.cpp | 2 +-
5 files changed, 283 insertions(+), 141 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/8a7deed6/core/sql/executor/ex_sort.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/ex_sort.cpp b/core/sql/executor/ex_sort.cpp
index b56cdce..d76124c 100644
--- a/core/sql/executor/ex_sort.cpp
+++ b/core/sql/executor/ex_sort.cpp
@@ -98,13 +98,41 @@ ExOperStats * ExSortTcb::doAllocateStatsEntry(CollHeap *heap,
void ExSortTcb::setupPoolBuffers(ex_queue_entry *pentry_down)
{
+ //In case of prepare once and execute many, if sort overflowed,
+ //sortSendPool_ is deallocated and receivePool_ is newly allocated
+ //when switching from sortSend to sortReceive. Note that if sort
+ //did not overflow, sortSendPool_ and receivePool_ are same.
+ //Here we need to reset these pools to start again.
+ if((sortSendPool_ == NULL) &&
+ (receivePool_ != NULL) && //pool reference
+ (sortPool_ != NULL)) //actual pool
+ {
+ //receivePool_ always allocated outside of quota system.
+ //so no need to adjust quota system especially when sortSendPool_
+ //and receivePool_ are not the same.
+ delete receivePool_;
+ receivePool_ = NULL;
+
+ delete sortPool_;
+ sortPool_ = NULL;
+
+ //Also delete and reallocate the space object from which the sortPool_ is
+ //allocated. This will really release the memory.
+ delete sortSpace_;
+ sortSpace_ = NULL;
+ }
+
//if any of these pools is already allocated, most likely
//from a prepare once execute many scenario, then no need
//to reallocate the pool again. Just return.
- if(partialSortPool_ || topNSortPool_ || regularSortPool_)
+ if(partialSortPool_ || topNSortPool_ || sortPool_)
return;
-
- CollHeap *space = getGlobals()->getSpace();
+
+ if(!sortSpace_)
+ {
+ sortSpace_ = new (sortHeap_)Space(Space::EXECUTOR_SPACE, TRUE,(char*)"Sort Space setupPoolBuffers");
+ sortSpace_->setParent(sortHeap_);
+ }
// Allocate the buffer pool.
// Note that when memoryQuota system is enabled, we initialize the
@@ -114,64 +142,77 @@ void ExSortTcb::setupPoolBuffers(ex_queue_entry *pentry_down)
// account the estimate number of rows by the compiler and limited by
// maximum of GEN_SORT_MAX_BUFFER_SIZE. The memory quota system will
// come into force for additional buffers following this initial buffer.
- Lng32 numBuffs = (sortTdb().sortOptions_->memoryQuotaMB()<= 0)?sortTdb().numBuffers_:2;
+ initialNumOfPoolBuffers_ = sortTdb().numBuffers_;
Lng32 numSortBuffs = 0;
// need separate pools for sorting and saving result rows
if (sortTdb().partialSort())
{
// give result pool and sort pool each half of the space
- numSortBuffs = numBuffs = (numBuffs + 1)/2;
+ numSortBuffs = initialNumOfPoolBuffers_ = (initialNumOfPoolBuffers_ + 1)/2;
if(numSortBuffs < 2) numSortBuffs = 2; //initialize the pool with atleast 2 buffers.
- if(numBuffs < 2) numBuffs = 2;
+ if(initialNumOfPoolBuffers_ < 2) initialNumOfPoolBuffers_ = 2;
}
- //partial sort uses two pools. one partialSortPool_ and regularSortPool_
+ //partial sort uses two pools. one partialSortPool_ and sortPool_
//partialSortPool_ will be used for receiving the sorted records.
if (numSortBuffs > 0)
{
- partialSortPool_ = new(space) sql_buffer_pool(
- numSortBuffs, sortTdb().bufferSize_, space);
+ partialSortPool_ = new(sortSpace_) sql_buffer_pool(
+ numSortBuffs, sortTdb().bufferSize_, sortSpace_);
}
- //setup sortPool_ reference handle. If TopN, topNSortPool_ will be allocated
- //from ExSimpleSQLBuffer based on numBuffs. If not TopN, regularSortPool_ will
+ //setup sortSendPool_ reference handle. If TopN, topNSortPool_ will be allocated
+ //from ExSimpleSQLBuffer based on numBuffs. If not TopN, sortPool_ will
//be allocated from sql_buffer_pool based on numBuffs.
- //sortPool_ reference handle will either point to topNSortPool or
- //regularSortPool.
+ //sortSendPool_ reference handle will either point to topNSortPool or
+ //sortPool_.
if((pentry_down->downState.request == ex_queue::GET_N) &&
(pentry_down->downState.requestValue > 0) &&
(sortTdb().topNSortEnabled()))
{
- topNSortPool_ = new(space)
+ topNSortPool_ = new(sortSpace_)
ExSimpleSQLBuffer(pentry_down->downState.requestValue + 1,
- sortTdb().sortRecLen_, space);
+ sortTdb().sortRecLen_, sortSpace_);
- sortPool_ = new(space) ExSortBufferPool((void*)topNSortPool_,
- ExSortBufferPool::SIMPLE_BUFFER_TYPE);
+ sortSendPool_ = new(sortSpace_) ExSortBufferPool((void*)topNSortPool_,
+ ExSortBufferPool::SIMPLE_BUFFER_TYPE,
+ bmoStats_);
}
else
{
- regularSortPool_ = new(space) sql_buffer_pool(numBuffs,sortTdb().bufferSize_,
- space);
+ sortPool_ = new(sortSpace_) sql_buffer_pool(initialNumOfPoolBuffers_,
+ sortTdb().bufferSize_,
+ sortSpace_);
- sortPool_ = new(space)ExSortBufferPool((void*)regularSortPool_,
- ExSortBufferPool::SQL_BUFFER_TYPE);
+ sortSendPool_ = new(sortSpace_)ExSortBufferPool((void*)sortPool_,
+ ExSortBufferPool::SQL_BUFFER_TYPE,
+ bmoStats_);
}
//setup the receive pool. Receive pool is the same as sortPool for all cases except
//for partial sort.
if(sortTdb().partialSort())
- receivePool_ = new(space) ExSortBufferPool(partialSortPool_,
- ExSortBufferPool::SQL_BUFFER_TYPE);
+ {
+ receivePool_ = new(sortSpace_) ExSortBufferPool(partialSortPool_,
+ ExSortBufferPool::SQL_BUFFER_TYPE,
+ bmoStats_);
+ }
else
- receivePool_ = sortPool_;
+ {
+ //Assume sort does not overflow to being with.
+ //In this case, receivePool_ and sortSendpool_ are same.
+ //if overflow occured ( no overflow in topN case) then
+ //sortSendpool_( and actual sortPool_)is deleted and receivePool_
+ //is allocated new.
+ receivePool_ = sortSendPool_;
+ }
//CIF defrag option only if NOT topNSortPool_
defragTd_ = NULL;
if (considerBufferDefrag() && (topNSortPool_ == NULL))
{
- defragTd_ = sortPool_->addDefragTuppDescriptor(sortTdb().sortRecLen_);
+ defragTd_ = sortSendPool_->addDefragTuppDescriptor(sortTdb().sortRecLen_);
}
if(bmoStats_)
@@ -182,6 +223,51 @@ void ExSortTcb::setupPoolBuffers(ex_queue_entry *pentry_down)
}
+//This method is called only if sort overflowed and transitioning
+//from sortSend to sortReceive. SortPool_ is deallocated and space
+//object is deallocated and reallocated minimum to reuse memory (quota)
+//for sort Receive.
+void ExSortTcb::deleteAndReallocateSortPool()
+{
+ //delete reference to sortPool_
+ ex_assert(sortSendPool_ == receivePool_, "sortSendPool_ != receivePool_");
+
+ //initialNumOfPoolBuffers_ is allocated outside of quota system.
+ sortUtil_->returnConsumedMemoryQuota(
+ (sortSendPool_->get_number_of_buffers() - initialNumOfPoolBuffers_) *
+ sortTdb().bufferSize_);
+
+ delete sortSendPool_;
+ sortSendPool_ = NULL;
+ receivePool_ = NULL;
+
+ //delete actual pool.
+ //if we are here, sortPool_ must be valid since we should not
+ //reach here if topNSort or partial sort.
+ delete sortPool_;
+ sortPool_ = NULL;
+
+ //Also delete and reallocate the space object from which the pool was
+ //allocated. This will really release the memory.
+ delete sortSpace_;
+ sortSpace_ = new(sortHeap_)Space(Space::EXECUTOR_SPACE, TRUE,(char*)"Sort Space reallocated");
+ sortSpace_->setParent(sortHeap_);
+
+
+ //now allocate a pool and assign it to receivePool_ handle.
+ //Allocated outside of memory quota.
+ sortPool_ = new(sortSpace_) sql_buffer_pool(initialNumOfPoolBuffers_
+ ,sortTdb().bufferSize_,
+ sortSpace_);
+
+ receivePool_ = new(sortSpace_)ExSortBufferPool((void*)sortPool_,
+ ExSortBufferPool::SQL_BUFFER_TYPE,
+ bmoStats_);
+
+ if (bmoStats_)
+ bmoStats_->setSpaceBufferCount(initialNumOfPoolBuffers_);
+}
+
//
// Build a sort tcb
//
@@ -218,7 +304,8 @@ ExSortTcb::ExSortTcb(const ExSortTdb & sort_tdb,
sortStats_ = NULL;
childTcb_ = &child_tcb;
- CollHeap * space = glob->getSpace();
+ //Create heap to be used by sort.
+ sortHeap_ = new(getHeap()) NAHeap("Sort Heap", (NAHeap *)getHeap(), 204800);
// cast sort tdb to non-const
ExSortTdb * st = (ExSortTdb *)&sort_tdb;
@@ -230,16 +317,18 @@ ExSortTcb::ExSortTcb(const ExSortTdb & sort_tdb,
allocateParentQueues(qparent_);
// Intialize processedInputs_ to the next request to process
processedInputs_ = qparent_.down->getTailIndex();
- workAtp_ = allocateAtp(sort_tdb.workCriDesc_, space);
- workAtp_->getTupp(2) = new(space) tupp_descriptor();
+ workAtp_ = allocateAtp(sort_tdb.workCriDesc_, glob->getSpace());
+ workAtp_->getTupp(2) = new(glob->getSpace()) tupp_descriptor();
- //buffer pools are allocated in SORT_PREP work phase.
+ //buffer pools are allocated from sortSpace_ in SORT_PREP work phase.
+ sortSpace_ = NULL;
topNSortPool_ = NULL;
- regularSortPool_ = NULL;
+ sortPool_ = NULL;
partialSortPool_ = NULL;
+ initialNumOfPoolBuffers_ = 0;
//pool reference handles. Initialized in SORT_PREP phase.
- sortPool_ = NULL;
+ sortSendPool_ = NULL;
receivePool_ = NULL;
*(short *)&sortType_ = 0;
@@ -267,14 +356,11 @@ ExSortTcb::ExSortTcb(const ExSortTdb & sort_tdb,
break;
}
- sortUtil_ = new(space) SortUtil(sort_tdb.getExplainNodeId());
+ sortUtil_ = new(sortHeap_) SortUtil(sort_tdb.getExplainNodeId());
sortDiag_ = NULL;
- // Create heap to be used by sort.
- sortHeap_ = new(space) NAHeap("Sort Heap", (NAHeap *)getHeap(), 204800);
-
- sortCfg_ = new(space) SortUtilConfig(sortHeap_);
+ sortCfg_ = new(sortHeap_) SortUtilConfig(sortHeap_);
sortCfg_->setSortType(sortType_);
sortCfg_->setScratchThreshold(st->sortOptions_->scratchFreeSpaceThresholdPct());
@@ -395,29 +481,39 @@ void ExSortTcb::freeResources()
delete partialSortPool_;
partialSortPool_ = NULL;
}
- if (regularSortPool_)
+ if (sortPool_)
{
- delete regularSortPool_;
- regularSortPool_ = NULL;
+ delete sortPool_;
+ sortPool_ = NULL;
}
if (topNSortPool_)
{
delete topNSortPool_;
topNSortPool_ = NULL;
}
- if (sortPool_)
+
+ //sortSendPool_ and receivePool_
+ //are ExSortBufferPool class objects.
+ if (sortSendPool_)
{
- if(sortPool_ != receivePool_)
+ if(sortSendPool_ != receivePool_)
{
- delete sortPool_;
+ delete sortSendPool_;
}
- sortPool_ = NULL;
+ sortSendPool_ = NULL;
}
if (receivePool_)
{
delete receivePool_;
receivePool_ = NULL;
}
+
+ if(sortSpace_)
+ {
+ delete sortSpace_;
+ sortSpace_ = NULL;
+ }
+
delete qparent_.up;
delete qparent_.down;
};
@@ -676,8 +772,6 @@ short ExSortTcb::workUp()
// LCOV_EXCL_STOP
case ExSortTcb::SORT_PREP:
{
- sortHeap_->reInitialize();
-
if ( sortDiag_ != NULL )
{
// LCOV_EXCL_START
@@ -846,7 +940,22 @@ short ExSortTcb::workUp()
if (qparent_.up->isFull()){
return workStatus(WORK_OK); // parent queue is full. Just return
}
-
+
+ //First time reaching here and before calling
+ //sortReceive, release the buffers used during
+ //sortSend phase ONLY if sort overflowed( by this
+ //time, all sort records are in scratch files).
+ //Overflow does not happen in TopNSort. Partial sort
+ //has a separate receive pool.
+ if((sortSendPool_ != NULL) && //not yet released
+ (!pstate.noOverflow_) && //overflow happened
+ (!sortTdb().partialSort())) //not partial sort
+ {
+
+ deleteAndReallocateSortPool();
+ }
+
+
ex_queue_entry * pentry = qparent_.up->getTailEntry();
rc = sortReceive(pentry_down, request, pentry, FALSE,
pentry_down->downState.parentIndex,
@@ -948,7 +1057,7 @@ short ExSortTcb::sortSend(ex_queue_entry * srcEntry,
td = NULL;
if (defragTd_ && //considerBufferDefrag() && //resizeCifRecord() &&
- !sortPool_->currentBufferHasEnoughSpace(sortTdb().sortRecLen_))
+ !sortSendPool_->currentBufferHasEnoughSpace(sortTdb().sortRecLen_))
{
#if defined(_DEBUG)
assert(resizeCifRecord());
@@ -978,14 +1087,14 @@ short ExSortTcb::sortSend(ex_queue_entry * srcEntry,
{
defragLength = *rowLenPtr;
td =
- sortPool_->get_free_tupp_descriptor(defragLength + dataOffset, &buf);// do we need &buf here??
+ sortSendPool_->get_free_tupp_descriptor(defragLength + dataOffset, &buf);// do we need &buf here??
}
}
}
else
{
td =
- sortPool_->get_free_tupp_descriptor(sortTdb().sortRecLen_, &buf);
+ sortSendPool_->get_free_tupp_descriptor(sortTdb().sortRecLen_, &buf);
}
@@ -996,12 +1105,12 @@ short ExSortTcb::sortSend(ex_queue_entry * srcEntry,
if (sortTdb().sortOptions_->dontOverflow())
{
// LCOV_EXCL_START
- sortPool_->addBuffer(sortTdb().bufferSize_);
+ sortSendPool_->addBuffer(sortTdb().bufferSize_);
// LCOV_EXCL_STOP
}
// add more buffers if there is more space
//available in the pool.
- else if (sortPool_->get_number_of_buffers() <
+ else if (sortSendPool_->get_number_of_buffers() <
sortTdb().maxNumBuffers_)
{
//No more space in the pool to allocate sorted rows.
@@ -1013,7 +1122,7 @@ short ExSortTcb::sortSend(ex_queue_entry * srcEntry,
sortUtil_->consumeMemoryQuota(sortTdb().bufferSize_))
{
// Add a new buffer.
- sortPool_->addBuffer(sortTdb().bufferSize_);
+ sortSendPool_->addBuffer(sortTdb().bufferSize_);
}
else
{
@@ -1061,41 +1170,19 @@ short ExSortTcb::sortSend(ex_queue_entry * srcEntry,
// have been added or tupples freed because of overflow
// completion.
td =
- sortPool_->get_free_tupp_descriptor(sortTdb().sortRecLen_, &buf);
+ sortSendPool_->get_free_tupp_descriptor(sortTdb().sortRecLen_, &buf);
- if (td == NULL)
- {
- // This is a bad situation where executor does not
- // have enough space to proceed giving rows to Sort
- // Add another buffer to the dynamic buffer list
- // after increasing maxNumBuffers (which is set at
- // compile time)
-
- // Increase the max number of buffers in the
- // dynamic array list
- // LCOV_EXCL_START
- sortPool_->set_max_number_of_buffers
- (sortPool_->get_max_number_of_buffers() +1);
-
- sortPool_->addBuffer(sortPool_->defaultBufferSize());
-
- // allocate the tuple yet again.
- td =
- sortPool_->get_free_tupp_descriptor(sortTdb().sortRecLen_,
- &buf);
-
- if (td == NULL)
- {
+ if (td == NULL)
+ {
// This is definitely a problem
- ex_assert(0,"Must have space to allocate tuple in newly added buffer");
+ ex_assert(0,"Must get a tuple from pool as they must be available");
step = ExSortTcb::SORT_ERROR;
break;
- // LCOV_EXCL_STOP
- }
- }
- }
- if (bmoStats_)
- bmoStats_->setSpaceBufferCount(sortPool_->get_number_of_buffers());
+ // LCOV_EXCL_STOP
+ }
+
+ }
+ //reaching here td is not NULL.
}
else
{
@@ -1105,7 +1192,7 @@ short ExSortTcb::sortSend(ex_queue_entry * srcEntry,
allocatedTuppDesc = NULL;
// LCOV_EXCL_STOP
}
-
+ //reaching here td is not NULL.
ex_expr::exp_return_type retCode = ex_expr::EXPR_OK;
char *dataPointer = td->getTupleAddress();
@@ -1433,13 +1520,26 @@ short ExSortTcb::sortReceive(ex_queue_entry * pentry_down,
td = receivePool_->get_free_tupp_descriptor(sortTdb().sortRecLen_,
&buf);
- if(td == NULL)
+ if(td == NULL)
{
- // no more space in the pool to allocate sorted rows from.
- // Return and come back later when some space gets freed up.
- // LCOV_EXCL_START
- workRC = WORK_POOL_BLOCKED;
- return workStatus(1);
+ //if sortSendPool_ is NULL, means there is option to
+ //try and add additional buffers as long as upqueue is not full.
+ //Upqueue will drive addition of buffers(sortSend is not called
+ //if upQueue is full), assumption is very few buffers.
+ //Add buffer outside of memory quota.
+ receivePool_->addBuffer(sortTdb().bufferSize_);
+
+ //try getting a tupp now.
+ td = receivePool_->get_free_tupp_descriptor(sortTdb().sortRecLen_,
+ &buf);
+ if(td == NULL)
+ {
+ // no more space in the pool to allocate sorted rows from.
+ //Return and come back later when some space gets freed up.
+ // LCOV_EXCL_START
+ workRC = WORK_POOL_BLOCKED;
+ return workStatus(1);
+ }
// LCOV_EXCL_STOP
}
tgtEntry->getAtp()->getTupp(sortTdb().tuppIndex_) = td;
@@ -1752,7 +1852,6 @@ short ExSortTcb::done(
step = ExSortTcb::SORT_EMPTY;
sortUtil_->sortEnd();
- sortHeap_->reInitialize();
if(setCompareTd_)
{
// LCOV_EXCL_START
@@ -1980,8 +2079,6 @@ short ExSortFromTopTcb::work()
case ExSortTcb::SORT_PREP:
{
- sortHeap_->reInitialize();
-
if ( sortDiag_ != NULL )
{
// LCOV_EXCL_START
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/8a7deed6/core/sql/executor/ex_sort.h
----------------------------------------------------------------------
diff --git a/core/sql/executor/ex_sort.h b/core/sql/executor/ex_sort.h
index 72229cd..0e435f3 100644
--- a/core/sql/executor/ex_sort.h
+++ b/core/sql/executor/ex_sort.h
@@ -46,6 +46,7 @@
#include "NABoolean.h"
#include "ComTdbSort.h"
#include "ExSimpleSqlBuffer.h"
+#include "ExStats.h"
// -----------------------------------------------------------------------
// Classes defined in this file
@@ -136,7 +137,8 @@ public:
SORT_CANCEL_CHILD,
SORT_DONE_WITH_QND
};
-
+private:
+ void deleteAndReallocateSortPool();
protected:
friend class ExSortTdb;
friend class ExSortPrivateState;
@@ -155,17 +157,28 @@ protected:
ExSubtask *ioEventHandler_;
// this heap is used by sort.
NAHeap * sortHeap_;
+
+ // this space is used for sort pools.
+ Space * sortSpace_;
// store a sort error in case up queue is full we still have a handle on it
ComDiagsArea * sortDiag_;
NAList<ComDiagsArea *> *nfDiags_;
sql_buffer_pool *partialSortPool_; //used exclusively by partial sort
- sql_buffer_pool *regularSortPool_; //used by regular sort and partial sort.
+ sql_buffer_pool *sortPool_; //used by regular sort and partial sort.
ExSimpleSQLBuffer *topNSortPool_; //used only by topNSort
-
- ExSortBufferPool *sortPool_; // pool reference handle used for sorting.
+ //sortPool_ or partialSortPool_ initial number of
+ //buffers used outside of memoryQuota.
+ Lng32 initialNumOfPoolBuffers_;
+
+ //sortSendPool_ and receivepool_ are generic handles
+ //that point to the actual pool like partialSortPool_ or
+ //sortPool_ or topNSortPool_.
+ //If sort does not overflow, sortSendPool_ and receivePool_
+ //point to the same pools.
+ ExSortBufferPool *sortSendPool_; // pool reference handle used for sorting.
ExSortBufferPool *receivePool_; // pool reference handle used for receiving sorted rows
@@ -371,8 +384,9 @@ class ExSortBufferPool : public NABasicObject
SQL_BUFFER_TYPE
};
- ExSortBufferPool(void *pool, PoolType poolType)
+ ExSortBufferPool(void *pool, PoolType poolType, ExBMOStats *bmoStats)
{
+ bmoStats_ = bmoStats;
if(poolType == SIMPLE_BUFFER_TYPE)
{
simplePool_ = (ExSimpleSQLBuffer *)pool;
@@ -416,10 +430,14 @@ class ExSortBufferPool : public NABasicObject
SqlBufferBase * addBuffer(Lng32 totalBufferSize, bool failureIsFatal = true)
{
+ SqlBufferBase * buf = NULL;
if (sqlBufferPool_)
- return sqlBufferPool_->addBuffer(totalBufferSize, failureIsFatal);
- else
- return NULL;
+ {
+ buf = sqlBufferPool_->addBuffer(totalBufferSize, failureIsFatal); // assign the outer buf; redeclaring it here would shadow it and make this method always return NULL
+ if (bmoStats_)
+ bmoStats_->setSpaceBufferCount(sqlBufferPool_->get_number_of_buffers());
+ }
+ return buf;
}
inline Lng32 get_number_of_buffers() const
@@ -475,6 +493,7 @@ class ExSortBufferPool : public NABasicObject
PoolType poolType_;
ExSimpleSQLBuffer *simplePool_;
sql_buffer_pool *sqlBufferPool_;
+ ExBMOStats *bmoStats_;
};
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/8a7deed6/core/sql/sort/SortUtil.cpp
----------------------------------------------------------------------
diff --git a/core/sql/sort/SortUtil.cpp b/core/sql/sort/SortUtil.cpp
index 74bb2c8..08573da 100644
--- a/core/sql/sort/SortUtil.cpp
+++ b/core/sql/sort/SortUtil.cpp
@@ -63,12 +63,15 @@ SortUtil::SortUtil(Lng32 explainNodeId) :
version_ = 1;
state_ = SORT_INIT;
config_ = NULL;
+ internalSort_ = TRUE;
+ sortReceivePrepared_ = FALSE;
sortAlgo_ = NULL;
scratch_ = NULL;
memoryQuotaUtil_ = 0;
memMonitor_ = NULL;
overheadPerRecord_ = 0;
bmoStats_ = NULL;
+
}
SortUtil::~SortUtil(void)
@@ -83,6 +86,21 @@ SortUtil::~SortUtil(void)
}
}
+void SortUtil::reInit()
+{
+ doCleanUp();
+
+ version_ = 1;
+ state_ = SORT_INIT;
+ internalSort_ = TRUE;
+ sortReceivePrepared_ = FALSE;
+ sortAlgo_ = NULL;
+ scratch_ = NULL;
+ memoryQuotaUtil_ = 0;
+ memMonitor_ = NULL;
+ overheadPerRecord_ = 0;
+}
+
void SortUtil::DeleteSortAlgo() //delete Tree or Qsort after each merge
{
if (sortAlgo_ != NULL) { //delete indirect space in Tree, Qsort
@@ -157,7 +175,7 @@ NABoolean SortUtil::sortInitialize(SortUtilConfig& config, ULng32 topNSize)
// Basically we delete any memory that was allocated dynamically
// but was not yet released. Also, the sortError is reset.
//---------------------------------------------------------------
- doCleanUp();
+ reInit();
//if topNSize_ is set, then use TopN.
if(config.topNSort_ && topNSize)
@@ -238,6 +256,10 @@ NABoolean SortUtil::sortEnd(void)
//gets used.
returnExcessMemoryQuota(overheadPerRecord_);
+ //update heap one final time.
+ if (bmoStats_)
+ bmoStats_->updateBMOHeapUsage((NAHeap *)config_->heapAddr_);
+
// SQLMXLoggingArea::logExecRtInfo(NULL,0,"Sort operation has ended", explainNodeId_);
return SORT_SUCCESS;
}
@@ -304,33 +326,25 @@ Lng32 SortUtil::sortSendEnd(NABoolean& internalSort)
retcode = sortAlgo_->sortSendEnd() ;
- if (retcode)
- return retcode;
-
if (sortAlgo_->isInternalSort())
+ internalSort_ = internalSort = TRUE_L;
+ else
+ internalSort_ = internalSort = FALSE_L;
+
+ if(config_->logInfoEvent())
{
- internalSort = TRUE_L;
- if(config_->logInfoEvent())
- {
- char msg[500];
- str_sprintf(msg,
- "Sort is performing internal sort: NumRecs:%d", stats_.numRecs_);
-
- SQLMXLoggingArea::logExecRtInfo(NULL, 0,msg, explainNodeId_);
- }
- }
- else
- {
- internalSort = FALSE_L;
- retcode = sortSendEndProcessing() ;
- return retcode;
+ char msg[500];
+ str_sprintf(msg,
+ "Sort is performing %s sort: NumRecs:%d",
+ internalSort? "internal":"external", stats_.numRecs_);
+ SQLMXLoggingArea::logExecRtInfo(NULL, 0,msg, explainNodeId_);
}
-
+
return retcode;
}
//----------------------------------------------------------------------
-// Name : sortSendEndProcessing
+// Name : sortReceivePrepare
//
// Parameters : ...
//
@@ -341,7 +355,7 @@ Lng32 SortUtil::sortSendEnd(NABoolean& internalSort)
// SORT_FAILURE if any error encounterd.
//
//----------------------------------------------------------------------
-Lng32 SortUtil::sortSendEndProcessing(void)
+Lng32 SortUtil::sortReceivePrepare(void)
{
ULng32 initialRunSize = 0;
Lng32 runnum = 1L;
@@ -490,6 +504,10 @@ Lng32 SortUtil::sortSendEndProcessing(void)
}
}
+ //update heap usage after allocating merge buffers.
+ if (bmoStats_)
+ bmoStats_->updateBMOHeapUsage((NAHeap *)config_->heapAddr_);
+
//This is the result merge order. Set this in stats_.mergeOrder_.
stats_.mergeOrder_ = config_->mergeOrder_;
@@ -566,7 +584,11 @@ Lng32 SortUtil::sortSendEndProcessing(void)
}
}
}
-
+
+ //after first merge order update heap usage.
+ if (bmoStats_)
+ bmoStats_->updateBMOHeapUsage((NAHeap *)config_->heapAddr_);
+
//subsequent merges in loop.
for (Int32 i = 0; i < stats_.numInterPasses_; i++)
{
@@ -661,10 +683,6 @@ Lng32 SortUtil::sortSendEndProcessing(void)
state_ = SORT_FINAL_MERGE;
stats_.scrNumBlocks_ = scratch_->getTotalNumOfScrBlocks();
- ScratchFileMap* tempFilesMap;
- tempFilesMap = scratch_->getScrFilesMap();
- //stats_.scrNumWrites_ = tempFilesMap->totalNumOfWrites();
- //stats_.scrNumAwaitio_ = tempFilesMap->totalNumOfAwaitio();
scratch_->getTotalIoWaitTime(stats_.ioWaitTime_);
stats_.memSizeB_ += stats_.finalMergeOrder_*stats_.scrBlockSize_*2 +
stats_.finalMergeOrder_*sizeof(TreeNode) +
@@ -697,7 +715,11 @@ Lng32 SortUtil::sortSendEndProcessing(void)
return SORT_FAILURE;
state_ = SORT_RECEIVE;
-
+
+ //final merge tree setup. update heap usage.
+ if (bmoStats_)
+ bmoStats_->updateBMOHeapUsage((NAHeap *)config_->heapAddr_);
+
return SORT_SUCCESS;
}
@@ -718,14 +740,20 @@ Lng32 SortUtil::sortSendEndProcessing(void)
Lng32 SortUtil::sortReceive(void* record, ULng32& len)
{
Lng32 status;
+
+ if(!internalSort_ && !sortReceivePrepared_)
+ {
+ Lng32 retCode = sortReceivePrepare();
+ if(retCode != SORT_SUCCESS)
+ return retCode;
+
+ sortReceivePrepared_ = TRUE;
+ }
+
status = sortAlgo_->sortReceive(record, len);
if ((len == 0) && (!config_->partialSort_)) {
if(scratch_)
{
- ScratchFileMap* tempFilesMap;
- tempFilesMap = scratch_->getScrFilesMap();
- //stats_.scrNumReads_ = tempFilesMap->totalNumOfReads();
- //stats_.scrNumAwaitio_ = tempFilesMap->totalNumOfAwaitio();
scratch_->getTotalIoWaitTime(stats_.ioWaitTime_);
}
stats_.numCompares_ += sortAlgo_->getNumOfCompares();
@@ -760,10 +788,6 @@ Lng32 SortUtil::sortReceive(void*& record,ULng32& len,void*& tupp)
if ((len == 0) && (!config_->partialSort_)) {
if(scratch_)
{
- ScratchFileMap* tempFilesMap;
- tempFilesMap = scratch_->getScrFilesMap();
- //stats_.scrNumReads_ = tempFilesMap->totalNumOfReads();
- //stats_.scrNumAwaitio_ = tempFilesMap->totalNumOfAwaitio();
scratch_->getTotalIoWaitTime(stats_.ioWaitTime_);
}
stats_.numCompares_ += sortAlgo_->getNumOfCompares();
@@ -771,7 +795,7 @@ Lng32 SortUtil::sortReceive(void*& record,ULng32& len,void*& tupp)
stats_.elapsedTime_ = currentTimeJ - stats_.beginSortTime_;
if (config_->logInfoEvent()) {
char msg[500];
- str_sprintf(msg, "Sort elapsed time : %Ld; Num runs : %d; runsize :%d",
+ str_sprintf(msg, "Internal sort performed. Sort elapsed time : %Ld; Num runs : %d; runsize :%d",
stats_.elapsedTime_,stats_.numInitRuns_,sortAlgo_->getRunSize());
SQLMXLoggingArea::logExecRtInfo(NULL, 0,msg, explainNodeId_);
}
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/8a7deed6/core/sql/sort/SortUtil.h
----------------------------------------------------------------------
diff --git a/core/sql/sort/SortUtil.h b/core/sql/sort/SortUtil.h
index 82b86f7..47af5ca 100644
--- a/core/sql/sort/SortUtil.h
+++ b/core/sql/sort/SortUtil.h
@@ -105,13 +105,15 @@ public:
UInt32 estimateMemoryToAvoidIntMerge(UInt32 numruns, Int32 sortMergeBlocksPerBuffer);
UInt32 estimateMergeOrder(UInt32 maxMergeMemory, Int32 sortMergeBlocksPerBuffer);
protected :
- Lng32 sortSendEndProcessing();
+ Lng32 sortReceivePrepare();
private:
-
+ void reInit();
short version_; // The ArkSort version
SORT_STATE state_;
SortUtilConfig *config_;
+ NABoolean internalSort_; //indicates if overflowed or not.
+ NABoolean sortReceivePrepared_; //if sort overflowed, prepare the merge tree for receive.
SortAlgo *sortAlgo_; // Algorithms are implemented as sub-classes
// of Sort algorithm base class.
// This implementation allows extensibility as
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/8a7deed6/core/sql/sqlcomp/nadefaults.cpp
----------------------------------------------------------------------
diff --git a/core/sql/sqlcomp/nadefaults.cpp b/core/sql/sqlcomp/nadefaults.cpp
index 19fe1ac..4048be2 100644
--- a/core/sql/sqlcomp/nadefaults.cpp
+++ b/core/sql/sqlcomp/nadefaults.cpp
@@ -1641,7 +1641,7 @@ SDDkwd__(EXE_DIAGNOSTIC_EVENTS, "OFF"),
DDui1__(GEN_SORT_MAX_BUFFER_SIZE, "5242880"),
DDui1__(GEN_SORT_MAX_NUM_BUFFERS, "160"),
DDui___(GEN_SORT_MIN_BUFFER_SIZE, "0"),
- DDui1__(GEN_SORT_NUM_BUFFERS, "4"),
+ DDui1__(GEN_SORT_NUM_BUFFERS, "2"),
DDui1__(GEN_SORT_SIZE_DOWN, "2"),
DDui1__(GEN_SORT_SIZE_UP, "1024"),
DDkwd__(GEN_SORT_TOPN, "ON"),
[2/3] incubator-trafodion git commit: fix to address review comments.
NADELETE used where needed.
Posted by se...@apache.org.
fix to address review comments. NADELETE used where needed.
Project: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/commit/5d00a871
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/tree/5d00a871
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/diff/5d00a871
Branch: refs/heads/master
Commit: 5d00a87195005ac170aa4ba1be69d7a2571b3b24
Parents: 8a7deed
Author: Prashant Vasudev <pr...@esgyn.com>
Authored: Fri May 5 23:25:43 2017 +0000
Committer: Prashant Vasudev <pr...@esgyn.com>
Committed: Fri May 5 23:25:43 2017 +0000
----------------------------------------------------------------------
core/sql/executor/ex_sort.cpp | 39 +++++++++++++++++++-------------------
1 file changed, 20 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/5d00a871/core/sql/executor/ex_sort.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/ex_sort.cpp b/core/sql/executor/ex_sort.cpp
index d76124c..f631b45 100644
--- a/core/sql/executor/ex_sort.cpp
+++ b/core/sql/executor/ex_sort.cpp
@@ -110,15 +110,15 @@ void ExSortTcb::setupPoolBuffers(ex_queue_entry *pentry_down)
//receivePool_ always allocated outside of quota system.
//so no need to adjust quota system especially when sortSendPool_
//and receivePool_ are not the same.
- delete receivePool_;
+ NADELETE(receivePool_, ExSortBufferPool, sortSpace_);
receivePool_ = NULL;
- delete sortPool_;
+ NADELETE(sortPool_, sql_buffer_pool, sortSpace_);
sortPool_ = NULL;
//Also delete and reallocate the space object from which the sortPool_ is
//allocated. This will really release the memory.
- delete sortSpace_;
+ NADELETE(sortSpace_, Space, sortHeap_);
sortSpace_ = NULL;
}
@@ -237,19 +237,19 @@ void ExSortTcb::deleteAndReallocateSortPool()
(sortSendPool_->get_number_of_buffers() - initialNumOfPoolBuffers_) *
sortTdb().bufferSize_);
- delete sortSendPool_;
+ NADELETE(sortSendPool_, ExSortBufferPool, sortSpace_);
sortSendPool_ = NULL;
receivePool_ = NULL;
//delete actual pool.
//if we are here, sortPool_ must be valid since we should not
//reach here if topNSort or partial sort.
- delete sortPool_;
+ NADELETE(sortPool_, sql_buffer_pool, sortSpace_);
sortPool_ = NULL;
//Also delete and reallocate the space object from which the pool was
//allocated. This will really release the memory.
- delete sortSpace_;
+ NADELETE(sortSpace_, Space, sortHeap_);
sortSpace_ = new(sortHeap_)Space(Space::EXECUTOR_SPACE, TRUE,(char*)"Sort Space reallocated");
sortSpace_->setParent(sortHeap_);
@@ -457,14 +457,9 @@ ExSortTcb::ExSortTcb(const ExSortTdb & sort_tdb,
ExSortTcb::~ExSortTcb()
{
freeResources();
- if (sortUtil_)
- delete sortUtil_;
-
- if (sortCfg_)
- delete sortCfg_;
-
+
if (sortHeap_)
- delete sortHeap_;
+ NADELETE(sortHeap_, NAHeap, getHeap());
if (nfDiags_)
nfDiags_->deallocate();
@@ -476,19 +471,25 @@ ExSortTcb::~ExSortTcb()
//
void ExSortTcb::freeResources()
{
+ if (sortUtil_)
+ NADELETE(sortUtil_, SortUtil, sortHeap_);
+
+ if (sortCfg_)
+ NADELETE(sortCfg_, SortUtilConfig, sortHeap_);
+
if (partialSortPool_)
{
- delete partialSortPool_;
+ NADELETE(partialSortPool_, sql_buffer_pool, sortSpace_);
partialSortPool_ = NULL;
}
if (sortPool_)
{
- delete sortPool_;
+ NADELETE(sortPool_, sql_buffer_pool, sortSpace_);
sortPool_ = NULL;
}
if (topNSortPool_)
{
- delete topNSortPool_;
+ NADELETE(topNSortPool_, ExSimpleSQLBuffer, sortSpace_);
topNSortPool_ = NULL;
}
@@ -498,19 +499,19 @@ void ExSortTcb::freeResources()
{
if(sortSendPool_ != receivePool_)
{
- delete sortSendPool_;
+ NADELETE(sortSendPool_, ExSortBufferPool, sortSpace_);
}
sortSendPool_ = NULL;
}
if (receivePool_)
{
- delete receivePool_;
+ NADELETE(receivePool_, ExSortBufferPool, sortSpace_);
receivePool_ = NULL;
}
if(sortSpace_)
{
- delete sortSpace_;
+ NADELETE(sortSpace_, Space, sortHeap_);
sortSpace_ = NULL;
}
[3/3] incubator-trafodion git commit: Merge [TRAFODION-1604] PR 1081
sort merge phase memory pool improvements
Posted by se...@apache.org.
Merge [TRAFODION-1604] PR 1081 sort merge phase memory pool improvements
Project: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/commit/bf3e8d08
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/tree/bf3e8d08
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/diff/bf3e8d08
Branch: refs/heads/master
Commit: bf3e8d083584c0017170b514ad6bd30a7b03a88b
Parents: 33a9005 5d00a87
Author: selvaganesang <se...@apache.org>
Authored: Mon May 8 23:00:06 2017 +0000
Committer: selvaganesang <se...@apache.org>
Committed: Mon May 8 23:00:06 2017 +0000
----------------------------------------------------------------------
core/sql/executor/ex_sort.cpp | 304 +++++++++++++++++++++++------------
core/sql/executor/ex_sort.h | 35 +++-
core/sql/sort/SortUtil.cpp | 98 ++++++-----
core/sql/sort/SortUtil.h | 6 +-
core/sql/sqlcomp/nadefaults.cpp | 2 +-
5 files changed, 294 insertions(+), 151 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/bf3e8d08/core/sql/sqlcomp/nadefaults.cpp
----------------------------------------------------------------------