You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafodion.apache.org by se...@apache.org on 2017/05/08 23:01:29 UTC

[1/3] incubator-trafodion git commit: [TRAFODION-2604] sort merge phase memory pool improvements

Repository: incubator-trafodion
Updated Branches:
  refs/heads/master 33a9005d2 -> bf3e8d083


[TRAFODION-2604] sort merge phase memory pool improvements


Project: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/commit/8a7deed6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/tree/8a7deed6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/diff/8a7deed6

Branch: refs/heads/master
Commit: 8a7deed640202084169091f89d320115f5a55aba
Parents: a9ec249
Author: Prashant Vasudev <pr...@esgyn.com>
Authored: Tue May 2 22:04:16 2017 +0000
Committer: Prashant Vasudev <pr...@esgyn.com>
Committed: Tue May 2 22:04:16 2017 +0000

----------------------------------------------------------------------
 core/sql/executor/ex_sort.cpp   | 283 +++++++++++++++++++++++------------
 core/sql/executor/ex_sort.h     |  35 ++++-
 core/sql/sort/SortUtil.cpp      |  98 +++++++-----
 core/sql/sort/SortUtil.h        |   6 +-
 core/sql/sqlcomp/nadefaults.cpp |   2 +-
 5 files changed, 283 insertions(+), 141 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/8a7deed6/core/sql/executor/ex_sort.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/ex_sort.cpp b/core/sql/executor/ex_sort.cpp
index b56cdce..d76124c 100644
--- a/core/sql/executor/ex_sort.cpp
+++ b/core/sql/executor/ex_sort.cpp
@@ -98,13 +98,41 @@ ExOperStats * ExSortTcb::doAllocateStatsEntry(CollHeap *heap,
 
 void ExSortTcb::setupPoolBuffers(ex_queue_entry *pentry_down)
 {
+  //In case of prepare once and execute many, if sort overflowed,
+  //sortSendPool_ is deallocated and receivePool_ is newly allocated
+  //when switching from sortSend to sortReceive. Note that if sort
+  //did not overflow, sortSendPool_ and receivePool_ are same.
+  //Here we need to reset these pools to start again.
+  if((sortSendPool_ == NULL) &&
+     (receivePool_ != NULL) &&  //pool reference
+     (sortPool_ != NULL))         //actual pool
+  {
+    //receivePool_ always allocated outside of quota system.
+    //so no need to adjust quota system especially when sortSendPool_ 
+    //and receivePool_ are not the same.
+    delete receivePool_;
+    receivePool_ = NULL;
+    
+    delete sortPool_;
+    sortPool_ = NULL;
+        
+    //Also delete and reallocate the space object from which the sortPool_ is 
+    //allocated. This will really release the memory.
+    delete sortSpace_;
+    sortSpace_ = NULL;
+  }
+  
   //if any of these pools is already allocated, most likely
   //from a prepare once execute many scenario, then no need 
   //to reallocate the pool again. Just return.
-  if(partialSortPool_ || topNSortPool_ || regularSortPool_)
+  if(partialSortPool_ || topNSortPool_ || sortPool_)
     return;
-  
-  CollHeap  *space = getGlobals()->getSpace();
+
+  if(!sortSpace_)
+  {
+    sortSpace_ = new (sortHeap_)Space(Space::EXECUTOR_SPACE, TRUE,(char*)"Sort Space setupPoolBuffers");
+    sortSpace_->setParent(sortHeap_);
+  }
   
   // Allocate the buffer pool.
   // Note that when memoryQuota system is enabled, we initialize the
@@ -114,64 +142,77 @@ void ExSortTcb::setupPoolBuffers(ex_queue_entry *pentry_down)
   // account the estimate number of rows by the compiler and limited by
   // maximum of GEN_SORT_MAX_BUFFER_SIZE. The memory quota system will
   // come into force for additional buffers following this initial buffer.
-  Lng32 numBuffs = (sortTdb().sortOptions_->memoryQuotaMB()<= 0)?sortTdb().numBuffers_:2;
+  initialNumOfPoolBuffers_ = sortTdb().numBuffers_;
   Lng32 numSortBuffs = 0;
 
   // need separate pools for sorting and saving result rows
   if (sortTdb().partialSort())
   {
      // give result pool and sort pool each half of the space
-     numSortBuffs = numBuffs = (numBuffs + 1)/2;
+     numSortBuffs = initialNumOfPoolBuffers_ = (initialNumOfPoolBuffers_ + 1)/2;
      if(numSortBuffs < 2) numSortBuffs = 2; //initialize the pool with atleast 2 buffers.
-     if(numBuffs < 2) numBuffs = 2;
+     if(initialNumOfPoolBuffers_ < 2) initialNumOfPoolBuffers_ = 2;
   }
 
-  //partial sort uses two pools. one partialSortPool_ and regularSortPool_
+  //partial sort uses two pools. one partialSortPool_ and sortPool_
   //partialSortPool_ will be used for receiving the sorted records.
   if (numSortBuffs > 0)
   {
-    partialSortPool_ = new(space) sql_buffer_pool(
-                          numSortBuffs, sortTdb().bufferSize_, space);
+    partialSortPool_ = new(sortSpace_) sql_buffer_pool(
+                          numSortBuffs, sortTdb().bufferSize_, sortSpace_);
   }
 
-  //setup sortPool_ reference handle. If TopN, topNSortPool_ will be allocated
-  //from ExSimpleSQLBuffer based on numBuffs. If not TopN, regularSortPool_ will
+  //setup sortSendPool_ reference handle. If TopN, topNSortPool_ will be allocated
+  //from ExSimpleSQLBuffer based on numBuffs. If not TopN, sortPool_ will
   //be allocated from sql_buffer_pool based on numBuffs.
-  //sortPool_ reference handle will either point to topNSortPool or 
-  //regularSortPool.
+  //sortSendPool_ reference handle will either point to topNSortPool or 
+  //sortPool_.
   if((pentry_down->downState.request == ex_queue::GET_N) &&
      (pentry_down->downState.requestValue > 0) &&
      (sortTdb().topNSortEnabled()))
   {
-    topNSortPool_ = new(space)
+    topNSortPool_ = new(sortSpace_)
                     ExSimpleSQLBuffer(pentry_down->downState.requestValue + 1,
-                        sortTdb().sortRecLen_, space);
+                        sortTdb().sortRecLen_, sortSpace_);
     
-    sortPool_ = new(space) ExSortBufferPool((void*)topNSortPool_, 
-                                          ExSortBufferPool::SIMPLE_BUFFER_TYPE);
+    sortSendPool_ = new(sortSpace_) ExSortBufferPool((void*)topNSortPool_, 
+                                          ExSortBufferPool::SIMPLE_BUFFER_TYPE,
+                                          bmoStats_);
   }
   else
   {
-    regularSortPool_ = new(space) sql_buffer_pool(numBuffs,sortTdb().bufferSize_,
-                                                  space);
+    sortPool_ = new(sortSpace_) sql_buffer_pool(initialNumOfPoolBuffers_,
+                                                sortTdb().bufferSize_,
+                                                sortSpace_);
     
-    sortPool_ = new(space)ExSortBufferPool((void*)regularSortPool_,
-                                            ExSortBufferPool::SQL_BUFFER_TYPE);
+    sortSendPool_ = new(sortSpace_)ExSortBufferPool((void*)sortPool_,
+                                            ExSortBufferPool::SQL_BUFFER_TYPE,
+                                            bmoStats_);
   }
    
   //setup the receive pool. Receive pool is the same as sortPool for all cases except
   //for partial sort.
   if(sortTdb().partialSort())
-    receivePool_ = new(space) ExSortBufferPool(partialSortPool_,
-                                            ExSortBufferPool::SQL_BUFFER_TYPE);
+  {
+    receivePool_ = new(sortSpace_) ExSortBufferPool(partialSortPool_,
+                                            ExSortBufferPool::SQL_BUFFER_TYPE,
+                                            bmoStats_);
+  }
   else
-    receivePool_ = sortPool_;
+  {
+    //Assume sort does not overflow to begin with.
+    //In this case, receivePool_ and sortSendPool_ are the same.
+    //If overflow occurred (no overflow in the topN case) then
+    //sortSendPool_ (and the actual sortPool_) is deleted and receivePool_
+    //is newly allocated.
+    receivePool_ = sortSendPool_;
+  }
   
   //CIF defrag option only if NOT topNSortPool_
   defragTd_ = NULL;
   if (considerBufferDefrag() && (topNSortPool_ == NULL))
   {
-    defragTd_ = sortPool_->addDefragTuppDescriptor(sortTdb().sortRecLen_);
+    defragTd_ = sortSendPool_->addDefragTuppDescriptor(sortTdb().sortRecLen_);
   }
   
   if(bmoStats_)
@@ -182,6 +223,51 @@ void ExSortTcb::setupPoolBuffers(ex_queue_entry *pentry_down)
   
 }
 
