You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by cs...@apache.org on 2021/09/15 13:16:38 UTC

[impala] branch master updated (6d47927 -> 3850d49)

This is an automated email from the ASF dual-hosted git repository.

csringhofer pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from 6d47927  IMPALA-10904: Add some perf tools to the repository
     new 1e21aa6  IMPALA-9495: Support struct in select list for ORC tables
     new 3850d49  IMPALA-9662,IMPALA-2019(part-3): Support UTF-8 mode in mask functions

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CMakeLists.txt                                     |   2 +-
 be/src/exec/hdfs-orc-scanner.cc                    |   8 +-
 be/src/exec/hdfs-scan-node-base.cc                 |   2 +-
 be/src/exec/hdfs-scanner.cc                        |   2 +-
 be/src/exec/orc-column-readers.cc                  |  35 +-
 be/src/exec/orc-column-readers.h                   |  13 +-
 be/src/exec/parquet/hdfs-parquet-scanner.cc        |   4 +-
 .../parquet/parquet-collection-column-reader.cc    |   2 +-
 be/src/exprs/anyval-util.cc                        |   3 +
 be/src/exprs/expr-test.cc                          |  59 ++
 be/src/exprs/expr-value.h                          |   3 +
 be/src/exprs/mask-functions-ir.cc                  | 305 +++++++++--
 be/src/exprs/mask-functions.h                      |  30 +
 be/src/exprs/scalar-expr-evaluator.cc              |  29 +-
 be/src/exprs/scalar-expr-evaluator.h               |   9 +
 be/src/exprs/scalar-expr.cc                        |   8 +-
 be/src/exprs/scalar-expr.h                         |   7 +-
 be/src/exprs/scalar-expr.inline.h                  |   2 +
 be/src/exprs/slot-ref.cc                           |  50 +-
 be/src/exprs/slot-ref.h                            |  10 +
 be/src/runtime/buffered-tuple-stream-test.cc       |   6 +-
 be/src/runtime/buffered-tuple-stream.cc            |  45 +-
 be/src/runtime/buffered-tuple-stream.h             |   6 +
 be/src/runtime/descriptors.cc                      |  55 +-
 be/src/runtime/descriptors.h                       |  34 +-
 be/src/runtime/raw-value.cc                        |  91 ++++
 be/src/runtime/raw-value.h                         |  23 +
 be/src/runtime/row-batch-serialize-test.cc         |   6 +-
 be/src/runtime/tuple.cc                            |  39 +-
 be/src/runtime/tuple.h                             |   5 +
 be/src/runtime/types.cc                            |   3 +
 be/src/runtime/types.h                             | 123 +++--
 be/src/service/hs2-util.cc                         |  66 ++-
 be/src/service/impala-beeswax-server.cc            |   5 +
 be/src/service/query-result-set.cc                 |  82 +--
 be/src/udf/udf-internal.h                          |  33 ++
 be/src/udf/udf.cc                                  |  13 +
 be/src/udf/udf.h                                   |   3 +-
 be/src/util/debug-util.cc                          |   2 +-
 common/function-registry/impala_functions.py       |  10 +
 .../java/org/apache/impala/analysis/Analyzer.java  |   8 +-
 .../apache/impala/analysis/DescriptorTable.java    |  15 +-
 .../org/apache/impala/analysis/SelectStmt.java     |  14 +-
 .../org/apache/impala/analysis/SlotDescriptor.java |  15 +-
 .../java/org/apache/impala/analysis/SlotRef.java   | 140 ++++-
 .../java/org/apache/impala/analysis/SortInfo.java  |  16 +-
 .../java/org/apache/impala/analysis/Subquery.java  |   4 +-
 .../apache/impala/analysis/TupleDescriptor.java    | 148 ++++-
 .../java/org/apache/impala/catalog/StructType.java |  14 +
 .../java/org/apache/impala/common/TreeNode.java    |  10 +-
 .../org/apache/impala/planner/HdfsScanNode.java    |  14 +-
 .../org/apache/impala/analysis/AnalyzeDDLTest.java |  29 +-
 .../apache/impala/analysis/AnalyzeExprsTest.java   |  60 +-
 .../apache/impala/analysis/AnalyzeStmtsTest.java   | 169 +++---
 .../impala/analysis/AnalyzeUpsertStmtTest.java     |   7 +-
 testdata/ComplexTypesTbl/structs.orc               | Bin 0 -> 2744 bytes
 testdata/ComplexTypesTbl/structs.parq              | Bin 0 -> 4062 bytes
 testdata/ComplexTypesTbl/structs_nested.orc        | Bin 0 -> 1208 bytes
 testdata/ComplexTypesTbl/structs_nested.parq       | Bin 0 -> 1859 bytes
 .../functional/functional_schema_template.sql      |  64 +++
 .../datasets/functional/schema_constraints.csv     |   6 +
 .../QueryTest/compute-stats-with-structs.test      |  35 ++
 .../QueryTest/nested-struct-in-select-list.test    | 155 ++++++
 ...anger_column_masking_struct_in_select_list.test |  19 +
 .../queries/QueryTest/struct-in-select-list.test   | 602 +++++++++++++++++++++
 .../queries/QueryTest/utf8-string-functions.test   |  12 +
 tests/authorization/test_ranger.py                 |  47 ++
 tests/common/test_dimensions.py                    |   5 +
 tests/query_test/test_nested_types.py              |  81 +++
 69 files changed, 2516 insertions(+), 406 deletions(-)
 create mode 100644 testdata/ComplexTypesTbl/structs.orc
 create mode 100644 testdata/ComplexTypesTbl/structs.parq
 create mode 100644 testdata/ComplexTypesTbl/structs_nested.orc
 create mode 100644 testdata/ComplexTypesTbl/structs_nested.parq
 create mode 100644 testdata/workloads/functional-query/queries/QueryTest/compute-stats-with-structs.test
 create mode 100644 testdata/workloads/functional-query/queries/QueryTest/nested-struct-in-select-list.test
 create mode 100644 testdata/workloads/functional-query/queries/QueryTest/ranger_column_masking_struct_in_select_list.test
 create mode 100644 testdata/workloads/functional-query/queries/QueryTest/struct-in-select-list.test

[impala] 02/02: IMPALA-9662, IMPALA-2019(part-3): Support UTF-8 mode in mask functions

Posted by cs...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

