You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/31 06:48:30 UTC

[4/5] impala git commit: IMPALA-7296: bytes limit for row batch queue

IMPALA-7296: bytes limit for row batch queue

https://goo.gl/N9LgQt summarises the memory problems I'm trying to solve
here.

Limit the number of enqueued row batches to a number of bytes,
instead of limiting the total number of batches. This helps
avoid pathologically high memory consumption for wide rows where the #
batches limit does not effectively limit the memory consumption.

The bytes limit only lowers the effective capacity of the queue
for wider rows, typically 150 bytes or wider. These are the
cases when we want to reduce the queue's capacity.

E.g. on a system with 10 disks, the previous sizing gave a queue
of 100 batches. If we assume rows with 10x16 byte columns, then
100 batches is ~16MB of data.

Remove RowBatchQueueCapacity counter that is less relevant now
and was not correctly initialised.

Testing:
Added some basic unit tests.

Add regression test that fails reliably before this change.

Ran exhaustive build.

Change-Id: Iaa06d1d8da2a6d101efda08f620c0bf84a71e681
Reviewed-on: http://gerrit.cloudera.org:8080/10977
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/3f8375d3
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/3f8375d3
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/3f8375d3

Branch: refs/heads/master
Commit: 3f8375d3e642554b5506f3e731f94e6328fcbcf9
Parents: 47b4606
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Mon Jul 16 00:30:42 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Tue Jul 31 02:45:21 2018 +0000

----------------------------------------------------------------------
 be/src/exec/scan-node.cc                   |  8 ++-
 be/src/exec/scan-node.h                    |  3 -
 be/src/runtime/row-batch-queue.cc          |  4 +-
 be/src/runtime/row-batch-queue.h           | 21 +++++-
 be/src/util/blocking-queue-test.cc         | 58 +++++++++++++++--
 be/src/util/blocking-queue.h               | 85 +++++++++++++++++++++----
 tests/common/test_dimensions.py            |  5 ++
 tests/query_test/test_mem_usage_scaling.py | 49 ++++++++++++++
 8 files changed, 206 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/3f8375d3/be/src/exec/scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/scan-node.cc b/be/src/exec/scan-node.cc