+//Called only when sort has overflowed and the TCB is transitioning
+//from sortSend to sortReceive. Deletes sortSendPool_, sortPool_ and
+//sortSpace_, then recreates sortSpace_ and a sortPool_ holding
+//initialNumOfPoolBuffers_ buffers, wrapped by a new receivePool_.
+void ExSortTcb::deleteAndReallocateSortPool()
+{
+  //both handles must still refer to the same ExSortBufferPool object here.
+  ex_assert(sortSendPool_ == receivePool_, "sortSendPool_ != receivePool_");
+  
+  //return quota only for buffers beyond initialNumOfPoolBuffers_ (those are outside the quota system).
+  sortUtil_->returnConsumedMemoryQuota(
+      (sortSendPool_->get_number_of_buffers() - initialNumOfPoolBuffers_) * 
+       sortTdb().bufferSize_);
+  
+  delete sortSendPool_;
+  sortSendPool_ = NULL;
+  receivePool_ = NULL; 
+  
+  //delete the actual pool.
+  //sortPool_ must be valid here since this method is not expected
+  //to be reached for topN sort or partial sort.
+  delete sortPool_;
+  sortPool_ = NULL;
+  
+  //Also delete and recreate the Space object from which the pool was
+  //allocated, so that the memory is actually released.
+  delete sortSpace_;
+  sortSpace_ = new(sortHeap_)Space(Space::EXECUTOR_SPACE, TRUE,(char*)"Sort Space reallocated");
+  sortSpace_->setParent(sortHeap_);
+  
+          
+  //now allocate a fresh pool and wrap it in the receivePool_ handle.
+  //It is allocated outside of the memory quota.
+  sortPool_ = new(sortSpace_) sql_buffer_pool(initialNumOfPoolBuffers_
+                                              ,sortTdb().bufferSize_,
+                                              sortSpace_);
+
+  receivePool_ = new(sortSpace_)ExSortBufferPool((void*)sortPool_,
+                                ExSortBufferPool::SQL_BUFFER_TYPE,
+                                bmoStats_);
+  
+  if (bmoStats_)
+    bmoStats_->setSpaceBufferCount(initialNumOfPoolBuffers_);
+}
+
 //
 // Build a sort tcb
 //
