You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/11/29 20:07:15 UTC
[arrow-datafusion] branch master updated: Add integration test for erroring when memory limits are hit (#4406)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 49166ea55 Add integration test for erroring when memory limits are hit (#4406)
49166ea55 is described below
commit 49166ea55f317722ab7a37fbfc253bcd497c1672
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Tue Nov 29 15:07:10 2022 -0500
Add integration test for erroring when memory limits are hit (#4406)
* Add test for runtime memory limiting
* Update datafusion/core/tests/memory_limit.rs
---
.../core/src/execution/memory_manager/proxy.rs | 7 +-
.../core/src/physical_plan/aggregates/hash.rs | 2 +-
.../src/physical_plan/aggregates/no_grouping.rs | 2 +-
.../core/src/physical_plan/aggregates/row_hash.rs | 2 +-
datafusion/core/src/physical_plan/sorts/sort.rs | 15 +--
datafusion/core/tests/join_fuzz.rs | 19 +---
datafusion/core/tests/memory_limit.rs | 112 +++++++++++++++++++++
datafusion/core/tests/merge_fuzz.rs | 18 +---
datafusion/core/tests/order_spill_fuzz.rs | 21 +---
test-utils/src/lib.rs | 32 +++++-
10 files changed, 165 insertions(+), 65 deletions(-)
diff --git a/datafusion/core/src/execution/memory_manager/proxy.rs b/datafusion/core/src/execution/memory_manager/proxy.rs
index 6ea52e909..2a5bd2507 100644
--- a/datafusion/core/src/execution/memory_manager/proxy.rs
+++ b/datafusion/core/src/execution/memory_manager/proxy.rs
@@ -88,9 +88,10 @@ impl MemoryConsumer for MemoryConsumerProxy {
}
async fn spill(&self) -> Result<usize, DataFusionError> {
- Err(DataFusionError::ResourcesExhausted(
- "Cannot spill AggregationState".to_owned(),
- ))
+ Err(DataFusionError::ResourcesExhausted(format!(
+ "Cannot spill {}",
+ self.name
+ )))
}
fn mem_used(&self) -> usize {
diff --git a/datafusion/core/src/physical_plan/aggregates/hash.rs b/datafusion/core/src/physical_plan/aggregates/hash.rs
index d3d5a337e..8bf929630 100644
--- a/datafusion/core/src/physical_plan/aggregates/hash.rs
+++ b/datafusion/core/src/physical_plan/aggregates/hash.rs
@@ -135,7 +135,7 @@ impl GroupedHashAggregateStream {
aggregate_expressions,
accumulators: Accumulators {
memory_consumer: MemoryConsumerProxy::new(
- "Accumulators",
+ "GroupBy Hash Accumulators",
MemoryConsumerId::new(partition),
Arc::clone(&context.runtime_env().memory_manager),
),
diff --git a/datafusion/core/src/physical_plan/aggregates/no_grouping.rs b/datafusion/core/src/physical_plan/aggregates/no_grouping.rs
index 8c3556bb6..64cc4f569 100644
--- a/datafusion/core/src/physical_plan/aggregates/no_grouping.rs
+++ b/datafusion/core/src/physical_plan/aggregates/no_grouping.rs
@@ -73,7 +73,7 @@ impl AggregateStream {
let aggregate_expressions = aggregate_expressions(&aggr_expr, &mode, 0)?;
let accumulators = create_accumulators(&aggr_expr)?;
let memory_consumer = MemoryConsumerProxy::new(
- "AggregationState",
+ "GroupBy None Accumulators",
MemoryConsumerId::new(partition),
Arc::clone(&context.runtime_env().memory_manager),
);
diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash.rs b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
index f0311f088..c73fa3da0 100644
--- a/datafusion/core/src/physical_plan/aggregates/row_hash.rs
+++ b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
@@ -144,7 +144,7 @@ impl GroupedHashAggregateStreamV2 {
let aggr_state = AggregationState {
memory_consumer: MemoryConsumerProxy::new(
- "AggregationState",
+ "GroupBy Hash (Row) AggregationState",
MemoryConsumerId::new(partition),
Arc::clone(&context.runtime_env().memory_manager),
),
diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs
index bfc33a954..0b3be0906 100644
--- a/datafusion/core/src/physical_plan/sorts/sort.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort.rs
@@ -118,6 +118,7 @@ impl ExternalSorter {
) -> Result<()> {
if input.num_rows() > 0 {
let size = batch_byte_size(&input);
+ debug!("Inserting {} rows of {} bytes", input.num_rows(), size);
self.try_grow(size).await?;
self.metrics.mem_used().add(size);
let mut in_mem_batches = self.in_mem_batches.lock().await;
@@ -272,6 +273,13 @@ impl MemoryConsumer for ExternalSorter {
}
async fn spill(&self) -> Result<usize> {
+ let partition = self.partition_id();
+ let mut in_mem_batches = self.in_mem_batches.lock().await;
+ // we could always get a chance to free some memory as long as we are holding some
+ if in_mem_batches.len() == 0 {
+ return Ok(0);
+ }
+
debug!(
"{}[{}] spilling sort data of {} to disk while inserting ({} time(s) so far)",
self.name(),
@@ -280,13 +288,6 @@ impl MemoryConsumer for ExternalSorter {
self.spill_count()
);
- let partition = self.partition_id();
- let mut in_mem_batches = self.in_mem_batches.lock().await;
- // we could always get a chance to free some memory as long as we are holding some
- if in_mem_batches.len() == 0 {
- return Ok(0);
- }
-
let tracking_metrics = self
.metrics_set
.new_intermediate_tracking(partition, self.runtime.clone());
diff --git a/datafusion/core/tests/join_fuzz.rs b/datafusion/core/tests/join_fuzz.rs
index 1204b4428..40966843c 100644
--- a/datafusion/core/tests/join_fuzz.rs
+++ b/datafusion/core/tests/join_fuzz.rs
@@ -21,8 +21,7 @@ use arrow::array::{ArrayRef, Int32Array};
use arrow::compute::SortOptions;
use arrow::record_batch::RecordBatch;
use arrow::util::pretty::pretty_format_batches;
-use rand::rngs::StdRng;
-use rand::{Rng, SeedableRng};
+use rand::Rng;
use datafusion::physical_plan::collect;
use datafusion::physical_plan::expressions::Column;
@@ -31,7 +30,7 @@ use datafusion::physical_plan::memory::MemoryExec;
use datafusion_expr::JoinType;
use datafusion::prelude::{SessionConfig, SessionContext};
-use test_utils::add_empty_batches;
+use test_utils::stagger_batch_with_seed;
#[tokio::test]
async fn test_inner_join_1k() {
@@ -200,7 +199,7 @@ fn make_staggered_batches(len: usize) -> Vec<RecordBatch> {
let input4 = Int32Array::from_iter_values(input4.into_iter());
// split into several record batches
- let mut remainder = RecordBatch::try_from_iter(vec![
+ let batch = RecordBatch::try_from_iter(vec![
("a", Arc::new(input1) as ArrayRef),
("b", Arc::new(input2) as ArrayRef),
("x", Arc::new(input3) as ArrayRef),
@@ -208,16 +207,6 @@ fn make_staggered_batches(len: usize) -> Vec<RecordBatch> {
])
.unwrap();
- let mut batches = vec![];
-
// use a random number generator to pick a random sized output
- let mut rng = StdRng::seed_from_u64(42);
- while remainder.num_rows() > 0 {
- let batch_size = rng.gen_range(0..remainder.num_rows() + 1);
-
- batches.push(remainder.slice(0, batch_size));
- remainder = remainder.slice(batch_size, remainder.num_rows() - batch_size);
- }
-
- add_empty_batches(batches, &mut rng)
+ stagger_batch_with_seed(batch, 42)
}
diff --git a/datafusion/core/tests/memory_limit.rs b/datafusion/core/tests/memory_limit.rs
new file mode 100644
index 000000000..20ad555d6
--- /dev/null
+++ b/datafusion/core/tests/memory_limit.rs
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module contains tests for limiting memory at runtime in DataFusion
+
+use std::sync::Arc;
+
+use arrow::record_batch::RecordBatch;
+use datafusion::datasource::MemTable;
+use datafusion::execution::disk_manager::DiskManagerConfig;
+use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
+use datafusion_common::assert_contains;
+
+use datafusion::prelude::{SessionConfig, SessionContext};
+use test_utils::{stagger_batch, AccessLogGenerator};
+
+#[cfg(test)]
+#[ctor::ctor]
+fn init() {
+ let _ = env_logger::try_init();
+}
+
+#[tokio::test]
+async fn oom_sort() {
+ run_limit_test(
+ "select * from t order by host DESC",
+ "Resources exhausted: Memory Exhausted while Sorting (DiskManager is disabled)",
+ )
+ .await
+}
+
+#[tokio::test]
+async fn group_by_none() {
+ run_limit_test(
+ "select median(image) from t",
+ "Resources exhausted: Cannot spill GroupBy None Accumulators",
+ )
+ .await
+}
+
+#[tokio::test]
+async fn group_by_row_hash() {
+ run_limit_test(
+ "select count(*) from t GROUP BY response_bytes",
+ "Resources exhausted: Cannot spill GroupBy Hash (Row) AggregationState",
+ )
+ .await
+}
+
+#[tokio::test]
+async fn group_by_hash() {
+ run_limit_test(
+ // group by dict column
+ "select count(*) from t GROUP BY service, host, pod, container",
+ "Resources exhausted: Cannot spill GroupBy Hash Accumulators",
+ )
+ .await
+}
+
+/// 50 byte memory limit
+const MEMORY_LIMIT_BYTES: usize = 50;
+const MEMORY_FRACTION: f64 = 0.95;
+
+/// runs the specified query against 1000 rows with a 50
+/// byte memory limit and no disk manager enabled.
+async fn run_limit_test(query: &str, expected_error: &str) {
+ let generator = AccessLogGenerator::new().with_row_limit(Some(1000));
+
+ let batches: Vec<RecordBatch> = generator
+ // split up into more than one batch, as the size limit in sort is not enforced until the second batch
+ .flat_map(stagger_batch)
+ .collect();
+
+ let table = MemTable::try_new(batches[0].schema(), vec![batches]).unwrap();
+
+ let rt_config = RuntimeConfig::new()
+ // do not allow spilling
+ .with_disk_manager(DiskManagerConfig::Disabled)
+ // Only allow 50 bytes
+ .with_memory_limit(MEMORY_LIMIT_BYTES, MEMORY_FRACTION);
+
+ let runtime = RuntimeEnv::new(rt_config).unwrap();
+
+ let ctx = SessionContext::with_config_rt(SessionConfig::new(), Arc::new(runtime));
+ ctx.register_table("t", Arc::new(table))
+ .expect("registering table");
+
+ let df = ctx.sql(query).await.expect("Planning query");
+
+ match df.collect().await {
+ Ok(_batches) => {
+ panic!("Unexpected success when running, expected memory limit failure")
+ }
+ Err(e) => {
+ assert_contains!(e.to_string(), expected_error);
+ }
+ }
+}
diff --git a/datafusion/core/tests/merge_fuzz.rs b/datafusion/core/tests/merge_fuzz.rs
index 2280cdeb6..64738c3ff 100644
--- a/datafusion/core/tests/merge_fuzz.rs
+++ b/datafusion/core/tests/merge_fuzz.rs
@@ -30,8 +30,7 @@ use datafusion::physical_plan::{
sorts::sort_preserving_merge::SortPreservingMergeExec,
};
use datafusion::prelude::{SessionConfig, SessionContext};
-use rand::{prelude::StdRng, Rng, SeedableRng};
-use test_utils::{add_empty_batches, batches_to_vec, partitions_to_sorted_vec};
+use test_utils::{batches_to_vec, partitions_to_sorted_vec, stagger_batch_with_seed};
#[tokio::test]
async fn test_merge_2() {
@@ -151,21 +150,10 @@ fn make_staggered_batches(low: i32, high: i32, seed: u64) -> Vec<RecordBatch> {
let input: Int32Array = (low..high).map(Some).collect();
// split into several record batches
- let mut remainder =
+ let batch =
RecordBatch::try_from_iter(vec![("x", Arc::new(input) as ArrayRef)]).unwrap();
- let mut batches = vec![];
-
- // use a random number generator to pick a random sized output
- let mut rng = StdRng::seed_from_u64(seed);
- while remainder.num_rows() > 0 {
- let batch_size = rng.gen_range(0..remainder.num_rows() + 1);
-
- batches.push(remainder.slice(0, batch_size));
- remainder = remainder.slice(batch_size, remainder.num_rows() - batch_size);
- }
-
- add_empty_batches(batches, &mut rng)
+ stagger_batch_with_seed(batch, seed)
}
fn concat(mut v1: Vec<RecordBatch>, v2: Vec<RecordBatch>) -> Vec<RecordBatch> {
diff --git a/datafusion/core/tests/order_spill_fuzz.rs b/datafusion/core/tests/order_spill_fuzz.rs
index ad39630af..cc700d5d2 100644
--- a/datafusion/core/tests/order_spill_fuzz.rs
+++ b/datafusion/core/tests/order_spill_fuzz.rs
@@ -29,10 +29,9 @@ use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::physical_plan::{collect, ExecutionPlan};
use datafusion::prelude::{SessionConfig, SessionContext};
-use rand::prelude::StdRng;
-use rand::{Rng, SeedableRng};
+use rand::Rng;
use std::sync::Arc;
-use test_utils::{add_empty_batches, batches_to_vec, partitions_to_sorted_vec};
+use test_utils::{batches_to_vec, partitions_to_sorted_vec, stagger_batch_with_seed};
#[tokio::test]
#[cfg_attr(tarpaulin, ignore)]
@@ -116,19 +115,7 @@ fn make_staggered_batches(len: usize) -> Vec<RecordBatch> {
let input = Int32Array::from_iter_values(input.into_iter());
// split into several record batches
- let mut remainder =
+ let batch =
RecordBatch::try_from_iter(vec![("x", Arc::new(input) as ArrayRef)]).unwrap();
-
- let mut batches = vec![];
-
- // use a random number generator to pick a random sized output
- let mut rng = StdRng::seed_from_u64(42);
- while remainder.num_rows() > 0 {
- let batch_size = rng.gen_range(0..remainder.num_rows() + 1);
-
- batches.push(remainder.slice(0, batch_size));
- remainder = remainder.slice(batch_size, remainder.num_rows() - batch_size);
- }
-
- add_empty_batches(batches, &mut rng)
+ stagger_batch_with_seed(batch, 42)
}
diff --git a/test-utils/src/lib.rs b/test-utils/src/lib.rs
index 5c3b64574..4002a49cf 100644
--- a/test-utils/src/lib.rs
+++ b/test-utils/src/lib.rs
@@ -19,7 +19,7 @@
use arrow::record_batch::RecordBatch;
use datafusion_common::cast::as_int32_array;
use rand::prelude::StdRng;
-use rand::Rng;
+use rand::{Rng, SeedableRng};
mod data_gen;
@@ -50,10 +50,7 @@ pub fn partitions_to_sorted_vec(partitions: &[Vec<RecordBatch>]) -> Vec<Option<i
}
/// Adds a random number of empty record batches into the stream
-pub fn add_empty_batches(
- batches: Vec<RecordBatch>,
- rng: &mut StdRng,
-) -> Vec<RecordBatch> {
+fn add_empty_batches(batches: Vec<RecordBatch>, rng: &mut StdRng) -> Vec<RecordBatch> {
let schema = batches[0].schema();
batches
@@ -68,3 +65,28 @@ pub fn add_empty_batches(
})
.collect()
}
+
+/// "stagger" a batch: split it into randomly sized batches using a fixed rng seed
+pub fn stagger_batch(batch: RecordBatch) -> Vec<RecordBatch> {
+ let seed = 42;
+ stagger_batch_with_seed(batch, seed)
+}
+
+/// "stagger" a batch: split it into randomly sized (possibly empty) batches
+/// using the specified value as the rng seed
+pub fn stagger_batch_with_seed(batch: RecordBatch, seed: u64) -> Vec<RecordBatch> {
+ let mut batches = vec![];
+
+ // use a random number generator to pick a random sized output
+ let mut rng = StdRng::seed_from_u64(seed);
+
+ let mut remainder = batch;
+ while remainder.num_rows() > 0 {
+ let batch_size = rng.gen_range(0..remainder.num_rows() + 1);
+
+ batches.push(remainder.slice(0, batch_size));
+ remainder = remainder.slice(batch_size, remainder.num_rows() - batch_size);
+ }
+
+ add_empty_batches(batches, &mut rng)
+}