csringhofer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 3850d49711b88091101cfc3d89da28c76a17b04d
Author: stiga-huang <hu...@gmail.com>
AuthorDate: Mon Aug 16 18:04:19 2021 +0800

    IMPALA-9662,IMPALA-2019(part-3): Support UTF-8 mode in mask functions
    
    Mask functions are used in Ranger column masking policies to mask
    sensitive data. There are 5 mask functions: mask(), mask_first_n(),
    mask_last_n(), mask_show_first_n(), mask_show_last_n(). Take mask() as
    an example, by default, it will mask uppercase to 'X', lowercase to 'x',
    digits to 'n' and leave other characters unmasked. For masking all
    characters to '*', we can use
      mask(my_col, '*', '*', '*', '*');
    The current implementations mask strings byte-to-byte, which have
    inconsistent results with Hive when the string contains unicode
    characters:
      mask('中国', '*', '*', '*', '*') => '******'
    Each Chinese character is encoded into 3 bytes in UTF-8 so we get the
    above result. The result in Hive is '**' since there are two Chinese
    characters.
    
    This patch provides consistent masking behavior with Hive for
    strings under the UTF-8 mode, i.e., set UTF8_MODE=true. In UTF-8 mode,
    the masked unit of a string is a unicode code point.
    
    Implementation
     - Extends the existing MaskTransform function to deal with unicode code
       points(represented by uint32_t).
     - Extends the existing GetFirstChar function to get the code point of
       given masked charactors in UTF-8 mode.
     - Implement a MaskSubStrUtf8 method as the core functionality.
     - Swith to use MaskSubStrUtf8 instead of MaskSubStr in UTF-8 mode.
     - For better testing, this patch also adds an overload for all mask
       functions for only masking other chars but keeping the
       upper/lower/digit chars unmasked. E.g. mask({col}, -1, -1, -1, 'X').
    
    Tests
     - Add BE tests in expr-test
     - Add e2e tests in utf8-string-functions.test
    
    Change-Id: I1276eccc94c9528507349b155a51e76f338367d5
    Reviewed-on: http://gerrit.cloudera.org:8080/17780
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 CMakeLists.txt                                     |   2 +-
 be/src/exprs/expr-test.cc                          |  59 ++++
 be/src/exprs/mask-functions-ir.cc                  | 305 +++++++++++++++++----
 be/src/exprs/mask-functions.h                      |  30 ++
 common/function-registry/impala_functions.py       |  10 +
 .../queries/QueryTest/utf8-string-functions.test   |  12 +
 6 files changed, 364 insertions(+), 54 deletions(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index de769c5..571886c 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -166,7 +166,7 @@ function(IMPALA_ADD_THIRDPARTY_LIB NAME HEADER STATIC_LIB SHARED_LIB)
 endfunction()
 
 
-find_package(Boost REQUIRED COMPONENTS thread regex filesystem system date_time random)
+find_package(Boost REQUIRED COMPONENTS thread regex filesystem system date_time random locale)
 # Mark Boost as a system header to avoid compile warnings.
 include_directories(SYSTEM ${Boost_INCLUDE_DIRS})
 message(STATUS "Boost include dir: " ${Boost_INCLUDE_DIRS})
diff --git a/be/src/exprs/expr-test.cc b/be/src/exprs/expr-test.cc
index 46f2113..a7c0aa7 100644
--- a/be/src/exprs/expr-test.cc
+++ b/be/src/exprs/expr-test.cc
@@ -10646,6 +10646,65 @@ TEST_P(ExprTest, MaskHashTest) {
   TestIsNull("mask_hash(cast('2016-04-20' as timestamp))", TYPE_TIMESTAMP);
 }
 
+TEST_P(ExprTest, Utf8MaskTest) {
+  executor_->PushExecOption("utf8_mode=true");
+  // Default is no masking for other chars so Chinese charactors are unmasked.
+  TestStringValue("mask('hello李小龙')", "xxxxx李小龙");
+  // Keeps upper, lower, digit chars and masks other chars as 'x'.
+  TestStringValue("mask('hello李小龙', -1, -1, -1, 'X')", "helloXXX");
+  TestStringValue("mask_last_n('hello李小龙', 4, -1, -1, -1, 'x')", "helloxxx");
+  TestStringValue("mask_last_n('hello李小龙', 2, -1, -1, -1, 'x')", "hello李xx");
+  TestStringValue("mask_last_n('hello李小龙', 4, 'x', 'x', 'x', 'X')", "hellxXXX");
+  TestStringValue("mask_show_first_n('hello李小龙', 6, 'x', 'x', 'x', 'X')",
+      "hello李XX");
+  TestStringValue("mask_show_first_n('hello李小龙', 4, -1, -1, -1, 'X')", "helloXXX");
+  TestStringValue("mask_show_first_n('hello李小龙', 4, 'x', 'x', 'x', 'X')",
+      "hellxXXX");
+  TestStringValue("mask_first_n('hello李小龙', 5)", "xxxxx李小龙");
+  // Default is no masking for other chars so Chinese charactors are unmasked.
+  TestStringValue("mask_first_n('hello李小龙', 6)", "xxxxx李小龙");
+  TestStringValue("mask_first_n('hello李小龙', 6, 'x', 'x', 'x', 'X')",
+      "xxxxxX小龙");
+  TestStringValue("mask_show_last_n('hello李小龙', 2, 'x', 'x', 'x', 'X')",
+      "xxxxxX小龙");
+  TestStringValue("mask_show_last_n('hello李小龙', 4, 'x', 'x', 'x', 'X')",
+      "xxxxo李小龙");
+
+  // Test masking unicode upper/lower cases.
+  TestStringValue("mask('abcd áäèü ABCD ÁÄÈÜ')", "xxxx xxxx XXXX XXXX");
+  TestStringValue("mask('Ich möchte ein Bier. Tschüss')",
+      "Xxx xxxxxx xxx Xxxx. Xxxxxxx");
+  TestStringValue("mask('Hungarian áéíöóőüúű ÁÉÍÖÓŐÜÚŰ')",
+      "Xxxxxxxxx xxxxxxxxx XXXXXXXXX");
+  TestStringValue("mask('German äöüß ÄÖÜẞ')", "Xxxxxx xxxx XXXX");
+  TestStringValue(
+      "mask('French àâæçéèêëïîôœùûüÿ ÀÂÆÇÉÈÊËÏÎÔŒÙÛÜŸ')",
+      "Xxxxxx xxxxxxxxxxxxxxxx XXXXXXXXXXXXXXXX");
+  TestStringValue("mask('Greek αβξδ άέήώ ΑΒΞΔ ΆΈΉΏ 1234')",
+      "Xxxxx xxxx xxxx XXXX XXXX nnnn");
+  TestStringValue("mask_first_n('áéíöóőüúű')", "xxxxóőüúű");
+  TestStringValue("mask_show_first_n('áéíöóőüúű')", "áéíöxxxxx");
+  TestStringValue("mask_last_n('áéíöóőüúű')", "áéíöóxxxx");
+  TestStringValue("mask_show_last_n('áéíöóőüúű')", "xxxxxőüúű");
+
+  // Test masking to unicode code points. Specify -1(unmask) for masking upper/lower/digit
+  // chars.
+  TestStringValue("mask('hello李小龙', -1, -1, -1, '某')", "hello某某某");
+  TestStringValue("mask_last_n('hello李小龙', 4, -1, -1, -1, '某')",
+      "hello某某某");
+  TestStringValue("mask_last_n('hello李小龙', 2, -1, -1, -1, '某')",
+      "hello李某某");
+  TestStringValue("mask_show_first_n('hello李小龙', 4, -1, -1, -1, '某')",
+      "hello某某某");
+  TestStringValue("mask_show_first_n('hello李小龙', 6, -1, -1, -1, '某')",
+      "hello李某某");
+  TestStringValue("mask_first_n('李小龙hello', 4, -1, -1, -1, '某')",
+      "某某某hello");
+  TestStringValue("mask_show_last_n('李小龙hello', 5, -1, -1, -1, '某')",
+      "某某某hello");
+  executor_->PopExecOption();
+}
+
 TEST_P(ExprTest, Utf8Test) {
   // Verifies utf8_length() counts length by UTF-8 characters instead of bytes.
   // '你' and '好' are both encoded into 3 bytes.
diff --git a/be/src/exprs/mask-functions-ir.cc b/be/src/exprs/mask-functions-ir.cc
index 2bbde4e..c96be18 100644
--- a/be/src/exprs/mask-functions-ir.cc
+++ b/be/src/exprs/mask-functions-ir.cc
@@ -17,6 +17,8 @@
 
 #include "exprs/mask-functions.h"
 
+#include <boost/locale/generator.hpp>
+#include <boost/locale/utf8_codecvt.hpp>
 #include <gutil/strings/substitute.h>
 #include <openssl/crypto.h>
 #include <openssl/err.h>
@@ -31,6 +33,7 @@
 
 using namespace impala;
 using namespace impala_udf;
+using namespace boost::locale;
 
 const static int CHAR_COUNT = 4;
 const static int MASKED_UPPERCASE = 'X';
@@ -43,19 +46,43 @@ const static int MASKED_MONTH_COMPONENT_VAL = 0;
 const static int MASKED_YEAR_COMPONENT_VAL = 1;
 const static int UNMASKED_VAL = -1;
 
-/// Mask the given char depending on its type. UNMASKED_VAL(-1) means keeping the
-/// original value.
-static inline uint8_t MaskTransform(uint8_t val, int masked_upper_char,
-    int masked_lower_char, int masked_digit_char, int masked_other_char) {
-  if ('A' <= val && val <= 'Z') {
+/// Masks the given unicode code point depending on its range and the (optional) given
+/// locale. By default, if no locale is provided, i.e. loc == nullptr,
+/// lowercase/uppercase/digit characters are only recognized in ascii character set.
+/// UNMASKED_VAL(-1) means keeping the original value.
+/// Returns the masked code point.
+static inline uint32_t MaskTransform(uint32_t val, int masked_upper_char,
+    int masked_lower_char, int masked_digit_char, int masked_other_char,
+    std::locale* loc = nullptr) {
+  // Fast code path for masking ascii characters only.
+  if (loc == nullptr) {
+    if ('A' <= val && val <= 'Z') {
+      if (masked_upper_char == UNMASKED_VAL) return val;
+      return masked_upper_char;
+    }
+    if ('a' <= val && val <= 'z') {
+      if (masked_lower_char == UNMASKED_VAL) return val;
+      return masked_lower_char;
+    }
+    if ('0' <= val && val <= '9') {
+      if (masked_digit_char == UNMASKED_VAL) return val;
+      return masked_digit_char;
+    }
+    if (masked_other_char == UNMASKED_VAL) return val;
+    return masked_other_char;
+  }
+  // Check facet existence to avoid predicates throws exception.
+  DCHECK(std::has_facet<std::ctype<wchar_t>>(*loc))
+      << "Facet not found for locale " << loc->name();
+  if (isupper((wchar_t)val, *loc)) {
     if (masked_upper_char == UNMASKED_VAL) return val;
     return masked_upper_char;
   }
-  if ('a' <= val && val <= 'z') {
+  if (islower((wchar_t)val, *loc)) {
     if (masked_lower_char == UNMASKED_VAL) return val;
     return masked_lower_char;
   }
-  if ('0' <= val && val <= '9') {
+  if (isdigit((wchar_t)val, *loc)) {
     if (masked_digit_char == UNMASKED_VAL) return val;
     return masked_digit_char;
   }
@@ -64,7 +91,7 @@ static inline uint8_t MaskTransform(uint8_t val, int masked_upper_char,
 }
 
 /// Mask the substring in range [start, end) of the given string value. Using rules in
-/// 'MaskTransform'.
+/// 'MaskTransform'. Indices are counted in bytes.
 static StringVal MaskSubStr(FunctionContext* ctx, const StringVal& val,
     int start, int end, int masked_upper_char, int masked_lower_char,
     int masked_digit_char, int masked_other_char) {
@@ -82,6 +109,108 @@ static StringVal MaskSubStr(FunctionContext* ctx, const StringVal& val,
   return result;
 }
 
+/// Checks whether the unicode code point is malformed, i.e. illegal or incomplete, and
+/// warns if it is. Returns true if any warning is added.
+static bool CheckAndWarnCodePoint(FunctionContext* ctx, uint32_t code_point) {
+  if (code_point == utf::illegal || code_point == utf::incomplete) {
+    ctx->AddWarning(Substitute("String contains $0 code point. Return NULL.",
+        code_point == utf::illegal ? "illegal" : "incomplete").c_str());
+    return true;
+  }
+  return false;
+}
+
+/// Mask the substring in range [start, end) of the given string value. Using rules in
+/// 'MaskTransform'. Indices are counted in UTF-8 code points.
+static StringVal MaskSubStrUtf8(FunctionContext* ctx, const StringVal& val,
+    int start, int end, int masked_upper_char, int masked_lower_char,
+    int masked_digit_char, int masked_other_char) {
+  DCHECK_GE(start, 0);
+  DCHECK_LT(start, end);
+  DCHECK_LE(end, val.len);
+  const char* p_start = reinterpret_cast<char*>(val.ptr);
+  const char* p_end = p_start + val.len;
+  const char* p = p_start;
+  utf8_codecvt<char>::state_type cvt_state;
+  int char_cnt = 0;
+  // Skip leading 'start' code points. Leading bytes will be copied directly.
+  while (char_cnt < start && p != p_end) {
+    uint32_t codepoint = utf8_codecvt<char>::to_unicode(cvt_state, p, p_end);
+    if (CheckAndWarnCodePoint(ctx, codepoint)) return StringVal::null();
+    ++char_cnt;
+  }
+  // Calculating the result length in bytes.
+  int result_bytes = p - p_start;
+  int leading_bytes = result_bytes;
+  // Collect code points at range [start, end - 1) and mask them.
+  vector<uint32_t> masked_code_points;
+  // Create unicode locale for checking upper/lower cases or digits.
+  // TODO(quanlong): Avoid creating this everytime if this is time/resource-consuming.
+  boost::locale::generator gen;
+  unique_ptr<std::locale> loc = make_unique<std::locale>(gen("en_US.UTF-8"));
+  // Check facet existence to avoid predicates throws exception.
+  if (!std::has_facet<std::ctype<wchar_t>>(*loc)) {
+    ctx->SetError("Cannot mask unicode strings since locale en_US.UTF-8 not found!");
+    return StringVal();
+  }
+  while (char_cnt < end && p != p_end) {
+    // Parse and get the first code point in string range [p, p_end).
+    // 'to_unicode' will update the pointer 'p'.
+    uint32_t codepoint = utf8_codecvt<char>::to_unicode(cvt_state, p, p_end);
+    if (CheckAndWarnCodePoint(ctx, codepoint)) return StringVal::null();
+    codepoint = MaskTransform(codepoint, masked_upper_char, masked_lower_char,
+        masked_digit_char, masked_other_char, loc.get());
+    masked_code_points.push_back(codepoint);
+    result_bytes += utf::utf_traits<char>::width(codepoint);
+    ++char_cnt;
+  }
+  // Trailing bytes will be copied directly without masking.
+  int tail_len = p_end - p;
+  result_bytes += tail_len;
+
+  StringVal result(ctx, result_bytes);
+  if (UNLIKELY(result.is_null)) return result;
+  // Copy leading bytes.
+  Ubsan::MemCpy(result.ptr, val.ptr, leading_bytes);
+  // Converting masked code points to UTF-8 encoded bytes.
+  char* ptr = reinterpret_cast<char*>(result.ptr) + leading_bytes;
+  p_end = reinterpret_cast<char*>(result.ptr) + result_bytes;
+  for (uint32_t c : masked_code_points) {
+    uint32_t width = utf8_codecvt<char>::from_unicode(cvt_state, c, ptr, p_end);
+    DCHECK(width != utf::illegal && width != utf::incomplete);
+    ptr += width;
+    DCHECK(ptr <= p_end);
+  }
+  // Copy trailing bytes.
+  if (tail_len > 0) {
+    DCHECK(ptr < p_end);
+    Ubsan::MemCpy(ptr, val.ptr + val.len - tail_len, tail_len);
+  }
+  result.len = result_bytes;
+  return result;
+}
+
+/// Counting code points in the UTF-8 encoded string using the same method, 'to_unicode',
+/// as MaskSubStrUtf8 uses. So we can have a consistent behavior.
+/// Returns -1 if the string contains malformed(illegal/incomplete) code points.
+static int GetUtf8CodePointCount(FunctionContext* ctx, const StringVal& val) {
+  utf8_codecvt<char>::state_type cvt_state;
+  const char* p = reinterpret_cast<char*>(val.ptr);
+  const char* p_end = p + val.len;
+  int char_cnt = 0;
+  while (p != p_end) {
+    uint32_t c = utf8_codecvt<char>::to_unicode(cvt_state, p, p_end);
+    if (c == utf::illegal || c == utf::incomplete) {
+      ctx->SetError(Substitute("The $0-th code point $1 is $2",
+          char_cnt, AnyValUtil::ToString(val),
+          c == utf::illegal ? "illegal" : "incomplete").c_str());
+      return -1;
+    }
+    ++char_cnt;
+  }
+  return char_cnt;
+}
+
 /// Mask the given string except the first 'un_mask_char_count' chars. Ported from
 /// org.apache.hadoop.hive.ql.udf.generic.GenericUDFMaskShowFirstN.
 static inline StringVal MaskShowFirstNImpl(FunctionContext* ctx, const StringVal& val,
@@ -90,7 +219,11 @@ static inline StringVal MaskShowFirstNImpl(FunctionContext* ctx, const StringVal
   // To be consistent with Hive, negative char_count is treated as 0.
   if (un_mask_char_count < 0) un_mask_char_count = 0;
   if (val.is_null || val.len == 0 || un_mask_char_count >= val.len) return val;
-  return MaskSubStr(ctx, val, un_mask_char_count, val.len, masked_upper_char,
+  if (!ctx->impl()->GetConstFnAttr(FunctionContextImpl::UTF8_MODE)) {
+    return MaskSubStr(ctx, val, un_mask_char_count, val.len, masked_upper_char,
+        masked_lower_char, masked_digit_char, masked_other_char);
+  }
+  return MaskSubStrUtf8(ctx, val, un_mask_char_count, val.len, masked_upper_char,
       masked_lower_char, masked_digit_char, masked_other_char);
 }
 
@@ -102,8 +235,14 @@ static inline StringVal MaskShowLastNImpl(FunctionContext* ctx, const StringVal&
   // To be consistent with Hive, negative char_count is treated as 0.
   if (un_mask_char_count < 0) un_mask_char_count = 0;
   if (val.is_null || val.len == 0 || un_mask_char_count >= val.len) return val;
-  return MaskSubStr(ctx, val, 0, val.len - un_mask_char_count, masked_upper_char,
-      masked_lower_char, masked_digit_char, masked_other_char);
+  if (!ctx->impl()->GetConstFnAttr(FunctionContextImpl::UTF8_MODE)) {
+    return MaskSubStr(ctx, val, 0, val.len - un_mask_char_count, masked_upper_char,
+        masked_lower_char, masked_digit_char, masked_other_char);
+  }
+  int end = GetUtf8CodePointCount(ctx, val) - un_mask_char_count;
+  if (end <= 0) return val;
+  return MaskSubStrUtf8(ctx, val, 0, end, masked_upper_char, masked_lower_char,
+      masked_digit_char, masked_other_char);
 }
 
 /// Mask the first 'mask_char_count' chars of the given string. Ported from
@@ -113,7 +252,11 @@ static inline StringVal MaskFirstNImpl(FunctionContext* ctx, const StringVal& va
     int masked_digit_char, int masked_other_char) {
   if (mask_char_count <= 0 || val.is_null || val.len == 0) return val;
   if (mask_char_count > val.len) mask_char_count = val.len;
-  return MaskSubStr(ctx, val, 0, mask_char_count, masked_upper_char,
+  if (!ctx->impl()->GetConstFnAttr(FunctionContextImpl::UTF8_MODE)) {
+    return MaskSubStr(ctx, val, 0, mask_char_count, masked_upper_char, masked_lower_char,
+        masked_digit_char, masked_other_char);
+  }
+  return MaskSubStrUtf8(ctx, val, 0, mask_char_count, masked_upper_char,
       masked_lower_char, masked_digit_char, masked_other_char);
 }
 
@@ -124,8 +267,14 @@ static inline StringVal MaskLastNImpl(FunctionContext* ctx, const StringVal& val
     int masked_digit_char, int masked_other_char) {
   if (mask_char_count <= 0 || val.is_null || val.len == 0) return val;
   if (mask_char_count > val.len) mask_char_count = val.len;
-  return MaskSubStr(ctx, val, val.len - mask_char_count, val.len, masked_upper_char,
-      masked_lower_char, masked_digit_char, masked_other_char);
+  if (!ctx->impl()->GetConstFnAttr(FunctionContextImpl::UTF8_MODE)) {
+    return MaskSubStr(ctx, val, val.len - mask_char_count, val.len, masked_upper_char,
+        masked_lower_char, masked_digit_char, masked_other_char);
+  }
+  int start = GetUtf8CodePointCount(ctx, val) - mask_char_count;
+  if (start < 0) start = 0;
+  return MaskSubStrUtf8(ctx, val, start, val.len, masked_upper_char, masked_lower_char,
+      masked_digit_char, masked_other_char);
 }
 
 /// Mask the whole given string. Ported from
@@ -134,8 +283,12 @@ static inline StringVal MaskImpl(FunctionContext* ctx, const StringVal& val,
     int masked_upper_char, int masked_lower_char, int masked_digit_char,
     int masked_other_char) {
   if (val.is_null || val.len == 0) return val;
-  return MaskSubStr(ctx, val, 0, val.len, masked_upper_char,
-      masked_lower_char, masked_digit_char, masked_other_char);
+  if (!ctx->impl()->GetConstFnAttr(FunctionContextImpl::UTF8_MODE)) {
+    return MaskSubStr(ctx, val, 0, val.len, masked_upper_char,
+        masked_lower_char, masked_digit_char, masked_other_char);
+  }
+  return MaskSubStrUtf8(ctx, val, 0, val.len, masked_upper_char, masked_lower_char,
+      masked_digit_char, masked_other_char);
 }
 
 static inline int GetNumDigits(int64_t val) {
@@ -254,10 +407,26 @@ static DateVal MaskImpl(FunctionContext* ctx, const DateVal& val, int day_value,
   return DateValue(year, month, day).ToDateVal();
 }
 
-static inline uint8_t GetFirstChar(const StringVal& str, uint8_t default_value) {
+/// Gets the first character of 'str'. Returns 'default_value' if 'str' is empty.
+/// In UTF-8 mode, the first code point is returned.
+/// Otherwise, the first char is returned.
+static inline uint32_t GetFirstChar(FunctionContext* ctx, const StringVal& str,
+    uint32_t default_value) {
   // To be consistent with Hive, empty string is converted to default value. String with
   // length > 1 will only use its first char.
-  return str.len == 0 ? default_value : str.ptr[0];
+  if (str.len == 0) return default_value;
+  if (!ctx->impl()->GetConstFnAttr(FunctionContextImpl::UTF8_MODE)) return str.ptr[0];
+
+  utf8_codecvt<char>::state_type cvt_state;
+  const char* p = reinterpret_cast<char*>(str.ptr);
+  uint32_t c = utf8_codecvt<char>::to_unicode(cvt_state, p, p + str.len);
+  if (c == utf::illegal || c == utf::incomplete) {
+    string msg = Substitute("$0 unicode code point found in the beginning of $1",
+        c == utf::illegal ? "Illegal" : "Incomplete", AnyValUtil::ToString(str));
+    ctx->SetError(msg.c_str());
+    return default_value;
+  }
+  return c;
 }
 
 /// Get digit (masked_number) from StringVal. Only accept digits or -1.
@@ -288,10 +457,16 @@ StringVal MaskFunctions::MaskShowFirstN(FunctionContext* ctx, const StringVal& v
     const IntVal& char_count, const StringVal& upper_char, const StringVal& lower_char,
     const StringVal& digit_char, const StringVal& other_char) {
   return MaskShowFirstNImpl(ctx, val, char_count.val,
-      GetFirstChar(upper_char, MASKED_UPPERCASE),
-      GetFirstChar(lower_char, MASKED_LOWERCASE),
-      GetFirstChar(digit_char, MASKED_DIGIT),
-      GetFirstChar(other_char, MASKED_OTHER_CHAR));
+      GetFirstChar(ctx, upper_char, MASKED_UPPERCASE),
+      GetFirstChar(ctx, lower_char, MASKED_LOWERCASE),
+      GetFirstChar(ctx, digit_char, MASKED_DIGIT),
+      GetFirstChar(ctx, other_char, MASKED_OTHER_CHAR));
+}
+StringVal MaskFunctions::MaskShowFirstN(FunctionContext* ctx, const StringVal& val,
+    const IntVal& char_count, const IntVal& upper_char, const IntVal& lower_char,
+    const IntVal& digit_char, const StringVal& other_char) {
+  return MaskShowFirstNImpl(ctx, val, char_count.val, upper_char.val, lower_char.val,
+      digit_char.val, GetFirstChar(ctx, other_char, MASKED_OTHER_CHAR));
 }
 StringVal MaskFunctions::MaskShowFirstN(FunctionContext* ctx, const StringVal& val,
     const IntVal& char_count, const StringVal& upper_char, const StringVal& lower_char,
@@ -305,9 +480,9 @@ StringVal MaskFunctions::MaskShowFirstN(FunctionContext* ctx, const StringVal& v
     const StringVal& digit_char, const IntVal& other_char,
     const StringVal& number_char) {
   return MaskShowFirstNImpl(ctx, val, char_count.val,
-      GetFirstChar(upper_char, MASKED_UPPERCASE),
-      GetFirstChar(lower_char, MASKED_LOWERCASE),
-      GetFirstChar(digit_char, MASKED_DIGIT),
+      GetFirstChar(ctx, upper_char, MASKED_UPPERCASE),
+      GetFirstChar(ctx, lower_char, MASKED_LOWERCASE),
+      GetFirstChar(ctx, digit_char, MASKED_DIGIT),
       other_char.val);
 }
 StringVal MaskFunctions::MaskShowFirstN(FunctionContext* ctx, const StringVal& val,
@@ -369,10 +544,16 @@ StringVal MaskFunctions::MaskShowLastN(FunctionContext* ctx, const StringVal& va
     const IntVal& char_count, const StringVal& upper_char, const StringVal& lower_char,
     const StringVal& digit_char, const StringVal& other_char) {
   return MaskShowLastNImpl(ctx, val, char_count.val,
-      GetFirstChar(upper_char, MASKED_UPPERCASE),
-      GetFirstChar(lower_char, MASKED_LOWERCASE),
-      GetFirstChar(digit_char, MASKED_DIGIT),
-      GetFirstChar(other_char, MASKED_OTHER_CHAR));
+      GetFirstChar(ctx, upper_char, MASKED_UPPERCASE),
+      GetFirstChar(ctx, lower_char, MASKED_LOWERCASE),
+      GetFirstChar(ctx, digit_char, MASKED_DIGIT),
+      GetFirstChar(ctx, other_char, MASKED_OTHER_CHAR));
+}
+StringVal MaskFunctions::MaskShowLastN(FunctionContext* ctx, const StringVal& val,
+    const IntVal& char_count, const IntVal& upper_char, const IntVal& lower_char,
+    const IntVal& digit_char, const StringVal& other_char) {
+  return MaskShowLastNImpl(ctx, val, char_count.val, upper_char.val, lower_char.val,
+      digit_char.val, GetFirstChar(ctx, other_char, MASKED_OTHER_CHAR));
 }
 StringVal MaskFunctions::MaskShowLastN(FunctionContext* ctx, const StringVal& val,
     const IntVal& char_count, const StringVal& upper_char, const StringVal& lower_char,
@@ -386,9 +567,9 @@ StringVal MaskFunctions::MaskShowLastN(FunctionContext* ctx, const StringVal& va
     const StringVal& digit_char, const IntVal& other_char,
     const StringVal& number_char) {
   return MaskShowLastNImpl(ctx, val, char_count.val,
-      GetFirstChar(upper_char, MASKED_UPPERCASE),
-      GetFirstChar(lower_char, MASKED_LOWERCASE),
-      GetFirstChar(digit_char, MASKED_DIGIT),
+      GetFirstChar(ctx, upper_char, MASKED_UPPERCASE),
+      GetFirstChar(ctx, lower_char, MASKED_LOWERCASE),
+      GetFirstChar(ctx, digit_char, MASKED_DIGIT),
       other_char.val);
 }
 StringVal MaskFunctions::MaskShowLastN(FunctionContext* ctx, const StringVal& val,
@@ -440,10 +621,16 @@ StringVal MaskFunctions::MaskFirstN(FunctionContext* ctx, const StringVal& val,
     const IntVal& char_count, const StringVal& upper_char, const StringVal& lower_char,
     const StringVal& digit_char, const StringVal& other_char) {
   return MaskFirstNImpl(ctx, val, char_count.val,
-      GetFirstChar(upper_char, MASKED_UPPERCASE),
-      GetFirstChar(lower_char, MASKED_LOWERCASE),
-      GetFirstChar(digit_char, MASKED_DIGIT),
-      GetFirstChar(other_char, MASKED_OTHER_CHAR));
+      GetFirstChar(ctx, upper_char, MASKED_UPPERCASE),
+      GetFirstChar(ctx, lower_char, MASKED_LOWERCASE),
+      GetFirstChar(ctx, digit_char, MASKED_DIGIT),
+      GetFirstChar(ctx, other_char, MASKED_OTHER_CHAR));
+}
+StringVal MaskFunctions::MaskFirstN(FunctionContext* ctx, const StringVal& val,
+    const IntVal& char_count, const IntVal& upper_char, const IntVal& lower_char,
+    const IntVal& digit_char, const StringVal& other_char) {
+  return MaskFirstNImpl(ctx, val, char_count.val, upper_char.val, lower_char.val,
+      digit_char.val, GetFirstChar(ctx, other_char, MASKED_OTHER_CHAR));
 }
 StringVal MaskFunctions::MaskFirstN(FunctionContext* ctx, const StringVal& val,
     const IntVal& char_count, const StringVal& upper_char, const StringVal& lower_char,
@@ -457,9 +644,9 @@ StringVal MaskFunctions::MaskFirstN(FunctionContext* ctx, const StringVal& val,
     const StringVal& digit_char, const IntVal& other_char,
     const StringVal& number_char) {
   return MaskFirstNImpl(ctx, val, char_count.val,
-      GetFirstChar(upper_char, MASKED_UPPERCASE),
-      GetFirstChar(lower_char, MASKED_LOWERCASE),
-      GetFirstChar(digit_char, MASKED_DIGIT),
+      GetFirstChar(ctx, upper_char, MASKED_UPPERCASE),
+      GetFirstChar(ctx, lower_char, MASKED_LOWERCASE),
+      GetFirstChar(ctx, digit_char, MASKED_DIGIT),
       other_char.val);
 }
 StringVal MaskFunctions::MaskFirstN(FunctionContext* ctx, const StringVal& val,
@@ -511,10 +698,16 @@ StringVal MaskFunctions::MaskLastN(FunctionContext* ctx, const StringVal& val,
     const IntVal& char_count, const StringVal& upper_char, const StringVal& lower_char,
     const StringVal& digit_char, const StringVal& other_char) {
   return MaskLastNImpl(ctx, val, char_count.val,
-      GetFirstChar(upper_char, MASKED_UPPERCASE),
-      GetFirstChar(lower_char, MASKED_LOWERCASE),
-      GetFirstChar(digit_char, MASKED_DIGIT),
-      GetFirstChar(other_char, MASKED_OTHER_CHAR));
+      GetFirstChar(ctx, upper_char, MASKED_UPPERCASE),
+      GetFirstChar(ctx, lower_char, MASKED_LOWERCASE),
+      GetFirstChar(ctx, digit_char, MASKED_DIGIT),
+      GetFirstChar(ctx, other_char, MASKED_OTHER_CHAR));
+}
+StringVal MaskFunctions::MaskLastN(FunctionContext* ctx, const StringVal& val,
+    const IntVal& char_count, const IntVal& upper_char, const IntVal& lower_char,
+    const IntVal& digit_char, const StringVal& other_char) {
+  return MaskLastNImpl(ctx, val, char_count.val, upper_char.val, lower_char.val,
+      digit_char.val, GetFirstChar(ctx, other_char, MASKED_OTHER_CHAR));
 }
 StringVal MaskFunctions::MaskLastN(FunctionContext* ctx, const StringVal& val,
     const IntVal& char_count, const StringVal& upper_char, const StringVal& lower_char,
@@ -528,9 +721,9 @@ StringVal MaskFunctions::MaskLastN(FunctionContext* ctx, const StringVal& val,
     const StringVal& digit_char, const IntVal& other_char,
     const StringVal& number_char) {
   return MaskLastNImpl(ctx, val, char_count.val,
-      GetFirstChar(upper_char, MASKED_UPPERCASE),
-      GetFirstChar(lower_char, MASKED_LOWERCASE),
-      GetFirstChar(digit_char, MASKED_DIGIT),
+      GetFirstChar(ctx, upper_char, MASKED_UPPERCASE),
+      GetFirstChar(ctx, lower_char, MASKED_LOWERCASE),
+      GetFirstChar(ctx, digit_char, MASKED_DIGIT),
       other_char.val);
 }
 StringVal MaskFunctions::MaskLastN(FunctionContext* ctx, const StringVal& val,
@@ -577,10 +770,10 @@ StringVal MaskFunctions::Mask(FunctionContext* ctx, const StringVal& val,
     const StringVal& upper_char, const StringVal& lower_char,
     const StringVal& digit_char, const StringVal& other_char) {
   return MaskImpl(ctx, val,
-      GetFirstChar(upper_char, MASKED_UPPERCASE),
-      GetFirstChar(lower_char, MASKED_LOWERCASE),
-      GetFirstChar(digit_char, MASKED_DIGIT),
-      GetFirstChar(other_char, MASKED_OTHER_CHAR));
+      GetFirstChar(ctx, upper_char, MASKED_UPPERCASE),
+      GetFirstChar(ctx, lower_char, MASKED_LOWERCASE),
+      GetFirstChar(ctx, digit_char, MASKED_DIGIT),
+      GetFirstChar(ctx, other_char, MASKED_OTHER_CHAR));
 }
 StringVal MaskFunctions::Mask(FunctionContext* ctx, const StringVal& val,
     const StringVal& upper_char, const StringVal& lower_char,
@@ -599,9 +792,9 @@ StringVal MaskFunctions::Mask(FunctionContext* ctx, const StringVal& val,
     const StringVal& digit_char, const IntVal& other_char,
     const StringVal& number_char) {
   return MaskImpl(ctx, val,
-      GetFirstChar(upper_char, MASKED_UPPERCASE),
-      GetFirstChar(lower_char, MASKED_LOWERCASE),
-      GetFirstChar(digit_char, MASKED_DIGIT),
+      GetFirstChar(ctx, upper_char, MASKED_UPPERCASE),
+      GetFirstChar(ctx, lower_char, MASKED_LOWERCASE),
+      GetFirstChar(ctx, digit_char, MASKED_DIGIT),
       other_char.val);
 }
 StringVal MaskFunctions::Mask(FunctionContext* ctx, const StringVal& val,
@@ -619,6 +812,12 @@ StringVal MaskFunctions::Mask(FunctionContext* ctx, const StringVal& val,
   return Mask(ctx, val, upper_char, lower_char, digit_char, other_char, number_char);
 }
 StringVal MaskFunctions::Mask(FunctionContext* ctx, const StringVal& val,
+    const IntVal& upper_char, const IntVal& lower_char,
+    const IntVal& digit_char, const StringVal& other_char) {
+  return MaskImpl(ctx, val, upper_char.val, lower_char.val, digit_char.val,
+      GetFirstChar(ctx, other_char, MASKED_OTHER_CHAR));
+}
+StringVal MaskFunctions::Mask(FunctionContext* ctx, const StringVal& val,
     const IntVal& upper_char, const IntVal& lower_char, const IntVal& digit_char,
     const IntVal& other_char, const IntVal& number_char, const IntVal& day_value,
     const IntVal& month_value, const IntVal& year_value) {
diff --git a/be/src/exprs/mask-functions.h b/be/src/exprs/mask-functions.h
index c01aea7..3307933 100644
--- a/be/src/exprs/mask-functions.h
+++ b/be/src/exprs/mask-functions.h
@@ -103,6 +103,12 @@ class MaskFunctions {
       const IntVal& char_count, const StringVal& upper_char, const StringVal& lower_char,
       const StringVal& digit_char, const IntVal& other_char,
       const StringVal& number_char);
+  // Overload for only masking other chars. So we can support patterns like
+  //   mask_show_first_n({col}, 4, -1, -1, -1, 'x')
+  static StringVal MaskShowFirstN(FunctionContext* ctx, const StringVal& val,
+      const IntVal& char_count, const IntVal& upper_char, const IntVal& lower_char,
+      const IntVal& digit_char, const StringVal& other_char);
+  // Overload that all masked chars are given as integers.
   static StringVal MaskShowFirstN(FunctionContext* ctx, const StringVal& val,
       const IntVal& char_count, const IntVal& upper_char, const IntVal& lower_char,
       const IntVal& digit_char, const IntVal& other_char, const IntVal& number_char);
@@ -146,6 +152,12 @@ class MaskFunctions {
       const IntVal& char_count, const StringVal& upper_char, const StringVal& lower_char,
       const StringVal& digit_char, const IntVal& other_char,
       const StringVal& number_char);
+  // Overload for only masking other chars. So we can support patterns like
+  //   mask_show_last_n({col}, 4, -1, -1, -1, 'x')
+  static StringVal MaskShowLastN(FunctionContext* ctx, const StringVal& val,
+      const IntVal& char_count, const IntVal& upper_char, const IntVal& lower_char,
+      const IntVal& digit_char, const StringVal& other_char);
+  // Overload that all masked chars are given as integers.
   static StringVal MaskShowLastN(FunctionContext* ctx, const StringVal& val,
       const IntVal& char_count, const IntVal& upper_char, const IntVal& lower_char,
       const IntVal& digit_char, const IntVal& other_char, const IntVal& number_char);
@@ -184,6 +196,12 @@ class MaskFunctions {
       const IntVal& char_count, const StringVal& upper_char, const StringVal& lower_char,
       const StringVal& digit_char, const IntVal& other_char,
       const StringVal& number_char);
+  // Overload for only masking other chars. So we can support patterns like
+  //   mask_first_n({col}, 4, -1, -1, -1, 'x')
+  static StringVal MaskFirstN(FunctionContext* ctx, const StringVal& val,
+      const IntVal& char_count, const IntVal& upper_char, const IntVal& lower_char,
+      const IntVal& digit_char, const StringVal& other_char);
+  // Overload that all masked chars are given as integers.
   static StringVal MaskFirstN(FunctionContext* ctx, const StringVal& val,
       const IntVal& char_count, const IntVal& upper_char, const IntVal& lower_char,
       const IntVal& digit_char, const IntVal& other_char, const IntVal& number_char);
@@ -222,6 +240,12 @@ class MaskFunctions {
       const IntVal& char_count, const StringVal& upper_char, const StringVal& lower_char,
       const StringVal& digit_char, const IntVal& other_char,
       const StringVal& number_char);
+  // Overload for only masking other chars. So we can support patterns like
+  //   mask_first_n({col}, 4, -1, -1, -1, 'x')
+  static StringVal MaskLastN(FunctionContext* ctx, const StringVal& val,
+      const IntVal& char_count, const IntVal& upper_char, const IntVal& lower_char,
+      const IntVal& digit_char, const StringVal& other_char);
+  // Overload that all masked chars are given as integers.
   static StringVal MaskLastN(FunctionContext* ctx, const StringVal& val,
       const IntVal& char_count, const IntVal& upper_char, const IntVal& lower_char,
       const IntVal& digit_char, const IntVal& other_char, const IntVal& number_char);
@@ -271,6 +295,12 @@ class MaskFunctions {
       const StringVal& digit_char, const IntVal& other_char,
       const StringVal& number_char, const IntVal& day_value, const IntVal& month_value,
       const IntVal& year_value);
+  // Overload for only masking other chars. So we can support patterns like
+  //   mask({col}, -1, -1, -1, 'x')
+  static StringVal Mask(FunctionContext* ctx, const StringVal& val,
+      const IntVal& upper_char, const IntVal& lower_char,
+      const IntVal& digit_char, const StringVal& other_char);
+  // Overload that all masked chars are given as integers.
   static StringVal Mask(FunctionContext* ctx, const StringVal& val,
       const IntVal& upper_char, const IntVal& lower_char, const IntVal& digit_char,
       const IntVal& other_char, const IntVal& number_char, const IntVal& day_value,
diff --git a/common/function-registry/impala_functions.py b/common/function-registry/impala_functions.py
index 345706d..dc01349 100644
--- a/common/function-registry/impala_functions.py
+++ b/common/function-registry/impala_functions.py
@@ -828,6 +828,8 @@ visible_functions = [
   [['mask_show_first_n'], 'STRING', ['STRING', 'INT'], 'impala::MaskFunctions::MaskShowFirstN'],
   [['mask_show_first_n'], 'STRING', ['STRING', 'INT', 'STRING', 'STRING', 'STRING', 'STRING'],
       'impala::MaskFunctions::MaskShowFirstN'],
+  [['mask_show_first_n'], 'STRING', ['STRING', 'INT', 'INT', 'INT', 'INT', 'STRING'],
+      'impala::MaskFunctions::MaskShowFirstN'],
   [['mask_show_first_n'], 'STRING', ['STRING', 'INT', 'STRING', 'STRING', 'STRING', 'STRING', 'INT'],
       'impala::MaskFunctions::MaskShowFirstN'],
   [['mask_show_first_n'], 'STRING', ['STRING', 'INT', 'STRING', 'STRING', 'STRING', 'INT', 'STRING'],
@@ -856,6 +858,8 @@ visible_functions = [
   [['mask_show_last_n'], 'STRING', ['STRING', 'INT'], 'impala::MaskFunctions::MaskShowLastN'],
   [['mask_show_last_n'], 'STRING', ['STRING', 'INT', 'STRING', 'STRING', 'STRING', 'STRING'],
       'impala::MaskFunctions::MaskShowLastN'],
+  [['mask_show_last_n'], 'STRING', ['STRING', 'INT', 'INT', 'INT', 'INT', 'STRING'],
+      'impala::MaskFunctions::MaskShowLastN'],
   [['mask_show_last_n'], 'STRING', ['STRING', 'INT', 'STRING', 'STRING', 'STRING', 'STRING', 'INT'],
       'impala::MaskFunctions::MaskShowLastN'],
   [['mask_show_last_n'], 'STRING', ['STRING', 'INT', 'STRING', 'STRING', 'STRING', 'INT', 'STRING'],
@@ -886,6 +890,8 @@ visible_functions = [
   [['mask_first_n'], 'STRING', ['STRING', 'INT'], 'impala::MaskFunctions::MaskFirstN'],
   [['mask_first_n'], 'STRING', ['STRING', 'INT', 'STRING', 'STRING', 'STRING', 'STRING'],
       'impala::MaskFunctions::MaskFirstN'],
+  [['mask_first_n'], 'STRING', ['STRING', 'INT', 'INT', 'INT', 'INT', 'STRING'],
+      'impala::MaskFunctions::MaskFirstN'],
   [['mask_first_n'], 'STRING', ['STRING', 'INT', 'STRING', 'STRING', 'STRING', 'STRING', 'INT'],
       'impala::MaskFunctions::MaskFirstN'],
   [['mask_first_n'], 'STRING', ['STRING', 'INT', 'STRING', 'STRING', 'STRING', 'INT', 'STRING'],
@@ -916,6 +922,8 @@ visible_functions = [
   [['mask_last_n'], 'STRING', ['STRING', 'INT'], 'impala::MaskFunctions::MaskLastN'],
   [['mask_last_n'], 'STRING', ['STRING', 'INT', 'STRING', 'STRING', 'STRING', 'STRING'],
       'impala::MaskFunctions::MaskLastN'],
+  [['mask_last_n'], 'STRING', ['STRING', 'INT', 'INT', 'INT', 'INT', 'STRING'],
+      'impala::MaskFunctions::MaskLastN'],
   [['mask_last_n'], 'STRING', ['STRING', 'INT', 'STRING', 'STRING', 'STRING', 'STRING', 'INT'],
       'impala::MaskFunctions::MaskLastN'],
   [['mask_last_n'], 'STRING', ['STRING', 'INT', 'STRING', 'STRING', 'STRING', 'INT', 'STRING'],
@@ -945,6 +953,8 @@ visible_functions = [
   [['mask'], 'STRING', ['STRING'], 'impala::MaskFunctions::Mask'],
   [['mask'], 'STRING', ['STRING', 'STRING', 'STRING', 'STRING', 'STRING'],
       'impala::MaskFunctions::Mask'],
+  [['mask'], 'STRING', ['STRING', 'INT', 'INT', 'INT', 'STRING'],
+      'impala::MaskFunctions::Mask'],
   [['mask'], 'STRING', ['STRING', 'STRING', 'STRING', 'STRING', 'STRING', 'INT'],
       'impala::MaskFunctions::Mask'],
   [['mask'], 'STRING', ['STRING', 'STRING', 'STRING', 'STRING', 'INT', 'STRING'],
diff --git a/testdata/workloads/functional-query/queries/QueryTest/utf8-string-functions.test b/testdata/workloads/functional-query/queries/QueryTest/utf8-string-functions.test
index 1f7e4b8..84bab4b 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/utf8-string-functions.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/utf8-string-functions.test
@@ -168,3 +168,15 @@ select locate('SQL', '最快的SQL引擎跑SQL'),
 ---- TYPES
 INT,INT,INT,INT,INT,INT
 ====
+---- QUERY
+set utf8_mode=true;
+select mask('SQL引擎', 'x', 'x', 'x', 'x'),
+  mask_last_n('SQL引擎', 2, 'x', 'x', 'x', 'x'),
+  mask_show_first_n('SQL引擎', 2, 'x', 'x', 'x', 'x'),
+  mask_first_n('SQL引擎', 2, 'x', 'x', 'x', 'x'),
+  mask_show_last_n('SQL引擎', 2, 'x', 'x', 'x', 'x');
+---- RESULTS: RAW_STRING
+'xxxxx','SQLxx','SQxxx','xxL引擎','xxx引擎'
+---- TYPES
+STRING,STRING,STRING,STRING,STRING
+====

[impala] 01/02: IMPALA-9495: Support struct in select list for ORC tables

Posted by cs...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

csringhofer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 1e21aa6b9641941fe75eb05fb519d2a4d56e5daf
Author: Gabor Kaszab <ga...@cloudera.com>
AuthorDate: Fri Apr 9 16:11:39 2021 +0200

    IMPALA-9495: Support struct in select list for ORC tables
    
    This patch implements the functionality to allow structs in the select
    list of inline views, topmost blocks. When displaying the value of a
    struct it is formatted into a JSON value and returned as a string. An
    example of such a value:
    
    SELECT struct_col FROM some_table;
    '{"int_struct_member":12,"string_struct_member":"string value"}'
    
    Another example where we query a nested struct:
    SELECT outer_struct_col FROM some_table;
    '{"inner_struct":{"string_member":"string value","int_member":12}}'
    
    Note, the conversion from struct to JSON happens on the server side
    before sending out the value in HS2 to the client. However, HS2 is
    capable of handling struct values as well so in a later change we might
    want to add a functionality to send the struct in thrift to the client
    so that the client can use the struct directly.
    
    -- Internal representation of a struct:
    When scanning a struct the rowbatch will hold the values of the
    struct's children as if they were queried one by one directly in the
    select list.
    
    E.g. Taking the following table:
    CREATE TABLE tbl (id int, s struct<a:int,b:string>) STORED AS ORC
    
    And running the following query:
    SELECT id, s FROM tbl;
    
    After scanning a row in a row batch will hold the following values:
    (note the biggest size comes first)
     1: The pointer for the string in s.b
     2: The length for the string in s.b
     3: The int value for s.a
     4: The int value of id
     5: A single null byte for all the slots: id, s, s.a, s.b
    
    The size of a struct has an effect on the order of the memory layout of
    a row batch. The struct size is calculated by summing the size of its
    fields and then the struct gets a place in the row batch to precede all
    smaller slots by size. Note, all the fields of a struct are consecutive
    to each other in the row batch. Inside a struct the order of the fields
    is also based on their size as it does in a regular case for primitives.
    
    When evaluating a struct as a SlotRef a newly introduced StructVal will
    be used to refer to the actual values of a struct in the row batch.
    This StructVal holds a vector of pointers where each pointer represents
    a member of the struct. Following the above example the StructVal would
    keep two pointers, one to point to an IntVal and one to point to a
    StringVal.
    
    -- Changes related to tuple and slot descriptors:
    When providing a struct in the select list there is going to be a
    SlotDescriptor for the struct slot in the topmost TupleDescriptor.
    Additionally, another TupleDesriptor is created to hold SlotDescriptors
    for each of the struct's children. The struct SlotDescriptor points to
    the newly introduced TupleDescriptor using 'itemTupleId'.
    The offsets for the children of the struct is calculated from the
    beginning of the topmost TupleDescriptor and not from the
    TupleDescriptor that directly holds the struct's children. The null
    indicator bytes as well are stored on the level of the topmost
    TupleDescriptor.
    
    -- Changes related to scalar expressions:
    A struct in the select list is translated into an expression tree where
    the top of this tree is a SlotRef for the struct itself and its
    children in the tree are SlotRefs for the members of the struct. When
    evaluating a struct SlotRef after the null checks the evaluation is
    delegated to the children SlotRefs.
    
    -- Restrictions:
      - Codegen support is not included in this patch.
      - Only ORC file format is supported by this patch.
      - Only HS2 client supports returning structs. Beeswax support is not
        implemented as it is going to be deprecated anyway. Currently we
        receive an error when trying to query a struct through Beeswax.
    
    -- Tests added:
      - The ORC and Parquet functional databases are extended with 3 new
        tables:
        1: A small table with one level structs, holding different
        kind of primitive types as members.
        2: A small table with 2 and 3 level nested structs.
        3: A bigger, partitioned table constructed from alltypes where all
        the columns except the 'id' column are put into a struct.
      - struct-in-select-list.test and nested-struct-in-select-list.test
        uses these new tables to query structs directly or through an
        inline view.
    
    Change-Id: I0fbe56bdcd372b72e99c0195d87a818e7fa4bc3a
    Reviewed-on: http://gerrit.cloudera.org:8080/17638
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/hdfs-orc-scanner.cc                    |   8 +-
 be/src/exec/hdfs-scan-node-base.cc                 |   2 +-
 be/src/exec/hdfs-scanner.cc                        |   2 +-
 be/src/exec/orc-column-readers.cc                  |  35 +-
 be/src/exec/orc-column-readers.h                   |  13 +-
 be/src/exec/parquet/hdfs-parquet-scanner.cc        |   4 +-
 .../parquet/parquet-collection-column-reader.cc    |   2 +-
 be/src/exprs/anyval-util.cc                        |   3 +
 be/src/exprs/expr-value.h                          |   3 +
 be/src/exprs/scalar-expr-evaluator.cc              |  29 +-
 be/src/exprs/scalar-expr-evaluator.h               |   9 +
 be/src/exprs/scalar-expr.cc                        |   8 +-
 be/src/exprs/scalar-expr.h                         |   7 +-
 be/src/exprs/scalar-expr.inline.h                  |   2 +
 be/src/exprs/slot-ref.cc                           |  50 +-
 be/src/exprs/slot-ref.h                            |  10 +
 be/src/runtime/buffered-tuple-stream-test.cc       |   6 +-
 be/src/runtime/buffered-tuple-stream.cc            |  45 +-
 be/src/runtime/buffered-tuple-stream.h             |   6 +
 be/src/runtime/descriptors.cc                      |  55 +-
 be/src/runtime/descriptors.h                       |  34 +-
 be/src/runtime/raw-value.cc                        |  91 ++++
 be/src/runtime/raw-value.h                         |  23 +
 be/src/runtime/row-batch-serialize-test.cc         |   6 +-
 be/src/runtime/tuple.cc                            |  39 +-
 be/src/runtime/tuple.h                             |   5 +
 be/src/runtime/types.cc                            |   3 +
 be/src/runtime/types.h                             | 123 +++--
 be/src/service/hs2-util.cc                         |  66 ++-
 be/src/service/impala-beeswax-server.cc            |   5 +
 be/src/service/query-result-set.cc                 |  82 +--
 be/src/udf/udf-internal.h                          |  33 ++
 be/src/udf/udf.cc                                  |  13 +
 be/src/udf/udf.h                                   |   3 +-
 be/src/util/debug-util.cc                          |   2 +-
 .../java/org/apache/impala/analysis/Analyzer.java  |   8 +-
 .../apache/impala/analysis/DescriptorTable.java    |  15 +-
 .../org/apache/impala/analysis/SelectStmt.java     |  14 +-
 .../org/apache/impala/analysis/SlotDescriptor.java |  15 +-
 .../java/org/apache/impala/analysis/SlotRef.java   | 140 ++++-
 .../java/org/apache/impala/analysis/SortInfo.java  |  16 +-
 .../java/org/apache/impala/analysis/Subquery.java  |   4 +-
 .../apache/impala/analysis/TupleDescriptor.java    | 148 ++++-
 .../java/org/apache/impala/catalog/StructType.java |  14 +
 .../java/org/apache/impala/common/TreeNode.java    |  10 +-
 .../org/apache/impala/planner/HdfsScanNode.java    |  14 +-
 .../org/apache/impala/analysis/AnalyzeDDLTest.java |  29 +-
 .../apache/impala/analysis/AnalyzeExprsTest.java   |  60 +-
 .../apache/impala/analysis/AnalyzeStmtsTest.java   | 169 +++---
 .../impala/analysis/AnalyzeUpsertStmtTest.java     |   7 +-
 testdata/ComplexTypesTbl/structs.orc               | Bin 0 -> 2744 bytes
 testdata/ComplexTypesTbl/structs.parq              | Bin 0 -> 4062 bytes
 testdata/ComplexTypesTbl/structs_nested.orc        | Bin 0 -> 1208 bytes
 testdata/ComplexTypesTbl/structs_nested.parq       | Bin 0 -> 1859 bytes
 .../functional/functional_schema_template.sql      |  64 +++
 .../datasets/functional/schema_constraints.csv     |   6 +
 .../QueryTest/compute-stats-with-structs.test      |  35 ++
 .../QueryTest/nested-struct-in-select-list.test    | 155 ++++++
 ...anger_column_masking_struct_in_select_list.test |  19 +
 .../queries/QueryTest/struct-in-select-list.test   | 602 +++++++++++++++++++++
 tests/authorization/test_ranger.py                 |  47 ++
 tests/common/test_dimensions.py                    |   5 +
 tests/query_test/test_nested_types.py              |  81 +++
 63 files changed, 2152 insertions(+), 352 deletions(-)

diff --git a/be/src/exec/hdfs-orc-scanner.cc b/be/src/exec/hdfs-orc-scanner.cc
index abfab5e..d26839c 100644
--- a/be/src/exec/hdfs-orc-scanner.cc
+++ b/be/src/exec/hdfs-orc-scanner.cc
@@ -468,12 +468,10 @@ Status HdfsOrcScanner::ResolveColumns(const TupleDescriptor& tuple_desc,
       continue;
     }
 
-    // 'col_path'(SchemaPath) of the SlotDescriptor won't map to a STRUCT column.
-    // We only deal with collection columns (ARRAY/MAP) and primitive columns here.
-    if (slot_desc->type().IsCollectionType()) {
+    if (slot_desc->type().IsComplexType()) {
       // Recursively resolve nested columns
-      DCHECK(slot_desc->collection_item_descriptor() != nullptr);
-      const TupleDescriptor* item_tuple_desc = slot_desc->collection_item_descriptor();
+      DCHECK(slot_desc->children_tuple_descriptor() != nullptr);
+      const TupleDescriptor* item_tuple_desc = slot_desc->children_tuple_descriptor();
       RETURN_IF_ERROR(ResolveColumns(*item_tuple_desc, selected_nodes, pos_slots));
     } else {
       VLOG(3) << "Add ORC column " << node->getColumnId() << " for "
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index 82bea13..a3a6338 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -886,7 +886,7 @@ void HdfsScanNodeBase::InitNullCollectionValues(const TupleDescriptor* tuple_des
       continue;
     }
     // Recursively traverse collection items.
-    const TupleDescriptor* item_desc = slot_desc->collection_item_descriptor();
+    const TupleDescriptor* item_desc = slot_desc->children_tuple_descriptor();
     if (item_desc->collection_slots().empty()) continue;
     for (int i = 0; i < slot->num_tuples; ++i) {
       int item_offset = i * item_desc->byte_size();
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index 6275aa2..19a80d2 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -93,7 +93,7 @@ Status HdfsScanner::Open(ScannerContext* context) {
     for (auto& entry : *(scan_node_->thrift_dict_filter_conjuncts_map())) {
       SlotDescriptor* slot_desc = state_->desc_tbl().GetSlotDescriptor(entry.first);
       TupleId tuple_id = (slot_desc->type().IsCollectionType() ?
-          slot_desc->collection_item_descriptor()->id() :
+          slot_desc->children_tuple_descriptor()->id() :
           slot_desc->parent()->id());
       auto conjunct_evals_it = conjunct_evals_map_.find(tuple_id);
       DCHECK(conjunct_evals_it != conjunct_evals_map_.end());
diff --git a/be/src/exec/orc-column-readers.cc b/be/src/exec/orc-column-readers.cc
index 8bfa039..53495b5 100644
--- a/be/src/exec/orc-column-readers.cc
+++ b/be/src/exec/orc-column-readers.cc
@@ -48,7 +48,14 @@ OrcColumnReader* OrcColumnReader::Create(const orc::Type* node,
   DCHECK(slot_desc != nullptr);
   OrcColumnReader* reader = nullptr;
   if (node->getKind() == orc::TypeKind::STRUCT) {
-    reader = new OrcStructReader(node, slot_desc, scanner);
+    if (slot_desc->type().IsStructType() &&
+        slot_desc->children_tuple_descriptor() != nullptr) {
+      // This is the case where we should materialize the struct and its children.
+      reader = new OrcStructReader(node, slot_desc,
+          slot_desc->children_tuple_descriptor(), scanner);
+    } else {
+      reader = new OrcStructReader(node, slot_desc, scanner);
+    }
   } else if (node->getKind() == orc::TypeKind::LIST) {
     reader = new OrcListReader(node, slot_desc, scanner);
   } else if (node->getKind() == orc::TypeKind::MAP) {
@@ -291,7 +298,7 @@ bool OrcStructReader::EndOfBatch() {
 inline uint64_t OrcComplexColumnReader::GetTargetColId(
     const SlotDescriptor* slot_desc) const {
   return slot_desc->type().IsCollectionType() ?
-         GetColId(slot_desc->collection_item_descriptor()):
+         GetColId(slot_desc->children_tuple_descriptor()):
          GetColId(slot_desc);
 }
 
@@ -381,6 +388,16 @@ OrcStructReader::OrcStructReader(const orc::Type* node,
   }
 }
 
+OrcStructReader::OrcStructReader(const orc::Type* node, const SlotDescriptor* slot_desc,
+    const TupleDescriptor* children_tuple, HdfsOrcScanner* scanner)
+    : OrcComplexColumnReader(node, slot_desc, scanner) {
+  tuple_desc_ = children_tuple;
+  materialize_tuple_ = true;
+  for (SlotDescriptor* child_slot : tuple_desc_->slots()) {
+    CreateChildForSlot(node, child_slot);
+  }
+}
+
 OrcStructReader::OrcStructReader(const orc::Type* node,
     const SlotDescriptor* slot_desc, HdfsOrcScanner* scanner)
     : OrcComplexColumnReader(node, slot_desc, scanner) {
@@ -399,7 +416,7 @@ Status OrcStructReader::ReadValue(int row_idx, Tuple* tuple, MemPool* pool) {
     return child->ReadValue(row_idx, tuple, pool);
   }
   if (IsNull(DCHECK_NOTNULL(batch_), row_idx)) {
-    for (OrcColumnReader* child : children_) child->SetNullSlot(tuple);
+    SetNullSlot(tuple);
     return Status::OK();
   }
   for (OrcColumnReader* child : children_) {
@@ -472,6 +489,10 @@ void OrcStructReader::FillSyntheticRowId(ScratchTupleBatch* scratch_batch,
 
 Status OrcStructReader::ReadValueBatch(int row_idx, ScratchTupleBatch* scratch_batch,
     MemPool* pool, int scratch_batch_idx) {
+  if (materialize_tuple_) {
+    return OrcBatchedReader::ReadValueBatch(row_idx, scratch_batch, pool,
+        scratch_batch_idx);
+  }
   for (OrcColumnReader* child : children_) {
     RETURN_IF_ERROR(
         child->ReadValueBatch(row_idx, scratch_batch, pool, scratch_batch_idx));
@@ -515,7 +536,7 @@ OrcCollectionReader::OrcCollectionReader(const orc::Type* node,
     // This is a collection SlotDescriptor whose item TupleDescriptor matches
     // 'node'. We should materialize the slot (creating a CollectionValue) and its
     // collection tuples (see more in HdfsOrcScanner::AssembleCollection).
-    tuple_desc_ = slot_desc->collection_item_descriptor();
+    tuple_desc_ = slot_desc->children_tuple_descriptor();
     materialize_tuple_ = true;
   }
 }
@@ -527,7 +548,9 @@ Status OrcCollectionReader::AssembleCollection(int row_idx, Tuple* tuple, MemPoo
   }
   auto coll_slot = reinterpret_cast<CollectionValue*>(GetSlot(tuple));
   *coll_slot = CollectionValue();
-  const TupleDescriptor* tuple_desc = slot_desc_->collection_item_descriptor();
+  const TupleDescriptor* tuple_desc = slot_desc_->children_tuple_descriptor();
+  DCHECK(tuple_desc != nullptr) << "There is no children tuple for slot ID: " <<
+      slot_desc_->id();
   CollectionValueBuilder builder(coll_slot, *tuple_desc, pool, scanner_->state_);
   return scanner_->AssembleCollection(*this, row_idx, &builder);
 }
@@ -605,7 +628,7 @@ void OrcListReader::CreateChildForSlot(const orc::Type* node,
   // We have a position slot descriptor if it refers to this LIST ORC type, but it isn't
   // a collection slot.
   bool is_pos_slot = slot_col_id == node->getColumnId() &&
-                     slot_desc->collection_item_descriptor() == nullptr;
+                     slot_desc->children_tuple_descriptor() == nullptr;
   if (is_pos_slot) {
     DCHECK(pos_slot_desc_ == nullptr) << "Should have unique pos slot";
     pos_slot_desc_ = slot_desc;
diff --git a/be/src/exec/orc-column-readers.h b/be/src/exec/orc-column-readers.h
index 30fb0cc..ea87c6a 100644
--- a/be/src/exec/orc-column-readers.h
+++ b/be/src/exec/orc-column-readers.h
@@ -479,9 +479,8 @@ class OrcDecimal16ColumnReader
 /// sub queries). The root reader is always an OrcStructReader since the root of the ORC
 /// schema is represented as a STRUCT type.
 ///
-/// For collection readers, they can be divided into two kinds by whether they should
-/// materialize collection tuples (reflected by materialize_tuple_). (STRUCTs always
-/// delegate materialization to their children.)
+/// For complex readers, they can be divided into two kinds by whether they should
+/// materialize their tuples (reflected by materialize_tuple_).
 ///
 /// For collection type readers that materialize a CollectionValue they create a
 /// CollectionValueBuilder when 'ReadValue' is called. Then recursively delegate the
@@ -569,6 +568,13 @@ class OrcStructReader : public OrcComplexColumnReader {
   OrcStructReader(const orc::Type* node, const TupleDescriptor* table_tuple_desc,
       HdfsOrcScanner* scanner);
 
+  /// Constructor for a slot that materializes all it's children. E.g. when a struct is
+  /// given in the select list.
+  OrcStructReader(const orc::Type* node, const SlotDescriptor* slot_desc,
+      const TupleDescriptor* children_tuple, HdfsOrcScanner* scanner);
+
+  /// Constructor for a struct that is not mapped directly to a slot instead it refers to
+  /// a descendant column.
   OrcStructReader(const orc::Type* node, const SlotDescriptor* slot_desc,
       HdfsOrcScanner* scanner);
 
@@ -623,6 +629,7 @@ class OrcStructReader : public OrcComplexColumnReader {
 
   void SetNullSlot(Tuple* tuple) override {
     for (OrcColumnReader* child : children_) child->SetNullSlot(tuple);
+    tuple->SetNull(DCHECK_NOTNULL(slot_desc_)->null_indicator_offset());
   }
 
   void CreateChildForSlot(const orc::Type* curr_node, const SlotDescriptor* slot_desc);
diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.cc b/be/src/exec/parquet/hdfs-parquet-scanner.cc
index f804e36..d1df159 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.cc
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.cc
@@ -2497,8 +2497,8 @@ Status HdfsParquetScanner::CreateColumnReaders(const TupleDescriptor& tuple_desc
 
     if (col_reader->IsCollectionReader()) {
       // Recursively populate col_reader's children
-      DCHECK(slot_desc->collection_item_descriptor() != nullptr);
-      const TupleDescriptor* item_tuple_desc = slot_desc->collection_item_descriptor();
+      DCHECK(slot_desc->children_tuple_descriptor() != nullptr);
+      const TupleDescriptor* item_tuple_desc = slot_desc->children_tuple_descriptor();
       CollectionColumnReader* collection_reader =
           static_cast<CollectionColumnReader*>(col_reader);
       RETURN_IF_ERROR(CreateColumnReaders(
diff --git a/be/src/exec/parquet/parquet-collection-column-reader.cc b/be/src/exec/parquet/parquet-collection-column-reader.cc
index 8c28ab7..f35800d 100644
--- a/be/src/exec/parquet/parquet-collection-column-reader.cc
+++ b/be/src/exec/parquet/parquet-collection-column-reader.cc
@@ -119,7 +119,7 @@ bool CollectionColumnReader::ReadSlot(CollectionValue* slot, MemPool* pool) {
   // Recursively read the collection into a new CollectionValue.
   *slot = CollectionValue();
   CollectionValueBuilder builder(
-      slot, *slot_desc_->collection_item_descriptor(), pool, parent_->state_);
+      slot, *slot_desc_->children_tuple_descriptor(), pool, parent_->state_);
   bool continue_execution =
       parent_->AssembleCollection(children_, new_collection_rep_level(), &builder);
   if (!continue_execution) return false;
diff --git a/be/src/exprs/anyval-util.cc b/be/src/exprs/anyval-util.cc
index 8f4f927..fdff658 100644
--- a/be/src/exprs/anyval-util.cc
+++ b/be/src/exprs/anyval-util.cc
@@ -90,6 +90,9 @@ FunctionContext::TypeDesc AnyValUtil::ColumnTypeToTypeDesc(const ColumnType& typ
     case TYPE_DATE:
       out.type = FunctionContext::TYPE_DATE;
       break;
+    case TYPE_STRUCT:
+      out.type = FunctionContext::TYPE_STRUCT;
+      break;
     default:
       DCHECK(false) << "Unknown type: " << type;
   }
diff --git a/be/src/exprs/expr-value.h b/be/src/exprs/expr-value.h
index d9ee6b5..6773cc4 100644
--- a/be/src/exprs/expr-value.h
+++ b/be/src/exprs/expr-value.h
@@ -23,6 +23,7 @@
 #include "runtime/decimal-value.h"
 #include "runtime/string-value.inline.h"
 #include "runtime/timestamp-value.h"
+#include "udf/udf-internal.h"
 #include "util/decimal-constants.h"
 
 namespace impala {
@@ -42,6 +43,7 @@ struct ExprValue {
   Decimal8Value decimal8_val;
   Decimal16Value decimal16_val;
   CollectionValue collection_val;
+  impala_udf::StructVal struct_val;
   DateValue date_val;
 
   ExprValue()
@@ -58,6 +60,7 @@ struct ExprValue {
       decimal8_val(),
       decimal16_val(),
       collection_val(),
+      struct_val(),
       date_val(0) {
   }
 
diff --git a/be/src/exprs/scalar-expr-evaluator.cc b/be/src/exprs/scalar-expr-evaluator.cc
index 1f667da..4548b87 100644
--- a/be/src/exprs/scalar-expr-evaluator.cc
+++ b/be/src/exprs/scalar-expr-evaluator.cc
@@ -91,6 +91,12 @@ Status ScalarExprEvaluator::Create(const ScalarExpr& root, RuntimeState* state,
     DCHECK_EQ(root.fn_ctx_idx_, -1);
     DCHECK((*eval)->fn_ctxs_ptr_ == nullptr);
   }
+  if (root.type().IsStructType()) {
+    DCHECK(root.GetNumChildren() > 0);
+    Status status = Create(root.children(), state, pool, expr_perm_pool,
+        expr_results_pool, &((*eval)->childEvaluators_));
+    DCHECK((*eval)->childEvaluators_.size() == root.GetNumChildren());
+  }
   (*eval)->initialized_ = true;
   return Status::OK();
 }
@@ -113,9 +119,15 @@ void ScalarExprEvaluator::CreateFnCtxs(RuntimeState* state, const ScalarExpr& ex
   const int fn_ctx_idx = expr.fn_ctx_idx();
   const bool has_fn_ctx = fn_ctx_idx != -1;
   vector<FunctionContext::TypeDesc> arg_types;
-  for (const ScalarExpr* child : expr.children()) {
-    CreateFnCtxs(state, *child, expr_perm_pool, expr_results_pool);
-    if (has_fn_ctx) arg_types.push_back(AnyValUtil::ColumnTypeToTypeDesc(child->type()));
+  // It's not needed to create contexts for the children of structs here as Create() is
+  // called recursively for each of their children and that will take care of the context
+  // creation as well.
+  if (!expr.type().IsStructType()) {
+    for (const ScalarExpr* child : expr.children()) {
+      CreateFnCtxs(state, *child, expr_perm_pool, expr_results_pool);
+      if (has_fn_ctx) arg_types.push_back(
+          AnyValUtil::ColumnTypeToTypeDesc(child->type()));
+    }
   }
   if (has_fn_ctx) {
     FunctionContext::TypeDesc return_type =
@@ -158,6 +170,7 @@ void ScalarExprEvaluator::Close(RuntimeState* state) {
     delete fn_ctxs_[i];
   }
   fn_ctxs_.clear();
+  for (ScalarExprEvaluator* child : childEvaluators_) child->Close(state);
   // Memory allocated by 'fn_ctx_' is still in the MemPools. It's the responsibility of
   // the owners of those pools to free it.
   closed_ = true;
@@ -355,6 +368,12 @@ void* ScalarExprEvaluator::GetValue(const ScalarExpr& expr, const TupleRow* row)
       result_.collection_val.num_tuples = v.num_tuples;
       return &result_.collection_val;
     }
+    case TYPE_STRUCT: {
+      StructVal v = expr.GetStructVal(this, row);
+      if (v.is_null) return nullptr;
+      result_.struct_val = v;
+      return &result_.struct_val;
+    }
     default:
       DCHECK(false) << "Type not implemented: " << expr.type_.DebugString();
       return nullptr;
@@ -413,6 +432,10 @@ CollectionVal ScalarExprEvaluator::GetCollectionVal(const TupleRow* row) {
   return root_.GetCollectionVal(this, row);
 }
 
+StructVal ScalarExprEvaluator::GetStructVal(const TupleRow* row) {
+  return root_.GetStructVal(this, row);
+}
+
 TimestampVal ScalarExprEvaluator::GetTimestampVal(const TupleRow* row) {
   return root_.GetTimestampVal(this, row);
 }
diff --git a/be/src/exprs/scalar-expr-evaluator.h b/be/src/exprs/scalar-expr-evaluator.h
index 0f2c150..7821b76 100644
--- a/be/src/exprs/scalar-expr-evaluator.h
+++ b/be/src/exprs/scalar-expr-evaluator.h
@@ -41,6 +41,7 @@ using impala_udf::TimestampVal;
 using impala_udf::StringVal;
 using impala_udf::DecimalVal;
 using impala_udf::CollectionVal;
+using impala_udf::StructVal;
 using impala_udf::DateVal;
 
 class MemPool;
@@ -161,6 +162,7 @@ class ScalarExprEvaluator {
   DoubleVal GetDoubleVal(const TupleRow* row);
   StringVal GetStringVal(const TupleRow* row);
   CollectionVal GetCollectionVal(const TupleRow* row);
+  StructVal GetStructVal(const TupleRow* row);
   TimestampVal GetTimestampVal(const TupleRow* row);
   DecimalVal GetDecimalVal(const TupleRow* row);
   DateVal GetDateVal(const TupleRow* row);
@@ -211,6 +213,8 @@ class ScalarExprEvaluator {
   /// not strip these symbols.
   static void InitBuiltinsDummy();
 
+  std::vector<ScalarExprEvaluator*>& GetChildEvaluators() { return childEvaluators_; }
+
   static const char* LLVM_CLASS_NAME;
 
  protected:
@@ -231,6 +235,7 @@ class ScalarExprEvaluator {
 
  private:
   friend class ScalarExpr;
+  friend class SlotRef;
 
   /// FunctionContexts for nodes in this Expr tree. Created by this ScalarExprEvaluator
   /// and live in the same object pool as this evaluator (i.e. same life span as the
@@ -253,6 +258,10 @@ class ScalarExprEvaluator {
   /// This is used in interpreted path when we need to return a void*.
   ExprValue result_;
 
+  /// For a struct scalar expression there is one evaluator created for each child of
+  /// the struct. This is empty for non-struct expressions.
+  std::vector<ScalarExprEvaluator*> childEvaluators_;
+
   /// True if this evaluator came from a Clone() call. Used to manage FunctionStateScope.
   bool is_clone_ = false;
 
diff --git a/be/src/exprs/scalar-expr.cc b/be/src/exprs/scalar-expr.cc
index dafc22a..0bf2069 100644
--- a/be/src/exprs/scalar-expr.cc
+++ b/be/src/exprs/scalar-expr.cc
@@ -213,7 +213,12 @@ Status ScalarExpr::CreateNode(
 Status ScalarExpr::OpenEvaluator(FunctionContext::FunctionStateScope scope,
     RuntimeState* state, ScalarExprEvaluator* eval) const {
   for (int i = 0; i < children_.size(); ++i) {
-    RETURN_IF_ERROR(children_[i]->OpenEvaluator(scope, state, eval));
+    ScalarExprEvaluator* child_eval = eval;
+    if (type_.IsStructType()) {
+      DCHECK_EQ(children_.size(), eval->GetChildEvaluators().size());
+      child_eval = eval->GetChildEvaluators()[i];
+    }
+    RETURN_IF_ERROR(children_[i]->OpenEvaluator(scope, state, child_eval));
   }
   return Status::OK();
 }
@@ -386,6 +391,7 @@ SCALAR_EXPR_GET_VAL_INTERPRETED(TimestampVal);
 SCALAR_EXPR_GET_VAL_INTERPRETED(DecimalVal);
 SCALAR_EXPR_GET_VAL_INTERPRETED(DateVal);
 SCALAR_EXPR_GET_VAL_INTERPRETED(CollectionVal);
+SCALAR_EXPR_GET_VAL_INTERPRETED(StructVal);
 
 string ScalarExpr::DebugString(const string& expr_name) const {
   stringstream out;
diff --git a/be/src/exprs/scalar-expr.h b/be/src/exprs/scalar-expr.h
index 4c0f3b6..341a467 100644
--- a/be/src/exprs/scalar-expr.h
+++ b/be/src/exprs/scalar-expr.h
@@ -56,6 +56,7 @@ using impala_udf::StringVal;
 using impala_udf::DecimalVal;
 using impala_udf::DateVal;
 using impala_udf::CollectionVal;
+using impala_udf::StructVal;
 
 class FragmentState;
 struct LibCacheEntry;
@@ -232,6 +233,7 @@ class ScalarExpr : public Expr {
   friend class Predicate;
   friend class ScalarExprEvaluator;
   friend class ScalarFnCall;
+  friend class SlotRef;
 
   /// For BE tests
   friend class ExprTest;
@@ -242,7 +244,7 @@ class ScalarExpr : public Expr {
   /// nodes which need FunctionContext in the tree. 'next_fn_ctx_idx' is the index
   /// of the next available entry in the vector. It's updated as this function is
   /// called recursively down the tree.
-  void AssignFnCtxIdx(int* next_fn_ctx_idx);
+  virtual void AssignFnCtxIdx(int* next_fn_ctx_idx);
 
   int fn_ctx_idx() const { return fn_ctx_idx_; }
 
@@ -272,6 +274,7 @@ class ScalarExpr : public Expr {
   DoubleVal GetDoubleVal(ScalarExprEvaluator*, const TupleRow*) const;
   StringVal GetStringVal(ScalarExprEvaluator*, const TupleRow*) const;
   CollectionVal GetCollectionVal(ScalarExprEvaluator*, const TupleRow*) const;
+  StructVal GetStructVal(ScalarExprEvaluator*, const TupleRow*) const;
   TimestampVal GetTimestampVal(ScalarExprEvaluator*, const TupleRow*) const;
   DecimalVal GetDecimalVal(ScalarExprEvaluator*, const TupleRow*) const;
   DateVal GetDateVal(ScalarExprEvaluator*, const TupleRow*) const;
@@ -293,6 +296,8 @@ class ScalarExpr : public Expr {
   virtual StringVal GetStringValInterpreted(ScalarExprEvaluator*, const TupleRow*) const;
   virtual CollectionVal GetCollectionValInterpreted(
       ScalarExprEvaluator*, const TupleRow*) const;
+  virtual StructVal GetStructValInterpreted(
+      ScalarExprEvaluator*, const TupleRow*) const;
   virtual TimestampVal GetTimestampValInterpreted(
       ScalarExprEvaluator*, const TupleRow*) const;
   virtual DecimalVal GetDecimalValInterpreted(
diff --git a/be/src/exprs/scalar-expr.inline.h b/be/src/exprs/scalar-expr.inline.h
index 614d346..373ab47 100644
--- a/be/src/exprs/scalar-expr.inline.h
+++ b/be/src/exprs/scalar-expr.inline.h
@@ -37,6 +37,7 @@ namespace impala {
 /// * ScalarExpr::GetStringVal()
 /// * ScalarExpr::GetDateVal()
 /// * ScalarExpr::GetCollectionVal()
+/// * ScalarExpr::GetStructVal()
 #pragma push_macro("SCALAR_EXPR_GET_VAL")
 #define SCALAR_EXPR_GET_VAL(val_type, type_validation)                                 \
   typedef val_type (*val_type##Wrapper)(ScalarExprEvaluator*, const TupleRow*);        \
@@ -63,6 +64,7 @@ SCALAR_EXPR_GET_VAL(StringVal, type_.IsStringType()
     || type_.type == PrimitiveType::TYPE_FIXED_UDA_INTERMEDIATE);
 SCALAR_EXPR_GET_VAL(DateVal, type_.type == PrimitiveType::TYPE_DATE);
 SCALAR_EXPR_GET_VAL(CollectionVal, type_.IsCollectionType());
+SCALAR_EXPR_GET_VAL(StructVal, type_.IsStructType());
 #pragma pop_macro("SCALAR_EXPR_GET_VAL")
 
 }
diff --git a/be/src/exprs/slot-ref.cc b/be/src/exprs/slot-ref.cc
index 661c7ef..462559c 100644
--- a/be/src/exprs/slot-ref.cc
+++ b/be/src/exprs/slot-ref.cc
@@ -46,7 +46,7 @@ SlotRef::SlotRef(const TExprNode& node)
     slot_offset_(-1),  // invalid
     null_indicator_offset_(0, 0),
     slot_id_(node.slot_ref.slot_id) {
-    // slot_/null_indicator_offset_ are set in Prepare()
+    // slot_/null_indicator_offset_ are set in Init()
 }
 
 SlotRef::SlotRef(const SlotDescriptor* desc)
@@ -54,7 +54,7 @@ SlotRef::SlotRef(const SlotDescriptor* desc)
     slot_offset_(-1),
     null_indicator_offset_(0, 0),
     slot_id_(desc->id()) {
-    // slot_/null_indicator_offset_ are set in Prepare()
+    // slot_/null_indicator_offset_ are set in Init()
 }
 
 SlotRef::SlotRef(const SlotDescriptor* desc, const ColumnType& type)
@@ -62,7 +62,7 @@ SlotRef::SlotRef(const SlotDescriptor* desc, const ColumnType& type)
     slot_offset_(-1),
     null_indicator_offset_(0, 0),
     slot_id_(desc->id()) {
-    // slot_/null_indicator_offset_ are set in Prepare()
+    // slot_/null_indicator_offset_ are set in Init()
 }
 
 SlotRef::SlotRef(const ColumnType& type, int offset, const bool nullable /* = false */)
@@ -74,7 +74,7 @@ SlotRef::SlotRef(const ColumnType& type, int offset, const bool nullable /* = fa
 
 Status SlotRef::Init(
     const RowDescriptor& row_desc, bool is_entry_point, FragmentState* state) {
-  DCHECK_EQ(children_.size(), 0);
+  DCHECK(type_.IsStructType() || children_.size() == 0);
   RETURN_IF_ERROR(ScalarExpr::Init(row_desc, is_entry_point, state));
   if (slot_id_ != -1) {
     const SlotDescriptor* slot_desc = state->desc_tbl().GetSlotDescriptor(slot_id_);
@@ -85,7 +85,11 @@ Status SlotRef::Init(
       LOG(INFO) << error.str();
       return Status(error.str());
     }
-    tuple_idx_ = row_desc.GetTupleIdx(slot_desc->parent()->id());
+    if (slot_desc->parent()->isTupleOfStructSlot()) {
+      tuple_idx_ = row_desc.GetTupleIdx(slot_desc->parent()->getMasterTuple()->id());
+    } else {
+      tuple_idx_ = row_desc.GetTupleIdx(slot_desc->parent()->id());
+    }
     if (tuple_idx_ == RowDescriptor::INVALID_IDX) {
       TupleDescriptor* d =
           state->desc_tbl().GetTupleDescriptor(slot_desc->parent()->id());
@@ -95,7 +99,9 @@ Status SlotRef::Init(
       return Status(error);
     }
     DCHECK(tuple_idx_ != RowDescriptor::INVALID_IDX);
-    tuple_is_nullable_ = row_desc.TupleIsNullable(tuple_idx_);
+    if (!slot_desc->parent()->isTupleOfStructSlot()) {
+      tuple_is_nullable_ = row_desc.TupleIsNullable(tuple_idx_);
+    }
     slot_offset_ = slot_desc->tuple_offset();
     null_indicator_offset_ = slot_desc->null_indicator_offset();
   }
@@ -117,6 +123,17 @@ string SlotRef::DebugString() const {
   return out.str();
 }
 
+void SlotRef::AssignFnCtxIdx(int* next_fn_ctx_idx) {
+  if (!type_.IsStructType()) {
+    ScalarExpr::AssignFnCtxIdx(next_fn_ctx_idx);
+    return;
+  }
+  fn_ctx_idx_start_ = *next_fn_ctx_idx;
+  fn_ctx_idx_ = 0;
+  fn_ctx_idx_end_ = 1;
+  for (ScalarExpr* child : children()) child->AssignFnCtxIdx(next_fn_ctx_idx);
+}
+
 // There are four possible cases we may generate:
 //   1. Tuple is non-nullable and slot is non-nullable
 //   2. Tuple is non-nullable and slot is nullable
@@ -451,4 +468,25 @@ CollectionVal SlotRef::GetCollectionValInterpreted(
   return CollectionVal(coll_value->ptr, coll_value->num_tuples);
 }
 
+StructVal SlotRef::GetStructValInterpreted(
+    ScalarExprEvaluator* eval, const TupleRow* row) const {
+  DCHECK(type_.IsStructType() && children_.size() > 0);
+  DCHECK_EQ(children_.size(), eval->GetChildEvaluators().size());
+  Tuple* t = row->GetTuple(tuple_idx_);
+  if (t == nullptr || t->IsNull(null_indicator_offset_)) return StructVal::null();
+
+  FunctionContext* fn_ctx = eval->fn_context(fn_ctx_idx_);
+  DCHECK(fn_ctx != nullptr);
+  StructVal struct_val(fn_ctx, children_.size());
+  vector<ScalarExprEvaluator*>& child_evaluators = eval->GetChildEvaluators();
+  for (int i = 0; i < child_evaluators.size(); ++i) {
+    ScalarExpr* child_expr = children_[i];
+    ScalarExprEvaluator* child_eval = child_evaluators[i];
+    DCHECK(child_eval != nullptr);
+    void* child_val = child_eval->GetValue(*child_expr, row);
+    struct_val.addChild(child_val, i);
+  }
+  return struct_val;
+}
+
 } // namespace impala
diff --git a/be/src/exprs/slot-ref.h b/be/src/exprs/slot-ref.h
index 8f817a4..acbbd06 100644
--- a/be/src/exprs/slot-ref.h
+++ b/be/src/exprs/slot-ref.h
@@ -58,14 +58,24 @@ class SlotRef : public ScalarExpr {
   virtual int GetSlotIds(std::vector<SlotId>* slot_ids) const override;
   const SlotId& slot_id() const { return slot_id_; }
   static const char* LLVM_CLASS_NAME;
+  int GetTupleIdx() const { return tuple_idx_; }
+  NullIndicatorOffset GetNullIndicatorOffset() const { return null_indicator_offset_; }
+  int GetSlotOffset() const { return slot_offset_; }
 
  protected:
   friend class ScalarExpr;
   friend class ScalarExprEvaluator;
 
+  /// For struct SlotRefs we need a FunctionContext so that we can use it later for
+  /// allocating memory to StructVals.
+  /// If this SlotRef is not a struct then the same function in ScalarExpr is called.
+  virtual void AssignFnCtxIdx(int* next_fn_ctx_idx) override;
+
   GENERATE_GET_VAL_INTERPRETED_OVERRIDES_FOR_ALL_SCALAR_TYPES
   virtual CollectionVal GetCollectionValInterpreted(
       ScalarExprEvaluator*, const TupleRow*) const override;
+  virtual StructVal GetStructValInterpreted(
+      ScalarExprEvaluator*, const TupleRow*) const override;
 
  private:
   int tuple_idx_;  // within row
diff --git a/be/src/runtime/buffered-tuple-stream-test.cc b/be/src/runtime/buffered-tuple-stream-test.cc
index dfdfeba..acec1fb 100644
--- a/be/src/runtime/buffered-tuple-stream-test.cc
+++ b/be/src/runtime/buffered-tuple-stream-test.cc
@@ -1878,7 +1878,7 @@ TEST_F(ArrayTupleStreamTest, TestArrayDeepCopy) {
     tuple0->SetNull(tuple_descs[0]->slots()[1]->null_indicator_offset());
     tuple1->SetNull(tuple_descs[1]->slots()[0]->null_indicator_offset());
     const SlotDescriptor* array_slot_desc = tuple_descs[0]->slots()[0];
-    const TupleDescriptor* item_desc = array_slot_desc->collection_item_descriptor();
+    const TupleDescriptor* item_desc = array_slot_desc->children_tuple_descriptor();
 
     int array_len = array_lens[array_len_index++ % num_array_lens];
     CollectionValue* cv = tuple0->GetCollectionSlot(array_slot_desc->tuple_offset());
@@ -1932,7 +1932,7 @@ TEST_F(ArrayTupleStreamTest, TestArrayDeepCopy) {
       ASSERT_TRUE(tuple0->IsNull(tuple_descs[0]->slots()[1]->null_indicator_offset()));
       ASSERT_TRUE(tuple1->IsNull(tuple_descs[1]->slots()[0]->null_indicator_offset()));
 
-      const TupleDescriptor* item_desc = array_slot_desc->collection_item_descriptor();
+      const TupleDescriptor* item_desc = array_slot_desc->children_tuple_descriptor();
       int expected_array_len = array_lens[array_len_index++ % num_array_lens];
       CollectionValue* cv = tuple0->GetCollectionSlot(array_slot_desc->tuple_offset());
       ASSERT_EQ(expected_array_len, cv->num_tuples);
@@ -1987,7 +1987,7 @@ TEST_F(ArrayTupleStreamTest, TestComputeRowSize) {
   // Tuple 0 has an array.
   int expected_row_size = tuple_null_indicator_bytes + array_desc_->GetRowSize();
   const SlotDescriptor* array_slot = tuple_descs[0]->slots()[0];
-  const TupleDescriptor* item_desc = array_slot->collection_item_descriptor();
+  const TupleDescriptor* item_desc = array_slot->children_tuple_descriptor();
   int array_len = 128;
   CollectionValue* cv = tuple0->GetCollectionSlot(array_slot->tuple_offset());
   CollectionValueBuilder builder(
diff --git a/be/src/runtime/buffered-tuple-stream.cc b/be/src/runtime/buffered-tuple-stream.cc
index 01edc84..64a47e3 100644
--- a/be/src/runtime/buffered-tuple-stream.cc
+++ b/be/src/runtime/buffered-tuple-stream.cc
@@ -73,29 +73,30 @@ BufferedTupleStream::BufferedTupleStream(RuntimeState* state,
     const TupleDescriptor* tuple_desc = desc_->tuple_descriptors()[i];
     const int tuple_byte_size = tuple_desc->byte_size();
     fixed_tuple_sizes_.push_back(tuple_byte_size);
+    CollectInlinedSlots(tuple_desc, ext_varlen_slots, i);
+  }
+}
 
-    vector<SlotDescriptor*> tuple_string_slots;
-    vector<SlotDescriptor*> tuple_coll_slots;
-    for (int j = 0; j < tuple_desc->slots().size(); ++j) {
-      SlotDescriptor* slot = tuple_desc->slots()[j];
-      if (!slot->type().IsVarLenType()) continue;
-      if (ext_varlen_slots.find(slot->id()) == ext_varlen_slots.end()) {
-        if (slot->type().IsVarLenStringType()) {
-          tuple_string_slots.push_back(slot);
-        } else {
-          DCHECK(slot->type().IsCollectionType());
-          tuple_coll_slots.push_back(slot);
-        }
-      }
-    }
-    if (!tuple_string_slots.empty()) {
-      inlined_string_slots_.push_back(make_pair(i, tuple_string_slots));
+void BufferedTupleStream::CollectInlinedSlots(const TupleDescriptor* tuple_desc,
+    const set<SlotId>& ext_varlen_slots, int tuple_idx) {
+  vector<SlotDescriptor*> inlined_string_slots;
+  vector<SlotDescriptor*> inlined_coll_slots;
+  for (SlotDescriptor* slot : tuple_desc->string_slots()) {
+    if (ext_varlen_slots.find(slot->id()) == ext_varlen_slots.end()) {
+      inlined_string_slots.push_back(slot);
     }
-
-    if (!tuple_coll_slots.empty()) {
-      inlined_coll_slots_.push_back(make_pair(i, tuple_coll_slots));
+  }
+  for (SlotDescriptor* slot : tuple_desc->collection_slots()) {
+    if (ext_varlen_slots.find(slot->id()) == ext_varlen_slots.end()) {
+      inlined_coll_slots.push_back(slot);
     }
   }
+  if (!inlined_string_slots.empty()) {
+    inlined_string_slots_.push_back(make_pair(tuple_idx, inlined_string_slots));
+  }
+  if (!inlined_coll_slots.empty()) {
+    inlined_coll_slots_.push_back(make_pair(tuple_idx, inlined_coll_slots));
+  }
 }
 
 void BufferedTupleStream::CheckConsistencyFull(const ReadIterator& read_it) const {
@@ -919,7 +920,7 @@ void BufferedTupleStream::FixUpCollectionsForRead(
     if (tuple->IsNull(slot_desc->null_indicator_offset())) continue;
 
     CollectionValue* cv = tuple->GetCollectionSlot(slot_desc->tuple_offset());
-    const TupleDescriptor& item_desc = *slot_desc->collection_item_descriptor();
+    const TupleDescriptor& item_desc = *slot_desc->children_tuple_descriptor();
     int coll_byte_size = cv->num_tuples * item_desc.byte_size();
     cv->ptr = reinterpret_cast<uint8_t*>(read_iter->read_ptr_);
     read_iter->AdvanceReadPtr(coll_byte_size);
@@ -964,7 +965,7 @@ int64_t BufferedTupleStream::ComputeRowSize(TupleRow* row) const noexcept {
     for (auto it = slots.begin(); it != slots.end(); ++it) {
       if (tuple->IsNull((*it)->null_indicator_offset())) continue;
       CollectionValue* cv = tuple->GetCollectionSlot((*it)->tuple_offset());
-      const TupleDescriptor& item_desc = *(*it)->collection_item_descriptor();
+      const TupleDescriptor& item_desc = *(*it)->children_tuple_descriptor();
       size += cv->num_tuples * item_desc.byte_size();
 
       if (!item_desc.HasVarlenSlots()) continue;
@@ -1117,7 +1118,7 @@ bool BufferedTupleStream::CopyCollections(const Tuple* tuple,
   for (const SlotDescriptor* slot_desc : collection_slots) {
     if (tuple->IsNull(slot_desc->null_indicator_offset())) continue;
     const CollectionValue* cv = tuple->GetCollectionSlot(slot_desc->tuple_offset());
-    const TupleDescriptor& item_desc = *slot_desc->collection_item_descriptor();
+    const TupleDescriptor& item_desc = *slot_desc->children_tuple_descriptor();
     if (LIKELY(cv->num_tuples > 0)) {
       int coll_byte_size = cv->num_tuples * item_desc.byte_size();
       if (UNLIKELY(*data + coll_byte_size > data_end)) return false;
diff --git a/be/src/runtime/buffered-tuple-stream.h b/be/src/runtime/buffered-tuple-stream.h
index bba6479..c65248b 100644
--- a/be/src/runtime/buffered-tuple-stream.h
+++ b/be/src/runtime/buffered-tuple-stream.h
@@ -667,6 +667,12 @@ class BufferedTupleStream {
   /// kept pinned until the caller calls UnpinStream().
   bool pinned_ = true;
 
+  /// Populates 'inlined_string_slots_' and 'inlined_coll_slots_' under the index of
+  /// 'tuple_idx' with the inlined string and collection slots from the slots of
+  /// 'tuple_desc'. Excludes the slots in 'ext_varlen_slot'.
+  void CollectInlinedSlots(const TupleDescriptor* tuple_desc,
+      const std::set<SlotId>& ext_varlen_slots, int tuple_idx);
+
   /// Return true if 'page' is the current page for the embedded read iterator.
   bool is_read_page(const Page* page) const {
     return read_it_.read_page_ != pages_.end() && &*read_it_.read_page_ == page;
diff --git a/be/src/runtime/descriptors.cc b/be/src/runtime/descriptors.cc
index 607a675..eb5bd1d 100644
--- a/be/src/runtime/descriptors.cc
+++ b/be/src/runtime/descriptors.cc
@@ -98,24 +98,23 @@ ostream& operator<<(ostream& os, const NullIndicatorOffset& null_indicator) {
 }
 
 SlotDescriptor::SlotDescriptor(const TSlotDescriptor& tdesc,
-    const TupleDescriptor* parent, const TupleDescriptor* collection_item_descriptor)
+    const TupleDescriptor* parent, const TupleDescriptor* children_tuple_descriptor)
   : id_(tdesc.id),
     type_(ColumnType::FromThrift(tdesc.slotType)),
     parent_(parent),
-    collection_item_descriptor_(collection_item_descriptor),
+    children_tuple_descriptor_(children_tuple_descriptor),
     col_path_(tdesc.materializedPath),
     tuple_offset_(tdesc.byteOffset),
     null_indicator_offset_(tdesc.nullIndicatorByte, tdesc.nullIndicatorBit),
     slot_idx_(tdesc.slotIdx),
     slot_size_(type_.GetSlotSize()) {
-  DCHECK_NE(type_.type, TYPE_STRUCT);
   DCHECK(parent_ != nullptr) << tdesc.parent;
-  if (type_.IsCollectionType()) {
+  if (type_.IsComplexType()) {
     DCHECK(tdesc.__isset.itemTupleId);
-    DCHECK(collection_item_descriptor_ != nullptr) << tdesc.itemTupleId;
+    DCHECK(children_tuple_descriptor_ != nullptr) << tdesc.itemTupleId;
   } else {
     DCHECK(!tdesc.__isset.itemTupleId);
-    DCHECK(collection_item_descriptor == nullptr);
+    DCHECK(children_tuple_descriptor == nullptr);
   }
 }
 
@@ -138,8 +137,8 @@ string SlotDescriptor::DebugString() const {
     out << col_path_[i];
   }
   out << "]";
-  if (collection_item_descriptor_ != nullptr) {
-    out << " collection_item_tuple_id=" << collection_item_descriptor_->id();
+  if (children_tuple_descriptor_ != nullptr) {
+    out << " children_tuple_id=" << children_tuple_descriptor_->id();
   }
   out << " offset=" << tuple_offset_ << " null=" << null_indicator_offset_.DebugString()
       << " slot_idx=" << slot_idx_ << " field_idx=" << slot_idx_
@@ -156,6 +155,10 @@ bool SlotDescriptor::LayoutEquals(const SlotDescriptor& other_desc) const {
   return true;
 }
 
+inline bool SlotDescriptor::IsChildOfStruct() const {
+  return parent_->isTupleOfStructSlot();
+}
+
 ColumnDescriptor::ColumnDescriptor(const TColumnDescriptor& tdesc)
   : name_(tdesc.name),
     type_(ColumnType::FromThrift(tdesc.type)) {
@@ -342,8 +345,12 @@ TupleDescriptor::TupleDescriptor(const TTupleDescriptor& tdesc)
 void TupleDescriptor::AddSlot(SlotDescriptor* slot) {
   slots_.push_back(slot);
   if (slot->type().IsVarLenStringType()) {
-    string_slots_.push_back(slot);
-    has_varlen_slots_ = true;
+    TupleDescriptor* target_tuple = this;
+    // If this is a tuple for struct children then we populate the 'string_slots_' of
+    // the topmost tuple and not this one.
+    if (isTupleOfStructSlot()) target_tuple = master_tuple_;
+    target_tuple->string_slots_.push_back(slot);
+    target_tuple->has_varlen_slots_ = true;
   }
   if (slot->type().IsCollectionType()) {
     collection_slots_.push_back(slot);
@@ -351,16 +358,6 @@ void TupleDescriptor::AddSlot(SlotDescriptor* slot) {
   }
 }
 
-bool TupleDescriptor::ContainsStringData() const {
-  if (!string_slots_.empty()) return true;
-  for (int i = 0; i < collection_slots_.size(); ++i) {
-    if (collection_slots_[i]->collection_item_descriptor_->ContainsStringData()) {
-      return true;
-    }
-  }
-  return false;
-}
-
 string TupleDescriptor::DebugString() const {
   stringstream out;
   out << "Tuple(id=" << id_ << " size=" << byte_size_;
@@ -614,8 +611,7 @@ Status DescriptorTbl::CreateInternal(ObjectPool* pool, const TDescriptorTable& t
     (*tbl)->tbl_desc_map_[tdesc.id] = desc;
   }
 
-  for (size_t i = 0; i < thrift_tbl.tupleDescriptors.size(); ++i) {
-    const TTupleDescriptor& tdesc = thrift_tbl.tupleDescriptors[i];
+  for (const TTupleDescriptor& tdesc : thrift_tbl.tupleDescriptors) {
     TupleDescriptor* desc = pool->Add(new TupleDescriptor(tdesc));
     // fix up table pointer
     if (tdesc.__isset.tableId) {
@@ -624,15 +620,22 @@ Status DescriptorTbl::CreateInternal(ObjectPool* pool, const TDescriptorTable& t
     (*tbl)->tuple_desc_map_[tdesc.id] = desc;
   }
 
-  for (size_t i = 0; i < thrift_tbl.slotDescriptors.size(); ++i) {
-    const TSlotDescriptor& tdesc = thrift_tbl.slotDescriptors[i];
+  for (const TSlotDescriptor& tdesc : thrift_tbl.slotDescriptors) {
     // Tuple descriptors are already populated in tbl
     TupleDescriptor* parent = (*tbl)->GetTupleDescriptor(tdesc.parent);
     DCHECK(parent != nullptr);
-    TupleDescriptor* collection_item_descriptor = tdesc.__isset.itemTupleId ?
+    TupleDescriptor* children_tuple_descriptor = tdesc.__isset.itemTupleId ?
         (*tbl)->GetTupleDescriptor(tdesc.itemTupleId) : nullptr;
     SlotDescriptor* slot_d = pool->Add(
-        new SlotDescriptor(tdesc, parent, collection_item_descriptor));
+        new SlotDescriptor(tdesc, parent, children_tuple_descriptor));
+    if (slot_d->type().IsStructType() && children_tuple_descriptor != nullptr &&
+        children_tuple_descriptor->getMasterTuple() == nullptr) {
+      TupleDescriptor* master_tuple = parent;
+      // If this struct is nested into another structs then get the topmost tuple for the
+      // master.
+      if (parent->getMasterTuple() != nullptr) master_tuple = parent->getMasterTuple();
+      children_tuple_descriptor->setMasterTuple(master_tuple);
+    }
     (*tbl)->slot_desc_map_[tdesc.id] = slot_d;
     parent->AddSlot(slot_d);
   }
diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h
index 4cbaacb..3c9d9e7 100644
--- a/be/src/runtime/descriptors.h
+++ b/be/src/runtime/descriptors.h
@@ -115,8 +115,8 @@ class SlotDescriptor {
   SlotId id() const { return id_; }
   const ColumnType& type() const { return type_; }
   const TupleDescriptor* parent() const { return parent_; }
-  const TupleDescriptor* collection_item_descriptor() const {
-    return collection_item_descriptor_;
+  const TupleDescriptor* children_tuple_descriptor() const {
+    return children_tuple_descriptor_;
   }
   /// Returns the column index of this slot, including partition keys.
   /// (e.g., col_pos - num_partition_keys = the table column this slot corresponds to)
@@ -168,6 +168,8 @@ class SlotDescriptor {
   void CodegenSetNullIndicator(LlvmCodeGen* codegen, LlvmBuilder* builder,
       llvm::Value* tuple, llvm::Value* is_null) const;
 
+  /// Returns true if this slot is a child of a struct slot.
+  inline bool IsChildOfStruct() const;
  private:
   friend class DescriptorTbl;
   friend class TupleDescriptor;
@@ -175,8 +177,8 @@ class SlotDescriptor {
   const SlotId id_;
   const ColumnType type_;
   const TupleDescriptor* parent_;
-  /// Non-NULL only for collection slots
-  const TupleDescriptor* collection_item_descriptor_;
+  /// Non-NULL only for complex type slots
+  const TupleDescriptor* children_tuple_descriptor_;
   // TODO for 2.3: rename to materialized_path_
   const SchemaPath col_path_;
   const int tuple_offset_;
@@ -189,9 +191,9 @@ class SlotDescriptor {
   /// the byte size of this slot.
   const int slot_size_;
 
-  /// collection_item_descriptor should be non-NULL iff this is a collection slot
+  /// 'children_tuple_descriptor' should be non-NULL iff this is a complex type slot.
   SlotDescriptor(const TSlotDescriptor& tdesc, const TupleDescriptor* parent,
-      const TupleDescriptor* collection_item_descriptor);
+      const TupleDescriptor* children_tuple_descriptor);
 
   /// Generate LLVM code at the insert position of 'builder' to get the i8 value of
   /// the byte containing 'null_indicator_offset' in 'tuple'. If 'null_byte_ptr' is
@@ -452,8 +454,10 @@ class TupleDescriptor {
   TupleId id() const { return id_; }
   std::string DebugString() const;
 
-  /// Returns true if this tuple or any nested collection item tuples have string slots.
-  bool ContainsStringData() const;
+  bool isTupleOfStructSlot() const { return master_tuple_ != nullptr; }
+
+  TupleDescriptor* getMasterTuple() const { return master_tuple_; }
+  void setMasterTuple(TupleDescriptor* desc) { master_tuple_ = desc; }
 
   /// Return true if the physical layout of this descriptor matches that of other_desc,
   /// but not necessarily the id.
@@ -501,6 +505,20 @@ class TupleDescriptor {
   /// collection, empty otherwise.
   SchemaPath tuple_path_;
 
+  /// If this tuple represents the children of a struct slot then 'master_tuple_' is the
+  /// tuple that holds the topmost struct slot. For example:
+  /// - Tuple0
+  ///     - Slot1 e.g. INT slot
+  ///     - Slot2 e.g. STRUCT slot
+  ///         - Tuple1 (Holds the children of the struct)
+  ///             - Slot3 e.g. INT child of the STRUCT
+  ///             - Slot4 e.g. STRING child of the STRUCT
+  /// In the above example the 'master_tuple_' for Tuple1 (that is the struct's tuple to
+  /// hold its children) would be Tuple0. In case the STRUCT in Slot2 was a nested struct
+  /// in any depth then the 'master_tuple_' for any of the tuples under Slot2 would be
+  /// again Tuple0.
+  TupleDescriptor* master_tuple_ = nullptr;
+
   TupleDescriptor(const TTupleDescriptor& tdesc);
   void AddSlot(SlotDescriptor* slot);
 
diff --git a/be/src/runtime/raw-value.cc b/be/src/runtime/raw-value.cc
index bf81869..30c941f 100644
--- a/be/src/runtime/raw-value.cc
+++ b/be/src/runtime/raw-value.cc
@@ -24,12 +24,15 @@
 #include "runtime/raw-value.inline.h"
 #include "runtime/string-value.inline.h"
 #include "runtime/tuple.h"
+#include "udf/udf-internal.h"
 #include "util/ubsan.h"
 
 #include "common/names.h"
 
 namespace impala {
 
+using impala_udf::StructVal;
+
 const int RawValue::ASCII_PRECISION;
 constexpr double RawValue::CANONICAL_DOUBLE_NAN;
 constexpr float RawValue::CANONICAL_FLOAT_NAN;
@@ -194,6 +197,10 @@ void RawValue::Write(const void* value, void* dst, const ColumnType& type,
       dest->ptr = src->ptr;
       break;
     }
+    case TYPE_STRUCT: {
+      // Structs should be handled by a different Write() function within this class.
+      DCHECK(false);
+    }
     default:
       DCHECK(false) << "RawValue::Write(): bad type: " << type.DebugString();
   }
@@ -209,6 +216,69 @@ void RawValue::Write(const void* value, Tuple* tuple, const SlotDescriptor* slot
   }
 }
 
+template <bool COLLECT_STRING_VALS>
+void RawValue::Write(const void* value, Tuple* tuple,
+    const SlotDescriptor* slot_desc, MemPool* pool,
+    vector<StringValue*>* string_values) {
+  DCHECK(value != nullptr && tuple != nullptr && slot_desc != nullptr &&
+      string_values != nullptr);
+  DCHECK(string_values->size() == 0);
+
+  if (slot_desc->type().IsStructType()) {
+    WriteStruct<COLLECT_STRING_VALS>(value, tuple, slot_desc, pool, string_values);
+  } else {
+    WritePrimitive<COLLECT_STRING_VALS>(value, tuple, slot_desc, pool, string_values);
+  }
+}
+
+template <bool COLLECT_STRING_VALS>
+void RawValue::WriteStruct(const void* value, Tuple* tuple,
+    const SlotDescriptor* slot_desc, MemPool* pool,
+    vector<StringValue*>* string_values) {
+  DCHECK(tuple != nullptr);
+  DCHECK(slot_desc->type().IsStructType());
+  DCHECK(slot_desc->children_tuple_descriptor() != nullptr);
+  if (value == nullptr) {
+    tuple->SetStructToNull(slot_desc);
+    return;
+  }
+  const StructVal* src = reinterpret_cast<const StructVal*>(value);
+  const TupleDescriptor* children_tuple_desc = slot_desc->children_tuple_descriptor();
+  DCHECK_EQ(src->num_children, children_tuple_desc->slots().size());
+
+  for (int i = 0; i < src->num_children; ++i) {
+    SlotDescriptor* child_slot = children_tuple_desc->slots()[i];
+    uint8_t* src_child = src->ptr[i];
+    if (child_slot->type().IsStructType()) {
+      // Recursive call in case of nested structs.
+      WriteStruct<COLLECT_STRING_VALS>(src_child, tuple, child_slot, pool,
+          string_values);
+      continue;
+    }
+    if (src_child == nullptr) {
+      tuple->SetNull(child_slot->null_indicator_offset());
+    } else {
+      WritePrimitive<COLLECT_STRING_VALS>(src_child, tuple, child_slot, pool,
+          string_values);
+    }
+  }
+}
+
+template <bool COLLECT_STRING_VALS>
+void RawValue::WritePrimitive(const void* value, Tuple* tuple,
+    const SlotDescriptor* slot_desc, MemPool* pool,
+    vector<StringValue*>* string_values) {
+  DCHECK(value != nullptr && tuple != nullptr && slot_desc != nullptr &&
+      string_values != nullptr);
+  DCHECK(!slot_desc->type().IsComplexType());
+
+  void* dst = tuple->GetSlot(slot_desc->tuple_offset());
+  Write(value, dst, slot_desc->type(), pool);
+  if (COLLECT_STRING_VALS && slot_desc->type().IsVarLenStringType()) {
+    string_values->push_back(reinterpret_cast<StringValue*>(dst));
+  }
+}
+
 void RawValue::PrintValue(
     const void* value, const ColumnType& type, int scale, std::stringstream* stream) {
   if (value == NULL) {
@@ -299,4 +369,25 @@ void RawValue::PrintValue(
   // Undo setting stream to fixed
   stream->flags(old_flags);
 }
+
+template void RawValue::Write<true>(const void* value, Tuple* tuple,
+    const SlotDescriptor* slot_desc, MemPool* pool,
+    std::vector<StringValue*>* string_values);
+template void RawValue::Write<false>(const void* value, Tuple* tuple,
+    const SlotDescriptor* slot_desc, MemPool* pool,
+    std::vector<StringValue*>* string_values);
+
+template void RawValue::WriteStruct<true>(const void* value, Tuple* tuple,
+      const SlotDescriptor* slot_desc, MemPool* pool,
+      std::vector<StringValue*>* string_values);
+template void RawValue::WriteStruct<false>(const void* value, Tuple* tuple,
+      const SlotDescriptor* slot_desc, MemPool* pool,
+      std::vector<StringValue*>* string_values);
+
+template void RawValue::WritePrimitive<true>(const void* value, Tuple* tuple,
+      const SlotDescriptor* slot_desc, MemPool* pool,
+      std::vector<StringValue*>* string_values);
+template void RawValue::WritePrimitive<false>(const void* value, Tuple* tuple,
+      const SlotDescriptor* slot_desc, MemPool* pool,
+      std::vector<StringValue*>* string_values);
 }
diff --git a/be/src/runtime/raw-value.h b/be/src/runtime/raw-value.h
index d2251a5..570d58a 100644
--- a/be/src/runtime/raw-value.h
+++ b/be/src/runtime/raw-value.h
@@ -27,6 +27,7 @@ namespace impala {
 
 class MemPool;
 class SlotDescriptor;
+struct StringValue;
 class Tuple;
 
 /// Useful utility functions for runtime values (which are passed around as void*).
@@ -125,6 +126,13 @@ class RawValue {
   /// src must be non-NULL.
   static void Write(const void* src, void* dst, const ColumnType& type, MemPool* pool);
 
+  /// Wrapper function for Write() to handle struct slots and its children. Additionally,
+  /// gathers the string slots of the slot tree into 'string_values'.
+  template <bool COLLECT_STRING_VALS>
+  static void Write(const void* value, Tuple* tuple,
+      const SlotDescriptor* slot_desc, MemPool* pool,
+    std::vector<StringValue*>* string_values);
+
   /// Returns true if v1 == v2.
   /// This is more performant than Compare() == 0 for string equality, mostly because of
   /// the length comparison check.
@@ -146,5 +154,20 @@ class RawValue {
 
   // Returns positive zero for floating point types.
   static inline const void* PositiveFloatingZero(const ColumnType& type);
+
+private:
+  /// Recursive helper function for Write() to handle struct slots.
+  template <bool COLLECT_STRING_VALS>
+  static void WriteStruct(const void* value, Tuple* tuple,
+      const SlotDescriptor* slot_desc, MemPool* pool,
+      std::vector<StringValue*>* string_values);
+
+  /// Gets the destination slot from 'tuple' and 'slot_desc', writes value to this slot
+  /// using Write(). Collects pointer of the string slots to 'string_values'. 'slot_desc'
+  /// has to be primitive type.
+  template <bool COLLECT_STRING_VALS>
+  static void WritePrimitive(const void* value, Tuple* tuple,
+      const SlotDescriptor* slot_desc, MemPool* pool,
+      std::vector<StringValue*>* string_values);
 };
 }
diff --git a/be/src/runtime/row-batch-serialize-test.cc b/be/src/runtime/row-batch-serialize-test.cc
index 4b99b87..83836f8 100644
--- a/be/src/runtime/row-batch-serialize-test.cc
+++ b/be/src/runtime/row-batch-serialize-test.cc
@@ -212,7 +212,7 @@ class RowBatchSerializeTest : public testing::Test {
       }
 
       if (type.IsCollectionType()) {
-        const TupleDescriptor& item_desc = *slot_desc->collection_item_descriptor();
+        const TupleDescriptor& item_desc = *slot_desc->children_tuple_descriptor();
         CollectionValue* coll_value = reinterpret_cast<CollectionValue*>(slot);
         CollectionValue* deserialized_coll_value =
             reinterpret_cast<CollectionValue*>(deserialized_slot);
@@ -259,7 +259,7 @@ class RowBatchSerializeTest : public testing::Test {
         break;
       }
       case TYPE_ARRAY: {
-        const TupleDescriptor* item_desc = slot_desc.collection_item_descriptor();
+        const TupleDescriptor* item_desc = slot_desc.children_tuple_descriptor();
         int array_len = rand() % (MAX_ARRAY_LEN + 1);
         CollectionValue cv;
         CollectionValueBuilder builder(&cv, *item_desc, pool, runtime_state_, array_len);
@@ -721,7 +721,7 @@ TEST_F(RowBatchSerializeTest, DedupPathologicalFull) {
   // The last tuple is a duplicated array with a large string inside.
   const TupleDescriptor* array_tuple_desc = row_desc.tuple_descriptors()[array_tuple_idx];
   const SlotDescriptor* array_slot_desc = array_tuple_desc->slots()[0];
-  const TupleDescriptor* array_item_desc = array_slot_desc->collection_item_descriptor();
+  const TupleDescriptor* array_item_desc = array_slot_desc->children_tuple_descriptor();
   const SlotDescriptor* string_slot_desc = array_item_desc->slots()[0];
   MemPool* pool = batch->tuple_data_pool();
   for (int i = 0; i < num_distinct_array_tuples; ++i) {
diff --git a/be/src/runtime/tuple.cc b/be/src/runtime/tuple.cc
index e5688d3..1b74971 100644
--- a/be/src/runtime/tuple.cc
+++ b/be/src/runtime/tuple.cc
@@ -72,7 +72,7 @@ int64_t Tuple::VarlenByteSize(const TupleDescriptor& desc) const {
     if (IsNull((*slot)->null_indicator_offset())) continue;
     const CollectionValue* coll_value = GetCollectionSlot((*slot)->tuple_offset());
     uint8_t* coll_data = coll_value->ptr;
-    const TupleDescriptor& item_desc = *(*slot)->collection_item_descriptor();
+    const TupleDescriptor& item_desc = *(*slot)->children_tuple_descriptor();
     for (int i = 0; i < coll_value->num_tuples; ++i) {
       result += reinterpret_cast<Tuple*>(coll_data)->TotalByteSize(item_desc);
       coll_data += item_desc.byte_size();
@@ -112,7 +112,7 @@ void Tuple::DeepCopyVarlenData(const TupleDescriptor& desc, MemPool* pool) {
     DCHECK((*slot)->type().IsCollectionType());
     if (IsNull((*slot)->null_indicator_offset())) continue;
     CollectionValue* cv = GetCollectionSlot((*slot)->tuple_offset());
-    const TupleDescriptor* item_desc = (*slot)->collection_item_descriptor();
+    const TupleDescriptor* item_desc = (*slot)->children_tuple_descriptor();
     int coll_byte_size = cv->num_tuples * item_desc->byte_size();
     uint8_t* coll_data = reinterpret_cast<uint8_t*>(pool->Allocate(coll_byte_size));
     memcpy(coll_data, cv->ptr, coll_byte_size);
@@ -156,7 +156,7 @@ void Tuple::DeepCopyVarlenData(const TupleDescriptor& desc, char** data, int* of
     if (IsNull((*slot)->null_indicator_offset())) continue;
 
     CollectionValue* coll_value = GetCollectionSlot((*slot)->tuple_offset());
-    const TupleDescriptor& item_desc = *(*slot)->collection_item_descriptor();
+    const TupleDescriptor& item_desc = *(*slot)->children_tuple_descriptor();
     int coll_byte_size = coll_value->num_tuples * item_desc.byte_size();
     memcpy(*data, coll_value->ptr, coll_byte_size);
     uint8_t* coll_data = reinterpret_cast<uint8_t*>(*data);
@@ -197,7 +197,7 @@ void Tuple::ConvertOffsetsToPointers(const TupleDescriptor& desc, uint8_t* tuple
     coll_value->ptr = tuple_data + offset;
 
     uint8_t* coll_data = coll_value->ptr;
-    const TupleDescriptor& item_desc = *(*slot)->collection_item_descriptor();
+    const TupleDescriptor& item_desc = *(*slot)->children_tuple_descriptor();
     for (int i = 0; i < coll_value->num_tuples; ++i) {
       reinterpret_cast<Tuple*>(coll_data)->ConvertOffsetsToPointers(
           item_desc, tuple_data);
@@ -233,20 +233,35 @@ void Tuple::MaterializeExprs(TupleRow* row, const TupleDescriptor& desc,
         slot_desc->type() == evals[i]->root().type());
     void* src = evals[i]->GetValue(row);
     if (src != NULL) {
-      void* dst = GetSlot(slot_desc->tuple_offset());
-      RawValue::Write(src, dst, slot_desc->type(), pool);
-      if (COLLECT_STRING_VALS && slot_desc->type().IsVarLenStringType()) {
-        StringValue* string_val = reinterpret_cast<StringValue*>(dst);
-        *(non_null_string_values++) = string_val;
-        *total_string_lengths += string_val->len;
-        ++(*num_non_null_string_values);
+      vector<StringValue*> string_values;
+      RawValue::Write<COLLECT_STRING_VALS>(src, this, slot_desc, pool, &string_values);
+      if (string_values.size() > 0) {
+        for (StringValue* string_val : string_values) {
+          *(non_null_string_values++) = string_val;
+          *total_string_lengths += string_val->len;
+        }
+        (*num_non_null_string_values) += string_values.size();
       }
     } else {
-      SetNull(slot_desc->null_indicator_offset());
+      if (slot_desc->type().IsStructType()) {
+        SetStructToNull(slot_desc);
+      } else {
+        SetNull(slot_desc->null_indicator_offset());
+      }
     }
   }
 }
 
+void Tuple::SetStructToNull(const SlotDescriptor* const slot_desc) {
+  DCHECK(slot_desc != nullptr && slot_desc->type().IsStructType());
+  DCHECK(slot_desc->children_tuple_descriptor() != nullptr);
+  SetNull(slot_desc->null_indicator_offset());
+  for (SlotDescriptor* child_slot : slot_desc->children_tuple_descriptor()->slots()) {
+    SetNull(child_slot->null_indicator_offset());
+    if (child_slot->type().IsStructType()) SetStructToNull(child_slot);
+  }
+}
+
 char* Tuple::AllocateStrings(const char* err_ctx, RuntimeState* state,
     int64_t bytes, MemPool* pool, Status* status) noexcept {
   char* buf = reinterpret_cast<char*>(pool->TryAllocateUnaligned(bytes));
diff --git a/be/src/runtime/tuple.h b/be/src/runtime/tuple.h
index a7554d8..82c78f0 100644
--- a/be/src/runtime/tuple.h
+++ b/be/src/runtime/tuple.h
@@ -250,6 +250,11 @@ class Tuple {
     return (*null_indicator_byte & offset.bit_mask) != 0;
   }
 
+  /// 'slot_desc' describes a struct slot in this tuple. Sets 'slot_desc' to null in this
+  /// tuple and iterates its children and sets all of them to null too. Recursively
+  /// iterates nested structs.
+  void SetStructToNull(const SlotDescriptor* const slot_desc);
+
   /// Set the null indicators on 'num_tuples' tuples. The first tuple is stored at
   /// 'tuple_mem' and subsequent tuples must be stored at a stride of 'tuple_stride'
   /// bytes.
diff --git a/be/src/runtime/types.cc b/be/src/runtime/types.cc
index 21f7b65..5ed3aa5 100644
--- a/be/src/runtime/types.cc
+++ b/be/src/runtime/types.cc
@@ -306,6 +306,9 @@ TTypeEntry ColumnType::ToHs2Type() const {
           (type == TYPE_CHAR) ? TTypeId::CHAR_TYPE : TTypeId::VARCHAR_TYPE);
       break;
     }
+    case TYPE_STRUCT:
+      type_entry.__set_type(TTypeId::STRING_TYPE);
+      break;
     default:
       // HiveServer2 does not have a type for invalid, date, datetime or
       // fixed_uda_intermediate.
diff --git a/be/src/runtime/types.h b/be/src/runtime/types.h
index 684c41e..2202f84 100644
--- a/be/src/runtime/types.h
+++ b/be/src/runtime/types.h
@@ -215,6 +215,10 @@ struct ColumnType {
     return type == TYPE_STRUCT || type == TYPE_ARRAY || type == TYPE_MAP;
   }
 
+  inline bool IsStructType() const {
+    return type == TYPE_STRUCT;
+  }
+
   inline bool IsCollectionType() const {
     return type == TYPE_ARRAY || type == TYPE_MAP;
   }
@@ -224,8 +228,72 @@ struct ColumnType {
   }
 
   /// Returns the byte size of this type.  Returns 0 for variable length types.
-  inline int GetByteSize() const {
-    switch (type) {
+  inline int GetByteSize() const { return GetByteSize(*this); }
+
+  /// Returns the size of a slot for this type.
+  inline int GetSlotSize() const { return GetSlotSize(*this); }
+
+  static inline int GetDecimalByteSize(int precision) {
+    DCHECK_GT(precision, 0);
+    if (precision <= MAX_DECIMAL4_PRECISION) return 4;
+    if (precision <= MAX_DECIMAL8_PRECISION) return 8;
+    return 16;
+  }
+
+  /// Returns the IR version of this ColumnType. Only implemented for scalar types. LLVM
+  /// optimizer can pull out fields of the returned ConstantStruct for constant folding.
+  llvm::ConstantStruct* ToIR(LlvmCodeGen* codegen) const;
+
+  apache::hive::service::cli::thrift::TTypeEntry ToHs2Type() const;
+  std::string DebugString() const;
+
+  /// Used to create a possibly nested type from the flattened Thrift representation.
+  ///
+  /// 'idx' is an in/out parameter that is initially set to the index of the type in
+  /// 'types' being constructed, and is set to the index of the next type in 'types' that
+  /// needs to be processed (or the size 'types' if all nodes have been processed).
+  ColumnType(const std::vector<TTypeNode>& types, int* idx);
+
+ private:
+  /// Recursive implementation of ToThrift() that populates 'thrift_type' with the
+  /// TTypeNodes for this type and its children.
+  void ToThrift(TColumnType* thrift_type) const;
+
+  /// Helper function for GetSlotSize() so that struct size could be calculated
+  /// recursively.
+  static inline int GetSlotSize(const ColumnType& col_type) {
+    switch (col_type.type) {
+      case TYPE_STRUCT: {
+        int struct_size = 0;
+        for (ColumnType child_type : col_type.children) {
+          struct_size += GetSlotSize(child_type);
+        }
+        return struct_size;
+      }
+      case TYPE_STRING:
+      case TYPE_VARCHAR:
+        return 12;
+      case TYPE_CHAR:
+      case TYPE_FIXED_UDA_INTERMEDIATE:
+        return col_type.len;
+      case TYPE_ARRAY:
+      case TYPE_MAP:
+        return 12;
+      default:
+        return GetByteSize(col_type);
+    }
+  }
+
+  /// Helper function for GetByteSize()
+  static inline int GetByteSize(const ColumnType& col_type) {
+    switch (col_type.type) {
+      case TYPE_STRUCT: {
+        int struct_size = 0;
+        for (ColumnType child_type : col_type.children) {
+          struct_size += GetByteSize(child_type);
+        }
+        return struct_size;
+      }
       case TYPE_ARRAY:
       case TYPE_MAP:
       case TYPE_STRING:
@@ -233,7 +301,7 @@ struct ColumnType {
         return 0;
       case TYPE_CHAR:
       case TYPE_FIXED_UDA_INTERMEDIATE:
-        return len;
+        return col_type.len;
       case TYPE_NULL:
       case TYPE_BOOLEAN:
       case TYPE_TINYINT:
@@ -251,58 +319,13 @@ struct ColumnType {
         // This is the size of the slot, the actual size of the data is 12.
         return 16;
       case TYPE_DECIMAL:
-        return GetDecimalByteSize(precision);
+        return GetDecimalByteSize(col_type.precision);
       case INVALID_TYPE:
       default:
-        DCHECK(false) << "NYI: " << type;
+        DCHECK(false) << "NYI: " << col_type.type;
     }
     return 0;
   }
-
-  /// Returns the size of a slot for this type.
-  inline int GetSlotSize() const {
-    switch (type) {
-      case TYPE_STRING:
-      case TYPE_VARCHAR:
-        return 12;
-      case TYPE_CHAR:
-      case TYPE_FIXED_UDA_INTERMEDIATE:
-        return len;
-      case TYPE_ARRAY:
-      case TYPE_MAP:
-        return 12;
-      case TYPE_STRUCT:
-        DCHECK(false) << "TYPE_STRUCT slot not possible";
-      default:
-        return GetByteSize();
-    }
-  }
-
-  static inline int GetDecimalByteSize(int precision) {
-    DCHECK_GT(precision, 0);
-    if (precision <= MAX_DECIMAL4_PRECISION) return 4;
-    if (precision <= MAX_DECIMAL8_PRECISION) return 8;
-    return 16;
-  }
-
-  /// Returns the IR version of this ColumnType. Only implemented for scalar types. LLVM
-  /// optimizer can pull out fields of the returned ConstantStruct for constant folding.
-  llvm::ConstantStruct* ToIR(LlvmCodeGen* codegen) const;
-
-  apache::hive::service::cli::thrift::TTypeEntry ToHs2Type() const;
-  std::string DebugString() const;
-
- private:
-  /// Used to create a possibly nested type from the flattened Thrift representation.
-  ///
-  /// 'idx' is an in/out parameter that is initially set to the index of the type in
-  /// 'types' being constructed, and is set to the index of the next type in 'types' that
-  /// needs to be processed (or the size 'types' if all nodes have been processed).
-  ColumnType(const std::vector<TTypeNode>& types, int* idx);
-
-  /// Recursive implementation of ToThrift() that populates 'thrift_type' with the
-  /// TTypeNodes for this type and its children.
-  void ToThrift(TColumnType* thrift_type) const;
 };
 
 std::ostream& operator<<(std::ostream& os, const ColumnType& type);
diff --git a/be/src/service/hs2-util.cc b/be/src/service/hs2-util.cc
index 1870b34..f7731b9 100644
--- a/be/src/service/hs2-util.cc
+++ b/be/src/service/hs2-util.cc
@@ -17,6 +17,10 @@
 
 #include "service/hs2-util.h"
 
+#include <rapidjson/rapidjson.h>
+#include <rapidjson/stringbuffer.h>
+#include <rapidjson/writer.h>
+
 #include "common/logging.h"
 #include "exprs/scalar-expr-evaluator.h"
 #include "runtime/date-value.h"
@@ -24,6 +28,7 @@
 #include "runtime/raw-value.inline.h"
 #include "runtime/row-batch.h"
 #include "runtime/types.h"
+#include "udf/udf-internal.h"
 #include "util/bit-util.h"
 
 #include <gutil/strings/substitute.h>
@@ -336,6 +341,60 @@ static void DecimalExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval,
   }
 }
 
+// Gets a StructVal and puts it's JSON format into 'out_stream'. Uses 'column_type' to
+// figure out field names and types. This functions can call itself recursively in case
+// of nested structs.
+static void StructValToJSON(const StructVal& struct_val, const ColumnType& column_type,
+    rapidjson::Writer<rapidjson::StringBuffer>* writer) {
+  DCHECK(column_type.type == TYPE_STRUCT);
+  DCHECK_EQ(struct_val.num_children, column_type.children.size());
+  writer->StartObject();
+  for (int i = 0; i < struct_val.num_children; ++i) {
+    writer->String(column_type.field_names[i].c_str());
+    void* child = (void*)(struct_val.ptr[i]);
+    if (child == nullptr) {
+      writer->Null();
+    } else if (column_type.children[i].IsStructType()) {
+      StructValToJSON(*((StructVal*)child), column_type.children[i], writer);
+    } else {
+      string tmp;
+      RawValue::PrintValue(child, column_type.children[i], -1, &tmp);
+      const ColumnType& child_type = column_type.children[i];
+      if (child_type.IsStringType() || child_type.IsDateType() ||
+          child_type.IsTimestampType()) {
+        writer->String(tmp.c_str());
+      } else if (child_type.IsBooleanType()) {
+        writer->Bool( *(reinterpret_cast<bool*>(child)) );
+      } else {
+        writer->RawValue(tmp.c_str(), tmp.size(), rapidjson::kNumberType);
+      }
+    }
+  }
+  writer->EndObject();
+}
+
+static void StructExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval,
+    const TColumnType& type, RowBatch* batch, int start_idx, int num_rows,
+    uint32_t output_row_idx, apache::hive::service::cli::thrift::TColumn* column) {
+  DCHECK(type.types.size() > 1);
+  ReserveSpace(num_rows, output_row_idx, &column->stringVal);
+  FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) {
+    StructVal struct_val = expr_eval->GetStructVal(it.Get());
+    if (struct_val.is_null) {
+      column->stringVal.values.emplace_back();
+    } else {
+      int idx = 0;
+      ColumnType column_type(type.types, &idx);
+      rapidjson::StringBuffer buffer;
+      rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+      StructValToJSON(struct_val, column_type, &writer);
+      column->stringVal.values.emplace_back(buffer.GetString());
+    }
+    SetNullBit(output_row_idx, struct_val.is_null, &column->stringVal.nulls);
+    ++output_row_idx;
+  }
+}
+
 // For V6 and above
 void impala::ExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval,
     const TColumnType& type, RowBatch* batch, int start_idx, int num_rows,
@@ -344,6 +403,11 @@ void impala::ExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval,
   // the type for every row.
   // TODO: instead of relying on stamped out implementations, we could codegen this loop
   // to inline the expression evaluation into the loop body.
+  if (type.types[0].type == TTypeNodeType::STRUCT) {
+    StructExprValuesToHS2TColumn(
+        expr_eval, type, batch, start_idx, num_rows, output_row_idx, column);
+    return;
+  }
   switch (type.types[0].scalar_type.type) {
     case TPrimitiveType::NULL_TYPE:
     case TPrimitiveType::BOOLEAN:
@@ -398,7 +462,7 @@ void impala::ExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval,
     }
     default:
       DCHECK(false) << "Unhandled type: "
-                    << TypeToString(ThriftToType(type.types[0].scalar_type.type));
+          << TypeToString(ThriftToType(type.types[0].scalar_type.type));
   }
 }
 
diff --git a/be/src/service/impala-beeswax-server.cc b/be/src/service/impala-beeswax-server.cc
index 79a500f..cc1aa53 100644
--- a/be/src/service/impala-beeswax-server.cc
+++ b/be/src/service/impala-beeswax-server.cc
@@ -231,6 +231,11 @@ void ImpalaServer::get_results_metadata(ResultsMetadata& results_metadata,
     results_metadata.schema.fieldSchemas.resize(result_set_md->columns.size());
     for (int i = 0; i < results_metadata.schema.fieldSchemas.size(); ++i) {
       const TColumnType& type = result_set_md->columns[i].columnType;
+      DCHECK_LE(1, type.types.size());
+      if (type.types[0].type != TTypeNodeType::SCALAR) {
+        RaiseBeeswaxException("Returning complex types is not supported through the "
+            "beeswax interface", SQLSTATE_GENERAL_ERROR);
+      }
       DCHECK_EQ(1, type.types.size());
       DCHECK_EQ(TTypeNodeType::SCALAR, type.types[0].type);
       DCHECK(type.types[0].__isset.scalar_type);
diff --git a/be/src/service/query-result-set.cc b/be/src/service/query-result-set.cc
index f2d5b8e..e6d67ee 100644
--- a/be/src/service/query-result-set.cc
+++ b/be/src/service/query-result-set.cc
@@ -402,45 +402,51 @@ int64_t HS2ColumnarResultSet::ByteSize(int start_idx, int num_rows) {
 
 void HS2ColumnarResultSet::InitColumns() {
   result_set_->__isset.columns = true;
-  for (const TColumn& col : metadata_.columns) {
-    DCHECK(col.columnType.types.size() == 1)
-        << "Structured columns unsupported in HS2 interface";
-    ThriftTColumn column;
-    switch (col.columnType.types[0].scalar_type.type) {
-      case TPrimitiveType::NULL_TYPE:
-      case TPrimitiveType::BOOLEAN:
-        column.__isset.boolVal = true;
-        break;
-      case TPrimitiveType::TINYINT:
-        column.__isset.byteVal = true;
-        break;
-      case TPrimitiveType::SMALLINT:
-        column.__isset.i16Val = true;
-        break;
-      case TPrimitiveType::INT:
-        column.__isset.i32Val = true;
-        break;
-      case TPrimitiveType::BIGINT:
-        column.__isset.i64Val = true;
-        break;
-      case TPrimitiveType::FLOAT:
-      case TPrimitiveType::DOUBLE:
-        column.__isset.doubleVal = true;
-        break;
-      case TPrimitiveType::TIMESTAMP:
-      case TPrimitiveType::DATE:
-      case TPrimitiveType::DECIMAL:
-      case TPrimitiveType::VARCHAR:
-      case TPrimitiveType::CHAR:
-      case TPrimitiveType::STRING:
-        column.__isset.stringVal = true;
-        break;
-      default:
-        DCHECK(false) << "Unhandled column type: "
-                      << TypeToString(
-                             ThriftToType(col.columnType.types[0].scalar_type.type));
+  for (const TColumn& col_input : metadata_.columns) {
+    ThriftTColumn col_output;
+    if (col_input.columnType.types[0].type == TTypeNodeType::STRUCT) {
+      DCHECK(col_input.columnType.types.size() > 0);
+      // Return structs as string.
+      col_output.__isset.stringVal = true;
+    } else {
+      DCHECK(col_input.columnType.types.size() == 1);
+      DCHECK(col_input.columnType.types[0].__isset.scalar_type);
+      TPrimitiveType::type input_type = col_input.columnType.types[0].scalar_type.type;
+      switch (input_type) {
+        case TPrimitiveType::NULL_TYPE:
+        case TPrimitiveType::BOOLEAN:
+          col_output.__isset.boolVal = true;
+          break;
+        case TPrimitiveType::TINYINT:
+          col_output.__isset.byteVal = true;
+          break;
+        case TPrimitiveType::SMALLINT:
+          col_output.__isset.i16Val = true;
+          break;
+        case TPrimitiveType::INT:
+          col_output.__isset.i32Val = true;
+          break;
+        case TPrimitiveType::BIGINT:
+          col_output.__isset.i64Val = true;
+          break;
+        case TPrimitiveType::FLOAT:
+        case TPrimitiveType::DOUBLE:
+          col_output.__isset.doubleVal = true;
+          break;
+        case TPrimitiveType::TIMESTAMP:
+        case TPrimitiveType::DATE:
+        case TPrimitiveType::DECIMAL:
+        case TPrimitiveType::VARCHAR:
+        case TPrimitiveType::CHAR:
+        case TPrimitiveType::STRING:
+          col_output.__isset.stringVal = true;
+          break;
+        default:
+          DCHECK(false) << "Unhandled column type: "
+              << TypeToString(ThriftToType(input_type));
+      }
     }
-    result_set_->columns.push_back(column);
+    result_set_->columns.push_back(col_output);
   }
 }
 
diff --git a/be/src/udf/udf-internal.h b/be/src/udf/udf-internal.h
index 9a212d1..1a72d1a 100644
--- a/be/src/udf/udf-internal.h
+++ b/be/src/udf/udf-internal.h
@@ -315,6 +315,39 @@ struct CollectionVal : public AnyVal {
   }
 };
 
+/// A struct is represented by a vector of pointers where these pointers point to the
+/// children of the struct.
+struct StructVal : public AnyVal {
+  int num_children;
+
+  /// Pointer to the start of the vector of children pointers. These children pointers in
+  /// fact are AnyVal pointers where a null pointer means that this child is NULL.
+  /// The buffer is not null-terminated.
+  /// Memory allocation to 'ptr' is done using FunctionContext. As a result it's not
+  /// needed to take care of memory deallocation in StructVal as it will be done through
+  /// FunctionContext automatically.
+  uint8_t** ptr;
+
+  StructVal() : AnyVal(true), num_children(0), ptr(nullptr) {}
+
+  StructVal(FunctionContext* ctx, int num_children) : AnyVal(),
+      num_children(num_children), ptr(nullptr) {
+    ReserveMemory(ctx);
+  }
+
+  static StructVal null() { return StructVal(); }
+
+  void addChild(void* child, int idx) {
+    assert(idx >= 0 && idx < num_children);
+    ptr[idx] = (uint8_t*)child;
+  }
+
+private:
+  /// Uses FunctionContext to reserve memory for 'num_children' number of pointers. Sets
+  /// 'ptr' to the beginning of this allocated memory.
+  void ReserveMemory(FunctionContext* ctx);
+};
+
 #pragma GCC diagnostic ignored "-Winvalid-offsetof"
 static_assert(sizeof(CollectionVal) == sizeof(StringVal), "Wrong size.");
 static_assert(
diff --git a/be/src/udf/udf.cc b/be/src/udf/udf.cc
index 8410c0f..e636f81 100644
--- a/be/src/udf/udf.cc
+++ b/be/src/udf/udf.cc
@@ -548,6 +548,19 @@ bool StringVal::Resize(FunctionContext* ctx, int new_len) noexcept {
   return false;
 }
 
+void StructVal::ReserveMemory(FunctionContext* ctx) {
+  assert(ctx != nullptr);
+  assert(num_children >= 0);
+  assert(is_null == false);
+  if (num_children == 0) return;
+  ptr = reinterpret_cast<uint8_t**>(
+      ctx->impl()->AllocateForResults(sizeof(uint8_t*) * num_children));
+  if (UNLIKELY(ptr == nullptr)) {
+    num_children = 0;
+    is_null = true;
+  }
+}
+
 // TODO: why doesn't libudasample.so build if this in udf-ir.cc?
 const FunctionContext::TypeDesc* FunctionContext::GetArgType(int arg_idx) const {
   if (arg_idx < 0 || arg_idx >= impl_->arg_types_.size()) return NULL;
diff --git a/be/src/udf/udf.h b/be/src/udf/udf.h
index 2358672..45d1b22 100644
--- a/be/src/udf/udf.h
+++ b/be/src/udf/udf.h
@@ -90,7 +90,8 @@ class FunctionContext {
     TYPE_DECIMAL,
     TYPE_VARCHAR,
     // A fixed-size buffer, passed as a StringVal.
-    TYPE_FIXED_UDA_INTERMEDIATE
+    TYPE_FIXED_UDA_INTERMEDIATE,
+    TYPE_STRUCT
   };
 
   struct TypeDesc {
diff --git a/be/src/util/debug-util.cc b/be/src/util/debug-util.cc
index 2c3f5be..1fa1850 100644
--- a/be/src/util/debug-util.cc
+++ b/be/src/util/debug-util.cc
@@ -181,7 +181,7 @@ string PrintTuple(const Tuple* t, const TupleDescriptor& d) {
     if (t->IsNull(slot_d->null_indicator_offset())) {
       out << "null";
     } else if (slot_d->type().IsCollectionType()) {
-      const TupleDescriptor* item_d = slot_d->collection_item_descriptor();
+      const TupleDescriptor* item_d = slot_d->children_tuple_descriptor();
       const CollectionValue* coll_value =
           reinterpret_cast<const CollectionValue*>(t->GetSlot(slot_d->tuple_offset()));
       uint8_t* coll_buf = coll_value->ptr;
diff --git a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
index 575e7b1..9734c3a 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
@@ -1303,7 +1303,7 @@ public class Analyzer {
       registerColumnPrivReq(result);
       return result;
     }
-    // SlotRefs with a scalar type are registered against the slot's
+    // SlotRefs with a scalar or struct types are registered against the slot's
     // fully-qualified lowercase path.
     String key = slotPath.toString();
     Preconditions.checkState(key.equals(key.toLowerCase()),
@@ -2849,6 +2849,12 @@ public class Analyzer {
       // Type compatible with the i-th exprs of all expr lists.
       // Initialize with type of i-th expr in first list.
       Type compatibleType = firstList.get(i).getType();
+      if (firstList.get(i) instanceof SlotRef &&
+          compatibleType.isStructType()) {
+        throw new AnalysisException(String.format(
+            "Set operations don't support STRUCT type. %s in %s", compatibleType.toSql(),
+            firstList.get(i).toSql()));
+      }
       widestExprs.add(firstList.get(i));
       for (int j = 1; j < exprLists.size(); ++j) {
         Preconditions.checkState(exprLists.get(j).size() == firstList.size());
diff --git a/fe/src/main/java/org/apache/impala/analysis/DescriptorTable.java b/fe/src/main/java/org/apache/impala/analysis/DescriptorTable.java
index 7e41e1a..765697b 100644
--- a/fe/src/main/java/org/apache/impala/analysis/DescriptorTable.java
+++ b/fe/src/main/java/org/apache/impala/analysis/DescriptorTable.java
@@ -140,7 +140,20 @@ public class DescriptorTable {
       SlotDescriptor slotDesc = getSlotDesc(id);
       if (slotDesc.isMaterialized()) continue;
       slotDesc.setIsMaterialized(true);
-      affectedTuples.add(slotDesc.getParent());
+      // Don't add the TupleDescriptor that is for struct children.
+      if (slotDesc.getParent().getParentSlotDesc() == null) {
+        affectedTuples.add(slotDesc.getParent());
+      }
+      if (slotDesc.getType().isStructType()) {
+        TupleDescriptor childrenTuple = slotDesc.getItemTupleDesc();
+        Preconditions.checkNotNull(childrenTuple);
+        Preconditions.checkState(childrenTuple.getSlots().size() > 0);
+        List<SlotId> childrenIds = Lists.newArrayList();
+        for (SlotDescriptor childSlot : childrenTuple.getSlots()) {
+          childrenIds.add(childSlot.getId());
+        }
+        markSlotsMaterialized(childrenIds);
+      }
     }
     return affectedTuples;
   }
diff --git a/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java b/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
index e5a7fa6..b74546f 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
@@ -393,14 +393,20 @@ public class SelectStmt extends QueryStmt {
       }
 
       for (Expr expr: resultExprs_) {
-        // Complex types are currently not supported in the select list because
+        // Collection types are currently not supported in the select list because
         // we'd need to serialize them in a meaningful way.
-        if (expr.getType().isComplexType()) {
+        if (expr.getType().isCollectionType()) {
           throw new AnalysisException(String.format(
-              "Expr '%s' in select list returns a complex type '%s'.\n" +
-              "Only scalar types are allowed in the select list.",
+              "Expr '%s' in select list returns a collection type '%s'.\n" +
+              "Collection types are not allowed in the select list.",
               expr.toSql(), expr.getType().toSql()));
         }
+        if (expr.getType().isStructType()) {
+          if (!analyzer_.getQueryCtx().client_request.query_options.disable_codegen) {
+            throw new AnalysisException("Struct type in select list is not allowed " +
+                "when Codegen is ON. You might want to set DISABLE_CODEGEN=true");
+          }
+        }
         if (!expr.getType().isSupported()) {
           throw new AnalysisException("Unsupported type '"
               + expr.getType().toSql() + "' in '" + expr.toSql() + "'.");
diff --git a/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java b/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java
index 1ed175f..1cd72a5 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java
@@ -46,7 +46,7 @@ public class SlotDescriptor {
   private Path path_;
   private Type type_;
 
-  // Tuple descriptor for collection items. Only set if type_ is an array or map.
+  // Tuple descriptor for nested items. Set if type_ is an array, map or struct.
   private TupleDescriptor itemTupleDesc_;
 
   // for SlotRef.toSql() in the absence of a path
@@ -89,6 +89,7 @@ public class SlotDescriptor {
     parent_ = parent;
     type_ = src.type_;
     itemTupleDesc_ = src.itemTupleDesc_;
+    if (itemTupleDesc_ != null) itemTupleDesc_.setParentSlotDesc(this);
     path_ = src.path_;
     label_ = src.label_;
     sourceExprs_ = src.sourceExprs_;
@@ -120,6 +121,10 @@ public class SlotDescriptor {
         itemTupleDesc_ == null, "Item tuple descriptor already set.");
     itemTupleDesc_ = t;
   }
+  public void clearItemTupleDesc() {
+    Preconditions.checkState(itemTupleDesc_ != null);
+    itemTupleDesc_ = null;
+  }
   public boolean isMaterialized() { return isMaterialized_; }
   public void setIsMaterialized(boolean value) {
     if (isMaterialized_ == value) return;
@@ -145,7 +150,8 @@ public class SlotDescriptor {
   public void setPath(Path path) {
     Preconditions.checkNotNull(path);
     Preconditions.checkState(path.isRootedAtTuple());
-    Preconditions.checkState(path.getRootDesc() == parent_);
+    Preconditions.checkState(path.getRootDesc() == parent_ ||
+        parent_.getType().isStructType());
     path_ = path;
     type_ = path_.destType();
     label_ = Joiner.on(".").join(path.getRawPath());
@@ -239,8 +245,8 @@ public class SlotDescriptor {
     Preconditions.checkState(path_.isResolved());
 
     List<Integer> materializedPath = Lists.newArrayList(path_.getAbsolutePath());
-    // For scalar types, the materialized path is the same as path_
-    if (type_.isScalarType()) return materializedPath;
+    // For scalar types and structs the materialized path is the same as path_
+    if (type_.isScalarType() || type_.isStructType()) return materializedPath;
     Preconditions.checkState(type_.isCollectionType());
     Preconditions.checkState(path_.getFirstCollectionIndex() != -1);
     // Truncate materializedPath after first collection element
@@ -324,6 +330,7 @@ public class SlotDescriptor {
         .add("nullIndicatorBit", nullIndicatorBit_)
         .add("slotIdx", slotIdx_)
         .add("stats", stats_)
+        .add("itemTupleDesc", itemTupleDesc_)
         .toString();
   }
 
diff --git a/fe/src/main/java/org/apache/impala/analysis/SlotRef.java b/fe/src/main/java/org/apache/impala/analysis/SlotRef.java
index 0ca1819..a31c51a 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SlotRef.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SlotRef.java
@@ -21,7 +21,11 @@ import java.util.List;
 import java.util.Set;
 
 import org.apache.impala.analysis.Path.PathType;
+import org.apache.impala.catalog.FeFsTable;
 import org.apache.impala.catalog.FeTable;
+import org.apache.impala.catalog.HdfsFileFormat;
+import org.apache.impala.catalog.StructField;
+import org.apache.impala.catalog.StructType;
 import org.apache.impala.catalog.TableLoadingException;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
@@ -42,6 +46,9 @@ public class SlotRef extends Expr {
   // Results of analysis.
   private SlotDescriptor desc_;
 
+  // The resolved path after resolving 'rawPath_'.
+  private Path resolvedPath_ = null;
+
   public SlotRef(List<String> rawPath) {
     super();
     rawPath_ = rawPath;
@@ -64,7 +71,8 @@ public class SlotRef extends Expr {
   public SlotRef(SlotDescriptor desc) {
     super();
     if (desc.isScanSlot()) {
-      rawPath_ = desc.getPath().getRawPath();
+      resolvedPath_ = desc.getPath();
+      rawPath_ = resolvedPath_.getRawPath();
     } else {
       rawPath_ = null;
     }
@@ -82,6 +90,7 @@ public class SlotRef extends Expr {
    */
   private SlotRef(SlotRef other) {
     super(other);
+    resolvedPath_ = other.resolvedPath_;
     rawPath_ = other.rawPath_;
     label_ = other.label_;
     desc_ = other.desc_;
@@ -108,20 +117,30 @@ public class SlotRef extends Expr {
     return numDistinctValues;
   }
 
+  /**
+   * Resetting a struct SlotRef remove its children as an analyzeImpl() on this
+   * particular SlotRef will create the children again.
+   */
+  @Override
+  public SlotRef reset() {
+    if (type_.isStructType()) clearChildren();
+    super.reset();
+    return this;
+  }
+
   @Override
   protected void analyzeImpl(Analyzer analyzer) throws AnalysisException {
     // TODO: derived slot refs (e.g., star-expanded) will not have rawPath set.
     // Change construction to properly handle such cases.
     Preconditions.checkState(rawPath_ != null);
-    Path resolvedPath = null;
     try {
-      resolvedPath = analyzer.resolvePathWithMasking(rawPath_, PathType.SLOT_REF);
+      resolvedPath_ = analyzer.resolvePathWithMasking(rawPath_, PathType.SLOT_REF);
     } catch (TableLoadingException e) {
       // Should never happen because we only check registered table aliases.
       Preconditions.checkState(false);
     }
-    Preconditions.checkNotNull(resolvedPath);
-    desc_ = analyzer.registerSlotRef(resolvedPath);
+    Preconditions.checkNotNull(resolvedPath_);
+    desc_ = analyzer.registerSlotRef(resolvedPath_);
     type_ = desc_.getType();
     if (!type_.isSupported()) {
       throw new UnsupportedFeatureException("Unsupported type '"
@@ -134,17 +153,118 @@ public class SlotRef extends Expr {
       throw new UnsupportedFeatureException("Unsupported type in '" + toSql() + "'.");
     }
     // Register scalar columns of a catalog table.
-    if (!resolvedPath.getMatchedTypes().isEmpty()
-        && !resolvedPath.getMatchedTypes().get(0).isComplexType()) {
+    if (!resolvedPath_.getMatchedTypes().isEmpty()
+        && !resolvedPath_.getMatchedTypes().get(0).isComplexType()) {
       analyzer.registerScalarColumnForMasking(desc_);
     }
 
     numDistinctValues_ = adjustNumDistinctValues();
-    FeTable rootTable = resolvedPath.getRootTable();
+    FeTable rootTable = resolvedPath_.getRootTable();
     if (rootTable != null && rootTable.getNumRows() > 0) {
       // The NDV cannot exceed the #rows in the table.
       numDistinctValues_ = Math.min(numDistinctValues_, rootTable.getNumRows());
     }
+    if (type_.isStructType() && rootTable != null) {
+      if (!(rootTable instanceof FeFsTable)) {
+        throw new AnalysisException(String.format(
+            "%s is not supported when querying STRUCT type %s",
+            rootTable.getClass().toString(), type_.toSql()));
+      }
+      FeFsTable feTable = (FeFsTable)rootTable;
+      for (HdfsFileFormat format : feTable.getFileFormats()) {
+        if (format != HdfsFileFormat.ORC) {
+          throw new AnalysisException("Querying STRUCT is only supported for ORC file " +
+              "format.");
+        }
+      }
+    }
+    if (type_.isStructType()) expandSlotRefForStruct(analyzer);
+  }
+
+  // This function expects this SlotRef to be a Struct and creates SlotRefs to represent
+  // the children of the struct. Also creates slot and tuple descriptors for the children
+  // of the struct.
+  private void expandSlotRefForStruct(Analyzer analyzer) throws AnalysisException {
+    Preconditions.checkState(type_ != null && type_.isStructType());
+    // If the same struct is present multiple times in the select list we create only a
+    // single TupleDescriptor instead of one for each occurence.
+    if (desc_.getItemTupleDesc() == null) {
+      checkForUnsupportedFieldsForStruct();
+      createStructTuplesAndSlots(analyzer, resolvedPath_);
+    }
+    addStructChildrenAsSlotRefs(analyzer, desc_.getItemTupleDesc());
+  }
+
+  // Expects the type of this SlotRef as a StructType. Throws an AnalysisException if any
+  // of the struct fields of this Slot ref is a collection or unsupported type.
+  private void checkForUnsupportedFieldsForStruct() throws AnalysisException {
+    Preconditions.checkState(type_ instanceof StructType);
+    for (StructField structField : ((StructType)type_).getFields()) {
+      if (!structField.getType().isSupported()) {
+        throw new AnalysisException("Unsupported type '"
+            + structField.getType().toSql() + "' in '" + toSql() + "'.");
+      }
+      if (structField.getType().isCollectionType()) {
+        throw new AnalysisException("Struct containing a collection type is not " +
+            "allowed in the select list.");
+      }
+    }
+  }
+
+  /**
+   * Creates a TupleDescriptor to hold the children of a struct slot and then creates and
+   * adds SlotDescriptors as struct children to this TupleDescriptor. Sets the created
+   * parent TupleDescriptor to 'desc_.itemTupleDesc_'.
+   */
+  public void createStructTuplesAndSlots(Analyzer analyzer, Path resolvedPath) {
+    TupleDescriptor structTuple =
+        analyzer.getDescTbl().createTupleDescriptor("struct_tuple");
+    if (resolvedPath != null) structTuple.setPath(resolvedPath);
+    structTuple.setType((StructType)type_);
+    structTuple.setParentSlotDesc(desc_);
+    for (StructField structField : ((StructType)type_).getFields()) {
+      SlotDescriptor slotDesc = analyzer.getDescTbl().addSlotDescriptor(structTuple);
+      // 'resolvedPath_' could be null e.g. when the query has an order by clause and
+      // this is the sorting tuple.
+      if (resolvedPath != null) {
+        Path relPath = Path.createRelPath(resolvedPath, structField.getName());
+        relPath.resolve();
+        slotDesc.setPath(relPath);
+      }
+      slotDesc.setType(structField.getType());
+      slotDesc.setIsMaterialized(true);
+    }
+    desc_.setItemTupleDesc(structTuple);
+  }
+
+  /**
+   * Assuming that 'structTuple' is the tuple for struct children this function iterates
+   * its slots, creates a SlotRef for each slot and adds them to 'children_' of this
+   * SlotRef.
+   */
+  public void addStructChildrenAsSlotRefs(Analyzer analyzer,
+      TupleDescriptor structTuple) throws AnalysisException {
+    Preconditions.checkState(structTuple != null);
+    Preconditions.checkState(structTuple.getParentSlotDesc() != null);
+    Preconditions.checkState(structTuple.getParentSlotDesc().getType().isStructType());
+    for (SlotDescriptor childSlot : structTuple.getSlots()) {
+      SlotRef childSlotRef = new SlotRef(childSlot);
+      children_.add(childSlotRef);
+      if (childSlot.getType().isStructType()) {
+        childSlotRef.expandSlotRefForStruct(analyzer);
+      }
+    }
+  }
+
+  /**
+   * The TreeNode.collect() function shouldn't iterate the children of this SlotRef if
+   * this is a struct SlotRef. The desired functionality is to collect the struct
+   * SlotRefs but not their children.
+   */
+  @Override
+  protected boolean shouldCollectRecursively() {
+    if (desc_ != null && desc_.getType().isStructType()) return false;
+    return true;
   }
 
   @Override
@@ -265,7 +385,9 @@ public class SlotRef extends Expr {
   }
 
   @Override
-  public Expr clone() { return new SlotRef(this); }
+  public Expr clone() {
+    return new SlotRef(this);
+  }
 
   @Override
   public String toString() {
diff --git a/fe/src/main/java/org/apache/impala/analysis/SortInfo.java b/fe/src/main/java/org/apache/impala/analysis/SortInfo.java
index 8a7780f..8d96f72 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SortInfo.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SortInfo.java
@@ -23,6 +23,7 @@ import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.TreeNode;
 import org.apache.impala.planner.PlanNode;
 import org.apache.impala.thrift.TSortingOrder;
@@ -254,7 +255,20 @@ public class SortInfo {
         dstSlotDesc.initFromExpr(srcExpr);
       }
       dstSlotDesc.setSourceExpr(srcExpr);
-      outputSmap_.put(srcExpr.clone(), new SlotRef(dstSlotDesc));
+      SlotRef dstExpr = new SlotRef(dstSlotDesc);
+      if (dstSlotDesc.getType().isStructType() &&
+          dstSlotDesc.getItemTupleDesc() != null) {
+        dstSlotDesc.clearItemTupleDesc();
+        dstExpr.createStructTuplesAndSlots(analyzer, null);
+        try {
+          dstExpr.addStructChildrenAsSlotRefs(analyzer, dstSlotDesc.getItemTupleDesc());
+        } catch (AnalysisException ex) {
+          // Adding SlotRefs shouldn't throw here as the source SlotRef had already been
+          // analysed.
+          Preconditions.checkNotNull(null);
+        }
+      }
+      outputSmap_.put(srcExpr.clone(), dstExpr);
       materializedExprs_.add(srcExpr);
     }
   }
diff --git a/fe/src/main/java/org/apache/impala/analysis/Subquery.java b/fe/src/main/java/org/apache/impala/analysis/Subquery.java
index 9c894c8..8fac4a1 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Subquery.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Subquery.java
@@ -86,7 +86,9 @@ public class Subquery extends Expr {
     List<Expr> stmtResultExprs = stmt_.getResultExprs();
     if (stmtResultExprs.size() == 1) {
       type_ = stmtResultExprs.get(0).getType();
-      Preconditions.checkState(!type_.isComplexType());
+      if (type_.isComplexType()) {
+        throw new AnalysisException("A subquery can't return complex types. " + toSql());
+      }
     } else {
       type_ = createStructTypeFromExprList();
     }
diff --git a/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java b/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java
index 98a5ac3..f51a36e 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java
@@ -30,6 +30,7 @@ import org.apache.impala.catalog.FeFsTable;
 import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.StructType;
+import org.apache.impala.common.Pair;
 import org.apache.impala.thrift.TTupleDescriptor;
 
 import com.google.common.base.Joiner;
@@ -109,6 +110,10 @@ public class TupleDescriptor {
   // Tuple of the table masking view that masks this tuple's table.
   private TupleDescriptor maskedByTuple_ = null;
 
+  // If this is a tuple representing the children of a struct slot then 'parentSlot_'
+  // is the struct slot where this tuple belongs. Otherwise it's null.
+  private SlotDescriptor parentStructSlot_ = null;
+
   public TupleDescriptor(TupleId id, String debugName) {
     id_ = id;
     path_ = null;
@@ -166,7 +171,7 @@ public class TupleDescriptor {
   public void setPath(Path p) {
     Preconditions.checkNotNull(p);
     Preconditions.checkState(p.isResolved());
-    Preconditions.checkState(p.destType().isCollectionType());
+    Preconditions.checkState(p.destType().isComplexType());
     path_ = p;
     if (p.destTable() != null) {
       // Do not use Path.getTypeAsStruct() to only allow implicit path resolutions,
@@ -209,6 +214,14 @@ public class TupleDescriptor {
     table.getDesc().maskedByTuple_ = this;
   }
 
+  public void setParentSlotDesc(SlotDescriptor parent) {
+    Preconditions.checkState(parent.getType().isStructType(),
+        "Parent for a TupleDescriptor should be a STRUCT. Actual type is " +
+        parent.getType() + " Tuple ID: " + getId());
+    parentStructSlot_ = parent;
+  }
+  public SlotDescriptor getParentSlotDesc() { return parentStructSlot_; }
+
   public String debugString() {
     String tblStr = (getTable() == null ? "null" : getTable().getFullName());
     List<String> slotStrings = new ArrayList<>();
@@ -223,6 +236,9 @@ public class TupleDescriptor {
         .add("is_materialized", isMaterialized_)
         .add("slots", "[" + Joiner.on(", ").join(slotStrings) + "]");
     if (maskedTable_ != null) toStrHelper.add("masks", maskedTable_.getId());
+    if (parentStructSlot_ != null) {
+      toStrHelper.add("parentSlot", parentStructSlot_.getId());
+    }
     return toStrHelper.toString();
   }
 
@@ -274,8 +290,25 @@ public class TupleDescriptor {
     computeMemLayout();
   }
 
-  public void computeMemLayout() {
-    if (hasMemLayout_) return;
+  /**
+   * Computes the memory layout within this tuple including the total size of the tuple,
+   * size of each underlying slot, slot offsets, offset for the tuple level null
+   * indicator bytes and the null indicator bits for the slots.
+   * For struct tuples the offsets are calculated from the topmost parent's beginning and
+   * not neccessarily starting from zero within this tuple.
+   * Returns the nullIndicatorByte and nullIndicator bit in a Pair<>. This is needed to
+   * handle the case when there is a struct or nested structs in the tuple and top level
+   * nullIndicatorByte and nullIndicatorBit has to be adjusted based on the actual
+   * structure of the structs.
+   */
+  public Pair<Integer, Integer> computeMemLayout() {
+    if (parentStructSlot_ != null) {
+      // If this TupleDescriptor represents the children of a STRUCT then the slot
+      // offsets are adjusted with the parent struct's offset.
+      Preconditions.checkState(parentStructSlot_.getType().isStructType());
+      Preconditions.checkState(parentStructSlot_.getByteOffset() != -1);
+    }
+    if (hasMemLayout_) return null;
     hasMemLayout_ = true;
 
     boolean alwaysAddNullBit = hasNullableKuduScanSlots();
@@ -289,50 +322,59 @@ public class TupleDescriptor {
     int totalSlotSize = 0;
     for (SlotDescriptor d: slots_) {
       if (!d.isMaterialized()) continue;
-      ColumnStats stats = d.getStats();
-      int slotSize = d.getType().getSlotSize();
-
-      if (stats.hasAvgSize()) {
-        avgSerializedSize_ += d.getStats().getAvgSerializedSize();
-      } else {
-        // TODO: for computed slots, try to come up with stats estimates
-        avgSerializedSize_ += slotSize;
-      }
-      // Add padding for a KUDU string slot.
-      if (d.isKuduStringSlot()) {
-        slotSize += KUDU_STRING_PADDING;
-        avgSerializedSize_ += KUDU_STRING_PADDING;
-      }
+      int slotSize = getSlotSize(d);
+      addToAvgSerializedSize(d);
+
       if (!slotsBySize.containsKey(slotSize)) {
         slotsBySize.put(slotSize, new ArrayList<>());
       }
-      totalSlotSize += slotSize;
       slotsBySize.get(slotSize).add(d);
-      if (d.getIsNullable() || alwaysAddNullBit) ++numNullBits;
+
+      totalSlotSize += slotSize;
+      numNullBits += getNumNullBits(d, alwaysAddNullBit);
     }
     // we shouldn't have anything of size <= 0
     Preconditions.checkState(!slotsBySize.containsKey(0));
     Preconditions.checkState(!slotsBySize.containsKey(-1));
 
-    // assign offsets to slots in order of descending size
-    numNullBytes_ = (numNullBits + 7) / 8;
+    // The total number of bytes for nullable scalar or nested struct fields will be
+    // computed for the struct at the top level (i.e., parentStructSlot_ == null).
+
+    // If this descriptor is inside a struct then don't need to count for an additional
+    // null byte here as the null indicator will be on the top level tuple. In other
+    // words the total number of bytes for nullable scalar or nested struct fields will
+    // be computed for the struct at the top level (i.e., parentStructSlot_ == null).
+    numNullBytes_ = (parentStructSlot_ == null) ? (numNullBits + 7) / 8 : 0;
     int slotOffset = 0;
     int nullIndicatorByte = totalSlotSize;
+    if (parentStructSlot_ != null) {
+      nullIndicatorByte = parentStructSlot_.getNullIndicatorByte();
+    }
     int nullIndicatorBit = 0;
+    if (parentStructSlot_ != null) {
+      // If this is a child tuple from a struct then get the next available bit from the
+      // parent struct.
+      nullIndicatorBit = (parentStructSlot_.getNullIndicatorBit() + 1) % 8;
+      // If the parent struct ran out of null bits in the current null byte just before
+      // this tuple then start using a new byte.
+      if (nullIndicatorBit == 0) ++nullIndicatorByte;
+    }
     // slotIdx is the index into the resulting tuple struct.  The first (largest) field
     // is 0, next is 1, etc.
     int slotIdx = 0;
     // sort slots in descending order of size
     List<Integer> sortedSizes = new ArrayList<>(slotsBySize.keySet());
     Collections.sort(sortedSizes, Collections.reverseOrder());
+    // assign offsets to slots in order of descending size
     for (int slotSize: sortedSizes) {
       if (slotsBySize.get(slotSize).isEmpty()) continue;
       for (SlotDescriptor d: slotsBySize.get(slotSize)) {
         Preconditions.checkState(d.isMaterialized());
         d.setByteSize(slotSize);
-        d.setByteOffset(slotOffset);
-        d.setSlotIdx(slotIdx++);
+        d.setByteOffset((parentStructSlot_ == null) ? slotOffset :
+            parentStructSlot_.getByteOffset() + slotOffset);
         slotOffset += slotSize;
+        d.setSlotIdx(slotIdx++);
 
         // assign null indicator
         if (d.getIsNullable() || alwaysAddNullBit) {
@@ -348,11 +390,22 @@ public class TupleDescriptor {
           d.setNullIndicatorBit(-1);
           d.setNullIndicatorByte(0);
         }
+        // For struct slots calculate the mem layout for the tuple representing it's
+        // children.
+        if (d.getType().isStructType()) {
+          Pair<Integer, Integer> nullIndicators =
+              d.getItemTupleDesc().computeMemLayout();
+          // Adjust the null indicator byte and bit according to what is set in the
+          // struct's children
+          nullIndicatorByte = nullIndicators.first;
+          nullIndicatorBit = nullIndicators.second;
+        }
       }
     }
     Preconditions.checkState(slotOffset == totalSlotSize);
 
     byteSize_ = totalSlotSize + numNullBytes_;
+    return new Pair<Integer, Integer>(nullIndicatorByte, nullIndicatorBit);
   }
 
   /**
@@ -364,6 +417,55 @@ public class TupleDescriptor {
   }
 
   /**
+   * Receives a SlotDescriptor as a parameter and returns its size.
+   */
+  private int getSlotSize(SlotDescriptor slotDesc) {
+    int slotSize = slotDesc.getType().getSlotSize();
+    // Add padding for a KUDU string slot.
+    if (slotDesc.isKuduStringSlot()) {
+      slotSize += KUDU_STRING_PADDING;
+    }
+    return slotSize;
+  }
+
+  /**
+   * Gets a SlotDescriptor as parameter and calculates its average serialized size and
+   * adds the result to 'avgSerializedSize_'.
+   */
+  private void addToAvgSerializedSize(SlotDescriptor slotDesc) {
+    ColumnStats stats = slotDesc.getStats();
+    if (stats.hasAvgSize()) {
+      avgSerializedSize_ += stats.getAvgSerializedSize();
+    } else {
+      // Note, there are no stats for complex types slots so can't use average serialized
+      // size from stats for them.
+      // TODO: for computed slots, try to come up with stats estimates
+      avgSerializedSize_ += slotDesc.getType().getSlotSize();
+    }
+    // Add padding for a KUDU string slot.
+    if (slotDesc.isKuduStringSlot()) {
+      avgSerializedSize_ += KUDU_STRING_PADDING;
+    }
+  }
+
+  // Function to calculate the number of null bits required for a slot descriptor. In
+  // case of a struct slot it calls itself recursively to get the required null bits for
+  // the struct's children.
+  private int getNumNullBits(SlotDescriptor slotDesc, boolean alwaysAddNullBit) {
+    Preconditions.checkState(!slotDesc.getType().isStructType() ||
+        slotDesc.getIsNullable());
+    if (!slotDesc.getIsNullable() && !alwaysAddNullBit) return 0;
+    if (!slotDesc.getType().isStructType()) return 1;
+    TupleDescriptor childrenTuple = slotDesc.getItemTupleDesc();
+    Preconditions.checkState(childrenTuple != null);
+    int numNullBits = 1;
+    for (SlotDescriptor child : childrenTuple.getSlots()) {
+      numNullBits += getNumNullBits(child, alwaysAddNullBit);
+    }
+    return numNullBits;
+  }
+
+  /**
    * Returns true if this tuple has at least one materialized nullable Kudu scan slot.
    */
   private boolean hasNullableKuduScanSlots() {
diff --git a/fe/src/main/java/org/apache/impala/catalog/StructType.java b/fe/src/main/java/org/apache/impala/catalog/StructType.java
index 6509e5a..dc637d0 100644
--- a/fe/src/main/java/org/apache/impala/catalog/StructType.java
+++ b/fe/src/main/java/org/apache/impala/catalog/StructType.java
@@ -95,6 +95,20 @@ public class StructType extends Type {
     }
   }
 
+  /**
+   * The size of a struct slot is the sum of the size of its children. Don't have to
+   * count for null indicators as they are not stored on the level of the struct slot,
+   * instead it's on the topmost tuple's level.
+   */
+  @Override
+  public int getSlotSize() {
+    int size = 0;
+    for (StructField structField : fields_) {
+      size += structField.getType().getSlotSize();
+    }
+    return size;
+  }
+
   @Override
   public boolean equals(Object other) {
     if (!(other instanceof StructType)) return false;
diff --git a/fe/src/main/java/org/apache/impala/common/TreeNode.java b/fe/src/main/java/org/apache/impala/common/TreeNode.java
index 7d3afa1..db6f332 100644
--- a/fe/src/main/java/org/apache/impala/common/TreeNode.java
+++ b/fe/src/main/java/org/apache/impala/common/TreeNode.java
@@ -104,6 +104,12 @@ public abstract class TreeNode<NodeType extends TreeNode<NodeType>> {
   }
 
   /**
+   * This function helps to decide if a collect() should call itself recursively with the
+   * children of a given TreeNode.
+   */
+  protected boolean shouldCollectRecursively() { return true; }
+
+  /**
    * Add all nodes in the tree that satisfy 'predicate' to the list 'matches'
    * This node is checked first, followed by its children in order. If the node
    * itself matches, the children are skipped.
@@ -120,7 +126,9 @@ public abstract class TreeNode<NodeType extends TreeNode<NodeType>> {
       matches.add((D) this);
       return;
     }
-    for (NodeType child: children_) child.collect(predicate, matches);
+    if (shouldCollectRecursively()) {
+      for (NodeType child: children_) child.collect(predicate, matches);
+    }
   }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index f8edfbd..f8dcb68 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -2066,7 +2066,7 @@ public class HdfsScanNode extends ScanNode {
             columnByteSizes.add(computeMinScalarColumnMemReservation(column));
           }
         } else {
-          appendMinColumnMemReservationsForCollection(slot, columnByteSizes);
+          appendMinColumnMemReservationsForComplexType(slot, columnByteSizes);
         }
       }
     }
@@ -2080,14 +2080,14 @@ public class HdfsScanNode extends ScanNode {
 
   /**
    * Helper for computeMinColumnMemReservations() - compute minimum memory reservations
-   * for all of the scalar columns read from disk when materializing collectionSlot.
+   * for all of the scalar columns read from disk when materializing complexSlot.
    * Appends one number per scalar column to columnMemReservations.
    */
-  private void appendMinColumnMemReservationsForCollection(SlotDescriptor collectionSlot,
+  private void appendMinColumnMemReservationsForComplexType(SlotDescriptor complexSlot,
       List<Long> columnMemReservations) {
-    Preconditions.checkState(collectionSlot.getType().isCollectionType());
+    Preconditions.checkState(complexSlot.getType().isComplexType());
     boolean addedColumn = false;
-    for (SlotDescriptor nestedSlot: collectionSlot.getItemTupleDesc().getSlots()) {
+    for (SlotDescriptor nestedSlot: complexSlot.getItemTupleDesc().getSlots()) {
       // Position virtual slots can be materialized by piggybacking on another slot.
       if (!nestedSlot.isMaterialized() || nestedSlot.isArrayPosRef()) continue;
       if (nestedSlot.getType().isScalarType()) {
@@ -2095,8 +2095,8 @@ public class HdfsScanNode extends ScanNode {
         // reservation.
         columnMemReservations.add(DEFAULT_COLUMN_SCAN_RANGE_RESERVATION);
         addedColumn = true;
-      } else {
-        appendMinColumnMemReservationsForCollection(nestedSlot, columnMemReservations);
+      } else if (nestedSlot.getType().isComplexType()) {
+        appendMinColumnMemReservationsForComplexType(nestedSlot, columnMemReservations);
       }
     }
     // Need to scan at least one column to materialize the pos virtual slot and/or
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
index b98fa37..fc6cdcf 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
@@ -2291,17 +2291,13 @@ public class AnalyzeDDLTest extends FrontendTestBase {
         " stored as kudu as select cs from functional.chars_tiny",
         "Cannot create table 't': Type CHAR(5) is not supported in Kudu");
     AnalysisError("create table t primary key (id) partition by hash partitions 3" +
-        " stored as kudu as select id, s from functional.complextypes_fileformat",
-        "Expr 's' in select list returns a complex type 'STRUCT<f1:STRING,f2:INT>'.\n" +
-        "Only scalar types are allowed in the select list.");
-    AnalysisError("create table t primary key (id) partition by hash partitions 3" +
         " stored as kudu as select id, m from functional.complextypes_fileformat",
-        "Expr 'm' in select list returns a complex type 'MAP<STRING,BIGINT>'.\n" +
-        "Only scalar types are allowed in the select list.");
+        "Expr 'm' in select list returns a collection type 'MAP<STRING,BIGINT>'.\n" +
+        "Collection types are not allowed in the select list.");
     AnalysisError("create table t primary key (id) partition by hash partitions 3" +
         " stored as kudu as select id, a from functional.complextypes_fileformat",
-        "Expr 'a' in select list returns a complex type 'ARRAY<INT>'.\n" +
-        "Only scalar types are allowed in the select list.");
+        "Expr 'a' in select list returns a collection type 'ARRAY<INT>'.\n" +
+        "Collection types are not allowed in the select list.");
 
     // IMPALA-6454: CTAS into Kudu tables with primary key specified in upper case.
     AnalyzesOk("create table part_kudu_tbl primary key(INT_COL, SMALLINT_COL, ID)" +
@@ -3083,13 +3079,20 @@ public class AnalyzeDDLTest extends FrontendTestBase {
         "Incompatible return types 'INT' and 'STRING' of exprs " +
         "'int_col' and 'string_col'.");
 
-    // View cannot have complex-typed columns because complex-typed exprs are
+    // View cannot have collection-typed columns because collection-typed exprs are
     // not supported in the select list.
-    AnalysisError("create view functional.foo (a, b, c) as " +
-        "select int_array_col, int_map_col, int_struct_col " +
+    AnalysisError("create view functional.foo (a, b) as " +
+        "select int_array_col, int_map_col " +
         "from functional.allcomplextypes",
-        "Expr 'int_array_col' in select list returns a complex type 'ARRAY<INT>'.\n" +
-        "Only scalar types are allowed in the select list.");
+        "Expr 'int_array_col' in select list returns a collection type 'ARRAY<INT>'.\n" +
+        "Collection types are not allowed in the select list.");
+    // It's allowed to do the same with struct as it is supported in the select list.
+    AnalysisContext ctx = createAnalysisCtx();
+    // TODO: Turning Codegen OFF could be removed once the Codegen support is implemented
+    // for structs given in the select list.
+    ctx.getQueryOptions().setDisable_codegen(true);
+    AnalyzesOk("create view functional.foo (a) as " +
+        "select tiny_struct from functional_orc_def.complextypes_structs", ctx);
 
     // IMPALA-7679: Inserting a null column type without an explicit type should
     // throw an error.
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeExprsTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeExprsTest.java
index 5ce1ba2..1230451 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeExprsTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeExprsTest.java
@@ -335,9 +335,10 @@ public class AnalyzeExprsTest extends AnalyzerTest {
     AnalysisError("select 1 from functional.allcomplextypes where int_map_col = 1",
         "operands of type MAP<STRING,INT> and TINYINT are not comparable: " +
         "int_map_col = 1");
-    AnalysisError("select 1 from functional.allcomplextypes where int_struct_col = 1",
-        "operands of type STRUCT<f1:INT,f2:INT> and TINYINT are not comparable: " +
-        "int_struct_col = 1");
+    AnalysisError("select 1 from functional_orc_def.complextypes_structs where " +
+        "tiny_struct = true",
+        "operands of type STRUCT<b:BOOLEAN> and BOOLEAN are not comparable: " +
+        "tiny_struct = TRUE");
     // Complex types are not comparable even if identical.
     // TODO: Reconsider this behavior. Such a comparison should ideally work,
     // but may require complex-typed SlotRefs and BE functions able to accept
@@ -645,9 +646,10 @@ public class AnalyzeExprsTest extends AnalyzerTest {
     AnalysisError("select int_array_col or true from functional.allcomplextypes",
         "Operand 'int_array_col' part of predicate 'int_array_col OR TRUE' should " +
             "return type 'BOOLEAN' but returns type 'ARRAY<INT>'");
-    AnalysisError("select false and int_struct_col from functional.allcomplextypes",
-        "Operand 'int_struct_col' part of predicate 'FALSE AND int_struct_col' should " +
-            "return type 'BOOLEAN' but returns type 'STRUCT<f1:INT,f2:INT>'.");
+    AnalysisError("select false and tiny_struct from " +
+        "functional_orc_def.complextypes_structs",
+        "Operand 'tiny_struct' part of predicate 'FALSE AND tiny_struct' should " +
+            "return type 'BOOLEAN' but returns type 'STRUCT<b:BOOLEAN>'.");
     AnalysisError("select not int_map_col from functional.allcomplextypes",
         "Operand 'int_map_col' part of predicate 'NOT int_map_col' should return " +
             "type 'BOOLEAN' but returns type 'MAP<STRING,INT>'.");
@@ -661,12 +663,13 @@ public class AnalyzeExprsTest extends AnalyzerTest {
 
     AnalysisError("select 1 from functional.allcomplextypes where int_map_col is null",
         "IS NULL predicate does not support complex types: int_map_col IS NULL");
-    AnalysisError("select * from functional.allcomplextypes where complex_struct_col " +
-        "is null", "IS NULL predicate does not support complex types: " +
-            "complex_struct_col IS NULL");
-    AnalysisError("select * from functional.allcomplextypes where nested_struct_col " +
-        "is not null", "IS NOT NULL predicate does not support complex types: " +
-            "nested_struct_col IS NOT NULL");
+    AnalysisError("select * from functional_orc_def.complextypes_structs where " +
+        "tiny_struct is null",
+        "IS NULL predicate does not support complex types: tiny_struct IS NULL");
+    AnalysisError("select * from functional_orc_def.complextypes_structs where " +
+        "tiny_struct is not null",
+        "IS NOT NULL predicate does not support complex types: tiny_struct " +
+            "IS NOT NULL");
   }
 
   @Test
@@ -767,10 +770,10 @@ public class AnalyzeExprsTest extends AnalyzerTest {
         "where date_col between int_col and double_col",
         "Incompatible return types 'DATE' and 'INT' " +
         "of exprs 'date_col' and 'int_col'.");
-    AnalysisError("select 1 from functional.allcomplextypes " +
-        "where int_struct_col between 10 and 20",
-        "Incompatible return types 'STRUCT<f1:INT,f2:INT>' and 'TINYINT' " +
-        "of exprs 'int_struct_col' and '10'.");
+    AnalysisError("select 1 from functional_orc_def.complextypes_structs " +
+        "where tiny_struct between 10 and 20",
+        "Incompatible return types 'STRUCT<b:BOOLEAN>' and 'TINYINT' " +
+        "of exprs 'tiny_struct' and '10'.");
     // IMPALA-7211: Do not cast decimal types to other decimal types
     AnalyzesOk("select cast(1 as decimal(38,2)) between " +
         "0.9 * cast(1 as decimal(38,3)) and 3");
@@ -1274,9 +1277,10 @@ public class AnalyzeExprsTest extends AnalyzerTest {
     AnalysisError("select id, row_number() over (order by int_array_col) " +
         "from functional_parquet.allcomplextypes", "ORDER BY expression " +
         "'int_array_col' with complex type 'ARRAY<INT>' is not supported.");
-    AnalysisError("select id, count() over (partition by int_struct_col) " +
-        "from functional_parquet.allcomplextypes", "PARTITION BY expression " +
-        "'int_struct_col' with complex type 'STRUCT<f1:INT,f2:INT>' is not supported.");
+    AnalysisError("select id, count() over (partition by tiny_struct) from " +
+        "functional_orc_def.complextypes_structs",
+        "PARTITION BY expression 'tiny_struct' with complex type " +
+        "'STRUCT<b:BOOLEAN>' is not supported.");
   }
 
   /**
@@ -1731,9 +1735,10 @@ public class AnalyzeExprsTest extends AnalyzerTest {
         "'string_col + INTERVAL 10 years' returns type 'STRING'. " +
         "Expected type 'TIMESTAMP' or 'DATE'.");
     AnalysisError(
-        "select int_struct_col + interval 10 years from functional.allcomplextypes",
-        "Operand 'int_struct_col' of timestamp/date arithmetic expression " +
-        "'int_struct_col + INTERVAL 10 years' returns type 'STRUCT<f1:INT,f2:INT>'. " +
+        "select tiny_struct + interval 10 years from " +
+            "functional_orc_def.complextypes_structs",
+        "Operand 'tiny_struct' of timestamp/date arithmetic expression " +
+        "'tiny_struct + INTERVAL 10 years' returns type 'STRUCT<b:BOOLEAN>'. " +
         "Expected type 'TIMESTAMP' or 'DATE'.");
     // Reversed interval and timestamp using addition.
     AnalysisError("select interval 10 years + float_col from functional.alltypes",
@@ -1863,8 +1868,9 @@ public class AnalyzeExprsTest extends AnalyzerTest {
     AnalyzesOk("select round(cast('1.1' as decimal), 1)");
 
     // No matching signature for complex type.
-    AnalysisError("select lower(int_struct_col) from functional.allcomplextypes",
-        "No matching function with signature: lower(STRUCT<f1:INT,f2:INT>).");
+    AnalysisError("select lower(tiny_struct) from " +
+        "functional_orc_def.complextypes_structs",
+        "No matching function with signature: lower(STRUCT<b:BOOLEAN>).");
 
     // Special cases for FROM in function call
     AnalyzesOk("select extract(year from now())");
@@ -2170,10 +2176,10 @@ public class AnalyzeExprsTest extends AnalyzerTest {
     AnalyzesOk("select if(bool_col, false, NULL) from functional.alltypes");
     AnalyzesOk("select if(NULL, NULL, NULL) from functional.alltypes");
     // No matching signature.
-    AnalysisError("select if(true, int_struct_col, int_struct_col) " +
-        "from functional.allcomplextypes",
+    AnalysisError("select if(true, tiny_struct, tiny_struct) " +
+        "from functional_orc_def.complextypes_structs",
         "No matching function with signature: " +
-        "if(BOOLEAN, STRUCT<f1:INT,f2:INT>, STRUCT<f1:INT,f2:INT>).");
+        "if(BOOLEAN, STRUCT<b:BOOLEAN>, STRUCT<b:BOOLEAN>).");
 
     // if() only accepts three arguments
     AnalysisError("select if(true, false, true, true)",
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
index 8955b3d..fae991d 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
@@ -421,16 +421,21 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
 
   /**
    * Checks that the given SQL analyzes ok, and asserts that the last result expr in the
-   * parsed SelectStmt is a scalar SlotRef whose absolute path is identical to the given
-   * expected one. Also asserts that the slot's absolute path is equal to its
+   * parsed SelectStmt is a non-collection SlotRef whose absolute path is identical to
+   * the given expected one. Also asserts that the slot's absolute path is equal to its
    * materialized path. Intentionally allows multiple result exprs to be analyzed to test
    * absolute path caching, though only the last path is validated.
    */
   private void testSlotRefPath(String sql, List<Integer> expectedAbsPath) {
-    SelectStmt stmt = (SelectStmt) AnalyzesOk(sql);
+    AnalysisContext ctx = createAnalysisCtx();
+    // TODO: Turning Codegen OFF could be removed once the Codegen support is implemented
+    // for structs given in the select list.
+    ctx.getQueryOptions().setDisable_codegen(true);
+
+    SelectStmt stmt = (SelectStmt) AnalyzesOk(sql, ctx);
     Expr e = stmt.getResultExprs().get(stmt.getResultExprs().size() - 1);
     Preconditions.checkState(e instanceof SlotRef);
-    Preconditions.checkState(e.getType().isScalarType());
+    Preconditions.checkState(!e.getType().isCollectionType());
     SlotRef slotRef = (SlotRef) e;
     List<Integer> actualAbsPath = slotRef.getDesc().getPath().getAbsolutePath();
     Assert.assertEquals("Mismatched absolute paths.", expectedAbsPath, actualAbsPath);
@@ -520,13 +525,11 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
 
     // Array of structs. No name conflicts with implicit fields. Both implicit and
     // explicit paths are allowed.
-    addTestTable("create table d.t2 (c array<struct<f:int>>)");
+    addTestTable("create table d.t2 (c array<struct<f:int>>) stored as orc");
     testSlotRefPath("select f from d.t2.c", path(0, 0, 0));
     testSlotRefPath("select item.f from d.t2.c", path(0, 0, 0));
     testSlotRefPath("select pos from d.t2.c", path(0, 1));
-    AnalysisError("select item from d.t2.c",
-        "Expr 'item' in select list returns a complex type 'STRUCT<f:INT>'.\n" +
-        "Only scalar types are allowed in the select list.");
+    testSlotRefPath("select item from d.t2.c", path(0, 0));
     AnalysisError("select item.pos from d.t2.c",
         "Could not resolve column/field reference: 'item.pos'");
     // Test star expansion.
@@ -535,16 +538,14 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
 
     // Array of structs with name conflicts. Both implicit and explicit
     // paths are allowed.
-    addTestTable("create table d.t3 (c array<struct<f:int,item:int,pos:int>>)");
+    addTestTable("create table d.t3 (c array<struct<f:int,item:int,pos:int>>) " +
+        "stored as orc");
     testSlotRefPath("select f from d.t3.c", path(0, 0, 0));
     testSlotRefPath("select item.f from d.t3.c", path(0, 0, 0));
     testSlotRefPath("select item.item from d.t3.c", path(0, 0, 1));
     testSlotRefPath("select item.pos from d.t3.c", path(0, 0, 2));
     testSlotRefPath("select pos from d.t3.c", path(0, 1));
-    AnalysisError("select item from d.t3.c",
-        "Expr 'item' in select list returns a complex type " +
-        "'STRUCT<f:INT,item:INT,pos:INT>'.\n" +
-        "Only scalar types are allowed in the select list.");
+    testSlotRefPath("select item from d.t3.c", path(0, 0));
     // Test star expansion.
     testStarPath("select * from d.t3.c", path(0, 0, 0), path(0, 0, 1), path(0, 0, 2));
     testStarPath("select c.* from d.t3.c", path(0, 0, 0), path(0, 0, 1), path(0, 0, 2));
@@ -561,38 +562,49 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
 
     // Map with a scalar key and struct value. No name conflicts. Both implicit and
     // explicit paths are allowed.
-    addTestTable("create table d.t5 (c map<int,struct<f:int>>)");
+    addTestTable("create table d.t5 (c map<int,struct<f:int>>) stored as orc");
     testSlotRefPath("select key from d.t5.c", path(0, 0));
     testSlotRefPath("select f from d.t5.c", path(0, 1, 0));
     testSlotRefPath("select value.f from d.t5.c", path(0, 1, 0));
     AnalysisError("select value.value from d.t5.c",
         "Could not resolve column/field reference: 'value.value'");
-    AnalysisError("select value from d.t5.c",
-        "Expr 'value' in select list returns a complex type " +
-        "'STRUCT<f:INT>'.\n" +
-        "Only scalar types are allowed in the select list.");
+    testSlotRefPath("select value from d.t5.c", path(0, 1));
     // Test star expansion.
     testStarPath("select * from d.t5.c", path(0, 0), path(0, 1, 0));
     testStarPath("select c.* from d.t5.c", path(0, 0), path(0, 1, 0));
 
     // Map with a scalar key and struct value with name conflicts. Both implicit and
     // explicit paths are allowed.
-    addTestTable("create table d.t6 (c map<int,struct<f:int,key:int,value:int>>)");
+    addTestTable("create table d.t6 (c map<int,struct<f:int,key:int,value:int>>) " +
+        "stored as orc");
     testSlotRefPath("select key from d.t6.c", path(0, 0));
     testSlotRefPath("select f from d.t6.c", path(0, 1, 0));
     testSlotRefPath("select value.f from d.t6.c", path(0, 1, 0));
     testSlotRefPath("select value.key from d.t6.c", path(0, 1, 1));
     testSlotRefPath("select value.value from d.t6.c", path(0, 1, 2));
-    AnalysisError("select value from d.t6.c",
-        "Expr 'value' in select list returns a complex type " +
-        "'STRUCT<f:INT,key:INT,value:INT>'.\n" +
-        "Only scalar types are allowed in the select list.");
+    testSlotRefPath("select value from d.t6.c", path(0, 1));
     // Test star expansion.
     testStarPath("select * from d.t6.c",
         path(0, 0), path(0, 1, 0), path(0, 1, 1), path(0, 1, 2));
     testStarPath("select c.* from d.t6.c",
         path(0, 0), path(0, 1, 0), path(0, 1, 1), path(0, 1, 2));
 
+    // Map with nested struct value with name conflict. Both implicit and explicit paths
+    // are allowed.
+    addTestTable("create table d.t6_nested (c map<int," +
+        "struct<f:int,key:int,value:int,s:struct<f:int,key:int,value:int>>>)" +
+        " stored as orc");
+    testSlotRefPath("select key from d.t6_nested.c", path(0,0));
+    testSlotRefPath("select value from d.t6_nested.c", path(0,1));
+    testSlotRefPath("select f from d.t6_nested.c", path(0, 1, 0));
+    testSlotRefPath("select value.key from d.t6_nested.c", path(0, 1, 1));
+    testSlotRefPath("select value.value from d.t6_nested.c", path(0, 1, 2));
+    testSlotRefPath("select value.s from d.t6_nested.c", path(0, 1, 3));
+    testSlotRefPath("select value.s.f from d.t6_nested.c", path(0, 1, 3, 0));
+    testSlotRefPath("select value.s.key from d.t6_nested.c", path(0, 1, 3, 1));
+    testSlotRefPath("select value.s.value from d.t6_nested.c", path(0, 1, 3, 2));
+
+
     // Test implicit/explicit paths on a complicated schema.
     addTestTable("create table d.t7 (" +
         "c1 int, " +
@@ -600,7 +612,8 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
         "c3 array<struct<a1:array<int>,a2:array<struct<x:int,y:int,a3:array<int>>>>>, " +
         "c4 bigint, " +
         "c5 map<int,struct<m1:map<int,string>," +
-        "                  m2:map<int,struct<x:int,y:int,m3:map<int,int>>>>>)");
+        "                  m2:map<int,struct<x:int,y:int,m3:map<int,int>>>>>) " +
+        "stored as orc");
 
     // Test paths with c3.
     testTableRefPath("select 1 from d.t7.c3.a1", path(2, 0, 0), null);
@@ -615,6 +628,10 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
     testTableRefPath("select 1 from d.t7.c3.item.a2.item.a3", path(2, 0, 1, 0, 2), null);
     testSlotRefPath("select item from d.t7.c3.a2.a3", path(2, 0, 1, 0, 2, 0));
     testSlotRefPath("select item from d.t7.c3.item.a2.item.a3", path(2, 0, 1, 0, 2, 0));
+    AnalysisContext ctx = createAnalysisCtx();
+    ctx.getQueryOptions().setDisable_codegen(true);
+    AnalysisError("select item from d.t7.c3", ctx,
+        "Struct containing a collection type is not allowed in the select list.");
     // Test path assembly with multiple tuple descriptors.
     testTableRefPath("select 1 from d.t7, t7.c3, c3.a2, a2.a3",
         path(2, 0, 1, 0, 2), path(2, 0, 1, 0, 2));
@@ -755,27 +772,42 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
         "Illegal column/field reference 'complex_nested_struct_col.f2.f11' with " +
         "intermediate collection 'f2' of type " +
         "'ARRAY<STRUCT<f11:BIGINT,f12:MAP<STRING,STRUCT<f21:BIGINT>>>>'");
+
+    // Check the support of struct in the select list for different file formats.
+    AnalysisContext ctx = createAnalysisCtx();
+    ctx.getQueryOptions().setDisable_codegen(true);
+    AnalysisError("select alltypes from functional_parquet.complextypes_structs", ctx,
+        "Querying STRUCT is only supported for ORC file format.");
+    AnalyzesOk("select alltypes from functional_orc_def.complextypes_structs", ctx);
+
+    // Check if a struct in the select list raises an error if it contains collections.
+    addTestTable(
+        "create table nested_structs (s1 struct<s2:struct<i:int>>) stored as orc");
+    addTestTable("create table nested_structs_with_list " +
+        "(s1 struct<s2:struct<a:array<int>>>) stored as orc");
+    AnalyzesOk("select s1 from nested_structs", ctx);
+    AnalyzesOk("select s1.s2 from nested_structs", ctx);
+    AnalysisError("select s1 from nested_structs_with_list", ctx, "Struct containing " +
+        "a collection type is not allowed in the select list.");
+    AnalysisError("select s1.s2 from nested_structs_with_list", ctx, "Struct " +
+        "containing a collection type is not allowed in the select list.");
   }
 
   @Test
   public void TestSlotRefPathAmbiguity() {
     addTestDb("a", null);
-    addTestTable("create table a.a (a struct<a:struct<a:int>>)");
+    addTestTable("create table a.a (a struct<a:struct<a:int>>) stored as orc");
 
     // Slot path is not ambiguous.
     AnalyzesOk("select a.a.a.a.a from a.a");
     AnalyzesOk("select t.a.a.a from a.a t");
 
-    // Slot path is not ambiguous but resolves to a struct.
-    AnalysisError("select a from a.a",
-        "Expr 'a' in select list returns a complex type 'STRUCT<a:STRUCT<a:INT>>'.\n" +
-        "Only scalar types are allowed in the select list.");
-    AnalysisError("select t.a from a.a t",
-        "Expr 't.a' in select list returns a complex type 'STRUCT<a:STRUCT<a:INT>>'.\n" +
-        "Only scalar types are allowed in the select list.");
-    AnalysisError("select t.a.a from a.a t",
-        "Expr 't.a.a' in select list returns a complex type 'STRUCT<a:INT>'.\n" +
-        "Only scalar types are allowed in the select list.");
+    // Slot path is not ambiguous and resolves to a struct.
+    AnalysisContext ctx = createAnalysisCtx();
+    ctx.getQueryOptions().setDisable_codegen(true);
+    AnalyzesOk("select a from a.a", ctx);
+    AnalyzesOk("select t.a from a.a t", ctx);
+    AnalyzesOk("select t.a.a from a.a t", ctx);
 
     // Slot paths are ambiguous. A slot path can legally resolve to a non-scalar type,
     // even though we currently do not support non-scalar SlotRefs in the select list
@@ -970,7 +1002,7 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
   }
 
   /**
-   * Test that complex types are not allowed in the select list.
+   * Test that complex types are supported in the select list.
    */
   @Test
   public void TestComplexTypesInSelectList() {
@@ -990,35 +1022,50 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
         "tables only have complex-typed columns.");
     // Empty star expansion, but non empty result exprs.
     AnalyzesOk("select 1, * from only_complex_types");
-    // Illegal complex-typed expr in select list.
-    AnalysisError("select int_struct_col from functional.allcomplextypes",
-        "Expr 'int_struct_col' in select list returns a " +
-        "complex type 'STRUCT<f1:INT,f2:INT>'.\n" +
-        "Only scalar types are allowed in the select list.");
+
+    // Struct in select list works only if codegen is OFF.
+    AnalysisContext ctx = createAnalysisCtx();
+    ctx.getQueryOptions().setDisable_codegen(false);
+    AnalysisError("select alltypes from functional_orc_def.complextypes_structs", ctx,
+        "Struct type in select list is not allowed when Codegen is ON. You might want " +
+        "to set DISABLE_CODEGEN=true");
+    ctx.getQueryOptions().setDisable_codegen(true);
+    AnalyzesOk("select alltypes from functional_orc_def.complextypes_structs", ctx);
     // Illegal complex-typed expr in a union.
-    AnalysisError("select int_struct_col from functional.allcomplextypes " +
-        "union all select int_struct_col from functional.allcomplextypes",
-        "Expr 'int_struct_col' in select list returns a " +
-        "complex type 'STRUCT<f1:INT,f2:INT>'.\n" +
-        "Only scalar types are allowed in the select list.");
+    AnalysisError("select int_array_col from functional.allcomplextypes ",
+        "Expr 'int_array_col' in select list returns a collection type 'ARRAY<INT>'.\n" +
+        "Collection types are not allowed in the select list.");
+    // Illegal complex-typed expr in a union.
+    AnalysisError("select int_array_col from functional.allcomplextypes " +
+        "union all select int_array_col from functional.allcomplextypes",
+        "Expr 'int_array_col' in select list returns a collection type 'ARRAY<INT>'.\n" +
+        "Collection types are not allowed in the select list.");
+    AnalysisError("select tiny_struct from functional_orc_def.complextypes_structs " +
+        "union all select tiny_struct from functional_orc_def.complextypes_structs", ctx,
+        "Set operations don't support STRUCT type. STRUCT<b:BOOLEAN> in tiny_struct");
     // Illegal complex-typed expr inside inline view.
     AnalysisError("select 1 from " +
-        "(select int_struct_col from functional.allcomplextypes) v",
-        "Expr 'int_struct_col' in select list returns a " +
-        "complex type 'STRUCT<f1:INT,f2:INT>'.\n" +
-        "Only scalar types are allowed in the select list.");
+        "(select int_array_col from functional.allcomplextypes) v",
+        "Expr 'int_array_col' in select list returns a collection type 'ARRAY<INT>'.\n" +
+        "Collection types are not allowed in the select list.");
+    // Structs are allowed in an inline view.
+    AnalyzesOk("select v.ts from (select tiny_struct as ts from " +
+        "functional_orc_def.complextypes_structs) v;", ctx);
     // Illegal complex-typed expr in an insert.
     AnalysisError("insert into functional.allcomplextypes " +
-        "select int_struct_col from functional.allcomplextypes",
-        "Expr 'int_struct_col' in select list returns a " +
-        "complex type 'STRUCT<f1:INT,f2:INT>'.\n" +
-        "Only scalar types are allowed in the select list.");
+        "select int_array_col from functional.allcomplextypes",
+        "Expr 'int_array_col' in select list returns a collection type 'ARRAY<INT>'.\n" +
+        "Collection types are not allowed in the select list.");
     // Illegal complex-typed expr in a CTAS.
     AnalysisError("create table new_tbl as " +
-        "select int_struct_col from functional.allcomplextypes",
-        "Expr 'int_struct_col' in select list returns a " +
-        "complex type 'STRUCT<f1:INT,f2:INT>'.\n" +
-        "Only scalar types are allowed in the select list.");
+        "select int_array_col from functional.allcomplextypes",
+        "Expr 'int_array_col' in select list returns a collection type 'ARRAY<INT>'.\n" +
+        "Collection types are not allowed in the select list.");
+    AnalysisError("create table new_tbl as " +
+        "select tiny_struct from functional_orc_def.complextypes_structs", ctx,
+        "Unable to INSERT into target table (default.new_tbl) because the column " +
+            "'tiny_struct' has a complex type 'STRUCT<b:BOOLEAN>' and Impala doesn't " +
+            "support inserting into tables containing complex type columns");
   }
 
   @Test
@@ -3019,9 +3066,9 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
                "ORDER BY timestamp_col");
 
     // Ordering by complex-typed expressions is not allowed.
-    AnalysisError("select * from functional_parquet.allcomplextypes " +
-        "order by int_struct_col", "ORDER BY expression 'int_struct_col' with " +
-        "complex type 'STRUCT<f1:INT,f2:INT>' is not supported.");
+    AnalysisError("select * from functional_orc_def.complextypes_structs " +
+        "order by tiny_struct", "ORDER BY expression 'tiny_struct' with " +
+        "complex type 'STRUCT<b:BOOLEAN>' is not supported.");
     AnalysisError("select * from functional_parquet.allcomplextypes " +
         "order by int_array_col", "ORDER BY expression 'int_array_col' with " +
         "complex type 'ARRAY<INT>' is not supported.");
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeUpsertStmtTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeUpsertStmtTest.java
index 8a48596..106ec6e 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeUpsertStmtTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeUpsertStmtTest.java
@@ -122,9 +122,8 @@ public class AnalyzeUpsertStmtTest extends AnalyzerTest {
         "SELECT item FROM b.int_array_col, functional.alltypestiny");
     // Illegal complex-typed expr
     AnalysisError("upsert into functional_kudu.testtbl " +
-        "select int_struct_col from functional.allcomplextypes",
-        "Expr 'int_struct_col' in select list returns a " +
-        "complex type 'STRUCT<f1:INT,f2:INT>'.\n" +
-        "Only scalar types are allowed in the select list.");
+        "select int_array_col from functional.allcomplextypes",
+        "Expr 'int_array_col' in select list returns a collection type 'ARRAY<INT>'.\n" +
+        "Collection types are not allowed in the select list.");
   }
 }
diff --git a/testdata/ComplexTypesTbl/structs.orc b/testdata/ComplexTypesTbl/structs.orc
new file mode 100644
index 0000000..4d36443
Binary files /dev/null and b/testdata/ComplexTypesTbl/structs.orc differ
diff --git a/testdata/ComplexTypesTbl/structs.parq b/testdata/ComplexTypesTbl/structs.parq
new file mode 100644
index 0000000..a8d696d
Binary files /dev/null and b/testdata/ComplexTypesTbl/structs.parq differ
diff --git a/testdata/ComplexTypesTbl/structs_nested.orc b/testdata/ComplexTypesTbl/structs_nested.orc
new file mode 100644
index 0000000..ad9e4b7
Binary files /dev/null and b/testdata/ComplexTypesTbl/structs_nested.orc differ
diff --git a/testdata/ComplexTypesTbl/structs_nested.parq b/testdata/ComplexTypesTbl/structs_nested.parq
new file mode 100644
index 0000000..baa56aa
Binary files /dev/null and b/testdata/ComplexTypesTbl/structs_nested.parq differ
diff --git a/testdata/datasets/functional/functional_schema_template.sql b/testdata/datasets/functional/functional_schema_template.sql
index 2c4eede..78094a3 100644
--- a/testdata/datasets/functional/functional_schema_template.sql
+++ b/testdata/datasets/functional/functional_schema_template.sql
@@ -761,6 +761,70 @@ INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name} SELECT * FROM functiona
 ---- DATASET
 functional
 ---- BASE_TABLE_NAME
+alltypes_structs
+---- PARTITION_COLUMNS
+year int
+month int
+---- COLUMNS
+id int
+struct_val struct<bool_col:boolean, tinyint_col:tinyint, smallint_col:smallint, int_col:int, bigint_col:bigint, float_col:float, double_col:double, date_string_col:string, string_col:string, timestamp_col:timestamp>
+---- DEPENDENT_LOAD_HIVE
+INSERT INTO {db_name}{db_suffix}.{table_name}
+PARTITION (year, month)
+    SELECT
+        id,
+        named_struct(
+            'bool_col', bool_col,
+            'tinyint_col', tinyint_col,
+            'smallint_col', smallint_col,
+            'int_col', int_col,
+            'bigint_col', bigint_col,
+            'float_col', float_col,
+            'double_col', double_col,
+            'date_string_col', date_string_col,
+            'string_col', string_col,
+            'timestamp_col', timestamp_col),
+        year,
+        month
+    FROM {db_name}{db_suffix}.alltypes;
+---- LOAD
+====
+---- DATASET
+functional
+---- BASE_TABLE_NAME
+complextypes_structs
+---- COLUMNS
+id int
+str string
+alltypes struct<ti:tinyint, si:smallint, i:int, bi:bigint, b:boolean, f:float, do:double, da:date, ts:timestamp, s1:string, s2:string, c1:char(1), c2:char(3), vc:varchar(10), de1:decimal(5, 0), de2:decimal(10, 3)>
+tiny_struct struct<b:boolean>
+small_struct struct<i:int, s:string>
+---- DEPENDENT_LOAD
+`hadoop fs -mkdir -p /test-warehouse/complextypes_structs_parquet && \
+hadoop fs -put -f ${IMPALA_HOME}/testdata/ComplexTypesTbl/structs.parq \
+/test-warehouse/complextypes_structs_parquet/
+---- DEPENDENT_LOAD_ACID
+LOAD DATA LOCAL INPATH '{impala_home}/testdata/ComplexTypesTbl/structs.orc' OVERWRITE INTO TABLE {db_name}{db_suffix}.{table_name};
+---- LOAD
+====
+---- DATASET
+functional
+---- BASE_TABLE_NAME
+complextypes_nested_structs
+---- COLUMNS
+id int
+outer_struct struct<str:string,inner_struct1:struct<str:string,de:decimal(8,2)>,inner_struct2:struct<i:int,str:string>,inner_struct3:struct<s:struct<i:int,s:string>>>
+---- DEPENDENT_LOAD
+`hadoop fs -mkdir -p /test-warehouse/complextypes_nested_structs_parquet && \
+hadoop fs -put -f ${IMPALA_HOME}/testdata/ComplexTypesTbl/structs_nested.parq \
+/test-warehouse/complextypes_nested_structs_parquet/
+---- DEPENDENT_LOAD_ACID
+LOAD DATA LOCAL INPATH '{impala_home}/testdata/ComplexTypesTbl/structs_nested.orc' OVERWRITE INTO TABLE {db_name}{db_suffix}.{table_name};
+---- LOAD
+====
+---- DATASET
+functional
+---- BASE_TABLE_NAME
 complextypestbl_minor_compacted
 ---- COLUMNS
 id bigint
diff --git a/testdata/datasets/functional/schema_constraints.csv b/testdata/datasets/functional/schema_constraints.csv
index 9afffcf..c359af1 100644
--- a/testdata/datasets/functional/schema_constraints.csv
+++ b/testdata/datasets/functional/schema_constraints.csv
@@ -97,6 +97,12 @@ table_name:complextypestbl_medium, constraint:restrict_to, table_format:orc/def/
 table_name:complextypestbl_non_transactional, constraint:restrict_to, table_format:orc/def/block
 table_name:pos_item_key_value_complextypestbl, constraint:restrict_to, table_format:orc/def/block
 table_name:pos_item_key_value_complextypestbl, constraint:restrict_to, table_format:parquet/none/none
+table_name:alltypes_structs, constraint:restrict_to, table_format:parquet/none/none
+table_name:alltypes_structs, constraint:restrict_to, table_format:orc/def/block
+table_name:complextypes_structs, constraint:restrict_to, table_format:parquet/none/none
+table_name:complextypes_structs, constraint:restrict_to, table_format:orc/def/block
+table_name:complextypes_nested_structs, constraint:restrict_to, table_format:parquet/none/none
+table_name:complextypes_nested_structs, constraint:restrict_to, table_format:orc/def/block
 
 table_name:alltypeserror, constraint:exclude, table_format:parquet/none/none
 table_name:alltypeserrornonulls, constraint:exclude, table_format:parquet/none/none
diff --git a/testdata/workloads/functional-query/queries/QueryTest/compute-stats-with-structs.test b/testdata/workloads/functional-query/queries/QueryTest/compute-stats-with-structs.test
new file mode 100644
index 0000000..1a5390e
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/compute-stats-with-structs.test
@@ -0,0 +1,35 @@
+====
+---- QUERY
+COMPUTE STATS complextypes_structs
+---- RESULTS
+'Updated 1 partition(s) and 2 column(s).'
+---- TYPES
+STRING
+====
+---- QUERY
+# Checks that that there are no stats generated for struct columns.
+SHOW COLUMN STATS complextypes_structs
+---- RESULTS
+'id','INT',6,0,4,4.0,-1,-1
+'str','STRING',6,0,11,10.3333330154,-1,-1
+'alltypes','STRUCT<ti:TINYINT,si:SMALLINT,i:INT,bi:BIGINT,b:BOOLEAN,f:FLOAT,do:DOUBLE,da:DATE,ts:TIMESTAMP,s1:STRING,s2:STRING,c1:CHAR(1),c2:CHAR(3),vc:VARCHAR(10),de1:DECIMAL(5,0),de2:DECIMAL(10,3)>',-1,-1,-1,-1.0,-1,-1
+'tiny_struct','STRUCT<b:BOOLEAN>',-1,-1,-1,-1.0,-1,-1
+'small_struct','STRUCT<i:INT,s:STRING>',-1,-1,-1,-1.0,-1,-1
+---- TYPES
+STRING,STRING,BIGINT,BIGINT,BIGINT,DOUBLE,BIGINT,BIGINT
+====
+---- QUERY
+COMPUTE STATS complextypes_nested_structs
+---- RESULTS
+'Updated 1 partition(s) and 1 column(s).'
+---- TYPES
+STRING
+====
+---- QUERY
+# Checks that that there are no stats generated for struct columns.
+SHOW COLUMN STATS complextypes_nested_structs
+---- RESULTS
+'id','INT',5,0,4,4.0,-1,-1
+'outer_struct','STRUCT<str:STRING,inner_struct1:STRUCT<str:STRING,de:DECIMAL(8,2)>,inner_struct2:STRUCT<i:INT,str:STRING>,inner_struct3:STRUCT<s:STRUCT<i:INT,s:STRING>>>',-1,-1,-1,-1.0,-1,-1
+---- TYPES
+STRING,STRING,BIGINT,BIGINT,BIGINT,DOUBLE,BIGINT,BIGINT
diff --git a/testdata/workloads/functional-query/queries/QueryTest/nested-struct-in-select-list.test b/testdata/workloads/functional-query/queries/QueryTest/nested-struct-in-select-list.test
new file mode 100644
index 0000000..d010cdb
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/nested-struct-in-select-list.test
@@ -0,0 +1,155 @@
+====
+---- QUERY
+# Select a struct that contains multiple structs.
+select id, outer_struct
+from functional_orc_def.complextypes_nested_structs;
+---- RESULTS
+1,'{"str":"somestr1","inner_struct1":{"str":"somestr2","de":12345.12},"inner_struct2":{"i":333222111,"str":"somestr3"},"inner_struct3":{"s":{"i":112288,"s":null}}}'
+2,'{"str":"str","inner_struct1":null,"inner_struct2":{"i":100,"str":"str3"},"inner_struct3":{"s":{"i":321,"s":"dfgs"}}}'
+3,'NULL'
+4,'{"str":"","inner_struct1":{"str":"somestr2","de":12345.12},"inner_struct2":{"i":1,"str":"string"},"inner_struct3":{"s":null}}'
+5,'{"str":null,"inner_struct1":null,"inner_struct2":null,"inner_struct3":null}'
+---- TYPES
+INT,STRING
+====
+---- QUERY
+# Select a struct that contains multiple structs using a filter on a non-struct field.
+select id, outer_struct
+from functional_orc_def.complextypes_nested_structs
+where id > 2;
+---- RESULTS
+3,'NULL'
+4,'{"str":"","inner_struct1":{"str":"somestr2","de":12345.12},"inner_struct2":{"i":1,"str":"string"},"inner_struct3":{"s":null}}'
+5,'{"str":null,"inner_struct1":null,"inner_struct2":null,"inner_struct3":null}'
+---- TYPES
+INT,STRING
+====
+---- QUERY
+# Select a struct that contains multiple structs using a filter on a struct field.
+select id, outer_struct
+from functional_orc_def.complextypes_nested_structs
+where length(outer_struct.str) > 3;
+---- RESULTS
+1,'{"str":"somestr1","inner_struct1":{"str":"somestr2","de":12345.12},"inner_struct2":{"i":333222111,"str":"somestr3"},"inner_struct3":{"s":{"i":112288,"s":null}}}'
+---- TYPES
+INT,STRING
+====
+---- QUERY
+# Select a nested struct with an order by.
+select id, outer_struct
+from functional_orc_def.complextypes_nested_structs
+order by id;
+---- RESULTS: VERIFY_IS_EQUAL_SORTED
+1,'{"str":"somestr1","inner_struct1":{"str":"somestr2","de":12345.12},"inner_struct2":{"i":333222111,"str":"somestr3"},"inner_struct3":{"s":{"i":112288,"s":null}}}'
+2,'{"str":"str","inner_struct1":null,"inner_struct2":{"i":100,"str":"str3"},"inner_struct3":{"s":{"i":321,"s":"dfgs"}}}'
+3,'NULL'
+4,'{"str":"","inner_struct1":{"str":"somestr2","de":12345.12},"inner_struct2":{"i":1,"str":"string"},"inner_struct3":{"s":null}}'
+5,'{"str":null,"inner_struct1":null,"inner_struct2":null,"inner_struct3":null}'
+---- TYPES
+INT,STRING
+====
+---- QUERY
+# Select a nested struct with an order by.
+select id, outer_struct
+from functional_orc_def.complextypes_nested_structs
+order by id desc;
+---- RESULTS: VERIFY_IS_EQUAL_SORTED
+5,'{"str":null,"inner_struct1":null,"inner_struct2":null,"inner_struct3":null}'
+4,'{"str":"","inner_struct1":{"str":"somestr2","de":12345.12},"inner_struct2":{"i":1,"str":"string"},"inner_struct3":{"s":null}}'
+3,'NULL'
+2,'{"str":"str","inner_struct1":null,"inner_struct2":{"i":100,"str":"str3"},"inner_struct3":{"s":{"i":321,"s":"dfgs"}}}'
+1,'{"str":"somestr1","inner_struct1":{"str":"somestr2","de":12345.12},"inner_struct2":{"i":333222111,"str":"somestr3"},"inner_struct3":{"s":{"i":112288,"s":null}}}'
+---- TYPES
+INT,STRING
+====
+---- QUERY
+# Select the same nested struct multiple times in one query.
+select id, outer_struct, outer_struct
+from functional_orc_def.complextypes_nested_structs;
+---- RESULTS
+1,'{"str":"somestr1","inner_struct1":{"str":"somestr2","de":12345.12},"inner_struct2":{"i":333222111,"str":"somestr3"},"inner_struct3":{"s":{"i":112288,"s":null}}}','{"str":"somestr1","inner_struct1":{"str":"somestr2","de":12345.12},"inner_struct2":{"i":333222111,"str":"somestr3"},"inner_struct3":{"s":{"i":112288,"s":null}}}'
+2,'{"str":"str","inner_struct1":null,"inner_struct2":{"i":100,"str":"str3"},"inner_struct3":{"s":{"i":321,"s":"dfgs"}}}','{"str":"str","inner_struct1":null,"inner_struct2":{"i":100,"str":"str3"},"inner_struct3":{"s":{"i":321,"s":"dfgs"}}}'
+3,'NULL','NULL'
+4,'{"str":"","inner_struct1":{"str":"somestr2","de":12345.12},"inner_struct2":{"i":1,"str":"string"},"inner_struct3":{"s":null}}','{"str":"","inner_struct1":{"str":"somestr2","de":12345.12},"inner_struct2":{"i":1,"str":"string"},"inner_struct3":{"s":null}}'
+5,'{"str":null,"inner_struct1":null,"inner_struct2":null,"inner_struct3":null}','{"str":null,"inner_struct1":null,"inner_struct2":null,"inner_struct3":null}'
+---- TYPES
+INT,STRING,STRING
+====
+---- QUERY
+# Select the same nested struct multiple times in one query and order the results.
+select id, outer_struct, outer_struct
+from functional_orc_def.complextypes_nested_structs
+order by id desc;
+---- RESULTS: VERIFY_IS_EQUAL_SORTED
+5,'{"str":null,"inner_struct1":null,"inner_struct2":null,"inner_struct3":null}','{"str":null,"inner_struct1":null,"inner_struct2":null,"inner_struct3":null}'
+4,'{"str":"","inner_struct1":{"str":"somestr2","de":12345.12},"inner_struct2":{"i":1,"str":"string"},"inner_struct3":{"s":null}}','{"str":"","inner_struct1":{"str":"somestr2","de":12345.12},"inner_struct2":{"i":1,"str":"string"},"inner_struct3":{"s":null}}'
+3,'NULL','NULL'
+2,'{"str":"str","inner_struct1":null,"inner_struct2":{"i":100,"str":"str3"},"inner_struct3":{"s":{"i":321,"s":"dfgs"}}}','{"str":"str","inner_struct1":null,"inner_struct2":{"i":100,"str":"str3"},"inner_struct3":{"s":{"i":321,"s":"dfgs"}}}'
+1,'{"str":"somestr1","inner_struct1":{"str":"somestr2","de":12345.12},"inner_struct2":{"i":333222111,"str":"somestr3"},"inner_struct3":{"s":{"i":112288,"s":null}}}','{"str":"somestr1","inner_struct1":{"str":"somestr2","de":12345.12},"inner_struct2":{"i":333222111,"str":"somestr3"},"inner_struct3":{"s":{"i":112288,"s":null}}}'
+---- TYPES
+INT,STRING,STRING
+====
+---- QUERY
+# Similar to the above query but here the 'id' field is not in the select list but still
+# used in the order by.
+select outer_struct, outer_struct
+from functional_orc_def.complextypes_nested_structs
+order by id desc;
+---- RESULTS: VERIFY_IS_EQUAL_SORTED
+'{"str":null,"inner_struct1":null,"inner_struct2":null,"inner_struct3":null}','{"str":null,"inner_struct1":null,"inner_struct2":null,"inner_struct3":null}'
+'{"str":"","inner_struct1":{"str":"somestr2","de":12345.12},"inner_struct2":{"i":1,"str":"string"},"inner_struct3":{"s":null}}','{"str":"","inner_struct1":{"str":"somestr2","de":12345.12},"inner_struct2":{"i":1,"str":"string"},"inner_struct3":{"s":null}}'
+'NULL','NULL'
+'{"str":"str","inner_struct1":null,"inner_struct2":{"i":100,"str":"str3"},"inner_struct3":{"s":{"i":321,"s":"dfgs"}}}','{"str":"str","inner_struct1":null,"inner_struct2":{"i":100,"str":"str3"},"inner_struct3":{"s":{"i":321,"s":"dfgs"}}}'
+'{"str":"somestr1","inner_struct1":{"str":"somestr2","de":12345.12},"inner_struct2":{"i":333222111,"str":"somestr3"},"inner_struct3":{"s":{"i":112288,"s":null}}}','{"str":"somestr1","inner_struct1":{"str":"somestr2","de":12345.12},"inner_struct2":{"i":333222111,"str":"somestr3"},"inner_struct3":{"s":{"i":112288,"s":null}}}'
+---- TYPES
+STRING,STRING
+====
+---- QUERY
+# WITH clause creates an inline view containing a nested struct.
+with sub as (
+    select id, outer_struct from functional_orc_def.complextypes_nested_structs)
+select sub.id, sub.outer_struct from sub;
+---- RESULTS
+1,'{"str":"somestr1","inner_struct1":{"str":"somestr2","de":12345.12},"inner_struct2":{"i":333222111,"str":"somestr3"},"inner_struct3":{"s":{"i":112288,"s":null}}}'
+2,'{"str":"str","inner_struct1":null,"inner_struct2":{"i":100,"str":"str3"},"inner_struct3":{"s":{"i":321,"s":"dfgs"}}}'
+3,'NULL'
+4,'{"str":"","inner_struct1":{"str":"somestr2","de":12345.12},"inner_struct2":{"i":1,"str":"string"},"inner_struct3":{"s":null}}'
+5,'{"str":null,"inner_struct1":null,"inner_struct2":null,"inner_struct3":null}'
+---- TYPES
+INT,STRING
+====
+---- QUERY
+# WITH clause creates an inline view containing a nested struct. Also has a filter on
+# the inline view and ordering by a non-complex item from the view.
+with sub as (
+    select id, outer_struct
+    from functional_orc_def.complextypes_nested_structs
+    where length(outer_struct.str) > 3)
+select sub.id, sub.outer_struct from sub order by sub.id desc;
+---- RESULTS
+1,'{"str":"somestr1","inner_struct1":{"str":"somestr2","de":12345.12},"inner_struct2":{"i":333222111,"str":"somestr3"},"inner_struct3":{"s":{"i":112288,"s":null}}}'
+---- TYPES
+INT,STRING
+---- QUERY
+# Checks that "SELECT nested_struct.* ..." omits the nested structs from the output.
+select id, outer_struct.* from functional_orc_def.complextypes_nested_structs;
+---- RESULTS
+1,'somestr1'
+2,'str'
+3,'NULL'
+4,''
+5,'NULL'
+---- TYPES
+INT,STRING
+====
+---- QUERY
+# Subquery that returns a complex type is not supported.
+# IMPALA-9500
+select outer_struct
+from functional_orc_def.complextypes_nested_structs
+where outer_struct in
+(select outer_struct from functional_orc_def.complextypes_nested_structs);
+---- CATCH
+AnalysisException: A subquery can't return complex types. (SELECT outer_struct FROM functional_orc_def.complex
+types_nested_structs)
+====
\ No newline at end of file
diff --git a/testdata/workloads/functional-query/queries/QueryTest/ranger_column_masking_struct_in_select_list.test b/testdata/workloads/functional-query/queries/QueryTest/ranger_column_masking_struct_in_select_list.test
new file mode 100644
index 0000000..70d4f04
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/ranger_column_masking_struct_in_select_list.test
@@ -0,0 +1,19 @@
+====
+---- QUERY
+# Checking that a primitive column is masked to NULL while there is a struct in the
+# select list.
+# Note1, functional_orc_def is hard-coded here. Once we implement struct direct read
+# support for Parquet as well then we can remove the DB name here. IMPALA-9496
+# Note2, turning off codegen support could be removed once we implement the support for
+# that. IMPALA-10851
+select id, str, alltypes from functional_orc_def.complextypes_structs
+---- RESULTS
+1,'NULL','{"ti":100,"si":12348,"i":156789012,"bi":163234345342,"b":true,"f":1234.56005859375,"do":65323423.33,"da":"2021-05-30","ts":"2021-06-01 10:19:04","s1":"some string","s2":"another str","c1":"x","c2":"xyz","vc":"somevarcha","de1":12345,"de2":null}'
+2,'NULL','{"ti":123,"si":4567,"i":1562322212,"bi":334333345342,"b":false,"f":NaN,"do":23233423.099,"da":null,"ts":"2020-06-11 12:10:04","s1":null,"s2":"NULL","c1":"a","c2":"ab ","vc":"varchar","de1":11223,"de2":null}'
+3,'NULL','{"ti":null,"si":null,"i":null,"bi":null,"b":null,"f":null,"do":null,"da":null,"ts":null,"s1":null,"s2":null,"c1":null,"c2":null,"vc":null,"de1":null,"de2":null}'
+4,'NULL','{"ti":90,"si":30482,"i":1664336,"bi":23567459873,"b":true,"f":0.5600000023841858,"do":NaN,"da":"2000-12-31","ts":"2024-01-01 00:00:00.123400000","s1":"random string","s2":"","c1":"c","c2":"d  ","vc":"addsdrr","de1":33357,"de2":null}'
+5,'NULL','NULL'
+6,'NULL','{"ti":127,"si":100,"i":234732212,"bi":664233223342,"b":true,"f":34.56000137329102,"do":99523423.33,"da":"1985-11-19","ts":"2020-09-15 03:11:22","s1":"string1","s2":"string2","c1":"z","c2":"   ","vc":"cv","de1":346,"de2":6235.600}'
+---- TYPES
+INT,STRING,STRING
+====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/struct-in-select-list.test b/testdata/workloads/functional-query/queries/QueryTest/struct-in-select-list.test
new file mode 100644
index 0000000..4e9327a
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/struct-in-select-list.test
@@ -0,0 +1,602 @@
+====
+---- QUERY
+# Select a simple struct with one bool member.
+select id, tiny_struct from functional_orc_def.complextypes_structs;
+---- RESULTS
+1,'{"b":true}'
+2,'{"b":false}'
+3,'{"b":true}'
+4,'{"b":null}'
+5,'{"b":false}'
+6,'NULL'
+---- TYPES
+INT,STRING
+====
+---- QUERY
+# Similar query as above but with an order by.
+select id, tiny_struct from functional_orc_def.complextypes_structs order by id;
+---- RESULTS: VERIFY_IS_EQUAL_SORTED
+1,'{"b":true}'
+2,'{"b":false}'
+3,'{"b":true}'
+4,'{"b":null}'
+5,'{"b":false}'
+6,'NULL'
+---- TYPES
+INT,STRING
+====
+---- QUERY
+# Ordering by a member of the struct.
+# Forced to use a SORT node instead of a TOPN.
+set disable_outermost_topn = 1;
+select id, alltypes from functional_orc_def.complextypes_structs
+order by alltypes.ti;
+---- RESULTS: VERIFY_IS_EQUAL_SORTED
+4,'{"ti":90,"si":30482,"i":1664336,"bi":23567459873,"b":true,"f":0.5600000023841858,"do":NaN,"da":"2000-12-31","ts":"2024-01-01 00:00:00.123400000","s1":"random string","s2":"","c1":"c","c2":"d  ","vc":"addsdrr","de1":33357,"de2":null}'
+1,'{"ti":100,"si":12348,"i":156789012,"bi":163234345342,"b":true,"f":1234.56005859375,"do":65323423.33,"da":"2021-05-30","ts":"2021-06-01 10:19:04","s1":"some string","s2":"another str","c1":"x","c2":"xyz","vc":"somevarcha","de1":12345,"de2":null}'
+2,'{"ti":123,"si":4567,"i":1562322212,"bi":334333345342,"b":false,"f":NaN,"do":23233423.099,"da":null,"ts":"2020-06-11 12:10:04","s1":null,"s2":"NULL","c1":"a","c2":"ab ","vc":"varchar","de1":11223,"de2":null}'
+6,'{"ti":127,"si":100,"i":234732212,"bi":664233223342,"b":true,"f":34.56000137329102,"do":99523423.33,"da":"1985-11-19","ts":"2020-09-15 03:11:22","s1":"string1","s2":"string2","c1":"z","c2":"   ","vc":"cv","de1":346,"de2":6235.600}'
+3,'{"ti":null,"si":null,"i":null,"bi":null,"b":null,"f":null,"do":null,"da":null,"ts":null,"s1":null,"s2":null,"c1":null,"c2":null,"vc":null,"de1":null,"de2":null}'
+5,'NULL'
+---- TYPES
+INT,STRING
+====
+---- QUERY
+# Querying two simple structs. There is a string in one of them and also a non-struct
+# string in the select list.
+select id, str, tiny_struct, small_struct from functional_orc_def.complextypes_structs;
+---- RESULTS
+1,'first item','{"b":true}','NULL'
+2,'second item','{"b":false}','{"i":19191,"s":"small_struct_str"}'
+3,'third item','{"b":true}','{"i":98765,"s":null}'
+4,'fourth item','{"b":null}','{"i":null,"s":"str"}'
+5,'fifth item','{"b":false}','{"i":98765,"s":"abcde f"}'
+6,'sixth item','NULL','{"i":null,"s":null}'
+---- TYPES
+INT,STRING,STRING,STRING
+====
+---- QUERY
+# Similar query as above but with an order by.
+select id, str, tiny_struct, small_struct
+from functional_orc_def.complextypes_structs
+order by id;
+---- RESULTS: VERIFY_IS_EQUAL_SORTED
+1,'first item','{"b":true}','NULL'
+2,'second item','{"b":false}','{"i":19191,"s":"small_struct_str"}'
+3,'third item','{"b":true}','{"i":98765,"s":null}'
+4,'fourth item','{"b":null}','{"i":null,"s":"str"}'
+5,'fifth item','{"b":false}','{"i":98765,"s":"abcde f"}'
+6,'sixth item','NULL','{"i":null,"s":null}'
+---- TYPES
+INT,STRING,STRING,STRING
+====
+---- QUERY
+# Querying the same struct multiple times in one query.
+select id, small_struct, small_struct from functional_orc_def.complextypes_structs;
+---- RESULTS
+1,'NULL','NULL'
+2,'{"i":19191,"s":"small_struct_str"}','{"i":19191,"s":"small_struct_str"}'
+3,'{"i":98765,"s":null}','{"i":98765,"s":null}'
+4,'{"i":null,"s":"str"}','{"i":null,"s":"str"}'
+5,'{"i":98765,"s":"abcde f"}','{"i":98765,"s":"abcde f"}'
+6,'{"i":null,"s":null}','{"i":null,"s":null}'
+---- TYPES
+INT,STRING,STRING
+====
+---- QUERY
+# The same struct multiple times in the select list where there is an ordering in the
+# results.
+select id, tiny_struct, tiny_struct
+from functional_orc_def.complextypes_structs
+order by id desc;
+---- RESULTS: VERIFY_IS_EQUAL_SORTED
+6,'NULL','NULL'
+5,'{"b":false}','{"b":false}'
+4,'{"b":null}','{"b":null}'
+3,'{"b":true}','{"b":true}'
+2,'{"b":false}','{"b":false}'
+1,'{"b":true}','{"b":true}'
+---- TYPES
+INT,STRING,STRING
+====
+---- QUERY
+# Similar to the above query but here the 'id' field is not in the select list but still
+# used in the order by.
+select tiny_struct, tiny_struct
+from functional_orc_def.complextypes_structs
+order by id desc;
+---- RESULTS: VERIFY_IS_EQUAL_SORTED
+'NULL','NULL'
+'{"b":false}','{"b":false}'
+'{"b":null}','{"b":null}'
+'{"b":true}','{"b":true}'
+'{"b":false}','{"b":false}'
+'{"b":true}','{"b":true}'
+---- TYPES
+STRING,STRING
+====
+---- QUERY
+# Querying a struct that has all the primitive types as children.
+# There are multiple string columns to check if none of the overwrites the other.
+# There is a row where all the children of the struct are null but the struct is non
+# null. Another row hold a struct that is itself null.
+select id, str, alltypes from functional_orc_def.complextypes_structs;
+---- RESULTS
+1,'first item','{"ti":100,"si":12348,"i":156789012,"bi":163234345342,"b":true,"f":1234.56005859375,"do":65323423.33,"da":"2021-05-30","ts":"2021-06-01 10:19:04","s1":"some string","s2":"another str","c1":"x","c2":"xyz","vc":"somevarcha","de1":12345,"de2":null}'
+2,'second item','{"ti":123,"si":4567,"i":1562322212,"bi":334333345342,"b":false,"f":NaN,"do":23233423.099,"da":null,"ts":"2020-06-11 12:10:04","s1":null,"s2":"NULL","c1":"a","c2":"ab ","vc":"varchar","de1":11223,"de2":null}'
+3,'third item','{"ti":null,"si":null,"i":null,"bi":null,"b":null,"f":null,"do":null,"da":null,"ts":null,"s1":null,"s2":null,"c1":null,"c2":null,"vc":null,"de1":null,"de2":null}'
+4,'fourth item','{"ti":90,"si":30482,"i":1664336,"bi":23567459873,"b":true,"f":0.5600000023841858,"do":NaN,"da":"2000-12-31","ts":"2024-01-01 00:00:00.123400000","s1":"random string","s2":"","c1":"c","c2":"d  ","vc":"addsdrr","de1":33357,"de2":null}'
+5,'fifth item','NULL'
+6,'sixth item','{"ti":127,"si":100,"i":234732212,"bi":664233223342,"b":true,"f":34.56000137329102,"do":99523423.33,"da":"1985-11-19","ts":"2020-09-15 03:11:22","s1":"string1","s2":"string2","c1":"z","c2":"   ","vc":"cv","de1":346,"de2":6235.600}'
+---- TYPES
+INT,STRING,STRING
+====
+---- QUERY
+# Similar query as above but with an order by.
+select id, str, alltypes from functional_orc_def.complextypes_structs order by id;
+---- RESULTS: VERIFY_IS_EQUAL_SORTED
+1,'first item','{"ti":100,"si":12348,"i":156789012,"bi":163234345342,"b":true,"f":1234.56005859375,"do":65323423.33,"da":"2021-05-30","ts":"2021-06-01 10:19:04","s1":"some string","s2":"another str","c1":"x","c2":"xyz","vc":"somevarcha","de1":12345,"de2":null}'
+2,'second item','{"ti":123,"si":4567,"i":1562322212,"bi":334333345342,"b":false,"f":NaN,"do":23233423.099,"da":null,"ts":"2020-06-11 12:10:04","s1":null,"s2":"NULL","c1":"a","c2":"ab ","vc":"varchar","de1":11223,"de2":null}'
+3,'third item','{"ti":null,"si":null,"i":null,"bi":null,"b":null,"f":null,"do":null,"da":null,"ts":null,"s1":null,"s2":null,"c1":null,"c2":null,"vc":null,"de1":null,"de2":null}'
+4,'fourth item','{"ti":90,"si":30482,"i":1664336,"bi":23567459873,"b":true,"f":0.5600000023841858,"do":NaN,"da":"2000-12-31","ts":"2024-01-01 00:00:00.123400000","s1":"random string","s2":"","c1":"c","c2":"d  ","vc":"addsdrr","de1":33357,"de2":null}'
+5,'fifth item','NULL'
+6,'sixth item','{"ti":127,"si":100,"i":234732212,"bi":664233223342,"b":true,"f":34.56000137329102,"do":99523423.33,"da":"1985-11-19","ts":"2020-09-15 03:11:22","s1":"string1","s2":"string2","c1":"z","c2":"   ","vc":"cv","de1":346,"de2":6235.600}'
+---- TYPES
+INT,STRING,STRING
+====
+---- QUERY
+# Similar query as above but with an order by desc.
+select id, str, alltypes from functional_orc_def.complextypes_structs order by id desc;
+---- RESULTS: VERIFY_IS_EQUAL_SORTED
+6,'sixth item','{"ti":127,"si":100,"i":234732212,"bi":664233223342,"b":true,"f":34.56000137329102,"do":99523423.33,"da":"1985-11-19","ts":"2020-09-15 03:11:22","s1":"string1","s2":"string2","c1":"z","c2":"   ","vc":"cv","de1":346,"de2":6235.600}'
+5,'fifth item','NULL'
+4,'fourth item','{"ti":90,"si":30482,"i":1664336,"bi":23567459873,"b":true,"f":0.5600000023841858,"do":NaN,"da":"2000-12-31","ts":"2024-01-01 00:00:00.123400000","s1":"random string","s2":"","c1":"c","c2":"d  ","vc":"addsdrr","de1":33357,"de2":null}'
+3,'third item','{"ti":null,"si":null,"i":null,"bi":null,"b":null,"f":null,"do":null,"da":null,"ts":null,"s1":null,"s2":null,"c1":null,"c2":null,"vc":null,"de1":null,"de2":null}'
+2,'second item','{"ti":123,"si":4567,"i":1562322212,"bi":334333345342,"b":false,"f":NaN,"do":23233423.099,"da":null,"ts":"2020-06-11 12:10:04","s1":null,"s2":"NULL","c1":"a","c2":"ab ","vc":"varchar","de1":11223,"de2":null}'
+1,'first item','{"ti":100,"si":12348,"i":156789012,"bi":163234345342,"b":true,"f":1234.56005859375,"do":65323423.33,"da":"2021-05-30","ts":"2021-06-01 10:19:04","s1":"some string","s2":"another str","c1":"x","c2":"xyz","vc":"somevarcha","de1":12345,"de2":null}'
+---- TYPES
+INT,STRING,STRING
+====
+---- QUERY
+# Setting BATCH_SIZE to force the results to fit in multiple row batches.
+set BATCH_SIZE=2;
+select id, str, alltypes from functional_orc_def.complextypes_structs;
+---- RESULTS
+1,'first item','{"ti":100,"si":12348,"i":156789012,"bi":163234345342,"b":true,"f":1234.56005859375,"do":65323423.33,"da":"2021-05-30","ts":"2021-06-01 10:19:04","s1":"some string","s2":"another str","c1":"x","c2":"xyz","vc":"somevarcha","de1":12345,"de2":null}'
+2,'second item','{"ti":123,"si":4567,"i":1562322212,"bi":334333345342,"b":false,"f":NaN,"do":23233423.099,"da":null,"ts":"2020-06-11 12:10:04","s1":null,"s2":"NULL","c1":"a","c2":"ab ","vc":"varchar","de1":11223,"de2":null}'
+3,'third item','{"ti":null,"si":null,"i":null,"bi":null,"b":null,"f":null,"do":null,"da":null,"ts":null,"s1":null,"s2":null,"c1":null,"c2":null,"vc":null,"de1":null,"de2":null}'
+4,'fourth item','{"ti":90,"si":30482,"i":1664336,"bi":23567459873,"b":true,"f":0.5600000023841858,"do":NaN,"da":"2000-12-31","ts":"2024-01-01 00:00:00.123400000","s1":"random string","s2":"","c1":"c","c2":"d  ","vc":"addsdrr","de1":33357,"de2":null}'
+5,'fifth item','NULL'
+6,'sixth item','{"ti":127,"si":100,"i":234732212,"bi":664233223342,"b":true,"f":34.56000137329102,"do":99523423.33,"da":"1985-11-19","ts":"2020-09-15 03:11:22","s1":"string1","s2":"string2","c1":"z","c2":"   ","vc":"cv","de1":346,"de2":6235.600}'
+---- TYPES
+INT,STRING,STRING
+====
+---- QUERY
+# Querying struct in the select list and filter on one member of the struct.
+set BATCH_SIZE=0;
+select id, str, alltypes
+from functional_orc_def.complextypes_structs
+where alltypes.b = true;
+---- RESULTS
+1,'first item','{"ti":100,"si":12348,"i":156789012,"bi":163234345342,"b":true,"f":1234.56005859375,"do":65323423.33,"da":"2021-05-30","ts":"2021-06-01 10:19:04","s1":"some string","s2":"another str","c1":"x","c2":"xyz","vc":"somevarcha","de1":12345,"de2":null}'
+4,'fourth item','{"ti":90,"si":30482,"i":1664336,"bi":23567459873,"b":true,"f":0.5600000023841858,"do":NaN,"da":"2000-12-31","ts":"2024-01-01 00:00:00.123400000","s1":"random string","s2":"","c1":"c","c2":"d  ","vc":"addsdrr","de1":33357,"de2":null}'
+6,'sixth item','{"ti":127,"si":100,"i":234732212,"bi":664233223342,"b":true,"f":34.56000137329102,"do":99523423.33,"da":"1985-11-19","ts":"2020-09-15 03:11:22","s1":"string1","s2":"string2","c1":"z","c2":"   ","vc":"cv","de1":346,"de2":6235.600}'
+---- TYPES
+INT,STRING,STRING
+====
+---- QUERY
+# Query a single struct slot.
+select alltypes from functional_orc_def.complextypes_structs;
+---- RESULTS
+'{"ti":100,"si":12348,"i":156789012,"bi":163234345342,"b":true,"f":1234.56005859375,"do":65323423.33,"da":"2021-05-30","ts":"2021-06-01 10:19:04","s1":"some string","s2":"another str","c1":"x","c2":"xyz","vc":"somevarcha","de1":12345,"de2":null}'
+'{"ti":123,"si":4567,"i":1562322212,"bi":334333345342,"b":false,"f":NaN,"do":23233423.099,"da":null,"ts":"2020-06-11 12:10:04","s1":null,"s2":"NULL","c1":"a","c2":"ab ","vc":"varchar","de1":11223,"de2":null}'
+'{"ti":null,"si":null,"i":null,"bi":null,"b":null,"f":null,"do":null,"da":null,"ts":null,"s1":null,"s2":null,"c1":null,"c2":null,"vc":null,"de1":null,"de2":null}'
+'{"ti":90,"si":30482,"i":1664336,"bi":23567459873,"b":true,"f":0.5600000023841858,"do":NaN,"da":"2000-12-31","ts":"2024-01-01 00:00:00.123400000","s1":"random string","s2":"","c1":"c","c2":"d  ","vc":"addsdrr","de1":33357,"de2":null}'
+'NULL'
+'{"ti":127,"si":100,"i":234732212,"bi":664233223342,"b":true,"f":34.56000137329102,"do":99523423.33,"da":"1985-11-19","ts":"2020-09-15 03:11:22","s1":"string1","s2":"string2","c1":"z","c2":"   ","vc":"cv","de1":346,"de2":6235.600}'
+---- TYPES
+STRING
+====
+---- QUERY
+# Query a single struct slot and order by a member of the struct.
+select alltypes from functional_orc_def.complextypes_structs order by alltypes.si;
+---- RESULTS: VERIFY_IS_EQUAL_SORTED
+'{"ti":127,"si":100,"i":234732212,"bi":664233223342,"b":true,"f":34.56000137329102,"do":99523423.33,"da":"1985-11-19","ts":"2020-09-15 03:11:22","s1":"string1","s2":"string2","c1":"z","c2":"   ","vc":"cv","de1":346,"de2":6235.600}'
+'{"ti":123,"si":4567,"i":1562322212,"bi":334333345342,"b":false,"f":NaN,"do":23233423.099,"da":null,"ts":"2020-06-11 12:10:04","s1":null,"s2":"NULL","c1":"a","c2":"ab ","vc":"varchar","de1":11223,"de2":null}'
+'{"ti":100,"si":12348,"i":156789012,"bi":163234345342,"b":true,"f":1234.56005859375,"do":65323423.33,"da":"2021-05-30","ts":"2021-06-01 10:19:04","s1":"some string","s2":"another str","c1":"x","c2":"xyz","vc":"somevarcha","de1":12345,"de2":null}'
+'{"ti":90,"si":30482,"i":1664336,"bi":23567459873,"b":true,"f":0.5600000023841858,"do":NaN,"da":"2000-12-31","ts":"2024-01-01 00:00:00.123400000","s1":"random string","s2":"","c1":"c","c2":"d  ","vc":"addsdrr","de1":33357,"de2":null}'
+'{"ti":null,"si":null,"i":null,"bi":null,"b":null,"f":null,"do":null,"da":null,"ts":null,"s1":null,"s2":null,"c1":null,"c2":null,"vc":null,"de1":null,"de2":null}'
+'NULL'
+---- TYPES
+STRING
+====
+---- QUERY
+# Query struct slots only.
+select small_struct, alltypes from functional_orc_def.complextypes_structs;
+---- RESULTS
+'NULL','{"ti":100,"si":12348,"i":156789012,"bi":163234345342,"b":true,"f":1234.56005859375,"do":65323423.33,"da":"2021-05-30","ts":"2021-06-01 10:19:04","s1":"some string","s2":"another str","c1":"x","c2":"xyz","vc":"somevarcha","de1":12345,"de2":null}'
+'{"i":19191,"s":"small_struct_str"}','{"ti":123,"si":4567,"i":1562322212,"bi":334333345342,"b":false,"f":NaN,"do":23233423.099,"da":null,"ts":"2020-06-11 12:10:04","s1":null,"s2":"NULL","c1":"a","c2":"ab ","vc":"varchar","de1":11223,"de2":null}'
+'{"i":98765,"s":null}','{"ti":null,"si":null,"i":null,"bi":null,"b":null,"f":null,"do":null,"da":null,"ts":null,"s1":null,"s2":null,"c1":null,"c2":null,"vc":null,"de1":null,"de2":null}'
+'{"i":null,"s":"str"}','{"ti":90,"si":30482,"i":1664336,"bi":23567459873,"b":true,"f":0.5600000023841858,"do":NaN,"da":"2000-12-31","ts":"2024-01-01 00:00:00.123400000","s1":"random string","s2":"","c1":"c","c2":"d  ","vc":"addsdrr","de1":33357,"de2":null}'
+'{"i":98765,"s":"abcde f"}','NULL'
+'{"i":null,"s":null}','{"ti":127,"si":100,"i":234732212,"bi":664233223342,"b":true,"f":34.56000137329102,"do":99523423.33,"da":"1985-11-19","ts":"2020-09-15 03:11:22","s1":"string1","s2":"string2","c1":"z","c2":"   ","vc":"cv","de1":346,"de2":6235.600}'
+---- TYPES
+STRING,STRING
+====
+---- QUERY
+# Query struct slot in a join query.
+select allt.id, comt.alltypes
+from functional_orc_def.alltypes allt
+join functional_orc_def.complextypes_structs comt on allt.id = comt.id;
+---- RESULTS
+1,'{"ti":100,"si":12348,"i":156789012,"bi":163234345342,"b":true,"f":1234.56005859375,"do":65323423.33,"da":"2021-05-30","ts":"2021-06-01 10:19:04","s1":"some string","s2":"another str","c1":"x","c2":"xyz","vc":"somevarcha","de1":12345,"de2":null}'
+2,'{"ti":123,"si":4567,"i":1562322212,"bi":334333345342,"b":false,"f":NaN,"do":23233423.099,"da":null,"ts":"2020-06-11 12:10:04","s1":null,"s2":"NULL","c1":"a","c2":"ab ","vc":"varchar","de1":11223,"de2":null}'
+3,'{"ti":null,"si":null,"i":null,"bi":null,"b":null,"f":null,"do":null,"da":null,"ts":null,"s1":null,"s2":null,"c1":null,"c2":null,"vc":null,"de1":null,"de2":null}'
+4,'{"ti":90,"si":30482,"i":1664336,"bi":23567459873,"b":true,"f":0.5600000023841858,"do":NaN,"da":"2000-12-31","ts":"2024-01-01 00:00:00.123400000","s1":"random string","s2":"","c1":"c","c2":"d  ","vc":"addsdrr","de1":33357,"de2":null}'
+5,'NULL'
+6,'{"ti":127,"si":100,"i":234732212,"bi":664233223342,"b":true,"f":34.56000137329102,"do":99523423.33,"da":"1985-11-19","ts":"2020-09-15 03:11:22","s1":"string1","s2":"string2","c1":"z","c2":"   ","vc":"cv","de1":346,"de2":6235.600}'
+---- TYPES
+INT,STRING
+====
+---- QUERY
+# Similar join query as above but with different join order.
+select allt.id, comt.alltypes
+from functional_orc_def.complextypes_structs comt
+join functional_orc_def.alltypes allt on comt.id = allt.id;
+---- RESULTS
+1,'{"ti":100,"si":12348,"i":156789012,"bi":163234345342,"b":true,"f":1234.56005859375,"do":65323423.33,"da":"2021-05-30","ts":"2021-06-01 10:19:04","s1":"some string","s2":"another str","c1":"x","c2":"xyz","vc":"somevarcha","de1":12345,"de2":null}'
+2,'{"ti":123,"si":4567,"i":1562322212,"bi":334333345342,"b":false,"f":NaN,"do":23233423.099,"da":null,"ts":"2020-06-11 12:10:04","s1":null,"s2":"NULL","c1":"a","c2":"ab ","vc":"varchar","de1":11223,"de2":null}'
+3,'{"ti":null,"si":null,"i":null,"bi":null,"b":null,"f":null,"do":null,"da":null,"ts":null,"s1":null,"s2":null,"c1":null,"c2":null,"vc":null,"de1":null,"de2":null}'
+4,'{"ti":90,"si":30482,"i":1664336,"bi":23567459873,"b":true,"f":0.5600000023841858,"do":NaN,"da":"2000-12-31","ts":"2024-01-01 00:00:00.123400000","s1":"random string","s2":"","c1":"c","c2":"d  ","vc":"addsdrr","de1":33357,"de2":null}'
+5,'NULL'
+6,'{"ti":127,"si":100,"i":234732212,"bi":664233223342,"b":true,"f":34.56000137329102,"do":99523423.33,"da":"1985-11-19","ts":"2020-09-15 03:11:22","s1":"string1","s2":"string2","c1":"z","c2":"   ","vc":"cv","de1":346,"de2":6235.600}'
+---- TYPES
+INT,STRING
+====
+---- QUERY
+# Querying IS NULL on a member of a struct.
+select id, str, alltypes
+from functional_orc_def.complextypes_structs
+where alltypes.da is null;
+---- RESULTS
+2,'second item','{"ti":123,"si":4567,"i":1562322212,"bi":334333345342,"b":false,"f":NaN,"do":23233423.099,"da":null,"ts":"2020-06-11 12:10:04","s1":null,"s2":"NULL","c1":"a","c2":"ab ","vc":"varchar","de1":11223,"de2":null}'
+3,'third item','{"ti":null,"si":null,"i":null,"bi":null,"b":null,"f":null,"do":null,"da":null,"ts":null,"s1":null,"s2":null,"c1":null,"c2":null,"vc":null,"de1":null,"de2":null}'
+5,'fifth item','NULL'
+---- TYPES
+INT,STRING,STRING
+====
+---- QUERY
+# Querying a struct that is inside a nested array. Directly referencing the inner array
+# in the FROM clause. This also triggers a re-analysis of the statement as the table is
+# full ACID.
+select inner_arr.ITEM, inner_arr.ITEM.e, inner_arr.ITEM.f
+from functional_orc_def.complextypestbl.nested_struct.c.d.ITEM as inner_arr;
+---- RESULTS
+'{"e":-1,"f":"nonnullable"}',-1,'nonnullable'
+'{"e":10,"f":"aaa"}',10,'aaa'
+'{"e":-10,"f":"bbb"}',-10,'bbb'
+'{"e":11,"f":"c"}',11,'c'
+'{"e":null,"f":null}',NULL,'NULL'
+'{"e":10,"f":"aaa"}',10,'aaa'
+'{"e":null,"f":null}',NULL,'NULL'
+'{"e":-10,"f":"bbb"}',-10,'bbb'
+'{"e":null,"f":null}',NULL,'NULL'
+'{"e":11,"f":"c"}',11,'c'
+'NULL',NULL,'NULL'
+'NULL',NULL,'NULL'
+---- TYPES
+STRING,INT,STRING
+====
+---- QUERY
+# Querying a struct that is inside a nested array. Referencing the inner array through a
+# join with the base table.
+select tbl.id, inner_arr.ITEM
+from functional_orc_def.complextypestbl tbl, tbl.nested_struct.c.d.ITEM as inner_arr;
+---- RESULTS
+8,'{"e":-1,"f":"nonnullable"}'
+1,'{"e":10,"f":"aaa"}'
+1,'{"e":-10,"f":"bbb"}'
+1,'{"e":11,"f":"c"}'
+2,'{"e":null,"f":null}'
+2,'{"e":10,"f":"aaa"}'
+2,'{"e":null,"f":null}'
+2,'{"e":-10,"f":"bbb"}'
+2,'{"e":null,"f":null}'
+2,'{"e":11,"f":"c"}'
+2,'NULL'
+7,'NULL'
+---- TYPES
+BIGINT,STRING
+====
+---- QUERY
+# Querying a struct that is inside a nested array. Used 2 joins to reference the inner
+# array from the FROM clause.
+select tbl.id, inner_arr.ITEM
+from functional_orc_def.complextypestbl tbl,
+    tbl.nested_struct.c.d as outer_arr, outer_arr.ITEM as inner_arr;
+---- RESULTS
+8,'{"e":-1,"f":"nonnullable"}'
+1,'{"e":10,"f":"aaa"}'
+1,'{"e":-10,"f":"bbb"}'
+1,'{"e":11,"f":"c"}'
+2,'{"e":null,"f":null}'
+2,'{"e":10,"f":"aaa"}'
+2,'{"e":null,"f":null}'
+2,'{"e":-10,"f":"bbb"}'
+2,'{"e":null,"f":null}'
+2,'{"e":11,"f":"c"}'
+2,'NULL'
+7,'NULL'
+---- TYPES
+BIGINT,STRING
+====
+---- QUERY
+# Querying a struct that is inside a nested array. Used different kind of joins to
+# reference the inner array from the FROM clause.
+select tbl.id, inner_arr.ITEM
+from functional_orc_def.complextypestbl tbl left join
+    tbl.nested_struct.c.d as outer_arr inner join outer_arr.ITEM as inner_arr;
+---- RESULTS
+8,'{"e":-1,"f":"nonnullable"}'
+1,'{"e":10,"f":"aaa"}'
+1,'{"e":-10,"f":"bbb"}'
+1,'{"e":11,"f":"c"}'
+2,'{"e":null,"f":null}'
+2,'{"e":10,"f":"aaa"}'
+2,'{"e":null,"f":null}'
+2,'{"e":-10,"f":"bbb"}'
+2,'{"e":null,"f":null}'
+2,'{"e":11,"f":"c"}'
+2,'NULL'
+7,'NULL'
+---- TYPES
+BIGINT,STRING
+====
+---- QUERY
+# Similar query as above but with an order by.
+select tbl.id, inner_arr.ITEM
+from functional_orc_def.complextypestbl tbl,
+    tbl.nested_struct.c.d as outer_arr, outer_arr.ITEM as inner_arr
+order by tbl.id;
+---- RESULTS: VERIFY_IS_EQUAL_SORTED
+1,'{"e":10,"f":"aaa"}'
+1,'{"e":-10,"f":"bbb"}'
+1,'{"e":11,"f":"c"}'
+2,'{"e":null,"f":null}'
+2,'{"e":10,"f":"aaa"}'
+2,'{"e":null,"f":null}'
+2,'{"e":-10,"f":"bbb"}'
+2,'{"e":null,"f":null}'
+2,'{"e":11,"f":"c"}'
+2,'NULL'
+7,'NULL'
+8,'{"e":-1,"f":"nonnullable"}'
+---- TYPES
+BIGINT,STRING
+====
+---- QUERY
+# Structs are allowed in an inline view.
+select v.ts from
+  (select tiny_struct as ts from functional_orc_def.complextypes_structs) v
+---- RESULTS
+'{"b":true}'
+'{"b":false}'
+'{"b":true}'
+'{"b":null}'
+'{"b":false}'
+'NULL'
+---- TYPES
+STRING
+====
+---- QUERY
+# Structs in an inline view where the underying file format is not supported for structs.
+select v.ts from
+  (select int_struct_col as ts from functional.allcomplextypes) v
+---- CATCH
+AnalysisException: Querying STRUCT is only supported for ORC file format.
+====
+---- QUERY
+# Structs in an inline view with order by.
+select v.id, v.ts from
+  (select id, tiny_struct as ts
+      from functional_orc_def.complextypes_structs
+      order by id
+      limit 3) v
+---- RESULTS
+1,'{"b":true}'
+2,'{"b":false}'
+3,'{"b":true}'
+---- TYPES
+INT,STRING
+====
+---- QUERY
+select v.id, v.ts from
+  (select id, tiny_struct as ts
+      from functional_orc_def.complextypes_structs
+      order by id
+      limit 3) v
+order by id desc
+---- RESULTS: VERIFY_IS_EQUAL_SORTED
+3,'{"b":true}'
+2,'{"b":false}'
+1,'{"b":true}'
+---- TYPES
+INT,STRING
+====
+---- QUERY
+select v.id, v.ts from
+  (select id, tiny_struct as ts
+      from functional_orc_def.complextypes_structs) v
+order by id desc
+---- RESULTS: VERIFY_IS_EQUAL_SORTED
+6,'NULL'
+5,'{"b":false}'
+4,'{"b":null}'
+3,'{"b":true}'
+2,'{"b":false}'
+1,'{"b":true}'
+---- TYPES
+INT,STRING
+====
+---- QUERY
+# CREATE VIEW AS SELECT where the select returns struct.
+create view $DATABASE.struct_view as select id, small_struct
+from functional_orc_def.complextypes_structs;
+select id, small_struct from $DATABASE.struct_view;
+---- RESULTS
+1,'NULL'
+2,'{"i":19191,"s":"small_struct_str"}'
+3,'{"i":98765,"s":null}'
+4,'{"i":null,"s":"str"}'
+5,'{"i":98765,"s":"abcde f"}'
+6,'{"i":null,"s":null}'
+---- TYPES
+INT,STRING
+====
+---- QUERY
+# WITH clause creates an inline view containing a struct.
+with sub as (select id, small_struct from functional_orc_def.complextypes_structs)
+select sub.id, sub.small_struct from sub;
+---- RESULTS
+1,'NULL'
+2,'{"i":19191,"s":"small_struct_str"}'
+3,'{"i":98765,"s":null}'
+4,'{"i":null,"s":"str"}'
+5,'{"i":98765,"s":"abcde f"}'
+6,'{"i":null,"s":null}'
+---- TYPES
+INT,STRING
+====
+---- QUERY
+# WITH clause creates an inline view containing a struct. Also has a filter on the inline
+# view and ordering by a non-complex item from the view.
+with sub as (
+    select id, small_struct
+    from functional_orc_def.complextypes_structs
+    where small_struct.i > 19200)
+select sub.id, sub.small_struct from sub order by sub.id desc;
+---- RESULTS: VERIFY_IS_EQUAL_SORTED
+5,'{"i":98765,"s":"abcde f"}'
+3,'{"i":98765,"s":null}'
+---- TYPES
+INT,STRING
+====
+---- QUERY
+# Create a view containing structs and query the view.
+create view tmp_view as
+    select id, str, tiny_struct, alltypes from functional_orc_def.complextypes_structs;
+select id, alltypes, tiny_struct from tmp_view;
+---- RESULTS
+1,'{"ti":100,"si":12348,"i":156789012,"bi":163234345342,"b":true,"f":1234.56005859375,"do":65323423.33,"da":"2021-05-30","ts":"2021-06-01 10:19:04","s1":"some string","s2":"another str","c1":"x","c2":"xyz","vc":"somevarcha","de1":12345,"de2":null}','{"b":true}'
+2,'{"ti":123,"si":4567,"i":1562322212,"bi":334333345342,"b":false,"f":NaN,"do":23233423.099,"da":null,"ts":"2020-06-11 12:10:04","s1":null,"s2":"NULL","c1":"a","c2":"ab ","vc":"varchar","de1":11223,"de2":null}','{"b":false}'
+3,'{"ti":null,"si":null,"i":null,"bi":null,"b":null,"f":null,"do":null,"da":null,"ts":null,"s1":null,"s2":null,"c1":null,"c2":null,"vc":null,"de1":null,"de2":null}','{"b":true}'
+4,'{"ti":90,"si":30482,"i":1664336,"bi":23567459873,"b":true,"f":0.5600000023841858,"do":NaN,"da":"2000-12-31","ts":"2024-01-01 00:00:00.123400000","s1":"random string","s2":"","c1":"c","c2":"d  ","vc":"addsdrr","de1":33357,"de2":null}','{"b":null}'
+5,'NULL','{"b":false}'
+6,'{"ti":127,"si":100,"i":234732212,"bi":664233223342,"b":true,"f":34.56000137329102,"do":99523423.33,"da":"1985-11-19","ts":"2020-09-15 03:11:22","s1":"string1","s2":"string2","c1":"z","c2":"   ","vc":"cv","de1":346,"de2":6235.600}','NULL'
+---- TYPES
+INT,STRING,STRING
+====
+---- QUERY
+# Query a struct from a partitioned table to check multi-fragment execution.
+set disable_outermost_topn = 1;
+select id, struct_val from functional_orc_def.alltypes_structs order by id desc limit 5;
+---- RESULTS: VERIFY_IS_EQUAL_SORTED
+7299,'{"bool_col":false,"tinyint_col":9,"smallint_col":9,"int_col":9,"bigint_col":90,"float_col":9.899999618530273,"double_col":90.89999999999999,"date_string_col":"12/31/10","string_col":"9","timestamp_col":"2010-12-31 05:09:13.860000000"}'
+7298,'{"bool_col":true,"tinyint_col":8,"smallint_col":8,"int_col":8,"bigint_col":80,"float_col":8.800000190734863,"double_col":80.8,"date_string_col":"12/31/10","string_col":"8","timestamp_col":"2010-12-31 05:08:13.780000000"}'
+7297,'{"bool_col":false,"tinyint_col":7,"smallint_col":7,"int_col":7,"bigint_col":70,"float_col":7.699999809265137,"double_col":70.7,"date_string_col":"12/31/10","string_col":"7","timestamp_col":"2010-12-31 05:07:13.710000000"}'
+7296,'{"bool_col":true,"tinyint_col":6,"smallint_col":6,"int_col":6,"bigint_col":60,"float_col":6.599999904632568,"double_col":60.59999999999999,"date_string_col":"12/31/10","string_col":"6","timestamp_col":"2010-12-31 05:06:13.650000000"}'
+7295,'{"bool_col":false,"tinyint_col":5,"smallint_col":5,"int_col":5,"bigint_col":50,"float_col":5.5,"double_col":50.5,"date_string_col":"12/31/10","string_col":"5","timestamp_col":"2010-12-31 05:05:13.600000000"}'
+---- TYPES
+INT,STRING
+====
+---- QUERY
+# Query the same struct multiple times from a partitioned table.
+select id, struct_val, struct_val from functional_orc_def.alltypes_structs order by id limit 2;
+---- RESULTS: VERIFY_IS_EQUAL_SORTED
+0,'{"bool_col":true,"tinyint_col":0,"smallint_col":0,"int_col":0,"bigint_col":0,"float_col":0,"double_col":0,"date_string_col":"01/01/09","string_col":"0","timestamp_col":"2009-01-01 00:00:00"}','{"bool_col":true,"tinyint_col":0,"smallint_col":0,"int_col":0,"bigint_col":0,"float_col":0,"double_col":0,"date_string_col":"01/01/09","string_col":"0","timestamp_col":"2009-01-01 00:00:00"}'
+1,'{"bool_col":false,"tinyint_col":1,"smallint_col":1,"int_col":1,"bigint_col":10,"float_col":1.100000023841858,"double_col":10.1,"date_string_col":"01/01/09","string_col":"1","timestamp_col":"2009-01-01 00:01:00"}','{"bool_col":false,"tinyint_col":1,"smallint_col":1,"int_col":1,"bigint_col":10,"float_col":1.100000023841858,"double_col":10.1,"date_string_col":"01/01/09","string_col":"1","timestamp_col":"2009-01-01 00:01:00"}'
+---- TYPES
+INT,STRING,STRING
+====
+---- QUERY
+# Query struct from a partitioned table with where clause on the struct's members.
+select id, struct_val
+from functional_orc_def.alltypes_structs
+where struct_val.tinyint_col=8 and struct_val.timestamp_col > "2010-12-30";
+---- RESULTS
+7288,'{"bool_col":true,"tinyint_col":8,"smallint_col":8,"int_col":8,"bigint_col":80,"float_col":8.800000190734863,"double_col":80.8,"date_string_col":"12/30/10","string_col":"8","timestamp_col":"2010-12-30 04:58:13.330000000"}'
+7298,'{"bool_col":true,"tinyint_col":8,"smallint_col":8,"int_col":8,"bigint_col":80,"float_col":8.800000190734863,"double_col":80.8,"date_string_col":"12/31/10","string_col":"8","timestamp_col":"2010-12-31 05:08:13.780000000"}'
+---- TYPES
+INT,STRING
+====
+---- QUERY
+# It's not supported to create a view with structs from a table type that doesn't
+# support selecting structs.
+create view tmp_view as select id, int_struct_col from functional_hbase.allcomplextypes;
+---- CATCH
+is not supported when querying STRUCT type STRUCT<f1:INT,f2:INT>
+====
+---- QUERY
+# It's not supported to create a view with structs from a file format that doesn't
+# support selecting structs.
+create view tmp_view as select id, int_struct_col from functional.allcomplextypes;
+---- CATCH
+AnalysisException: Querying STRUCT is only supported for ORC file format.
+====
+---- QUERY
+# Querying IS NULL on a struct is not supported.
+# IMPALA-3060
+select id, str, alltypes
+from functional_orc_def.complextypes_structs
+where alltypes is null;
+---- CATCH
+AnalysisException: IS NULL predicate does not support complex types: alltypes IS NULL
+====
+---- QUERY
+# Subquery that returns a complex type is not supported.
+# IMPALA-9500
+select alltypes
+from functional_orc_def.complextypes_structs
+where alltypes in (select alltypes from functional_orc_def.complextypes_structs);
+---- CATCH
+AnalysisException: A subquery can't return complex types. (SELECT alltypes FROM functional_orc_def.complextypes_structs)
+====
+---- QUERY
+select tbl.nested_struct from functional_orc_def.complextypestbl tbl;
+---- CATCH
+AnalysisException: Struct containing a collection type is not allowed in the select list.
+====
+---- QUERY
+select tbl.nested_struct.c from functional_orc_def.complextypestbl tbl;
+---- CATCH
+AnalysisException: Struct containing a collection type is not allowed in the select list.
+====
+---- QUERY
+# Unioning structs is not supported.
+# IMPALA-10752
+select id, tiny_struct from functional_orc_def.complextypes_structs
+union all
+select id, tiny_struct from functional_orc_def.complextypes_structs;
+---- CATCH
+AnalysisException: Set operations don't support STRUCT type. STRUCT<b:BOOLEAN> in tiny_struct
+====
+---- QUERY
+# Ordering by struct column is not supported.
+select id, tiny_struct from functional_orc_def.complextypes_structs
+order by tiny_struct
+---- CATCH
+AnalysisException: ORDER BY expression 'tiny_struct' with complex type 'STRUCT<b:BOOLEAN>' is not supported.
+====
+---- QUERY
+# Ordering by struct column (using the index of the column) is not supported.
+select id, tiny_struct from functional_orc_def.complextypes_structs
+order by 2
+---- CATCH
+AnalysisException: ORDER BY expression 'tiny_struct' with complex type 'STRUCT<b:BOOLEAN>' is not supported.
+====
+---- QUERY
+# Check that the order by don't confuse the 3rd column with the member of the struct.
+select id, tiny_struct from functional_orc_def.complextypes_structs
+order by 3
+---- CATCH
+AnalysisException: ORDER BY: ordinal exceeds the number of items in the SELECT list: 3
diff --git a/tests/authorization/test_ranger.py b/tests/authorization/test_ranger.py
index 0ebc850..e4f639c 100644
--- a/tests/authorization/test_ranger.py
+++ b/tests/authorization/test_ranger.py
@@ -29,6 +29,8 @@ from getpass import getuser
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.skip import (SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfIsilon,
                                SkipIfLocal, SkipIfHive2, SkipIfGCS)
+from tests.common.test_dimensions import (create_client_protocol_dimension,
+    create_exec_option_dimension, create_orc_dimension)
 from tests.util.hdfs_util import NAMENODE
 from tests.util.calculation_util import get_random_id
 
@@ -1573,3 +1575,48 @@ class TestRangerColumnMaskingTpchNested(CustomClusterTestSuite):
         for col in tbl_cols[tbl]:
           policy_name = "%s_%s_mask" % (tbl, col)
           TestRanger._remove_policy(policy_name)
+
+
+class TestRangerColumnMaskingComplexTypesInSelectList(CustomClusterTestSuite):
+  """
+  Tests Ranger policies when complex types are given in the select list. The reason
+  this is a separate class is that directly querying complex types works only on HS2
+  while some tests in TestRanger needs Beeswax interface otherwise some of them fails.
+  """
+
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    cls.ImpalaTestMatrix.add_dimension(create_client_protocol_dimension())
+    cls.ImpalaTestMatrix.add_dimension(create_orc_dimension(cls.get_workload()))
+    cls.ImpalaTestMatrix.add_constraint(lambda v:
+        v.get_value('protocol') == 'hs2')
+    cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension(
+        disable_codegen_options=[True]))
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS)
+  def test_column_masking_with_structs_in_select_list(self, vector, unique_name):
+    user = getuser()
+    db = "functional_orc_def"
+    # Create another client for admin user since current user doesn't have privileges to
+    # create/drop databases or refresh authorization.
+    admin_client = self.create_impala_client()
+    policy_cnt = 0
+    try:
+      # Add a policy on a primitive column of a table which contains nested columns.
+      TestRanger._add_column_masking_policy(
+          unique_name + str(policy_cnt), user, "functional_orc_def",
+          "complextypes_structs", "str", "MASK_NULL")
+      policy_cnt += 1
+      self.execute_query_expect_success(admin_client, "refresh authorization",
+          user=ADMIN)
+      self.run_test_case("QueryTest/ranger_column_masking_struct_in_select_list", vector,
+          use_db=db)
+    finally:
+      for i in range(policy_cnt):
+        TestRanger._remove_policy(unique_name + str(i))
diff --git a/tests/common/test_dimensions.py b/tests/common/test_dimensions.py
index e0c100e..4cc30aa 100644
--- a/tests/common/test_dimensions.py
+++ b/tests/common/test_dimensions.py
@@ -109,6 +109,11 @@ def create_parquet_dimension(workload):
       TableFormatInfo.create_from_string(dataset, 'parquet/none'))
 
 
+def create_orc_dimension(workload):
+  dataset = get_dataset_from_workload(workload)
+  return ImpalaTestDimension('table_format',
+      TableFormatInfo.create_from_string(dataset, 'orc/def'))
+
 def create_avro_snappy_dimension(workload):
   dataset = get_dataset_from_workload(workload)
   return ImpalaTestDimension('table_format',
diff --git a/tests/query_test/test_nested_types.py b/tests/query_test/test_nested_types.py
index fd1189b..f00b697 100644
--- a/tests/query_test/test_nested_types.py
+++ b/tests/query_test/test_nested_types.py
@@ -34,6 +34,8 @@ from tests.common.skip import (
     SkipIfLocal,
     SkipIfNotHdfsMinicluster
     )
+from tests.common.test_dimensions import (create_exec_option_dimension,
+    create_exec_option_dimension_from_dict, create_client_protocol_dimension)
 from tests.common.test_vector import ImpalaTestDimension
 from tests.util.filesystem_utils import WAREHOUSE, get_fs_path, IS_HDFS
 
@@ -112,6 +114,85 @@ class TestNestedTypes(ImpalaTestSuite):
                        use_db='tpch_nested' + db_suffix)
 
 
+class TestNestedTypesInSelectList(ImpalaTestSuite):
+  """Functional tests for nested types provided in the select list."""
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestNestedTypesInSelectList, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_constraint(lambda v:
+        v.get_value('table_format').file_format in ['parquet', 'orc'])
+    cls.ImpalaTestMatrix.add_dimension(
+        ImpalaTestDimension('mt_dop', 0, 2))
+    cls.ImpalaTestMatrix.add_dimension(
+        create_exec_option_dimension_from_dict({
+            'disable_codegen': ['False', 'True']}))
+    cls.ImpalaTestMatrix.add_dimension(create_client_protocol_dimension())
+    cls.ImpalaTestMatrix.add_constraint(lambda v:
+        v.get_value('protocol') == 'hs2')
+
+  def test_struct_in_select_list(self, vector, unique_database):
+    """Queries where a struct column is in the select list"""
+    if vector.get_value('table_format').file_format == 'parquet':
+      pytest.skip()
+    if vector.get_value('exec_option')['disable_codegen'] == 'False':
+      pytest.skip()
+    self.run_test_case('QueryTest/struct-in-select-list', vector, unique_database)
+
+  def test_nested_struct_in_select_list(self, vector, unique_database):
+    """Queries where a nested struct column is in the select list"""
+    if vector.get_value('table_format').file_format == 'parquet':
+      pytest.skip()
+    if vector.get_value('exec_option')['disable_codegen'] == 'False':
+      pytest.skip()
+    self.run_test_case('QueryTest/nested-struct-in-select-list', vector, unique_database)
+
+
+# Moved this to a separate test class from TestNestedTypesInSelectList because this needs
+# a narrower test vector.
+class TestNestedTypesInSelectListWithBeeswax(ImpalaTestSuite):
+  """Functional tests for nested types provided in the select list."""
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    cls.ImpalaTestMatrix.add_dimension(create_client_protocol_dimension())
+    cls.ImpalaTestMatrix.add_constraint(lambda v:
+        v.get_value('protocol') == 'beeswax')
+    cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension(
+        disable_codegen_options=[True]))
+
+  def test_struct_with_beeswax(self, vector):
+    expected_err = "Returning complex types is not supported through the beeswax " + \
+        "interface"
+    err = self.execute_query_expect_failure(self.client,
+        "select tiny_struct from functional_orc_def.complextypes_structs",
+        vector.get_value('exec_option'))
+    assert expected_err in str(err)
+
+
+class TestComputeStatsWithNestedTypes(ImpalaTestSuite):
+  """Functional tests for running compute stats on tables that have nested types in the
+  columns."""
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestComputeStatsWithNestedTypes, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_constraint(lambda v:
+        v.get_value('table_format').file_format in ['parquet', 'orc'])
+
+  def test_compute_stats_with_structs(self, vector):
+    """COMPUTE STATS and SHOW COLUMN STATS for tables with structs"""
+    self.run_test_case('QueryTest/compute-stats-with-structs', vector)
+
 class TestNestedTypesNoMtDop(ImpalaTestSuite):
   """Functional tests for nested types that do not need to be run with mt_dop > 0."""
   @classmethod