You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ka...@apache.org on 2023/03/06 12:30:43 UTC

[druid] branch master updated: Suggested memory calculation in case NOT_ENOUGH_MEMORY_FAULT is thrown. (#13846)

This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 94cfabea18 Suggested memory calculation in case NOT_ENOUGH_MEMORY_FAULT is thrown. (#13846)
94cfabea18 is described below

commit 94cfabea1822eaadb9b36e700645d729a57a250e
Author: Karan Kumar <ka...@gmail.com>
AuthorDate: Mon Mar 6 18:00:36 2023 +0530

    Suggested memory calculation in case NOT_ENOUGH_MEMORY_FAULT is thrown. (#13846)
    
    * Suggested memory calculation in case NOT_ENOUGH_MEMORY_FAULT is thrown.
    
    Co-authored-by: Charles Smith <te...@gmail.com>
---
 docs/multi-stage-query/reference.md                |  2 +-
 .../druid/msq/exec/WorkerMemoryParameters.java     | 90 +++++++++++++++++++---
 .../msq/indexing/error/NotEnoughMemoryFault.java   | 39 +++++++---
 .../druid/msq/exec/WorkerMemoryParametersTest.java | 62 +++++++++------
 .../msq/indexing/error/MSQFaultSerdeTest.java      |  2 +-
 .../druid/msq/test/CalciteSelectQueryTestMSQ.java  |  4 +-
 .../org/apache/druid/msq/test/MSQTestBase.java     |  4 +-
 .../org/apache/druid/server/QueryResultPusher.java |  2 +-
 8 files changed, 155 insertions(+), 50 deletions(-)

diff --git a/docs/multi-stage-query/reference.md b/docs/multi-stage-query/reference.md
index de95b56923..5c05b68bf3 100644
--- a/docs/multi-stage-query/reference.md
+++ b/docs/multi-stage-query/reference.md
@@ -690,7 +690,7 @@ The following table describes error codes you may encounter in the `multiStageQu
 | <a name="error_TooManyColumns">`TooManyColumns`</a> | Exceeded the maximum number of columns for a stage (2,000 columns). | `numColumns`: The number of columns requested.<br /><br />`maxColumns`: The limit on columns which was exceeded. |
 | <a name="error_TooManyWarnings">`TooManyWarnings`</a> | Exceeded the maximum allowed number of warnings of a particular type. | `rootErrorCode`: The error code corresponding to the exception that exceeded the required limit. <br /><br />`maxWarnings`: Maximum number of warnings that are allowed for the corresponding `rootErrorCode`. |
 | <a name="error_TooManyWorkers">`TooManyWorkers`</a> | Exceeded the maximum number of simultaneously-running workers. See the [Limits](#limits) table for more details. | `workers`: The number of simultaneously running workers that exceeded a hard or soft limit. This may be larger than the number of workers in any one stage if multiple stages are running simultaneously. <br /><br />`maxWorkers`: The hard or soft limit on workers that was exceeded. If this is lower than the hard limit (1, [...]
-| <a name="error_NotEnoughMemory">`NotEnoughMemory`</a> | Insufficient memory to launch a stage. | `serverMemory`: The amount of memory available to a single process.<br /><br />`serverWorkers`: The number of workers running in a single process.<br /><br />`serverThreads`: The number of threads in a single process. |
+| <a name="error_NotEnoughMemory">`NotEnoughMemory`</a> | Insufficient memory to launch a stage. | `suggestedServerMemory`: Suggested number of bytes of memory to allocate to a given process. <br /><br />`serverMemory`: The number of bytes of memory available to a single process.<br /><br />`usableMemory`: The number of usable bytes of memory for a single process.<br /><br />`serverWorkers`: The number of workers running in a single process.<br /><br />`serverThreads`: The number of thre [...]
 | <a name="error_WorkerFailed">`WorkerFailed`</a> | A worker task failed unexpectedly. | `errorMsg`<br /><br />`workerTaskId`: The ID of the worker task. |
 | <a name="error_WorkerRpcFailed">`WorkerRpcFailed`</a> | A remote procedure call to a worker task failed and could not recover. | `workerTaskId`: the id of the worker task |
 | <a name="error_UnknownError">`UnknownError`</a> | All other errors. | `message` |
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerMemoryParameters.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerMemoryParameters.java
index fe812e2a29..4c038f921d 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerMemoryParameters.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerMemoryParameters.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.msq.exec;
 
+import com.google.common.base.Preconditions;
 import com.google.common.primitives.Ints;
 import com.google.inject.Injector;
 import it.unimi.dsi.fastutil.ints.IntSet;
@@ -128,6 +129,10 @@ public class WorkerMemoryParameters
    * we use a value somewhat lower than 0.5.
    */
   static final double BROADCAST_JOIN_MEMORY_FRACTION = 0.3;
+  /**
+   * In case {@link NotEnoughMemoryFault} is thrown, a fixed estimation overhead is added when estimating total memory required for the process.
+   */
+  private static final long BUFFER_BYTES_FOR_ESTIMATION = 1000;
 
   private final int superSorterMaxActiveProcessors;
   private final int superSorterMaxChannelsPerProcessor;
@@ -155,12 +160,13 @@ public class WorkerMemoryParameters
    */
   public static WorkerMemoryParameters createProductionInstanceForController(final Injector injector)
   {
+    long totalLookupFootprint = computeTotalLookupFootprint(injector);
     return createInstance(
         Runtime.getRuntime().maxMemory(),
-        computeUsableMemoryInJvm(injector),
         computeNumWorkersInJvm(injector),
         computeNumProcessorsInJvm(injector),
-        0
+        0,
+        totalLookupFootprint
     );
   }
 
@@ -179,13 +185,14 @@ public class WorkerMemoryParameters
         inputStageNumbers.intStream()
                          .map(inputStageNumber -> queryDef.getStageDefinition(inputStageNumber).getMaxWorkerCount())
                          .sum();
+    long totalLookupFootprint = computeTotalLookupFootprint(injector);
 
     return createInstance(
         Runtime.getRuntime().maxMemory(),
-        computeUsableMemoryInJvm(injector),
         computeNumWorkersInJvm(injector),
         computeNumProcessorsInJvm(injector),
-        numInputWorkers
+        numInputWorkers,
+        totalLookupFootprint
     );
   }
 
@@ -200,15 +207,30 @@ public class WorkerMemoryParameters
    *                                  the task capacity.
    * @param numProcessingThreadsInJvm size of the processing thread pool in the JVM.
    * @param numInputWorkers           number of workers across input stages that need to be merged together.
+   * @param totalLookUpFootprint      estimated size of the lookups loaded by the process.
    */
   public static WorkerMemoryParameters createInstance(
       final long maxMemoryInJvm,
-      final long usableMemoryInJvm,
       final int numWorkersInJvm,
       final int numProcessingThreadsInJvm,
-      final int numInputWorkers
+      final int numInputWorkers,
+      final long totalLookUpFootprint
   )
   {
+    Preconditions.checkArgument(maxMemoryInJvm > 0, "Max memory passed: [%s] should be > 0", maxMemoryInJvm);
+    Preconditions.checkArgument(numWorkersInJvm > 0, "Number of workers: [%s] in jvm should be > 0", numWorkersInJvm);
+    Preconditions.checkArgument(
+        numProcessingThreadsInJvm > 0,
+        "Number of processing threads [%s] should be > 0",
+        numProcessingThreadsInJvm
+    );
+    Preconditions.checkArgument(numInputWorkers >= 0, "Number of input workers: [%s] should be >=0", numInputWorkers);
+    Preconditions.checkArgument(
+        totalLookUpFootprint >= 0,
+        "Lookup memory footprint: [%s] should be >= 0",
+        totalLookUpFootprint
+    );
+    final long usableMemoryInJvm = computeUsableMemoryInJvm(maxMemoryInJvm, totalLookUpFootprint);
     final long workerMemory = memoryPerWorker(usableMemoryInJvm, numWorkersInJvm);
     final long bundleMemory = memoryPerBundle(usableMemoryInJvm, numWorkersInJvm, numProcessingThreadsInJvm);
     final long bundleMemoryForInputChannels = memoryNeededForInputChannels(numInputWorkers);
@@ -223,6 +245,12 @@ public class WorkerMemoryParameters
         // Not enough memory for even one worker. More of a NotEnoughMemory situation than a TooManyWorkers situation.
         throw new MSQException(
             new NotEnoughMemoryFault(
+                calculateSuggestedMinMemoryFromUsableMemory(
+                    estimateUsableMemory(
+                        numWorkersInJvm,
+                        numProcessingThreadsInJvm,
+                        PROCESSING_MINIMUM_BYTES + BUFFER_BYTES_FOR_ESTIMATION + bundleMemoryForInputChannels
+                    ), totalLookUpFootprint),
                 maxMemoryInJvm,
                 usableMemoryInJvm,
                 numWorkersInJvm,
@@ -238,6 +266,13 @@ public class WorkerMemoryParameters
     if (maxNumFramesForSuperSorter < MIN_SUPER_SORTER_FRAMES) {
       throw new MSQException(
           new NotEnoughMemoryFault(
+              calculateSuggestedMinMemoryFromUsableMemory(
+                  estimateUsableMemory(
+                      numWorkersInJvm,
+                      (MIN_SUPER_SORTER_FRAMES + BUFFER_BYTES_FOR_ESTIMATION) * LARGE_FRAME_SIZE
+                  ),
+                  totalLookUpFootprint
+              ),
               maxMemoryInJvm,
               usableMemoryInJvm,
               numWorkersInJvm,
@@ -412,7 +447,7 @@ public class WorkerMemoryParameters
   }
 
   /**
-   * Compute the memory allocated to each processing bundle.
+   * Compute the memory allocated to each processing bundle. Any computation changes done to this method should also be done in its corresponding method {@link WorkerMemoryParameters#estimateUsableMemory(int, int, long)}
    */
   private static long memoryPerBundle(
       final long usableMemoryInJvm,
@@ -431,6 +466,32 @@ public class WorkerMemoryParameters
     return memoryForBundles / bundleCount;
   }
 
+  /**
+   * Used for estimating the usable memory for better exception messages when {@link NotEnoughMemoryFault} is thrown.
+   */
+  private static long estimateUsableMemory(
+      final int numWorkersInJvm,
+      final int numProcessingThreadsInJvm,
+      final long estimatedEachBundleMemory
+  )
+  {
+    final int bundleCount = numWorkersInJvm + numProcessingThreadsInJvm;
+    return estimateUsableMemory(numWorkersInJvm, estimatedEachBundleMemory * bundleCount);
+
+  }
+
+  /**
+   * Add overheads to the estimated bundle memory for all the workers. Check out {@link WorkerMemoryParameters#memoryPerWorker(long, int)}
+   * for the overhead calculation outside the processing bundles.
+   */
+  private static long estimateUsableMemory(final int numWorkersInJvm, final long estimatedTotalBundleMemory)
+  {
+
+    // Currently, we only add the partition stats overhead since it will be the single largest overhead per worker.
+    final long estimateStatOverHeadPerWorker = PARTITION_STATS_MEMORY_MAX_BYTES;
+    return estimatedTotalBundleMemory + (estimateStatOverHeadPerWorker * numWorkersInJvm);
+  }
+
   private static long memoryNeededForInputChannels(final int numInputWorkers)
   {
     // Workers that read sorted inputs must open all channels at once to do an N-way merge. Calculate memory needs.
@@ -439,11 +500,20 @@ public class WorkerMemoryParameters
   }
 
   /**
-   * Amount of heap memory available for our usage.
+   * Amount of heap memory available for our usage. Any computation changes done to this method should also be done in its corresponding method {@link WorkerMemoryParameters#calculateSuggestedMinMemoryFromUsableMemory}
+   */
+  private static long computeUsableMemoryInJvm(final long maxMemory, final long totalLookupFootprint)
+  {
+    // Since lookups are essentially in-memory hash maps, their object overhead is trivial; hence the lookup footprint is subtracted prior to the usable memory calculation.
+    return (long) ((maxMemory - totalLookupFootprint) * USABLE_MEMORY_FRACTION);
+  }
+
+  /**
+   * Estimate the amount of heap memory the process needs, given the usable memory required by the workload and the lookup footprint. This method is used for better exception messages when {@link NotEnoughMemoryFault} is thrown.
    */
-  private static long computeUsableMemoryInJvm(final Injector injector)
+  private static long calculateSuggestedMinMemoryFromUsableMemory(long usuableMemeory, final long totalLookupFootprint)
   {
-    return (long) ((Runtime.getRuntime().maxMemory() - computeTotalLookupFootprint(injector)) * USABLE_MEMORY_FRACTION);
+    return (long) ((usuableMemeory / USABLE_MEMORY_FRACTION) + totalLookupFootprint);
   }
 
   /**
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/NotEnoughMemoryFault.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/NotEnoughMemoryFault.java
index dfd2129161..0ea447958e 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/NotEnoughMemoryFault.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/NotEnoughMemoryFault.java
@@ -30,6 +30,7 @@ public class NotEnoughMemoryFault extends BaseMSQFault
 {
   static final String CODE = "NotEnoughMemory";
 
+  private final long suggestedServerMemory;
   private final long serverMemory;
   private final long usableMemory;
   private final int serverWorkers;
@@ -37,6 +38,7 @@ public class NotEnoughMemoryFault extends BaseMSQFault
 
   @JsonCreator
   public NotEnoughMemoryFault(
+      @JsonProperty("suggestedServerMemory") final long suggestedServerMemory,
       @JsonProperty("serverMemory") final long serverMemory,
       @JsonProperty("usableMemory") final long usableMemory,
       @JsonProperty("serverWorkers") final int serverWorkers,
@@ -45,19 +47,28 @@ public class NotEnoughMemoryFault extends BaseMSQFault
   {
     super(
         CODE,
-        "Not enough memory (total = %,d; usable = %,d; server workers = %,d; server threads = %,d)",
+        "Not enough memory. Required al teast %,d bytes. (total = %,d bytes; usable = %,d bytes; server workers = %,d; server threads = %,d). Increase JVM memory with the -xmx option"
+        + (serverWorkers > 1 ? " or reduce number of server workers" : ""),
+        suggestedServerMemory,
         serverMemory,
         usableMemory,
         serverWorkers,
         serverThreads
     );
 
+    this.suggestedServerMemory = suggestedServerMemory;
     this.serverMemory = serverMemory;
     this.usableMemory = usableMemory;
     this.serverWorkers = serverWorkers;
     this.serverThreads = serverThreads;
   }
 
+  @JsonProperty
+  public long getSuggestedServerMemory()
+  {
+    return suggestedServerMemory;
+  }
+
   @JsonProperty
   public long getServerMemory()
   {
@@ -95,25 +106,35 @@ public class NotEnoughMemoryFault extends BaseMSQFault
       return false;
     }
     NotEnoughMemoryFault that = (NotEnoughMemoryFault) o;
-    return serverMemory == that.serverMemory
-           && usableMemory == that.usableMemory
-           && serverWorkers == that.serverWorkers
-           && serverThreads == that.serverThreads;
+    return
+        suggestedServerMemory == that.suggestedServerMemory
+        && serverMemory == that.serverMemory
+        && usableMemory == that.usableMemory
+        && serverWorkers == that.serverWorkers
+        && serverThreads == that.serverThreads;
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(super.hashCode(), serverMemory, usableMemory, serverWorkers, serverThreads);
+    return Objects.hash(
+        super.hashCode(),
+        suggestedServerMemory,
+        serverMemory,
+        usableMemory,
+        serverWorkers,
+        serverThreads
+    );
   }
 
   @Override
   public String toString()
   {
     return "NotEnoughMemoryFault{" +
-           "serverMemory=" + serverMemory +
-           ", usableMemory=" + usableMemory +
-           ", serverWorkers=" + serverWorkers +
+           "suggestedServerMemory=" + suggestedServerMemory +
+           " bytes, serverMemory=" + serverMemory +
+           " bytes, usableMemory=" + usableMemory +
+           " bytes, serverWorkers=" + serverWorkers +
            ", serverThreads=" + serverThreads +
            '}';
   }
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerMemoryParametersTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerMemoryParametersTest.java
index f0413ddcd6..a5ec9e48ca 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerMemoryParametersTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerMemoryParametersTest.java
@@ -21,6 +21,7 @@ package org.apache.druid.msq.exec;
 
 import nl.jqno.equalsverifier.EqualsVerifier;
 import org.apache.druid.msq.indexing.error.MSQException;
+import org.apache.druid.msq.indexing.error.MSQFault;
 import org.apache.druid.msq.indexing.error.NotEnoughMemoryFault;
 import org.apache.druid.msq.indexing.error.TooManyWorkersFault;
 import org.junit.Assert;
@@ -31,29 +32,34 @@ public class WorkerMemoryParametersTest
   @Test
   public void test_oneWorkerInJvm_alone()
   {
-    Assert.assertEquals(parameters(1, 41, 224_785_000, 100_650_000, 75_000_000), compute(1_000_000_000, 1, 1, 1));
-    Assert.assertEquals(parameters(2, 13, 149_410_000, 66_900_000, 75_000_000), compute(1_000_000_000, 1, 2, 1));
-    Assert.assertEquals(parameters(4, 3, 89_110_000, 39_900_000, 75_000_000), compute(1_000_000_000, 1, 4, 1));
-    Assert.assertEquals(parameters(3, 2, 48_910_000, 21_900_000, 75_000_000), compute(1_000_000_000, 1, 8, 1));
-    Assert.assertEquals(parameters(2, 2, 33_448_460, 14_976_922, 75_000_000), compute(1_000_000_000, 1, 12, 1));
+    Assert.assertEquals(parameters(1, 41, 224_785_000, 100_650_000, 75_000_000), compute(1_000_000_000, 1, 1, 1, 0));
+    Assert.assertEquals(parameters(2, 13, 149_410_000, 66_900_000, 75_000_000), compute(1_000_000_000, 1, 2, 1, 0));
+    Assert.assertEquals(parameters(4, 3, 89_110_000, 39_900_000, 75_000_000), compute(1_000_000_000, 1, 4, 1, 0));
+    Assert.assertEquals(parameters(3, 2, 48_910_000, 21_900_000, 75_000_000), compute(1_000_000_000, 1, 8, 1, 0));
+    Assert.assertEquals(parameters(2, 2, 33_448_460, 14_976_922, 75_000_000), compute(1_000_000_000, 1, 12, 1, 0));
 
     final MSQException e = Assert.assertThrows(
         MSQException.class,
-        () -> compute(1_000_000_000, 1, 32, 1)
+        () -> compute(1_000_000_000, 1, 32, 1, 0)
     );
+    Assert.assertEquals(new NotEnoughMemoryFault(1_588_044_000, 1_000_000_000, 750_000_000, 1, 32), e.getFault());
+
+    final MSQFault fault = Assert.assertThrows(MSQException.class, () -> compute(1_000_000_000, 2, 32, 1, 0))
+                                 .getFault();
+
+    Assert.assertEquals(new NotEnoughMemoryFault(2024045333, 1_000_000_000, 750_000_000, 2, 32), fault);
 
-    Assert.assertEquals(new NotEnoughMemoryFault(1_000_000_000, 750_000_000, 1, 32), e.getFault());
   }
 
   @Test
   public void test_oneWorkerInJvm_twoHundredWorkersInCluster()
   {
-    Assert.assertEquals(parameters(1, 83, 317_580_000, 142_200_000, 150_000_000), compute(2_000_000_000, 1, 1, 200));
-    Assert.assertEquals(parameters(2, 27, 166_830_000, 74_700_000, 150_000_000), compute(2_000_000_000, 1, 2, 200));
+    Assert.assertEquals(parameters(1, 83, 317_580_000, 142_200_000, 150_000_000), compute(2_000_000_000, 1, 1, 200, 0));
+    Assert.assertEquals(parameters(2, 27, 166_830_000, 74_700_000, 150_000_000), compute(2_000_000_000, 1, 2, 200, 0));
 
     final MSQException e = Assert.assertThrows(
         MSQException.class,
-        () -> compute(1_000_000_000, 1, 4, 200)
+        () -> compute(1_000_000_000, 1, 4, 200, 0)
     );
 
     Assert.assertEquals(new TooManyWorkersFault(200, 109), e.getFault());
@@ -62,32 +68,39 @@ public class WorkerMemoryParametersTest
   @Test
   public void test_fourWorkersInJvm_twoHundredWorkersInCluster()
   {
-    Assert.assertEquals(parameters(1, 150, 679_380_000, 304_200_000, 168_750_000), compute(9_000_000_000L, 4, 1, 200));
-    Assert.assertEquals(parameters(2, 62, 543_705_000, 243_450_000, 168_750_000), compute(9_000_000_000L, 4, 2, 200));
-    Assert.assertEquals(parameters(4, 22, 374_111_250, 167_512_500, 168_750_000), compute(9_000_000_000L, 4, 4, 200));
-    Assert.assertEquals(parameters(4, 14, 204_517_500, 91_575_000, 168_750_000), compute(9_000_000_000L, 4, 8, 200));
-    Assert.assertEquals(parameters(4, 8, 68_842_500, 30_825_000, 168_750_000), compute(9_000_000_000L, 4, 16, 200));
+    Assert.assertEquals(
+        parameters(1, 150, 679_380_000, 304_200_000, 168_750_000),
+        compute(9_000_000_000L, 4, 1, 200, 0)
+    );
+    Assert.assertEquals(
+        parameters(2, 62, 543_705_000, 243_450_000, 168_750_000),
+        compute(9_000_000_000L, 4, 2, 200, 0)
+    );
+    Assert.assertEquals(
+        parameters(4, 22, 374_111_250, 167_512_500, 168_750_000),
+        compute(9_000_000_000L, 4, 4, 200, 0)
+    );
+    Assert.assertEquals(parameters(4, 14, 204_517_500, 91_575_000, 168_750_000), compute(9_000_000_000L, 4, 8, 200, 0));
+    Assert.assertEquals(parameters(4, 8, 68_842_500, 30_825_000, 168_750_000), compute(9_000_000_000L, 4, 16, 200, 0));
 
     final MSQException e = Assert.assertThrows(
         MSQException.class,
-        () -> compute(8_000_000_000L, 4, 32, 200)
+        () -> compute(8_000_000_000L, 4, 32, 200, 0)
     );
 
     Assert.assertEquals(new TooManyWorkersFault(200, 124), e.getFault());
 
     // Make sure 107 actually works. (Verify the error message above.)
-    Assert.assertEquals(parameters(4, 3, 28_140_000, 12_600_000, 150_000_000), compute(8_000_000_000L, 4, 32, 107));
+    Assert.assertEquals(parameters(4, 3, 28_140_000, 12_600_000, 150_000_000), compute(8_000_000_000L, 4, 32, 107, 0));
   }
 
   @Test
   public void test_oneWorkerInJvm_negativeUsableMemory()
   {
-    final MSQException e = Assert.assertThrows(
-        MSQException.class,
+    Exception e = Assert.assertThrows(
+        IllegalArgumentException.class,
         () -> WorkerMemoryParameters.createInstance(100, -50, 1, 32, 1)
     );
-
-    Assert.assertEquals(new NotEnoughMemoryFault(100, -50, 1, 32), e.getFault());
   }
 
   @Test
@@ -117,15 +130,16 @@ public class WorkerMemoryParametersTest
       final long maxMemoryInJvm,
       final int numWorkersInJvm,
       final int numProcessingThreadsInJvm,
-      final int numInputWorkers
+      final int numInputWorkers,
+      final int totalLookUpFootprint
   )
   {
     return WorkerMemoryParameters.createInstance(
         maxMemoryInJvm,
-        (long) (maxMemoryInJvm * WorkerMemoryParameters.USABLE_MEMORY_FRACTION),
         numWorkersInJvm,
         numProcessingThreadsInJvm,
-        numInputWorkers
+        numInputWorkers,
+        totalLookUpFootprint
     );
   }
 }
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQFaultSerdeTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQFaultSerdeTest.java
index 7a0ee66ea6..aae47170a2 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQFaultSerdeTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQFaultSerdeTest.java
@@ -60,7 +60,7 @@ public class MSQFaultSerdeTest
     assertFaultSerde(InsertTimeNullFault.INSTANCE);
     assertFaultSerde(new InsertTimeOutOfBoundsFault(Intervals.ETERNITY));
     assertFaultSerde(new InvalidNullByteFault("the column"));
-    assertFaultSerde(new NotEnoughMemoryFault(1000, 900, 1, 2));
+    assertFaultSerde(new NotEnoughMemoryFault(1000, 1000, 900, 1, 2));
     assertFaultSerde(QueryNotSupportedFault.INSTANCE);
     assertFaultSerde(new RowTooLargeFault(1000));
     assertFaultSerde(new TaskStartTimeoutFault(10));
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteSelectQueryTestMSQ.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteSelectQueryTestMSQ.java
index 31f935fc07..1042387020 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteSelectQueryTestMSQ.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteSelectQueryTestMSQ.java
@@ -72,11 +72,11 @@ public class CalciteSelectQueryTestMSQ extends CalciteQueryTest
   {
     final WorkerMemoryParameters workerMemoryParameters =
         WorkerMemoryParameters.createInstance(
-            WorkerMemoryParameters.PROCESSING_MINIMUM_BYTES * 50,
             WorkerMemoryParameters.PROCESSING_MINIMUM_BYTES * 50,
             2,
             10,
-            2
+            2,
+            0
         );
     indexingServiceClient = new MSQTestOverlordServiceClient(
         queryJsonMapper,
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
index 8842e16958..50da3c1f75 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
@@ -282,11 +282,11 @@ public class MSQTestBase extends BaseCalciteQueryTest
   private TestGroupByBuffers groupByBuffers;
   protected final WorkerMemoryParameters workerMemoryParameters = Mockito.spy(
       WorkerMemoryParameters.createInstance(
-          WorkerMemoryParameters.PROCESSING_MINIMUM_BYTES * 50,
           WorkerMemoryParameters.PROCESSING_MINIMUM_BYTES * 50,
           2,
           10,
-          2
+          2,
+          0
       )
   );
 
diff --git a/server/src/main/java/org/apache/druid/server/QueryResultPusher.java b/server/src/main/java/org/apache/druid/server/QueryResultPusher.java
index b25e31d363..947283fe2b 100644
--- a/server/src/main/java/org/apache/druid/server/QueryResultPusher.java
+++ b/server/src/main/java/org/apache/druid/server/QueryResultPusher.java
@@ -207,7 +207,7 @@ public abstract class QueryResultPusher
       resultsWriter.recordFailure(e);
 
       // This case is always a failure because the error happened mid-stream of sending results back.  Therefore,
-      // we do not believe that the response stream was actually useable
+      // we do not believe that the response stream was actually usable
       counter.incrementFailed();
       return null;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org