Posted to commits@arrow.apache.org by al...@apache.org on 2022/11/29 20:07:15 UTC

[arrow-datafusion] branch master updated: Add integration test for erroring when memory limits are hit (#4406)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 49166ea55 Add integration test for erroring when memory limits are hit (#4406)
49166ea55 is described below

commit 49166ea55f317722ab7a37fbfc253bcd497c1672
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Tue Nov 29 15:07:10 2022 -0500

    Add integration test for erroring when memory limits are hit (#4406)
    
    * Add test for runtime memory limiting
    
    * Update datafusion/core/tests/memory_limit.rs
---
 .../core/src/execution/memory_manager/proxy.rs     |   7 +-
 .../core/src/physical_plan/aggregates/hash.rs      |   2 +-
 .../src/physical_plan/aggregates/no_grouping.rs    |   2 +-
 .../core/src/physical_plan/aggregates/row_hash.rs  |   2 +-
 datafusion/core/src/physical_plan/sorts/sort.rs    |  15 +--
 datafusion/core/tests/join_fuzz.rs                 |  19 +---
 datafusion/core/tests/memory_limit.rs              | 112 +++++++++++++++++++++
 datafusion/core/tests/merge_fuzz.rs                |  18 +---
 datafusion/core/tests/order_spill_fuzz.rs          |  21 +---
 test-utils/src/lib.rs                              |  32 +++++-
 10 files changed, 165 insertions(+), 65 deletions(-)

diff --git a/datafusion/core/src/execution/memory_manager/proxy.rs b/datafusion/core/src/execution/memory_manager/proxy.rs
index 6ea52e909..2a5bd2507 100644
--- a/datafusion/core/src/execution/memory_manager/proxy.rs
+++ b/datafusion/core/src/execution/memory_manager/proxy.rs
@@ -88,9 +88,10 @@ impl MemoryConsumer for MemoryConsumerProxy {
     }
 
     async fn spill(&self) -> Result<usize, DataFusionError> {
-        Err(DataFusionError::ResourcesExhausted(
-            "Cannot spill AggregationState".to_owned(),
-        ))
+        Err(DataFusionError::ResourcesExhausted(format!(
+            "Cannot spill {}",
+            self.name
+        )))
     }
 
     fn mem_used(&self) -> usize {
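
The proxy's spill error now embeds the consumer's name instead of the hard-coded "AggregationState" string, so a user can tell which operator failed to free memory. A minimal standalone sketch of the pattern, using illustrative stand-ins (Consumer, ResourceError) rather than DataFusion's real MemoryConsumer trait; only the format!("Cannot spill {}", self.name) pattern mirrors the hunk above:

    // Stand-in error type for the sketch.
    #[derive(Debug)]
    enum ResourceError {
        ResourcesExhausted(String),
    }

    // Stand-in for a named memory consumer.
    struct Consumer {
        name: String,
    }

    impl Consumer {
        fn spill(&self) -> Result<usize, ResourceError> {
            // Embedding self.name identifies *which* consumer could
            // not free memory, rather than a fixed message.
            Err(ResourceError::ResourcesExhausted(format!(
                "Cannot spill {}",
                self.name
            )))
        }
    }

    fn main() {
        let c = Consumer {
            name: "GroupBy Hash Accumulators".to_string(),
        };
        // Err(ResourcesExhausted("Cannot spill GroupBy Hash Accumulators"))
        println!("{:?}", c.spill());
    }
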
diff --git a/datafusion/core/src/physical_plan/aggregates/hash.rs b/datafusion/core/src/physical_plan/aggregates/hash.rs
index d3d5a337e..8bf929630 100644
--- a/datafusion/core/src/physical_plan/aggregates/hash.rs
+++ b/datafusion/core/src/physical_plan/aggregates/hash.rs
@@ -135,7 +135,7 @@ impl GroupedHashAggregateStream {
             aggregate_expressions,
             accumulators: Accumulators {
                 memory_consumer: MemoryConsumerProxy::new(
-                    "Accumulators",
+                    "GroupBy Hash Accumulators",
                     MemoryConsumerId::new(partition),
                     Arc::clone(&context.runtime_env().memory_manager),
                 ),
diff --git a/datafusion/core/src/physical_plan/aggregates/no_grouping.rs b/datafusion/core/src/physical_plan/aggregates/no_grouping.rs
index 8c3556bb6..64cc4f569 100644
--- a/datafusion/core/src/physical_plan/aggregates/no_grouping.rs
+++ b/datafusion/core/src/physical_plan/aggregates/no_grouping.rs
@@ -73,7 +73,7 @@ impl AggregateStream {
         let aggregate_expressions = aggregate_expressions(&aggr_expr, &mode, 0)?;
         let accumulators = create_accumulators(&aggr_expr)?;
         let memory_consumer = MemoryConsumerProxy::new(
-            "AggregationState",
+            "GroupBy None Accumulators",
             MemoryConsumerId::new(partition),
             Arc::clone(&context.runtime_env().memory_manager),
         );
diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash.rs b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
index f0311f088..c73fa3da0 100644
--- a/datafusion/core/src/physical_plan/aggregates/row_hash.rs
+++ b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
@@ -144,7 +144,7 @@ impl GroupedHashAggregateStreamV2 {
 
         let aggr_state = AggregationState {
             memory_consumer: MemoryConsumerProxy::new(
-                "AggregationState",
+                "GroupBy Hash (Row) AggregationState",
                 MemoryConsumerId::new(partition),
                 Arc::clone(&context.runtime_env().memory_manager),
             ),
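
The three renames above give each aggregation stream a distinct consumer name, and the new memory_limit.rs tests below assert on exactly these strings. A tiny sketch of the relationship being relied on, with plain String::contains standing in for datafusion_common's assert_contains! macro:

    fn main() {
        // The proxy formats its error from the consumer name...
        let err = format!("Resources exhausted: Cannot spill {}", "GroupBy None Accumulators");
        // ...and each test asserts that the expected name appears in it.
        assert!(err.contains("Cannot spill GroupBy None Accumulators"));
    }
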
diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs
index bfc33a954..0b3be0906 100644
--- a/datafusion/core/src/physical_plan/sorts/sort.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort.rs
@@ -118,6 +118,7 @@ impl ExternalSorter {
     ) -> Result<()> {
         if input.num_rows() > 0 {
             let size = batch_byte_size(&input);
+            debug!("Inserting {} rows ({} bytes)", input.num_rows(), size);
             self.try_grow(size).await?;
             self.metrics.mem_used().add(size);
             let mut in_mem_batches = self.in_mem_batches.lock().await;
@@ -272,6 +273,13 @@ impl MemoryConsumer for ExternalSorter {
     }
 
     async fn spill(&self) -> Result<usize> {
+        let partition = self.partition_id();
+        let mut in_mem_batches = self.in_mem_batches.lock().await;
+        // memory can only be freed here if we are still holding in-memory batches
+        if in_mem_batches.len() == 0 {
+            return Ok(0);
+        }
+
         debug!(
             "{}[{}] spilling sort data of {} to disk while inserting ({} time(s) so far)",
             self.name(),
@@ -280,13 +288,6 @@ impl MemoryConsumer for ExternalSorter {
             self.spill_count()
         );
 
-        let partition = self.partition_id();
-        let mut in_mem_batches = self.in_mem_batches.lock().await;
-        // memory can only be freed here if we are still holding in-memory batches
-        if in_mem_batches.len() == 0 {
-            return Ok(0);
-        }
-
         let tracking_metrics = self
             .metrics_set
             .new_intermediate_tracking(partition, self.runtime.clone());
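
The reordering in spill above moves the lock acquisition and the empty check ahead of the debug! call, so the "spilling sort data" message is only logged when a spill will actually happen. A minimal synchronous sketch of the resulting control flow (std::sync::Mutex and println! are stand-ins for the async lock and debug! used in the real code):

    use std::sync::Mutex;

    // Check for work before logging, so "spilling" is only reported
    // when something will actually be spilled.
    fn spill_sketch(in_mem_batches: &Mutex<Vec<Vec<u8>>>) -> usize {
        let mut batches = in_mem_batches.lock().unwrap();
        if batches.is_empty() {
            // nothing held in memory: report zero bytes freed, log nothing
            return 0;
        }
        println!("spilling {} in-memory batches to disk", batches.len());
        let freed = batches.iter().map(|b| b.len()).sum();
        batches.clear(); // stand-in for writing the batches to a spill file
        freed
    }

    fn main() {
        let held = Mutex::new(vec![vec![0u8; 1024], vec![0u8; 2048]]);
        assert_eq!(spill_sketch(&held), 3072);
        assert_eq!(spill_sketch(&held), 0); // nothing left to spill
    }
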
diff --git a/datafusion/core/tests/join_fuzz.rs b/datafusion/core/tests/join_fuzz.rs
index 1204b4428..40966843c 100644
--- a/datafusion/core/tests/join_fuzz.rs
+++ b/datafusion/core/tests/join_fuzz.rs
@@ -21,8 +21,7 @@ use arrow::array::{ArrayRef, Int32Array};
 use arrow::compute::SortOptions;
 use arrow::record_batch::RecordBatch;
 use arrow::util::pretty::pretty_format_batches;
-use rand::rngs::StdRng;
-use rand::{Rng, SeedableRng};
+use rand::Rng;
 
 use datafusion::physical_plan::collect;
 use datafusion::physical_plan::expressions::Column;
@@ -31,7 +30,7 @@ use datafusion::physical_plan::memory::MemoryExec;
 use datafusion_expr::JoinType;
 
 use datafusion::prelude::{SessionConfig, SessionContext};
-use test_utils::add_empty_batches;
+use test_utils::stagger_batch_with_seed;
 
 #[tokio::test]
 async fn test_inner_join_1k() {
@@ -200,7 +199,7 @@ fn make_staggered_batches(len: usize) -> Vec<RecordBatch> {
     let input4 = Int32Array::from_iter_values(input4.into_iter());
 
     // split into several record batches
-    let mut remainder = RecordBatch::try_from_iter(vec![
+    let batch = RecordBatch::try_from_iter(vec![
         ("a", Arc::new(input1) as ArrayRef),
         ("b", Arc::new(input2) as ArrayRef),
         ("x", Arc::new(input3) as ArrayRef),
@@ -208,16 +207,6 @@ fn make_staggered_batches(len: usize) -> Vec<RecordBatch> {
     ])
     .unwrap();
 
-    let mut batches = vec![];
-
     // use a random number generator to pick a random sized output
-    let mut rng = StdRng::seed_from_u64(42);
-    while remainder.num_rows() > 0 {
-        let batch_size = rng.gen_range(0..remainder.num_rows() + 1);
-
-        batches.push(remainder.slice(0, batch_size));
-        remainder = remainder.slice(batch_size, remainder.num_rows() - batch_size);
-    }
-
-    add_empty_batches(batches, &mut rng)
+    stagger_batch_with_seed(batch, 42)
 }
diff --git a/datafusion/core/tests/memory_limit.rs b/datafusion/core/tests/memory_limit.rs
new file mode 100644
index 000000000..20ad555d6
--- /dev/null
+++ b/datafusion/core/tests/memory_limit.rs
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module contains tests for limiting memory at runtime in DataFusion
+
+use std::sync::Arc;
+
+use arrow::record_batch::RecordBatch;
+use datafusion::datasource::MemTable;
+use datafusion::execution::disk_manager::DiskManagerConfig;
+use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
+use datafusion_common::assert_contains;
+
+use datafusion::prelude::{SessionConfig, SessionContext};
+use test_utils::{stagger_batch, AccessLogGenerator};
+
+#[cfg(test)]
+#[ctor::ctor]
+fn init() {
+    let _ = env_logger::try_init();
+}
+
+#[tokio::test]
+async fn oom_sort() {
+    run_limit_test(
+        "select * from t order by host DESC",
+        "Resources exhausted: Memory Exhausted while Sorting (DiskManager is disabled)",
+    )
+    .await
+}
+
+#[tokio::test]
+async fn group_by_none() {
+    run_limit_test(
+        "select median(image) from t",
+        "Resources exhausted: Cannot spill GroupBy None Accumulators",
+    )
+    .await
+}
+
+#[tokio::test]
+async fn group_by_row_hash() {
+    run_limit_test(
+        "select count(*) from t GROUP BY response_bytes",
+        "Resources exhausted: Cannot spill GroupBy Hash (Row) AggregationState",
+    )
+    .await
+}
+
+#[tokio::test]
+async fn group_by_hash() {
+    run_limit_test(
+        // group by dict column
+        "select count(*) from t GROUP BY service, host, pod, container",
+        "Resources exhausted: Cannot spill GroupBy Hash Accumulators",
+    )
+    .await
+}
+
+/// 50 byte memory limit
+const MEMORY_LIMIT_BYTES: usize = 50;
+const MEMORY_FRACTION: f64 = 0.95;
+
+/// Runs the specified query against 1000 rows with a 50
+/// byte memory limit and the disk manager disabled.
+async fn run_limit_test(query: &str, expected_error: &str) {
+    let generator = AccessLogGenerator::new().with_row_limit(Some(1000));
+
+    let batches: Vec<RecordBatch> = generator
+        // split up into more than one batch, as the size limit in sort is not enforced until the second batch
+        .flat_map(stagger_batch)
+        .collect();
+
+    let table = MemTable::try_new(batches[0].schema(), vec![batches]).unwrap();
+
+    let rt_config = RuntimeConfig::new()
+        // do not allow spilling
+        .with_disk_manager(DiskManagerConfig::Disabled)
+        // Only allow 50 bytes
+        .with_memory_limit(MEMORY_LIMIT_BYTES, MEMORY_FRACTION);
+
+    let runtime = RuntimeEnv::new(rt_config).unwrap();
+
+    let ctx = SessionContext::with_config_rt(SessionConfig::new(), Arc::new(runtime));
+    ctx.register_table("t", Arc::new(table))
+        .expect("registering table");
+
+    let df = ctx.sql(query).await.expect("Planning query");
+
+    match df.collect().await {
+        Ok(_batches) => {
+            panic!("Unexpected success when running, expected memory limit failure")
+        }
+        Err(e) => {
+            assert_contains!(e.to_string(), expected_error);
+        }
+    }
+}
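
Since the file installs env_logger in a #[ctor::ctor] initializer, running the test binary with RUST_LOG=debug (for example, RUST_LOG=debug cargo test --test memory_limit from the datafusion/core directory) shows the new debug! output from sort.rs. The run_limit_test harness also makes further cases mechanical to add. A hypothetical extra case in the same style; the query and expected substring here are illustrative, not part of this commit:

    // Hypothetical additional case: any operator that cannot spill
    // under the 50 byte limit should surface a "Resources exhausted"
    // error. The columns used (response_bytes, host) come from the
    // same AccessLogGenerator schema as the tests above.
    #[tokio::test]
    async fn group_by_median() {
        run_limit_test(
            "select median(response_bytes) from t GROUP BY host",
            "Resources exhausted",
        )
        .await
    }
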
diff --git a/datafusion/core/tests/merge_fuzz.rs b/datafusion/core/tests/merge_fuzz.rs
index 2280cdeb6..64738c3ff 100644
--- a/datafusion/core/tests/merge_fuzz.rs
+++ b/datafusion/core/tests/merge_fuzz.rs
@@ -30,8 +30,7 @@ use datafusion::physical_plan::{
     sorts::sort_preserving_merge::SortPreservingMergeExec,
 };
 use datafusion::prelude::{SessionConfig, SessionContext};
-use rand::{prelude::StdRng, Rng, SeedableRng};
-use test_utils::{add_empty_batches, batches_to_vec, partitions_to_sorted_vec};
+use test_utils::{batches_to_vec, partitions_to_sorted_vec, stagger_batch_with_seed};
 
 #[tokio::test]
 async fn test_merge_2() {
@@ -151,21 +150,10 @@ fn make_staggered_batches(low: i32, high: i32, seed: u64) -> Vec<RecordBatch> {
     let input: Int32Array = (low..high).map(Some).collect();
 
     // split into several record batches
-    let mut remainder =
+    let batch =
         RecordBatch::try_from_iter(vec![("x", Arc::new(input) as ArrayRef)]).unwrap();
 
-    let mut batches = vec![];
-
-    // use a random number generator to pick a random sized output
-    let mut rng = StdRng::seed_from_u64(seed);
-    while remainder.num_rows() > 0 {
-        let batch_size = rng.gen_range(0..remainder.num_rows() + 1);
-
-        batches.push(remainder.slice(0, batch_size));
-        remainder = remainder.slice(batch_size, remainder.num_rows() - batch_size);
-    }
-
-    add_empty_batches(batches, &mut rng)
+    stagger_batch_with_seed(batch, seed)
 }
 
 fn concat(mut v1: Vec<RecordBatch>, v2: Vec<RecordBatch>) -> Vec<RecordBatch> {
diff --git a/datafusion/core/tests/order_spill_fuzz.rs b/datafusion/core/tests/order_spill_fuzz.rs
index ad39630af..cc700d5d2 100644
--- a/datafusion/core/tests/order_spill_fuzz.rs
+++ b/datafusion/core/tests/order_spill_fuzz.rs
@@ -29,10 +29,9 @@ use datafusion::physical_plan::memory::MemoryExec;
 use datafusion::physical_plan::sorts::sort::SortExec;
 use datafusion::physical_plan::{collect, ExecutionPlan};
 use datafusion::prelude::{SessionConfig, SessionContext};
-use rand::prelude::StdRng;
-use rand::{Rng, SeedableRng};
+use rand::Rng;
 use std::sync::Arc;
-use test_utils::{add_empty_batches, batches_to_vec, partitions_to_sorted_vec};
+use test_utils::{batches_to_vec, partitions_to_sorted_vec, stagger_batch_with_seed};
 
 #[tokio::test]
 #[cfg_attr(tarpaulin, ignore)]
@@ -116,19 +115,7 @@ fn make_staggered_batches(len: usize) -> Vec<RecordBatch> {
     let input = Int32Array::from_iter_values(input.into_iter());
 
     // split into several record batches
-    let mut remainder =
+    let batch =
         RecordBatch::try_from_iter(vec![("x", Arc::new(input) as ArrayRef)]).unwrap();
-
-    let mut batches = vec![];
-
-    // use a random number generator to pick a random sized output
-    let mut rng = StdRng::seed_from_u64(42);
-    while remainder.num_rows() > 0 {
-        let batch_size = rng.gen_range(0..remainder.num_rows() + 1);
-
-        batches.push(remainder.slice(0, batch_size));
-        remainder = remainder.slice(batch_size, remainder.num_rows() - batch_size);
-    }
-
-    add_empty_batches(batches, &mut rng)
+    stagger_batch_with_seed(batch, 42)
 }
diff --git a/test-utils/src/lib.rs b/test-utils/src/lib.rs
index 5c3b64574..4002a49cf 100644
--- a/test-utils/src/lib.rs
+++ b/test-utils/src/lib.rs
@@ -19,7 +19,7 @@
 use arrow::record_batch::RecordBatch;
 use datafusion_common::cast::as_int32_array;
 use rand::prelude::StdRng;
-use rand::Rng;
+use rand::{Rng, SeedableRng};
 
 mod data_gen;
 
@@ -50,10 +50,7 @@ pub fn partitions_to_sorted_vec(partitions: &[Vec<RecordBatch>]) -> Vec<Option<i
 }
 
 /// Adds a random number of empty record batches into the stream
-pub fn add_empty_batches(
-    batches: Vec<RecordBatch>,
-    rng: &mut StdRng,
-) -> Vec<RecordBatch> {
+fn add_empty_batches(batches: Vec<RecordBatch>, rng: &mut StdRng) -> Vec<RecordBatch> {
     let schema = batches[0].schema();
 
     batches
@@ -68,3 +65,28 @@ pub fn add_empty_batches(
         })
         .collect()
 }
+
+/// "stagger" batches: split the batches into random sized batches
+pub fn stagger_batch(batch: RecordBatch) -> Vec<RecordBatch> {
+    let seed = 42;
+    stagger_batch_with_seed(batch, seed)
+}
+
+/// "stagger" batches: split the batches into random sized batches
+/// using the specified value for a rng seed
+pub fn stagger_batch_with_seed(batch: RecordBatch, seed: u64) -> Vec<RecordBatch> {
+    let mut batches = vec![];
+
+    // use a seeded random number generator to pick random batch sizes
+    let mut rng = StdRng::seed_from_u64(seed);
+
+    let mut remainder = batch;
+    while remainder.num_rows() > 0 {
+        let batch_size = rng.gen_range(0..remainder.num_rows() + 1);
+
+        batches.push(remainder.slice(0, batch_size));
+        remainder = remainder.slice(batch_size, remainder.num_rows() - batch_size);
+    }
+
+    add_empty_batches(batches, &mut rng)
+}
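
A usage sketch of the extracted helper, assuming arrow and this test-utils crate as dependencies: staggering splits one batch into randomly sized pieces, possibly interleaved with empty batches, and the same seed always yields the same split:

    use std::sync::Arc;

    use arrow::array::{ArrayRef, Int32Array};
    use arrow::record_batch::RecordBatch;
    use test_utils::stagger_batch_with_seed;

    fn main() {
        let col: ArrayRef = Arc::new(Int32Array::from_iter_values(0..1000));
        let batch = RecordBatch::try_from_iter(vec![("x", col)]).unwrap();

        let batches = stagger_batch_with_seed(batch, 42);

        // empty batches may be interleaved, but no rows are lost
        let total: usize = batches.iter().map(|b| b.num_rows()).sum();
        assert_eq!(total, 1000);
    }
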