Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2018/08/22 18:28:07 UTC

[GitHub] Ben-Zvi closed pull request #1438: DRILL-6566: Reduce Hash Agg Batch size and estimate when low available memory

URL: https://github.com/apache/drill/pull/1438
 
 
   

This is a PR merged from a forked repository. Because GitHub hides the
original diff once a foreign (forked) pull request is merged, the diff is
reproduced below for the sake of provenance:

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index ba928ae8f2d..72837a89121 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -48,6 +48,7 @@
 import org.apache.drill.exec.physical.impl.common.Comparator;
 import org.apache.drill.exec.physical.impl.common.HashTable;
 import org.apache.drill.exec.physical.impl.common.HashTableConfig;
+import org.apache.drill.exec.planner.physical.AggPrelBase;
 import org.apache.drill.exec.record.AbstractRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
@@ -185,6 +186,23 @@ public HashAggBatch(HashAggregate popConfig, RecordBatch incoming, FragmentConte
 
     // get the output batch size from config.
     int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
+
+    // If needed - reduce the size to allow enough batches in the available memory
+    long memAvail = oContext.getAllocator().getLimit();
+    long minBatchesPerPartition = context.getOptions().getOption(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION_VALIDATOR);
+    long minBatchesNeeded = 2 * minBatchesPerPartition; // 2 - to cover overheads, etc.
+    boolean is2ndPhase = popConfig.getAggPhase() == AggPrelBase.OperatorPhase.PHASE_2of2;
+    boolean fallbackEnabled = context.getOptions().getOption(ExecConstants.HASHAGG_FALLBACK_ENABLED_KEY).bool_val;
+    if ( is2ndPhase && !fallbackEnabled ) {
+      minBatchesNeeded *= 2;  // 2nd phase (w/o fallback) needs at least 2 partitions
+    }
+    if ( configuredBatchSize > memAvail / minBatchesNeeded ) { // no cast - memAvail may be bigger than max-int
+      int reducedBatchSize = (int)(memAvail / minBatchesNeeded);
+      logger.trace("Reducing configured batch size from: {} to: {}, due to Mem limit: {}",
+        configuredBatchSize, reducedBatchSize, memAvail);
+      configuredBatchSize = reducedBatchSize;
+    }
+
     hashAggMemoryManager = new HashAggMemoryManager(configuredBatchSize);
     logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize);
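
For reference, the sizing logic added above boils down to a small piece of
arithmetic. Below is a minimal standalone sketch with hypothetical inputs;
in the actual operator the memory limit comes from oContext.getAllocator()
and the other values from session options:

public class BatchSizeReductionSketch {
  // Return the (possibly reduced) output batch size so that at least
  // minBatchesNeeded batches fit in the available operator memory.
  static int reduceBatchSize(int configuredBatchSize, long memAvail,
                             long minBatchesPerPartition,
                             boolean is2ndPhase, boolean fallbackEnabled) {
    long minBatchesNeeded = 2 * minBatchesPerPartition;   // 2 - to cover overheads, etc.
    if (is2ndPhase && !fallbackEnabled) {
      minBatchesNeeded *= 2;                              // 2nd phase needs at least 2 partitions
    }
    if (configuredBatchSize > memAvail / minBatchesNeeded) {
      return (int) (memAvail / minBatchesNeeded);         // shrink so enough batches still fit
    }
    return configuredBatchSize;
  }

  public static void main(String[] args) {
    // Hypothetical numbers: 16 MB configured batch, 40 MB operator memory,
    // 3 min batches per partition, 2nd phase without fallback enabled.
    // 2 * 3 * 2 = 12 batches must fit, so the batch size drops to ~3.3 MB.
    System.out.println(reduceBatchSize(16 * 1024 * 1024, 40L * 1024 * 1024, 3, true, false));
  }
}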
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index 1954c79a7e4..65ca82972a0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -80,7 +80,7 @@
 import org.apache.drill.exec.vector.VariableWidthVector;
 
 import static org.apache.drill.exec.physical.impl.common.HashTable.BATCH_MASK;
-import static org.apache.drill.exec.record.RecordBatch.MAX_BATCH_SIZE;
+import static org.apache.drill.exec.record.RecordBatch.MAX_BATCH_ROW_COUNT;
 
 public abstract class HashAggTemplate implements HashAggregator {
   protected static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashAggregator.class);
@@ -562,9 +562,15 @@ private void updateEstMaxBatchSize(RecordBatch incoming) {
       else { estValuesRowWidth += fieldSize; }
     }
     // multiply by the max number of rows in a batch to get the final estimated max size
-    estMaxBatchSize = Math.max(estRowWidth, estInputRowWidth) * MAX_BATCH_SIZE;
+    long estimatedMaxWidth = Math.max(estRowWidth, estInputRowWidth);
+    estMaxBatchSize = estimatedMaxWidth * MAX_BATCH_ROW_COUNT;
+    // estimated batch size should not exceed the configuration given size
+    int configuredBatchSize = outgoing.getRecordBatchMemoryManager().getOutputBatchSize();
+    estMaxBatchSize = Math.min(estMaxBatchSize, configuredBatchSize);
+    // work back the number of rows (may have been reduced from MAX_BATCH_ROW_COUNT)
+    long rowsInBatch = estMaxBatchSize / estimatedMaxWidth;
     // (When there are no aggr functions, use '1' as later code relies on this size being non-zero)
-    estValuesBatchSize = Math.max(estValuesRowWidth, 1) * MAX_BATCH_SIZE;
+    estValuesBatchSize = Math.max(estValuesRowWidth, 1) * rowsInBatch;
     estOutgoingAllocSize = estValuesBatchSize; // initially assume same size
 
     logger.trace("{} phase. Estimated internal row width: {} Values row width: {} batch size: {}  memory limit: {}  max column width: {}",
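
The updateEstMaxBatchSize() change above caps the per-batch estimate at the
configured output batch size and then works the row count back from that cap,
instead of always assuming MAX_BATCH_ROW_COUNT rows. A small standalone sketch
with hypothetical row widths (MAX_BATCH_ROW_COUNT is 65536 in Drill):

public class HashAggEstimateSketch {
  static final int MAX_BATCH_ROW_COUNT = 65536;

  public static void main(String[] args) {
    long estRowWidth = 200, estInputRowWidth = 180, estValuesRowWidth = 120;  // hypothetical
    int configuredBatchSize = 4 * 1024 * 1024;  // 4 MB, possibly already reduced as above

    long estimatedMaxWidth = Math.max(estRowWidth, estInputRowWidth);
    long estMaxBatchSize = estimatedMaxWidth * MAX_BATCH_ROW_COUNT;    // 200 * 65536 = ~12.5 MB
    estMaxBatchSize = Math.min(estMaxBatchSize, configuredBatchSize);  // capped at 4 MB
    long rowsInBatch = estMaxBatchSize / estimatedMaxWidth;            // 20971 rows, not 65536
    long estValuesBatchSize = Math.max(estValuesRowWidth, 1) * rowsInBatch;

    System.out.println(rowsInBatch + " rows, values batch ~" + estValuesBatchSize + " bytes");
  }
}
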
@@ -1490,13 +1496,13 @@ private void spillIfNeeded(int currentPartition, boolean forceSpill) {
     long maxMemoryNeeded = 0;
     if ( !forceSpill ) { // need to check the memory in order to decide
       // calculate the (max) new memory needed now; plan ahead for at least MIN batches
-      maxMemoryNeeded = minBatchesPerPartition * Math.max(1, plannedBatches) * (estMaxBatchSize + MAX_BATCH_SIZE * (4 + 4 /* links + hash-values */));
+      maxMemoryNeeded = minBatchesPerPartition * Math.max(1, plannedBatches) * (estMaxBatchSize + MAX_BATCH_ROW_COUNT * (4 + 4 /* links + hash-values */));
       // Add the (max) size of the current hash table, in case it will double
       int maxSize = 1;
       for (int insp = 0; insp < numPartitions; insp++) {
         maxSize = Math.max(maxSize, batchHolders[insp].size());
       }
-      maxMemoryNeeded += MAX_BATCH_SIZE * 2 * 2 * 4 * maxSize; // 2 - double, 2 - max when %50 full, 4 - Uint4
+      maxMemoryNeeded += MAX_BATCH_ROW_COUNT * 2 * 2 * 4 * maxSize; // 2 - double, 2 - max when 50% full, 4 - UInt4
 
       // log a detailed debug message explaining why a spill may be needed
       logger.trace("MEMORY CHECK: Allocated mem: {}, agg phase: {}, trying to add to partition {} with {} batches. " + "Max memory needed {}, Est batch size {}, mem limit {}",
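
The spillIfNeeded() check above can likewise be read as a standalone formula.
The following sketch uses hypothetical numbers; the 4 + 4 bytes per row cover
the links and hash-values vectors, and the hash-table term allows a table that
is at most 50% full to double:

public class SpillCheckSketch {
  static final int MAX_BATCH_ROW_COUNT = 65536;

  public static void main(String[] args) {
    long estMaxBatchSize = 4_194_304;                      // 4 MB, from the estimate above
    int minBatchesPerPartition = 3, plannedBatches = 1;
    int maxHashTableBatchCount = 2;                        // largest batchHolders[insp].size()

    long maxMemoryNeeded = (long) minBatchesPerPartition * Math.max(1, plannedBatches)
        * (estMaxBatchSize + MAX_BATCH_ROW_COUNT * (4 + 4) /* links + hash-values */);
    // Allow for the current hash table doubling (4 bytes per UInt4 entry)
    maxMemoryNeeded += (long) MAX_BATCH_ROW_COUNT * 2 * 2 * 4 * maxHashTableBatchCount;

    long allocatedMem = 10_000_000, memLimit = 20_000_000; // hypothetical allocator state
    boolean shouldSpill = allocatedMem + maxMemoryNeeded > memLimit;
    System.out.println("extra memory needed: " + maxMemoryNeeded + ", spill: " + shouldSpill);
  }
}

(This only illustrates the maxMemoryNeeded arithmetic; the real method compares
it against the allocator's state and, if needed, picks a partition to spill.)
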
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 0bd6fe62437..477e30c9c07 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -737,7 +737,7 @@ public IterOutcome executeBuildPhase() throws SchemaChangeException {
     {
       // Initializing build calculator
       // Limit scope of these variables to this block
-      int maxBatchSize = firstCycle? RecordBatch.MAX_BATCH_SIZE: RECORDS_PER_BATCH;
+      int maxBatchSize = firstCycle? RecordBatch.MAX_BATCH_ROW_COUNT : RECORDS_PER_BATCH;
       boolean hasProbeData = leftUpstream != IterOutcome.NONE;
       boolean doMemoryCalculation = canSpill && hasProbeData;
       HashJoinMemoryCalculator calc = getCalculatorImpl();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelperSizeCalculatorImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelperSizeCalculatorImpl.java
index a17ea2f7397..f804277c5d4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelperSizeCalculatorImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelperSizeCalculatorImpl.java
@@ -34,7 +34,7 @@ public long calculateSize(HashJoinMemoryCalculator.PartitionStat partitionStat,
     Preconditions.checkArgument(!partitionStat.isSpilled());
 
     // Account for the size of the SV4 in a hash join helper
-    long joinHelperSize = IntVector.VALUE_WIDTH * RecordBatch.MAX_BATCH_SIZE;
+    long joinHelperSize = IntVector.VALUE_WIDTH * RecordBatch.MAX_BATCH_ROW_COUNT;
 
     // Account for the SV4 for each batch that holds links for each batch
     for (HashJoinMemoryCalculator.BatchStat batchStat: partitionStat.getInMemoryBatches()) {
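
The renamed constant feeds the same accounting as before: one SV4 worth of int
entries for the helper itself, plus one link entry per row of every in-memory
batch in the partition (this matches the expectation in
TestHashJoinHelperSizeCalculatorImpl further below). A tiny worked example with
hypothetical batch sizes; IntVector.VALUE_WIDTH is 4 bytes and
MAX_BATCH_ROW_COUNT is 65536:

public class JoinHelperSizeSketch {
  public static void main(String[] args) {
    final long VALUE_WIDTH = 4;
    final long MAX_BATCH_ROW_COUNT = 65536;

    long joinHelperSize = VALUE_WIDTH * MAX_BATCH_ROW_COUNT;  // SV4 base: 262144 bytes
    long[] inMemoryBatchRowCounts = {3500, 1200};             // hypothetical partition batches
    for (long rows : inMemoryBatchRowCounts) {
      joinHelperSize += VALUE_WIDTH * rows;                   // per-batch link vectors
    }
    System.out.println(joinHelperSize + " bytes");            // 262144 + 14000 + 4800 = 280944
  }
}
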
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java
index a351cbcaf1c..2ab42e5a046 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java
@@ -66,9 +66,9 @@ public BuildSidePartitioning next() {
       final HashTableSizeCalculator hashTableSizeCalculator;
 
       if (hashTableCalculatorType.equals(HashTableSizeCalculatorLeanImpl.TYPE)) {
-        hashTableSizeCalculator = new HashTableSizeCalculatorLeanImpl(RecordBatch.MAX_BATCH_SIZE, hashTableDoublingFactor);
+        hashTableSizeCalculator = new HashTableSizeCalculatorLeanImpl(RecordBatch.MAX_BATCH_ROW_COUNT, hashTableDoublingFactor);
       } else if (hashTableCalculatorType.equals(HashTableSizeCalculatorConservativeImpl.TYPE)) {
-        hashTableSizeCalculator = new HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_SIZE, hashTableDoublingFactor);
+        hashTableSizeCalculator = new HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_ROW_COUNT, hashTableDoublingFactor);
       } else {
         throw new IllegalArgumentException("Invalid calc type: " + hashTableCalculatorType);
       }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
index 378c9800632..88f4c7dfd26 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
@@ -307,12 +307,12 @@ public IterOutcome next() {
         }
         // It's legal for a batch to have zero field. For instance, a relational table could have
         // zero columns. Querying such table requires execution operator to process batch with 0 field.
-        if (incoming.getRecordCount() > MAX_BATCH_SIZE) {
+        if (incoming.getRecordCount() > MAX_BATCH_ROW_COUNT) {
           throw new IllegalStateException(
               String.format(
                   "Incoming batch [#%d, %s] has size %d, which is beyond the"
                   + " limit of %d",
-                  instNum, batchTypeName, incoming.getRecordCount(), MAX_BATCH_SIZE
+                  instNum, batchTypeName, incoming.getRecordCount(), MAX_BATCH_ROW_COUNT
                   ));
         }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index 0f3f8a3434a..cf7763eca49 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -512,7 +512,7 @@ public IterOutcome innerNext() {
             estimatedRecordSize += 50;
           }
         }
-        targetRecordCount = Math.min(MAX_BATCH_SIZE, Math.max(1, COPIER_BATCH_MEM_LIMIT / estimatedRecordSize));
+        targetRecordCount = Math.min(MAX_BATCH_ROW_COUNT, Math.max(1, COPIER_BATCH_MEM_LIMIT / estimatedRecordSize));
         int count = copier.next(targetRecordCount);
         container.buildSchema(SelectionVectorMode.NONE);
         container.setRecordCount(count);
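
The sort copier sizing above follows the same pattern: bound the copied-batch
row count by a memory budget, never exceeding MAX_BATCH_ROW_COUNT. A quick
illustration with hypothetical numbers; COPIER_BATCH_MEM_LIMIT and the padded
record-size estimate vary per query:

public class CopierTargetSketch {
  public static void main(String[] args) {
    final int MAX_BATCH_ROW_COUNT = 65536;
    final int COPIER_BATCH_MEM_LIMIT = 256 * 1024;  // assume a 256 KB copier budget
    int estimatedRecordSize = 120;                  // hypothetical bytes per row (incl. padding)

    int targetRecordCount = Math.min(MAX_BATCH_ROW_COUNT,
        Math.max(1, COPIER_BATCH_MEM_LIMIT / estimatedRecordSize));
    System.out.println(targetRecordCount + " rows per copied batch");  // 2184
  }
}
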
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
index f0cab26c432..b44a3629211 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
@@ -38,8 +38,8 @@
  */
 public interface RecordBatch extends VectorAccessible {
 
-  /** max batch size, limited by 2-byte length in SV2: 65536 = 2^16 */
-  int MAX_BATCH_SIZE = ValueVector.MAX_ROW_COUNT;
+  /** max num of rows in a batch, limited by 2-byte length in SV2: 65536 = 2^16 */
+  int MAX_BATCH_ROW_COUNT = ValueVector.MAX_ROW_COUNT;
 
   /**
    * Describes the outcome of incrementing RecordBatch forward by a call to
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestBuildSidePartitioningImpl.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestBuildSidePartitioningImpl.java
index ceebc811c0f..af943ecb7cc 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestBuildSidePartitioningImpl.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestBuildSidePartitioningImpl.java
@@ -33,7 +33,7 @@ public void testSimpleReserveMemoryCalculationNoHash() {
     final HashJoinMemoryCalculatorImpl.BuildSidePartitioningImpl calc =
       new HashJoinMemoryCalculatorImpl.BuildSidePartitioningImpl(
         BatchSizePredictorImpl.Factory.INSTANCE,
-        new HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_SIZE, HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR),
+        new HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_ROW_COUNT, HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR),
         HashJoinHelperSizeCalculatorImpl.INSTANCE,
         fragmentationFactor,
         safetyFactor);
@@ -78,7 +78,7 @@ public void testSimpleReserveMemoryCalculationHash() {
     final HashJoinMemoryCalculatorImpl.BuildSidePartitioningImpl calc =
       new HashJoinMemoryCalculatorImpl.BuildSidePartitioningImpl(
         BatchSizePredictorImpl.Factory.INSTANCE,
-        new HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_SIZE, HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR),
+        new HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_ROW_COUNT, HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR),
         HashJoinHelperSizeCalculatorImpl.INSTANCE,
         fragmentationFactor,
         safetyFactor);
@@ -123,7 +123,7 @@ public void testAdjustInitialPartitions() {
     final HashJoinMemoryCalculatorImpl.BuildSidePartitioningImpl calc =
       new HashJoinMemoryCalculatorImpl.BuildSidePartitioningImpl(
         BatchSizePredictorImpl.Factory.INSTANCE,
-        new HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_SIZE, HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR),
+        new HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_ROW_COUNT, HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR),
         HashJoinHelperSizeCalculatorImpl.INSTANCE,
         fragmentationFactor,
         safetyFactor);
@@ -171,7 +171,7 @@ public void testHasDataProbeEmpty() {
     final HashJoinMemoryCalculatorImpl.BuildSidePartitioningImpl calc =
       new HashJoinMemoryCalculatorImpl.BuildSidePartitioningImpl(
         BatchSizePredictorImpl.Factory.INSTANCE,
-        new HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_SIZE, HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR),
+        new HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_ROW_COUNT, HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR),
         HashJoinHelperSizeCalculatorImpl.INSTANCE,
         fragmentationFactor,
         safetyFactor);
@@ -206,7 +206,7 @@ public void testNoProbeDataForStats() {
     final HashJoinMemoryCalculatorImpl.BuildSidePartitioningImpl calc =
       new HashJoinMemoryCalculatorImpl.BuildSidePartitioningImpl(
         BatchSizePredictorImpl.Factory.INSTANCE,
-        new HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_SIZE, HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR),
+        new HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_ROW_COUNT, HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR),
         HashJoinHelperSizeCalculatorImpl.INSTANCE,
         fragmentationFactor,
         safetyFactor);
@@ -252,7 +252,7 @@ public void testProbeEmpty() {
     final HashJoinMemoryCalculatorImpl.BuildSidePartitioningImpl calc =
       new HashJoinMemoryCalculatorImpl.BuildSidePartitioningImpl(
         BatchSizePredictorImpl.Factory.INSTANCE,
-        new HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_SIZE, HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR),
+        new HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_ROW_COUNT, HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR),
         HashJoinHelperSizeCalculatorImpl.INSTANCE,
         fragmentationFactor,
         safetyFactor);
@@ -298,7 +298,7 @@ public void testNoRoomInMemoryForBatch1() {
     final HashJoinMemoryCalculatorImpl.BuildSidePartitioningImpl calc =
       new HashJoinMemoryCalculatorImpl.BuildSidePartitioningImpl(
         BatchSizePredictorImpl.Factory.INSTANCE,
-        new HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_SIZE, HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR),
+        new HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_ROW_COUNT, HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR),
         HashJoinHelperSizeCalculatorImpl.INSTANCE,
         fragmentationFactor,
         safetyFactor);
@@ -349,7 +349,7 @@ public void testCompleteLifeCycle() {
     final HashJoinMemoryCalculatorImpl.BuildSidePartitioningImpl calc =
       new HashJoinMemoryCalculatorImpl.BuildSidePartitioningImpl(
         BatchSizePredictorImpl.Factory.INSTANCE,
-        new HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_SIZE, HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR),
+        new HashTableSizeCalculatorConservativeImpl(RecordBatch.MAX_BATCH_ROW_COUNT, HashTableSizeCalculatorConservativeImpl.HASHTABLE_DOUBLING_FACTOR),
         HashJoinHelperSizeCalculatorImpl.INSTANCE,
         fragmentationFactor,
         safetyFactor);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinHelperSizeCalculatorImpl.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinHelperSizeCalculatorImpl.java
index ed25d788f49..b9ae58d9a95 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinHelperSizeCalculatorImpl.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinHelperSizeCalculatorImpl.java
@@ -30,7 +30,7 @@ public void simpleCalculateSize() {
       ((long) TypeHelper.getSize(TypeProtos.MajorType.newBuilder().setMinorType(TypeProtos.MinorType.INT).build()));
 
     // Account for the overhead of a selection vector
-    long expected = intSize * RecordBatch.MAX_BATCH_SIZE;
+    long expected = intSize * RecordBatch.MAX_BATCH_ROW_COUNT;
     // Account for sv4 vector for batches
     expected += intSize * 3500;
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
index 6626176c423..36c9eecab75 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
@@ -1275,7 +1275,7 @@ public void testHandlingNonEmptyEMITAfterOK() throws Exception {
    * Temporary test to validate LATERAL handling output batch getting filled without consuming full output from left
    * and right batch join.
    * <p>
-   * For this test we are updating {@link LateralJoinBatch#MAX_BATCH_SIZE} by making it public, which might not expected
+   * For this test we are updating {@link LateralJoinBatch#MAX_BATCH_ROW_COUNT} by making it public, which might not be expected
    * after including the BatchSizing logic
    * TODO: Update the test after incorporating the BatchSizing change.
    *
@@ -1943,7 +1943,7 @@ public void testMultiLevelLateral() throws Exception {
    * This test generates an operator tree for multi level LATERAL by stacking 2 LATERAL and finally an UNNEST pair
    * (using MockRecord Batch) as left and right child of lower level LATERAL. Then we call next() on top level
    * LATERAL to simulate the operator tree and compare the outcome and record count generated with expected values.
-   * This test also changes the MAX_BATCH_SIZE to simulate the output being produced in multiple batches.
+   * This test also changes the MAX_BATCH_ROW_COUNT to simulate the output being produced in multiple batches.
    * @throws Exception
    */
   @Test


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services