@@ -218,7 +304,8 @@ ExSortTcb::ExSortTcb(const ExSortTdb & sort_tdb,
   sortStats_ = NULL;
   childTcb_ = &child_tcb;
 
-  CollHeap * space = glob->getSpace();
+  //Create heap to be used by sort.
+  sortHeap_ = new(getHeap()) NAHeap("Sort Heap", (NAHeap *)getHeap(), 204800);
   
   // cast sort tdb to non-const
   ExSortTdb * st = (ExSortTdb *)&sort_tdb;
@@ -230,16 +317,18 @@ ExSortTcb::ExSortTcb(const ExSortTdb & sort_tdb,
   allocateParentQueues(qparent_);
  // Intialize processedInputs_ to the next request to process
   processedInputs_ = qparent_.down->getTailIndex();
-  workAtp_ = allocateAtp(sort_tdb.workCriDesc_, space);
-  workAtp_->getTupp(2) = new(space) tupp_descriptor();
+  workAtp_ = allocateAtp(sort_tdb.workCriDesc_, glob->getSpace());
+  workAtp_->getTupp(2) = new(glob->getSpace()) tupp_descriptor();
   
-  //buffer pools are allocated in SORT_PREP work phase.
+  //buffer pools are allocated from sortSpace_ in SORT_PREP work phase.
+  sortSpace_ = NULL;
   topNSortPool_ = NULL;
-  regularSortPool_ = NULL;
+  sortPool_ = NULL;
   partialSortPool_ = NULL;
+  initialNumOfPoolBuffers_ = 0;
   
   //pool reference handles. Initialized in SORT_PREP phase.
-  sortPool_ = NULL;
+  sortSendPool_ = NULL;
   receivePool_ = NULL;
   
   *(short *)&sortType_ = 0;
@@ -267,14 +356,11 @@ ExSortTcb::ExSortTcb(const ExSortTdb & sort_tdb,
         break;
   }   
   
-  sortUtil_ = new(space) SortUtil(sort_tdb.getExplainNodeId());
+  sortUtil_ = new(sortHeap_) SortUtil(sort_tdb.getExplainNodeId());
 
   sortDiag_ = NULL;
 
-  // Create heap to be used by sort.
-  sortHeap_ = new(space) NAHeap("Sort Heap", (NAHeap *)getHeap(), 204800);
-
-  sortCfg_ = new(space) SortUtilConfig(sortHeap_);
+  sortCfg_ = new(sortHeap_) SortUtilConfig(sortHeap_);
 
   sortCfg_->setSortType(sortType_);
   sortCfg_->setScratchThreshold(st->sortOptions_->scratchFreeSpaceThresholdPct());
@@ -395,29 +481,39 @@ void ExSortTcb::freeResources()
     delete partialSortPool_;
     partialSortPool_ = NULL;
   }
-  if (regularSortPool_)
+  if (sortPool_)
   {
-    delete regularSortPool_;
-    regularSortPool_ = NULL;
+    delete sortPool_;
+    sortPool_ = NULL;
   }
   if (topNSortPool_)
   {
     delete topNSortPool_;
     topNSortPool_ = NULL;
   }
-  if (sortPool_)
+  
+  //sortSendPool_ and receivePool_
+  //are ExSortBufferPool class objects.
+  if (sortSendPool_)
   {
-    if(sortPool_ != receivePool_)
+    if(sortSendPool_ != receivePool_)
     {
-      delete sortPool_;
+      delete sortSendPool_;
     }
-    sortPool_ = NULL;
+    sortSendPool_ = NULL;
   }
   if (receivePool_)
   {
     delete receivePool_;
     receivePool_ = NULL;
   }
+  
+  if(sortSpace_)
+  {
+    delete sortSpace_;
+    sortSpace_ = NULL;
+  }
+  
   delete qparent_.up;
   delete qparent_.down;
 };
