You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by mi...@apache.org on 2023/12/06 22:59:59 UTC

(impala) branch master updated (7a99449a5 -> d75807a19)

This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


    from 7a99449a5 IMPALA-3268: Add support for SHOW VIEWS statement
     new 6b47c40e0 IMPALA-12159: Support ORDER BY for collections of variable length types in select list
     new d75807a19 IMPALA-12385: Enable Periodic metrics by default

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/codegen/codegen-anyval-read-write-info.cc   |   6 +
 be/src/codegen/codegen-anyval-read-write-info.h    |  18 +-
 be/src/runtime/collection-value.cc                 |   1 -
 be/src/runtime/collection-value.h                  |  18 -
 be/src/runtime/descriptors.cc                      | 378 +++++++++++++++++++--
 be/src/runtime/descriptors.h                       | 107 ++++--
 be/src/runtime/exec-env.cc                         |   2 +-
 be/src/runtime/krpc-data-stream-recvr.cc           |   6 +-
 be/src/runtime/query-state.cc                      |  14 +-
 be/src/runtime/raw-value.cc                        |  98 +++++-
 be/src/runtime/raw-value.h                         |  36 +-
 be/src/runtime/sorter-internal.h                   |  64 +++-
 be/src/runtime/sorter.cc                           | 162 +++++----
 be/src/runtime/tuple-ir.cc                         |   3 +
 be/src/runtime/tuple.cc                            | 128 ++++---
 be/src/runtime/tuple.h                             |  78 +++--
 be/src/util/periodic-counter-updater.cc            | 103 ++++--
 be/src/util/periodic-counter-updater.h             |  22 +-
 be/src/util/runtime-profile-counters.h             |  12 +-
 be/src/util/runtime-profile-test.cc                |   3 +-
 be/src/util/runtime-profile.cc                     |  16 +-
 be/src/util/runtime-profile.h                      |  11 +-
 be/src/util/streaming-sampler.h                    |   2 +-
 common/thrift/Query.thrift                         |   2 +-
 .../java/org/apache/impala/analysis/Analyzer.java  |   7 +-
 .../java/org/apache/impala/analysis/QueryStmt.java |  11 +-
 .../java/org/apache/impala/analysis/SortInfo.java  |  56 +--
 .../apache/impala/analysis/TupleDescriptor.java    |  52 +--
 .../org/apache/impala/planner/AnalyticPlanner.java |   8 +-
 .../java/org/apache/impala/planner/UnionNode.java  |   8 +-
 testdata/ComplexTypesTbl/arrays_big.parq           | Bin 0 -> 19445031 bytes
 testdata/ComplexTypesTbl/simple_arrays_big.parq    | Bin 10252963 -> 0 bytes
 testdata/data/README                               |  76 ++++-
 .../functional/functional_schema_template.sql      |  14 +-
 .../datasets/functional/schema_constraints.csv     |   4 +-
 .../QueryTest/nested-array-in-select-list.test     |  86 ++---
 .../QueryTest/nested-map-in-select-list.test       |  54 ++-
 .../QueryTest/partitioned-top-n-complex.test       |  37 +-
 .../queries/QueryTest/sort-complex.test            | 227 ++++++-------
 .../queries/QueryTest/top-n-complex.test           | 171 +++++++---
 tests/query_test/test_observability.py             |  37 +-
 tests/query_test/test_queries.py                   |   1 -
 tests/query_test/test_sort.py                      |  24 +-
 43 files changed, 1487 insertions(+), 676 deletions(-)
 create mode 100644 testdata/ComplexTypesTbl/arrays_big.parq
 delete mode 100644 testdata/ComplexTypesTbl/simple_arrays_big.parq


(impala) 01/02: IMPALA-12159: Support ORDER BY for collections of variable length types in select list

Posted by mi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 6b47c40e0dd808793203eeac2994260f0fc47753
Author: Daniel Becker <da...@cloudera.com>
AuthorDate: Tue May 23 19:41:04 2023 +0200

    IMPALA-12159: Support ORDER BY for collections of variable length types in select list
    
    IMPALA-12019 implemented support for collections of fixed length types
    in the sorting tuple. This change implements it for collections of
    variable length types.
    
    Note that the limitation that structs that contain any type of
    collection are not allowed in the sorting tuple is still in place (see
    IMPALA-12160).
    
    Note that it was not and still is not allowed to sort by complex types,
    this change only allows them to be present in the select list when
    sortin by some other expression.
    
    This change also allows collections of variable length types to be
    non-passthrough children of UNION ALL nodes.
    
    Testing:
     - Renamed the 'simple_arrays_big' table to 'arrays_big' and extended it
       with collections containing variable length types. This table is
       mainly used to test that spilling works during sorting.
     - Renamed
       test_sort.py::TestArraySort::{test_simple_arrays,
       test_simple_arrays_with_limit}
       to {test_array_sort,test_array_sort_with_limit}
     - Extended the tests run in test_queries.py::TestQueries::{test_sort,
       test_top_n,test_partitioned_top_n} with collections containing
       var-len types.
     - Added tests in sort-complex.test that assert that it is not allowed
       to sort by collections. For structs we already have such tests in
       struct-in-select-list.test.
    
    Change-Id: Ic15b29393f260b572e11a8dbb9deeb8c02981852
    Reviewed-on: http://gerrit.cloudera.org:8080/20108
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/codegen/codegen-anyval-read-write-info.cc   |   6 +
 be/src/codegen/codegen-anyval-read-write-info.h    |  18 +-
 be/src/runtime/collection-value.cc                 |   1 -
 be/src/runtime/collection-value.h                  |  18 -
 be/src/runtime/descriptors.cc                      | 378 +++++++++++++++++++--
 be/src/runtime/descriptors.h                       | 107 ++++--
 be/src/runtime/raw-value.cc                        |  98 +++++-
 be/src/runtime/raw-value.h                         |  36 +-
 be/src/runtime/sorter-internal.h                   |  64 +++-
 be/src/runtime/sorter.cc                           | 162 +++++----
 be/src/runtime/tuple-ir.cc                         |   3 +
 be/src/runtime/tuple.cc                            | 128 ++++---
 be/src/runtime/tuple.h                             |  78 +++--
 .../java/org/apache/impala/analysis/Analyzer.java  |   7 +-
 .../java/org/apache/impala/analysis/QueryStmt.java |  11 +-
 .../java/org/apache/impala/analysis/SortInfo.java  |  56 +--
 .../apache/impala/analysis/TupleDescriptor.java    |  52 +--
 .../org/apache/impala/planner/AnalyticPlanner.java |   8 +-
 .../java/org/apache/impala/planner/UnionNode.java  |   8 +-
 testdata/ComplexTypesTbl/arrays_big.parq           | Bin 0 -> 19445031 bytes
 testdata/ComplexTypesTbl/simple_arrays_big.parq    | Bin 10252963 -> 0 bytes
 testdata/data/README                               |  76 ++++-
 .../functional/functional_schema_template.sql      |  14 +-
 .../datasets/functional/schema_constraints.csv     |   4 +-
 .../QueryTest/nested-array-in-select-list.test     |  86 ++---
 .../QueryTest/nested-map-in-select-list.test       |  54 ++-
 .../QueryTest/partitioned-top-n-complex.test       |  37 +-
 .../queries/QueryTest/sort-complex.test            | 227 ++++++-------
 .../queries/QueryTest/top-n-complex.test           | 171 +++++++---
 tests/query_test/test_queries.py                   |   1 -
 tests/query_test/test_sort.py                      |  24 +-
 31 files changed, 1339 insertions(+), 594 deletions(-)

diff --git a/be/src/codegen/codegen-anyval-read-write-info.cc b/be/src/codegen/codegen-anyval-read-write-info.cc
index 3e8a231e3..263e3f049 100644
--- a/be/src/codegen/codegen-anyval-read-write-info.cc
+++ b/be/src/codegen/codegen-anyval-read-write-info.cc
@@ -19,6 +19,7 @@
 
 #include "codegen/codegen-anyval.h"
 #include "codegen/llvm-codegen.h"