index 4d59eed..85e1953 100644
--- a/be/src/exec/scan-node.cc
+++ b/be/src/exec/scan-node.cc
@@ -42,6 +42,9 @@ DEFINE_int32(runtime_filter_wait_time_ms, 1000, "(Advanced) the maximum time, in
 DEFINE_int32_hidden(max_queued_row_batches_per_scanner_thread, 5,
     "(Advanced) the maximum number of queued row batches per scanner thread.");
 
+DEFINE_int64(max_queued_row_batch_bytes, 16L * 1024 * 1024,
+    "(Advanced) the maximum bytes of queued rows per multithreaded scan node.");
+
 using boost::algorithm::join;
 
 namespace impala {
@@ -202,8 +205,6 @@ void ScanNode::ScannerThreadState::Prepare(ScanNode* parent) {
       ADD_COUNTER(profile, "RowBatchBytesEnqueued", TUnit::BYTES);
   row_batches_get_timer_ = ADD_TIMER(profile, "RowBatchQueueGetWaitTime");
   row_batches_put_timer_ = ADD_TIMER(profile, "RowBatchQueuePutWaitTime");
-  row_batches_max_capacity_ =
-      profile->AddHighWaterMarkCounter("RowBatchQueueCapacity", TUnit::UNIT);
   row_batches_peak_mem_consumption_ =
       ADD_COUNTER(profile, "RowBatchQueuePeakMemoryUsage", TUnit::BYTES);
 }
@@ -240,7 +241,8 @@ void ScanNode::ScannerThreadState::Open(
   VLOG_QUERY << "Max row batch queue size for scan node '" << parent->id()
       << "' in fragment instance '" << PrintId(state->fragment_instance_id())
       << "': " << max_row_batches;
-  batch_queue_.reset(new RowBatchQueue(max_row_batches));
+  batch_queue_.reset(
+      new RowBatchQueue(max_row_batches, FLAGS_max_queued_row_batch_bytes));
 
   // Start measuring the scanner thread concurrency only once the node is opened.
   average_concurrency_ = parent->runtime_profile()->AddSamplingCounter(

http://git-wip-us.apache.org/repos/asf/impala/blob/3f8375d3/be/src/exec/scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/scan-node.h b/be/src/exec/scan-node.h
index 1d0728c..9f47c45 100644
--- a/be/src/exec/scan-node.h
+++ b/be/src/exec/scan-node.h
@@ -303,9 +303,6 @@ class ScanNode : public ExecNode {
     /// The wait time for enqueuing a row batch into the row batch queue.
     RuntimeProfile::Counter* row_batches_put_timer_ = nullptr;
 
-    /// Maximum capacity of the row batch queue.
-    RuntimeProfile::HighWaterMarkCounter* row_batches_max_capacity_ = nullptr;
-
     /// Peak memory consumption of the materialized batch queue. Updated in Close().
     RuntimeProfile::Counter* row_batches_peak_mem_consumption_ = nullptr;
   };

http://git-wip-us.apache.org/repos/asf/impala/blob/3f8375d3/be/src/runtime/row-batch-queue.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch-queue.cc b/be/src/runtime/row-batch-queue.cc
index 1fd5555..e694338 100644
--- a/be/src/runtime/row-batch-queue.cc
+++ b/be/src/runtime/row-batch-queue.cc
@@ -23,8 +23,8 @@
 
 namespace impala {
 
-RowBatchQueue::RowBatchQueue(int max_batches)
-  : BlockingQueue<unique_ptr<RowBatch>>(max_batches) {}
+RowBatchQueue::RowBatchQueue(int max_batches, int64_t max_bytes)
+  : BlockingQueue<unique_ptr<RowBatch>,RowBatchBytesFn>(max_batches, max_bytes) {}
 
 RowBatchQueue::~RowBatchQueue() {
   DCHECK(cleanup_queue_.empty());

http://git-wip-us.apache.org/repos/asf/impala/blob/3f8375d3/be/src/runtime/row-batch-queue.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch-queue.h b/be/src/runtime/row-batch-queue.h
index bd2f551..79e8293 100644
--- a/be/src/runtime/row-batch-queue.h
+++ b/be/src/runtime/row-batch-queue.h
@@ -21,6 +21,7 @@
 #include <list>
 #include <memory>
 
+#include "runtime/row-batch.h"
 #include "util/blocking-queue.h"
 #include "util/spinlock.h"
 
@@ -28,18 +29,32 @@ namespace impala {
 
 class RowBatch;
 
+/// Functor that returns the bytes in MemPool chunks for a row batch.
+/// Note that we don't include attached BufferPool::BufferHandle objects because this
+/// queue is only used in scan nodes that don't attach buffers.
+struct RowBatchBytesFn {
+  int64_t operator()(const std::unique_ptr<RowBatch>& batch) {
+    return batch->tuple_data_pool()->total_reserved_bytes();
+  }
+};
+
 /// Extends blocking queue for row batches. Row batches have a property that
 /// they must be processed in the order they were produced, even in cancellation
 /// paths. Preceding row batches can contain ptrs to memory in subsequent row batches
 /// and we need to make sure those ptrs stay valid.
 /// Row batches that are added after Shutdown() are queued in a separate "cleanup"
 /// queue, which can be cleaned up during Close().
+///
+/// The queue supports limiting the capacity in terms of bytes enqueued.
+///
 /// All functions are thread safe.
-class RowBatchQueue : public BlockingQueue<std::unique_ptr<RowBatch>> {
+class RowBatchQueue : public BlockingQueue<std::unique_ptr<RowBatch>, RowBatchBytesFn> {
  public:
-  /// max_batches is the maximum number of row batches that can be queued.
+  /// 'max_batches' is the maximum number of row batches that can be queued.
+  /// 'max_bytes' is the maximum number of bytes of row batches that can be queued (-1
+  /// means no limit).
   /// When the queue is full, producers will block.
-  RowBatchQueue(int max_batches);
+  RowBatchQueue(int max_batches, int64_t max_bytes);
   ~RowBatchQueue();
 
   /// Adds a batch to the queue. This is blocking if the queue is full.

http://git-wip-us.apache.org/repos/asf/impala/blob/3f8375d3/be/src/util/blocking-queue-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/blocking-queue-test.cc b/be/src/util/blocking-queue-test.cc
index f61de21..8a91275 100644
--- a/be/src/util/blocking-queue-test.cc
+++ b/be/src/util/blocking-queue-test.cc
@@ -28,6 +28,14 @@
 
 namespace impala {
 
+/// Functor that returns the size of T.
+template <typename T>
+struct SizeofFn {
+  int64_t operator()(const T& item) {
+    return sizeof(T);
+  }
+};
+
 TEST(BlockingQueueTest, TestBasic) {
   int32_t i;
   BlockingQueue<int32_t> test_queue(5);
@@ -67,12 +75,49 @@ TEST(BlockingQueueTest, TestPutWithTimeout) {
   ASSERT_TRUE(test_queue.BlockingPutWithTimeout(3, timeout_micros));
 }
 
+TEST(BlockingQueueTest, TestBytesLimit) {
+  // 10 bytes => limit of 2 elements
+  BlockingQueue<int32_t, SizeofFn<int32_t>> test_queue(1000, 10);
+  int64_t SHORT_TIMEOUT_MICROS = 1 * 1000L; // 1ms
+  int64_t LONG_TIMEOUT_MICROS = 1000L * 1000L * 60L; // 1m
+
+  // First two should succeed.
+  ASSERT_TRUE(test_queue.BlockingPut(1));
+  ASSERT_TRUE(test_queue.BlockingPutWithTimeout(2, SHORT_TIMEOUT_MICROS));
+  EXPECT_EQ(2, test_queue.Size());
+
+  // Put should timeout - no capacity.
+  ASSERT_FALSE(test_queue.BlockingPutWithTimeout(3, SHORT_TIMEOUT_MICROS));
+  EXPECT_EQ(2, test_queue.Size());
+
+  // Test that puts of both types get blocked then unblocked when bytes are
+  // removed from queue.
+  thread put_thread([&] () { test_queue.BlockingPut(4); });
+  thread put_with_timeout_thread([&] () {
+    test_queue.BlockingPutWithTimeout(4, LONG_TIMEOUT_MICROS);
+  });
+  EXPECT_EQ(2, test_queue.Size());
+  int32_t v;
+  EXPECT_TRUE(test_queue.BlockingGet(&v));
+  EXPECT_EQ(1, v);
+  EXPECT_TRUE(test_queue.BlockingGet(&v));
+  EXPECT_EQ(2, v);
+  EXPECT_TRUE(test_queue.BlockingGet(&v));
+  EXPECT_EQ(4, v);
+  EXPECT_TRUE(test_queue.BlockingGet(&v));
+  EXPECT_EQ(4, v);
+
+  put_thread.join();
+  put_with_timeout_thread.join();
+}
+
+template <typename ElemBytesFn>
 class MultiThreadTest { // NOLINT: members are not arranged for minimal padding
  public:
-  MultiThreadTest()
+  MultiThreadTest(int64_t bytes_limit = -1)
     : iterations_(10000),
       nthreads_(5),
-      queue_(iterations_*nthreads_/10),
+      queue_(iterations_*nthreads_/10, bytes_limit),
       num_inserters_(nthreads_) {
   }
 
@@ -134,7 +179,7 @@ class MultiThreadTest { // NOLINT: members are not arranged for minimal padding
 
   int iterations_;
   int nthreads_;
-  BlockingQueue<int32_t> queue_;
+  BlockingQueue<int32_t, ElemBytesFn> queue_;
   // Lock for gotten_ and num_inserters_.
   mutex lock_;
   // Map from inserter thread id to number of consumed elements from that id.
@@ -148,7 +193,12 @@ class MultiThreadTest { // NOLINT: members are not arranged for minimal padding
 };
 
 TEST(BlockingQueueTest, TestMultipleThreads) {
-  MultiThreadTest test;
+  MultiThreadTest<ByteLimitDisabledFn<int32_t>> test;
+  test.Run();
+}
+
+TEST(BlockingQueueTest, TestMultipleThreadsWithBytesLimit) {
+  MultiThreadTest<SizeofFn<int32_t>> test(100);
   test.Run();
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/3f8375d3/be/src/util/blocking-queue.h
----------------------------------------------------------------------
diff --git a/be/src/util/blocking-queue.h b/be/src/util/blocking-queue.h
index 8b07dc5..34c2453 100644
--- a/be/src/util/blocking-queue.h
+++ b/be/src/util/blocking-queue.h
@@ -34,28 +34,46 @@
 
 namespace impala {
 
+/// Default functor that always returns 0 bytes. This disables the byte limit
+/// functionality for the queue.
+template <typename T>
+struct ByteLimitDisabledFn {
+  int64_t operator()(const T& item) {
+    return 0;
+  }
+};
+
 /// Fixed capacity FIFO queue, where both BlockingGet() and BlockingPut() operations block
 /// if the queue is empty or full, respectively.
 ///
+/// The queue always has a hard maximum capacity of elements. It also has an optional
+/// limit on the bytes enqueued. This limit is a soft limit - one element can always be
+/// enqueued regardless of the size in bytes. In order to use the bytes limit, the queue
+/// must be instantiated with a functor that returns the size in bytes of an enqueued
+/// item. The functor is invoked multiple times and must always return the same value for
+/// the same item.
+///
 /// FIFO is made up of a 'get_list_' that BlockingGet() consumes from and a 'put_list_'
 /// that BlockingPut() enqueues into. They are protected by 'get_lock_' and 'put_lock_'
 /// respectively. If both locks need to be held at the same time, 'get_lock_' must be
 /// held before 'put_lock_'. When the 'get_list_' is empty, the caller of BlockingGet()
 /// will atomically swap the 'put_list_' with 'get_list_'. The swapping happens with both
 /// the 'get_lock_' and 'put_lock_' held.
-template <typename T>
+template <typename T, typename ElemBytesFn = ByteLimitDisabledFn<T>>
 class BlockingQueue : public CacheLineAligned {
  public:
-  BlockingQueue(size_t max_elements)
+  BlockingQueue(size_t max_elements, int64_t max_bytes = -1)
     : shutdown_(false),
       max_elements_(max_elements),
       total_put_wait_time_(0),
       get_list_size_(0),
-      total_get_wait_time_(0) {
+      total_get_wait_time_(0),
+      max_bytes_(max_bytes) {
+    DCHECK(max_bytes == -1 || max_bytes > 0) << max_bytes;
     DCHECK_GT(max_elements_, 0);
     // Make sure class members commonly used in BlockingPut() don't alias with class
-    // members used in BlockingGet(). 'pad_' is the point of division.
-    DCHECK_NE(offsetof(BlockingQueue, pad_) / 64,
+    // members used in BlockingGet(). 'put_bytes_enqueued_' is the point of division.
+    DCHECK_NE(offsetof(BlockingQueue, put_bytes_enqueued_) / 64,
         offsetof(BlockingQueue, get_lock_) / 64);
   }
 
@@ -96,11 +114,20 @@ class BlockingQueue : public CacheLineAligned {
     get_list_.pop_front();
     get_list_size_.Store(get_list_.size());
     read_lock.unlock();
+    int64_t val_bytes = ElemBytesFn()(*out);
+    DCHECK_GE(val_bytes, 0);
+    get_bytes_dequeued_.Add(val_bytes);
     // Note that there is a race with any writer if NotifyOne() is called between when
     // a writer checks the queue size and when it calls put_cv_.Wait(). If this race
     // occurs, a writer can stay blocked even if the queue is not full until the next
     // BlockingGet(). The race is benign correctness wise as BlockingGet() will always
     // notify a writer with 'put_lock_' held when both lists are empty.
+    //
+    // Relatedly, if multiple writers hit the bytes limit of the queue and queue elements
+    // vary in size, we may not immediately unblock all writers. E.g. if two writers are
+    // waiting to enqueue elements of N bytes and we dequeue an element of 2N bytes, we
+    // could wake up both writers but actually only wake up one. This is also benign
+    // correctness-wise because we will continue to make progress.
     put_cv_.NotifyOne();
     return true;
   }
@@ -112,9 +139,10 @@ class BlockingQueue : public CacheLineAligned {
   template <typename V>
   bool BlockingPut(V&& val) {
     MonotonicStopWatch timer;
+    int64_t val_bytes = ElemBytesFn()(val);
+    DCHECK_GE(val_bytes, 0);
     boost::unique_lock<boost::mutex> write_lock(put_lock_);
-
-    while (SizeLocked(write_lock) >= max_elements_ && !shutdown_) {
+    while (!HasCapacityInternal(write_lock, val_bytes) && !shutdown_) {
       timer.Start();
       put_cv_.Wait(write_lock);
       timer.Stop();
@@ -123,6 +151,7 @@ class BlockingQueue : public CacheLineAligned {
     if (UNLIKELY(shutdown_)) return false;
 
     DCHECK_LT(put_list_.size(), max_elements_);
+    put_bytes_enqueued_ += val_bytes;
     Put(std::forward<V>(val));
     write_lock.unlock();
     get_cv_.NotifyOne();
@@ -137,11 +166,13 @@ class BlockingQueue : public CacheLineAligned {
   template <typename V>
   bool BlockingPutWithTimeout(V&& val, int64_t timeout_micros) {
     MonotonicStopWatch timer;
+    int64_t val_bytes = ElemBytesFn()(val);
+    DCHECK_GE(val_bytes, 0);
     boost::unique_lock<boost::mutex> write_lock(put_lock_);
     timespec abs_time;
     TimeFromNowMicros(timeout_micros, &abs_time);
     bool notified = true;
-    while (SizeLocked(write_lock) >= max_elements_ && !shutdown_ && notified) {
+    while (!HasCapacityInternal(write_lock, val_bytes) && !shutdown_ && notified) {
       timer.Start();
       // Wait until we're notified or until the timeout expires.
       notified = put_cv_.WaitUntil(write_lock, abs_time);
@@ -152,8 +183,9 @@ class BlockingQueue : public CacheLineAligned {
     // NOTE: We don't check 'notified' here as it appears that pthread condition variables
     // have a weird behavior in which they can return ETIMEDOUT from timed_wait even if
     // another thread did in fact signal
-    if (SizeLocked(write_lock) >= max_elements_ || shutdown_) return false;
+    if (!HasCapacityInternal(write_lock, val_bytes)) return false;
     DCHECK_LT(put_list_.size(), max_elements_);
+    put_bytes_enqueued_ += val_bytes;
     Put(std::forward<V>(val));
     write_lock.unlock();
     get_cv_.NotifyOne();
@@ -203,6 +235,26 @@ class BlockingQueue : public CacheLineAligned {
     return get_list_size_.Load() + put_list_.size();
   }
 
+  /// Return true if the queue has capacity to add one more element with size 'val_bytes'.
+  /// Caller must hold 'put_lock_' via 'lock'.
+  bool HasCapacityInternal(
+      const boost::unique_lock<boost::mutex>& lock, int64_t val_bytes) {
+    DCHECK(lock.mutex() == &put_lock_ && lock.owns_lock());
+    uint32_t size = SizeLocked(lock);
+    if (size >= max_elements_) return false;
+    if (val_bytes == 0 || max_bytes_ == -1 || size == 0) return true;
+
+    // At this point we can enqueue the item if there is sufficient bytes capacity.
+    if (put_bytes_enqueued_ + val_bytes <= max_bytes_) return true;
+
+    // No bytes capacity left - swap over dequeued bytes to account for elements the
+    // consumer has dequeued. All decrementers of 'get_bytes_dequeued_' hold 'put_lock_'
+    // races with other decrementers are impossible.
+    int64_t dequeued = get_bytes_dequeued_.Swap(0);
+    put_bytes_enqueued_ -= dequeued;
+    return put_bytes_enqueued_ + val_bytes <= max_bytes_;
+  }
+
   /// Overloads for inserting an item into the list, depending on whether it should be
   /// moved or copied.
   void Put(const T& val) { put_list_.push_back(val); }
@@ -227,9 +279,10 @@ class BlockingQueue : public CacheLineAligned {
   /// Total amount of time threads blocked in BlockingPut(). Guarded by 'put_lock_'.
   int64_t total_put_wait_time_;
 
-  /// Padding to avoid data structures used in BlockingGet() to share cache lines
-  /// with data structures used in BlockingPut().
-  int64_t pad_;
+  /// Running counter for bytes enqueued, incremented through the producer thread.
+  /// Decremented by transferring value from 'get_bytes_dequeued_'.
+  /// Guarded by 'put_lock_'
+  int64_t put_bytes_enqueued_ = 0;
 
   /// Guards against concurrent access to 'get_list_'.
   mutable boost::mutex get_lock_;
@@ -249,6 +302,14 @@ class BlockingQueue : public CacheLineAligned {
   /// variable doesn't include the time which other threads block waiting for 'get_lock_'.
   int64_t total_get_wait_time_;
 
+  /// Running count of bytes dequeued. Decremented from 'put_bytes_enqueued_' when it
+  /// exceeds the queue capacity. Kept separate from 'put_bytes_enqueued_' so that
+  /// producers and consumers are not updating the same cache line for every put and get.
+  /// Decrementers must hold 'put_lock_'.
+  AtomicInt64 get_bytes_dequeued_{0};
+
+  /// Soft limit on total bytes in queue. -1 if no limit.
+  const int64_t max_bytes_;
 };
 
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/3f8375d3/tests/common/test_dimensions.py
----------------------------------------------------------------------
diff --git a/tests/common/test_dimensions.py b/tests/common/test_dimensions.py
index 0460ea7..af1f8e1 100644
--- a/tests/common/test_dimensions.py
+++ b/tests/common/test_dimensions.py
@@ -108,6 +108,11 @@ def create_parquet_dimension(workload):
   return ImpalaTestDimension('table_format',
       TableFormatInfo.create_from_string(dataset, 'parquet/none'))
 
+def create_avro_snappy_dimension(workload):
+  dataset = get_dataset_from_workload(workload)
+  return ImpalaTestDimension('table_format',
+      TableFormatInfo.create_from_string(dataset, 'avro/snap/block'))
+
 # Common sets of values for the exec option vectors
 ALL_BATCH_SIZES = [0]
 

http://git-wip-us.apache.org/repos/asf/impala/blob/3f8375d3/tests/query_test/test_mem_usage_scaling.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_mem_usage_scaling.py b/tests/query_test/test_mem_usage_scaling.py
index 12316db..6535feb 100644
--- a/tests/query_test/test_mem_usage_scaling.py
+++ b/tests/query_test/test_mem_usage_scaling.py
@@ -19,6 +19,7 @@ import pytest
 from copy import copy
 
 from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
+from tests.common.test_dimensions import create_avro_snappy_dimension
 from tests.common.impala_cluster import ImpalaCluster
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.skip import SkipIfNotHdfsMinicluster
@@ -300,3 +301,51 @@ class TestTpcdsMemLimitError(TestLowMemoryLimits):
   def test_low_mem_limit_q53(self, vector):
     self.low_memory_limit_test(
         vector, 'tpcds-decimal_v2-q53', self.MIN_MEM_FOR_TPCDS['q53'])
+
+
+@SkipIfNotHdfsMinicluster.tuned_for_minicluster
+class TestScanMemLimit(ImpalaTestSuite):
+  """Targeted test for scan memory limits."""
+
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestScanMemLimit, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension())
+    cls.ImpalaTestMatrix.add_dimension(create_avro_snappy_dimension(cls.get_workload()))
+
+  def test_wide_avro_mem_usage(self, vector, unique_database):
+    """Create a wide avro table with large strings and test scans that can cause OOM."""
+    if self.exploration_strategy() != 'exhaustive':
+      pytest.skip("only run resource-intensive query on exhaustive")
+    NUM_COLS = 250
+    NUM_ROWS = 50000
+    TBL = "wide_250_cols"
+    # This query caused OOM with the below memory limit before the IMPALA-7296 fix.
+    # When the sort starts to spill it causes row batches to accumulate rapidly in the
+    # scan node's queue.
+    SELECT_QUERY = """select * from {0}.{1} order by col224 limit 100""".format(
+        unique_database, TBL)
+    # Use disable_outermost_topn to enable spilling sort but prevent returning excessive
+    # rows. Limit NUM_SCANNER_THREADS to avoid higher memory consumption on systems with
+    # many cores (each scanner thread uses some memory in addition to the queued memory).
+    SELECT_OPTIONS = {
+        'mem_limit': "256MB", 'disable_outermost_topn': True, "NUM_SCANNER_THREADS": 1}
+    self.execute_query_expect_success(self.client,
+        "create table {0}.{1} ({2}) stored as avro".format(unique_database, TBL,
+         ",".join(["col{0} STRING".format(i) for i in range(NUM_COLS)])))
+    self.run_stmt_in_hive("""
+        SET mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
+        SET mapred.output.compression.type=BLOCK;
+        SET hive.exec.compress.output=true;
+        SET avro.output.codec=snappy;
+        insert into {0}.{1} select {2} from tpch_parquet.lineitem
+        limit {3}
+        """.format(unique_database, TBL, ','.join(['l_comment'] * NUM_COLS), NUM_ROWS))
+    self.execute_query_expect_success(self.client,
+        "refresh {0}.{1}".format(unique_database, TBL))
+
+    self.execute_query_expect_success(self.client, SELECT_QUERY, SELECT_OPTIONS)