@@ -676,8 +772,6 @@ short ExSortTcb::workUp()
           // LCOV_EXCL_STOP
 	case ExSortTcb::SORT_PREP:
 	  {
-	    sortHeap_->reInitialize();
-
 	    if ( sortDiag_ != NULL )
 	      {
                 // LCOV_EXCL_START
@@ -846,7 +940,22 @@ short ExSortTcb::workUp()
 	    if (qparent_.up->isFull()){
 	      return workStatus(WORK_OK); // parent queue is full. Just return
 	    }
-
+	    
+	    //First time reaching here and before calling
+	    //sortReceive, release the buffers used during
+	    //sortSend phase ONLY if sort overflowed( by this
+	    //time, all sort records are in scratch files).
+	    //Overflow does not happen in TopNSort. Partial sort
+	    //has a separate receive pool.
+	    if((sortSendPool_ != NULL) &&  //not yet released
+	       (!pstate.noOverflow_) &&    //overflow happened
+	       (!sortTdb().partialSort())) //not partial sort
+	    {
+	      
+	      deleteAndReallocateSortPool();
+	    }
+	        
+	    
 	    ex_queue_entry * pentry = qparent_.up->getTailEntry();
 	    rc = sortReceive(pentry_down, request, pentry, FALSE,
 			     pentry_down->downState.parentIndex,
@@ -948,7 +1057,7 @@ short ExSortTcb::sortSend(ex_queue_entry * srcEntry,
 	    td = NULL;
 
             if (defragTd_ && //considerBufferDefrag() && //resizeCifRecord() &&
-                !sortPool_->currentBufferHasEnoughSpace(sortTdb().sortRecLen_))
+                !sortSendPool_->currentBufferHasEnoughSpace(sortTdb().sortRecLen_))
             {
 #if defined(_DEBUG)
               assert(resizeCifRecord());
@@ -978,14 +1087,14 @@ short ExSortTcb::sortSend(ex_queue_entry * srcEntry,
                 {
                   defragLength = *rowLenPtr;
                   td =
-                    sortPool_->get_free_tupp_descriptor(defragLength + dataOffset, &buf);// do we need &buf here??
+                    sortSendPool_->get_free_tupp_descriptor(defragLength + dataOffset, &buf);// do we need &buf here??
                 }
                }
             }
             else
             {
               td =
-                sortPool_->get_free_tupp_descriptor(sortTdb().sortRecLen_, &buf);
+                sortSendPool_->get_free_tupp_descriptor(sortTdb().sortRecLen_, &buf);
             }
 
 
@@ -996,12 +1105,12 @@ short ExSortTcb::sortSend(ex_queue_entry * srcEntry,
 		if (sortTdb().sortOptions_->dontOverflow())
 		  {
                     // LCOV_EXCL_START
-		  sortPool_->addBuffer(sortTdb().bufferSize_);
+		  sortSendPool_->addBuffer(sortTdb().bufferSize_);
                     // LCOV_EXCL_STOP
 		  }
 		// add more buffers if there is more space 
 		//available in the pool.
-		else if (sortPool_->get_number_of_buffers() < 
+		else if (sortSendPool_->get_number_of_buffers() < 
 			 sortTdb().maxNumBuffers_)
 		  {
 		    //No more space in the pool to allocate sorted rows.
@@ -1013,7 +1122,7 @@ short ExSortTcb::sortSend(ex_queue_entry * srcEntry,
 		       sortUtil_->consumeMemoryQuota(sortTdb().bufferSize_))
 		      {
 			// Add a new buffer.
-                        sortPool_->addBuffer(sortTdb().bufferSize_);
+		      sortSendPool_->addBuffer(sortTdb().bufferSize_);
 		      }
 		    else 
 		      {
@@ -1061,41 +1170,19 @@ short ExSortTcb::sortSend(ex_queue_entry * srcEntry,
 		// have been added or tupples freed because of overflow
 		// completion.
 		td =
-		  sortPool_->get_free_tupp_descriptor(sortTdb().sortRecLen_, &buf);
+		  sortSendPool_->get_free_tupp_descriptor(sortTdb().sortRecLen_, &buf);
 
-		if (td == NULL)
-		  {
-		    // This is a bad situation where executor does not
-		    // have enough space to proceed giving rows to Sort
-		    // Add another buffer to the dynamic buffer list
-		    // after increasing maxNumBuffers (which is set at
-		    // compile time)
-		    
-		    // Increase the max number of buffers in the 
-		    // dynamic array list
-		    // LCOV_EXCL_START
-		    sortPool_->set_max_number_of_buffers
-		      (sortPool_->get_max_number_of_buffers() +1);
-		    
-		    sortPool_->addBuffer(sortPool_->defaultBufferSize());
-		    
-		    // allocate the tuple yet again.
-		    td =
-		      sortPool_->get_free_tupp_descriptor(sortTdb().sortRecLen_, 
-						      &buf); 
-		    
-		    if (td == NULL)
-		      {
+    if (td == NULL)
+    {
 			// This is definitely a problem
-			ex_assert(0,"Must have space to allocate tuple in newly added buffer");	  
+			ex_assert(0,"Must get a tuple from pool as they must be available");	  
 			step = ExSortTcb::SORT_ERROR;
 			break;
-                       // LCOV_EXCL_STOP
-		      } 
-		  }
-	      }
-	    if (bmoStats_)
-	      bmoStats_->setSpaceBufferCount(sortPool_->get_number_of_buffers());
+      // LCOV_EXCL_STOP
+		} 
+		  
+	  }
+	  //reaching here td is not NULL.
 	  }
 	else
 	  {
@@ -1105,7 +1192,7 @@ short ExSortTcb::sortSend(ex_queue_entry * srcEntry,
 	    allocatedTuppDesc = NULL;
             // LCOV_EXCL_STOP
 	  }
-
+	  //reaching here td is not NULL.
         ex_expr::exp_return_type retCode = ex_expr::EXPR_OK;
         char *dataPointer = td->getTupleAddress();
 
@@ -1433,13 +1520,26 @@ short ExSortTcb::sortReceive(ex_queue_entry * pentry_down,
 
           td = receivePool_->get_free_tupp_descriptor(sortTdb().sortRecLen_, 
 						      &buf);
-          if(td == NULL)
+      if(td == NULL)
 	    {
-	      // no more space in the pool to allocate sorted rows from.
-	      // Return and come back later when some space gets freed up.
-              // LCOV_EXCL_START
-	      workRC = WORK_POOL_BLOCKED;
-	      return workStatus(1);
+        //No free tupp in receivePool_: try to add one more buffer
+        //before blocking (note: sortSendPool_ is not checked here).
+        //The up queue drives buffer addition (workUp does not call
+        //sortReceive while the up queue is full), so few buffers are expected.
+        //Add buffer outside of memory quota.
+        receivePool_->addBuffer(sortTdb().bufferSize_);
+        
+        //try getting a tupp now.
+        td = receivePool_->get_free_tupp_descriptor(sortTdb().sortRecLen_, 
+            &buf);
+        if(td == NULL)
+        {
+          // no more space in the pool to allocate sorted rows from.
+          //Return and come back later when some space gets freed up.
+          // LCOV_EXCL_START
+          workRC = WORK_POOL_BLOCKED;
+          return workStatus(1);
+        }
               // LCOV_EXCL_STOP
 	    }
           tgtEntry->getAtp()->getTupp(sortTdb().tuppIndex_) = td;  
@@ -1752,7 +1852,6 @@ short ExSortTcb::done(
   step = ExSortTcb::SORT_EMPTY;
 
   sortUtil_->sortEnd();
-  sortHeap_->reInitialize();
   if(setCompareTd_)
     {
       // LCOV_EXCL_START
@@ -1980,8 +2079,6 @@ short ExSortFromTopTcb::work()
 
 	case ExSortTcb::SORT_PREP:
 	  {
-	    sortHeap_->reInitialize();
-
 	    if ( sortDiag_ != NULL )
 	      {
                 // LCOV_EXCL_START

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/8a7deed6/core/sql/executor/ex_sort.h
----------------------------------------------------------------------
diff --git a/core/sql/executor/ex_sort.h b/core/sql/executor/ex_sort.h
index 72229cd..0e435f3 100644
--- a/core/sql/executor/ex_sort.h
+++ b/core/sql/executor/ex_sort.h
@@ -46,6 +46,7 @@
 #include "NABoolean.h"
 #include "ComTdbSort.h"
 #include "ExSimpleSqlBuffer.h"
+#include "ExStats.h"
 
 // -----------------------------------------------------------------------
 // Classes defined in this file
@@ -136,7 +137,8 @@ public:
     SORT_CANCEL_CHILD,
     SORT_DONE_WITH_QND
     };
-  
+private:
+  void deleteAndReallocateSortPool();
 protected:
   friend class   ExSortTdb;
   friend class   ExSortPrivateState;
@@ -155,17 +157,28 @@ protected:
   ExSubtask *ioEventHandler_;  
   // this heap is used by sort.
   NAHeap         * sortHeap_;
+  
+  // this space is used for sort pools.
+  Space          * sortSpace_;
 
   // store a sort error in case up queue is full we still have a handle on it
   ComDiagsArea   * sortDiag_;
   NAList<ComDiagsArea *>  *nfDiags_;
 
   sql_buffer_pool *partialSortPool_;   //used exclusively by partial sort
-  sql_buffer_pool *regularSortPool_;   //used by regular sort and partial sort.
+  sql_buffer_pool *sortPool_;          //used by regular sort and partial sort.
   ExSimpleSQLBuffer *topNSortPool_;    //used only by topNSort
-  
 
-  ExSortBufferPool *sortPool_;     // pool reference handle used for sorting.
+  //sortPool_ or partialSortPool_ initial number of 
+  //buffers used outside of memoryQuota.
+  Lng32 initialNumOfPoolBuffers_;
+  
+  //sortSendPool_ and receivepool_ are generic handles
+  //that point to the actual pool like partialSortPool_ or 
+  //sortPool_ or topNSortPool_. 
+  //If sort does not overflow, sortSendPool_ and receivePool_ 
+  //point to the same pools.
+  ExSortBufferPool *sortSendPool_;     // pool reference handle used for sorting.
   ExSortBufferPool *receivePool_;  // pool reference handle used for receiving sorted rows
 
 
@@ -371,8 +384,9 @@ class ExSortBufferPool : public NABasicObject
     SQL_BUFFER_TYPE
      };
 
-    ExSortBufferPool(void *pool, PoolType poolType)
+    ExSortBufferPool(void *pool, PoolType poolType, ExBMOStats *bmoStats)
     {
+      bmoStats_ = bmoStats;
       if(poolType == SIMPLE_BUFFER_TYPE)
       {
         simplePool_ = (ExSimpleSQLBuffer *)pool;
@@ -416,10 +430,14 @@ class ExSortBufferPool : public NABasicObject
     
     SqlBufferBase * addBuffer(Lng32 totalBufferSize, bool failureIsFatal = true)
     {
+      SqlBufferBase * buf = NULL; //stays NULL when no sql_buffer_pool is wrapped.
       if (sqlBufferPool_)
-        return sqlBufferPool_->addBuffer(totalBufferSize, failureIsFatal);
-      else
-        return NULL; 
+      {
+        buf = sqlBufferPool_->addBuffer(totalBufferSize, failureIsFatal); //assign the outer buf; redeclaring it here would shadow it and always return NULL.
+        if (bmoStats_)
+          bmoStats_->setSpaceBufferCount(sqlBufferPool_->get_number_of_buffers());
+      }
+      return buf;
     }
     
     inline Lng32 get_number_of_buffers() const
@@ -475,6 +493,7 @@ class ExSortBufferPool : public NABasicObject
     PoolType poolType_;
     ExSimpleSQLBuffer *simplePool_;
     sql_buffer_pool   *sqlBufferPool_;
+    ExBMOStats *bmoStats_;
     
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/8a7deed6/core/sql/sort/SortUtil.cpp
----------------------------------------------------------------------
diff --git a/core/sql/sort/SortUtil.cpp b/core/sql/sort/SortUtil.cpp
index 74bb2c8..08573da 100644
--- a/core/sql/sort/SortUtil.cpp
+++ b/core/sql/sort/SortUtil.cpp
@@ -63,12 +63,15 @@ SortUtil::SortUtil(Lng32 explainNodeId) :
   version_             = 1;
   state_               = SORT_INIT;
   config_              = NULL;
+  internalSort_        = TRUE;
+  sortReceivePrepared_ = FALSE;
   sortAlgo_            = NULL;
   scratch_             = NULL;
   memoryQuotaUtil_     = 0;
   memMonitor_          = NULL;
   overheadPerRecord_   = 0;
   bmoStats_ = NULL;
+  
 }
 
 SortUtil::~SortUtil(void)
@@ -83,6 +86,21 @@ SortUtil::~SortUtil(void)
  }
 }
 
+void SortUtil::reInit() //runs doCleanUp(), then resets the per-sort members below; config_ and bmoStats_ are left untouched.
+{
+  doCleanUp();
+  
+  version_             = 1;
+  state_               = SORT_INIT;
+  internalSort_        = TRUE;   //assume no overflow; sortSendEnd() sets the real value.
+  sortReceivePrepared_ = FALSE;  //sortReceive() calls sortReceivePrepare() once when the sort is not internal.
+  sortAlgo_            = NULL;
+  scratch_             = NULL;
+  memoryQuotaUtil_     = 0;
+  memMonitor_          = NULL;
+  overheadPerRecord_   = 0;
+}
+
 void SortUtil::DeleteSortAlgo() //delete Tree or Qsort after each merge
 {
  if (sortAlgo_ != NULL) { //delete indirect space in Tree, Qsort
@@ -157,7 +175,7 @@ NABoolean SortUtil::sortInitialize(SortUtilConfig& config, ULng32 topNSize)
   // Basically we delete any memory that was allocated dynamically
   // but was not yet released.  Also, the sortError is reset.
   //---------------------------------------------------------------
-  doCleanUp();
+  reInit();
   
   //if topNSize_ is set, then use TopN.
   if(config.topNSort_ && topNSize)
@@ -238,6 +256,10 @@ NABoolean SortUtil::sortEnd(void)
   //gets used.
   returnExcessMemoryQuota(overheadPerRecord_);
 
+  //update heap one final time.
+  if (bmoStats_)
+    bmoStats_->updateBMOHeapUsage((NAHeap *)config_->heapAddr_);
+    
 //  SQLMXLoggingArea::logExecRtInfo(NULL,0,"Sort operation has ended", explainNodeId_);
   return SORT_SUCCESS;
 }
@@ -304,33 +326,25 @@ Lng32 SortUtil::sortSendEnd(NABoolean& internalSort)
 
   retcode =   sortAlgo_->sortSendEnd() ;
 
-  if (retcode)
-   return retcode;
-
   if (sortAlgo_->isInternalSort()) 
+      internalSort_ = internalSort = TRUE_L;
+  else
+      internalSort_ = internalSort = FALSE_L;
+  
+  if(config_->logInfoEvent())
   {
-    internalSort = TRUE_L;
-    if(config_->logInfoEvent())
-    {
-      char msg[500];
-      str_sprintf(msg,
-      "Sort is performing internal sort: NumRecs:%d", stats_.numRecs_);
-      
-      SQLMXLoggingArea::logExecRtInfo(NULL, 0,msg, explainNodeId_);
-    }
-  }
-  else 
-  {
-    internalSort = FALSE_L;
-    retcode =  sortSendEndProcessing() ; 
-    return retcode;
+    char msg[500];
+    str_sprintf(msg,
+    "Sort is performing %s sort: NumRecs:%d",
+        internalSort? "internal":"external", stats_.numRecs_);
+    SQLMXLoggingArea::logExecRtInfo(NULL, 0,msg, explainNodeId_);
   }
-
+  
   return retcode;
 }
 
 //----------------------------------------------------------------------
-// Name         : sortSendEndProcessing
+// Name         : sortReceivePrepare
 // 
 // Parameters   : ...
 //
@@ -341,7 +355,7 @@ Lng32 SortUtil::sortSendEnd(NABoolean& internalSort)
 //   SORT_FAILURE if any error encounterd. 
 //
 //----------------------------------------------------------------------
-Lng32 SortUtil::sortSendEndProcessing(void)
+Lng32 SortUtil::sortReceivePrepare(void)
 {
   ULng32 initialRunSize = 0;
   Lng32 runnum = 1L;
@@ -490,6 +504,10 @@ Lng32 SortUtil::sortSendEndProcessing(void)
     }
   }
 
+  //update heap usage after allocating merge buffers.
+  if (bmoStats_)
+    bmoStats_->updateBMOHeapUsage((NAHeap *)config_->heapAddr_);
+    
   //This is the result merge order. Set this in stats_.mergeOrder_.
   stats_.mergeOrder_ = config_->mergeOrder_;
 
@@ -566,7 +584,11 @@ Lng32 SortUtil::sortSendEndProcessing(void)
           }
         }
       }
-
+     
+     //after first merge order update heap usage.
+     if (bmoStats_)
+       bmoStats_->updateBMOHeapUsage((NAHeap *)config_->heapAddr_);
+       
       //subsequent merges in loop.
       for (Int32 i = 0; i < stats_.numInterPasses_; i++)
       {   
@@ -661,10 +683,6 @@ Lng32 SortUtil::sortSendEndProcessing(void)
   state_ = SORT_FINAL_MERGE;
 
   stats_.scrNumBlocks_ = scratch_->getTotalNumOfScrBlocks();
-  ScratchFileMap* tempFilesMap;
-  tempFilesMap = scratch_->getScrFilesMap();
-  //stats_.scrNumWrites_ = tempFilesMap->totalNumOfWrites();
-  //stats_.scrNumAwaitio_ = tempFilesMap->totalNumOfAwaitio();  
   scratch_->getTotalIoWaitTime(stats_.ioWaitTime_);
   stats_.memSizeB_ += stats_.finalMergeOrder_*stats_.scrBlockSize_*2 +
                      stats_.finalMergeOrder_*sizeof(TreeNode) +
@@ -697,7 +715,11 @@ Lng32 SortUtil::sortSendEndProcessing(void)
      return SORT_FAILURE;
 
   state_ = SORT_RECEIVE;
-
+  
+  //final merge tree setup. update heap usage.
+  if (bmoStats_)
+    bmoStats_->updateBMOHeapUsage((NAHeap *)config_->heapAddr_);
+    
  return SORT_SUCCESS;
 
 }
@@ -718,14 +740,20 @@ Lng32 SortUtil::sortSendEndProcessing(void)
 Lng32 SortUtil::sortReceive(void* record, ULng32& len)
 {
   Lng32 status;
+  
+  if(!internalSort_ && !sortReceivePrepared_)
+  {
+      Lng32 retCode = sortReceivePrepare();
+      if(retCode != SORT_SUCCESS)
+          return retCode;
+      
+      sortReceivePrepared_ = TRUE;
+  }
+  
   status = sortAlgo_->sortReceive(record, len);
   if ((len == 0) && (!config_->partialSort_)) {
     if(scratch_)
      {
-        ScratchFileMap* tempFilesMap;
-        tempFilesMap = scratch_->getScrFilesMap();
-        //stats_.scrNumReads_ = tempFilesMap->totalNumOfReads();
-        //stats_.scrNumAwaitio_ = tempFilesMap->totalNumOfAwaitio();
         scratch_->getTotalIoWaitTime(stats_.ioWaitTime_);    
      }
     stats_.numCompares_ += sortAlgo_->getNumOfCompares();
@@ -760,10 +788,6 @@ Lng32 SortUtil::sortReceive(void*& record,ULng32& len,void*& tupp)
   if ((len == 0) && (!config_->partialSort_)) {
     if(scratch_)
      {
-      ScratchFileMap* tempFilesMap;
-      tempFilesMap = scratch_->getScrFilesMap();
-      //stats_.scrNumReads_ = tempFilesMap->totalNumOfReads();
-      //stats_.scrNumAwaitio_ = tempFilesMap->totalNumOfAwaitio();      
       scratch_->getTotalIoWaitTime(stats_.ioWaitTime_);    
      }
     stats_.numCompares_ += sortAlgo_->getNumOfCompares();  
@@ -771,7 +795,7 @@ Lng32 SortUtil::sortReceive(void*& record,ULng32& len,void*& tupp)
     stats_.elapsedTime_ = currentTimeJ - stats_.beginSortTime_; 
     if (config_->logInfoEvent()) {
       char msg[500];
-      str_sprintf(msg, "Sort elapsed time : %Ld; Num runs : %d; runsize :%d",
+      str_sprintf(msg, "Internal sort performed. Sort elapsed time : %Ld; Num runs : %d; runsize :%d",
 		stats_.elapsedTime_,stats_.numInitRuns_,sortAlgo_->getRunSize());
       SQLMXLoggingArea::logExecRtInfo(NULL, 0,msg, explainNodeId_);
     }

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/8a7deed6/core/sql/sort/SortUtil.h
----------------------------------------------------------------------
diff --git a/core/sql/sort/SortUtil.h b/core/sql/sort/SortUtil.h
index 82b86f7..47af5ca 100644
--- a/core/sql/sort/SortUtil.h
+++ b/core/sql/sort/SortUtil.h
@@ -105,13 +105,15 @@ public:
   UInt32 estimateMemoryToAvoidIntMerge(UInt32 numruns, Int32 sortMergeBlocksPerBuffer);
   UInt32 estimateMergeOrder(UInt32 maxMergeMemory, Int32 sortMergeBlocksPerBuffer);
 protected :
-  Lng32 sortSendEndProcessing();
+  Lng32 sortReceivePrepare();
 
 private:
-  
+  void reInit();
   short    version_;     // The ArkSort version
   SORT_STATE state_;      
   SortUtilConfig *config_;
+  NABoolean internalSort_; //indicates if overflowed or not.
+  NABoolean sortReceivePrepared_; //if sort overflowed, prepare the merge tree for receive.
   SortAlgo *sortAlgo_;   // Algorithms are implemented as sub-classes
                          // of  Sort algorithm base class.      
                          // This implementation  allows extensibility as

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/8a7deed6/core/sql/sqlcomp/nadefaults.cpp
----------------------------------------------------------------------
diff --git a/core/sql/sqlcomp/nadefaults.cpp b/core/sql/sqlcomp/nadefaults.cpp
index 19fe1ac..4048be2 100644
--- a/core/sql/sqlcomp/nadefaults.cpp
+++ b/core/sql/sqlcomp/nadefaults.cpp
@@ -1641,7 +1641,7 @@ SDDkwd__(EXE_DIAGNOSTIC_EVENTS,		"OFF"),
   DDui1__(GEN_SORT_MAX_BUFFER_SIZE,		"5242880"),
   DDui1__(GEN_SORT_MAX_NUM_BUFFERS,             "160"),
   DDui___(GEN_SORT_MIN_BUFFER_SIZE,		"0"),
-  DDui1__(GEN_SORT_NUM_BUFFERS,			"4"),
+  DDui1__(GEN_SORT_NUM_BUFFERS,			"2"),
   DDui1__(GEN_SORT_SIZE_DOWN,			"2"),
   DDui1__(GEN_SORT_SIZE_UP,			"1024"),
   DDkwd__(GEN_SORT_TOPN,		        "ON"),


[2/3] incubator-trafodion git commit: Fix to address review comments: NADELETE used in place of delete where needed.

Posted by se...@apache.org.
Fix to address review comments: NADELETE used in place of delete where needed.


Project: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/commit/5d00a871
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/tree/5d00a871
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/diff/5d00a871

Branch: refs/heads/master
Commit: 5d00a87195005ac170aa4ba1be69d7a2571b3b24
Parents: 8a7deed
Author: Prashant Vasudev <pr...@esgyn.com>
Authored: Fri May 5 23:25:43 2017 +0000
Committer: Prashant Vasudev <pr...@esgyn.com>
Committed: Fri May 5 23:25:43 2017 +0000

----------------------------------------------------------------------
 core/sql/executor/ex_sort.cpp | 39 +++++++++++++++++++-------------------
 1 file changed, 20 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/5d00a871/core/sql/executor/ex_sort.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/ex_sort.cpp b/core/sql/executor/ex_sort.cpp
index d76124c..f631b45 100644
--- a/core/sql/executor/ex_sort.cpp
+++ b/core/sql/executor/ex_sort.cpp
@@ -110,15 +110,15 @@ void ExSortTcb::setupPoolBuffers(ex_queue_entry *pentry_down)
     //receivePool_ always allocated outside of quota system.
     //so no need to adjust quota system especially when sortSendPool_ 
     //and receivePool_ are not the same.
-    delete receivePool_;
+    NADELETE(receivePool_, ExSortBufferPool, sortSpace_);
     receivePool_ = NULL;
     
-    delete sortPool_;
+    NADELETE(sortPool_, sql_buffer_pool, sortSpace_);
     sortPool_ = NULL;
         
     //Also delete and reallocate the space object from which the sortPool_ is 
     //allocated. This will really release the memory.
-    delete sortSpace_;
+    NADELETE(sortSpace_, Space, sortHeap_);
     sortSpace_ = NULL;
   }
   
@@ -237,19 +237,19 @@ void ExSortTcb::deleteAndReallocateSortPool()
       (sortSendPool_->get_number_of_buffers() - initialNumOfPoolBuffers_) * 
        sortTdb().bufferSize_);
   
-  delete sortSendPool_;
+  NADELETE(sortSendPool_, ExSortBufferPool, sortSpace_);
   sortSendPool_ = NULL;
   receivePool_ = NULL; 
   
   //delete actual pool.
   //if we are here, sortPool_ must be valid since we should not
   //reach here if topNSort or partial sort.
-  delete sortPool_;
+  NADELETE(sortPool_, sql_buffer_pool, sortSpace_);
   sortPool_ = NULL;
   
   //Also delete and reallocate the space object from which the pool was 
   //allocated. This will really release the memory.
-  delete sortSpace_;
+  NADELETE(sortSpace_, Space, sortHeap_);
   sortSpace_ = new(sortHeap_)Space(Space::EXECUTOR_SPACE, TRUE,(char*)"Sort Space reallocated");
   sortSpace_->setParent(sortHeap_);
   
@@ -457,14 +457,9 @@ ExSortTcb::ExSortTcb(const ExSortTdb & sort_tdb,
 ExSortTcb::~ExSortTcb()
 {
   freeResources();
-  if (sortUtil_)
-    delete sortUtil_;
-
-  if (sortCfg_)
-    delete sortCfg_;
-
+  
   if (sortHeap_)
-    delete sortHeap_;
+    NADELETE(sortHeap_, NAHeap, getHeap());
  
   if (nfDiags_)
      nfDiags_->deallocate();
@@ -476,19 +471,25 @@ ExSortTcb::~ExSortTcb()
 //
 void ExSortTcb::freeResources()
 {
+  if (sortUtil_)
+    NADELETE(sortUtil_, SortUtil, sortHeap_);
+
+  if (sortCfg_)
+    NADELETE(sortCfg_, SortUtilConfig, sortHeap_);
+
   if (partialSortPool_)
   {
-    delete partialSortPool_;
+    NADELETE(partialSortPool_, sql_buffer_pool, sortSpace_);
     partialSortPool_ = NULL;
   }
   if (sortPool_)
   {
-    delete sortPool_;
+    NADELETE(sortPool_, sql_buffer_pool, sortSpace_);
     sortPool_ = NULL;
   }
   if (topNSortPool_)
   {
-    delete topNSortPool_;
+    NADELETE(topNSortPool_, ExSimpleSQLBuffer, sortSpace_);
     topNSortPool_ = NULL;
   }
   
@@ -498,19 +499,19 @@ void ExSortTcb::freeResources()
   {
     if(sortSendPool_ != receivePool_)
     {
-      delete sortSendPool_;
+      NADELETE(sortSendPool_, ExSortBufferPool, sortSpace_);
     }
     sortSendPool_ = NULL;
   }
   if (receivePool_)
   {
-    delete receivePool_;
+    NADELETE(receivePool_, ExSortBufferPool, sortSpace_);
     receivePool_ = NULL;
   }
   
   if(sortSpace_)
   {
-    delete sortSpace_;
+    NADELETE(sortSpace_, Space, sortHeap_);
     sortSpace_ = NULL;
   }
   


[3/3] incubator-trafodion git commit: Merge [TRAFODION-2604] PR 1081 sort merge phase memory pool improvements

Posted by se...@apache.org.
Merge [TRAFODION-2604] PR 1081 sort merge phase memory pool improvements


Project: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/commit/bf3e8d08
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/tree/bf3e8d08
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/diff/bf3e8d08

Branch: refs/heads/master
Commit: bf3e8d083584c0017170b514ad6bd30a7b03a88b
Parents: 33a9005 5d00a87
Author: selvaganesang <se...@apache.org>
Authored: Mon May 8 23:00:06 2017 +0000
Committer: selvaganesang <se...@apache.org>
Committed: Mon May 8 23:00:06 2017 +0000

----------------------------------------------------------------------
 core/sql/executor/ex_sort.cpp   | 304 +++++++++++++++++++++++------------
 core/sql/executor/ex_sort.h     |  35 +++-
 core/sql/sort/SortUtil.cpp      |  98 ++++++-----
 core/sql/sort/SortUtil.h        |   6 +-
 core/sql/sqlcomp/nadefaults.cpp |   2 +-
 5 files changed, 294 insertions(+), 151 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/bf3e8d08/core/sql/sqlcomp/nadefaults.cpp
----------------------------------------------------------------------