You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2019/10/09 17:08:11 UTC

[GitHub] [incubator-pinot] kishoreg commented on a change in pull request #4602: First pass of GROUP BY with ORDER BY support

kishoreg commented on a change in pull request #4602: First pass of GROUP BY with ORDER BY support
URL: https://github.com/apache/incubator-pinot/pull/4602#discussion_r324936447
 
 

 ##########
 File path: pinot-core/src/main/java/org/apache/pinot/core/data/table/ConcurrentIndexedTable.java
 ##########
 @@ -101,37 +125,72 @@ public int size() {
 
   @Override
   public Iterator<Record> iterator() {
+    if (_sort && CollectionUtils.isNotEmpty(_orderBy)) {
+      List<Record> sortedList = new ArrayList<>(_lookupMap.values());
+      sortedList.sort(_orderByComparator);
+      return sortedList.iterator();
+    }
     return _lookupMap.values().iterator();
   }
 
   private void resize(int trimToSize) {
 
     if (_lookupMap.size() > trimToSize) {
+      long startTime = System.currentTimeMillis();
+
+      if (CollectionUtils.isNotEmpty(_orderBy)) {
+        // drop bottom
 
-      // make min heap of elements to evict
-      int heapSize = _lookupMap.size() - trimToSize;
-      PriorityQueue<Record> minHeap = new PriorityQueue<>(heapSize, _minHeapComparator);
-
-      for (Record record : _lookupMap.values()) {
-        if (minHeap.size() < heapSize) {
-          minHeap.offer(record);
-        } else {
-          Record peek = minHeap.peek();
-          if (minHeap.comparator().compare(record, peek) < 0) {
-            minHeap.poll();
+        // make min heap of elements to evict
+        int heapSize = _lookupMap.size() - trimToSize;
+        PriorityQueue<Record> minHeap = new PriorityQueue<>(heapSize, _minHeapComparator);
+
+        for (Record record : _lookupMap.values()) {
+          if (minHeap.size() < heapSize) {
             minHeap.offer(record);
+          } else {
+            Record peek = minHeap.peek();
+            if (minHeap.comparator().compare(record, peek) < 0) {
+              minHeap.poll();
+              minHeap.offer(record);
+            }
           }
         }
-      }
 
-      for (Record evictRecord : minHeap) {
-        _lookupMap.remove(evictRecord.getKey());
+        for (Record evictRecord : minHeap) {
+          _lookupMap.remove(evictRecord.getKey());
+        }
+      } else {
+        // drop randomly
+
+        int numRecordsToDrop = _lookupMap.size() - trimToSize;
+        for (Key evictKey : _lookupMap.keySet()) {
+          _lookupMap.remove(evictKey);
+          numRecordsToDrop --;
+          if (numRecordsToDrop == 0) {
+            break;
+          }
+        }
       }
+      long endTime = System.currentTimeMillis();
+      long timeElapsed = endTime - startTime;
+
+      _numResizes.increment();
+      _resizeTime.accumulate(timeElapsed);
     }
   }
 
   @Override
   public void finish() {
     resize(_maxCapacity);
+    long numResizes = _numResizes.sum();
+    long resizeTime = _resizeTime.get();
+    LOGGER.info("Num resizes : {}, Total time spent in resizing : {}, Avg resize time : {}", numResizes, resizeTime,
 
 Review comment:
   This will add an additional log line per query. It would be better to make this a debug-level log and enable it during perf testing, or, even better, to emit a metric instead.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org