+#include "llvm/IR/LLVMContext.h"
 
 namespace impala {
 
@@ -39,6 +40,11 @@ void NonWritableBasicBlock::BranchToIfNot(LlvmBuilder* builder,
   builder->CreateCondBr(condition, then_block.basic_block_, basic_block_);
 }
 
+llvm::BasicBlock* NonWritableBasicBlock::CreateBasicBlockBefore(
+    llvm::LLVMContext& context, const std::string& name, llvm::Function* fn) const {
+  return llvm::BasicBlock::Create(context, name, fn, basic_block_);
+}
+
 llvm::Value* CodegenAnyValReadWriteInfo::GetSimpleVal() const {
   llvm::Value* const * val = std::get_if<llvm::Value*>(&data_);
   DCHECK(val != nullptr);
diff --git a/be/src/codegen/codegen-anyval-read-write-info.h b/be/src/codegen/codegen-anyval-read-write-info.h
index d4bbfc849..efc7c678b 100644
--- a/be/src/codegen/codegen-anyval-read-write-info.h
+++ b/be/src/codegen/codegen-anyval-read-write-info.h
@@ -17,6 +17,7 @@
 
 #pragma once
 
+#include <string>
 #include <vector>
 #include <variant>
 
@@ -25,6 +26,8 @@
 namespace llvm {
 class ConstantStruct;
 class BasicBlock;
+class Function;
+class LLVMContext;
 class PHINode;
 class Value;
 }
@@ -36,12 +39,15 @@ class LlvmBuilder;
 class LlvmCodeGen;
 
 /// This class wraps an 'llvm::BasicBlock*' and provides a const interface to it extended
-/// with the possibility of branching to it (either conditionally or not).
-/// This is useful for the entry blocks of 'CodegenAnyValReadWriteInfo' objects as we do
-/// not want these blocks to be writable but we want to be able to branch to them.
+/// with the possibility of branching to it (either conditionally or not) and creating
+/// blocks before it.
+/// This is useful for example for the entry blocks of 'CodegenAnyValReadWriteInfo'
+/// objects as we do not want these blocks to be writable but we want to be able to branch
+/// to them.
 ///
 /// We cannot use a simple const pointer because the branching functions
-/// 'LlvmBuilder::Create[Cond]Br()' take a non-const pointer.
+/// 'LlvmBuilder::Create[Cond]Br()' and 'llvm::BasicBlock::Create()' take a non-const
+/// pointer.
 class NonWritableBasicBlock {
  public:
   explicit NonWritableBasicBlock(llvm::BasicBlock* basic_block)
@@ -60,6 +66,10 @@ class NonWritableBasicBlock {
   /// 'then_block'.
   void BranchToIfNot(LlvmBuilder* builder, llvm::Value* condition,
       const NonWritableBasicBlock& then_block) const;
+
+  /// Create a basic block that is inserted before this basic block.
+  llvm::BasicBlock* CreateBasicBlockBefore(llvm::LLVMContext& context,
+      const std::string& name = "", llvm::Function* fn = nullptr) const;
  private:
   llvm::BasicBlock* basic_block_;
 };
diff --git a/be/src/runtime/collection-value.cc b/be/src/runtime/collection-value.cc
index 1f3911acc..4e6e8ed8e 100644
--- a/be/src/runtime/collection-value.cc
+++ b/be/src/runtime/collection-value.cc
@@ -20,6 +20,5 @@
 namespace impala {
 
 const char* CollectionValue::LLVM_CLASS_NAME = "struct.impala::CollectionValue";
-const char* CollValueAndSize::LLVM_CLASS_NAME = "struct.impala::CollValueAndSize";
 
 }
diff --git a/be/src/runtime/collection-value.h b/be/src/runtime/collection-value.h
index bd39a915d..fbfdbc282 100644
--- a/be/src/runtime/collection-value.h
+++ b/be/src/runtime/collection-value.h
@@ -56,24 +56,6 @@ struct __attribute__((__packed__)) CollectionValue {
   static const char* LLVM_CLASS_NAME;
 };
 
-// A struct that contains a pointer to a CollectionValue and its byte size. Used instead
-// of std::pair because of codegen, because
-//   - the std::pair type is difficult to name in codegen and
-//   - we are not in control of the layout of std::pair.
-struct CollValueAndSize {
-  CollectionValue* coll_value;
-  // In most (maybe all) cases a 32 bit int should be enough but
-  // 'CollectionValue::ByteSize()' returns int64_t so we use that.
-  int64_t byte_size;
-
-  CollValueAndSize(): CollValueAndSize(nullptr, 0) {}
-  CollValueAndSize(CollectionValue* cv, int64_t size)
-    : coll_value(cv), byte_size(size) {}
-
-  /// For C++/IR interop, we need to be able to look up types by name.
-  static const char* LLVM_CLASS_NAME;
-};
-
 }
 
 #endif
diff --git a/be/src/runtime/descriptors.cc b/be/src/runtime/descriptors.cc
index 4f25c0454..28e10c842 100644
--- a/be/src/runtime/descriptors.cc
+++ b/be/src/runtime/descriptors.cc
@@ -642,7 +642,7 @@ Status DescriptorTbl::CreateInternal(ObjectPool* pool, const TDescriptorTable& t
     if (slot_d->type().IsStructType() && children_tuple_descriptor != nullptr &&
         children_tuple_descriptor->getMasterTuple() == nullptr) {
       TupleDescriptor* master_tuple = parent;
-      // If this struct is nested into another structs then get the topmost tuple for the
+      // If this struct is nested into other struct(s) then get the topmost tuple for the
       // master.
       if (parent->getMasterTuple() != nullptr) master_tuple = parent->getMasterTuple();
       children_tuple_descriptor->setMasterTuple(master_tuple);
@@ -823,7 +823,7 @@ void SlotDescriptor::CodegenSetNullIndicator(
 // Example IR for materializing a string column with non-NULL 'pool'. Includes the part
 // that is generated by CodegenAnyVal::ToReadWriteInfo().
 //
-// Produced for the following query:
+// Produced for the following query as part of the @MaterializeExprs() function.
 //   select string_col from functional_orc_def.alltypes order by string_col limit 2;
 //
 //   ; [insert point starts here]
@@ -912,7 +912,7 @@ void SlotDescriptor::CodegenWriteToSlotHelper(
     const CodegenAnyValReadWriteInfo& read_write_info,
     llvm::Value* main_tuple_llvm_struct_ptr, llvm::Value* tuple_llvm_struct_ptr,
     llvm::Value* pool_val,
-    NonWritableBasicBlock insert_before) const {
+    const NonWritableBasicBlock& insert_before) const {
   DCHECK(main_tuple_llvm_struct_ptr->getType()->isPointerTy());
   DCHECK(main_tuple_llvm_struct_ptr->getType()->getPointerElementType()->isStructTy());
   DCHECK(tuple_llvm_struct_ptr->getType()->isPointerTy());
@@ -927,7 +927,7 @@ void SlotDescriptor::CodegenWriteToSlotHelper(
     CodegenStoreStructToNativePtr(read_write_info, main_tuple_llvm_struct_ptr,
         slot, pool_val, insert_before);
   } else {
-    CodegenStoreNonNullAnyVal(read_write_info, slot, pool_val, this);
+    CodegenStoreNonNullAnyVal(read_write_info, slot, pool_val, this, insert_before);
 
     // We only need this branch if we are not a struct, because for structs, the last leaf
     // (non-struct) field will add this branch.
@@ -943,7 +943,7 @@ void SlotDescriptor::CodegenWriteToSlotHelper(
 void SlotDescriptor::CodegenStoreStructToNativePtr(
     const CodegenAnyValReadWriteInfo& read_write_info, llvm::Value* main_tuple_ptr,
     llvm::Value* struct_slot_ptr, llvm::Value* pool_val,
-    NonWritableBasicBlock insert_before) const {
+    const NonWritableBasicBlock& insert_before) const {
   DCHECK(type_.IsStructType());
   DCHECK(children_tuple_descriptor_ != nullptr);
   DCHECK(read_write_info.type().IsStructType());
@@ -1021,14 +1021,15 @@ CodegenAnyValReadWriteInfo CodegenAnyValToReadWriteInfo(CodegenAnyVal& any_val,
 
 void SlotDescriptor::CodegenStoreNonNullAnyVal(CodegenAnyVal& any_val,
       llvm::Value* raw_val_ptr, llvm::Value* pool_val,
-      const SlotDescriptor* slot_desc) {
+      const SlotDescriptor* slot_desc, const NonWritableBasicBlock& insert_before) {
   CodegenAnyValReadWriteInfo rwi = CodegenAnyValToReadWriteInfo(any_val, pool_val);
-  CodegenStoreNonNullAnyVal(rwi, raw_val_ptr, pool_val, slot_desc);
+  CodegenStoreNonNullAnyVal(rwi, raw_val_ptr, pool_val, slot_desc, insert_before);
 }
 
 void SlotDescriptor::CodegenStoreNonNullAnyVal(
     const CodegenAnyValReadWriteInfo& read_write_info, llvm::Value* raw_val_ptr,
-    llvm::Value* pool_val, const SlotDescriptor* slot_desc) {
+    llvm::Value* pool_val, const SlotDescriptor* slot_desc,
+    const NonWritableBasicBlock& insert_before) {
   LlvmBuilder* builder = read_write_info.builder();
   const ColumnType& type = read_write_info.type();
   switch (type.type) {
@@ -1037,7 +1038,7 @@ void SlotDescriptor::CodegenStoreNonNullAnyVal(
     case TYPE_ARRAY: // CollectionVal has same memory layout as StringVal.
     case TYPE_MAP: { // CollectionVal has same memory layout as StringVal.
       CodegenWriteStringOrCollectionToSlot(read_write_info, raw_val_ptr,
-          pool_val, slot_desc);
+          pool_val, slot_desc, insert_before);
       break;
     }
     case TYPE_CHAR:
@@ -1108,31 +1109,213 @@ void SlotDescriptor::CodegenSetToNull(const CodegenAnyValReadWriteInfo& read_wri
   }
 }
 
+// Example IR for materializing an int and an array<string> column with non-NULL 'pool'.
+// Includes the part that is generated by CodegenAnyVal::ToReadWriteInfo().
+//
+// Produced for the following query as part of the @MaterializeExprs() function.
+//   select id, arr_string_1d from functional_parquet.collection_tbl order by id limit 2;
+//
+//   ; [insert point starts here]
+//   br label %entry1
+//
+// entry1:                               ; preds = %entry
+//   %is_null = trunc i64 %src to i1
+//   br i1 %is_null, label %null, label %non_null
+//
+// non_null:                             ; preds = %entry1
+//   %1 = ashr i64 %src, 32
+//   %2 = trunc i64 %1 to i32
+//   %slot = getelementptr inbounds <{ %"struct.impala::CollectionValue", i32, i8 }>,
+//       <{ %"struct.impala::CollectionValue", i32, i8 }>* %tuple, i32 0, i32 1
+//   store i32 %2, i32* %slot
+//   br label %end_write
+//
+// null:                                 ; preds = %entry1
+//   %3 = bitcast <{ %"struct.impala::CollectionValue", i32, i8 }>* %tuple to i8*
+//   %null_byte_ptr = getelementptr inbounds i8, i8* %3, i32 16
+//   %null_byte = load i8, i8* %null_byte_ptr
+//   %null_bit_set = or i8 %null_byte, 2
+//   store i8 %null_bit_set, i8* %null_byte_ptr
+//   br label %end_write
+//
+// end_write:                            ; preds = %null, %non_null
+//   %4 = getelementptr %"class.impala::ScalarExprEvaluator"*,
+//       %"class.impala::ScalarExprEvaluator"** %slot_materialize_expr_evals, i32 1
+//   %expr_eval2 = load %"class.impala::ScalarExprEvaluator"*,
+//       %"class.impala::ScalarExprEvaluator"** %4
+//   %src3 = call { i64, i8* } @GetSlotRef.4(
+//       %"class.impala::ScalarExprEvaluator"* %expr_eval2,
+//       %"class.impala::TupleRow"* %row)
+//   br label %entry4
+//
+// entry4:                               ; preds = %end_write
+//   %5 = extractvalue { i64, i8* } %src3, 0
+//   %is_null7 = trunc i64 %5 to i1
+//   br i1 %is_null7, label %null6, label %non_null5
+//
+// non_null5:                            ; preds = %entry4
+//   %6 = extractvalue { i64, i8* } %src3, 0
+//   %7 = ashr i64 %6, 32
+//   %8 = trunc i64 %7 to i32
+//   %src8 = extractvalue { i64, i8* } %src3, 1
+//   %slot10 = getelementptr inbounds <{ %"struct.impala::CollectionValue", i32, i8 }>,
+//       <{ %"struct.impala::CollectionValue", i32, i8 }>* %tuple, i32 0, i32 0
+//   %9 = insertvalue %"struct.impala::CollectionValue" zeroinitializer, i32 %8, 1
+//   %10 = mul i32 %8, 13
+//   %11 = sext i32 %10 to i64
+//   %new_coll_val_ptr = call i8* @_ZN6impala7MemPool8AllocateILb0EEEPhli(
+//       %"class.impala::MemPool"* %pool, i64 %11, i32 8)
+//   call void @llvm.memcpy.p0i8.p0i8.i32(i8* %new_coll_val_ptr, i8* %src8, i32 %10,
+//       i32 0, i1 false)
+//   %12 = insertvalue %"struct.impala::CollectionValue" %9, i8* %new_coll_val_ptr, 0
+//   store i32 0, i32* %item_index_addr
+//   br label %loop_condition_block
+//
+// null6:                                ; preds = %entry4
+//   %13 = bitcast <{ %"struct.impala::CollectionValue", i32, i8 }>* %tuple to i8*
+//   %null_byte_ptr15 = getelementptr inbounds i8, i8* %13, i32 16
+//   %null_byte16 = load i8, i8* %null_byte_ptr15
+//   %null_bit_set17 = or i8 %null_byte16, 1
+//   store i8 %null_bit_set17, i8* %null_byte_ptr15
+//   br label %end_write9
+//
+// loop_condition_block:                 ; preds = %loop_increment_block, %non_null5
+//   %item_index = load i32, i32* %item_index_addr
+//   %continue_loop = icmp slt i32 %item_index, %8
+//   br i1 %continue_loop, label %loop_body_block, label %loop_exit_block
+//
+// loop_body_block:                      ; preds = %loop_condition_block
+//   %children_tuple_array = bitcast i8* %new_coll_val_ptr
+//       to <{ %"struct.impala::StringValue", i8 }>*
+//   %children_tuple = getelementptr inbounds <{ %"struct.impala::StringValue", i8 }>,
+//       <{ %"struct.impala::StringValue", i8 }>* %children_tuple_array, i32 %item_index
+//   %14 = bitcast <{ %"struct.impala::StringValue", i8 }>* %children_tuple to i8*
+//   %null_byte_ptr11 = getelementptr inbounds i8, i8* %14, i32 12
+//   %null_byte12 = load i8, i8* %null_byte_ptr11
+//   %null_mask = and i8 %null_byte12, 1
+//   %is_null13 = icmp ne i8 %null_mask, 0
+//   br i1 %is_null13, label %next_block_after_child_is_written,
+//       label %child_non_null_block
+//
+// loop_increment_block:                 ; preds = %next_block_after_child_is_written
+//   %item_index_incremented = add i32 %item_index, 1
+//   store i32 %item_index_incremented, i32* %item_index_addr
+//   br label %loop_condition_block
+//
+// loop_exit_block:                      ; preds = %loop_condition_block
+//   store %"struct.impala::CollectionValue" %12,
+//       %"struct.impala::CollectionValue"* %slot10
+//   br label %end_write9
+//
+// child_non_null_block:                 ; preds = %loop_body_block
+//   %child_str_or_coll_value_addr = getelementptr inbounds
+//       <{ %"struct.impala::StringValue", i8 }>,
+//       <{ %"struct.impala::StringValue", i8 }>* %children_tuple, i32 0, i32 0
+//   %child_str_or_coll_value = load %"struct.impala::StringValue",
+//       %"struct.impala::StringValue"* %child_str_or_coll_value_addr
+//   %child_str_or_coll_value_ptr = extractvalue
+//       %"struct.impala::StringValue" %child_str_or_coll_value, 0
+//   %child_str_or_coll_value_len = extractvalue
+//       %"struct.impala::StringValue" %child_str_or_coll_value, 1
+//   %15 = insertvalue %"struct.impala::StringValue" zeroinitializer,
+//       i32 %child_str_or_coll_value_len, 1
+//   %16 = sext i32 %child_str_or_coll_value_len to i64
+//   %new_coll_val_ptr14 = call i8* @_ZN6impala7MemPool8AllocateILb0EEEPhli(
+//       %"class.impala::MemPool"* %pool, i64 %16, i32 8)
+//   call void @llvm.memcpy.p0i8.p0i8.i32(i8* %new_coll_val_ptr14,
+//       i8* %child_str_or_coll_value_ptr, i32 %child_str_or_coll_value_len, i32 0,
+//       i1 false)
+//   %17 = insertvalue %"struct.impala::StringValue" %15, i8* %new_coll_val_ptr14, 0
+//   store %"struct.impala::StringValue" %17,
+//       %"struct.impala::StringValue"* %child_str_or_coll_value_addr
+//   br label %next_block_after_child_is_written
+//
+// next_block_after_child_is_written:    ; preds = %child_non_null_block, %loop_body_block
+//   br label %loop_increment_block
+//
+// end_write9:                           ; preds = %null6, %loop_exit_block
+//   ; [insert point ends here]
 void SlotDescriptor::CodegenWriteStringOrCollectionToSlot(
     const CodegenAnyValReadWriteInfo& read_write_info,
-    llvm::Value* slot_ptr, llvm::Value* pool_val, const SlotDescriptor* slot_desc) {
+    llvm::Value* slot_ptr, llvm::Value* pool_val, const SlotDescriptor* slot_desc,
+    const NonWritableBasicBlock& insert_before) {
   const ColumnType& type = read_write_info.type();
   if (type.IsStringType()) {
     CodegenWriteStringToSlot(read_write_info, slot_ptr, pool_val, slot_desc);
   } else {
     DCHECK(type.IsCollectionType());
-    CodegenWriteCollectionToSlot(read_write_info, slot_ptr, pool_val, slot_desc);
+    CodegenWriteCollectionToSlot(read_write_info, slot_ptr, pool_val, slot_desc,
+        insert_before);
+  }
+}
+
+namespace {
+constexpr int COLL_VALUE_PTR_IDX = 0;
+constexpr int COLL_VALUE_LEN_IDX = 1;
+
+llvm::Value* CodegenStrOrCollValueGetPtr(LlvmCodeGen* codegen, LlvmBuilder* builder,
+    llvm::Value* str_or_coll_value_addr, const string& name = "") {
+  if (str_or_coll_value_addr->getType() ==
+      codegen->GetStructType<StringValue>()->getPointerTo()) {
+    llvm::Function* str_ptr_fn = codegen->GetFunction(
+        IRFunction::STRING_VALUE_PTR, false);
+    return builder->CreateCall(str_ptr_fn,
+        llvm::ArrayRef<llvm::Value*>({str_or_coll_value_addr}), name);
+  } else {
+    DCHECK(str_or_coll_value_addr->getType() ==
+        codegen->GetStructType<CollectionValue>()->getPointerTo());
+    llvm::Value* ptr_addr = builder->CreateStructGEP(nullptr, str_or_coll_value_addr,
+        COLL_VALUE_PTR_IDX, name + "_addr");
+    return builder->CreateLoad(ptr_addr, name);
   }
 }
 
+llvm::Value* CodegenStrOrCollValueGetLen(LlvmCodeGen* codegen, LlvmBuilder* builder,
+    llvm::Value* str_or_coll_value_addr, const string& name = "") {
+  if (str_or_coll_value_addr->getType() ==
+      codegen->GetStructType<StringValue>()->getPointerTo()) {
+    llvm::Function* str_len_fn = codegen->GetFunction(
+        IRFunction::STRING_VALUE_LEN, false);
+    return builder->CreateCall(str_len_fn,
+        llvm::ArrayRef<llvm::Value*>({str_or_coll_value_addr}), name);
+  } else {
+    DCHECK(str_or_coll_value_addr->getType() ==
+        codegen->GetStructType<CollectionValue>()->getPointerTo());
+    llvm::Value* len_addr = builder->CreateStructGEP(nullptr, str_or_coll_value_addr,
+        COLL_VALUE_LEN_IDX, name + "_addr");
+    return builder->CreateLoad(len_addr, name);
+  }
+}
+
+llvm::Value* CodegenCollValueSetPtr(LlvmCodeGen* codegen, LlvmBuilder* builder,
+    llvm::Value* str_or_coll_value, llvm::Value* ptr, const string& name = "") {
+  DCHECK(str_or_coll_value->getType() == codegen->GetStructType<CollectionValue>());
+  return builder->CreateInsertValue(str_or_coll_value, ptr, COLL_VALUE_PTR_IDX,
+      name);
+}
+
+llvm::Value* CodegenCollValueSetLen(LlvmCodeGen* codegen, LlvmBuilder* builder,
+    llvm::Value* str_or_coll_value, llvm::Value* len, const string& name = "") {
+  DCHECK(str_or_coll_value->getType() == codegen->GetStructType<CollectionValue>());
+  return builder->CreateInsertValue(str_or_coll_value, len, COLL_VALUE_LEN_IDX,
+      name);
+}
+} /* anonymous namespace */
+
 void SlotDescriptor::CodegenWriteCollectionToSlot(
     const CodegenAnyValReadWriteInfo& read_write_info,
-    llvm::Value* slot_ptr, llvm::Value* pool_val, const SlotDescriptor* slot_desc) {
+    llvm::Value* slot_ptr, llvm::Value* pool_val, const SlotDescriptor* slot_desc,
+    const NonWritableBasicBlock& insert_before) {
   LlvmCodeGen* codegen = read_write_info.codegen();
   LlvmBuilder* builder = read_write_info.builder();
   const ColumnType& type = read_write_info.type();
   DCHECK(type.IsCollectionType());
 
-  // Convert to StringValue/CollectionValue
+  // Convert to 'CollectionValue'.
   llvm::Type* raw_type = codegen->GetSlotType(type);
   llvm::Value* coll_value = llvm::Constant::getNullValue(raw_type);
-  coll_value = builder->CreateInsertValue(
-      coll_value, read_write_info.GetPtrAndLen().len, 1);
+  coll_value = CodegenCollValueSetLen(codegen, builder, coll_value,
+      read_write_info.GetPtrAndLen().len);
   if (pool_val != nullptr) {
     llvm::Value* len = read_write_info.GetPtrAndLen().len;
     DCHECK(slot_desc != nullptr) << "SlotDescriptor needed to calculate the size of "
@@ -1143,15 +1326,17 @@ void SlotDescriptor::CodegenWriteCollectionToSlot(
     int item_tuple_byte_size = slot_desc->children_tuple_descriptor()->byte_size();
     len = builder->CreateMul(len, codegen->GetI32Constant(item_tuple_byte_size));
 
-    // Allocate a 'new_ptr' from 'pool_val' and copy the data from
-    // 'read_write_info->ptr'
+    // Allocate a 'new_ptr' from 'pool_val' and copy the data from 'read_write_info->ptr'.
     llvm::Value* new_ptr = codegen->CodegenMemPoolAllocate(
-        builder, pool_val, len, "new_ptr");
+        builder, pool_val, len, "new_coll_val_ptr");
     codegen->CodegenMemcpy(builder, new_ptr, read_write_info.GetPtrAndLen().ptr, len);
-    coll_value = builder->CreateInsertValue(coll_value, new_ptr, 0);
+    coll_value = CodegenCollValueSetPtr(codegen, builder, coll_value, new_ptr);
+
+    slot_desc->CodegenWriteCollectionItemsToSlot(codegen, builder, new_ptr,
+        read_write_info.GetPtrAndLen().len, pool_val, insert_before);
   } else {
-    coll_value = builder->CreateInsertValue(
-        coll_value, read_write_info.GetPtrAndLen().ptr, 0);
+    coll_value = CodegenCollValueSetPtr(codegen, builder, coll_value,
+        read_write_info.GetPtrAndLen().ptr);
   }
   builder->CreateStore(coll_value, slot_ptr);
 }
@@ -1167,8 +1352,7 @@ void SlotDescriptor::CodegenWriteStringToSlot(
   llvm::Value* ptr = read_write_info.GetPtrAndLen().ptr;
   llvm::Value* len = read_write_info.GetPtrAndLen().len;
   if (pool_val != nullptr) {
-    // Allocate a 'new_ptr' from 'pool_val' and copy the data from
-    // 'read_write_info->ptr'
+    // Allocate a 'new_ptr' from 'pool_val' and copy the data from 'read_write_info->ptr'.
     llvm::Value* new_ptr = codegen->CodegenMemPoolAllocate(
         builder, pool_val, len, "new_ptr");
     codegen->CodegenMemcpy(builder, new_ptr, ptr, len);
@@ -1180,6 +1364,153 @@ void SlotDescriptor::CodegenWriteStringToSlot(
       llvm::ArrayRef<llvm::Value*>({slot_ptr, ptr, len}));
 }
 
+void SlotDescriptor::CodegenWriteCollectionItemsToSlot(LlvmCodeGen* codegen,
+    LlvmBuilder* builder, llvm::Value* collection_value_ptr, llvm::Value* num_tuples,
+    llvm::Value* pool_val, const NonWritableBasicBlock& insert_before) const {
+  DCHECK(pool_val != nullptr);
+  // We construct a while-like loop using basic blocks and conditional branches to iterate
+  // through the items of the collection, recursively.
+  llvm::Function* fn = builder->GetInsertBlock()->getParent();
+  llvm::BasicBlock* loop_condition_block = insert_before.CreateBasicBlockBefore(
+      codegen->context(), "loop_condition_block", fn);
+  llvm::BasicBlock* loop_body_block = insert_before.CreateBasicBlockBefore(
+      codegen->context(), "loop_body_block", fn);
+  llvm::BasicBlock* loop_increment_block = insert_before.CreateBasicBlockBefore(
+      codegen->context(), "loop_increment_block", fn);
+  llvm::BasicBlock* loop_exit_block = insert_before.CreateBasicBlockBefore(
+      codegen->context(), "loop_exit_block", fn);
+
+  // Initialise the loop counter.
+  llvm::Value* item_index_addr = codegen->CreateEntryBlockAlloca(
+      *builder, codegen->i32_type(), "item_index_addr");
+  builder->CreateStore(codegen->GetI32Constant(0), item_index_addr);
+
+  builder->CreateBr(loop_condition_block);
+
+  // Loop condition block
+  builder->SetInsertPoint(loop_condition_block);
+  llvm::Value* item_index = builder->CreateLoad(item_index_addr, "item_index");
+  llvm::Value* continue_loop = builder->CreateICmpSLT(
+      item_index, num_tuples, "continue_loop");
+  builder->CreateCondBr(continue_loop, loop_body_block, loop_exit_block);
+
+  // Loop body
+  builder->SetInsertPoint(loop_body_block);
+  CodegenWriteCollectionItemLoopBody(codegen, builder, collection_value_ptr, num_tuples,
+      item_index, fn, insert_before, pool_val);
+  builder->CreateBr(loop_increment_block);
+
+  // Loop increment
+  builder->SetInsertPoint(loop_increment_block);
+  llvm::Value* item_index_incremented = builder->CreateAdd(
+      item_index, codegen->GetI32Constant(1), "item_index_incremented");
+  builder->CreateStore(item_index_incremented, item_index_addr);
+  builder->CreateBr(loop_condition_block);
+
+  // Loop exit
+  builder->SetInsertPoint(loop_exit_block);
+}
+
+void SlotDescriptor::CodegenWriteCollectionItemLoopBody(LlvmCodeGen* codegen,
+    LlvmBuilder* builder, llvm::Value* collection_value_ptr, llvm::Value* num_tuples,
+    llvm::Value* item_index, llvm::Function* fn,
+    const NonWritableBasicBlock& insert_before, llvm::Value* pool_val) const {
+  DCHECK(pool_val != nullptr);
+  const TupleDescriptor* children_tuple_desc = children_tuple_descriptor();
+  DCHECK(children_tuple_desc != nullptr);
+
+  llvm::Type* children_tuple_struct_type = children_tuple_desc->GetLlvmStruct(codegen);
+  DCHECK(children_tuple_struct_type != nullptr);
+  llvm::PointerType* children_tuple_type = codegen->GetPtrType(
+      children_tuple_struct_type);
+
+  llvm::Value* children_tuple_array = builder->CreateBitCast(collection_value_ptr,
+      children_tuple_type, "children_tuple_array");
+  llvm::Value* children_tuple = builder->CreateInBoundsGEP(children_tuple_array,
+      item_index, "children_tuple");
+
+  CodegenWriteCollectionIterateOverChildren(codegen, builder, children_tuple, fn,
+      insert_before, pool_val);
+}
+
+void SlotDescriptor::CodegenWriteCollectionIterateOverChildren(LlvmCodeGen* codegen,
+    LlvmBuilder* builder, llvm::Value* children_tuple, llvm::Function* fn,
+    const NonWritableBasicBlock& insert_before, llvm::Value* pool_val) const {
+  DCHECK(pool_val != nullptr);
+  const TupleDescriptor* children_tuple_desc = children_tuple_descriptor();
+  DCHECK(children_tuple_desc != nullptr);
+
+  for (const SlotDescriptor* child_slot_desc : children_tuple_desc->slots()) {
+    DCHECK(child_slot_desc != nullptr);
+
+    const ColumnType& child_type = child_slot_desc->type();
+    if (child_type.IsVarLenStringType() || child_type.IsCollectionType()) {
+      child_slot_desc->CodegenWriteCollectionVarlenChild(codegen, builder, children_tuple,
+          fn, insert_before, pool_val);
+    } else if (child_type.IsStructType()) {
+      child_slot_desc-> CodegenWriteCollectionStructChild(codegen, builder,
+          children_tuple, fn, insert_before, pool_val);
+    }
+  }
+}
+
+void SlotDescriptor::CodegenWriteCollectionStructChild(LlvmCodeGen* codegen,
+    LlvmBuilder* builder, llvm::Value* tuple, llvm::Function* fn,
+    const NonWritableBasicBlock& insert_before, llvm::Value* pool_val) const {
+  DCHECK(type().IsStructType());
+
+  const TupleDescriptor* children_tuple_desc = children_tuple_descriptor();
+  DCHECK(children_tuple_desc != nullptr);
+
+  // We only need to handle var-len descendant slots.
+  if (children_tuple_desc->string_slots().empty()
+      && children_tuple_desc->collection_slots().empty()) {
+    return;
+  }
+
+  llvm::Value* children_tuple = builder->CreateStructGEP(nullptr, tuple,
+      llvm_field_idx(), "struct_children_tuple");
+
+  CodegenWriteCollectionIterateOverChildren(codegen, builder, children_tuple ,fn,
+      insert_before, pool_val);
+}
+
+void SlotDescriptor::CodegenWriteCollectionVarlenChild(LlvmCodeGen* codegen,
+    LlvmBuilder* builder, llvm::Value* children_tuple, llvm::Function* fn,
+    const NonWritableBasicBlock& insert_before, llvm::Value* pool_val) const {
+  DCHECK(pool_val != nullptr);
+  DCHECK(type_.IsVarLenStringType() || type_.IsCollectionType());
+
+  llvm::BasicBlock* child_non_null_block = insert_before.CreateBasicBlockBefore(
+      codegen->context(), "child_non_null_block", fn);
+  llvm::BasicBlock* child_written_block = insert_before.CreateBasicBlockBefore(
+      codegen->context(), "next_block_after_child_is_written", fn);
+
+  llvm::Value* child_is_null = CodegenIsNull(codegen, builder, children_tuple);
+  builder->CreateCondBr(child_is_null, child_written_block, child_non_null_block);
+
+  // Note: Although the input of CodegenWriteStringOrCollectionToSlot() is a '*Val', not a
+  // '*Value', the items of a collection are still '*Value' objects, because the pointer
+  // of the collection points to an array of tuples (the items). 'StringValue' has Small
+  // String Optimisation, but smallness is not preserved here: even if the 'StringValue'
+  // was originally small, the new copy will be a long string.
+  builder->SetInsertPoint(child_non_null_block);
+  llvm::Value* child_str_or_coll_value_slot = builder->CreateStructGEP(nullptr,
+      children_tuple, llvm_field_idx(), "child_str_or_coll_value_addr");
+  llvm::Value* child_str_or_coll_value_ptr = CodegenStrOrCollValueGetPtr(codegen, builder,
+      child_str_or_coll_value_slot, "child_str_or_coll_value_ptr");
+  llvm::Value* child_str_or_coll_value_len = CodegenStrOrCollValueGetLen(codegen, builder,
+      child_str_or_coll_value_slot, "child_str_or_coll_value_len");
+
+  CodegenAnyValReadWriteInfo child_rwi(codegen, builder, type());
+  child_rwi.SetPtrAndLen(child_str_or_coll_value_ptr, child_str_or_coll_value_len);
+
+  CodegenWriteStringOrCollectionToSlot(child_rwi, child_str_or_coll_value_slot, pool_val,
+      this, insert_before);
+  builder->CreateBr(child_written_block);
+  builder->SetInsertPoint(child_written_block);
+}
+
 llvm::Value* SlotDescriptor::CodegenToTimestampValue(
     const CodegenAnyValReadWriteInfo& read_write_info) {
   const ColumnType& type = read_write_info.type();
@@ -1268,6 +1599,7 @@ llvm::StructType* TupleDescriptor::CreateLlvmStructTypeFromFieldTypes(
   // 16 bytes although their useful payload is only 12 bytes.
   llvm::StructType* tuple_struct = llvm::StructType::get(codegen->context(),
       llvm::ArrayRef<llvm::Type*>(field_types), true);
+  DCHECK(tuple_struct != nullptr);
   const llvm::DataLayout& data_layout = codegen->execution_engine()->getDataLayout();
   const llvm::StructLayout* layout = data_layout.getStructLayout(tuple_struct);
   for (SlotDescriptor* slot: slots()) {
diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h
index 5b7ae6381..35e585315 100644
--- a/be/src/runtime/descriptors.h
+++ b/be/src/runtime/descriptors.h
@@ -35,6 +35,7 @@
 
 namespace llvm {
   class Constant;
+  class Function;
   class StructType;
   class Type;
   class Value;
@@ -196,15 +197,20 @@ class SlotDescriptor {
   /// 'slot_desc' is needed when 'pool_val' is non-NULL and the value is a collection. In
   /// this case the collection is copied and the slot desc is needed to calculate its byte
   /// size.
+  ///
+  /// If 'insert_before' contains a non-NULL pointer to a basic block, any newly created
+  /// blocks will be inserted before that.
   static void CodegenStoreNonNullAnyVal(CodegenAnyVal& any_val,
       llvm::Value* raw_val_ptr, llvm::Value* pool_val = nullptr,
-      const SlotDescriptor* slot_desc = nullptr);
+      const SlotDescriptor* slot_desc = nullptr,
+      const NonWritableBasicBlock& insert_before = NonWritableBasicBlock(nullptr));
 
   /// Like the above, but takes a 'CodegenAnyValReadWriteInfo' instead of a
   /// 'CodegenAnyVal'.
   static void CodegenStoreNonNullAnyVal(const CodegenAnyValReadWriteInfo& read_write_info,
       llvm::Value* raw_val_ptr, llvm::Value* pool_val = nullptr,
-      const SlotDescriptor* slot_desc = nullptr);
+      const SlotDescriptor* slot_desc = nullptr,
+      const NonWritableBasicBlock& insert_before = NonWritableBasicBlock(nullptr));
 
   /// Like 'CodegenStoreNonNullAnyVal' but stores the value into a new alloca()
   /// allocation. Returns a pointer to the stored value.
@@ -254,7 +260,7 @@ class SlotDescriptor {
 
   void CodegenWriteToSlotHelper(const CodegenAnyValReadWriteInfo& read_write_info,
       llvm::Value* main_tuple_llvm_struct_ptr, llvm::Value* tuple_llvm_struct_ptr,
-      llvm::Value* pool_val, NonWritableBasicBlock insert_before) const;
+      llvm::Value* pool_val, const NonWritableBasicBlock& insert_before) const;
 
   /// Stores a struct value into a native slot. This should only be used if this struct is
   /// not null.
@@ -265,27 +271,51 @@ class SlotDescriptor {
   /// beginning of the main tuple.
   void CodegenStoreStructToNativePtr(const CodegenAnyValReadWriteInfo& read_write_info,
       llvm::Value* main_tuple_ptr, llvm::Value* struct_slot_ptr, llvm::Value* pool_val,
-      NonWritableBasicBlock insert_before) const;
+      const NonWritableBasicBlock& insert_before) const;
 
   // Sets the null indicator bit to 0 - recursively for structs.
   void CodegenSetToNull(const CodegenAnyValReadWriteInfo& read_write_info,
       llvm::Value* tuple) const;
 
   /// Codegens writing a string or a collection to the address pointed to by 'slot_ptr'.
-  /// If 'pool_val' is non-NULL, the data will be copied into 'pool_val'. 'pool_val' has
-  /// to be of type MemPool*. 'slot_desc' is needed when 'pool_val' is non-NULL and the
-  /// value is a collection. In this case the collection is copied and the slot desc is
-  /// needed to calculate its byte size.
+  /// If 'pool_val' is non-NULL, the data will be deep-copied into 'pool_val'. 'pool_val'
+  /// has to be of type MemPool*. 'slot_desc' is needed when 'pool_val' is non-NULL and
+  /// the value is a collection. In this case the collection is copied and the slot desc
+  /// is needed to calculate its byte size.
   static void CodegenWriteStringOrCollectionToSlot(
       const CodegenAnyValReadWriteInfo& read_write_info,
-      llvm::Value* slot_ptr, llvm::Value* pool_val, const SlotDescriptor* slot_desc);
+      llvm::Value* slot_ptr, llvm::Value* pool_val, const SlotDescriptor* slot_desc,
+      const NonWritableBasicBlock& insert_before);
   static void CodegenWriteCollectionToSlot(
       const CodegenAnyValReadWriteInfo& read_write_info,
-      llvm::Value* slot_ptr, llvm::Value* pool_val, const SlotDescriptor* slot_desc);
+      llvm::Value* slot_ptr, llvm::Value* pool_val, const SlotDescriptor* slot_desc,
+      const NonWritableBasicBlock& insert_before);
   static void CodegenWriteStringToSlot(
       const CodegenAnyValReadWriteInfo& read_write_info,
       llvm::Value* slot_ptr, llvm::Value* pool_val, const SlotDescriptor* slot_desc);
 
+  void CodegenWriteCollectionItemsToSlot(LlvmCodeGen* codegen, LlvmBuilder* builder,
+      llvm::Value* collection_value, llvm::Value* num_tuples, llvm::Value* pool_val,
+      const NonWritableBasicBlock& insert_before) const;
+
+  // The below functions are helpers of 'CodegenWriteStringOrCollectionToSlot()'.
+  void CodegenWriteCollectionItemLoopBody(LlvmCodeGen* codegen, LlvmBuilder* builder,
+      llvm::Value* collection_value_ptr, llvm::Value* num_tuples,
+      llvm::Value* child_index, llvm::Function* fn,
+      const NonWritableBasicBlock& insert_before, llvm::Value* pool_val) const;
+
+  void CodegenWriteCollectionIterateOverChildren(LlvmCodeGen* codegen,
+      LlvmBuilder* builder, llvm::Value* children_tuple, llvm::Function* fn,
+      const NonWritableBasicBlock& insert_before, llvm::Value* pool_val) const;
+
+  void CodegenWriteCollectionStructChild(LlvmCodeGen* codegen, LlvmBuilder* builder,
+      llvm::Value* tuple, llvm::Function* fn, const NonWritableBasicBlock& insert_before,
+      llvm::Value* pool_val) const;
+
+  void CodegenWriteCollectionVarlenChild(LlvmCodeGen* codegen, LlvmBuilder* builder,
+      llvm::Value* child_tuple, llvm::Function* fn,
+      const NonWritableBasicBlock& insert_before, llvm::Value* pool_val) const;
+
   static llvm::Value* CodegenToTimestampValue(
       const CodegenAnyValReadWriteInfo& read_write_info);
 };
@@ -579,9 +609,8 @@ class TupleDescriptor {
   /// but not necessarily the id.
   bool LayoutEquals(const TupleDescriptor& other_desc) const;
 
-  /// Creates a typed struct description for llvm.  The layout of the struct is computed
-  /// by the FE which includes the order of the fields in the resulting struct.
-  /// Returns the struct type or NULL if the type could not be created.
+  /// Creates and returns a typed struct description for llvm. The layout of the struct is
+  /// computed by the FE which includes the order of the fields in the resulting struct.
   /// For example, the aggregation tuple for this query: select count(*), min(int_col_a)
   /// would map to:
   /// struct Tuple {
@@ -621,18 +650,46 @@ class TupleDescriptor {
   /// collection, empty otherwise.
   SchemaPath tuple_path_;
 
-  /// If this tuple represents the children of a struct slot then 'master_tuple_' is the
-  /// tuple that holds the topmost struct slot. For example:
-  /// - Tuple0
-  ///     - Slot1 e.g. INT slot
-  ///     - Slot2 e.g. STRUCT slot
-  ///         - Tuple1 (Holds the children of the struct)
-  ///             - Slot3 e.g. INT child of the STRUCT
-  ///             - Slot4 e.g. STRING child of the STRUCT
-  /// In the above example the 'master_tuple_' for Tuple1 (that is the struct's tuple to
-  /// hold its children) would be Tuple0. In case the STRUCT in Slot2 was a nested struct
-  /// in any depth then the 'master_tuple_' for any of the tuples under Slot2 would be
-  /// again Tuple0.
+  /// If this tuple represents the children of a struct slot then the master tuple is
+  /// defined as follows:
+  ///  - starting from this tuple, go upwards in the slot-tuple tree as long as the parent
+  ///    slot is a struct
+  ///  - the tuple holding the last such struct slot, i.e. the slot whose immediate parent
+  ///    (if it exists) is not a struct, is the master tuple for this tuple (and all other
+  ///    tuples on the path).
+  /// Therefore the master tuple is either the main tuple or the item tuple of a
+  /// collection.
+  ///
+  /// For example:
+  /// - Tuple0 (Main tuple)
+  ///   - Slot0: INT
+  ///   - Slot1: STRUCT
+  ///     - Tuple1 (Holds the children of the struct in Slot1)
+  ///       - Slot2: ARRAY
+  ///         - Tuple2 (Holds the children of the array in Slot2)
+  ///           - Slot3: STRUCT
+  ///             - Tuple3 (Holds the children of the struct in Slot3)
+  ///               - Slot4: STRUCT
+  ///                 - Tuple4 (Holds the children of the struct in Slot4)
+  ///                   - Slot5: INT
+  ///                   - Slot6: STRING
+  ///
+  /// In the above example the master tuple for Tuple1 is Tuple0. For Tuple3 and Tuple4
+  /// the master tuple is Tuple2 because it holds the struct in Slot3 which is not
+  /// directly inside another struct. Tuple0 and Tuple2 do not have a master tuple because
+  /// they do not hold the children of structs.
+  ///
+  /// The reason for this is that, in memory, the children of a struct do not have their
+  /// own physical tuple but are placed in the tuple that conceptually holds their parent
+  /// struct. This means that the whole of a pure struct tree (i.e. one that does not
+  /// contain collections) of any depth will be placed in the same physical tuple, the
+  /// master tuple. Slot offsets are calculated in terms of the master tuple, not the
+  /// individual structs' virtual tuples.
+  ///
+  /// Children of a collection, on the other hand, do have their physical tuple that is
+  /// distinct from the one that holds their parent collection. This is because the number
+  /// of children a collection has is not known statically, in contrast to the number (and
+  /// type) of the fields of structs.
   TupleDescriptor* master_tuple_ = nullptr;
 
   TupleDescriptor(const TTupleDescriptor& tdesc);
diff --git a/be/src/runtime/raw-value.cc b/be/src/runtime/raw-value.cc
index 9f3f1a64e..a6d9fbe2f 100644
--- a/be/src/runtime/raw-value.cc
+++ b/be/src/runtime/raw-value.cc
@@ -243,9 +243,7 @@ void RawValue::WriteNonNull(const void* value, Tuple* tuple,
 
   if (COLLECT_VAR_LEN_VALS) {
     DCHECK(string_values != nullptr);
-    DCHECK(string_values->size() == 0);
     DCHECK(collection_values != nullptr);
-    DCHECK(collection_values->size() == 0);
   }
 
   if (slot_desc->type().IsStructType()) {
@@ -256,7 +254,7 @@ void RawValue::WriteNonNull(const void* value, Tuple* tuple,
         string_values, collection_values);
   } else {
     WritePrimitiveCollectVarlen<COLLECT_VAR_LEN_VALS>(value, tuple, slot_desc, pool,
-        string_values, collection_values);
+        string_values);
   }
 }
 
@@ -278,17 +276,16 @@ void RawValue::WriteStruct(const void* value, Tuple* tuple,
   for (int i = 0; i < src->num_children; ++i) {
     SlotDescriptor* child_slot = children_tuple_desc->slots()[i];
     uint8_t* src_child = src->ptr[i];
+    // TODO IMPALA-12160: Handle collections in structs.
     if (child_slot->type().IsStructType()) {
       // Recursive call in case of nested structs.
       WriteStruct<COLLECT_VAR_LEN_VALS>(src_child, tuple, child_slot, pool,
           string_values, collection_values);
-      continue;
-    }
-    if (src_child == nullptr) {
+    } else if (src_child == nullptr) {
       tuple->SetNull(child_slot->null_indicator_offset());
     } else {
       WritePrimitiveCollectVarlen<COLLECT_VAR_LEN_VALS>(src_child, tuple, child_slot,
-          pool, string_values, collection_values);
+          pool, string_values);
     }
   }
 }
@@ -301,13 +298,16 @@ void RawValue::WriteCollection(const void* value, Tuple* tuple,
 
   void* dst = tuple->GetSlot(slot_desc->tuple_offset());
 
-  // TODO IMPALA-10939: Enable recursive collections.
   const CollectionValue* src = reinterpret_cast<const CollectionValue*>(value);
   CollectionValue* dest = reinterpret_cast<CollectionValue*>(dst);
   dest->num_tuples = src->num_tuples;
 
   int64_t byte_size = dest->ByteSize(*slot_desc->children_tuple_descriptor());
   if (pool != nullptr) {
+    // If 'dest' and 'src' point to the same address, assigning the address of the newly
+    // allocated buffer to 'dest->ptr' will also overwrite 'src->ptr', and the memcpy will
+    // be from the destination to the destination.
+    DCHECK_NE(dest, src);
     // Note: if this changes to TryAllocate(), SlotDescriptor::CodegenWriteToSlot() will
     // need to reflect this change as well (the codegen'd Allocate() call is actually
     // generated in SlotDescriptor::CodegenWriteStringOrCollectionToSlot()).
@@ -317,6 +317,13 @@ void RawValue::WriteCollection(const void* value, Tuple* tuple,
     dest->ptr = src->ptr;
   }
 
+  // We only need to recurse if this is a deep copy (pool != nullptr) OR if we collect
+  // var-len values.
+  if (pool != nullptr || COLLECT_VAR_LEN_VALS) {
+    WriteCollectionChildren<COLLECT_VAR_LEN_VALS>(*dest, *src, *slot_desc, pool,
+        string_values, collection_values);
+  }
+
   if (COLLECT_VAR_LEN_VALS) {
     DCHECK(string_values != nullptr);
     DCHECK(collection_values != nullptr);
@@ -325,18 +332,77 @@ void RawValue::WriteCollection(const void* value, Tuple* tuple,
 }
 
 template <bool COLLECT_VAR_LEN_VALS>
-void RawValue::WritePrimitiveCollectVarlen(const void* value, Tuple* tuple,
-    const SlotDescriptor* slot_desc, MemPool* pool, vector<StringValue*>* string_values,
+void RawValue::WriteCollectionChildren(const CollectionValue& dest,
+    const CollectionValue& src, const SlotDescriptor& collection_slot_desc, MemPool* pool,
+    vector<StringValue*>* string_values,
     vector<pair<CollectionValue*, int64_t>>* collection_values) {
+  DCHECK_EQ(src.num_tuples, dest.num_tuples);
+  const TupleDescriptor* child_tuple_desc =
+      collection_slot_desc.children_tuple_descriptor();
+  DCHECK(child_tuple_desc != nullptr);
+
+  for (int i = 0; i < dest.num_tuples; i++) {
+    Tuple* child_src_tuple = reinterpret_cast<Tuple*>(
+        src.ptr + i * child_tuple_desc->byte_size());
+    Tuple* child_dest_tuple = reinterpret_cast<Tuple*>(
+        dest.ptr + i * child_tuple_desc->byte_size());
+
+    for (const SlotDescriptor* string_slot_desc : child_tuple_desc->string_slots()) {
+      WriteCollectionVarlenChild<COLLECT_VAR_LEN_VALS>(child_dest_tuple, child_src_tuple,
+          string_slot_desc, pool, string_values, collection_values);
+    }
+
+    for (const SlotDescriptor* collection_slot_desc
+        : child_tuple_desc->collection_slots()) {
+      WriteCollectionVarlenChild<COLLECT_VAR_LEN_VALS>(child_dest_tuple, child_src_tuple,
+          collection_slot_desc, pool, string_values, collection_values);
+    }
+  }
+}
+
+template <bool COLLECT_VAR_LEN_VALS>
+void RawValue::WriteCollectionVarlenChild(Tuple* child_dest_tuple, Tuple* child_src_tuple,
+    const SlotDescriptor* slot_desc, MemPool* pool, vector<StringValue*>* string_values,
+    vector<pair<CollectionValue*, int64_t>>* collection_values ) {
+  DCHECK(slot_desc != nullptr);
+  DCHECK(slot_desc->type().IsVarLenStringType() || slot_desc->type().IsCollectionType());
+
+  if (!child_dest_tuple->IsNull(slot_desc->null_indicator_offset())) {
+    // The fixed length part of the child (the pointer and the length / number of tuples)
+    // is already in the destination tuple, copied there as the var-len data of the
+    // parent. We continue the recursion for two things:
+    //   1. deep-copying the var-len data of the child
+    //   2. collecting var-len slots.
+    // At least one of these is true, otherwise we never get here. The called recursive
+    // function will once again set the length (always unnecessary) and the pointer
+    // (unnecessary if we're not deep-copying, only collecting). This is not costly enough
+    // to justify complicating the code. Note, however, that although at this point the
+    // source and destination slots hold the same value (pointer and length / number of
+    // tuples), we take 'child_value', the source in the recursive call, from the source
+    // tuple, because in case of deep-copying, the pointer of the destination slot will be
+    // re-assigned to the newly allocated buffer, and if we took 'child_value' from the
+    // destination slot, the 'source' pointer and the 'destination' pointer would be the
+    // same, meaning the 'source' pointer would also be overwritten before we copied the
+    // data it pointed to.
+    void* child_value = child_src_tuple->GetSlot(slot_desc->tuple_offset());
+
+    WriteNonNull<COLLECT_VAR_LEN_VALS>(child_value, child_dest_tuple,
+        slot_desc, pool, string_values, collection_values);
+  }
+}
+
+template <bool COLLECT_VAR_LEN_VALS>
+void RawValue::WritePrimitiveCollectVarlen(const void* value, Tuple* tuple,
+    const SlotDescriptor* slot_desc, MemPool* pool, vector<StringValue*>* string_values) {
   DCHECK(value != nullptr && tuple != nullptr && slot_desc != nullptr);
 
   void* dst = tuple->GetSlot(slot_desc->tuple_offset());
   WriteNonNullPrimitive(value, dst, slot_desc->type(), pool);
-  if (COLLECT_VAR_LEN_VALS) {
+  if constexpr (COLLECT_VAR_LEN_VALS) {
     DCHECK(string_values != nullptr);
-    DCHECK(collection_values != nullptr);
     if (slot_desc->type().IsVarLenStringType()) {
-      string_values->push_back(reinterpret_cast<StringValue*>(dst));
+      StringValue* str_value = reinterpret_cast<StringValue*>(dst);
+      if (!str_value->IsSmall()) string_values->push_back(str_value);
     } else if (slot_desc->type().IsCollectionType()) {
       DCHECK(false) << "Collections should be handled in WriteCollection.";
     }
@@ -483,11 +549,9 @@ template void RawValue::WriteStruct<false>(const void* value, Tuple* tuple,
 
 template void RawValue::WritePrimitiveCollectVarlen<true>(const void* value,
     Tuple* tuple, const SlotDescriptor* slot_desc, MemPool* pool,
-    std::vector<StringValue*>* string_values,
-    std::vector<std::pair<CollectionValue*, int64_t>>* collection_values);
+    std::vector<StringValue*>* string_values);
 template void RawValue::WritePrimitiveCollectVarlen<false>(const void* value,
     Tuple* tuple,
     const SlotDescriptor* slot_desc, MemPool* pool,
-    std::vector<StringValue*>* string_values,
-    std::vector<std::pair<CollectionValue*, int64_t>>* collection_values);
+    std::vector<StringValue*>* string_values);
 }
diff --git a/be/src/runtime/raw-value.h b/be/src/runtime/raw-value.h
index 7240b9fc5..0f66d0498 100644
--- a/be/src/runtime/raw-value.h
+++ b/be/src/runtime/raw-value.h
@@ -119,12 +119,16 @@ class RawValue {
 
   /// Writes the bytes of a given value into the slot of a tuple. Supports primitive and
   /// complex types. 'value' is allowed to be NULL. For string and collection values, the
-  /// data is copied into memory allocated from 'pool' if pool is non-NULL, otherwise the
-  /// data is not copied.
-  /// If COLLECT_VAR_LEN_VALS is true, gathers the string slots of the slot tree into
-  /// 'string_values' and the collection slots along with their byte sizes into
-  /// 'collection_values'. In this case, 'string_values' and 'collection_values' must be
-  /// non-NULL.
+  /// data is deep-copied into memory allocated from 'pool' if pool is non-NULL, otherwise
+  /// the data is not copied.
+  ///
+  /// If COLLECT_VAR_LEN_VALS is true, gathers the non-NULL non-smallified string slots of
+  /// the slot tree into 'string_values' and the non-NULL collection slots along with
+  /// their byte sizes into 'collection_values' recursively. Smallified strings (see Small
+  /// String Optimization, IMPALA-12373) are not collected. Children are placed before
+  /// their parents in the vectors (post-order traversal) - see Tuple::MaterializeExprs()
+  /// and Sorter::Run::CollectNonNullVarSlots() for the reason. If COLLECT_VAR_LEN_VALS is
+  /// true, 'string_values' and 'collection_values' must be non-NULL.
   template <bool COLLECT_VAR_LEN_VALS>
   static void Write(const void* value, Tuple* tuple, const SlotDescriptor* slot_desc,
       MemPool* pool, std::vector<StringValue*>* string_values,
@@ -189,15 +193,23 @@ private:
       const SlotDescriptor* slot_desc, MemPool* pool, vector<StringValue*>* string_values,
       vector<pair<CollectionValue*, int64_t>>* collection_values);
 
+  template <bool COLLECT_VAR_LEN_VALS>
+  static void WriteCollectionChildren(const CollectionValue& dest,
+      const CollectionValue& src, const SlotDescriptor& collection_slot_desc,
+      MemPool* pool, vector<StringValue*>* string_values,
+      vector<pair<CollectionValue*, int64_t>>* collection_values);
+
+  template <bool COLLECT_VAR_LEN_VALS>
+  static void WriteCollectionVarlenChild(Tuple* child_dest_tuple, Tuple* child_src_tuple,
+      const SlotDescriptor* slot_desc, MemPool* pool, vector<StringValue*>* string_values,
+      vector<pair<CollectionValue*, int64_t>>* collection_values );
+
   /// Gets the destination slot from 'tuple' and 'slot_desc' and writes 'value' to this
-  /// slot. 'value' must be non-NULL. If COLLECT_VAR_LEN_VALS is true, collects the
-  /// pointers of the string slots to 'string_values' and the pointers of the collection
-  /// slots along with their byte sizes to 'collection_values'. 'slot_desc' has to be a
-  /// primitive type.
+  /// slot. 'value' must be primitive and non-NULL. If COLLECT_VAR_LEN_VALS is true,
+  /// collects the pointers of string slots to 'string_values'.
   template <bool COLLECT_VAR_LEN_VALS>
   static void WritePrimitiveCollectVarlen(const void* value, Tuple* tuple,
       const SlotDescriptor* slot_desc, MemPool* pool,
-      std::vector<StringValue*>* string_values,
-      std::vector<std::pair<CollectionValue*, int64_t>>* collection_values);
+      std::vector<StringValue*>* string_values);
 };
 }
diff --git a/be/src/runtime/sorter-internal.h b/be/src/runtime/sorter-internal.h
index 892f4404d..dba50f109 100644
--- a/be/src/runtime/sorter-internal.h
+++ b/be/src/runtime/sorter-internal.h
@@ -244,15 +244,36 @@ class Sorter::Run {
   /// if the run is unpinned.
   Status FinalizePages(vector<Page>* pages);
 
-  void CheckTypeForVarLenCollectionSorting();
-
-  /// Collects the non-null var-len slots (strings and collections) from 'src'. Strings
-  /// are returned in 'string_values' and collections are returned, along with their byte
-  /// size, in 'collection_values'. The total length of all var-len values is returned in
-  /// 'total_var_len'.
-  void CollectNonNullNonSmallVarSlots(
-      Tuple* src, vector<StringValue*>* string_values,
-      std::vector<CollValueAndSize>* collection_values,
+  void CheckTypesAreValidInSortingTuple();
+
+  /// Collects the non-null var-len slots (non-smallified strings and collections) from
+  /// 'src'. Smallified strings (see Small String Optimization, IMPALA-12373) are not
+  /// collected. Strings are returned in 'string_values' and collections are returned,
+  /// along with their byte size, in 'collection_values' (any existing elements of
+  /// 'string_values' and 'collection_values' are cleared). The total length of all
+  /// var-len values is returned in 'total_var_len'. Nested (non-top-level) var-len values
+  /// are collected recursively.
+  ///
+  /// Children are placed before their parents in the vectors (post-order traversal).
+  /// This order is chosen because of the way we serialise the values in CopyVarLenData()
+  /// and CopyVarLenDataConvertOffset(): we write the var-len part of 'StringValue's and
+  /// 'CollectionValue's to the buffer and update the pointers in-place. The order becomes
+  /// important if these '(String|Collection)Value's are themselves (var-len) children of
+  /// other 'CollectionValue's. If children are written before their parents, then when
+  /// the parents are written their pointers have already been updated so they can be
+  /// written as-is. If parents were written before their children, updating the pointers
+  /// in-place would not be enough, the already serialised pointers would have to be
+  /// updated in the byte buffer. Note that to ensure that this order is kept, the
+  /// 'StringValue's must be serialised before the 'CollectionValue's - strings can be
+  /// children of collections but not the other way around.
+  void CollectNonNullNonSmallVarSlots(Tuple* src, const TupleDescriptor& tuple_desc,
+      vector<StringValue*>* string_values,
+      std::vector<std::pair<CollectionValue*, int64_t>>* collection_values,
+      int* total_var_len);
+
+  void CollectNonNullNonSmallVarSlotsHelper(Tuple* src, const TupleDescriptor& tuple_desc,
+      vector<StringValue*>* string_values,
+      std::vector<std::pair<CollectionValue*, int64_t>>* collection_values,
       int* total_var_len);
 
   enum AddPageMode { KEEP_PREV_PINNED, UNPIN_PREV };
@@ -282,28 +303,37 @@ class Sorter::Run {
   /// Copy the var len data in 'string_values' and 'collection_values_and_sizes' to 'dest'
   /// in order and update the pointers to point to the copied data.
   void CopyVarLenData(const vector<StringValue*>& string_values,
-      const vector<CollValueAndSize>& collection_values_and_sizes, uint8_t* dest);
+      const vector<std::pair<CollectionValue*, int64_t>>& collection_values_and_sizes,
+      uint8_t* dest);
 
   /// Copy the StringValues in 'var_values' and the CollectionValues referenced in
   /// 'collection_values_and_sizes' to 'dest' in order. Update the StringValue ptrs in
   /// 'dest' to contain a packed offset for the copied data comprising page_index and the
   /// offset relative to page_start.
   void CopyVarLenDataConvertOffset(const vector<StringValue*>& var_values,
-      const std::vector<CollValueAndSize>& collection_values_and_sizes,
+      const std::vector<std::pair<CollectionValue*, int64_t>>&
+          collection_values_and_sizes,
       int page_index, const uint8_t* page_start, uint8_t* dest);
 
-  /// Convert encoded offsets to valid pointers in tuple with layout 'sort_tuple_desc_'.
+  /// Convert encoded offsets to valid pointers in 'tuple' with layout 'tuple_desc'.
   /// 'tuple' is modified in-place. Returns true if the pointers refer to the page at
-  /// 'var_len_pages_index_' and were successfully converted or false if the var len
-  /// data is in the next page, in which case 'tuple' is unmodified.
-  bool ConvertOffsetsToPtrs(Tuple* tuple);
+  /// 'var_len_pages_index_' and were successfully converted or false if the var len data
+  /// is in the next page, in which case 'tuple' is unmodified.
+  bool ConvertOffsetsToPtrs(Tuple* tuple, const TupleDescriptor& tuple_desc)
+      WARN_UNUSED_RESULT;
 
   template <class ValueType>
+  WARN_UNUSED_RESULT
   bool ConvertValueOffsetsToPtrs(Tuple* tuple, uint8_t* page_start,
       const vector<SlotDescriptor*>& slots);
 
-  bool ConvertStringValueOffsetsToPtrs(Tuple* tuple, uint8_t* page_start);
-  bool ConvertCollectionValueOffsetsToPtrs(Tuple* tuple, uint8_t* page_start);
+  bool ConvertStringValueOffsetsToPtrs(Tuple* tuple, const TupleDescriptor& tuple_desc,
+      uint8_t* page_start) WARN_UNUSED_RESULT;
+  bool ConvertCollectionValueOffsetsToPtrs(Tuple* tuple,
+      const TupleDescriptor& tuple_desc, uint8_t* page_start) WARN_UNUSED_RESULT;
+
+  bool ConvertOffsetsForCollectionChildren(const CollectionValue& cv,
+      const SlotDescriptor& slot_desc) WARN_UNUSED_RESULT;
 
   int NumOpenPages(const vector<Page>& pages);
 
diff --git a/be/src/runtime/sorter.cc b/be/src/runtime/sorter.cc
index acd9be621..07aecc66a 100644
--- a/be/src/runtime/sorter.cc
+++ b/be/src/runtime/sorter.cc
@@ -148,7 +148,7 @@ Status Sorter::Run::Init() {
     sorter_->spilled_runs_counter_->Add(1);
   }
 
-  CheckTypeForVarLenCollectionSorting();
+  CheckTypesAreValidInSortingTuple();
 
   return Status::OK();
 }
@@ -207,8 +207,9 @@ Status Sorter::Run::AddBatchInternal(
   // processed.
   int cur_input_index = start_index;
   vector<StringValue*> string_values;
-  vector<CollValueAndSize> coll_values_and_sizes;
+  vector<pair<CollectionValue*, int64_t>> coll_values_and_sizes;
   string_values.reserve(sort_tuple_desc_->string_slots().size());
+  coll_values_and_sizes.reserve(sort_tuple_desc_->collection_slots().size());
   while (cur_input_index < batch->num_rows()) {
     // tuples_remaining is the number of tuples to copy/materialize into
     // cur_fixed_len_page.
@@ -233,7 +234,7 @@ Status Sorter::Run::AddBatchInternal(
       } else {
         memcpy(new_tuple, input_row->GetTuple(0), sort_tuple_size_);
         if (HAS_VAR_LEN_SLOTS) {
-          CollectNonNullNonSmallVarSlots(new_tuple, &string_values,
+          CollectNonNullNonSmallVarSlots(new_tuple, *sort_tuple_desc_, &string_values,
               &coll_values_and_sizes, &total_var_len);
         }
       }
@@ -289,28 +290,36 @@ Status Sorter::Run::AddBatchInternal(
   return Status::OK();
 }
 
-bool is_allowed_collection_item_type(const ColumnType& type) {
-  if (type.IsStructType()) {
+bool IsValidStructInSortingTuple(const ColumnType& struct_type) {
+  DCHECK(struct_type.IsStructType());
+  for (const ColumnType& child_type : struct_type.children) {
+    if (child_type.IsCollectionType()) return false;
+    if (child_type.IsStructType() && !IsValidStructInSortingTuple(child_type)) {
+      return false;
+    }
+  }
+  return true;
+}
+
+bool IsValidInSortingTuple(const ColumnType& type) {
+  if (type.IsCollectionType()) {
+    // Check children - one 'item' for arrays and a 'key' and a 'value' for maps.
     for (const ColumnType& child_type : type.children) {
-      if (!is_allowed_collection_item_type(child_type)) return false;
+      if (!IsValidInSortingTuple(child_type)) return false;
     }
     return true;
+  } else if (type.IsStructType()) {
+    return IsValidStructInSortingTuple(type);
   }
-
-  return !type.IsComplexType() && !type.IsVarLenStringType();
+  return true;
 }
 
-void Sorter::Run::CheckTypeForVarLenCollectionSorting() {
-  // Sorting of tuples containing collection values is only implemented if the items are
-  // fixed length types. The planner combined with projection should guarantee that only
-  // such values are in each tuple.
-  for (const SlotDescriptor* coll_slot: sort_tuple_desc_->collection_slots()) {
-    for (SlotDescriptor* child_slot :
-        coll_slot->children_tuple_descriptor()->slots()) {
-      DCHECK(is_allowed_collection_item_type(child_slot->type()))
-          << "Type not allowed in collection in sorting tuple: "
-          << child_slot->type() << ".";
-    }
+void Sorter::Run::CheckTypesAreValidInSortingTuple() {
+  // Collections within structs (also if they are nested in another struct or collection)
+  // are currently not allowed in the sorting tuple (see IMPALA-12160). See
+  // SortInfo.isValidInSortingTuple().
+  for (const SlotDescriptor* slot: sort_tuple_desc_->slots()) {
+    DCHECK(IsValidInSortingTuple(slot->type()));
   }
 }
 
@@ -363,9 +372,10 @@ Status Sorter::Run::UnpinAllPages() {
   sorted_var_len_pages.reserve(var_len_pages_.size());
 
   vector<StringValue*> string_values;
-  vector<CollValueAndSize> collection_values_and_sizes;
+  vector<pair<CollectionValue*, int64_t>> collection_values_and_sizes;
   int total_var_len;
   string_values.reserve(sort_tuple_desc_->string_slots().size());
+  collection_values_and_sizes.reserve(sort_tuple_desc_->collection_slots().size());
   Page* cur_sorted_var_len_page = nullptr;
   if (HasVarLenPages()) {
     DCHECK(var_len_copy_page_.is_open());
@@ -389,7 +399,7 @@ Status Sorter::Run::UnpinAllPages() {
       for (int page_offset = 0; page_offset < cur_fixed_page->valid_data_len();
            page_offset += sort_tuple_size_) {
         Tuple* cur_tuple = reinterpret_cast<Tuple*>(cur_fixed_page->data() + page_offset);
-        CollectNonNullNonSmallVarSlots(cur_tuple, &string_values,
+        CollectNonNullNonSmallVarSlots(cur_tuple, *sort_tuple_desc_, &string_values,
             &collection_values_and_sizes, &total_var_len);
         DCHECK(cur_sorted_var_len_page->is_open());
         if (cur_sorted_var_len_page->BytesRemaining() < total_var_len) {
@@ -531,7 +541,7 @@ Status Sorter::Run::GetNext(RowBatch* output_batch, bool* eos) {
     Tuple* input_tuple =
         reinterpret_cast<Tuple*>(fixed_len_page->data() + fixed_len_page_offset_);
 
-    if (CONVERT_OFFSET_TO_PTR && !ConvertOffsetsToPtrs(input_tuple)) {
+    if (CONVERT_OFFSET_TO_PTR && !ConvertOffsetsToPtrs(input_tuple, *sort_tuple_desc_)) {
       DCHECK(!is_pinned_);
       // The var-len data is in the next page. We are done with the current page, so
       // return rows we've accumulated so far along with the page's buffer and advance
@@ -593,14 +603,22 @@ Status Sorter::Run::PinNextReadPage(vector<Page>* pages, int page_index) {
   return Status::OK();
 }
 
-void Sorter::Run::CollectNonNullNonSmallVarSlots(Tuple* src,
+void Sorter::Run::CollectNonNullNonSmallVarSlots(Tuple* src, const TupleDescriptor& tuple_desc,
     vector<StringValue*>* string_values,
-    vector<CollValueAndSize>* collection_values, int* total_var_len) {
+    vector<pair<CollectionValue*, int64_t>>* collection_values, int* total_var_len) {
   string_values->clear();
   collection_values->clear();
   *total_var_len = 0;
 
-  for (const SlotDescriptor* string_slot: sort_tuple_desc_->string_slots()) {
+  CollectNonNullNonSmallVarSlotsHelper(src, tuple_desc, string_values, collection_values,
+      total_var_len);
+}
+
+void Sorter::Run::CollectNonNullNonSmallVarSlotsHelper(Tuple* src,
+    const TupleDescriptor& tuple_desc, vector<StringValue*>* string_values,
+    std::vector<std::pair<CollectionValue*, int64_t>>* collection_values,
+    int* total_var_len) {
+  for (const SlotDescriptor* string_slot: tuple_desc.string_slots()) {
     if (!src->IsNull(string_slot->null_indicator_offset())) {
       StringValue* string_val =
           reinterpret_cast<StringValue*>(src->GetSlot(string_slot->tuple_offset()));
@@ -610,14 +628,26 @@ void Sorter::Run::CollectNonNullNonSmallVarSlots(Tuple* src,
     }
   }
 
-  for (const SlotDescriptor* collection_slot: sort_tuple_desc_->collection_slots()) {
+  for (const SlotDescriptor* collection_slot: tuple_desc.collection_slots()) {
     if (!src->IsNull(collection_slot->null_indicator_offset())) {
       CollectionValue* collection_value = reinterpret_cast<CollectionValue*>(
           src->GetSlot(collection_slot->tuple_offset()));
-      int64_t byte_size = collection_value->ByteSize(
-          *collection_slot->children_tuple_descriptor());
-      collection_values->push_back(CollValueAndSize(collection_value, byte_size));
+      const TupleDescriptor& child_tuple_desc =
+          *collection_slot->children_tuple_descriptor();
+
+      int64_t byte_size = collection_value->ByteSize(child_tuple_desc);
       *total_var_len += byte_size;
+
+      // Recurse first so that children come before parents in the list. See the function
+      // comment in the header for the reason.
+      for (int i = 0; i < collection_value->num_tuples; i++) {
+        Tuple* child_tuple = reinterpret_cast<Tuple*>(
+            collection_value->ptr + i * child_tuple_desc.byte_size());
+        CollectNonNullNonSmallVarSlotsHelper(child_tuple, child_tuple_desc, string_values,
+            collection_values, total_var_len);
+      }
+
+      collection_values->push_back(make_pair(collection_value, byte_size));
     }
   }
 }
@@ -649,17 +679,19 @@ Status Sorter::Run::AddPage(vector<Page>* page_sequence) {
 }
 
 void Sorter::Run::CopyVarLenData(const vector<StringValue*>& string_values,
-    const vector<CollValueAndSize>& collection_values_and_sizes, uint8_t* dest) {
+    const vector<pair<CollectionValue*, int64_t>>& collection_values_and_sizes,
+    uint8_t* dest) {
   for (StringValue* string_val: string_values) {
+    DCHECK(!string_val->IsSmall());
     Ubsan::MemCpy(dest, string_val->Ptr(), string_val->Len());
     string_val->SetPtr(reinterpret_cast<char*>(dest));
     dest += string_val->Len();
   }
 
-  // TODO IMPALA-10939: Check embedded varlen types recursively.
-  for (const CollValueAndSize& coll_val_and_size : collection_values_and_sizes) {
-    CollectionValue* coll_val = coll_val_and_size.coll_value;
-    int64_t byte_size = coll_val_and_size.byte_size;
+  for (const pair<CollectionValue*, int64_t>& coll_val_and_size
+      : collection_values_and_sizes) {
+    CollectionValue* coll_val = coll_val_and_size.first;
+    int64_t byte_size = coll_val_and_size.second;
     Ubsan::MemCpy(dest, coll_val->ptr, byte_size);
     coll_val->ptr = dest;
     dest += byte_size;
@@ -676,12 +708,13 @@ void UnpackOffset(uint64_t packed_offset, uint32_t* page_index, uint32_t* page_o
 }
 
 void Sorter::Run::CopyVarLenDataConvertOffset(const vector<StringValue*>& string_values,
-    const vector<CollValueAndSize>& collection_values_and_sizes,
+    const vector<pair<CollectionValue*, int64_t>>& collection_values_and_sizes,
     int page_index, const uint8_t* page_start, uint8_t* dest) {
   DCHECK_GE(page_index, 0);
   DCHECK_GE(dest - page_start, 0);
 
   for (StringValue* string_val : string_values) {
+    DCHECK(!string_val->IsSmall());
     memcpy(dest, string_val->Ptr(), string_val->Len());
     DCHECK_LE(dest - page_start, sorter_->page_len_);
     DCHECK_LE(dest - page_start, numeric_limits<uint32_t>::max());
@@ -691,10 +724,10 @@ void Sorter::Run::CopyVarLenDataConvertOffset(const vector<StringValue*>& string
     dest += string_val->Len();
   }
 
-  // TODO IMPALA-10939: Check embedded varlen types recursively.
-  for (const CollValueAndSize&  coll_val_and_size : collection_values_and_sizes) {
-    CollectionValue* coll_value = coll_val_and_size.coll_value;
-    int64_t byte_size = coll_val_and_size.byte_size;
+  for (const pair<CollectionValue*, int64_t>&  coll_val_and_size
+      : collection_values_and_sizes) {
+    CollectionValue* coll_value = coll_val_and_size.first;
+    int64_t byte_size = coll_val_and_size.second;
     memcpy(dest, coll_value->ptr, byte_size);
     DCHECK_LE(dest - page_start, sorter_->page_len_);
     DCHECK_LE(dest - page_start, numeric_limits<uint32_t>::max());
@@ -705,24 +738,24 @@ void Sorter::Run::CopyVarLenDataConvertOffset(const vector<StringValue*>& string
   }
 }
 
-bool Sorter::Run::ConvertOffsetsToPtrs(Tuple* tuple) {
+bool Sorter::Run::ConvertOffsetsToPtrs(Tuple* tuple, const TupleDescriptor& tuple_desc) {
   // We need to be careful to handle the case where var_len_pages_ is empty,
   // e.g. if all strings are nullptr.
   uint8_t* page_start =
       var_len_pages_.empty() ? nullptr : var_len_pages_[var_len_pages_index_].data();
 
-  bool strings_converted = ConvertStringValueOffsetsToPtrs(tuple, page_start);
+  bool strings_converted = ConvertStringValueOffsetsToPtrs(tuple, tuple_desc, page_start);
   if (!strings_converted) return false;
-  return ConvertCollectionValueOffsetsToPtrs(tuple, page_start);
+  return ConvertCollectionValueOffsetsToPtrs(tuple, tuple_desc, page_start);
 }
 
-// Helpers for Sorter::Run::ConvertValueOffsetsToPtr() to get the byte size based on the
+// Helpers for Sorter::Run::ConvertValueOffsetsToPtr() to get the data size based on the
 // type.
-int64_t GetByteSize(const StringValue& string_value, const SlotDescriptor& slot_desc) {
+int64_t GetDataSize(const StringValue& string_value, const SlotDescriptor& slot_desc) {
   return string_value.IsSmall() ? 0 : string_value.Len();
 }
 
-int64_t GetByteSize(const CollectionValue& collection_value,
+int64_t GetDataSize(const CollectionValue& collection_value,
     const SlotDescriptor& slot_desc) {
  return collection_value.ByteSize(*slot_desc.children_tuple_descriptor());
 }
@@ -750,6 +783,9 @@ bool Sorter::Run::ConvertValueOffsetsToPtrs(Tuple* tuple, uint8_t* page_start,
     const vector<SlotDescriptor*>& slots) {
   static_assert(std::is_same_v<ValueType, StringValue>
       || std::is_same_v<ValueType, CollectionValue>);
+
+  constexpr bool IS_COLLECTION = std::is_same_v<ValueType, CollectionValue>;
+
   for (int idx = 0; idx < slots.size(); ++idx) {
     SlotDescriptor* slot_desc = slots[idx];
     if (tuple->IsNull(slot_desc->null_indicator_offset())) continue;
@@ -757,11 +793,11 @@ bool Sorter::Run::ConvertValueOffsetsToPtrs(Tuple* tuple, uint8_t* page_start,
     ValueType* value = reinterpret_cast<ValueType*>(
         tuple->GetSlot(slot_desc->tuple_offset()));
 
-    if constexpr (std::is_same_v<ValueType, StringValue>) {
+    if constexpr (IS_COLLECTION) {
+      DCHECK(slot_desc->type().IsCollectionType());
+    } else {
       DCHECK(slot_desc->type().IsVarLenStringType());
       if (value->IsSmall()) continue;
-    } else {
-      DCHECK(slot_desc->type().IsCollectionType());
     }
 
     // packed_offset includes the page index in the upper 32 bits and the page
@@ -784,34 +820,44 @@ bool Sorter::Run::ConvertValueOffsetsToPtrs(Tuple* tuple, uint8_t* page_start,
       DCHECK(AllPrevSlotsAreNullsOrSmall<ValueType>(tuple, slots, idx));
       return false;
     }
-
     DCHECK_EQ(page_index, var_len_pages_index_);
 
-    const int64_t byte_size = GetByteSize(*value, *slot_desc);
-
+    const int64_t data_size = GetDataSize(*value, *slot_desc);
     if (var_len_pages_.empty()) {
-      DCHECK_EQ(byte_size, 0);
+      DCHECK_EQ(data_size, 0);
     } else {
-      DCHECK_LE(page_offset + byte_size, var_len_pages_[page_index].valid_data_len());
+      DCHECK_LE(page_offset + data_size, var_len_pages_[page_index].valid_data_len());
     }
     // Calculate the address implied by the offset and assign it. May be nullptr for
     // zero-length strings if there are no pages in the run since page_start is nullptr.
     DCHECK(page_start != nullptr || page_offset == 0);
     using ptr_type = decltype(value->Ptr());
     value->SetPtr(reinterpret_cast<ptr_type>(page_start + page_offset));
+
+    if constexpr (IS_COLLECTION) {
+      const TupleDescriptor* children_tuple_desc = slot_desc->children_tuple_descriptor();
+      DCHECK(children_tuple_desc != nullptr);
+      for (int i = 0; i < value->num_tuples; i++) {
+        Tuple* child_tuple = reinterpret_cast<Tuple*>(
+            value->ptr + i * children_tuple_desc->byte_size());
+
+        if (!ConvertOffsetsToPtrs(child_tuple, *children_tuple_desc)) return false;
+      }
+    }
   }
   return true;
 }
 
-bool Sorter::Run::ConvertStringValueOffsetsToPtrs(Tuple* tuple, uint8_t* page_start) {
-  const vector<SlotDescriptor*>& string_slots = sort_tuple_desc_->string_slots();
+bool Sorter::Run::ConvertStringValueOffsetsToPtrs(Tuple* tuple,
+    const TupleDescriptor& tuple_desc, uint8_t* page_start) {
+  const vector<SlotDescriptor*>& string_slots = tuple_desc.string_slots();
   return ConvertValueOffsetsToPtrs<StringValue>(tuple, page_start, string_slots);
 }
 
-bool Sorter::Run::ConvertCollectionValueOffsetsToPtrs(Tuple* tuple, uint8_t* page_start) {
-  const vector<SlotDescriptor*>& collection_slots = sort_tuple_desc_->collection_slots();
+bool Sorter::Run::ConvertCollectionValueOffsetsToPtrs(Tuple* tuple,
+    const TupleDescriptor& tuple_desc, uint8_t* page_start) {
+  const vector<SlotDescriptor*>& collection_slots = tuple_desc.collection_slots();
   return ConvertValueOffsetsToPtrs<CollectionValue>(tuple, page_start, collection_slots);
-
 }
 
 int64_t Sorter::Run::TotalBytes() const {
diff --git a/be/src/runtime/tuple-ir.cc b/be/src/runtime/tuple-ir.cc
index d87190d65..fca74031d 100644
--- a/be/src/runtime/tuple-ir.cc
+++ b/be/src/runtime/tuple-ir.cc
@@ -21,6 +21,9 @@
 
 namespace impala {
 
+// Used to force the compilation of the CodegenTypes struct.
+void Tuple::dummy(Tuple::CodegenTypes*) {}
+
 bool Tuple::CopyStrings(const char* err_ctx, RuntimeState* state,
     const SlotOffsets* string_slot_offsets, int num_string_slots, MemPool* pool,
     Status* status) noexcept {
diff --git a/be/src/runtime/tuple.cc b/be/src/runtime/tuple.cc
index ff79c852d..a17f1c1e9 100644
--- a/be/src/runtime/tuple.cc
+++ b/be/src/runtime/tuple.cc
@@ -42,6 +42,7 @@
 namespace impala {
 
 const char* Tuple::LLVM_CLASS_NAME = "class.impala::Tuple";
+const char* Tuple::CodegenTypes::LLVM_CLASS_NAME = "struct.impala::Tuple::CodegenTypes";
 const char* SlotOffsets::LLVM_CLASS_NAME = "struct.impala::SlotOffsets";
 
 const char* Tuple::MATERIALIZE_EXPRS_SYMBOL = "MaterializeExprsILb0ELb0";
@@ -241,13 +242,41 @@ void Tuple::SetNullIndicators(NullIndicatorOffset offset, int64_t num_tuples,
   }
 }
 
+// Note: 'noexcept' is added because otherwise in LLVM code this function is called with
+// the 'invoke' instruction, not the 'call' instruction. This is bad because we can't
+// replace functions called with 'invoke' with a codegen'd function.
+//
+// The 'invoke' instruction is for functions that can throw exceptions. It would be chosen
+// here - if we didn't add 'noexcept' - because std::vector<...> (the actual type of
+// 'non_null_string_values' and 'non_null_collection_values') can throw if it fails to
+// allocate memory when it tries to grow. If a noexcept function throws, std::terminate()
+// is called (so no Undefined Behaviour).
+//
+// Currently the hand-crafted function we replace this function with doesn't use the
+// vectors (COLLECT_VAR_LEN_VALS is always false), so no exception will be thrown in
+// codegen code.
+//
+// In interpreted mode, if we crash because of std::terminate(), it is the same as what we
+// did before changing the type of these parameters to std::vector<...>: vector resizing
+// was done in the caller, but there was no catch block for it. This will also be the case
+// if the hand-crafted function in codegen code starts using the vectors.
 template <bool COLLECT_VAR_LEN_VALS, bool NO_POOL>
 void Tuple::MaterializeExprs(TupleRow* row, const TupleDescriptor& desc,
     ScalarExprEvaluator* const* evals, MemPool* pool,
-    StringValue** non_null_string_values, CollValueAndSize* non_null_collection_values,
+    CodegenTypes::StringValuePtrVecType* non_null_string_values,
+    CodegenTypes::CollValuePtrAndSizeVecType* non_null_collection_values,
     int* total_varlen_lengths,
-    int* num_non_null_string_values, int* num_non_null_collection_values) {
+    int* num_non_null_string_values, int* num_non_null_collection_values) noexcept {
+  if constexpr (COLLECT_VAR_LEN_VALS) {
+    DCHECK(non_null_string_values != nullptr);
+    DCHECK(non_null_collection_values != nullptr);
+    DCHECK(total_varlen_lengths != nullptr);
+    DCHECK(num_non_null_string_values != nullptr);
+    DCHECK(num_non_null_collection_values != nullptr);
+  }
+
   ClearNullBits(desc);
+
   // Evaluate the materialize_expr_evals and place the results in the tuple.
   for (int i = 0; i < desc.slots().size(); ++i) {
     SlotDescriptor* slot_desc = desc.slots()[i];
@@ -258,27 +287,30 @@ void Tuple::MaterializeExprs(TupleRow* row, const TupleDescriptor& desc,
         slot_desc->type() == evals[i]->root().type());
     void* src = evals[i]->GetValue(row);
 
-    vector<StringValue*> string_values;
-    vector<pair<CollectionValue*, int64_t>> collection_values;
+    const size_t old_num_string_values = COLLECT_VAR_LEN_VALS ?
+        non_null_string_values->size() : 0;
+    const size_t old_num_collection_values = COLLECT_VAR_LEN_VALS ?
+        non_null_collection_values->size() : 0;
     RawValue::Write<COLLECT_VAR_LEN_VALS>(src, this, slot_desc, pool,
-        &string_values, &collection_values);
-    if (string_values.size() > 0) {
-      for (StringValue* string_val : string_values) {
-        *(non_null_string_values++) = string_val;
-        if (string_val->IsSmall()) continue;
+        non_null_string_values, non_null_collection_values);
+
+    if constexpr (COLLECT_VAR_LEN_VALS) {
+      const size_t new_num_string_values = non_null_string_values->size();
+      for (size_t i = old_num_string_values; i < new_num_string_values; i++) {
+        const StringValue* string_val = (*non_null_string_values)[i];
+        DCHECK(!string_val->IsSmall());
         *total_varlen_lengths += string_val->Len();
       }
-      (*num_non_null_string_values) += string_values.size();
-    }
+      (*num_non_null_string_values) += new_num_string_values - old_num_string_values;
 
-    if (collection_values.size() > 0) {
-      for (const pair<CollectionValue*, int64_t>& collection_val_pair
-          : collection_values) {
-        CollValueAndSize cvs(collection_val_pair.first, collection_val_pair.second);
-        *(non_null_collection_values++) = cvs;
+      const size_t new_num_collection_values = non_null_collection_values->size();
+      for (size_t i = old_num_collection_values; i < new_num_collection_values; i++) {
+        const pair<CollectionValue*, int64_t>& collection_val_pair =
+            (*non_null_collection_values)[i];
         *total_varlen_lengths += collection_val_pair.second;
       }
-      (*num_non_null_collection_values) += collection_values.size();
+      (*num_non_null_collection_values) +=
+          new_num_collection_values - old_num_collection_values;
     }
   }
 }
@@ -315,14 +347,14 @@ char* Tuple::AllocateStrings(const char* err_ctx, RuntimeState* state,
 //   order by l_comment
 //   limit 10;
 //
+//
 // define void @MaterializeExprs(%"class.impala::Tuple"* %opaque_tuple,
 //     %"class.impala::TupleRow"* %row, %"class.impala::TupleDescriptor"* %desc,
 //     %"class.impala::ScalarExprEvaluator"** %slot_materialize_expr_evals,
 //     %"class.impala::MemPool"* %pool,
-//     %"struct.impala::StringValue"** %non_null_string_values,
-//     %"struct.impala::CollValueAndSize"* %non_null_collection_values,
-//     i32* %total_varlen_lengths, i32* %num_non_null_string_values,
-//     i32* %num_non_null_collection_values) #50 {
+//     %"class.std::vector.1154"* %non_null_string_values,
+//     %"class.std::vector.1159"* %non_null_collection_values, i32* %total_varlen_lengths,
+//     i32* %num_non_null_string_values, i32* %num_non_null_collection_values) #50 {
 // entry:
 //   %tuple = bitcast %"class.impala::Tuple"* %opaque_tuple
 //       to <{ %"struct.impala::StringValue", i8 }>*
@@ -336,7 +368,8 @@ char* Tuple::AllocateStrings(const char* err_ctx, RuntimeState* state,
 //   %src = call { i64, i8* } @GetSlotRef.4(
 //       %"class.impala::ScalarExprEvaluator"* %expr_eval,
 //       %"class.impala::TupleRow"* %row)
-//   ; -- generated by CodegenAnyVal::ToReadWriteInfo() and SlotDescriptor::WriteToSlot()
+//   ; -- generated by CodegenAnyVal::ToReadWriteInfo() and
+//   ; -- SlotDescriptor::CodegenWriteToSlot()
 //   br label %entry1
 //
 // entry1:                                           ; preds = %entry
@@ -353,11 +386,11 @@ char* Tuple::AllocateStrings(const char* err_ctx, RuntimeState* state,
 //       <{ %"struct.impala::StringValue", i8 }>* %tuple, i32 0, i32 0
 //   %5 = insertvalue %"struct.impala::StringValue" zeroinitializer, i32 %4, 1
 //   %6 = sext i32 %4 to i64
-//   %new_ptr = call i8* @_ZN6impala7MemPool8AllocateILb0EEEPhli(
+//   %new_coll_val_ptr = call i8* @_ZN6impala7MemPool8AllocateILb0EEEPhli(
 //       %"class.impala::MemPool"* %pool, i64 %6, i32 8)
-//   call void @llvm.memcpy.p0i8.p0i8.i32(i8* %new_ptr, i8* %src2, i32 %4, i32 0,
+//   call void @llvm.memcpy.p0i8.p0i8.i32(i8* %new_coll_val_ptr, i8* %src2, i32 %4, i32 0,
 //       i1 false)
-//   %7 = insertvalue %"struct.impala::StringValue" %5, i8* %new_ptr, 0
+//   %7 = insertvalue %"struct.impala::StringValue" %5, i8* %new_coll_val_ptr, 0
 //   store %"struct.impala::StringValue" %7, %"struct.impala::StringValue"* %slot
 //   br label %end_write
 //
@@ -370,7 +403,7 @@ char* Tuple::AllocateStrings(const char* err_ctx, RuntimeState* state,
 //   br label %end_write
 //
 // end_write:                                        ; preds = %null, %non_null
-//   ; -- end CodegenAnyVal::ToReadWriteInfo() and SlotDescriptor::WriteToSlot() --------
+//   ; -- end CodegenAnyVal::ToReadWriteInfo() and SlotDescriptor::CodegenWriteToSlot()
 //   ret void
 // }
 Status Tuple::CodegenMaterializeExprs(LlvmCodeGen* codegen, bool collect_varlen_vals,
@@ -398,8 +431,8 @@ Status Tuple::CodegenMaterializeExprs(LlvmCodeGen* codegen, bool collect_varlen_
   // used in xcompiled IR). With 'pool':
   // void MaterializeExprs(Tuple* opaque_tuple, TupleRow* row,
   //     const TupleDescriptor& desc, ScalarExprEvaluator* const* evals, MemPool* pool,
-  //     StringValue** non_null_string_values,
-  //     CollValueAndSize* non_null_collection_values,
+  //     CodegenTypes::StringValuePtrVecType* non_null_string_values,
+  //     CodegenTypes::CollValuePtrAndSizeVecType* non_null_collection_values,
   //     int* total_varlen_lengths, int* num_non_null_string_values,
   //     int* num_non_null_collection_values);
   llvm::PointerType* opaque_tuple_type = codegen->GetStructPtrType<Tuple>();
@@ -408,10 +441,12 @@ Status Tuple::CodegenMaterializeExprs(LlvmCodeGen* codegen, bool collect_varlen_
   llvm::PointerType* expr_evals_type =
       codegen->GetStructPtrPtrType<ScalarExprEvaluator>();
   llvm::PointerType* pool_type = codegen->GetStructPtrType<MemPool>();
-  llvm::PointerType* string_values_type = codegen->GetStructPtrPtrType<StringValue>();
-  llvm::PointerType* coll_values_and_sizes_type =
-      codegen->GetStructPtrType<CollValueAndSize>();
+  llvm::Type* string_values_type =
+      CodegenTypes::getStringValuePtrVecType(codegen)->getPointerTo();
+  llvm::Type* coll_values_and_sizes_type =
+      CodegenTypes::getCollValuePtrAndSizeVecType(codegen)->getPointerTo();
   llvm::PointerType* int_ptr_type = codegen->i32_ptr_type();
+
   LlvmCodeGen::FnPrototype prototype(codegen, "MaterializeExprs", codegen->void_type());
   prototype.AddArgument("opaque_tuple", opaque_tuple_type);
   prototype.AddArgument("row", row_type);
@@ -441,9 +476,8 @@ Status Tuple::CodegenMaterializeExprs(LlvmCodeGen* codegen, bool collect_varlen_
 
   // Cast the opaque Tuple* argument to the generated struct type
   llvm::Type* tuple_struct_type = desc.GetLlvmStruct(codegen);
-  if (tuple_struct_type == NULL) {
-    return Status("CodegenMaterializeExprs(): failed to generate tuple desc");
-  }
+  DCHECK(tuple_struct_type != nullptr);
+
   llvm::PointerType* tuple_type = codegen->GetPtrType(tuple_struct_type);
   llvm::Value* tuple = builder.CreateBitCast(opaque_tuple_arg, tuple_type, "tuple");
 
@@ -541,16 +575,26 @@ llvm::Constant* SlotOffsets::ToIR(LlvmCodeGen* codegen) const {
           codegen->GetI32Constant(tuple_offset)});
 }
 
+llvm::Type* Tuple::CodegenTypes::getStringValuePtrVecType(LlvmCodeGen* codegen) {
+  llvm::StructType* codegenTypes = codegen->GetStructType<Tuple::CodegenTypes>();
+  return codegenTypes->getElementType(0);
+}
+
+llvm::Type* Tuple::CodegenTypes::getCollValuePtrAndSizeVecType(LlvmCodeGen* codegen) {
+  llvm::StructType* codegenTypes = codegen->GetStructType<Tuple::CodegenTypes>();
+  return codegenTypes->getElementType(1);
+}
+
 template void Tuple::MaterializeExprs<false, false>(TupleRow*, const TupleDescriptor&,
-    ScalarExprEvaluator* const*, MemPool*, StringValue**, CollValueAndSize*,
-    int*, int*, int*);
+    ScalarExprEvaluator* const*, MemPool*, Tuple::CodegenTypes::StringValuePtrVecType*,
+    Tuple::CodegenTypes::CollValuePtrAndSizeVecType*, int*, int*, int*);
 template void Tuple::MaterializeExprs<false, true>(TupleRow*, const TupleDescriptor&,
-    ScalarExprEvaluator* const*, MemPool*, StringValue**, CollValueAndSize*,
-    int*, int*, int*);
+    ScalarExprEvaluator* const*, MemPool*, Tuple::CodegenTypes::StringValuePtrVecType*,
+    Tuple::CodegenTypes::CollValuePtrAndSizeVecType*, int*, int*, int*);
 template void Tuple::MaterializeExprs<true, false>(TupleRow*, const TupleDescriptor&,
-    ScalarExprEvaluator* const*, MemPool*, StringValue**, CollValueAndSize*,
-    int*, int*, int*);
+    ScalarExprEvaluator* const*, MemPool*, Tuple::CodegenTypes::StringValuePtrVecType*,
+    Tuple::CodegenTypes::CollValuePtrAndSizeVecType*, int*, int*, int*);
 template void Tuple::MaterializeExprs<true, true>(TupleRow*, const TupleDescriptor&,
-    ScalarExprEvaluator* const*, MemPool*, StringValue**, CollValueAndSize*,
-    int*, int*, int*);
+    ScalarExprEvaluator* const*, MemPool*, Tuple::CodegenTypes::StringValuePtrVecType*,
+    Tuple::CodegenTypes::CollValuePtrAndSizeVecType*, int*, int*, int*);
 }
diff --git a/be/src/runtime/tuple.h b/be/src/runtime/tuple.h
index d7c1c89f3..1682e5d9b 100644
--- a/be/src/runtime/tuple.h
+++ b/be/src/runtime/tuple.h
@@ -166,52 +166,55 @@ class Tuple {
   /// IR function for the case 'pool' is non-NULL and false for the NULL pool case.
   ///
   /// If 'COLLECT_VAR_LEN_VALS' is true
-  ///  - the materialized non-NULL string value slots and are returned in
-  ///    'non_null_string_values',
+  ///  - the materialized non-NULL non-smallified string value slots are returned in
+  ///    'non_null_string_values' - smallified strings (see Small
+  ///    String Optimization, IMPALA-12373) are not collected;
   ///  - the materialized non-NULL collection value slots, along with their byte sizes,
   ///    are returned in 'non_null_collection_values' and
   ///  - the total length of the string and collection slots is returned in
   ///    'total_varlen_lengths'.
   /// 'non_null_string_values', 'non_null_collection_values' and 'total_varlen_lengths'
-  /// must be non-NULL in this case. 'non_null_string_values' and
-  /// 'non_null_collection_values' do not need to be empty; their original contents will
-  /// be overwritten.
+  /// must be non-NULL in this case. Their contents will be overwritten. The var-len
+  /// values are collected recursively. This means that if there are no collections with
+  /// variable length types in the tuple, the length of 'non_null_string_values' will be
+  /// 'desc.string_slots().size()' and the length of 'non_null_collection_values' will be
+  /// 'desc.collection_slots().size()'; if there are collections with variable length
+  /// types, the lengths of these vectors will be more.
+  ///
+  /// Children are placed before their parents in the vectors (post-order traversal). The
+  /// order matters in case of serialisation. When a child is serialised, the pointer of
+  /// the parent needs to be updated. If the parent itself also needs to be serialised
+  /// (because it is also a var-len child of a 'CollectionValue'), it should be serialised
+  /// after its child so that its pointer is already updated at that time. For the same
+  /// reason, the 'StringValue's must be serialised before the 'CollectionValue's -
+  /// strings can be children of collections but not the other way around. For more see
+  /// Sorter::Run::CollectNonNullVarSlots().
   template <bool COLLECT_VAR_LEN_VALS, bool NULL_POOL>
   inline void IR_ALWAYS_INLINE MaterializeExprs(TupleRow* row,
       const TupleDescriptor& desc, const std::vector<ScalarExprEvaluator*>& evals,
       MemPool* pool, std::vector<StringValue*>* non_null_string_values = nullptr,
-      std::vector<CollValueAndSize>* non_null_collection_values = nullptr,
+      std::vector<std::pair<CollectionValue*, int64_t>>*
+         non_null_collection_values = nullptr,
       int* total_varlen_lengths = nullptr) {
     DCHECK_EQ(NULL_POOL, pool == nullptr);
     DCHECK_EQ(evals.size(), desc.slots().size());
 
-    StringValue** non_null_string_values_array = nullptr;
-    CollValueAndSize* non_null_coll_vals_and_sizes_array = nullptr;
     int num_non_null_string_values = 0;
     int num_non_null_collection_values = 0;
     if (COLLECT_VAR_LEN_VALS) {
       DCHECK(non_null_string_values != nullptr);
       DCHECK(non_null_collection_values != nullptr);
       DCHECK(total_varlen_lengths != nullptr);
-      // vector::resize() will zero-initialize any new values, so we resize to the largest
-      // possible size here, then truncate the vector below once we know the actual size
-      // (which preserves already-written values).
-      non_null_string_values->resize(desc.string_slots().size());
-      non_null_collection_values->resize(desc.collection_slots().size());
-
-      non_null_string_values_array = non_null_string_values->data();
-      non_null_coll_vals_and_sizes_array = non_null_collection_values->data();
 
+      non_null_string_values->clear();
+      non_null_collection_values->clear();
       *total_varlen_lengths = 0;
     }
+
     MaterializeExprs<COLLECT_VAR_LEN_VALS, NULL_POOL>(row, desc,
-        evals.data(), pool, non_null_string_values_array,
-        non_null_coll_vals_and_sizes_array, total_varlen_lengths,
-        &num_non_null_string_values, &num_non_null_collection_values);
-    if (COLLECT_VAR_LEN_VALS) {
-      non_null_string_values->resize(num_non_null_string_values);
-      non_null_collection_values->resize(num_non_null_collection_values);
-    }
+        evals.data(), pool, non_null_string_values, non_null_collection_values,
+        total_varlen_lengths, &num_non_null_string_values,
+        &num_non_null_collection_values);
   }
 
   /// Copy the var-len string data in this tuple into the provided memory pool and update
@@ -343,15 +346,37 @@ class Tuple {
   void DeepCopyVarlenData(const TupleDescriptor& desc, char** data, int* offset,
       bool convert_ptrs);
 
+  /// During the construction of hand-crafted codegen'd functions, types cannot generally
+  /// be looked up by name. In our own types we use the static 'LLVM_CLASS_NAME' member to
+  /// facilitate this, but it cannot be used with other types, for example standard
+  /// containers. This struct contains members with types that we'd like to use - struct
+  /// members can be retrieved by their index in LLVM.
+  struct CodegenTypes {
+    // Use type aliases to ensure that we use the same types in codegen and the
+    // corresponding normal code.
+    using StringValuePtrVecType = std::vector<StringValue*>;
+    using CollValuePtrAndSizeVecType = std::vector<std::pair<CollectionValue*, int64_t>>;
+
+    StringValuePtrVecType string_value_vec;
+    CollValuePtrAndSizeVecType coll_size_andvalue_vec;
+
+    static llvm::Type* getStringValuePtrVecType(LlvmCodeGen* codegen);
+    static llvm::Type* getCollValuePtrAndSizeVecType(LlvmCodeGen* codegen);
+
+    /// For C++/IR interop, we need to be able to look up types by name.
+    static const char* LLVM_CLASS_NAME;
+  };
+
   /// Implementation of MaterializedExprs(). This function is replaced during codegen.
   /// 'num_non_null_string_values' and 'num_non_null_collection_values' must be
   /// initialized by the caller.
   template <bool COLLECT_VAR_LEN_VALS, bool NULL_POOL>
   void IR_NO_INLINE MaterializeExprs(TupleRow* row, const TupleDescriptor& desc,
       ScalarExprEvaluator* const* evals, MemPool* pool,
-      StringValue** non_null_string_values, CollValueAndSize* non_null_collection_values,
+      CodegenTypes::StringValuePtrVecType* non_null_string_values,
+      CodegenTypes::CollValuePtrAndSizeVecType* non_null_collection_values,
       int* total_varlen_lengths, int* num_non_null_string_values,
-      int* num_non_null_collection_values);
+      int* num_non_null_collection_values) noexcept;
 
   /// Helper for CopyStrings() to allocate 'bytes' of memory. Returns a pointer to the
   /// allocated buffer on success. Otherwise an error was encountered, in which case NULL
@@ -359,6 +384,9 @@ class Tuple {
   /// avoid emitting unnecessary code for ~Status() in perf-critical code.
   char* AllocateStrings(const char* err_ctx, RuntimeState* state, int64_t bytes,
       MemPool* pool, Status* status) noexcept;
+
+  // Defined in tuple-ir.cc to force the compilation of the CodegenTypes struct.
+  void dummy(Tuple::CodegenTypes*);
 };
 
 }
diff --git a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
index 230cf351a..91e3a0151 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
@@ -1670,9 +1670,10 @@ public class Analyzer {
       for (StructField structField : type.getFields()) {
         // 'slotPath' could be null e.g. when the query has an order by clause and
         // this is the sorting tuple.
-        // TODO: IMPALA-10939: When we enable collections in sorting tuples we need to
-        // revisit this. Currently collection SlotDescriptors cannot be created without a
-        // path. Maybe descriptors should have a path even in the sorting tuple.
+        // TODO: IMPALA-12160: When we enable structs containing collections in sorting
+        // tuples we need to revisit this. Currently collection SlotDescriptors cannot be
+        // created without a path. Maybe descriptors should have a path even in the
+        // sorting tuple.
         if (slotPath == null) {
           createStructTuplesAndSlotDescsWithoutPath(slotPath, structField);
         } else {
diff --git a/fe/src/main/java/org/apache/impala/analysis/QueryStmt.java b/fe/src/main/java/org/apache/impala/analysis/QueryStmt.java
index 3ac0142a5..b5a036975 100644
--- a/fe/src/main/java/org/apache/impala/analysis/QueryStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/QueryStmt.java
@@ -321,7 +321,7 @@ public abstract class QueryStmt extends StatementBase {
       }
     }
 
-    checkForVarLenCollectionSorting(analyzer);
+    checkTypeValidityForSorting(analyzer);
 
     sortInfo_.createSortTupleInfo(resultExprs_, analyzer);
 
@@ -344,13 +344,14 @@ public abstract class QueryStmt extends StatementBase {
     substituteResultExprs(smap, analyzer);
   }
 
-  private void checkForVarLenCollectionSorting(Analyzer analyzer)
+  private void checkTypeValidityForSorting(Analyzer analyzer)
       throws AnalysisException {
     for (Expr expr: getResultExprs()) {
       Type exprType = expr.getType();
-      Optional<String> err = SortInfo.checkTypeForVarLenCollection(exprType);
-      if (err.isPresent()) {
-        throw new AnalysisException(err.get());
+      if (!SortInfo.isValidInSortingTuple(exprType)) {
+        String error = "Sorting is not supported if the select list "
+            + "contains collection(s) nested in struct(s).";
+        throw new AnalysisException(error);
       }
     }
   }
diff --git a/fe/src/main/java/org/apache/impala/analysis/SortInfo.java b/fe/src/main/java/org/apache/impala/analysis/SortInfo.java
index 95e65c8d6..b15eabbbb 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SortInfo.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SortInfo.java
@@ -332,68 +332,42 @@ public class SortInfo {
     return ProcessingCost.basicCost(label, inputCardinality, weight);
   }
 
-  // Collections with variable length data as well as any collections within structs are
-  // currently not allowed in the sorting tuple (see IMPALA-12019 and IMPALA-10939). This
-  // function checks whether the given type is allowed in the sorting tuple: returns an
-  // empty 'Optional' if the type is allowed, or an 'Optional' with an error message if it
-  // is not.
-  public static Optional<String> checkTypeForVarLenCollection(Type type) {
-    final String errorMsg = "Sorting is not supported if the select list contains " +
-      "(possibly nested) collections with variable length data types.";
-
+  // Collections within structs (also if they are nested in another struct or collection)
+  // are currently not allowed in the sorting tuple (see IMPALA-12160). This function
+  // returns whether the given type is allowed in the sorting tuple.
+  public static boolean isValidInSortingTuple(Type type) {
     if (type.isCollectionType()) {
       if (type instanceof ArrayType) {
         ArrayType arrayType = (ArrayType) type;
-        return isAllowedCollectionItemForSorting(arrayType.getItemType())
-            ? Optional.empty() : Optional.of(errorMsg);
+        return isValidInSortingTuple(arrayType.getItemType());
       } else {
         Preconditions.checkState(type instanceof MapType);
         MapType mapType = (MapType) type;
 
-        if (!isAllowedCollectionItemForSorting(mapType.getKeyType())) {
-          return Optional.of(errorMsg);
-        }
+        if (!isValidInSortingTuple(mapType.getKeyType())) return false;
 
-        return isAllowedCollectionItemForSorting(mapType.getValueType())
-            ? Optional.empty() : Optional.of(errorMsg);
+        return isValidInSortingTuple(mapType.getValueType());
       }
     } else if (type.isStructType()) {
       StructType structType = (StructType) type;
-      return checkStructTypeForVarLenCollection(structType);
+      return isValidStructInSortingTuple(structType);
     }
 
-    return Optional.empty();
+    return true;
   }
 
-  // Helper for checkTypeForVarLenCollection(), see more there.
-  private static Optional<String> checkStructTypeForVarLenCollection(
-      StructType structType) {
+  // Helper for isValidInSortingTuple(), see more there.
+  private static boolean isValidStructInSortingTuple(StructType structType) {
     for (StructField field : structType.getFields()) {
       Type fieldType = field.getType();
       if (fieldType.isStructType()) {
-        return checkStructTypeForVarLenCollection((StructType) fieldType);
+        if (!isValidStructInSortingTuple((StructType) fieldType)) return false;
       } else if (fieldType.isCollectionType()) {
-        // TODO IMPALA-10939: Once we allow sorting collections in structs, test that
+        // TODO IMPALA-12160: Once we allow sorting collections in structs, test that
         // collections containing var-len types are handled correctly.
-        String error = "Sorting is not supported if the select list "
-            + "contains collection(s) nested in struct(s).";
-        return Optional.of(error);
-      }
-    }
-
-    return Optional.empty();
-  }
-
-  private static boolean isAllowedCollectionItemForSorting(Type itemType) {
-    if (itemType.isStructType()) {
-      StructType structType = (StructType) itemType;
-      for (StructField field : structType.getFields()) {
-        Type fieldType = field.getType();
-        if (!isAllowedCollectionItemForSorting(fieldType)) return false;
+        return false;
       }
-      return true;
     }
-
-    return !itemType.isComplexType() && !itemType.isVarLenStringType();
+    return true;
   }
 }
diff --git a/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java b/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java
index e0a24a161..ae7d2140f 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java
@@ -114,9 +114,10 @@ public class TupleDescriptor {
   // Tuple of the table masking view that masks this tuple's table.
   private TupleDescriptor maskedByTuple_ = null;
 
-  // If this is a tuple representing the children of a struct slot then 'parentSlot_'
-  // is the struct slot where this tuple belongs. Otherwise it's null.
-  private SlotDescriptor parentStructSlot_ = null;
+  // If this is a tuple representing the children of a struct or collection slot then
+  // 'parentSlot_' is the struct or collection slot where this tuple belongs. Otherwise
+  // it's null.
+  private SlotDescriptor parentSlot_ = null;
 
   // The view that registered this tuple. Null if this is not the result tuple of a view.
   // This affects handling of collections.
@@ -252,9 +253,13 @@ public class TupleDescriptor {
     Preconditions.checkState(parentType.isStructType() || parentType.isCollectionType(),
         "Parent for a TupleDescriptor should be a STRUCT or a COLLECTION. " +
         "Actual type is " + parentType + " Tuple ID: " + getId());
-    parentStructSlot_ = parent;
+    parentSlot_ = parent;
+  }
+  public SlotDescriptor getParentSlotDesc() { return parentSlot_; }
+
+  public boolean isStructChild() {
+    return parentSlot_ != null && parentSlot_.getType().isStructType();
   }
-  public SlotDescriptor getParentSlotDesc() { return parentStructSlot_; }
 
   public String debugString() {
     String tblStr = (getTable() == null ? "null" : getTable().getFullName());
@@ -270,8 +275,8 @@ public class TupleDescriptor {
         .add("is_materialized", isMaterialized_)
         .add("slots", "[" + Joiner.on(", ").join(slotStrings) + "]");
     if (maskedTable_ != null) toStrHelper.add("masks", maskedTable_.getId());
-    if (parentStructSlot_ != null) {
-      toStrHelper.add("parentSlot", parentStructSlot_.getId());
+    if (parentSlot_ != null) {
+      toStrHelper.add("parentSlot", parentSlot_.getId());
     }
     return toStrHelper.toString();
   }
@@ -336,11 +341,11 @@ public class TupleDescriptor {
    * structure of the structs.
    */
   public Pair<Integer, Integer> computeMemLayout() {
-    if (parentStructSlot_ != null) {
+    boolean isChildOfStruct = isStructChild();
+    if (isChildOfStruct) {
       // If this TupleDescriptor represents the children of a STRUCT then the slot
       // offsets are adjusted with the parent struct's offset.
-      Preconditions.checkState(parentStructSlot_.getType().isStructType());
-      Preconditions.checkState(parentStructSlot_.getByteOffset() != -1);
+      Preconditions.checkState(parentSlot_.getByteOffset() != -1);
     }
     if (hasMemLayout_) return null;
     hasMemLayout_ = true;
@@ -371,24 +376,21 @@ public class TupleDescriptor {
     Preconditions.checkState(!slotsBySize.containsKey(0));
     Preconditions.checkState(!slotsBySize.containsKey(-1));
 
-    // The total number of bytes for nullable scalar or nested struct fields will be
-    // computed for the struct at the top level (i.e., parentStructSlot_ == null).
-
-    // If this descriptor is inside a struct then don't need to count for an additional
-    // null byte here as the null indicator will be on the top level tuple. In other
-    // words the total number of bytes for nullable scalar or nested struct fields will
-    // be computed for the struct at the top level (i.e., parentStructSlot_ == null).
-    numNullBytes_ = (parentStructSlot_ == null) ? (numNullBits + 7) / 8 : 0;
+    // If this descriptor is inside a struct then we don't need to count for an additional
+    // null byte here as the null indicator will be on the level of the "root" struct,
+    // i.e. the struct that is either in the top level tuple or in the tuple of a
+    // collection (where 'parentSlot_' is either null or a collection). In other words,
+    // the total number of bytes for nullable scalar or nested struct fields will be
+    // computed for the "root" struct.
+    numNullBytes_ = isChildOfStruct ? 0 : (numNullBits + 7) / 8;
     int slotOffset = 0;
     int nullIndicatorByte = totalSlotSize;
-    if (parentStructSlot_ != null) {
-      nullIndicatorByte = parentStructSlot_.getNullIndicatorByte();
-    }
     int nullIndicatorBit = 0;
-    if (parentStructSlot_ != null) {
+    if (isChildOfStruct) {
+      nullIndicatorByte = parentSlot_.getNullIndicatorByte();
       // If this is a child tuple from a struct then get the next available bit from the
       // parent struct.
-      nullIndicatorBit = (parentStructSlot_.getNullIndicatorBit() + 1) % 8;
+      nullIndicatorBit = (parentSlot_.getNullIndicatorBit() + 1) % 8;
       // If the parent struct ran out of null bits in the current null byte just before
       // this tuple then start using a new byte.
       if (nullIndicatorBit == 0) ++nullIndicatorByte;
@@ -405,8 +407,8 @@ public class TupleDescriptor {
       for (SlotDescriptor d: slotsBySize.get(slotSize)) {
         Preconditions.checkState(d.isMaterialized());
         d.setByteSize(slotSize);
-        d.setByteOffset((parentStructSlot_ == null) ? slotOffset :
-            parentStructSlot_.getByteOffset() + slotOffset);
+        d.setByteOffset(isChildOfStruct ?
+            parentSlot_.getByteOffset() + slotOffset : slotOffset);
         slotOffset += slotSize;
         d.setSlotIdx(slotIdx++);
 
diff --git a/fe/src/main/java/org/apache/impala/planner/AnalyticPlanner.java b/fe/src/main/java/org/apache/impala/planner/AnalyticPlanner.java
index 739f02330..104d40a83 100644
--- a/fe/src/main/java/org/apache/impala/planner/AnalyticPlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/AnalyticPlanner.java
@@ -331,12 +331,8 @@ public class AnalyticPlanner {
       for (SlotDescriptor inputSlotDesc: tupleDesc.getSlots()) {
         if (inputSlotDesc.isMaterialized()) {
           // Project out collection slots that are not supported in the sorting tuple
-          // (collections containing var-len types).
-          Optional<String> err = SortInfo.checkTypeForVarLenCollection(
-              inputSlotDesc.getType());
-          // An empty 'Optional' result means there is no error so the type can be put
-          // into the sorting tuple.
-          if (!err.isPresent()) {
+          // (collections within structs).
+          if (SortInfo.isValidInSortingTuple(inputSlotDesc.getType())) {
             inputSlotRefs.add(new SlotRef(inputSlotDesc));
           } else {
             if (LOG.isTraceEnabled()) {
diff --git a/fe/src/main/java/org/apache/impala/planner/UnionNode.java b/fe/src/main/java/org/apache/impala/planner/UnionNode.java
index 30636ad2e..f3971b554 100644
--- a/fe/src/main/java/org/apache/impala/planner/UnionNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/UnionNode.java
@@ -265,10 +265,10 @@ public class UnionNode extends PlanNode {
     for (int i = 0; i < children_.size(); i++) {
       if (!isChildPassthrough(analyzer, children_.get(i), resultExprLists_.get(i))) {
         for (Expr expr : resultExprLists_.get(i)) {
-          Preconditions.checkState(!SortInfo.checkTypeForVarLenCollection(
-              expr.getType()).isPresent(),
-              "only pass-through UNION ALL is supported for collections of "
-              + "variable length types.");
+          // TODO IMPALA-12160: Add tests for collection-in-struct.
+          Preconditions.checkState(SortInfo.isValidInSortingTuple(expr.getType()),
+              "only pass-through UNION ALL is supported for "
+              + "structs containing collections.");
         }
         newResultExprLists.add(resultExprLists_.get(i));
         newChildren.add(children_.get(i));
diff --git a/testdata/ComplexTypesTbl/arrays_big.parq b/testdata/ComplexTypesTbl/arrays_big.parq
new file mode 100644
index 000000000..7872398f2
Binary files /dev/null and b/testdata/ComplexTypesTbl/arrays_big.parq differ
diff --git a/testdata/ComplexTypesTbl/simple_arrays_big.parq b/testdata/ComplexTypesTbl/simple_arrays_big.parq
deleted file mode 100644
index 5cb15885a..000000000
Binary files a/testdata/ComplexTypesTbl/simple_arrays_big.parq and /dev/null differ
diff --git a/testdata/data/README b/testdata/data/README
index c08681a87..c29d8880b 100644
--- a/testdata/data/README
+++ b/testdata/data/README
@@ -970,7 +970,7 @@ And converted the HiveCatalog metadata to HadoopCatalog metadata via scripts at
 And rewrote metadata content to the correct lengths with
 testdata/bin/rewrite-iceberg-metadata.py "" testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_lineitem_multiblock/metadata/
 
-simple_arrays_big.parq:
+arrays_big.parq:
 Generated with RandomNestedDataGenerator.java from the following schema:
 {
   "fields": [
@@ -980,23 +980,71 @@ Generated with RandomNestedDataGenerator.java from the following schema:
     },
     {
       "name": "string_col",
+      "type": "string"
+    },
+    {
+      "name": "int_array",
       "type": [
         "null",
-        "string"
+        {
+          "type": "array",
+          "items": ["int", "null"]
+        }
       ]
     },
     {
-      "name": "int_array",
-      "type": {
-        "type": "array",
-        "items": "int"
-      }
+      "name": "double_map",
+      "type": [
+        "null",
+        {
+          "type": "map",
+          "values": ["double", "null"]
+        }
+      ]
+    },
+    {
+      "name": "string_array",
+      "type": [
+        "null",
+        {
+          "type": "array",
+          "items": ["string", "null"]
+        }
+      ]
     },
     {
-      "name": "double_array",
-      "type": {
-        "type": "array",
-        "items": "double"
+      "name" : "mixed",
+      "type" : {
+        "type" : "map",
+        "values" : [
+          "null",
+          {
+            "type" : "array",
+            "items" : [
+              "null",
+              {
+                "type": "map",
+                "values": [
+                  "null",
+                  {
+                    "name": "struct_in_mixed",
+                    "type": "record",
+                    "fields": [
+                      {
+                        "name": "string_member",
+                        "type": ["string", "null"]
+                      },
+                      {
+                        "name": "int_member",
+                        "type": ["int", "null"]
+                      }
+                    ]
+                  }
+                ]
+              }
+            ]
+          }
+        ]
       }
     }
   ],
@@ -1005,9 +1053,9 @@ Generated with RandomNestedDataGenerator.java from the following schema:
   "type": "record"
 }
 The following command was used:
-mvn -f "${IMPALA_HOME}/java/datagenerator/pom.xml" exec:java
- -Dexec.mainClass="org.apache.impala.datagenerator.RandomNestedDataGenerator"
- -Dexec.args="${input_table_schema}.avsc 1500000 10 15 '${output_file}.parquet'";
+mvn -f "${IMPALA_HOME}/java/datagenerator/pom.xml" exec:java \
+-Dexec.mainClass="org.apache.impala.datagenerator.RandomNestedDataGenerator" \
+-Dexec.args="${INPUT_TBL_SCHEMA} 1500000 20 15 ${OUTPUT_FILE}"
 
 empty_present_stream.orc:
 Generated by ORC C++ library using the following code
diff --git a/testdata/datasets/functional/functional_schema_template.sql b/testdata/datasets/functional/functional_schema_template.sql
index 79978062f..fc345e042 100644
--- a/testdata/datasets/functional/functional_schema_template.sql
+++ b/testdata/datasets/functional/functional_schema_template.sql
@@ -4180,18 +4180,20 @@ AS SELECT id, arr_contains_struct, arr_contains_nested_struct, struct_contains_n
 ---- DATASET
 functional
 ---- BASE_TABLE_NAME
-simple_arrays_big
+arrays_big
 ---- COLUMNS
 int_col INT
 string_col STRING
 int_array ARRAY<INT>
-double_array ARRAY<DOUBLE>
+double_map MAP<STRING,DOUBLE>
+string_array ARRAY<STRING>
+mixed MAP<STRING,ARRAY<MAP<STRING,STRUCT<string_member: STRING, int_member: INT>>>>
 ---- DEPENDENT_LOAD
-`hadoop fs -mkdir -p /test-warehouse/simple_arrays_big_parquet && \
-hadoop fs -put -f ${IMPALA_HOME}/testdata/ComplexTypesTbl/simple_arrays_big.parq \
-/test-warehouse/simple_arrays_big_parquet/
+`hadoop fs -mkdir -p /test-warehouse/arrays_big_parquet && \
+hadoop fs -put -f ${IMPALA_HOME}/testdata/ComplexTypesTbl/arrays_big.parq \
+/test-warehouse/arrays_big_parquet/
 ---- DEPENDENT_LOAD_ACID
-INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name} SELECT * FROM functional_parquet.simple_arrays_big;
+INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name} SELECT * FROM functional_parquet.arrays_big;
 ====
 ---- DATASET
 functional
diff --git a/testdata/datasets/functional/schema_constraints.csv b/testdata/datasets/functional/schema_constraints.csv
index 972715384..d99aac234 100644
--- a/testdata/datasets/functional/schema_constraints.csv
+++ b/testdata/datasets/functional/schema_constraints.csv
@@ -381,8 +381,8 @@ table_name:collection_struct_mix, constraint:restrict_to, table_format:orc/def/b
 table_name:collection_struct_mix_view, constraint:restrict_to, table_format:parquet/none/none
 table_name:collection_struct_mix_view, constraint:restrict_to, table_format:orc/def/block
 
-table_name:simple_arrays_big, constraint:restrict_to, table_format:parquet/none/none
-table_name:simple_arrays_big, constraint:restrict_to, table_format:orc/def/block
+table_name:arrays_big, constraint:restrict_to, table_format:parquet/none/none
+table_name:arrays_big, constraint:restrict_to, table_format:orc/def/block
 
 table_name:complextypes_maps_view, constraint:restrict_to, table_format:parquet/none/none
 table_name:complextypes_maps_view, constraint:restrict_to, table_format:orc/def/block
diff --git a/testdata/workloads/functional-query/queries/QueryTest/nested-array-in-select-list.test b/testdata/workloads/functional-query/queries/QueryTest/nested-array-in-select-list.test
index ca084c3eb..5c84778bd 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/nested-array-in-select-list.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/nested-array-in-select-list.test
@@ -96,72 +96,44 @@ select id, int_array from complextypestbl union select id, int_array from comple
 IllegalStateException: UNION, EXCEPT and INTERSECT are not supported for collection types
 ====
 ---- QUERY
-# Changing a column to a different type leads "non-pass-through" union that does a
-# deepcopy on the tuple, which is only implemented for collections of fixed-length types
-# in BE for arrays.
-select id, int_array from complextypestbl
-  union all select cast(id as tinyint), int_array from complextypestbl
+# Changing a column to a different type leads to a "non-pass-through" union that does a
+# deepcopy on the tuple.
+select id, arr_int_1d, arr_string_3d from collection_tbl
+  union all select cast(id as tinyint), arr_int_1d, arr_string_3d from collection_tbl
 ---- RESULTS
-1,'[1,2,3]'
-2,'[null,1,2,null,3,null]'
-3,'[]'
-4,'NULL'
-5,'NULL'
-6,'NULL'
-7,'NULL'
-8,'[-1]'
-1,'[1,2,3]'
-2,'[null,1,2,null,3,null]'
-3,'[]'
-4,'NULL'
-5,'NULL'
-6,'NULL'
-7,'NULL'
-8,'[-1]'
+1,'[1,2,null]','[[["1","second harmonic",null],["three cities"]],[["four castles"]]]'
+2,'[1,null,3]','[[["second dinosaur bone",null,null],["three dinosaur bones"]],[["one",null,"four dinosaur bones"]]]'
+3,'[null,4679,null,49,null]','[[["1","-1","second even-toed ungulate",null],["three even-toed ungulates"]],[["four even-toed ungulate"]]]'
+1,'[1,2,null]','[[["1","second harmonic",null],["three cities"]],[["four castles"]]]'
+2,'[1,null,3]','[[["second dinosaur bone",null,null],["three dinosaur bones"]],[["one",null,"four dinosaur bones"]]]'
+3,'[null,4679,null,49,null]','[[["1","-1","second even-toed ungulate",null],["three even-toed ungulates"]],[["four even-toed ungulate"]]]'
 ---- TYPES
-bigint,string
+int,string,string
 ====
 ---- QUERY
-# Changing a column to a different type leads "non-pass-through" union that does a
-# deepcopy on the tuple, which is only implemented for collections of fixed-length types
-# in BE for arrays.
-select id, int_array_array from complextypestbl
-  union all select cast(id as tinyint), int_array_array from complextypestbl
----- CATCH
-IllegalStateException: only pass-through UNION ALL is supported for collections of variable length types.
-====
----- QUERY
-# Constants in the select list of unions also lead to "non-pass-through" union but
-# collections of fixed length types are allowed.
-select 1, int_array from complextypestbl
-  union all select 2, int_array from complextypestbl;
+# Constants in the select list of unions also lead to a "non-pass-through" union.
+select 1, arr_int_1d, arr_string_3d from collection_tbl
+  union all select 2, arr_int_1d, arr_string_3d from collection_tbl
 ---- RESULTS
-1,'[-1]'
-1,'[1,2,3]'
-1,'[null,1,2,null,3,null]'
-1,'[]'
-1,'NULL'
-1,'NULL'
-1,'NULL'
-1,'NULL'
-2,'[-1]'
-2,'[1,2,3]'
-2,'[null,1,2,null,3,null]'
-2,'[]'
-2,'NULL'
-2,'NULL'
-2,'NULL'
-2,'NULL'
+1,'[1,2,null]','[[["1","second harmonic",null],["three cities"]],[["four castles"]]]'
+1,'[1,null,3]','[[["second dinosaur bone",null,null],["three dinosaur bones"]],[["one",null,"four dinosaur bones"]]]'
+1,'[null,4679,null,49,null]','[[["1","-1","second even-toed ungulate",null],["three even-toed ungulates"]],[["four even-toed ungulate"]]]'
+2,'[1,2,null]','[[["1","second harmonic",null],["three cities"]],[["four castles"]]]'
+2,'[1,null,3]','[[["second dinosaur bone",null,null],["three dinosaur bones"]],[["one",null,"four dinosaur bones"]]]'
+2,'[null,4679,null,49,null]','[[["1","-1","second even-toed ungulate",null],["three even-toed ungulates"]],[["four even-toed ungulate"]]]'
 ---- TYPES
-tinyint,string
+tinyint,string,string
 ====
 ---- QUERY
-# Constants in the select list of unions also lead to "non-pass-through" union and
-# collections of variable length types are not allowed yet.
-select 1, int_array_array from complextypestbl
-  union all select 2, int_array_array from complextypestbl;
+# Constants in the select list of unions also lead to a "non-pass-through" union and
+# structs containing collections are not allowed there yet because we can't materialise
+# them, see IMPALA-12160. However, set operations on structs are not supported at all at
+# present, and the error reflects that. When both limitations are lifted, the query should
+# succeed.
+select 1, struct_contains_arr, struct_contains_nested_arr, all_mix from collection_struct_mix
+  union all select 2, struct_contains_arr, struct_contains_nested_arr, all_mix from collection_struct_mix
 ---- CATCH
-IllegalStateException: only pass-through UNION ALL is supported for collections of variable length types.
+AnalysisException: Set operations don't support STRUCT type. STRUCT<arr:ARRAY<INT>> in struct_contains_arr
 ====
 ---- QUERY
 select 1 from (select int_array from complextypestbl) s
diff --git a/testdata/workloads/functional-query/queries/QueryTest/nested-map-in-select-list.test b/testdata/workloads/functional-query/queries/QueryTest/nested-map-in-select-list.test
index 0f379db1e..337dfbd55 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/nested-map-in-select-list.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/nested-map-in-select-list.test
@@ -96,9 +96,8 @@ select id, int_map from complextypestbl union select id, int_map from complextyp
 IllegalStateException: UNION, EXCEPT and INTERSECT are not supported for collection types
 ====
 ---- QUERY
-# Changing a column to a different type leads "non-pass-through" union that does a
-# deepcopy on the tuple, which is only implemented for collections of fixed-length types
-# in BE for arrays.
+# Changing a column to a different type leads to a "non-pass-through" union that does a
+# deepcopy on the tuple.
 select id, map_int_int from map_non_varlen
   union all select cast(id as tinyint), map_int_int from map_non_varlen
 ---- RESULTS
@@ -126,13 +125,19 @@ select id, map_int_int from map_non_varlen
 int,string
 ====
 ---- QUERY
-# Changing a column to a different type leads "non-pass-through" union that does a
-# deepcopy on the tuple, which is only implemented for collections of fixed-length types
-# in BE for arrays.
-select id, int_map from complextypestbl
-  union all select cast(id as tinyint), int_map from complextypestbl
----- CATCH
-IllegalStateException: only pass-through UNION ALL is supported for collections of variable length types.
+# Changing a column to a different type leads to a "non-pass-through" union that does a
+# deepcopy on the tuple.
+select id, map_1d, map_3d from collection_tbl
+  union all select cast(id as tinyint), map_1d, map_3d from collection_tbl
+---- RESULTS
+1,'{1:"first automobile",2:"second"}','{1:{10:{100:"hundred",200:"two hundred pages"},20:{300:"three hundred pages",400:"four hundred"}},2:{30:{500:"five hundred pages",600:"six hundred"},40:{700:"seven hundred pages",800:"eight hundred"}}}'
+2,'{1:"first dinosaur bone",2:"second",3:null}','{1:{10:{100:"hundred",200:"two hundred dinosaur bones"},20:{300:"three hundred dinosaur bones",400:"four hundred"}},2:{30:{500:"five hundred dinosaur bones",600:"six hundred"},40:{700:"seven hundred dinosaur bones",800:"eight hundred"}}}'
+3,'{645:"fourth even-toed ungulate",5:"fifth"}','{1:{10:{100:"hundred",200:"two hundred even-toed ungulates"},20:{300:"three hundred even-toed ungulates",400:"four hundred"}},2:{30:{500:"five hundred even-toed ungulates",600:"six hundred"},40:{700:"seven hundred even-toed ungulates",800:"eight hundred"}}}'
+1,'{1:"first automobile",2:"second"}','{1:{10:{100:"hundred",200:"two hundred pages"},20:{300:"three hundred pages",400:"four hundred"}},2:{30:{500:"five hundred pages",600:"six hundred"},40:{700:"seven hundred pages",800:"eight hundred"}}}'
+2,'{1:"first dinosaur bone",2:"second",3:null}','{1:{10:{100:"hundred",200:"two hundred dinosaur bones"},20:{300:"three hundred dinosaur bones",400:"four hundred"}},2:{30:{500:"five hundred dinosaur bones",600:"six hundred"},40:{700:"seven hundred dinosaur bones",800:"eight hundred"}}}'
+3,'{645:"fourth even-toed ungulate",5:"fifth"}','{1:{10:{100:"hundred",200:"two hundred even-toed ungulates"},20:{300:"three hundred even-toed ungulates",400:"four hundred"}},2:{30:{500:"five hundred even-toed ungulates",600:"six hundred"},40:{700:"seven hundred even-toed ungulates",800:"eight hundred"}}}'
+---- TYPES
+int,string,string
 ====
 ---- QUERY
 # Constants in the select list of unions also lead to "non-pass-through" union.
@@ -163,11 +168,32 @@ select 1, map_int_int from map_non_varlen
 tinyint,string
 ====
 ---- QUERY
-# Constants in the select list of unions also lead to "non-pass-through" union.
-select 1, int_map from complextypestbl
-  union all select 2, int_map from complextypestbl;
+# Constants in the select list of unions also lead to a "non-pass-through" union.
+select 1, int_map, int_map_array from complextypestbl
+  union all select 2, int_map, int_map_array from complextypestbl;
+
+select 1, map_1d, map_3d from collection_tbl
+  union all select 2, map_1d, map_3d from collection_tbl
+---- RESULTS
+1,'{1:"first automobile",2:"second"}','{1:{10:{100:"hundred",200:"two hundred pages"},20:{300:"three hundred pages",400:"four hundred"}},2:{30:{500:"five hundred pages",600:"six hundred"},40:{700:"seven hundred pages",800:"eight hundred"}}}'
+1,'{1:"first dinosaur bone",2:"second",3:null}','{1:{10:{100:"hundred",200:"two hundred dinosaur bones"},20:{300:"three hundred dinosaur bones",400:"four hundred"}},2:{30:{500:"five hundred dinosaur bones",600:"six hundred"},40:{700:"seven hundred dinosaur bones",800:"eight hundred"}}}'
+1,'{645:"fourth even-toed ungulate",5:"fifth"}','{1:{10:{100:"hundred",200:"two hundred even-toed ungulates"},20:{300:"three hundred even-toed ungulates",400:"four hundred"}},2:{30:{500:"five hundred even-toed ungulates",600:"six hundred"},40:{700:"seven hundred even-toed ungulates",800:"eight hundred"}}}'
+2,'{1:"first automobile",2:"second"}','{1:{10:{100:"hundred",200:"two hundred pages"},20:{300:"three hundred pages",400:"four hundred"}},2:{30:{500:"five hundred pages",600:"six hundred"},40:{700:"seven hundred pages",800:"eight hundred"}}}'
+2,'{1:"first dinosaur bone",2:"second",3:null}','{1:{10:{100:"hundred",200:"two hundred dinosaur bones"},20:{300:"three hundred dinosaur bones",400:"four hundred"}},2:{30:{500:"five hundred dinosaur bones",600:"six hundred"},40:{700:"seven hundred dinosaur bones",800:"eight hundred"}}}'
+2,'{645:"fourth even-toed ungulate",5:"fifth"}','{1:{10:{100:"hundred",200:"two hundred even-toed ungulates"},20:{300:"three hundred even-toed ungulates",400:"four hundred"}},2:{30:{500:"five hundred even-toed ungulates",600:"six hundred"},40:{700:"seven hundred even-toed ungulates",800:"eight hundred"}}}'
+---- TYPES
+tinyint,string,string
+====
+---- QUERY
+# Constants in the select list of unions also lead to a "non-pass-through" union and
+# structs containing collections are not allowed there yet because we can't materialise
+# them, see IMPALA-12160. However, set operations on structs are not supported at all at
+# present, and the error reflects that. When both limitations are lifted, the query should
+# succeed.
+select 1, struct_contains_map, all_mix from collection_struct_mix
+  union all select 2, struct_contains_map, all_mix from collection_struct_mix
 ---- CATCH
-IllegalStateException: only pass-through UNION ALL is supported for collections of variable length types.
+AnalysisException: Set operations don't support STRUCT type. STRUCT<m:MAP<INT,STRING>> in struct_contains_map
 ====
 ---- QUERY
 select 1 from (select int_map from complextypestbl) s
diff --git a/testdata/workloads/functional-query/queries/QueryTest/partitioned-top-n-complex.test b/testdata/workloads/functional-query/queries/QueryTest/partitioned-top-n-complex.test
index 9f08bfb7c..a948941d3 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/partitioned-top-n-complex.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/partitioned-top-n-complex.test
@@ -1,6 +1,6 @@
 ====
 ---- QUERY
-# In-memory partitioned top-N containing a collection, with some partitions that hit
+# In-memory partitioned top-N containing a collection, with some partitions that hit the
 # limit.
 with joined as (
   select a.*, b.int_array from alltypesagg a left join complextypestbl b on a.tinyint_col = b.id
@@ -34,17 +34,38 @@ NULL,7770,'NULL',4
 TINYINT, INT, STRING, BIGINT
 ====
 ---- QUERY
-# Sorting is not supported yet when the sorting tuple contains collections containing
-# varlen types: IMPALA-10939.
+# In-memory partitioned top-N containing a collection with a variable length type, with
+# some partitions that hit the limit.
 with joined as (
-  select a.*, b.int_array, b.int_array_array from alltypesagg a left join complextypestbl b on a.tinyint_col = b.id
+  select a.*, b.arr_int_1d, b.arr_int_2d, b.map_1d, b.map_3d
+  from alltypesagg a left join collection_tbl b on a.tinyint_col = b.id
 ),
 v as (
-  select tinyint_col, id, int_array, int_array_array, row_number() over (partition by tinyint_col order by id) as rn
+  select tinyint_col, id, arr_int_1d, arr_int_2d, map_1d, map_3d,
+    row_number() over (partition by tinyint_col order by id) as rn
   from joined where id % 777 = 0 or id % 10 = 7)
-select tinyint_col, id, int_array, int_array_array, rn from v
+select tinyint_col, id, arr_int_1d, arr_int_2d, map_1d, map_3d, rn from v
 where rn <= 5
 order by tinyint_col, rn
----- CATCH
-AnalysisException: Sorting is not supported if the select list contains (possibly nested) collections with variable length data types.
+---- RESULTS
+1,2331,'[1,2,null]','[[1,2,null],[3]]','{1:"first automobile",2:"second"}','{1:{10:{100:"hundred",200:"two hundred pages"},20:{300:"three hundred pages",400:"four hundred"}},2:{30:{500:"five hundred pages",600:"six hundred"},40:{700:"seven hundred pages",800:"eight hundred"}}}',1
+2,4662,'[1,null,3]','[[null,1,2,null],[5,14,null]]','{1:"first dinosaur bone",2:"second",3:null}','{1:{10:{100:"hundred",200:"two hundred dinosaur bones"},20:{300:"three hundred dinosaur bones",400:"four hundred"}},2:{30:{500:"five hundred dinosaur bones",600:"six hundred"},40:{700:"seven hundred dinosaur bones",800:"eight hundred"}}}',1
+3,6993,'[null,4679,null,49,null]','[[1,2,null,null,856],[365,855,369,null]]','{645:"fourth even-toed ungulate",5:"fifth"}','{1:{10:{100:"hundred",200:"two hundred even-toed ungulates"},20:{300:"three hundred even-toed ungulates",400:"four hundred"}},2:{30:{500:"five hundred even-toed ungulates",600:"six hundred"},40:{700:"seven hundred even-toed ungulates",800:"eight hundred"}}}',1
+4,1554,'NULL','NULL','NULL','NULL',1
+4,9324,'NULL','NULL','NULL','NULL',2
+5,3885,'NULL','NULL','NULL','NULL',1
+6,6216,'NULL','NULL','NULL','NULL',1
+7,7,'NULL','NULL','NULL','NULL',1
+7,17,'NULL','NULL','NULL','NULL',2
+7,27,'NULL','NULL','NULL','NULL',3
+7,37,'NULL','NULL','NULL','NULL',4
+7,47,'NULL','NULL','NULL','NULL',5
+8,3108,'NULL','NULL','NULL','NULL',1
+9,5439,'NULL','NULL','NULL','NULL',1
+NULL,0,'NULL','NULL','NULL','NULL',1
+NULL,0,'NULL','NULL','NULL','NULL',2
+NULL,7770,'NULL','NULL','NULL','NULL',3
+NULL,7770,'NULL','NULL','NULL','NULL',4
+---- TYPES
+TINYINT, INT, STRING, STRING, STRING, STRING, BIGINT
 ====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/sort-complex.test b/testdata/workloads/functional-query/queries/QueryTest/sort-complex.test
index 634d6b85e..ee1d25b2a 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/sort-complex.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/sort-complex.test
@@ -1,149 +1,113 @@
 ====
 ---- QUERY
-# Sort a collection.
-select id, int_array from complextypestbl order by id
+# Sort collections.
+select id, arr_int_1d, arr_string_3d, map_1d, map_3d from collection_tbl order by id desc
 ---- RESULTS
-1,'[1,2,3]'
-2,'[null,1,2,null,3,null]'
-3,'[]'
-4,'NULL'
-5,'NULL'
-6,'NULL'
-7,'NULL'
-8,'[-1]'
+3,'[null,4679,null,49,null]','[[["1","-1","second even-toed ungulate",null],["three even-toed ungulates"]],[["four even-toed ungulate"]]]','{645:"fourth even-toed ungulate",5:"fifth"}','{1:{10:{100:"hundred",200:"two hundred even-toed ungulates"},20:{300:"three hundred even-toed ungulates",400:"four hundred"}},2:{30:{500:"five hundred even-toed ungulates",600:"six hundred"},40:{700:"seven hundred even-toed ungulates",800:"eight hundred"}}}'
+2,'[1,null,3]','[[["second dinosaur bone",null,null],["three dinosaur bones"]],[["one",null,"four dinosaur bones"]]]','{1:"first dinosaur bone",2:"second",3:null}','{1:{10:{100:"hundred",200:"two hundred dinosaur bones"},20:{300:"three hundred dinosaur bones",400:"four hundred"}},2:{30:{500:"five hundred dinosaur bones",600:"six hundred"},40:{700:"seven hundred dinosaur bones",800:"eight hundred"}}}'
+1,'[1,2,null]','[[["1","second harmonic",null],["three cities"]],[["four castles"]]]','{1:"first automobile",2:"second"}','{1:{10:{100:"hundred",200:"two hundred pages"},20:{300:"three hundred pages",400:"four hundred"}},2:{30:{500:"five hundred pages",600:"six hundred"},40:{700:"seven hundred pages",800:"eight hundred"}}}'
 ---- TYPES
-bigint,string
+int,string,string,string,string
 ====
 ---- QUERY
-# Sort collection from HMS view.
-select id, int_array from complextypes_arrays_only_view order by id
+# Sort collections from HMS view.
+select id, int_array, int_array_array from complextypes_arrays_only_view order by id desc
 ---- RESULTS
-1,'[1,2,3]'
-2,'[null,1,2,null,3,null]'
-3,'[]'
-4,'NULL'
-5,'NULL'
-6,'NULL'
-7,'NULL'
-8,'[-1]'
+8,'[-1]','[[-1,-2],[]]'
+7,'NULL','[null,[5,6]]'
+6,'NULL','NULL'
+5,'NULL','NULL'
+4,'NULL','[]'
+3,'[]','[null]'
+2,'[null,1,2,null,3,null]','[[null,1,2,null],[3,null,4],[],null]'
+1,'[1,2,3]','[[1,2],[3,4]]'
 ---- TYPES
-bigint,string
+bigint,string,string
 ====
 ---- QUERY
 # Sort collection from WITH-clause inline view.
 with v as (
-  select id, int_array from complextypestbl
+  select id, arr_int_1d, arr_string_3d, map_1d, map_3d from collection_tbl
 )
-select id, int_array from v order by id
+select id, arr_int_1d, arr_string_3d, map_1d, map_3d from collection_tbl order by id desc
 ---- RESULTS
-1,'[1,2,3]'
-2,'[null,1,2,null,3,null]'
-3,'[]'
-4,'NULL'
-5,'NULL'
-6,'NULL'
-7,'NULL'
-8,'[-1]'
+3,'[null,4679,null,49,null]','[[["1","-1","second even-toed ungulate",null],["three even-toed ungulates"]],[["four even-toed ungulate"]]]','{645:"fourth even-toed ungulate",5:"fifth"}','{1:{10:{100:"hundred",200:"two hundred even-toed ungulates"},20:{300:"three hundred even-toed ungulates",400:"four hundred"}},2:{30:{500:"five hundred even-toed ungulates",600:"six hundred"},40:{700:"seven hundred even-toed ungulates",800:"eight hundred"}}}'
+2,'[1,null,3]','[[["second dinosaur bone",null,null],["three dinosaur bones"]],[["one",null,"four dinosaur bones"]]]','{1:"first dinosaur bone",2:"second",3:null}','{1:{10:{100:"hundred",200:"two hundred dinosaur bones"},20:{300:"three hundred dinosaur bones",400:"four hundred"}},2:{30:{500:"five hundred dinosaur bones",600:"six hundred"},40:{700:"seven hundred dinosaur bones",800:"eight hundred"}}}'
+1,'[1,2,null]','[[["1","second harmonic",null],["three cities"]],[["four castles"]]]','{1:"first automobile",2:"second"}','{1:{10:{100:"hundred",200:"two hundred pages"},20:{300:"three hundred pages",400:"four hundred"}},2:{30:{500:"five hundred pages",600:"six hundred"},40:{700:"seven hundred pages",800:"eight hundred"}}}'
 ---- TYPES
-bigint,string
+int,string,string,string,string
 ====
 ---- QUERY
 # Sort collection from nested query inline view.
-select id, int_array
-from (select id, int_array from complextypestbl) v
-order by id
+select id, arr_int_1d, arr_string_3d, map_1d, map_3d
+from (select id, arr_int_1d, arr_string_3d, map_1d, map_3d from collection_tbl) v
+order by id desc
 ---- RESULTS
-1,'[1,2,3]'
-2,'[null,1,2,null,3,null]'
-3,'[]'
-4,'NULL'
-5,'NULL'
-6,'NULL'
-7,'NULL'
-8,'[-1]'
+3,'[null,4679,null,49,null]','[[["1","-1","second even-toed ungulate",null],["three even-toed ungulates"]],[["four even-toed ungulate"]]]','{645:"fourth even-toed ungulate",5:"fifth"}','{1:{10:{100:"hundred",200:"two hundred even-toed ungulates"},20:{300:"three hundred even-toed ungulates",400:"four hundred"}},2:{30:{500:"five hundred even-toed ungulates",600:"six hundred"},40:{700:"seven hundred even-toed ungulates",800:"eight hundred"}}}'
+2,'[1,null,3]','[[["second dinosaur bone",null,null],["three dinosaur bones"]],[["one",null,"four dinosaur bones"]]]','{1:"first dinosaur bone",2:"second",3:null}','{1:{10:{100:"hundred",200:"two hundred dinosaur bones"},20:{300:"three hundred dinosaur bones",400:"four hundred"}},2:{30:{500:"five hundred dinosaur bones",600:"six hundred"},40:{700:"seven hundred dinosaur bones",800:"eight hundred"}}}'
+1,'[1,2,null]','[[["1","second harmonic",null],["three cities"]],[["four castles"]]]','{1:"first automobile",2:"second"}','{1:{10:{100:"hundred",200:"two hundred pages"},20:{300:"three hundred pages",400:"four hundred"}},2:{30:{500:"five hundred pages",600:"six hundred"},40:{700:"seven hundred pages",800:"eight hundred"}}}'
 ---- TYPES
-bigint,string
+int,string,string,string,string
 ====
 ---- QUERY
 # Sort a collection that is join-unnested.
 select id, a.item
-from complextypestbl t, t.int_array_array a
-order by id;
+from collection_tbl t, t.arr_string_3d a
+where id != 2
+order by id desc;
 ---- RESULTS
-1,'[1,2]'
-1,'[3,4]'
-2,'[null,1,2,null]'
-2,'[3,null,4]'
-2,'[]'
-2,'NULL'
-3,'NULL'
-7,'NULL'
-7,'[5,6]'
-8,'[-1,-2]'
-8,'[]'
+3,'[["1","-1","second even-toed ungulate",null],["three even-toed ungulates"]]'
+3,'[["four even-toed ungulate"]]'
+1,'[["1","second harmonic",null],["three cities"]]'
+1,'[["four castles"]]'
 ---- TYPES
-bigint,string
+int,string
 ====
 ---- QUERY
 # Sort a collection that is join-unnested in a WITH-clause inline view.
 with v as (
   select id, a.item arr
-  from complextypestbl t, t.int_array_array a
+  from collection_tbl t, t.arr_string_3d a
+  where id != 2
 )
-select id, arr from v order by id;
+select id, arr from v order by id desc;
 ---- RESULTS
-1,'[1,2]'
-1,'[3,4]'
-2,'[null,1,2,null]'
-2,'[3,null,4]'
-2,'[]'
-2,'NULL'
-3,'NULL'
-7,'NULL'
-7,'[5,6]'
-8,'[-1,-2]'
-8,'[]'
+3,'[["1","-1","second even-toed ungulate",null],["three even-toed ungulates"]]'
+3,'[["four even-toed ungulate"]]'
+1,'[["1","second harmonic",null],["three cities"]]'
+1,'[["four castles"]]'
 ---- TYPES
-bigint,string
+int,string
 ====
 ---- QUERY
 # Sort a collection that is join-unnested in a nested query inline view.
 select id, arr
 from (
     select id, a.item arr
-    from complextypestbl t, t.int_array_array a
+    from collection_tbl t, t.arr_string_3d a
   ) v
-order by id;
+where id != 2
+order by id desc;
 ---- RESULTS
-1,'[1,2]'
-1,'[3,4]'
-2,'[null,1,2,null]'
-2,'[3,null,4]'
-2,'[]'
-2,'NULL'
-3,'NULL'
-7,'NULL'
-7,'[5,6]'
-8,'[-1,-2]'
-8,'[]'
+3,'[["1","-1","second even-toed ungulate",null],["three even-toed ungulates"]]'
+3,'[["four even-toed ungulate"]]'
+1,'[["1","second harmonic",null],["three cities"]]'
+1,'[["four castles"]]'
 ---- TYPES
-bigint,string
+int,string
 ====
 ---- QUERY
-# Sorting is not supported yet for arrays containing varlen types: IMPALA-10939
-select id, arr_string_1d from collection_tbl order by id
----- CATCH
-AnalysisException: Sorting is not supported if the select list contains (possibly nested) collections with variable length data types.
-====
----- QUERY
-# Sorting is not supported yet for arrays containing varlen types: IMPALA-10939
-select id, int_array_array from complextypestbl order by id
----- CATCH
-AnalysisException: Sorting is not supported if the select list contains (possibly nested) collections with variable length data types.
+# Sort a collection of strings.
+select id, arr_string_1d from collection_tbl order by id desc
+---- RESULTS
+3,'["1",null,"three even-toed ungulates"]'
+2,'["one dinosaur bone",null,"2",null]'
+1,'["1","two wooden boxes",null]'
+---- TYPES
+int,string
 ====
 ---- QUERY
-# Being in the sorting tuple is supported for maps containing only fixed length types:
-# IMPALA-10939
+# Sort maps containing only fixed length types.
 select id, map_int_int, map_char3_char5 from map_non_varlen order by id desc
 ---- RESULTS
 10,'{100:1000,101:1010,102:1020}','{"aaj":"aaaaj"}'
@@ -160,46 +124,65 @@ select id, map_int_int, map_char3_char5 from map_non_varlen order by id desc
 int,string,string
 ====
 ---- QUERY
-# Sorting is not supported yet for collections containing varlen types: IMPALA-10939
-select id, int_map_array, int_map from complextypestbl order by id
----- CATCH
-AnalysisException: Sorting is not supported if the select list contains (possibly nested) collections with variable length data types.
+# Sort collections selected from within a struct.
+select id, struct_contains_arr.arr, struct_contains_nested_arr.arr from collection_struct_mix order by id desc;
+---- RESULTS
+2,'NULL','[["2022-12-10","2022-12-11",null,"2022-12-12"],null]'
+1,'[1,2,3,4,null,null,5]','[["2022-12-05","2022-12-06",null,"2022-12-07"],["2022-12-08","2022-12-09",null]]'
+---- TYPES
+INT,STRING,STRING
+====
+---- QUERY
+# Sort collections containing structs, also containing var-len data.
+select id, arr_contains_struct, arr_contains_nested_struct from collection_struct_mix order by id desc;
+---- RESULTS
+2,'[{"i":100},{"i":8},{"i":35},{"i":45},null,{"i":193},{"i":null}]','[{"inner_struct":null,"small":104},{"inner_struct":{"str":"a few soju distilleries","l":28},"small":105},null]'
+1,'[{"i":1},{"i":2},{"i":3},{"i":4},null,{"i":5},{"i":null}]','[{"inner_struct":{"str":"","l":0},"small":2},null,{"inner_struct":{"str":"some spaceship captain","l":5},"small":20}]'
+---- TYPES
+INT,STRING,STRING
 ====
 ---- QUERY
-# Sorting is not supported yet for collections within structs: IMPALA-10939. Test with a
+# Sorting is not supported yet for collections within structs: IMPALA-12160. Test with a
 # struct that contains an array.
 select id, struct_contains_arr from collection_struct_mix order by id
 ---- CATCH
 AnalysisException: Sorting is not supported if the select list contains collection(s) nested in struct(s).
 ====
 ---- QUERY
-# Sorting is not supported yet for collections within structs: IMPALA-10939. Test with a
+# Sorting is not supported yet for collections within structs: IMPALA-12160. Test with a
 # struct that contains a map.
 select id, struct_contains_map from collection_struct_mix order by id;
 ---- CATCH
 AnalysisException: Sorting is not supported if the select list contains collection(s) nested in struct(s).
 ====
 ---- QUERY
-# Sort collection selected from within a struct.
-select id, struct_contains_arr.arr from collection_struct_mix order by id desc;
----- RESULTS
-2,'NULL'
-1,'[1,2,3,4,null,null,5]'
----- TYPES
-INT,STRING
+# Sorting is not supported yet for collections within structs: IMPALA-12160. Test with an
+# array that contains a struct that contains an array.
+select id, maps.key, maps.value.big.arr from collection_struct_mix t, t.all_mix maps order by id
+---- CATCH
+AnalysisException: Sorting is not supported if the select list contains collection(s) nested in struct(s).
 ====
 ---- QUERY
-# Sort collection containing a struct.
-select id, arr_contains_struct from collection_struct_mix order by id desc;
----- RESULTS
-2,'[{"i":100},{"i":8},{"i":35},{"i":45},null,{"i":193},{"i":null}]'
-1,'[{"i":1},{"i":2},{"i":3},{"i":4},null,{"i":5},{"i":null}]'
----- TYPES
-INT,STRING
+# Sorting by collections is not supported. Test with an array of fixed length type.
+select id, arr_int_1d from collection_tbl order by arr_int_1d
+---- CATCH
+AnalysisException: ORDER BY expression 'arr_int_1d' with complex type 'ARRAY<INT>' is not supported.
+====
+---- QUERY
+# Sorting by collections is not supported. Test with an array of variable length type.
+select id, arr_string_1d from collection_tbl order by arr_string_1d
+---- CATCH
+AnalysisException: ORDER BY expression 'arr_string_1d' with complex type 'ARRAY<STRING>' is not supported.
+====
+---- QUERY
+# Sorting by collections is not supported. Test with a map of fixed length type.
+select id, map_int_int from map_non_varlen order by map_int_int
+---- CATCH
+AnalysisException: ORDER BY expression 'map_int_int' with complex type 'MAP<INT,INT>' is not supported.
 ====
 ---- QUERY
-# Sorting a collection containing a struct that contains var-len data is not supported.
-select id, arr_contains_nested_struct from collection_struct_mix order by id desc;
+# Sorting by collections is not supported. Test with a map of variable length type.
+select id, map_1d from collection_tbl order by map_1d
 ---- CATCH
-AnalysisException: Sorting is not supported if the select list contains (possibly nested) collections with variable length data types.
+AnalysisException: ORDER BY expression 'map_1d' with complex type 'MAP<INT,STRING>' is not supported.
 ====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/top-n-complex.test b/testdata/workloads/functional-query/queries/QueryTest/top-n-complex.test
index 556e37859..f55b5ae7a 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/top-n-complex.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/top-n-complex.test
@@ -1,66 +1,161 @@
 ====
 ---- QUERY
-# Sort a collection with limit.
-select id, int_array from complextypestbl order by id limit 5
+# Sort collections with limit.
+select id, arr_int_1d, arr_string_3d, map_1d, map_3d from collection_tbl order by id desc limit 2
 ---- RESULTS
-1,'[1,2,3]'
-2,'[null,1,2,null,3,null]'
-3,'[]'
-4,'NULL'
-5,'NULL'
+3,'[null,4679,null,49,null]','[[["1","-1","second even-toed ungulate",null],["three even-toed ungulates"]],[["four even-toed ungulate"]]]','{645:"fourth even-toed ungulate",5:"fifth"}','{1:{10:{100:"hundred",200:"two hundred even-toed ungulates"},20:{300:"three hundred even-toed ungulates",400:"four hundred"}},2:{30:{500:"five hundred even-toed ungulates",600:"six hundred"},40:{700:"seven hundred even-toed ungulates",800:"eight hundred"}}}'
+2,'[1,null,3]','[[["second dinosaur bone",null,null],["three dinosaur bones"]],[["one",null,"four dinosaur bones"]]]','{1:"first dinosaur bone",2:"second",3:null}','{1:{10:{100:"hundred",200:"two hundred dinosaur bones"},20:{300:"three hundred dinosaur bones",400:"four hundred"}},2:{30:{500:"five hundred dinosaur bones",600:"six hundred"},40:{700:"seven hundred dinosaur bones",800:"eight hundred"}}}'
 ---- TYPES
-bigint,string
+int,string,string,string,string
 ====
 ---- QUERY
-# Sort collection from nested query inline view with limit.
-select id, int_array
-from (select id, int_array from complextypestbl) v
-order by id limit 5
+# Sort collections from HMS view.
+select id, int_array, int_array_array from complextypes_arrays_only_view order by id desc limit 4
 ---- RESULTS
-1,'[1,2,3]'
-2,'[null,1,2,null,3,null]'
-3,'[]'
-4,'NULL'
-5,'NULL'
+8,'[-1]','[[-1,-2],[]]'
+7,'NULL','[null,[5,6]]'
+6,'NULL','NULL'
+5,'NULL','NULL'
 ---- TYPES
-bigint,string
+bigint,string,string
+====
+---- QUERY
+# Sort collection from WITH-clause inline view.
+with v as (
+  select id, arr_int_1d, arr_string_3d, map_1d, map_3d from collection_tbl
+)
+select id, arr_int_1d, arr_string_3d, map_1d, map_3d from collection_tbl order by id desc limit 2;
+---- RESULTS
+3,'[null,4679,null,49,null]','[[["1","-1","second even-toed ungulate",null],["three even-toed ungulates"]],[["four even-toed ungulate"]]]','{645:"fourth even-toed ungulate",5:"fifth"}','{1:{10:{100:"hundred",200:"two hundred even-toed ungulates"},20:{300:"three hundred even-toed ungulates",400:"four hundred"}},2:{30:{500:"five hundred even-toed ungulates",600:"six hundred"},40:{700:"seven hundred even-toed ungulates",800:"eight hundred"}}}'
+2,'[1,null,3]','[[["second dinosaur bone",null,null],["three dinosaur bones"]],[["one",null,"four dinosaur bones"]]]','{1:"first dinosaur bone",2:"second",3:null}','{1:{10:{100:"hundred",200:"two hundred dinosaur bones"},20:{300:"three hundred dinosaur bones",400:"four hundred"}},2:{30:{500:"five hundred dinosaur bones",600:"six hundred"},40:{700:"seven hundred dinosaur bones",800:"eight hundred"}}}'
+---- TYPES
+int,string,string,string,string
+====
+---- QUERY
+# Sort collections from nested query inline view with limit.
+select id, arr_int_1d, arr_string_3d, map_1d, map_3d
+from (select id, arr_int_1d, arr_string_3d, map_1d, map_3d from collection_tbl) v
+order by id desc limit 2
+---- RESULTS
+3,'[null,4679,null,49,null]','[[["1","-1","second even-toed ungulate",null],["three even-toed ungulates"]],[["four even-toed ungulate"]]]','{645:"fourth even-toed ungulate",5:"fifth"}','{1:{10:{100:"hundred",200:"two hundred even-toed ungulates"},20:{300:"three hundred even-toed ungulates",400:"four hundred"}},2:{30:{500:"five hundred even-toed ungulates",600:"six hundred"},40:{700:"seven hundred even-toed ungulates",800:"eight hundred"}}}'
+2,'[1,null,3]','[[["second dinosaur bone",null,null],["three dinosaur bones"]],[["one",null,"four dinosaur bones"]]]','{1:"first dinosaur bone",2:"second",3:null}','{1:{10:{100:"hundred",200:"two hundred dinosaur bones"},20:{300:"three hundred dinosaur bones",400:"four hundred"}},2:{30:{500:"five hundred dinosaur bones",600:"six hundred"},40:{700:"seven hundred dinosaur bones",800:"eight hundred"}}}'
+---- TYPES
+int,string,string,string,string
+====
+---- QUERY
+# Sort a collection that is join-unnested.
+select id, a.item
+from collection_tbl t, t.arr_string_3d a
+where id != 2
+order by id desc limit 3
+# Adding VERIFY_IS_EQUAL_SORTED because collections cannot be sorted and the order of the
+# rows with the same 'id' is different for different files.
+---- RESULTS: VERIFY_IS_EQUAL_SORTED
+3,'[["1","-1","second even-toed ungulate",null],["three even-toed ungulates"]]'
+3,'[["four even-toed ungulate"]]'
+1,'[["1","second harmonic",null],["three cities"]]'
+---- TYPES
+int,string
+====
+---- QUERY
+# Sort a collection that is join-unnested in a WITH-clause inline view.
+with v as (
+  select id, a.item m
+  from complextypestbl t, t.int_map_array a
+  where id != 8 and id != 2
+)
+select id, m from v order by id desc limit 3;
+
+with v as (
+  select id, a.item arr
+  from collection_tbl t, t.arr_string_3d a
+  where id != 2
+)
+select id, arr from v order by id desc limit 3
+# Adding VERIFY_IS_EQUAL_SORTED because collections cannot be sorted and the order of the
+# rows with the same 'id' is different for different files.
+---- RESULTS: VERIFY_IS_EQUAL_SORTED
+3,'[["1","-1","second even-toed ungulate",null],["three even-toed ungulates"]]'
+3,'[["four even-toed ungulate"]]'
+1,'[["1","second harmonic",null],["three cities"]]'
+---- TYPES
+int,string
 ====
 ---- QUERY
 # Sort a collection that is join-unnested in a nested query inline view with limit.
 select id, arr
 from (
     select id, a.item arr
-    from complextypestbl t, t.int_array_array a
+    from collection_tbl t, t.arr_string_3d a
   ) v
-order by id limit 2;
----- RESULTS
-1,'[1,2]'
-1,'[3,4]'
+where id != 2
+order by id desc limit 3
+# Adding VERIFY_IS_EQUAL_SORTED because collections cannot be sorted and the order of the
+# rows with the same 'id' is different for different files.
+---- RESULTS: VERIFY_IS_EQUAL_SORTED
+3,'[["1","-1","second even-toed ungulate",null],["three even-toed ungulates"]]'
+3,'[["four even-toed ungulate"]]'
+1,'[["1","second harmonic",null],["three cities"]]'
 ---- TYPES
-bigint,string
+int,string
 ====
 ---- QUERY
-# Sorting with limit is not supported yet for arrays containing varlen types: IMPALA-10939
-select id, arr_string_1d from collection_tbl order by id limit 2;
----- CATCH
-AnalysisException: Sorting is not supported if the select list contains (possibly nested) collections with variable length data types.
-====
----- QUERY
-# Sorting with limit is not supported yet for arrays containing varlen types: IMPALA-10939
-select id, int_array_array from complextypestbl order by id limit 2;
----- CATCH
-AnalysisException: Sorting is not supported if the select list contains (possibly nested) collections with variable length data types.
+# Sorting a collection of strings.
+select id, arr_string_1d from collection_tbl order by id desc limit 2
+---- RESULTS
+3,'["1",null,"three even-toed ungulates"]'
+2,'["one dinosaur bone",null,"2",null]'
+---- TYPES
+int,string
 ====
 ---- QUERY
-# Being in the sorting tuple is supported for maps containing only fixed length types,
-# also with limit:
-# IMPALA-10939
-select id, map_int_int, map_char3_char5 from map_non_varlen order by id desc limit 4
+# Sort maps containing only fixed length types.
+select id, map_int_int, map_char3_char5 from map_non_varlen order by id desc limit 7
 ---- RESULTS
 10,'{100:1000,101:1010,102:1020}','{"aaj":"aaaaj"}'
 9,'{90:900,91:910,92:920}','{"aai":"aaaai"}'
 8,'{80:800,81:810,82:820}','{"aah":"aaaah"}'
 7,'{70:700,71:710,72:720}','{"aag":"aaaag"}'
+6,'{60:600,61:610,62:620}','{"aaf":"aaaaf"}'
+5,'{50:500,51:510,52:520}','{"aae":"aaaae"}'
+4,'{40:400,41:410,42:420}','{"aad":"aaaad"}'
 ---- TYPES
 int,string,string
 ====
+---- QUERY
+# Sort collections selected from within a struct.
+select id, struct_contains_arr.arr, struct_contains_nested_arr.arr from collection_struct_mix order by id desc limit 1;
+---- RESULTS
+2,'NULL','[["2022-12-10","2022-12-11",null,"2022-12-12"],null]'
+---- TYPES
+INT,STRING,STRING
+====
+---- QUERY
+# Sort collections containing structs, also containing var-len data.
+select id, arr_contains_struct, arr_contains_nested_struct from collection_struct_mix order by id desc limit 1;
+---- RESULTS
+2,'[{"i":100},{"i":8},{"i":35},{"i":45},null,{"i":193},{"i":null}]','[{"inner_struct":null,"small":104},{"inner_struct":{"str":"a few soju distilleries","l":28},"small":105},null]'
+---- TYPES
+INT,STRING,STRING
+====
+---- QUERY
+# Sorting is not supported yet for collections within structs: IMPALA-12160. Test with a
+# struct that contains an array.
+select id, struct_contains_arr from collection_struct_mix order by id limit 5
+---- CATCH
+AnalysisException: Sorting is not supported if the select list contains collection(s) nested in struct(s).
+====
+---- QUERY
+# Sorting is not supported yet for collections within structs: IMPALA-12160. Test with a
+# struct that contains a map.
+select id, struct_contains_map from collection_struct_mix order by id limit 5
+---- CATCH
+AnalysisException: Sorting is not supported if the select list contains collection(s) nested in struct(s).
+====
+---- QUERY
+# Sorting is not supported yet for collections within structs: IMPALA-12160. Test with an
+# array that contains a struct that contains an array.
+select id, maps.key, maps.value.big.arr from collection_struct_mix t, t.all_mix maps order by id limit 7
+---- CATCH
+AnalysisException: Sorting is not supported if the select list contains collection(s) nested in struct(s).
+====
diff --git a/tests/query_test/test_queries.py b/tests/query_test/test_queries.py
index 0ae78b304..e67dc43dc 100644
--- a/tests/query_test/test_queries.py
+++ b/tests/query_test/test_queries.py
@@ -148,7 +148,6 @@ class TestQueries(ImpalaTestSuite):
 
     if file_format in ['parquet', 'orc']:
       self.run_test_case('QueryTest/sort-complex', vector)
-      self.run_test_case('QueryTest/top-n-complex', vector)
 
   def test_partitioned_top_n(self, vector):
     """Test partitioned Top-N operator."""
diff --git a/tests/query_test/test_sort.py b/tests/query_test/test_sort.py
index 82e43a78f..48a01cf25 100644
--- a/tests/query_test/test_sort.py
+++ b/tests/query_test/test_sort.py
@@ -359,19 +359,31 @@ class TestArraySort(ImpalaTestSuite):
     cls.ImpalaTestMatrix.add_constraint(lambda v:
         v.get_value('table_format').file_format == 'parquet')
 
-  def test_simple_arrays(self, vector):
-    """Test arrays that do not contain var-len data."""
-    query = """select string_col, int_array, double_array
-         from functional_parquet.simple_arrays_big order by string_col;"""
+  def test_array_sort(self, vector):
+    self._run_test_sort_query(vector, None)
+
+  def test_array_sort_with_limit(self, vector):
+    self._run_test_sort_query(vector, 3000)
+
+  def _run_test_sort_query(self, vector, limit):
+    """Test sorting with spilling where an array that contains var-len data is in the
+    sorting tuple. If 'limit' is set to an integer, applies that limit."""
+    query = """select string_col, int_array, double_map, string_array, mixed
+        from functional_parquet.arrays_big order by string_col"""
+
+    if limit:
+      assert isinstance(limit, int)
+      limit_clause = " limit {}".format(limit)
+      query = query + limit_clause
 
     exec_option = copy(vector.get_value('exec_option'))
     exec_option['disable_outermost_topn'] = 1
     exec_option['num_nodes'] = 1
-    exec_option['buffer_pool_limit'] = '28m'
+    exec_option['buffer_pool_limit'] = '44m'
     table_format = vector.get_value('table_format')
 
     query_result = self.execute_query(query, exec_option, table_format=table_format)
-    assert "SpilledRuns: 2" in query_result.runtime_profile
+    assert "SpilledRuns: 3" in query_result.runtime_profile
 
     # Split result rows (strings) into columns.
     result = split_result_rows(query_result.data)


(impala) 02/02: IMPALA-12385: Enable Periodic metrics by default

Posted by mi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit d75807a195273cdba29e33f00b7b6f9bee012f62
Author: Kurt Deschler <kd...@cloudera.com>
AuthorDate: Fri Aug 18 07:44:44 2023 -0500

    IMPALA-12385: Enable Periodic metrics by default
    
    This patch enables periodic metrics in query profiles by default and
    changes the metric collectors to be more suitable for mixed workloads.
    
    -Changed default of resource_trace_ratio to 1.
    -Changed profile metrics to use sampling counters which can automatically
     resize for long queries.
    -Reduced profile metric samping interval to 50ms to support short-running
     queries.
    -Changed fragment metrics to use the same sampling interval as periodic
     metrics.
    -Added singleton PeriodicCounterUpdater and thread for updating system
     (KRPC) metrics at a different period than fragment metrics.
    -Added new flag periodic_system_counter_update_period_ms for configuring
     system metric update period. Default is 500ms.
    
    Example profile output:
    - HostCpuIoWaitPercentage (400.000ms): 1, 0, 2, 3, 4, 6, 5, 2, 1, ...
    - HostCpuSysPercentage (400.000ms): 1, 12, 10, 11, 11, 5, 3, 3, 3, ...
    - HostCpuUserPercentage (400.000ms): 8, 46, 39, 39, 39, 29, 22, 23, ...
    
    Testing:
    -Updated runtime-profile-test and test_observability.py for new defaults
    -Manual inspection of query profile metrics for long-running queries
    -Tested for performance regression using 50 concurrent TPCH Q1 and with
     periodic_counter_update_period_ms=1 to verify that the periodic update
     loop does not affect peformance or become a bottleneck
    
    Change-Id: Ic8e5cbfd4b324081158574ceb8f4b3a062a69fd1
    Reviewed-on: http://gerrit.cloudera.org:8080/20377
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Michael Smith <mi...@cloudera.com>
---
 be/src/runtime/exec-env.cc               |   2 +-
 be/src/runtime/krpc-data-stream-recvr.cc |   6 +-
 be/src/runtime/query-state.cc            |  14 ++---
 be/src/util/periodic-counter-updater.cc  | 103 ++++++++++++++++++++-----------
 be/src/util/periodic-counter-updater.h   |  22 +++++--
 be/src/util/runtime-profile-counters.h   |  12 +++-
 be/src/util/runtime-profile-test.cc      |   3 +-
 be/src/util/runtime-profile.cc           |  16 +++--
 be/src/util/runtime-profile.h            |  11 +++-
 be/src/util/streaming-sampler.h          |   2 +-
 common/thrift/Query.thrift               |   2 +-
 tests/query_test/test_observability.py   |  37 +++++------
 12 files changed, 148 insertions(+), 82 deletions(-)

diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 8f2768e49..c31bd597f 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -637,7 +637,7 @@ void ExecEnv::InitSystemStateInfo() {
   system_state_info_.reset(new SystemStateInfo());
   PeriodicCounterUpdater::RegisterUpdateFunction([s = system_state_info_.get()]() {
     s->CaptureSystemStateSnapshot();
-  });
+  }, true);
 }
 
 Status ExecEnv::GetKuduClient(const vector<string>& master_addresses,
diff --git a/be/src/runtime/krpc-data-stream-recvr.cc b/be/src/runtime/krpc-data-stream-recvr.cc
index a3b804e7f..394fe64cb 100644
--- a/be/src/runtime/krpc-data-stream-recvr.cc
+++ b/be/src/runtime/krpc-data-stream-recvr.cc
@@ -707,7 +707,7 @@ KrpcDataStreamRecvr::KrpcDataStreamRecvr(KrpcDataStreamMgr* stream_mgr,
   // Initialize various counters for measuring dequeuing from queues.
   bytes_dequeued_counter_ =
       ADD_COUNTER(dequeue_profile_, "TotalBytesDequeued", TUnit::BYTES);
-  bytes_dequeued_time_series_counter_ = ADD_TIME_SERIES_COUNTER(
+  bytes_dequeued_time_series_counter_ = ADD_SYSTEM_TIME_SERIES_COUNTER(
       dequeue_profile_, "BytesDequeued", bytes_dequeued_counter_);
   queue_get_batch_timer_ = ADD_TIMER(dequeue_profile_, "TotalGetBatchTime");
   data_wait_timer_ =
@@ -719,7 +719,7 @@ KrpcDataStreamRecvr::KrpcDataStreamRecvr(KrpcDataStreamMgr* stream_mgr,
   // Initialize various counters for measuring enqueuing into queues.
   bytes_received_counter_ =
       ADD_COUNTER(enqueue_profile_, "TotalBytesReceived", TUnit::BYTES);
-  bytes_received_time_series_counter_ = ADD_TIME_SERIES_COUNTER(
+  bytes_received_time_series_counter_ = ADD_SYSTEM_TIME_SERIES_COUNTER(
       enqueue_profile_, "BytesReceived", bytes_received_counter_);
   deserialize_row_batch_timer_ =
       ADD_TIMER(enqueue_profile_, "DeserializeRowBatchTime");
@@ -735,7 +735,7 @@ KrpcDataStreamRecvr::KrpcDataStreamRecvr(KrpcDataStreamMgr* stream_mgr,
       ADD_COUNTER(enqueue_profile_, "TotalRPCsDeferred", TUnit::UNIT);
   deferred_rpcs_time_series_counter_ =
       enqueue_profile_->AddSamplingTimeSeriesCounter("DeferredQueueSize", TUnit::UNIT,
-      bind<int64_t>(mem_fn(&KrpcDataStreamRecvr::num_deferred_rpcs), this));
+      bind<int64_t>(mem_fn(&KrpcDataStreamRecvr::num_deferred_rpcs), this), true);
   total_has_deferred_rpcs_timer_ =
       ADD_TIMER(enqueue_profile_, "TotalHasDeferredRPCsTime");
   dispatch_timer_ =
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 31f833b8d..5738448e8 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -218,33 +218,33 @@ Status QueryState::Init(const ExecQueryFInstancesRequestPB* exec_rpc_params,
   // Initialize resource tracking counters.
   if (query_ctx().trace_resource_usage) {
     SystemStateInfo* system_state_info = exec_env->system_state_info();
-    host_profile_->AddChunkedTimeSeriesCounter(
+    host_profile_->AddSamplingTimeSeriesCounter(
         "HostCpuUserPercentage", TUnit::BASIS_POINTS, [system_state_info] () {
         return system_state_info->GetCpuUsageRatios().user;
         });
-    host_profile_->AddChunkedTimeSeriesCounter(
+    host_profile_->AddSamplingTimeSeriesCounter(
         "HostCpuSysPercentage", TUnit::BASIS_POINTS, [system_state_info] () {
         return system_state_info->GetCpuUsageRatios().system;
         });
-    host_profile_->AddChunkedTimeSeriesCounter(
+    host_profile_->AddSamplingTimeSeriesCounter(
         "HostCpuIoWaitPercentage", TUnit::BASIS_POINTS, [system_state_info] () {
         return system_state_info->GetCpuUsageRatios().iowait;
         });
     // Add network usage
-    host_profile_->AddChunkedTimeSeriesCounter(
+    host_profile_->AddSamplingTimeSeriesCounter(
         "HostNetworkRx", TUnit::BYTES_PER_SECOND, [system_state_info] () {
         return system_state_info->GetNetworkUsage().rx_rate;
         });
-    host_profile_->AddChunkedTimeSeriesCounter(
+    host_profile_->AddSamplingTimeSeriesCounter(
         "HostNetworkTx", TUnit::BYTES_PER_SECOND, [system_state_info] () {
         return system_state_info->GetNetworkUsage().tx_rate;
         });
     // Add disk stats
-    host_profile_->AddChunkedTimeSeriesCounter(
+    host_profile_->AddSamplingTimeSeriesCounter(
         "HostDiskReadThroughput", TUnit::BYTES_PER_SECOND, [system_state_info] () {
         return system_state_info->GetDiskStats().read_rate;
         });
-    host_profile_->AddChunkedTimeSeriesCounter(
+    host_profile_->AddSamplingTimeSeriesCounter(
         "HostDiskWriteThroughput", TUnit::BYTES_PER_SECOND, [system_state_info] () {
         return system_state_info->GetDiskStats().write_rate;
         });
diff --git a/be/src/util/periodic-counter-updater.cc b/be/src/util/periodic-counter-updater.cc
index 8bef6aa9a..edb8a4d14 100644
--- a/be/src/util/periodic-counter-updater.cc
+++ b/be/src/util/periodic-counter-updater.cc
@@ -26,25 +26,43 @@ namespace posix_time = boost::posix_time;
 using boost::get_system_time;
 using boost::system_time;
 
-// Period to update rate counters and sampling counters in ms.
-DEFINE_int32(periodic_counter_update_period_ms, 500, "Period to update rate counters and"
-    " sampling counters in ms");
+// Period to update query profile rate counters and sampling counters in ms.
+DEFINE_int32(periodic_counter_update_period_ms, 50, "Period to update"
+    " query profile rate counters and sampling counters in ms");
+
+// Period to update system-level rate counters and sampling counters in ms.
+DEFINE_int32(periodic_system_counter_update_period_ms, 500, "Period to update"
+    " system-level rate counters and sampling counters in ms");
 
 namespace impala {
 
+// Updater for profile counters
 PeriodicCounterUpdater* PeriodicCounterUpdater::instance_ = nullptr;
 
+// Updater for system counters
+PeriodicCounterUpdater* PeriodicCounterUpdater::system_instance_ = nullptr;
+
 void PeriodicCounterUpdater::Init() {
-  DCHECK(instance_ == nullptr);
-  // Create the singleton, which will live until the process terminates.
-  instance_ = new PeriodicCounterUpdater;
+  DCHECK(instance_ == nullptr && system_instance_ == nullptr);
+  // Create two singletons, which will live until the process terminates.
+  instance_ = new PeriodicCounterUpdater(FLAGS_periodic_counter_update_period_ms);
+
   instance_->update_thread_.reset(
-      new thread(&PeriodicCounterUpdater::UpdateLoop, instance_));
+      new thread(boost::bind(&PeriodicCounterUpdater::UpdateLoop, instance_, instance_)));
+
+  system_instance_ =
+      new PeriodicCounterUpdater(FLAGS_periodic_system_counter_update_period_ms);
+
+  system_instance_->update_thread_.reset(
+      new thread(boost::bind(&PeriodicCounterUpdater::UpdateLoop, system_instance_,
+          system_instance_)));
+
 }
 
-void PeriodicCounterUpdater::RegisterUpdateFunction(UpdateFn update_fn) {
-  lock_guard<SpinLock> l(instance_->update_fns_lock_);
-  instance_->update_fns_.push_back(update_fn);
+void PeriodicCounterUpdater::RegisterUpdateFunction(UpdateFn update_fn, bool is_system) {
+  PeriodicCounterUpdater* instance = is_system ? system_instance_ : instance_;
+  lock_guard<SpinLock> l(instance->update_fns_lock_);
+  instance->update_fns_.push_back(update_fn);
 }
 
 void PeriodicCounterUpdater::RegisterPeriodicCounter(
@@ -52,6 +70,9 @@ void PeriodicCounterUpdater::RegisterPeriodicCounter(
     RuntimeProfile::SampleFunction sample_fn,
     RuntimeProfile::Counter* dst_counter, PeriodicCounterType type) {
   DCHECK(src_counter == NULL || sample_fn == NULL);
+  DCHECK(src_counter == NULL || src_counter->GetIsSystem() == dst_counter->GetIsSystem());
+  PeriodicCounterUpdater* instance = dst_counter->GetIsSystem() ?
+      system_instance_ : instance_;
 
   switch (type) {
     case RATE_COUNTER: {
@@ -59,8 +80,8 @@ void PeriodicCounterUpdater::RegisterPeriodicCounter(
       counter.src_counter = src_counter;
       counter.sample_fn = sample_fn;
       counter.elapsed_ms = 0;
-      lock_guard<SpinLock> ratelock(instance_->rate_lock_);
-      instance_->rate_counters_[dst_counter] = counter;
+      lock_guard<SpinLock> ratelock(instance->rate_lock_);
+      instance->rate_counters_[dst_counter] = counter;
       break;
     }
     case SAMPLING_COUNTER: {
@@ -69,8 +90,8 @@ void PeriodicCounterUpdater::RegisterPeriodicCounter(
       counter.sample_fn = sample_fn;
       counter.num_sampled = 0;
       counter.total_sampled_value = 0;
-      lock_guard<SpinLock> samplinglock(instance_->sampling_lock_);
-      instance_->sampling_counters_[dst_counter] = counter;
+      lock_guard<SpinLock> samplinglock(instance->sampling_lock_);
+      instance->sampling_counters_[dst_counter] = counter;
       break;
     }
     default:
@@ -79,35 +100,43 @@ void PeriodicCounterUpdater::RegisterPeriodicCounter(
 }
 
 void PeriodicCounterUpdater::StopRateCounter(RuntimeProfile::Counter* counter) {
-  lock_guard<SpinLock> ratelock(instance_->rate_lock_);
-  instance_->rate_counters_.erase(counter);
+  PeriodicCounterUpdater* instance = counter->GetIsSystem() ?
+      system_instance_ : instance_;
+  lock_guard<SpinLock> ratelock(instance->rate_lock_);
+  instance->rate_counters_.erase(counter);
 }
 
 void PeriodicCounterUpdater::StopSamplingCounter(RuntimeProfile::Counter* counter) {
-  lock_guard<SpinLock> samplinglock(instance_->sampling_lock_);
-  instance_->sampling_counters_.erase(counter);
+  PeriodicCounterUpdater* instance = counter->GetIsSystem() ?
+      system_instance_ : instance_;
+  lock_guard<SpinLock> samplinglock(instance->sampling_lock_);
+  instance->sampling_counters_.erase(counter);
 }
 
 void PeriodicCounterUpdater::RegisterBucketingCounters(
     RuntimeProfile::Counter* src_counter, vector<RuntimeProfile::Counter*>* buckets) {
+  PeriodicCounterUpdater* instance = src_counter->GetIsSystem() ?
+      system_instance_ : instance_;
   BucketCountersInfo info;
   info.src_counter = src_counter;
   info.num_sampled = 0;
-  lock_guard<SpinLock> bucketinglock(instance_->bucketing_lock_);
-  instance_->bucketing_counters_[buckets] = info;
+  lock_guard<SpinLock> bucketinglock(instance->bucketing_lock_);
+  instance->bucketing_counters_[buckets] = info;
 }
 
 void PeriodicCounterUpdater::StopBucketingCounters(
-    vector<RuntimeProfile::Counter*>* buckets) {
+    vector<RuntimeProfile::Counter*>* buckets, bool is_system) {
   int64_t num_sampled = 0;
+  PeriodicCounterUpdater* instance = is_system ? system_instance_ : instance_;
   {
-    lock_guard<SpinLock> bucketinglock(instance_->bucketing_lock_);
+    lock_guard<SpinLock> bucketinglock(instance->bucketing_lock_);
     BucketCountersMap::iterator itr =
-        instance_->bucketing_counters_.find(buckets);
+        instance->bucketing_counters_.find(buckets);
     // If not registered, we have nothing to do.
-    if (itr == instance_->bucketing_counters_.end()) return;
+    if (itr == instance->bucketing_counters_.end()) return;
+    DCHECK(is_system == itr->second.src_counter->GetIsSystem());
     num_sampled = itr->second.num_sampled;
-    instance_->bucketing_counters_.erase(itr);
+    instance->bucketing_counters_.erase(itr);
   }
 
   if (num_sampled > 0) {
@@ -120,20 +149,24 @@ void PeriodicCounterUpdater::StopBucketingCounters(
 
 void PeriodicCounterUpdater::RegisterTimeSeriesCounter(
     RuntimeProfile::TimeSeriesCounter* counter) {
-  lock_guard<SpinLock> timeserieslock(instance_->time_series_lock_);
-  instance_->time_series_counters_.insert(counter);
+  PeriodicCounterUpdater* instance = counter->GetIsSystem() ?
+      system_instance_ : instance_;
+  lock_guard<SpinLock> timeserieslock(instance->time_series_lock_);
+  instance->time_series_counters_.insert(counter);
 }
 
 void PeriodicCounterUpdater::StopTimeSeriesCounter(
     RuntimeProfile::TimeSeriesCounter* counter) {
-  lock_guard<SpinLock> timeserieslock(instance_->time_series_lock_);
-  instance_->time_series_counters_.erase(counter);
+  PeriodicCounterUpdater* instance = counter->GetIsSystem() ?
+     system_instance_ : instance_;
+  lock_guard<SpinLock> timeserieslock(instance->time_series_lock_);
+  instance->time_series_counters_.erase(counter);
 }
 
-void PeriodicCounterUpdater::UpdateLoop() {
+void PeriodicCounterUpdater::UpdateLoop(PeriodicCounterUpdater* instance) {
   while (true) {
     system_time before_time = get_system_time();
-    SleepForMs(FLAGS_periodic_counter_update_period_ms);
+    SleepForMs(update_period_);
     posix_time::time_duration elapsed = get_system_time() - before_time;
     int elapsed_ms = elapsed.total_milliseconds();
 
@@ -143,7 +176,7 @@ void PeriodicCounterUpdater::UpdateLoop() {
     }
 
     {
-      lock_guard<SpinLock> ratelock(instance_->rate_lock_);
+      lock_guard<SpinLock> ratelock(instance->rate_lock_);
       for (RateCounterMap::iterator it = rate_counters_.begin();
            it != rate_counters_.end(); ++it) {
         it->second.elapsed_ms += elapsed_ms;
@@ -160,7 +193,7 @@ void PeriodicCounterUpdater::UpdateLoop() {
     }
 
     {
-      lock_guard<SpinLock> samplinglock(instance_->sampling_lock_);
+      lock_guard<SpinLock> samplinglock(instance->sampling_lock_);
       for (SamplingCounterMap::iterator it = sampling_counters_.begin();
            it != sampling_counters_.end(); ++it) {
         ++it->second.num_sampled;
@@ -179,7 +212,7 @@ void PeriodicCounterUpdater::UpdateLoop() {
     }
 
     {
-      lock_guard<SpinLock> bucketinglock(instance_->bucketing_lock_);
+      lock_guard<SpinLock> bucketinglock(instance->bucketing_lock_);
       for (BucketCountersMap::iterator it = bucketing_counters_.begin();
            it != bucketing_counters_.end(); ++it) {
         int64_t val = it->second.src_counter->value();
@@ -190,7 +223,7 @@ void PeriodicCounterUpdater::UpdateLoop() {
     }
 
     {
-      lock_guard<SpinLock> timeserieslock(instance_->time_series_lock_);
+      lock_guard<SpinLock> timeserieslock(instance->time_series_lock_);
       for (TimeSeriesCounters::iterator it = time_series_counters_.begin();
            it != time_series_counters_.end(); ++it) {
         (*it)->AddSample(elapsed_ms);
diff --git a/be/src/util/periodic-counter-updater.h b/be/src/util/periodic-counter-updater.h
index 0f92070e9..6b3ef0eaf 100644
--- a/be/src/util/periodic-counter-updater.h
+++ b/be/src/util/periodic-counter-updater.h
@@ -39,6 +39,11 @@ namespace impala {
 /// future stale samples from polluting the useful values.
 class PeriodicCounterUpdater {
  public:
+
+  PeriodicCounterUpdater(const int32_t update_period)
+    : update_period_(update_period) {
+  }
+
   enum PeriodicCounterType {
     RATE_COUNTER = 0,
     SAMPLING_COUNTER,
@@ -52,7 +57,7 @@ class PeriodicCounterUpdater {
   /// Registers an update function that will be called before individual counters will be
   /// updated. This can be used to update some global metric once before reading it
   /// through individual counters.
-  static void RegisterUpdateFunction(UpdateFn update_fn);
+  static void RegisterUpdateFunction(UpdateFn update_fn, bool is_system);
 
   /// Registers a periodic counter to be updated by the update thread.
   /// Either sample_fn or dst_counter must be non-NULL.  When the periodic counter
@@ -80,7 +85,8 @@ class PeriodicCounterUpdater {
   /// convert the buckets from count to percentage. If not registered, has no effect.
   /// Perioidic counters are updated periodically so should be removed as soon as the
   /// underlying counter is no longer going to change.
-  static void StopBucketingCounters(std::vector<RuntimeProfile::Counter*>* buckets);
+  static void StopBucketingCounters(std::vector<RuntimeProfile::Counter*>* buckets,
+      bool is_system = false);
 
   /// Stops 'counter' from receiving any more samples.
   static void StopTimeSeriesCounter(RuntimeProfile::TimeSeriesCounter* counter);
@@ -107,7 +113,7 @@ class PeriodicCounterUpdater {
 
   /// Loop for periodic counter update thread.  This thread wakes up once in a while
   /// and updates all the added rate counters and sampling counters.
-  [[noreturn]] void UpdateLoop();
+  [[noreturn]] void UpdateLoop(PeriodicCounterUpdater* instance);
 
   /// Thread performing asynchronous updates.
   boost::scoped_ptr<boost::thread> update_thread_;
@@ -149,8 +155,14 @@ class PeriodicCounterUpdater {
   typedef boost::unordered_set<RuntimeProfile::TimeSeriesCounter*> TimeSeriesCounters;
   TimeSeriesCounters time_series_counters_;
 
-  /// Singleton object that keeps track of all rate counters and the thread
-  /// for updating them.
+  /// Singleton object that keeps track of all profile rate counters and the thread
+  /// for updating them. Interval set by flag periodic_counter_update_period_ms.
   static PeriodicCounterUpdater* instance_;
+
+  /// Singleton object that keeps track of all system rate counters and the thread
+  /// for updating them. Interval set by flag periodic_system_counter_update_period_ms.
+  static PeriodicCounterUpdater* system_instance_;
+
+  int32_t update_period_;
 };
 }
diff --git a/be/src/util/runtime-profile-counters.h b/be/src/util/runtime-profile-counters.h
index 316c2d083..8b12662e1 100644
--- a/be/src/util/runtime-profile-counters.h
+++ b/be/src/util/runtime-profile-counters.h
@@ -54,6 +54,8 @@ namespace impala {
   #define ADD_COUNTER(profile, name, unit) (profile)->AddCounter(name, unit)
   #define ADD_TIME_SERIES_COUNTER(profile, name, src_counter) \
       (profile)->AddSamplingTimeSeriesCounter(name, src_counter)
+  #define ADD_SYSTEM_TIME_SERIES_COUNTER(profile, name, src_counter) \
+      (profile)->AddSamplingTimeSeriesCounter(name, src_counter, true)
   #define ADD_TIMER(profile, name) (profile)->AddCounter(name, TUnit::TIME_NS)
   #define ADD_SUMMARY_STATS_TIMER(profile, name) \
       (profile)->AddSummaryStatsCounter(name, TUnit::TIME_NS)
@@ -754,6 +756,9 @@ class RuntimeProfile::TimeSeriesCounter {
 
   TUnit::type unit() const { return unit_; }
 
+  void SetIsSystem() { is_system_ = true; }
+  bool GetIsSystem() const { return  is_system_; }
+
  private:
   friend class RuntimeProfile;
 
@@ -794,7 +799,7 @@ class RuntimeProfile::TimeSeriesCounter {
  protected:
   TimeSeriesCounter(const std::string& name, TUnit::type unit,
       SampleFunction fn = SampleFunction())
-    : name_(name), unit_(unit), sample_fn_(fn) {}
+    : name_(name), unit_(unit), sample_fn_(fn), is_system_(false) {}
 
   std::string name_;
   TUnit::type unit_;
@@ -802,6 +807,7 @@ class RuntimeProfile::TimeSeriesCounter {
   /// The number of samples that have been retrieved and cleared from this counter.
   int64_t previous_sample_count_ = 0;
   mutable SpinLock lock_;
+  bool is_system_;
 };
 
 typedef StreamingSampler<int64_t, 64> StreamingCounterSampler;
@@ -811,8 +817,8 @@ class RuntimeProfile::SamplingTimeSeriesCounter
   friend class RuntimeProfile;
 
   SamplingTimeSeriesCounter(
-      const std::string& name, TUnit::type unit, SampleFunction fn)
-    : TimeSeriesCounter(name, unit, fn) {}
+      const std::string& name, TUnit::type unit, SampleFunction fn, int initial_period)
+    : TimeSeriesCounter(name, unit, fn), samples_(initial_period) {}
 
   virtual void AddSampleLocked(int64_t sample, int ms_elapsed) override;
   virtual const int64_t* GetSamplesLocked( int* num_samples, int* period) const override;
diff --git a/be/src/util/runtime-profile-test.cc b/be/src/util/runtime-profile-test.cc
index bd5f4903f..3a446e897 100644
--- a/be/src/util/runtime-profile-test.cc
+++ b/be/src/util/runtime-profile-test.cc
@@ -1105,7 +1105,7 @@ void ValidateSampler(const StreamingSampler<int, 10>& sampler, int expected_num,
 }
 
 TEST(CountersTest, StreamingSampler) {
-  StreamingSampler<int, 10> sampler;
+  StreamingSampler<int, 10> sampler(500);
 
   int idx = 0;
   for (int i = 0; i < 3; ++i) {
@@ -1623,6 +1623,7 @@ TEST_P(TimeSeriesCounterResampleTest, TestPrettyPrint) {
   RuntimeProfile* profile = RuntimeProfile::Create(&pool, "Profile");
 
   const TimeSeriesTestParam& param = GetParam();
+  FLAGS_periodic_counter_update_period_ms = 500;
   const int test_period = FLAGS_periodic_counter_update_period_ms;
 
   // Add a counter with a sample function that counts up, starting from 0.
diff --git a/be/src/util/runtime-profile.cc b/be/src/util/runtime-profile.cc
index 9b7c2b407..0b5ec53ca 100644
--- a/be/src/util/runtime-profile.cc
+++ b/be/src/util/runtime-profile.cc
@@ -53,6 +53,7 @@
 
 DECLARE_int32(status_report_interval_ms);
 DECLARE_int32(periodic_counter_update_period_ms);
+DECLARE_int32(periodic_system_counter_update_period_ms);
 
 // This must be set on the coordinator to enable the alternative profile representation.
 // It should not be set on executors - the setting is sent to the executors by
@@ -2018,12 +2019,19 @@ RuntimeProfileBase::SummaryStatsCounter* RuntimeProfile::AddSummaryStatsCounter(
 }
 
 RuntimeProfile::TimeSeriesCounter* RuntimeProfile::AddSamplingTimeSeriesCounter(
-    const string& name, TUnit::type unit, SampleFunction fn) {
+    const string& name, TUnit::type unit, SampleFunction fn, bool is_system) {
   DCHECK(fn != nullptr);
   lock_guard<SpinLock> l(counter_map_lock_);
   TimeSeriesCounterMap::iterator it = time_series_counter_map_.find(name);
   if (it != time_series_counter_map_.end()) return it->second;
-  TimeSeriesCounter* counter = pool_->Add(new SamplingTimeSeriesCounter(name, unit, fn));
+  int32_t update_interval = is_system ?
+      FLAGS_periodic_system_counter_update_period_ms :
+      FLAGS_periodic_counter_update_period_ms;
+  TimeSeriesCounter* counter = pool_->Add(new SamplingTimeSeriesCounter(name,
+      unit, fn, update_interval));
+  if (is_system) {
+    counter->SetIsSystem();
+  }
   time_series_counter_map_[name] = counter;
   PeriodicCounterUpdater::RegisterTimeSeriesCounter(counter);
   has_active_periodic_counters_ = true;
@@ -2031,10 +2039,10 @@ RuntimeProfile::TimeSeriesCounter* RuntimeProfile::AddSamplingTimeSeriesCounter(
 }
 
 RuntimeProfile::TimeSeriesCounter* RuntimeProfile::AddSamplingTimeSeriesCounter(
-    const string& name, Counter* src_counter) {
+    const string& name, Counter* src_counter, bool is_system) {
   DCHECK(src_counter != NULL);
   return AddSamplingTimeSeriesCounter(name, src_counter->unit(),
-      bind(&Counter::value, src_counter));
+      bind(&Counter::value, src_counter), is_system);
 }
 
 void RuntimeProfile::TimeSeriesCounter::AddSample(int ms_elapsed) {
diff --git a/be/src/util/runtime-profile.h b/be/src/util/runtime-profile.h
index c6b8fece6..419b8d916 100644
--- a/be/src/util/runtime-profile.h
+++ b/be/src/util/runtime-profile.h
@@ -99,7 +99,8 @@ class RuntimeProfileBase {
 
   class Counter {
    public:
-    Counter(TUnit::type unit, int64_t value = 0) : value_(value), unit_(unit) {}
+    Counter(TUnit::type unit, int64_t value = 0)
+      : value_(value), unit_(unit), is_system_(false) {}
     virtual ~Counter(){}
 
     virtual void Add(int64_t delta) {
@@ -144,11 +145,15 @@ class RuntimeProfileBase {
 
     TUnit::type unit() const { return unit_; }
 
+    void SetIsSystem() { is_system_ = true; }
+    bool GetIsSystem() const { return  is_system_; }
+
    protected:
     friend class RuntimeProfile;
 
     AtomicInt64 value_;
     TUnit::type unit_;
+    bool is_system_;
   };
 
   class AveragedCounter;
@@ -658,11 +663,11 @@ class RuntimeProfile : public RuntimeProfileBase {
   /// calling PeriodicCounterUpdater::StopTimeSeriesCounter() if the input stops changing.
   /// Note: these counters don't get merged (to make average profiles)
   TimeSeriesCounter* AddSamplingTimeSeriesCounter(const std::string& name,
-      TUnit::type unit, SampleFunction sample_fn);
+      TUnit::type unit, SampleFunction sample_fn, bool is_system = false);
 
   /// Same as above except the samples are collected from 'src_counter'.
   TimeSeriesCounter* AddSamplingTimeSeriesCounter(const std::string& name, Counter*
-      src_counter);
+      src_counter, bool is_system = false);
 
   /// Adds a chunked time series counter to the profile. This begins sampling immediately.
   /// This counter will collect new samples periodically by calling 'sample_fn()'. Samples
diff --git a/be/src/util/streaming-sampler.h b/be/src/util/streaming-sampler.h
index b395ace5d..cbfd2f204 100644
--- a/be/src/util/streaming-sampler.h
+++ b/be/src/util/streaming-sampler.h
@@ -37,7 +37,7 @@ template<typename T, int MAX_SAMPLES>
 class StreamingSampler {
   static_assert(std::is_arithmetic<T>::value, "Numerical type required");
  public:
-  StreamingSampler(int initial_period = 500)
+  StreamingSampler(int initial_period)
     : samples_collected_(0) ,
       period_(initial_period),
       current_sample_sum_(0),
diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift
index 3808eb579..39ba7f896 100644
--- a/common/thrift/Query.thrift
+++ b/common/thrift/Query.thrift
@@ -371,7 +371,7 @@ struct TQueryOptions {
   // See comment in ImpalaService.thrift
   74: optional string client_identifier;
 
-  75: optional double resource_trace_ratio = 0;
+  75: optional double resource_trace_ratio = 1;
 
   // See comment in ImpalaService.thrift.
   // The default value is set to 3 as this is the default value of HDFS replicas.
diff --git a/tests/query_test/test_observability.py b/tests/query_test/test_observability.py
index 67f5ca282..d6cd299c6 100644
--- a/tests/query_test/test_observability.py
+++ b/tests/query_test/test_observability.py
@@ -592,10 +592,10 @@ class TestObservability(ImpalaTestSuite):
     assert any(expected_str in line for line in profile.splitlines())
 
   def test_query_profile_host_resource_metrics_off(self):
-    """Tests that the query profile does not contain resource usage metrics by default or
+    """Tests that the query profile does not contain resource usage metrics
        when disabled explicitly."""
     query = "select count(*), sleep(1000) from functional.alltypes"
-    for query_opts in [None, {'resource_trace_ratio': 0.0}]:
+    for query_opts in [{'resource_trace_ratio': 0.0}]:
       profile = self.execute_query(query, query_opts).runtime_profile
       # Assert that no host resource counters exist in the profile
       for line in profile.splitlines():
@@ -604,23 +604,24 @@ class TestObservability(ImpalaTestSuite):
         assert not re.search("HostDiskReadThroughput", line)
 
   def test_query_profile_contains_host_resource_metrics(self):
-    """Tests that the query profile contains various CPU and network metrics."""
-    query_opts = {'resource_trace_ratio': 1.0}
+    """Tests that the query profile contains various CPU and network metrics
+       by default or when enabled explicitly."""
     query = "select count(*), sleep(1000) from functional.alltypes"
-    profile = self.execute_query(query, query_opts).runtime_profile
-    # We check for 500ms because a query with 1s duration won't hit the 64 values limit
-    # that would trigger resampling.
-    expected_strs = ["HostCpuIoWaitPercentage (500.000ms):",
-                     "HostCpuSysPercentage (500.000ms):",
-                     "HostCpuUserPercentage (500.000ms):",
-                     "HostNetworkRx (500.000ms):",
-                     "HostNetworkTx (500.000ms):",
-                     "HostDiskReadThroughput (500.000ms):",
-                     "HostDiskWriteThroughput (500.000ms):"]
-
-    # Assert that all expected counters exist in the profile.
-    for expected_str in expected_strs:
-      assert any(expected_str in line for line in profile.splitlines()), expected_str
+    for query_opts in [{}, {'resource_trace_ratio': 1.0}]:
+      profile = self.execute_query(query, query_opts).runtime_profile
+      # We check for 50ms because a query with 1s duration won't hit the 64 values limit
+      # that would trigger resampling.
+      expected_strs = ["HostCpuIoWaitPercentage (50.000ms):",
+                     "HostCpuSysPercentage (50.000ms):",
+                     "HostCpuUserPercentage (50.000ms):",
+                     "HostNetworkRx (50.000ms):",
+                     "HostNetworkTx (50.000ms):",
+                     "HostDiskReadThroughput (50.000ms):",
+                     "HostDiskWriteThroughput (50.000ms):"]
+
+      # Assert that all expected counters exist in the profile.
+      for expected_str in expected_strs:
+        assert any(expected_str in line for line in profile.splitlines()), expected_str
 
     # Check that there are some values for each counter.
     for line in profile.splitlines():