You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ka...@apache.org on 2023/03/06 12:30:43 UTC
[druid] branch master updated: Suggested memory calculation in case NOT_ENOUGH_MEMORY_FAULT is thrown. (#13846)
This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 94cfabea18 Suggested memory calculation in case NOT_ENOUGH_MEMORY_FAULT is thrown. (#13846)
94cfabea18 is described below
commit 94cfabea1822eaadb9b36e700645d729a57a250e
Author: Karan Kumar <ka...@gmail.com>
AuthorDate: Mon Mar 6 18:00:36 2023 +0530
Suggested memory calculation in case NOT_ENOUGH_MEMORY_FAULT is thrown. (#13846)
* Suggested memory calculation in case NOT_ENOUGH_MEMORY_FAULT is thrown.
Co-authored-by: Charles Smith <te...@gmail.com>
---
docs/multi-stage-query/reference.md | 2 +-
.../druid/msq/exec/WorkerMemoryParameters.java | 90 +++++++++++++++++++---
.../msq/indexing/error/NotEnoughMemoryFault.java | 39 +++++++---
.../druid/msq/exec/WorkerMemoryParametersTest.java | 62 +++++++++------
.../msq/indexing/error/MSQFaultSerdeTest.java | 2 +-
.../druid/msq/test/CalciteSelectQueryTestMSQ.java | 4 +-
.../org/apache/druid/msq/test/MSQTestBase.java | 4 +-
.../org/apache/druid/server/QueryResultPusher.java | 2 +-
8 files changed, 155 insertions(+), 50 deletions(-)
diff --git a/docs/multi-stage-query/reference.md b/docs/multi-stage-query/reference.md
index de95b56923..5c05b68bf3 100644
--- a/docs/multi-stage-query/reference.md
+++ b/docs/multi-stage-query/reference.md
@@ -690,7 +690,7 @@ The following table describes error codes you may encounter in the `multiStageQu
| <a name="error_TooManyColumns">`TooManyColumns`</a> | Exceeded the maximum number of columns for a stage (2,000 columns). | `numColumns`: The number of columns requested.<br /><br />`maxColumns`: The limit on columns which was exceeded. |
| <a name="error_TooManyWarnings">`TooManyWarnings`</a> | Exceeded the maximum allowed number of warnings of a particular type. | `rootErrorCode`: The error code corresponding to the exception that exceeded the required limit. <br /><br />`maxWarnings`: Maximum number of warnings that are allowed for the corresponding `rootErrorCode`. |
| <a name="error_TooManyWorkers">`TooManyWorkers`</a> | Exceeded the maximum number of simultaneously-running workers. See the [Limits](#limits) table for more details. | `workers`: The number of simultaneously running workers that exceeded a hard or soft limit. This may be larger than the number of workers in any one stage if multiple stages are running simultaneously. <br /><br />`maxWorkers`: The hard or soft limit on workers that was exceeded. If this is lower than the hard limit (1, [...]
-| <a name="error_NotEnoughMemory">`NotEnoughMemory`</a> | Insufficient memory to launch a stage. | `serverMemory`: The amount of memory available to a single process.<br /><br />`serverWorkers`: The number of workers running in a single process.<br /><br />`serverThreads`: The number of threads in a single process. |
+| <a name="error_NotEnoughMemory">`NotEnoughMemory`</a> | Insufficient memory to launch a stage. | `suggestedServerMemory`: Suggested number of bytes of memory to allocate to a given process. <br /><br />`serverMemory`: The number of bytes of memory available to a single process.<br /><br />`usableMemory`: The number of usable bytes of memory for a single process.<br /><br />`serverWorkers`: The number of workers running in a single process.<br /><br />`serverThreads`: The number of thre [...]
| <a name="error_WorkerFailed">`WorkerFailed`</a> | A worker task failed unexpectedly. | `errorMsg`<br /><br />`workerTaskId`: The ID of the worker task. |
| <a name="error_WorkerRpcFailed">`WorkerRpcFailed`</a> | A remote procedure call to a worker task failed and could not recover. | `workerTaskId`: the id of the worker task |
| <a name="error_UnknownError">`UnknownError`</a> | All other errors. | `message` |
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerMemoryParameters.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerMemoryParameters.java
index fe812e2a29..4c038f921d 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerMemoryParameters.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerMemoryParameters.java
@@ -19,6 +19,7 @@
package org.apache.druid.msq.exec;
+import com.google.common.base.Preconditions;
import com.google.common.primitives.Ints;
import com.google.inject.Injector;
import it.unimi.dsi.fastutil.ints.IntSet;
@@ -128,6 +129,10 @@ public class WorkerMemoryParameters
* we use a value somewhat lower than 0.5.
*/
static final double BROADCAST_JOIN_MEMORY_FRACTION = 0.3;
+ /**
+ * In case {@link NotEnoughMemoryFault} is thrown, a fixed estimation overhead is added when estimating total memory required for the process.
+ */
+ private static final long BUFFER_BYTES_FOR_ESTIMATION = 1000;
private final int superSorterMaxActiveProcessors;
private final int superSorterMaxChannelsPerProcessor;
@@ -155,12 +160,13 @@ public class WorkerMemoryParameters
*/
public static WorkerMemoryParameters createProductionInstanceForController(final Injector injector)
{
+ long totalLookupFootprint = computeTotalLookupFootprint(injector);
return createInstance(
Runtime.getRuntime().maxMemory(),
- computeUsableMemoryInJvm(injector),
computeNumWorkersInJvm(injector),
computeNumProcessorsInJvm(injector),
- 0
+ 0,
+ totalLookupFootprint
);
}
@@ -179,13 +185,14 @@ public class WorkerMemoryParameters
inputStageNumbers.intStream()
.map(inputStageNumber -> queryDef.getStageDefinition(inputStageNumber).getMaxWorkerCount())
.sum();
+ long totalLookupFootprint = computeTotalLookupFootprint(injector);
return createInstance(
Runtime.getRuntime().maxMemory(),
- computeUsableMemoryInJvm(injector),
computeNumWorkersInJvm(injector),
computeNumProcessorsInJvm(injector),
- numInputWorkers
+ numInputWorkers,
+ totalLookupFootprint
);
}
@@ -200,15 +207,30 @@ public class WorkerMemoryParameters
* the task capacity.
* @param numProcessingThreadsInJvm size of the processing thread pool in the JVM.
* @param numInputWorkers number of workers across input stages that need to be merged together.
+ * @param totalLookUpFootprint estimated size of the lookups loaded by the process.
*/
public static WorkerMemoryParameters createInstance(
final long maxMemoryInJvm,
- final long usableMemoryInJvm,
final int numWorkersInJvm,
final int numProcessingThreadsInJvm,
- final int numInputWorkers
+ final int numInputWorkers,
+ final long totalLookUpFootprint
)
{
+ Preconditions.checkArgument(maxMemoryInJvm > 0, "Max memory passed: [%s] should be > 0", maxMemoryInJvm);
+ Preconditions.checkArgument(numWorkersInJvm > 0, "Number of workers: [%s] in jvm should be > 0", numWorkersInJvm);
+ Preconditions.checkArgument(
+ numProcessingThreadsInJvm > 0,
+ "Number of processing threads [%s] should be > 0",
+ numProcessingThreadsInJvm
+ );
+ Preconditions.checkArgument(numInputWorkers >= 0, "Number of input workers: [%s] should be >=0", numInputWorkers);
+ Preconditions.checkArgument(
+ totalLookUpFootprint >= 0,
+ "Lookup memory footprint: [%s] should be >= 0",
+ totalLookUpFootprint
+ );
+ final long usableMemoryInJvm = computeUsableMemoryInJvm(maxMemoryInJvm, totalLookUpFootprint);
final long workerMemory = memoryPerWorker(usableMemoryInJvm, numWorkersInJvm);
final long bundleMemory = memoryPerBundle(usableMemoryInJvm, numWorkersInJvm, numProcessingThreadsInJvm);
final long bundleMemoryForInputChannels = memoryNeededForInputChannels(numInputWorkers);
@@ -223,6 +245,12 @@ public class WorkerMemoryParameters
// Not enough memory for even one worker. More of a NotEnoughMemory situation than a TooManyWorkers situation.
throw new MSQException(
new NotEnoughMemoryFault(
+ calculateSuggestedMinMemoryFromUsableMemory(
+ estimateUsableMemory(
+ numWorkersInJvm,
+ numProcessingThreadsInJvm,
+ PROCESSING_MINIMUM_BYTES + BUFFER_BYTES_FOR_ESTIMATION + bundleMemoryForInputChannels
+ ), totalLookUpFootprint),
maxMemoryInJvm,
usableMemoryInJvm,
numWorkersInJvm,
@@ -238,6 +266,13 @@ public class WorkerMemoryParameters
if (maxNumFramesForSuperSorter < MIN_SUPER_SORTER_FRAMES) {
throw new MSQException(
new NotEnoughMemoryFault(
+ calculateSuggestedMinMemoryFromUsableMemory(
+ estimateUsableMemory(
+ numWorkersInJvm,
+ (MIN_SUPER_SORTER_FRAMES + BUFFER_BYTES_FOR_ESTIMATION) * LARGE_FRAME_SIZE
+ ),
+ totalLookUpFootprint
+ ),
maxMemoryInJvm,
usableMemoryInJvm,
numWorkersInJvm,
@@ -412,7 +447,7 @@ public class WorkerMemoryParameters
}
/**
- * Compute the memory allocated to each processing bundle.
+ * Compute the memory allocated to each processing bundle. Any computation changes done to this method should also be done in its corresponding method {@link WorkerMemoryParameters#estimateUsableMemory(int, int, long)}
*/
private static long memoryPerBundle(
final long usableMemoryInJvm,
@@ -431,6 +466,32 @@ public class WorkerMemoryParameters
return memoryForBundles / bundleCount;
}
+ /**
+ * Used for estimating the usable memory for better exception messages when {@link NotEnoughMemoryFault} is thrown.
+ */
+ private static long estimateUsableMemory(
+ final int numWorkersInJvm,
+ final int numProcessingThreadsInJvm,
+ final long estimatedEachBundleMemory
+ )
+ {
+ final int bundleCount = numWorkersInJvm + numProcessingThreadsInJvm;
+ return estimateUsableMemory(numWorkersInJvm, estimatedEachBundleMemory * bundleCount);
+
+ }
+
+ /**
+ * Add overheads to the estimated bundle memory for all the workers. See {@link WorkerMemoryParameters#memoryPerWorker(long, int)}
+ * for the overhead calculation outside the processing bundles.
+ */
+ private static long estimateUsableMemory(final int numWorkersInJvm, final long estimatedTotalBundleMemory)
+ {
+
+ // Currently, we only add the partition stats overhead since it will be the single largest overhead per worker.
+ final long estimateStatOverHeadPerWorker = PARTITION_STATS_MEMORY_MAX_BYTES;
+ return estimatedTotalBundleMemory + (estimateStatOverHeadPerWorker * numWorkersInJvm);
+ }
+
private static long memoryNeededForInputChannels(final int numInputWorkers)
{
// Workers that read sorted inputs must open all channels at once to do an N-way merge. Calculate memory needs.
@@ -439,11 +500,20 @@ public class WorkerMemoryParameters
}
/**
- * Amount of heap memory available for our usage.
+ * Amount of heap memory available for our usage. Any computation changes done to this method should also be done in its corresponding method {@link WorkerMemoryParameters#calculateSuggestedMinMemoryFromUsableMemory}
+ */
+ private static long computeUsableMemoryInJvm(final long maxMemory, final long totalLookupFootprint)
+ {
+ // Since lookups are essentially in-memory hash maps, the object overhead is trivial; hence the lookup footprint is subtracted prior to the usable memory calculation.
+ return (long) ((maxMemory - totalLookupFootprint) * USABLE_MEMORY_FRACTION);
+ }
+
+ /**
+ * Estimate the amount of heap memory the given workload needs, given the usable memory it requires. This method is used for better exception messages when {@link NotEnoughMemoryFault} is thrown.
*/
- private static long computeUsableMemoryInJvm(final Injector injector)
+ private static long calculateSuggestedMinMemoryFromUsableMemory(long usuableMemeory, final long totalLookupFootprint)
{
- return (long) ((Runtime.getRuntime().maxMemory() - computeTotalLookupFootprint(injector)) * USABLE_MEMORY_FRACTION);
+ return (long) ((usuableMemeory / USABLE_MEMORY_FRACTION) + totalLookupFootprint);
}
/**
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/NotEnoughMemoryFault.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/NotEnoughMemoryFault.java
index dfd2129161..0ea447958e 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/NotEnoughMemoryFault.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/NotEnoughMemoryFault.java
@@ -30,6 +30,7 @@ public class NotEnoughMemoryFault extends BaseMSQFault
{
static final String CODE = "NotEnoughMemory";
+ private final long suggestedServerMemory;
private final long serverMemory;
private final long usableMemory;
private final int serverWorkers;
@@ -37,6 +38,7 @@ public class NotEnoughMemoryFault extends BaseMSQFault
@JsonCreator
public NotEnoughMemoryFault(
+ @JsonProperty("suggestedServerMemory") final long suggestedServerMemory,
@JsonProperty("serverMemory") final long serverMemory,
@JsonProperty("usableMemory") final long usableMemory,
@JsonProperty("serverWorkers") final int serverWorkers,
@@ -45,19 +47,28 @@ public class NotEnoughMemoryFault extends BaseMSQFault
{
super(
CODE,
- "Not enough memory (total = %,d; usable = %,d; server workers = %,d; server threads = %,d)",
+ "Not enough memory. Required at least %,d bytes. (total = %,d bytes; usable = %,d bytes; server workers = %,d; server threads = %,d). Increase JVM memory with the -Xmx option"
+ + (serverWorkers > 1 ? " or reduce number of server workers" : ""),
+ suggestedServerMemory,
serverMemory,
usableMemory,
serverWorkers,
serverThreads
);
+ this.suggestedServerMemory = suggestedServerMemory;
this.serverMemory = serverMemory;
this.usableMemory = usableMemory;
this.serverWorkers = serverWorkers;
this.serverThreads = serverThreads;
}
+ @JsonProperty
+ public long getSuggestedServerMemory()
+ {
+ return suggestedServerMemory;
+ }
+
@JsonProperty
public long getServerMemory()
{
@@ -95,25 +106,35 @@ public class NotEnoughMemoryFault extends BaseMSQFault
return false;
}
NotEnoughMemoryFault that = (NotEnoughMemoryFault) o;
- return serverMemory == that.serverMemory
- && usableMemory == that.usableMemory
- && serverWorkers == that.serverWorkers
- && serverThreads == that.serverThreads;
+ return
+ suggestedServerMemory == that.suggestedServerMemory
+ && serverMemory == that.serverMemory
+ && usableMemory == that.usableMemory
+ && serverWorkers == that.serverWorkers
+ && serverThreads == that.serverThreads;
}
@Override
public int hashCode()
{
- return Objects.hash(super.hashCode(), serverMemory, usableMemory, serverWorkers, serverThreads);
+ return Objects.hash(
+ super.hashCode(),
+ suggestedServerMemory,
+ serverMemory,
+ usableMemory,
+ serverWorkers,
+ serverThreads
+ );
}
@Override
public String toString()
{
return "NotEnoughMemoryFault{" +
- "serverMemory=" + serverMemory +
- ", usableMemory=" + usableMemory +
- ", serverWorkers=" + serverWorkers +
+ "suggestedServerMemory=" + suggestedServerMemory +
+ " bytes, serverMemory=" + serverMemory +
+ " bytes, usableMemory=" + usableMemory +
+ " bytes, serverWorkers=" + serverWorkers +
", serverThreads=" + serverThreads +
'}';
}
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerMemoryParametersTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerMemoryParametersTest.java
index f0413ddcd6..a5ec9e48ca 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerMemoryParametersTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerMemoryParametersTest.java
@@ -21,6 +21,7 @@ package org.apache.druid.msq.exec;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.msq.indexing.error.MSQException;
+import org.apache.druid.msq.indexing.error.MSQFault;
import org.apache.druid.msq.indexing.error.NotEnoughMemoryFault;
import org.apache.druid.msq.indexing.error.TooManyWorkersFault;
import org.junit.Assert;
@@ -31,29 +32,34 @@ public class WorkerMemoryParametersTest
@Test
public void test_oneWorkerInJvm_alone()
{
- Assert.assertEquals(parameters(1, 41, 224_785_000, 100_650_000, 75_000_000), compute(1_000_000_000, 1, 1, 1));
- Assert.assertEquals(parameters(2, 13, 149_410_000, 66_900_000, 75_000_000), compute(1_000_000_000, 1, 2, 1));
- Assert.assertEquals(parameters(4, 3, 89_110_000, 39_900_000, 75_000_000), compute(1_000_000_000, 1, 4, 1));
- Assert.assertEquals(parameters(3, 2, 48_910_000, 21_900_000, 75_000_000), compute(1_000_000_000, 1, 8, 1));
- Assert.assertEquals(parameters(2, 2, 33_448_460, 14_976_922, 75_000_000), compute(1_000_000_000, 1, 12, 1));
+ Assert.assertEquals(parameters(1, 41, 224_785_000, 100_650_000, 75_000_000), compute(1_000_000_000, 1, 1, 1, 0));
+ Assert.assertEquals(parameters(2, 13, 149_410_000, 66_900_000, 75_000_000), compute(1_000_000_000, 1, 2, 1, 0));
+ Assert.assertEquals(parameters(4, 3, 89_110_000, 39_900_000, 75_000_000), compute(1_000_000_000, 1, 4, 1, 0));
+ Assert.assertEquals(parameters(3, 2, 48_910_000, 21_900_000, 75_000_000), compute(1_000_000_000, 1, 8, 1, 0));
+ Assert.assertEquals(parameters(2, 2, 33_448_460, 14_976_922, 75_000_000), compute(1_000_000_000, 1, 12, 1, 0));
final MSQException e = Assert.assertThrows(
MSQException.class,
- () -> compute(1_000_000_000, 1, 32, 1)
+ () -> compute(1_000_000_000, 1, 32, 1, 0)
);
+ Assert.assertEquals(new NotEnoughMemoryFault(1_588_044_000, 1_000_000_000, 750_000_000, 1, 32), e.getFault());
+
+ final MSQFault fault = Assert.assertThrows(MSQException.class, () -> compute(1_000_000_000, 2, 32, 1, 0))
+ .getFault();
+
+ Assert.assertEquals(new NotEnoughMemoryFault(2024045333, 1_000_000_000, 750_000_000, 2, 32), fault);
- Assert.assertEquals(new NotEnoughMemoryFault(1_000_000_000, 750_000_000, 1, 32), e.getFault());
}
@Test
public void test_oneWorkerInJvm_twoHundredWorkersInCluster()
{
- Assert.assertEquals(parameters(1, 83, 317_580_000, 142_200_000, 150_000_000), compute(2_000_000_000, 1, 1, 200));
- Assert.assertEquals(parameters(2, 27, 166_830_000, 74_700_000, 150_000_000), compute(2_000_000_000, 1, 2, 200));
+ Assert.assertEquals(parameters(1, 83, 317_580_000, 142_200_000, 150_000_000), compute(2_000_000_000, 1, 1, 200, 0));
+ Assert.assertEquals(parameters(2, 27, 166_830_000, 74_700_000, 150_000_000), compute(2_000_000_000, 1, 2, 200, 0));
final MSQException e = Assert.assertThrows(
MSQException.class,
- () -> compute(1_000_000_000, 1, 4, 200)
+ () -> compute(1_000_000_000, 1, 4, 200, 0)
);
Assert.assertEquals(new TooManyWorkersFault(200, 109), e.getFault());
@@ -62,32 +68,39 @@ public class WorkerMemoryParametersTest
@Test
public void test_fourWorkersInJvm_twoHundredWorkersInCluster()
{
- Assert.assertEquals(parameters(1, 150, 679_380_000, 304_200_000, 168_750_000), compute(9_000_000_000L, 4, 1, 200));
- Assert.assertEquals(parameters(2, 62, 543_705_000, 243_450_000, 168_750_000), compute(9_000_000_000L, 4, 2, 200));
- Assert.assertEquals(parameters(4, 22, 374_111_250, 167_512_500, 168_750_000), compute(9_000_000_000L, 4, 4, 200));
- Assert.assertEquals(parameters(4, 14, 204_517_500, 91_575_000, 168_750_000), compute(9_000_000_000L, 4, 8, 200));
- Assert.assertEquals(parameters(4, 8, 68_842_500, 30_825_000, 168_750_000), compute(9_000_000_000L, 4, 16, 200));
+ Assert.assertEquals(
+ parameters(1, 150, 679_380_000, 304_200_000, 168_750_000),
+ compute(9_000_000_000L, 4, 1, 200, 0)
+ );
+ Assert.assertEquals(
+ parameters(2, 62, 543_705_000, 243_450_000, 168_750_000),
+ compute(9_000_000_000L, 4, 2, 200, 0)
+ );
+ Assert.assertEquals(
+ parameters(4, 22, 374_111_250, 167_512_500, 168_750_000),
+ compute(9_000_000_000L, 4, 4, 200, 0)
+ );
+ Assert.assertEquals(parameters(4, 14, 204_517_500, 91_575_000, 168_750_000), compute(9_000_000_000L, 4, 8, 200, 0));
+ Assert.assertEquals(parameters(4, 8, 68_842_500, 30_825_000, 168_750_000), compute(9_000_000_000L, 4, 16, 200, 0));
final MSQException e = Assert.assertThrows(
MSQException.class,
- () -> compute(8_000_000_000L, 4, 32, 200)
+ () -> compute(8_000_000_000L, 4, 32, 200, 0)
);
Assert.assertEquals(new TooManyWorkersFault(200, 124), e.getFault());
// Make sure 107 actually works. (Verify the error message above.)
- Assert.assertEquals(parameters(4, 3, 28_140_000, 12_600_000, 150_000_000), compute(8_000_000_000L, 4, 32, 107));
+ Assert.assertEquals(parameters(4, 3, 28_140_000, 12_600_000, 150_000_000), compute(8_000_000_000L, 4, 32, 107, 0));
}
@Test
public void test_oneWorkerInJvm_negativeUsableMemory()
{
- final MSQException e = Assert.assertThrows(
- MSQException.class,
+ Exception e = Assert.assertThrows(
+ IllegalArgumentException.class,
() -> WorkerMemoryParameters.createInstance(100, -50, 1, 32, 1)
);
-
- Assert.assertEquals(new NotEnoughMemoryFault(100, -50, 1, 32), e.getFault());
}
@Test
@@ -117,15 +130,16 @@ public class WorkerMemoryParametersTest
final long maxMemoryInJvm,
final int numWorkersInJvm,
final int numProcessingThreadsInJvm,
- final int numInputWorkers
+ final int numInputWorkers,
+ final int totalLookUpFootprint
)
{
return WorkerMemoryParameters.createInstance(
maxMemoryInJvm,
- (long) (maxMemoryInJvm * WorkerMemoryParameters.USABLE_MEMORY_FRACTION),
numWorkersInJvm,
numProcessingThreadsInJvm,
- numInputWorkers
+ numInputWorkers,
+ totalLookUpFootprint
);
}
}
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQFaultSerdeTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQFaultSerdeTest.java
index 7a0ee66ea6..aae47170a2 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQFaultSerdeTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQFaultSerdeTest.java
@@ -60,7 +60,7 @@ public class MSQFaultSerdeTest
assertFaultSerde(InsertTimeNullFault.INSTANCE);
assertFaultSerde(new InsertTimeOutOfBoundsFault(Intervals.ETERNITY));
assertFaultSerde(new InvalidNullByteFault("the column"));
- assertFaultSerde(new NotEnoughMemoryFault(1000, 900, 1, 2));
+ assertFaultSerde(new NotEnoughMemoryFault(1000, 1000, 900, 1, 2));
assertFaultSerde(QueryNotSupportedFault.INSTANCE);
assertFaultSerde(new RowTooLargeFault(1000));
assertFaultSerde(new TaskStartTimeoutFault(10));
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteSelectQueryTestMSQ.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteSelectQueryTestMSQ.java
index 31f935fc07..1042387020 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteSelectQueryTestMSQ.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteSelectQueryTestMSQ.java
@@ -72,11 +72,11 @@ public class CalciteSelectQueryTestMSQ extends CalciteQueryTest
{
final WorkerMemoryParameters workerMemoryParameters =
WorkerMemoryParameters.createInstance(
- WorkerMemoryParameters.PROCESSING_MINIMUM_BYTES * 50,
WorkerMemoryParameters.PROCESSING_MINIMUM_BYTES * 50,
2,
10,
- 2
+ 2,
+ 0
);
indexingServiceClient = new MSQTestOverlordServiceClient(
queryJsonMapper,
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
index 8842e16958..50da3c1f75 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
@@ -282,11 +282,11 @@ public class MSQTestBase extends BaseCalciteQueryTest
private TestGroupByBuffers groupByBuffers;
protected final WorkerMemoryParameters workerMemoryParameters = Mockito.spy(
WorkerMemoryParameters.createInstance(
- WorkerMemoryParameters.PROCESSING_MINIMUM_BYTES * 50,
WorkerMemoryParameters.PROCESSING_MINIMUM_BYTES * 50,
2,
10,
- 2
+ 2,
+ 0
)
);
diff --git a/server/src/main/java/org/apache/druid/server/QueryResultPusher.java b/server/src/main/java/org/apache/druid/server/QueryResultPusher.java
index b25e31d363..947283fe2b 100644
--- a/server/src/main/java/org/apache/druid/server/QueryResultPusher.java
+++ b/server/src/main/java/org/apache/druid/server/QueryResultPusher.java
@@ -207,7 +207,7 @@ public abstract class QueryResultPusher
resultsWriter.recordFailure(e);
// This case is always a failure because the error happened mid-stream of sending results back. Therefore,
- // we do not believe that the response stream was actually useable
+ // we do not believe that the response stream was actually usable
counter.incrementFailed();
return null;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org