Posted to commits@impala.apache.org by wz...@apache.org on 2021/11/24 06:37:37 UTC

[impala] 01/03: IMPALA-10920: Zipping unnest for arrays

This is an automated email from the ASF dual-hosted git repository.

wzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit df528fe2b108600c9c39c345bad52d0de076a4f5
Author: Gabor Kaszab <ga...@cloudera.com>
AuthorDate: Mon Oct 11 17:01:38 2021 +0200

    IMPALA-10920: Zipping unnest for arrays
    
    This patch provides an unnest implementation for arrays where unnesting
    multiple arrays in one query results in the items of the arrays being
    zipped together instead of being joined. Two different syntaxes are
    introduced for this purpose:
    
    1: ISO SQL:2016 compliant syntax:
    SELECT a1.item, a2.item
    FROM complextypes_arrays t, UNNEST(t.arr1, t.arr2) AS (a1, a2);
    
    2: Postgres compatible syntax:
    SELECT UNNEST(arr1), UNNEST(arr2) FROM complextypes_arrays;
    
    Let me show the expected behaviour through the following example:
    Inputs: arr1: {1,2,3}, arr2: {11, 12}
    After running any of the above queries we expect the following output:
    ===============
    | arr1 | arr2 |
    ===============
    | 1    | 11   |
    | 2    | 12   |
    | 3    | NULL |
    ===============
    
    Expected behaviour:
     - When unnesting multiple arrays with zipping unnest, the i'th item of
       one array is placed next to the i'th item of the other arrays in the
       results.
     - If the sizes of the arrays differ, the shorter arrays are padded
       with NULL values up to the size of the longest array.
    
    As a side note, UNNEST is added to Impala's SQL language as a new
    keyword. This might interfere with use cases where a resource (db,
    table, column, etc.) is named "UNNEST".
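
    For example, an identifier that clashes with the new keyword can
    presumably be escaped with backticks (Impala's identifier quoting);
    the table and column names below are hypothetical:
    SELECT `unnest` FROM some_table;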
    
    Restrictions:
     - It is not allowed to have WHERE filters on an unnested item of an
       array in the same SELECT query. E.g. this is not allowed:
       SELECT arr1.item
       FROM complextypes_arrays t, UNNEST(t.arr1) WHERE arr1.item < 5;
    
       Note that it is allowed to have an outer SELECT around the one doing
       the unnests and to filter on the unnested items there (see the
       sketch after this list).
     - If an outer SELECT filters on the unnested array items coming from
       the inner SELECT, these predicates won't be pushed down to the SCAN
       node. They are instead evaluated in the UNNEST node to guarantee
       result correctness after unnesting.
       Note that this restriction only applies when multiple arrays are
       being unnested, in other words when zipping unnest logic is required
       to produce results.
     - It's not allowed to combine a zipping and a (traditional) joining
       unnest in one SELECT query.
     - It's not allowed to perform zipping unnests on arrays from different
       tables.
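
    To illustrate the workaround mentioned in the first restriction, a
    minimal sketch of the allowed outer-SELECT filtering pattern (reusing
    complextypes_arrays from the examples above; the aliases are only for
    illustration):
    SELECT v.item1
    FROM (
        SELECT a1.item AS item1, a2.item AS item2
        FROM complextypes_arrays t, UNNEST(t.arr1, t.arr2) AS (a1, a2)) v
    WHERE v.item1 < 5;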
    
    Testing:
     - Added a bunch of E2E tests to the test suite to cover both syntaxes.
     - Did a manual test run on a table with 1000 rows and 3 array columns,
       with around 5000 items in each array. Unnested all three arrays in
       one query to check for crashes or suspicious slowness at this scale.
    
    Change-Id: Ic58ff6579ecff03962e7a8698edfbe0684ce6cf7
    Reviewed-on: http://gerrit.cloudera.org:8080/17983
    Reviewed-by: Csaba Ringhofer <cs...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/unnest-node.cc                         | 162 +++++---
 be/src/exec/unnest-node.h                          |  96 +++--
 common/thrift/PlanNodes.thrift                     |   6 +-
 fe/src/main/cup/sql-parser.cup                     |  76 +++-
 .../apache/impala/analysis/AnalysisContext.java    |  20 +
 .../java/org/apache/impala/analysis/Analyzer.java  |  37 ++
 .../org/apache/impala/analysis/FromClause.java     |  55 ++-
 .../org/apache/impala/analysis/SelectStmt.java     |  28 ++
 .../java/org/apache/impala/analysis/SlotRef.java   |  16 +-
 .../org/apache/impala/analysis/StmtRewriter.java   |  18 +
 .../java/org/apache/impala/analysis/TableRef.java  |  20 +-
 .../apache/impala/analysis/TupleDescriptor.java    |   8 +-
 .../org/apache/impala/analysis/UnnestExpr.java     | 150 ++++++++
 .../org/apache/impala/planner/HdfsScanNode.java    |  10 +
 .../java/org/apache/impala/planner/PlanNode.java   |  10 +
 .../apache/impala/planner/SingleNodePlanner.java   |  54 ++-
 .../java/org/apache/impala/planner/UnnestNode.java |  59 ++-
 fe/src/main/jflex/sql-scanner.flex                 |   3 +-
 .../apache/impala/analysis/AnalyzeStmtsTest.java   |   2 +-
 .../org/apache/impala/analysis/ParserTest.java     |  16 +-
 .../java/org/apache/impala/analysis/ToSqlTest.java |  38 ++
 .../authorization/AuthorizationStmtTest.java       |  25 ++
 testdata/ComplexTypesTbl/arrays.orc                | Bin 0 -> 614 bytes
 testdata/ComplexTypesTbl/arrays.parq               | Bin 0 -> 1087 bytes
 testdata/data/README                               |   6 +
 .../functional/functional_schema_template.sql      |  16 +
 .../datasets/functional/schema_constraints.csv     |   2 +
 .../QueryTest/zipping-unnest-from-view.test        |  57 +++
 .../QueryTest/zipping-unnest-in-from-clause.test   | 411 +++++++++++++++++++++
 .../QueryTest/zipping-unnest-in-select-list.test   | 187 ++++++++++
 tests/query_test/test_nested_types.py              |  30 ++
 31 files changed, 1480 insertions(+), 138 deletions(-)

diff --git a/be/src/exec/unnest-node.cc b/be/src/exec/unnest-node.cc
index dd91566..a1f75f5 100644
--- a/be/src/exec/unnest-node.cc
+++ b/be/src/exec/unnest-node.cc
@@ -17,6 +17,8 @@
 
 #include "exec/unnest-node.h"
 
+#include <algorithm>
+
 #include "common/status.h"
 #include "exec/exec-node.inline.h"
 #include "exec/exec-node-util.h"
@@ -40,7 +42,9 @@ Status UnnestPlanNode::Init(const TPlanNode& tnode, FragmentState* state) {
 }
 
 void UnnestPlanNode::Close() {
-  if (collection_expr_ != nullptr) collection_expr_->Close();
+  for (auto coll_expr : collection_exprs_) {
+    if (coll_expr != nullptr) coll_expr->Close();
+  }
   PlanNode::Close();
 }
 
@@ -49,16 +53,17 @@ Status UnnestPlanNode::InitCollExpr(FragmentState* state) {
       << "set_containing_subplan() must have been called";
   const RowDescriptor& row_desc = *containing_subplan_->children_[0]->row_descriptor_;
   RETURN_IF_ERROR(ScalarExpr::Create(
-      tnode_->unnest_node.collection_expr, row_desc, state, &collection_expr_));
-  DCHECK(collection_expr_->IsSlotRef());
-
-  // Set the coll_slot_desc_ and the corresponding tuple index used for manually
-  // evaluating the collection SlotRef and for projection.
-  DCHECK(collection_expr_->IsSlotRef());
-  const SlotRef* slot_ref = static_cast<SlotRef*>(collection_expr_);
-  coll_slot_desc_ = state->desc_tbl().GetSlotDescriptor(slot_ref->slot_id());
-  DCHECK(coll_slot_desc_ != nullptr);
-  coll_tuple_idx_ = row_desc.GetTupleIdx(coll_slot_desc_->parent()->id());
+      tnode_->unnest_node.collection_exprs, row_desc, state, &collection_exprs_));
+  DCHECK_GT(collection_exprs_.size(), 0);
+
+  for (ScalarExpr* coll_expr : collection_exprs_) {
+    DCHECK(coll_expr->IsSlotRef());
+    const SlotRef* slot_ref = static_cast<SlotRef*>(coll_expr);
+    SlotDescriptor* slot_desc = state->desc_tbl().GetSlotDescriptor(slot_ref->slot_id());
+    DCHECK(slot_desc != nullptr);
+    coll_slot_descs_.push_back(slot_desc);
+    coll_tuple_idxs_.push_back(row_desc.GetTupleIdx(slot_desc->parent()->id()));
+  }
   return Status::OK();
 }
 
@@ -71,14 +76,10 @@ Status UnnestPlanNode::CreateExecNode(RuntimeState* state, ExecNode** node) cons
 UnnestNode::UnnestNode(
     ObjectPool* pool, const UnnestPlanNode& pnode, const DescriptorTbl& descs)
   : ExecNode(pool, pnode, descs),
-    item_byte_size_(0),
-    thrift_coll_expr_(pnode.tnode_->unnest_node.collection_expr),
-    coll_expr_(pnode.collection_expr_),
-    coll_expr_eval_(nullptr),
-    coll_slot_desc_(pnode.coll_slot_desc_),
-    coll_tuple_idx_(pnode.coll_tuple_idx_),
-    coll_value_(nullptr),
+    coll_slot_descs_(&(pnode.coll_slot_descs_)),
+    input_coll_tuple_idxs_(&(pnode.coll_tuple_idxs_)),
     item_idx_(0),
+    longest_collection_size_(0),
     num_collections_(0),
     total_collection_size_(0),
     max_collection_size_(-1),
@@ -86,7 +87,14 @@ UnnestNode::UnnestNode(
     avg_collection_size_counter_(nullptr),
     max_collection_size_counter_(nullptr),
     min_collection_size_counter_(nullptr),
-    num_collections_counter_(nullptr) {}
+    num_collections_counter_(nullptr) {
+  DCHECK_GT(coll_slot_descs_->size(), 0);
+  DCHECK_EQ(coll_slot_descs_->size(), input_coll_tuple_idxs_->size());
+  coll_values_.resize(coll_slot_descs_->size());
+  for (const SlotDescriptor* slot_desc : *coll_slot_descs_) {
+    output_coll_tuple_idxs_.push_back(GetCollTupleIdx(slot_desc));
+  }
+}
 
 Status UnnestNode::Prepare(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
@@ -101,13 +109,14 @@ Status UnnestNode::Prepare(RuntimeState* state) {
   num_collections_counter_ =
       ADD_COUNTER(runtime_profile_, "NumCollections", TUnit::UNIT);
 
-  DCHECK_EQ(1, row_desc()->tuple_descriptors().size());
-  const TupleDescriptor* item_tuple_desc = row_desc()->tuple_descriptors()[0];
-  DCHECK(item_tuple_desc != nullptr);
-  item_byte_size_ = item_tuple_desc->byte_size();
+  DCHECK_EQ(coll_values_.size(), row_desc()->tuple_descriptors().size());
+  item_byte_sizes_.resize(row_desc()->tuple_descriptors().size());
+  for (int i = 0; i < row_desc()->tuple_descriptors().size(); ++i) {
+    const TupleDescriptor* item_tuple_desc = row_desc()->tuple_descriptors()[i];
+    DCHECK(item_tuple_desc != nullptr);
+    item_byte_sizes_[i] = item_tuple_desc->byte_size();
+  }
 
-  RETURN_IF_ERROR(ScalarExprEvaluator::Create(*coll_expr_, state, pool_,
-      expr_perm_pool(), expr_results_pool(), &coll_expr_eval_));
   return Status::OK();
 }
 
@@ -116,36 +125,42 @@ Status UnnestNode::Open(RuntimeState* state) {
   // Omit ScopedOpenEventAdder since this is always in a subplan.
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(ExecNode::Open(state));
-  RETURN_IF_ERROR(coll_expr_eval_->Open(state));
 
   DCHECK(containing_subplan_->current_row() != nullptr);
-  Tuple* tuple = containing_subplan_->current_input_row_->GetTuple(coll_tuple_idx_);
-  if (tuple != nullptr) {
-    // Retrieve the collection value to be unnested directly from the tuple. We purposely
-    // ignore the null bit of the slot because we may have set it in a previous Open() of
-    // this same unnest node for projection.
-    coll_value_ = reinterpret_cast<const CollectionValue*>(
-        tuple->GetSlot(coll_slot_desc_->tuple_offset()));
-    // Projection: Set the slot containing the collection value to nullptr.
-    tuple->SetNull(coll_slot_desc_->null_indicator_offset());
-  } else {
-    coll_value_ = &EMPTY_COLLECTION_VALUE;
-    DCHECK_EQ(coll_value_->num_tuples, 0);
+  longest_collection_size_ = 0;
+  for (int i = 0; i < coll_values_.size(); ++i) {
+    Tuple* tuple =
+        containing_subplan_->current_input_row_->GetTuple((*input_coll_tuple_idxs_)[i]);
+    if (tuple != nullptr) {
+      SlotDescriptor* coll_slot_desc = (*coll_slot_descs_)[i];
+      coll_values_[i] = reinterpret_cast<const CollectionValue*>(
+          tuple->GetSlot(coll_slot_desc->tuple_offset()));
+      // Projection: Set the slot containing the collection value to nullptr.
+      tuple->SetNull(coll_slot_desc->null_indicator_offset());
+
+      // Update stats. Only take into account non-empty collections.
+      int num_tuples = coll_values_[i]->num_tuples;
+      if (num_tuples > 0) {
+        longest_collection_size_ = std::max(longest_collection_size_,
+            (int64_t)num_tuples);
+        total_collection_size_ += num_tuples;
+        ++num_collections_;
+        max_collection_size_ = std::max(max_collection_size_, (int64_t)num_tuples);
+        if (min_collection_size_ == -1 || num_tuples < min_collection_size_) {
+          min_collection_size_ = num_tuples;
+        }
+      }
+    } else {
+      coll_values_[i] = &EMPTY_COLLECTION_VALUE;
+      DCHECK_EQ(coll_values_[i]->num_tuples, 0);
+    }
   }
 
-  ++num_collections_;
   COUNTER_SET(num_collections_counter_, num_collections_);
-  total_collection_size_ += coll_value_->num_tuples;
   COUNTER_SET(avg_collection_size_counter_,
       static_cast<double>(total_collection_size_) / num_collections_);
-  if (max_collection_size_ == -1 || coll_value_->num_tuples > max_collection_size_) {
-    max_collection_size_ = coll_value_->num_tuples;
-    COUNTER_SET(max_collection_size_counter_, max_collection_size_);
-  }
-  if (min_collection_size_ == -1 || coll_value_->num_tuples < min_collection_size_) {
-    min_collection_size_ = coll_value_->num_tuples;
-    COUNTER_SET(min_collection_size_counter_, min_collection_size_);
-  }
+  COUNTER_SET(max_collection_size_counter_, max_collection_size_);
+  COUNTER_SET(min_collection_size_counter_, min_collection_size_);
   return Status::OK();
 }
 
@@ -158,17 +173,24 @@ Status UnnestNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos)
   }
   *eos = false;
 
-  // Populate the output row_batch with tuples from the collection.
-  DCHECK(coll_value_ != nullptr);
-  DCHECK_GE(coll_value_->num_tuples, 0);
-  while (item_idx_ < coll_value_->num_tuples) {
-    Tuple* item =
-        reinterpret_cast<Tuple*>(coll_value_->ptr + item_idx_ * item_byte_size_);
-    ++item_idx_;
+  // Populate the output row_batch with tuples from the collections.
+  while (item_idx_ < longest_collection_size_) {
     int row_idx = row_batch->AddRow();
     TupleRow* row = row_batch->GetRow(row_idx);
-    row->SetTuple(0, item);
-    // TODO: Ideally these should be evaluated by the parent scan node.
+    for (int i = 0; i < coll_values_.size(); ++i) {
+      const CollectionValue* coll_value = coll_values_[i];
+      DCHECK(coll_value != nullptr);
+      DCHECK_GE(coll_value->num_tuples, 0);
+      Tuple* input_tuple;
+      if (coll_value->num_tuples <= item_idx_) {
+        input_tuple = CreateNullTuple(i, row_batch);
+      } else {
+        input_tuple =
+            reinterpret_cast<Tuple*>(coll_value->ptr + item_idx_ * item_byte_sizes_[i]);
+      }
+      row->SetTuple(output_coll_tuple_idxs_[i], input_tuple);
+    }
+    ++item_idx_;
     DCHECK_EQ(conjuncts_.size(), conjunct_evals_.size());
     if (EvalConjuncts(conjunct_evals_.data(), conjuncts_.size(), row)) {
       row_batch->CommitLastRow();
@@ -179,13 +201,32 @@ Status UnnestNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos)
 
   // Checking the limit here is simpler/cheaper than doing it in the loop above.
   const bool reached_limit = CheckLimitAndTruncateRowBatchIfNeeded(row_batch, eos);
-  if (!reached_limit && item_idx_ == coll_value_->num_tuples) {
-    *eos = true;
-  }
+  if (!reached_limit && item_idx_ == longest_collection_size_) *eos = true;
   COUNTER_SET(rows_returned_counter_, rows_returned());
   return Status::OK();
 }
 
+int UnnestNode::GetCollTupleIdx(const SlotDescriptor* slot_desc) const {
+  DCHECK(slot_desc != nullptr);
+  const TupleDescriptor* coll_tuple = slot_desc->children_tuple_descriptor();
+  DCHECK(coll_tuple != nullptr);
+  return row_descriptor_.GetTupleIdx(coll_tuple->id());
+}
+
+Tuple* UnnestNode::CreateNullTuple(int coll_idx, RowBatch* row_batch) const {
+  const TupleDescriptor* coll_tuple =
+      (*coll_slot_descs_)[coll_idx]->children_tuple_descriptor();
+  DCHECK(coll_tuple != nullptr);
+  if (coll_tuple->slots().size() == 0) return nullptr;
+  DCHECK_EQ(coll_tuple->slots().size(), 1);
+  const SlotDescriptor* coll_item_slot = coll_tuple->slots()[0];
+  DCHECK(coll_item_slot != nullptr);
+  Tuple* tuple = Tuple::Create(item_byte_sizes_[coll_idx], row_batch->tuple_data_pool());
+  if (tuple == nullptr) return nullptr;
+  tuple->SetNull(coll_item_slot->null_indicator_offset());
+  return tuple;
+}
+
 Status UnnestNode::Reset(RuntimeState* state, RowBatch* row_batch) {
   item_idx_ = 0;
   return ExecNode::Reset(state, row_batch);
@@ -193,7 +234,6 @@ Status UnnestNode::Reset(RuntimeState* state, RowBatch* row_batch) {
 
 void UnnestNode::Close(RuntimeState* state) {
   if (is_closed()) return;
-  if (coll_expr_eval_ != nullptr) coll_expr_eval_->Close(state);
   ExecNode::Close(state);
 }
 
diff --git a/be/src/exec/unnest-node.h b/be/src/exec/unnest-node.h
index 8045fab..deea026 100644
--- a/be/src/exec/unnest-node.h
+++ b/be/src/exec/unnest-node.h
@@ -31,28 +31,43 @@ class UnnestPlanNode : public PlanNode {
   virtual Status Init(const TPlanNode& tnode, FragmentState* state) override;
   virtual void Close() override;
   virtual Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
-  /// Initializes the expression which produces the collection to be unnested.
+  /// Initializes the expressions that produce the collections to be unnested.
   /// Called by the containing subplan plan-node.
   Status InitCollExpr(FragmentState* state);
 
   ~UnnestPlanNode(){}
 
-  /// Expr that produces the collection to be unnested. Currently always a SlotRef into an
-  /// collection-typed slot. We do not evaluate this expr for setting coll_value_, but
-  /// instead manually retrieve the slot value to support projection (see class comment).
-  ScalarExpr* collection_expr_ = nullptr;
+  /// Expressions that produce the collections to be unnested. They are always SlotRefs
+  /// into collection-typed slots. We do not evaluate these expressions for setting
+  /// 'UnnestNode::coll_values_', but instead manually retrieve the slot values to support
+  /// projection (see class comment in UnnestNode).
+  std::vector<ScalarExpr*> collection_exprs_;
 
-  /// Descriptor of the collection-typed slot referenced by coll_expr_eval_. Set in
-  /// Prepare().  This slot is always set to NULL in Open() as a simple projection.
-  const SlotDescriptor* coll_slot_desc_;
+  /// Descriptors of the collection-typed slots handled by this UnnestPlanNode. Set in
+  /// InitCollExpr().
+  std::vector<SlotDescriptor*> coll_slot_descs_;
 
-  /// Tuple index corresponding to coll_slot_desc_. Set in Prepare().
-  int coll_tuple_idx_;
+  /// Tuple indexes corresponding to 'coll_slot_descs_'. Set in InitCollExpr().
+  std::vector<int> coll_tuple_idxs_;
 };
 
-/// Exec node that scans an in-memory collection of tuples (a CollectionValue) producing
-/// one output row per tuple in the collection. The output row is composed of a single
-/// tuple - the collection's item tuple.
+/// Exec node that scans one or more in-memory collections of tuples (CollectionValues).
+/// The output row is composed of as many tuples as the number of collections this unnest
+/// handles - the collections' item tuples.
+/// Produces as many output rows as the size of the longest collection in this unnest and
+/// performs a zipping unnest on the collections. If the lengths of the collections are
+/// not the same, the missing values from the shorter collections will be null tuples.
+///
+/// Example:
+/// The collections handled by this unnest: coll1: {1,2,3}, coll2: {11}, coll3: {}
+/// The output of the unnest:
+/// +=======================+
+/// | coll1 | coll2 | coll3 |
+/// |-----------------------|
+/// | 1     | 11    | null  |
+/// | 2     | null  | null  |
+/// | 3     | null  | null  |
+/// +=======================+
 ///
 /// An UnnestNode does not have children and can only appear in the right child of a
 /// SubplanNode. The UnnestNode gets its 'input' from its containing SubplanNode.
@@ -66,7 +81,7 @@ class UnnestPlanNode : public PlanNode {
 /// might have set the bit in a prior Open()/GetNext()*/Reset() cycle.  We rely on the
 /// producer of the slot value (scan node) to write an empty collection value into slots
 /// that are NULL, in addition to setting the null bit. This breaks/augments the existing
-/// semantics of the null bits.  Setting the slot to NULL as early as possible ensures
+/// semantics of the null bits. Setting the slot to NULL as early as possible ensures
 /// that all rows returned by the containing SubplanNode will have the slot set to NULL.
 /// The FE guarantees that the contents of any collection-typed slot are never referenced
 /// outside of a single UnnestNode, so setting such a slot to NULL is safe after the
@@ -89,42 +104,57 @@ class UnnestNode : public ExecNode {
  private:
   friend class SubplanNode;
 
+  /// Takes a slot descriptor that is expected to refer to a collection and returns the
+  /// tuple index in the output row's row descriptor where the values of the given
+  /// collection belong.
+  int GetCollTupleIdx(const SlotDescriptor* slot_desc) const;
+
+  /// Takes the index of a collection and creates a null tuple for this collection using
+  /// the mem pool of 'row_batch'. Used for filling null values when this UnnestNode is
+  /// handling multiple collections for zipping unnest and one of the collections is
+  /// shorter than the others.
+  /// Returns nullptr if the collection doesn't have an underlying slot, e.g. when the
+  /// collection is only unnested but its items are not referenced in the query.
+  /// E.g.: SELECT id FROM complextypes_arrays t, t.arr1 WHERE id = 10;
+  Tuple* CreateNullTuple(int coll_idx, RowBatch* row_batch) const;
+
   static const CollectionValue EMPTY_COLLECTION_VALUE;
 
-  /// Size of a collection item tuple in bytes. Set in Prepare().
-  int item_byte_size_;
+  /// Sizes of collection item tuples in bytes. Set in Prepare().
+  std::vector<int> item_byte_sizes_;
 
-  /// Expr that produces the collection to be unnested. Currently always a SlotRef into an
-  /// collection-typed slot. We do not evaluate this expr for setting coll_value_, but
-  /// instead manually retrieve the slot value to support projection (see class comment).
-  const TExpr& thrift_coll_expr_;
-  ScalarExpr* coll_expr_;
-  ScalarExprEvaluator* coll_expr_eval_;
+  /// Descriptors of the collection-typed slots. These slots are always set to NULL in
+  /// Open() as a simple projection.
+  const std::vector<SlotDescriptor*>* coll_slot_descs_;
 
-  /// Descriptor of the collection-typed slot referenced by coll_expr_eval_. Set in
-  /// Prepare().  This slot is always set to NULL in Open() as a simple projection.
-  const SlotDescriptor* coll_slot_desc_;
+  /// Tuple indexes corresponding to 'coll_slot_descs_'. Note, these are tuple indexes in
+  /// the source node.
+  const std::vector<int>* input_coll_tuple_idxs_;
 
-  /// Tuple index corresponding to coll_slot_desc_. Set in Prepare().
-  int coll_tuple_idx_;
+  /// Tuple indexes corresponding to 'coll_slot_descs_' in the output tuple.
+  std::vector<int> output_coll_tuple_idxs_;
 
-  /// Current collection value to be unnested. Set using coll_slot_desc_ in Open().
-  const CollectionValue* coll_value_;
+  /// The current collection values to be unnested. Set using 'coll_slot_descs_' in
+  /// Open().
+  std::vector<const CollectionValue*> coll_values_;
 
   /// Current item index.
   int item_idx_;
 
-  // Stats for runtime profile
+  /// Stores the length of the longest collection in 'coll_values_'. Set in Open().
+  int64_t longest_collection_size_;
+
+  /// Stats for runtime profile
   int64_t num_collections_;
   int64_t total_collection_size_;
   int64_t max_collection_size_;
   int64_t min_collection_size_;
-  // TODO: replace with stats or histogram counter
+  /// TODO: replace with stats or histogram counter
   RuntimeProfile::Counter* avg_collection_size_counter_;
   RuntimeProfile::Counter* max_collection_size_counter_;
   RuntimeProfile::Counter* min_collection_size_counter_;
-  // This can be determined by looking at the input cardinality to the subplan node, but
-  // it's handy to have it here too.
+  /// This can be determined by looking at the input cardinality to the subplan node, but
+  /// it's handy to have it here too.
   RuntimeProfile::Counter* num_collections_counter_;
 };
 
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index e137250..c9afb34 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -654,9 +654,9 @@ struct TExchangeNode {
 }
 
 struct TUnnestNode {
-  // Expr that returns the in-memory collection to be scanned.
-  // Currently always a SlotRef into an array-typed slot.
-  1: required Exprs.TExpr collection_expr
+  // Exprs that return the in-memory collections to be scanned.
+  // Currently always SlotRefs into array-typed slots.
+  1: required list<Exprs.TExpr> collection_exprs
 }
 
 struct TCardinalityCheckNode {
diff --git a/fe/src/main/cup/sql-parser.cup b/fe/src/main/cup/sql-parser.cup
index ce849d0..bbd1e53 100644
--- a/fe/src/main/cup/sql-parser.cup
+++ b/fe/src/main/cup/sql-parser.cup
@@ -312,9 +312,9 @@ terminal
   KW_STRING, KW_STRUCT, KW_SYMBOL, KW_SYSTEM_TIME, KW_SYSTEM_VERSION,
   KW_TABLE, KW_TABLES, KW_TABLESAMPLE, KW_TBLPROPERTIES,
   KW_TERMINATED, KW_TEXTFILE, KW_THEN, KW_TIMESTAMP, KW_TINYINT, KW_TRUNCATE, KW_STATS,
-  KW_TO, KW_TRUE, KW_UNBOUNDED, KW_UNCACHED, KW_UNION, KW_UNKNOWN, KW_UNSET, KW_UPDATE,
-  KW_UPDATE_FN, KW_UPSERT, KW_USE, KW_USING, KW_VALIDATE, KW_VALUES, KW_VARCHAR, KW_VIEW,
-  KW_WHEN, KW_WHERE, KW_WITH, KW_ZORDER;
+  KW_TO, KW_TRUE, KW_UNBOUNDED, KW_UNCACHED, KW_UNION, KW_UNKNOWN, KW_UNNEST, KW_UNSET,
+  KW_UPDATE, KW_UPDATE_FN, KW_UPSERT, KW_USE, KW_USING, KW_VALIDATE, KW_VALUES,
+  KW_VARCHAR, KW_VIEW, KW_WHEN, KW_WHERE, KW_WITH, KW_ZORDER;
 
 terminal UNUSED_RESERVED_WORD;
 
@@ -404,7 +404,7 @@ nonterminal Boolean opt_nulls_order_param;
 nonterminal Expr opt_offset_param;
 nonterminal LimitElement opt_limit_offset_clause;
 nonterminal Expr opt_limit_clause, opt_offset_clause;
-nonterminal Expr cast_expr, case_else_clause, analytic_expr;
+nonterminal Expr cast_expr, case_else_clause, analytic_expr, unnest_expr;
 nonterminal String cast_format_val;
 nonterminal Expr function_call_expr;
 nonterminal AnalyticWindow opt_window_clause;
@@ -416,9 +416,12 @@ nonterminal CaseExpr case_expr;
 nonterminal List<CaseWhenClause> case_when_clause_list;
 nonterminal FunctionParams function_params;
 nonterminal List<String> dotted_path;
+nonterminal List<List<String>> dotted_path_list;
 nonterminal SlotRef slot_ref;
 nonterminal FromClause from_clause;
 nonterminal List<TableRef> table_ref_list;
+nonterminal List<TableRef> unnest_table_ref;
+nonterminal List<String> unnest_alias_list;
 nonterminal TableSampleClause opt_tablesample;
 nonterminal WithClause opt_with_clause;
 nonterminal List<View> with_view_def_list;
@@ -3008,12 +3011,28 @@ table_ref_list ::=
     list.add(table);
     RESULT = list;
   :}
+  | unnest_table_ref:table_refs
+  {: RESULT = table_refs; :}
   | table_ref_list:list COMMA table_ref:table opt_plan_hints:hints
   {:
     table.setTableHints(hints);
     list.add(table);
     RESULT = list;
   :}
+  | table_ref_list:list COMMA unnest_table_ref:table_refs
+  {:
+    for (TableRef tblRef : list) {
+      if (tblRef.isZippingUnnest()) {
+        // Have to do this check here in the .cup file as once the UNNEST() is converted
+        // into TableRefs there won't be a way to see if these TableRefs are from the
+        // same or different UNNEST()s.
+        parser.parseError("unnest", SqlParserSymbols.KW_UNNEST,
+            "Providing multiple UNNEST() in the FROM clause is not supported.");
+      }
+    }
+    list.addAll(table_refs);
+    RESULT = list;
+  :}
   | table_ref_list:list KW_CROSS KW_JOIN opt_plan_hints:join_hints table_ref:table
     opt_plan_hints:table_hints
   {:
@@ -3065,6 +3084,46 @@ table_ref ::=
   {: RESULT = new InlineViewRef(alias, query, tblsmpl); :}
   ;
 
+unnest_table_ref ::=
+  KW_UNNEST LPAREN dotted_path_list:path_list RPAREN unnest_alias_list:aliases
+  {:
+    if (aliases != null && aliases.size() != path_list.size()) {
+      parser.parseError("unnest", SqlParserSymbols.KW_UNNEST,
+          "The number of arrays doesn't match with the number of aliases");
+    }
+    List<TableRef> refs = Lists.newArrayList();
+    for (int i = 0; i < path_list.size(); ++i) {
+      String alias = null;
+      if (aliases != null) alias = aliases.get(i);
+      TableRef ref = new TableRef(path_list.get(i), alias);
+      ref.setZippingUnnestType(TableRef.ZippingUnnestType.FROM_CLAUSE_ZIPPING_UNNEST);
+      refs.add(ref);
+    }
+    RESULT = refs;
+  :}
+;
+
+unnest_alias_list ::=
+  KW_AS LPAREN ident_list:aliases RPAREN
+  {: RESULT = aliases; :}
+  | /* empty */
+  {: RESULT = null; :}
+;
+
+dotted_path_list ::=
+  dotted_path:path
+  {:
+    List<List<String>> list = Lists.newArrayList();
+    list.add(path);
+    RESULT = list;
+  :}
+  | dotted_path_list:list COMMA dotted_path:path
+  {:
+    list.add(path);
+    RESULT = list;
+  :}
+;
+
 opt_asof ::=
   KW_FOR KW_SYSTEM_TIME KW_AS KW_OF expr:expr
   {: RESULT = new TimeTravelSpec(TimeTravelSpec.Kind.TIME_AS_OF, expr); :}
@@ -3389,6 +3448,11 @@ case_else_clause ::=
   {: RESULT = null; :}
   ;
 
+unnest_expr ::=
+  KW_UNNEST LPAREN dotted_path:path RPAREN
+  {: RESULT = new UnnestExpr(path); :}
+  ;
+
 sign_chain_expr ::=
   SUBTRACT expr:e
   {:
@@ -3430,6 +3494,8 @@ non_pred_expr ::=
   {: RESULT = e; :}
   | analytic_expr:e
   {: RESULT = e; :}
+  | unnest_expr:e
+  {: RESULT = e; :}
   /* Additional rules for function names that are also keywords */
   | KW_IF LPAREN expr_list:exprs RPAREN
   {: RESULT = new FunctionCallExpr("if", exprs); :}
@@ -4300,6 +4366,8 @@ word ::=
   {: RESULT = r.toString(); :}
   | KW_UNKNOWN:r
   {: RESULT = r.toString(); :}
+  | KW_UNNEST:r
+  {: RESULT = r.toString(); :}
   | KW_UNSET:r
   {: RESULT = r.toString(); :}
   | KW_UPDATE:r
diff --git a/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java b/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
index de55559..8d60f80 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
@@ -393,6 +393,9 @@ public class AnalysisContext {
     public boolean requiresAcidComplexScanRewrite() {
       return canRewriteStatement() && analyzer_.hasTopLevelAcidCollectionTableRef();
     }
+    public boolean requiresZippingUnnestRewrite() {
+      return canRewriteStatement() && isZippingUnnestInSelectList(stmt_);
+    }
     public boolean requiresExprRewrite() {
       return isQueryStmt() || isInsertStmt() || isCreateTableAsSelectStmt()
           || isUpdateStmt() || isDeleteStmt();
@@ -406,6 +409,19 @@ public class AnalysisContext {
     }
     public void setUserHasProfileAccess(boolean value) { userHasProfileAccess_ = value; }
     public boolean userHasProfileAccess() { return userHasProfileAccess_; }
+
+    private boolean isZippingUnnestInSelectList(StatementBase stmt) {
+      if (!(stmt instanceof SelectStmt)) return false;
+      if (!stmt.analyzer_.getTableRefsFromUnnestExpr().isEmpty()) return true;
+      SelectStmt selectStmt = (SelectStmt)stmt;
+      for (TableRef tblRef : selectStmt.fromClause_.getTableRefs()) {
+        if (tblRef instanceof InlineViewRef &&
+            isZippingUnnestInSelectList(((InlineViewRef)tblRef).getViewStmt())) {
+          return true;
+        }
+      }
+      return false;
+    }
   }
 
   public Analyzer createAnalyzer(StmtTableCache stmtTableCache) {
@@ -533,6 +549,10 @@ public class AnalysisContext {
       new StmtRewriter.AcidRewriter().rewrite(analysisResult_);
       shouldReAnalyze = true;
     }
+    if (analysisResult_.requiresZippingUnnestRewrite()) {
+      new StmtRewriter.ZippingUnnestRewriter().rewrite(analysisResult_);
+      shouldReAnalyze = true;
+    }
     if (!shouldReAnalyze) return;
 
     // For SetOperationStmt we must replace the query statement with the rewritten version
diff --git a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
index c3d48dc..3b95bcd 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
@@ -504,6 +504,10 @@ public class Analyzer {
     public final Map<String, org.apache.kudu.client.KuduTable> kuduTables =
         new HashMap<>();
 
+    // This holds the tuple ids of the arrays that are given as zipping unnest table
+    // refs.
+    public Set<TupleId> zippingUnnestTupleIds = new HashSet<>();
+
     public GlobalState(StmtTableCache stmtTableCache, TQueryCtx queryCtx,
         AuthorizationFactory authzFactory, AuthorizationContext authzCtx) {
       this.stmtTableCache = stmtTableCache;
@@ -566,6 +570,11 @@ public class Analyzer {
   // Map from tuple id to its corresponding table ref.
   private final Map<TupleId, TableRef> tableRefMap_ = new HashMap<>();
 
+  // This is populated by UnnestExpr during analysis. Each UnnestExpr creates a
+  // CollectionTableRef and adds it to this set. Later on SelectStmt will add these table
+  // refs to the table refs in the FROM clause.
+  private final Set<CollectionTableRef> tableRefsFromUnnestExpr_ = new HashSet<>();
+
   // Set of lowercase ambiguous implicit table aliases.
   private final Set<String> ambiguousAliases_ = new HashSet<>();
 
@@ -693,6 +702,11 @@ public class Analyzer {
     }
   }
 
+  public boolean isRegisteredTableRef(TableRef ref) {
+    if (ref == null) return false;
+    String uniqueAlias = ref.getUniqueAlias();
+    return aliasMap_.containsKey(uniqueAlias);
+  }
   /**
    * Creates an returns an empty TupleDescriptor for the given table ref and registers
    * it against all its legal aliases. For tables refs with an explicit alias, only the
@@ -948,6 +962,21 @@ public class Analyzer {
     globalState_.semiJoinedTupleIds.put(tid, rhsRef);
   }
 
+  public void addZippingUnnestTupleId(CollectionTableRef tblRef) {
+    Expr collExpr = tblRef.getCollectionExpr();
+    if (!(collExpr instanceof SlotRef)) return;
+    SlotRef slotCollExpr = (SlotRef)collExpr;
+    SlotDescriptor collSlotDesc = slotCollExpr.getDesc();
+    Preconditions.checkNotNull(collSlotDesc);
+    TupleDescriptor collTupleDesc = collSlotDesc.getItemTupleDesc();
+    Preconditions.checkNotNull(collTupleDesc);
+    globalState_.zippingUnnestTupleIds.add(collTupleDesc.getId());
+  }
+
+  public Set<TupleId> getZippingUnnestTupleIds() {
+    return globalState_.zippingUnnestTupleIds;
+  }
+
   /**
    * Returns the descriptor of the given explicit or implicit table alias or null if no
    * such alias has been registered.
@@ -983,9 +1012,17 @@ public class Analyzer {
 
   public int getNumTableRefs() { return tableRefMap_.size(); }
   public TableRef getTableRef(TupleId tid) { return tableRefMap_.get(tid); }
+  public Map<TupleId, TableRef> getTableRefs() { return tableRefMap_; }
   public ExprRewriter getConstantFolder() { return globalState_.constantFolder_; }
   public ExprRewriter getExprRewriter() { return globalState_.exprRewriter_; }
 
+  public Set<CollectionTableRef> getTableRefsFromUnnestExpr() {
+    return tableRefsFromUnnestExpr_;
+  }
+  public void addTableRefFromUnnestExpr(CollectionTableRef ref) {
+    tableRefsFromUnnestExpr_.add(ref);
+  }
+
   /**
    * Given a "table alias"."column alias", return the SlotDescriptor
    */
diff --git a/fe/src/main/java/org/apache/impala/analysis/FromClause.java b/fe/src/main/java/org/apache/impala/analysis/FromClause.java
index 76c75b6..7d84132 100644
--- a/fe/src/main/java/org/apache/impala/analysis/FromClause.java
+++ b/fe/src/main/java/org/apache/impala/analysis/FromClause.java
@@ -23,6 +23,8 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.impala.analysis.TableRef.ZippingUnnestType;
+import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.util.AcidUtils;
 
@@ -50,6 +52,8 @@ public class FromClause extends StmtNode implements Iterable<TableRef> {
   public FromClause() { tableRefs_ = new ArrayList<>(); }
   public List<TableRef> getTableRefs() { return tableRefs_; }
 
+  public boolean isAnalyzed() { return analyzed_; }
+
   @Override
   public boolean resolveTableMask(Analyzer analyzer) throws AnalysisException {
     boolean hasChanges = false;
@@ -74,7 +78,9 @@ public class FromClause extends StmtNode implements Iterable<TableRef> {
   public void analyze(Analyzer analyzer) throws AnalysisException {
     if (analyzed_) return;
 
+    TableRef firstZippingUnnestRef = null;
     TableRef leftTblRef = null;  // the one to the left of tblRef
+    boolean hasJoiningUnnest = false;
     for (int i = 0; i < tableRefs_.size(); ++i) {
       TableRef tblRef = tableRefs_.get(i);
       tblRef = analyzer.resolveTableRef(tblRef);
@@ -84,11 +90,40 @@ public class FromClause extends StmtNode implements Iterable<TableRef> {
       leftTblRef = tblRef;
       if (tblRef instanceof CollectionTableRef) {
         checkTopLevelComplexAcidScan(analyzer, (CollectionTableRef)tblRef);
+        if (firstZippingUnnestRef != null && tblRef.isZippingUnnest() &&
+            firstZippingUnnestRef.getResolvedPath().getRootTable() !=
+            tblRef.getResolvedPath().getRootTable()) {
+          throw new AnalysisException("Not supported to do zipping unnest on " +
+              "arrays from different tables.");
+        }
+        if (!tblRef.isZippingUnnest()) {
+          hasJoiningUnnest = true;
+        } else {
+          if (!isPathForArrayType(tblRef)) {
+            throw new AnalysisException("Unnest operator is only supported for arrays. " +
+                ToSqlUtils.getPathSql(tblRef.getPath()));
+          }
+          if (firstZippingUnnestRef == null) firstZippingUnnestRef = tblRef;
+          analyzer.addZippingUnnestTupleId((CollectionTableRef)tblRef);
+        }
       }
     }
+    if (hasJoiningUnnest && firstZippingUnnestRef != null) {
+      throw new AnalysisException(
+          "Providing zipping and joining unnests together is not supported.");
+    }
     analyzed_ = true;
   }
 
+  private boolean isPathForArrayType(TableRef tblRef) {
+    Preconditions.checkNotNull(tblRef);
+    Preconditions.checkState(!tblRef.getResolvedPath().getMatchedTypes().isEmpty());
+    Type resolvedType =
+        tblRef.getResolvedPath().getMatchedTypes().get(
+            tblRef.getResolvedPath().getMatchedTypes().size() - 1);
+    return resolvedType.isArrayType();
+  }
+
   public void collectFromClauseTableRefs(List<TableRef> tblRefs) {
     collectTableRefs(tblRefs, true);
   }
@@ -158,7 +193,25 @@ public class FromClause extends StmtNode implements Iterable<TableRef> {
     if (!tableRefs_.isEmpty()) {
       builder.append(" FROM ");
       for (int i = 0; i < tableRefs_.size(); ++i) {
-        builder.append(tableRefs_.get(i).toSql(options));
+        TableRef tblRef = tableRefs_.get(i);
+        if (tblRef.getZippingUnnestType() ==
+            ZippingUnnestType.FROM_CLAUSE_ZIPPING_UNNEST) {
+          // Go through all the consecutive table refs for zipping unnest and put them in
+          // the same "UNNEST()".
+          if (i != 0) builder.append(", ");
+          builder.append("UNNEST(");
+          boolean first = true;
+          while (i < tableRefs_.size() && tblRef.getZippingUnnestType() ==
+              ZippingUnnestType.FROM_CLAUSE_ZIPPING_UNNEST) {
+            if (!first) builder.append(", ");
+            if (first) first = false;
+            builder.append(ToSqlUtils.getPathSql(tblRef.getPath()));
+            if (++i < tableRefs_.size()) tblRef = tableRefs_.get(i);
+          }
+          builder.append(")");
+        }
+        if (i >= tableRefs_.size()) break;
+        builder.append(tblRef.toSql(options));
       }
     }
     return builder.toString();
diff --git a/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java b/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
index b74546f..bcc8ef0 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
@@ -440,6 +440,29 @@ public class SelectStmt extends QueryStmt {
         throw new AnalysisException(
             "WHERE clause must not contain analytic expressions: " + e.toSql());
       }
+
+      // Don't allow a WHERE conjunct on an array item that is part of a zipping unnest.
+      // If there is only one zipping-unnested array, this restriction is not needed
+      // as the UNNEST node only has to handle a single array and it's safe to do the
+      // filtering in the scanner.
+      Set<TupleId> zippingUnnestTupleIds = analyzer_.getZippingUnnestTupleIds();
+      if (zippingUnnestTupleIds.size() > 1) {
+        for (Expr expr : whereClause_.getChildren()) {
+          if (expr == null || !(expr instanceof SlotRef)) continue;
+          SlotRef slotRef = (SlotRef)expr;
+          for (TupleId tid : zippingUnnestTupleIds) {
+            TupleDescriptor collTupleDesc = analyzer_.getTupleDesc(tid);
+            // If there is no slot ref for the collection tuple then there is no need to
+            // check.
+            if (collTupleDesc.getSlots().size() == 0) continue;
+            Preconditions.checkState(collTupleDesc.getSlots().size() == 1);
+            if (slotRef.getDesc().equals(collTupleDesc.getSlots().get(0))) {
+              throw new AnalysisException("Not allowed to add a filter on an unnested " +
+                  "array under the same select statement: " + expr.toSql());
+            }
+          }
+        }
+      }
       analyzer_.registerConjuncts(whereClause_, false);
     }
 
@@ -492,6 +515,11 @@ public class SelectStmt extends QueryStmt {
         // Do not generate a predicate if the parent tuple is outer joined.
         if (analyzer_.isOuterJoined(ref.getResolvedPath().getRootDesc().getId()))
           continue;
+        // Don't push down the "is not empty" predicate for zipping unnests if there are
+        // multiple zipping unnests in the FROM clause.
+        if (tblRef.isZippingUnnest() && analyzer_.getZippingUnnestTupleIds().size() > 1) {
+          continue;
+        }
         IsNotEmptyPredicate isNotEmptyPred =
             new IsNotEmptyPredicate(ref.getCollectionExpr().clone());
         isNotEmptyPred.analyze(analyzer_);
diff --git a/fe/src/main/java/org/apache/impala/analysis/SlotRef.java b/fe/src/main/java/org/apache/impala/analysis/SlotRef.java
index a31c51a..93942bf 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SlotRef.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SlotRef.java
@@ -17,6 +17,7 @@
 
 package org.apache.impala.analysis;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 
@@ -40,14 +41,14 @@ import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 
 public class SlotRef extends Expr {
-  private final List<String> rawPath_;
+  protected List<String> rawPath_;
   private final String label_;  // printed in toSql()
 
   // Results of analysis.
   private SlotDescriptor desc_;
 
   // The resolved path after resolving 'rawPath_'.
-  private Path resolvedPath_ = null;
+  protected Path resolvedPath_ = null;
 
   public SlotRef(List<String> rawPath) {
     super();
@@ -88,10 +89,17 @@ public class SlotRef extends Expr {
   /**
    * C'tor for cloning.
    */
-  private SlotRef(SlotRef other) {
+  protected SlotRef(SlotRef other) {
     super(other);
     resolvedPath_ = other.resolvedPath_;
-    rawPath_ = other.rawPath_;
+    if (other.rawPath_ != null) {
+      // Instead of using the reference of 'other.rawPath_' clone its values into another
+      // list.
+      rawPath_ = new ArrayList<>();
+      rawPath_.addAll(other.rawPath_);
+    } else {
+      rawPath_ = null;
+    }
     label_ = other.label_;
     desc_ = other.desc_;
     type_ = other.type_;
diff --git a/fe/src/main/java/org/apache/impala/analysis/StmtRewriter.java b/fe/src/main/java/org/apache/impala/analysis/StmtRewriter.java
index d57d46f..ad060b2 100644
--- a/fe/src/main/java/org/apache/impala/analysis/StmtRewriter.java
+++ b/fe/src/main/java/org/apache/impala/analysis/StmtRewriter.java
@@ -20,6 +20,7 @@ package org.apache.impala.analysis;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.impala.analysis.AnalysisContext.AnalysisResult;
 import org.apache.impala.analysis.SetOperationStmt.SetOperand;
@@ -1853,4 +1854,21 @@ public class StmtRewriter {
       return rawTblPath;
     }
   }
+
+  /**
+   * The purpose of this rewriter is to add CollectionTableRefs to the FROM clause of
+   * select queries where unnest() is in the select list. One example of such a query:
+   * SELECT unnest(arr1), unnest(arr2) FROM complextypes_arrays;
+   * In the above example there will be two CollectionTableRefs added to the FROM clause,
+   * one for each unnested array.
+   */
+  static class ZippingUnnestRewriter extends StmtRewriter {
+    @Override
+    protected void rewriteSelectStmtHook(SelectStmt stmt, Analyzer analyzer)
+        throws AnalysisException {
+      Set<CollectionTableRef> unnestTableRefs = analyzer.getTableRefsFromUnnestExpr();
+      Preconditions.checkState(stmt.getResultExprs().size() >= unnestTableRefs.size());
+      for (TableRef tblRef : unnestTableRefs) stmt.fromClause_.add(tblRef);
+    }
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/analysis/TableRef.java b/fe/src/main/java/org/apache/impala/analysis/TableRef.java
index 4f008b1..79c9622 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TableRef.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TableRef.java
@@ -52,7 +52,7 @@ import com.google.common.collect.Lists;
  * The analysis of table refs follows a two-step process:
  *
  * 1. Resolution: A table ref's path is resolved and then the generic TableRef is
- * replaced by a concrete table ref (a BaseTableRef, CollectionTabeRef or ViewRef)
+ * replaced by a concrete table ref (a BaseTableRef, CollectionTableRef or ViewRef)
  * in the originating stmt and that is given the resolved path. This step is driven by
  * Analyzer.resolveTableRef().
  *
@@ -110,6 +110,15 @@ public class TableRef extends StmtNode {
   // TODO: Move join-specific members out of TableRef.
   private DistributionMode distrMode_ = DistributionMode.NONE;
 
+  public enum ZippingUnnestType {
+    NONE,
+    FROM_CLAUSE_ZIPPING_UNNEST,
+    SELECT_LIST_ZIPPING_UNNEST
+  }
+
+  // Indicates if this TableRef is for the purpose of zipping unnest for arrays.
+  protected ZippingUnnestType zippingUnnestType_ = ZippingUnnestType.NONE;
+
   /////////////////////////////////////////
   // BEGIN: Members that need to be reset()
 
@@ -247,6 +256,7 @@ public class TableRef extends StmtNode {
     exposeNestedColumnsByTableMaskView_ = other.exposeNestedColumnsByTableMaskView_;
     scalarColumns_ = new LinkedHashMap<>(other.scalarColumns_);
     isHidden_ = other.isHidden_;
+    zippingUnnestType_ = other.zippingUnnestType_;
   }
 
   @Override
@@ -391,6 +401,13 @@ public class TableRef extends StmtNode {
   public boolean isAnalyzed() { return isAnalyzed_; }
   public boolean isResolved() { return !getClass().equals(TableRef.class); }
 
+  public boolean isZippingUnnest() {
+    return zippingUnnestType_ != ZippingUnnestType.NONE;
+  }
+  public ZippingUnnestType getZippingUnnestType() { return zippingUnnestType_; }
+  public void setZippingUnnestType(ZippingUnnestType t) { zippingUnnestType_ = t; }
+
+
   /**
    * This method should only be called after the TableRef has been analyzed.
    */
@@ -706,6 +723,7 @@ public class TableRef extends StmtNode {
 
   @Override
   public String toSql(ToSqlOptions options) {
+    if (isZippingUnnest()) return "";
     if (joinOp_ == null) {
       // prepend "," if we're part of a sequence of table refs w/o an
       // explicit JOIN clause
diff --git a/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java b/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java
index f51a36e..783345a 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java
@@ -188,8 +188,12 @@ public class TupleDescriptor {
   public StructType getType() { return type_; }
   public int getByteSize() { return byteSize_; }
   public float getAvgSerializedSize() { return avgSerializedSize_; }
-  public boolean isMaterialized() { return isMaterialized_; }
-  public void setIsMaterialized(boolean value) { isMaterialized_ = value; }
+  public boolean isMaterialized() {
+    return isMaterialized_;
+  }
+  public void setIsMaterialized(boolean value) {
+    isMaterialized_ = value;
+  }
   public boolean hasMemLayout() { return hasMemLayout_; }
   public void setAliases(String[] aliases, boolean hasExplicitAlias) {
     aliases_ = aliases;
diff --git a/fe/src/main/java/org/apache/impala/analysis/UnnestExpr.java b/fe/src/main/java/org/apache/impala/analysis/UnnestExpr.java
new file mode 100644
index 0000000..6d63ff3
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/UnnestExpr.java
@@ -0,0 +1,150 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.analysis;
+
+import org.apache.impala.analysis.Path.PathType;
+import org.apache.impala.analysis.TableRef.ZippingUnnestType;
+import org.apache.impala.catalog.TableLoadingException;
+import org.apache.impala.catalog.Type;
+import org.apache.impala.common.AnalysisException;
+import org.apache.impala.thrift.TExprNode;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.List;
+
+// This class represents a zipping unnest SlotRef in a SELECT list, as in the following:
+// SELECT unnest(arr1), unnest(arr2) FROM tbl;
+public class UnnestExpr extends SlotRef {
+  // Stores the raw path to the underlying array without the "item" postfix.
+  private List<String> rawPathWithoutItem_ = new ArrayList<>();
+
+  public UnnestExpr(List<String> path) {
+    super(path);
+  }
+
+  protected UnnestExpr(UnnestExpr other) {
+    super(other);
+  }
+
+  @Override
+  protected void analyzeImpl(Analyzer analyzer) throws AnalysisException {
+    Preconditions.checkNotNull(rawPath_);
+    Preconditions.checkState(rawPath_.size() >= 1);
+
+    verifyTableRefs(analyzer);
+
+    // Resolve 'rawPath_' early before running super.analyzeImpl() because the required
+    // CollectionTableRef might have to be created beforehand so that the array item could
+    // find the corresponding CollectionTableRef during resolution.
+    Path resolvedPath = resolveAndVerifyRawPath(analyzer);
+    Preconditions.checkNotNull(resolvedPath);
+    rawPathWithoutItem_.addAll(rawPath_);
+
+    List<String> tableRefRawPath = constructRawPathForTableRef(resolvedPath);
+    Preconditions.checkNotNull(tableRefRawPath);
+    createAndRegisterCollectionTableRef(tableRefRawPath, analyzer);
+
+    // 'rawPath_' points to an array and we need a SlotRef to refer to the item of the
+    // array. Hence, adding "item" to the end of the path.
+    rawPath_.add("item");
+    // If 'rawPath_' contains the table or database alias then trim it to the following
+    // format: 'collection_name.item'.
+    if (rawPath_.size() > 2) {
+      rawPath_ = rawPath_.subList(rawPath_.size() - 2, rawPath_.size());
+    }
+    super.analyzeImpl(analyzer);
+  }
+
+  private void verifyTableRefs(Analyzer analyzer) throws AnalysisException {
+    for (TableRef ref : analyzer.getTableRefs().values()) {
+      if (ref instanceof CollectionTableRef) {
+        if (!ref.isZippingUnnest()) {
+          throw new AnalysisException(
+              "Providing zipping and joining unnests together is not supported.");
+        } else if (ref.getZippingUnnestType() ==
+            ZippingUnnestType.FROM_CLAUSE_ZIPPING_UNNEST) {
+          throw new AnalysisException("Providing zipping unnest both in the SELECT " +
+              "list and in the FROM clause is not supported.");
+        }
+      }
+    }
+  }
+
+  private Path resolveAndVerifyRawPath(Analyzer analyzer) throws AnalysisException {
+    Preconditions.checkNotNull(rawPath_);
+    // If this is a re-analysis round and we already added "item" to the end of the path,
+    // we remove it now before resolving the path again.
+    if (resolvedPath_ != null) removeItemFromPath();
+    Path resolvedPath = null;
+    try {
+      resolvedPath = analyzer.resolvePath(rawPath_, PathType.SLOT_REF);
+      if (resolvedPath == null) {
+        throw new AnalysisException("Unable to resolve path: " +
+            ToSqlUtils.getPathSql(rawPath_));
+      }
+    } catch (TableLoadingException e) {
+      throw new AnalysisException(e.toString());
+    }
+    Preconditions.checkNotNull(resolvedPath);
+    Preconditions.checkState(!resolvedPath.getMatchedTypes().isEmpty());
+    Type resolvedType =
+        resolvedPath.getMatchedTypes().get(resolvedPath.getMatchedTypes().size() - 1);
+    if (!resolvedType.isArrayType()) {
+      throw new AnalysisException("Unnest operator is only supported for arrays. " +
+          ToSqlUtils.getPathSql(rawPath_));
+    }
+    return resolvedPath;
+  }
+
+  private List<String> constructRawPathForTableRef(Path resolvedPath) {
+    List<String> tableRefRawPath = new ArrayList<String>();
+    tableRefRawPath.add(resolvedPath.getRootDesc().getAlias());
+    tableRefRawPath.add(rawPath_.get(rawPath_.size() - 1));
+    return tableRefRawPath;
+  }
+
+  private void createAndRegisterCollectionTableRef(List<String> tableRefRawPath,
+      Analyzer analyzer) throws AnalysisException {
+    TableRef tblRef = new TableRef(tableRefRawPath, null);
+    tblRef = analyzer.resolveTableRef(tblRef);
+    Preconditions.checkState(tblRef instanceof CollectionTableRef);
+    tblRef.setZippingUnnestType(ZippingUnnestType.SELECT_LIST_ZIPPING_UNNEST);
+    if (!analyzer.isRegisteredTableRef(tblRef)) {
+      tblRef.analyze(analyzer);
+      // This just registers the tbl ref to be added to the FROM clause because it's not
+      // available here. Note, SelectStmt will add it to the FROM clause during analysis.
+      analyzer.addTableRefFromUnnestExpr((CollectionTableRef)tblRef);
+    }
+  }
+
+  private void removeItemFromPath() {
+    Preconditions.checkNotNull(rawPath_);
+    if (rawPath_.get(rawPath_.size() - 1).equals("item")) {
+      rawPath_.remove(rawPath_.size() - 1);
+    }
+  }
+
+  @Override
+  public Expr clone() { return new UnnestExpr(this); }
+
+  @Override
+  public String toSqlImpl(ToSqlOptions options) {
+    return "UNNEST(" + ToSqlUtils.getPathSql(rawPathWithoutItem_)  + ")";
+  }
+}
\ No newline at end of file
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index b84ce7f..68f5134 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -931,6 +931,16 @@ public class HdfsScanNode extends ScanNode {
       Preconditions.checkNotNull(slotDesc.getItemTupleDesc());
       TupleDescriptor itemTupleDesc = slotDesc.getItemTupleDesc();
       TupleId itemTid = itemTupleDesc.getId();
+
+      // If the slot is part of a collection that is given as a zipping unnest in the
+      // FROM clause then avoid pushing down conjuncts for this slot to the scanner as
+      // that would produce incorrect results on that slot after performing the unnest.
+      // One exception is when there is only one such table reference in the FROM clause.
+      Set<TupleId> zippingUnnestTupleIds = analyzer.getZippingUnnestTupleIds();
+      if (zippingUnnestTupleIds.size() > 1 && zippingUnnestTupleIds.contains(itemTid)) {
+        continue;
+      }
+
       // First collect unassigned and binding predicates. Then remove redundant
       // predicates based on slot equivalences and enforce slot equivalences by
       // generating new predicates.
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanNode.java b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
index e75722e..3461634 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
@@ -27,6 +27,7 @@ import java.util.Set;
 
 import org.apache.impala.analysis.Analyzer;
 import org.apache.impala.analysis.BinaryPredicate;
+import org.apache.impala.analysis.CollectionTableRef;
 import org.apache.impala.analysis.Expr;
 import org.apache.impala.analysis.ExprId;
 import org.apache.impala.analysis.ExprSubstitutionMap;
@@ -157,6 +158,15 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
     tblRefIds_.addAll(tupleIds);
   }
 
+  protected PlanNode(PlanNodeId id, String displayName,
+      List<CollectionTableRef> tblRefs) {
+    this(id, displayName);
+    for (CollectionTableRef collRef : tblRefs) {
+      tupleIds_.add(collRef.getDesc().getId());
+      tblRefIds_.add(collRef.getDesc().getId());
+    }
+  }
+
   /**
    * Deferred id_ assignment.
    */
diff --git a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
index 23e1e46..71b215d 100644
--- a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
@@ -914,8 +914,11 @@ public class SingleNodePlanner {
     // maintain a deterministic order of traversing the TableRefs during join
     // plan generation (helps with tests)
     List<Pair<TableRef, PlanNode>> parentRefPlans = new ArrayList<>();
+    List<CollectionTableRef> unnestCollectionRefs =
+        extractZippingUnnestTableRefs(parentRefs);
+    reduceUnnestCollectionRefs(parentRefs, unnestCollectionRefs);
     for (TableRef ref: parentRefs) {
-      PlanNode root = createTableRefNode(ref, aggInfo, analyzer);
+      PlanNode root = createTableRefNode(ref, aggInfo, analyzer, unnestCollectionRefs);
       Preconditions.checkNotNull(root);
       root = createSubplan(root, subplanRefs, true, analyzer);
       parentRefPlans.add(new Pair<TableRef, PlanNode>(ref, root));
@@ -1025,6 +1028,39 @@ public class SingleNodePlanner {
   }
 
   /**
+   * This function gathers and returns all the CollectionTableRefs that are used for
+   * zipping unnest.
+   */
+  private List<CollectionTableRef> extractZippingUnnestTableRefs(
+      List<TableRef> refs) {
+    Preconditions.checkNotNull(refs);
+    List<CollectionTableRef> collectionRefs = Lists.newArrayList();
+    for (TableRef ref : refs) {
+      if (ref instanceof CollectionTableRef && ref.isZippingUnnest()) {
+        collectionRefs.add((CollectionTableRef)ref);
+      }
+    }
+    return collectionRefs;
+  }
+
+  /**
+   * This function removes the items of 'unnestCollectionRefs' from 'refs', except for
+   * the first one. This is used when the CollectionTableRefs are handled by a single
+   * UNNEST node for zipping unnest. A single CollectionTableRef has to remain in 'refs'
+   * so that subplan creation can see that an UNNEST node has to be created.
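+   * E.g. for 'FROM t, UNNEST(t.arr1, t.arr2)' both collection refs are handled by one
+   * UNNEST node, so only the ref for t.arr1 is kept in 'refs'.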
+   */
+  private void reduceUnnestCollectionRefs(List<TableRef> refs,
+      List<CollectionTableRef> unnestCollectionRefs) {
+    Preconditions.checkNotNull(refs);
+    Preconditions.checkNotNull(unnestCollectionRefs);
+    if (unnestCollectionRefs.size() <= 1) return;
+    List<CollectionTableRef> reducedCollectionRefs =
+        unnestCollectionRefs.subList(1, unnestCollectionRefs.size());
+    refs.removeAll(reducedCollectionRefs);
+  }
+
+  /**
    * Returns a new AggregationNode that materializes the aggregation of the given stmt.
    * Assigns conjuncts from the Having clause to the returned node.
    */
@@ -2132,19 +2168,29 @@ public class SingleNodePlanner {
    * The given 'aggInfo' is used for detecting and applying optimizations that span both
    * the scan and aggregation. Only applicable to HDFS and Kudu table refs.
    *
+   * 'collectionRefsToZip' holds all the CollectionTableRefs that are used for zipping
+   * unnests. Unlike regular CollectionTableRefs, these will all be handled by a single
+   * UnnestNode.
+   *
    * Throws if a PlanNode.init() failed or if planning of the given
    * table ref is not implemented.
    */
   private PlanNode createTableRefNode(TableRef tblRef, MultiAggregateInfo aggInfo,
-      Analyzer analyzer) throws ImpalaException {
+      Analyzer analyzer, List<CollectionTableRef> collectionRefsToZip)
+      throws ImpalaException {
     PlanNode result = null;
     if (tblRef instanceof BaseTableRef) {
       result = createScanNode(tblRef, aggInfo, analyzer);
     } else if (tblRef instanceof CollectionTableRef) {
       if (tblRef.isRelative()) {
         Preconditions.checkState(ctx_.hasSubplan());
-        result = new UnnestNode(ctx_.getNextNodeId(), ctx_.getSubplan(),
-            (CollectionTableRef) tblRef);
+        if (collectionRefsToZip != null && collectionRefsToZip.size() > 0) {
+          result = new UnnestNode(ctx_.getNextNodeId(), ctx_.getSubplan(),
+              collectionRefsToZip);
+        } else {
+          result = new UnnestNode(ctx_.getNextNodeId(), ctx_.getSubplan(),
+              (CollectionTableRef) tblRef);
+        }
         result.init(analyzer);
       } else {
         result = createScanNode(tblRef, null, analyzer);
diff --git a/fe/src/main/java/org/apache/impala/planner/UnnestNode.java b/fe/src/main/java/org/apache/impala/planner/UnnestNode.java
index 496268d..df1fe87 100644
--- a/fe/src/main/java/org/apache/impala/planner/UnnestNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/UnnestNode.java
@@ -17,6 +17,8 @@
 
 package org.apache.impala.planner;
 
+import java.util.List;
+
 import org.apache.impala.analysis.Analyzer;
 import org.apache.impala.analysis.CollectionTableRef;
 import org.apache.impala.analysis.Expr;
@@ -27,34 +29,48 @@ import org.apache.impala.thrift.TPlanNodeType;
 import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TUnnestNode;
 import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
 import com.google.common.base.Preconditions;
 
 /**
- * An UnnestNode scans over a collection materialized in memory, and returns
- * one row per item in the collection.
+ * An UnnestNode scans over collections materialized in memory and returns one row per
+ * item in the collection if a single collection is provided, or as many rows as the
+ * length of the longest collection if this node handles multiple collections. For the
+ * shorter collections the missing items are filled with NULLs.
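+ * E.g. zipping arrays {10, 9, 8} and {"ten"} yields (10, "ten"), (9, NULL), (8, NULL).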
  * An UnnestNode can only appear in the plan tree of a SubplanNode.
  */
 public class UnnestNode extends PlanNode {
   private final SubplanNode containingSubplanNode_;
-  private final CollectionTableRef tblRef_;
-  private final Expr collectionExpr_;
+  private final List<CollectionTableRef> tblRefs_;
+  private final List<Expr> collectionExprs_;
 
   public UnnestNode(PlanNodeId id, SubplanNode containingSubplanNode,
       CollectionTableRef tblRef) {
-    super(id, tblRef.getDesc().getId().asList(), "UNNEST");
+    this(id, containingSubplanNode, Lists.newArrayList(tblRef));
+  }
+
+  public UnnestNode(PlanNodeId id, SubplanNode containingSubplanNode,
+      List<CollectionTableRef> tblRefs) {
+    super(id, "UNNEST", tblRefs);
     containingSubplanNode_ = containingSubplanNode;
-    tblRef_ = tblRef;
-    collectionExpr_ = tblRef_.getCollectionExpr();
-    // Assume the collection expr has been fully resolved in analysis.
-    Preconditions.checkState(
-        collectionExpr_.isBoundByTupleIds(containingSubplanNode.getChild(0).tupleIds_));
+    tblRefs_ = tblRefs;
+    collectionExprs_ = getCollectionExprs(tblRefs_);
+    // Assume the collection exprs have been fully resolved in analysis.
+    for (Expr collectionExpr : collectionExprs_) {
+      Preconditions.checkState(
+          collectionExpr.isBoundByTupleIds(containingSubplanNode.getChild(0).tupleIds_));
+    }
+  }
+
+  private List<Expr> getCollectionExprs(List<CollectionTableRef> collectionRefs) {
+    Preconditions.checkState(collectionRefs.size() > 0);
+    List<Expr> result = Lists.newArrayList();
+    for (CollectionTableRef ref : collectionRefs) result.add(ref.getCollectionExpr());
+    return result;
   }
 
   @Override
   public void init(Analyzer analyzer) throws ImpalaException {
-    // Do not assign binding predicates or predicates for enforcing slot equivalences
-    // because they must have been assigned in the scan node materializing the
-    // collection-typed slot.
     super.init(analyzer);
     conjuncts_ = orderConjunctsByCost(conjuncts_);
 
@@ -102,14 +118,25 @@ public class UnnestNode extends PlanNode {
   @Override
   protected String getDisplayLabelDetail() {
     StringBuilder strBuilder = new StringBuilder();
-    strBuilder.append(Joiner.on(".").join(tblRef_.getPath()));
-    if (tblRef_.hasExplicitAlias()) strBuilder.append(" " + tblRef_.getExplicitAlias());
+    boolean first = true;
+    for (CollectionTableRef tblRef : tblRefs_) {
+      if (!first) strBuilder.append(", ");
+      strBuilder.append(Joiner.on(".").join(tblRef.getPath()));
+      if (tblRef.hasExplicitAlias()) {
+        strBuilder.append(" " + tblRef.getExplicitAlias());
+      }
+      first = false;
+    }
     return strBuilder.toString();
   }
 
   @Override
   protected void toThrift(TPlanNode msg) {
     msg.node_type = TPlanNodeType.UNNEST_NODE;
-    msg.setUnnest_node(new TUnnestNode(collectionExpr_.treeToThrift()));
+    TUnnestNode unnestNode = new TUnnestNode();
+    for (Expr expr : collectionExprs_) {
+      unnestNode.addToCollection_exprs(expr.treeToThrift());
+    }
+    msg.setUnnest_node(unnestNode);
   }
 }
diff --git a/fe/src/main/jflex/sql-scanner.flex b/fe/src/main/jflex/sql-scanner.flex
index 6778352..e60676a 100644
--- a/fe/src/main/jflex/sql-scanner.flex
+++ b/fe/src/main/jflex/sql-scanner.flex
@@ -269,6 +269,7 @@ import org.apache.impala.thrift.TReservedWordsVersion;
     keywordMap.put("uncached", SqlParserSymbols.KW_UNCACHED);
     keywordMap.put("union", SqlParserSymbols.KW_UNION);
     keywordMap.put("unknown", SqlParserSymbols.KW_UNKNOWN);
+    keywordMap.put("unnest", SqlParserSymbols.KW_UNNEST);
     keywordMap.put("unset", SqlParserSymbols.KW_UNSET);
     keywordMap.put("update", SqlParserSymbols.KW_UPDATE);
     keywordMap.put("update_fn", SqlParserSymbols.KW_UPDATE_FN);
@@ -386,7 +387,7 @@ import org.apache.impala.thrift.TReservedWordsVersion;
         "substring_regex", "succeeds", "sum", "symmetric", "system", "system_time",
         "system_user", "tan", "tanh", "time", "timezone_hour", "timezone_minute",
         "trailing", "translate", "translate_regex", "translation", "treat", "trigger",
-        "trim", "trim_array", "uescape", "unique", "unknown", "unnest", "update  ",
+        "trim", "trim_array", "uescape", "unique", "unknown", "update  ",
         "upper", "user", "value", "value_of", "var_pop", "var_samp", "varbinary",
         "varying", "versioning", "whenever", "width_bucket", "window", "within",
         "without", "year"}));
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
index fae991d..5c38d37 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
@@ -4606,7 +4606,7 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
     testNumberOfMembers(ValuesStmt.class, 0);
 
     // Also check TableRefs.
-    testNumberOfMembers(TableRef.class, 27);
+    testNumberOfMembers(TableRef.class, 28);
     testNumberOfMembers(BaseTableRef.class, 0);
     testNumberOfMembers(InlineViewRef.class, 10);
   }
diff --git a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
index ba4de8c..d700186 100644
--- a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
@@ -3515,7 +3515,7 @@ public class ParserTest extends FrontendTestBase {
         "Encountered: FROM\n" +
         "Expected: ALL, CASE, CAST, DATE, DEFAULT, DISTINCT, EXISTS, FALSE, GROUPING, " +
         "IF, INTERVAL, LEFT, NOT, NULL, REPLACE, RIGHT, STRAIGHT_JOIN, TRUNCATE, TRUE, " +
-        "IDENTIFIER");
+        "UNNEST, IDENTIFIER");
 
     // missing from
     ParserError("select c, b, c where a = 5",
@@ -3533,7 +3533,7 @@ public class ParserTest extends FrontendTestBase {
         "select c, b, c from where a = 5\n" +
         "                    ^\n" +
         "Encountered: WHERE\n" +
-        "Expected: DEFAULT, IDENTIFIER\n");
+        "Expected: DEFAULT, UNNEST, IDENTIFIER\n");
 
     // missing predicate in where clause (no group by)
     ParserError("select c, b, c from t where",
@@ -3542,7 +3542,8 @@ public class ParserTest extends FrontendTestBase {
         "                           ^\n" +
         "Encountered: EOF\n" +
         "Expected: CASE, CAST, DATE, DEFAULT, EXISTS, FALSE, GROUPING, IF, INTERVAL, " +
-        "LEFT, NOT, NULL, REPLACE, RIGHT, STRAIGHT_JOIN, TRUNCATE, TRUE, IDENTIFIER");
+        "LEFT, NOT, NULL, REPLACE, RIGHT, STRAIGHT_JOIN, TRUNCATE, TRUE, UNNEST, " +
+        "IDENTIFIER");
 
     // missing predicate in where clause (group by)
     ParserError("select c, b, c from t where group by a, b",
@@ -3551,7 +3552,8 @@ public class ParserTest extends FrontendTestBase {
         "                            ^\n" +
         "Encountered: GROUP\n" +
         "Expected: CASE, CAST, DATE, DEFAULT, EXISTS, FALSE, GROUPING, IF, INTERVAL, " +
-        "LEFT, NOT, NULL, REPLACE, RIGHT, STRAIGHT_JOIN, TRUNCATE, TRUE, IDENTIFIER");
+        "LEFT, NOT, NULL, REPLACE, RIGHT, STRAIGHT_JOIN, TRUNCATE, TRUE, UNNEST, " +
+        "IDENTIFIER");
 
     // unmatched string literal starting with "
     ParserError("select c, \"b, c from t",
@@ -3614,7 +3616,7 @@ public class ParserTest extends FrontendTestBase {
         "                             ^\n" +
         "Encountered: COMMA\n" +
         "Expected: CASE, CAST, DATE, DEFAULT, EXISTS, FALSE, GROUPING, IF, INTERVAL, " +
-        "LEFT, NOT, NULL, REPLACE, RIGHT, TRUNCATE, TRUE, IDENTIFIER");
+        "LEFT, NOT, NULL, REPLACE, RIGHT, TRUNCATE, TRUE, UNNEST, IDENTIFIER");
 
     // Parsing identifiers that have different names printed as EXPECTED
     ParserError("DROP DATA SRC foo",
@@ -3652,7 +3654,7 @@ public class ParserTest extends FrontendTestBase {
          "Encountered: EOF\n" +
          "Expected: ALL, CASE, CAST, DATE, DEFAULT, DISTINCT, EXISTS, FALSE, GROUPING, " +
          "IF, INTERVAL, LEFT, NOT, NULL, REPLACE, RIGHT, " +
-         "STRAIGHT_JOIN, TRUNCATE, TRUE, IDENTIFIER\n");
+         "STRAIGHT_JOIN, TRUNCATE, TRUE, UNNEST, IDENTIFIER\n");
     ParserError("SELECT\n\n",
          "Syntax error in line 3:\n" +
          "\n" +
@@ -3660,7 +3662,7 @@ public class ParserTest extends FrontendTestBase {
          "Encountered: EOF\n" +
          "Expected: ALL, CASE, CAST, DATE, DEFAULT, DISTINCT, EXISTS, FALSE, GROUPING, " +
          "IF, INTERVAL, LEFT, NOT, NULL, REPLACE, RIGHT, " +
-         "STRAIGHT_JOIN, TRUNCATE, TRUE, IDENTIFIER\n");
+         "STRAIGHT_JOIN, TRUNCATE, TRUE, UNNEST, IDENTIFIER\n");
   }
 
   @Test
diff --git a/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java b/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
index 950fd3e..10f4022 100644
--- a/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
@@ -1558,6 +1558,44 @@ public class ToSqlTest extends FrontendTestBase {
         "(date_sub(timestamp_col, interval 40 hours)) from functional.alltypes",
         "SELECT DATE_SUB(timestamp_col, INTERVAL 40 hours), " +
         "(DATE_SUB(timestamp_col, INTERVAL 40 hours)) FROM functional.alltypes");
+
+    // UNNEST() operator in SELECT statement.
+    testToSql(
+        "select unnest(arr1), unnest(arr2) from functional_parquet.complextypes_arrays",
+        "SELECT UNNEST(arr1), UNNEST(arr2) FROM functional_parquet.complextypes_arrays");
+    // UNNEST() operator in SELECT statement with aliases.
+    testToSql(
+        "select unnest(arr1) a1, unnest(arr2) a2 from " +
+            "functional_parquet.complextypes_arrays",
+        "SELECT UNNEST(arr1) a1, UNNEST(arr2) a2 FROM " +
+            "functional_parquet.complextypes_arrays");
+    testToSql(
+        "select unnest(arr1) as a1, unnest(arr2) as a2 from " +
+            "functional_parquet.complextypes_arrays",
+        "SELECT UNNEST(arr1) a1, UNNEST(arr2) a2 FROM " +
+            "functional_parquet.complextypes_arrays");
+    testToSql(
+        "select unnest(t.arr1) as a1, unnest(t.arr2) as a2 from " +
+            "functional_parquet.complextypes_arrays t",
+        "SELECT UNNEST(t.arr1) a1, UNNEST(t.arr2) a2 FROM " +
+            "functional_parquet.complextypes_arrays t");
+
+    // UNNEST() operator in FROM clause.
+    testToSql(
+        "select arr1.item, arr2.item from functional_parquet.complextypes_arrays t, " +
+            "unnest(t.arr1, t.arr2)",
+        "SELECT arr1.item, arr2.item FROM functional_parquet.complextypes_arrays t, " +
+            "UNNEST(t.arr1, t.arr2)");
+    testToSql(
+        "select arr1.item, arr2.item from functional_parquet.complextypes_arrays t, " +
+            "unnest(t.arr1, t.arr2), functional_parquet.alltypes",
+        "SELECT arr1.item, arr2.item FROM functional_parquet.complextypes_arrays t, " +
+            "UNNEST(t.arr1, t.arr2), functional_parquet.alltypes");
+    testToSql(
+        "select arr1.item, arr2.item from functional_parquet.complextypes_arrays t, " +
+            "functional_parquet.alltypes, unnest(t.arr1, t.arr2)",
+        "SELECT arr1.item, arr2.item FROM functional_parquet.complextypes_arrays t, " +
+            "functional_parquet.alltypes, UNNEST(t.arr1, t.arr2)");
   }
 
   /**
diff --git a/fe/src/test/java/org/apache/impala/authorization/AuthorizationStmtTest.java b/fe/src/test/java/org/apache/impala/authorization/AuthorizationStmtTest.java
index 9b1f31d..64c96da 100644
--- a/fe/src/test/java/org/apache/impala/authorization/AuthorizationStmtTest.java
+++ b/fe/src/test/java/org/apache/impala/authorization/AuthorizationStmtTest.java
@@ -738,6 +738,31 @@ public class AuthorizationStmtTest extends AuthorizationTestBase {
               "alltypessmall", ALLTYPES_COLUMNS, TPrivilegeLevel.SELECT));
     }
 
+    // Unnest an array.
+    authorize("select unnest(int_array_col) from functional.allcomplextypes")
+        .ok(onServer(TPrivilegeLevel.ALL))
+        .ok(onServer(TPrivilegeLevel.OWNER))
+        .ok(onServer(TPrivilegeLevel.SELECT))
+        .ok(onDatabase("functional", TPrivilegeLevel.ALL))
+        .ok(onDatabase("functional", TPrivilegeLevel.OWNER))
+        .ok(onDatabase("functional", TPrivilegeLevel.SELECT))
+        .ok(onTable("functional", "allcomplextypes", TPrivilegeLevel.ALL))
+        .ok(onTable("functional", "allcomplextypes", TPrivilegeLevel.OWNER))
+        .ok(onTable("functional", "allcomplextypes", TPrivilegeLevel.SELECT))
+        .ok(onColumn("functional", "allcomplextypes",
+            new String[]{"id", "int_array_col"}, TPrivilegeLevel.SELECT))
+        .error(selectError("functional.allcomplextypes"))
+        .error(selectError("functional.allcomplextypes"), onServer(
+            allExcept(TPrivilegeLevel.ALL, TPrivilegeLevel.OWNER,
+            TPrivilegeLevel.SELECT)))
+        .error(selectError("functional.allcomplextypes"), onDatabase("functional",
+            allExcept(TPrivilegeLevel.ALL, TPrivilegeLevel.OWNER,
+            TPrivilegeLevel.SELECT)))
+        .error(selectError("functional.allcomplextypes"), onTable("functional",
+            "allcomplextypes", allExcept(TPrivilegeLevel.ALL, TPrivilegeLevel.OWNER,
+            TPrivilegeLevel.SELECT)));
+
     // Union on views.
     authorize("select id from functional.alltypes_view union all " +
         "select x from functional.alltypes_view_sub")
diff --git a/testdata/ComplexTypesTbl/arrays.orc b/testdata/ComplexTypesTbl/arrays.orc
new file mode 100644
index 0000000..5da9fa4
Binary files /dev/null and b/testdata/ComplexTypesTbl/arrays.orc differ
diff --git a/testdata/ComplexTypesTbl/arrays.parq b/testdata/ComplexTypesTbl/arrays.parq
new file mode 100644
index 0000000..bc9b005
Binary files /dev/null and b/testdata/ComplexTypesTbl/arrays.parq differ
diff --git a/testdata/data/README b/testdata/data/README
index 14e0078..bfaa0c1 100644
--- a/testdata/data/README
+++ b/testdata/data/README
@@ -657,6 +657,12 @@ https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/test/java/or
 The schema was completely changed to allow us to test types supported in Parquet Bloom
 filters.
 
+ComplexTypesTbl/arrays.orc and arrays.parq
+These tables hold 3 columns: an int ID and two arrays, one of ints and the other of
+strings. The purpose of introducing these tables is to give more test coverage for
+zipping unnests. There are rows where the 2 arrays have the same length, rows where one
+of them is longer than the other, plus rows with NULL and empty arrays as well.
+
 binary_decimal_precision_and_scale_widening.parquet
 Parquet file written with schema (decimal(9,2), decimal(18,2), decimal(38,2)). The rows
 inside the file are carefully chosen so that they don't cause an overflow when being read
diff --git a/testdata/datasets/functional/functional_schema_template.sql b/testdata/datasets/functional/functional_schema_template.sql
index 21293ed..11800d9 100644
--- a/testdata/datasets/functional/functional_schema_template.sql
+++ b/testdata/datasets/functional/functional_schema_template.sql
@@ -854,6 +854,22 @@ LOAD DATA LOCAL INPATH '{impala_home}/testdata/ComplexTypesTbl/structs_nested.or
 ---- DATASET
 functional
 ---- BASE_TABLE_NAME
+complextypes_arrays
+---- COLUMNS
+id int
+arr1 array<int>
+arr2 array<string>
+---- DEPENDENT_LOAD
+`hadoop fs -mkdir -p /test-warehouse/complextypes_arrays_parquet && \
+hadoop fs -put -f ${IMPALA_HOME}/testdata/ComplexTypesTbl/arrays.parq \
+/test-warehouse/complextypes_arrays_parquet/
+---- DEPENDENT_LOAD_ACID
+LOAD DATA LOCAL INPATH '{impala_home}/testdata/ComplexTypesTbl/arrays.orc' OVERWRITE INTO TABLE {db_name}{db_suffix}.{table_name};
+---- LOAD
+====
+---- DATASET
+functional
+---- BASE_TABLE_NAME
 complextypestbl_minor_compacted
 ---- COLUMNS
 id bigint
diff --git a/testdata/datasets/functional/schema_constraints.csv b/testdata/datasets/functional/schema_constraints.csv
index 29362a4..bfd0844 100644
--- a/testdata/datasets/functional/schema_constraints.csv
+++ b/testdata/datasets/functional/schema_constraints.csv
@@ -105,6 +105,8 @@ table_name:complextypes_structs, constraint:restrict_to, table_format:parquet/no
 table_name:complextypes_structs, constraint:restrict_to, table_format:orc/def/block
 table_name:complextypes_nested_structs, constraint:restrict_to, table_format:parquet/none/none
 table_name:complextypes_nested_structs, constraint:restrict_to, table_format:orc/def/block
+table_name:complextypes_arrays, constraint:restrict_to, table_format:parquet/none/none
+table_name:complextypes_arrays, constraint:restrict_to, table_format:orc/def/block
 
 table_name:alltypeserror, constraint:exclude, table_format:parquet/none/none
 table_name:alltypeserrornonulls, constraint:exclude, table_format:parquet/none/none
diff --git a/testdata/workloads/functional-query/queries/QueryTest/zipping-unnest-from-view.test b/testdata/workloads/functional-query/queries/QueryTest/zipping-unnest-from-view.test
new file mode 100644
index 0000000..63cd79c
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/zipping-unnest-from-view.test
@@ -0,0 +1,57 @@
+====
+---- QUERY
+create view view_unnested_arrays as
+    select id, unnest(arr1) as arr1_unnested, unnest(arr2) as arr2_unnested
+    from functional_orc_def.complextypes_arrays;
+---- RESULTS
+'View has been created.'
+====
+---- QUERY
+# Query unnested array items from a view that does the unnesting itself.
+select id, arr1_unnested, arr2_unnested from view_unnested_arrays;
+---- RESULTS
+1,1,'one'
+1,2,'two'
+1,3,'three'
+1,4,'four'
+1,5,'five'
+2,1,'one'
+2,NULL,'two'
+2,3,'three'
+2,4,'NULL'
+2,5,'five'
+3,10,'ten'
+3,9,'NULL'
+3,8,'NULL'
+4,10,'ten'
+4,NULL,'nine'
+4,NULL,'eight'
+5,10,'ten'
+5,NULL,'eleven'
+5,12,'twelve'
+5,NULL,'thirteen'
+6,NULL,'str1'
+6,NULL,'str2'
+7,1,'NULL'
+7,2,'NULL'
+9,NULL,'str1'
+9,NULL,'str2'
+10,1,'NULL'
+10,2,'NULL'
+10,3,'NULL'
+---- TYPES
+INT,INT,STRING
+====
+---- QUERY
+# Same as above but there is a filter in the outer select.
+select id, arr1_unnested, arr2_unnested from view_unnested_arrays
+where arr1_unnested > 5;
+---- RESULTS
+3,10,'ten'
+3,9,'NULL'
+3,8,'NULL'
+4,10,'ten'
+5,10,'ten'
+5,12,'twelve'
+---- TYPES
+INT,INT,STRING
diff --git a/testdata/workloads/functional-query/queries/QueryTest/zipping-unnest-in-from-clause.test b/testdata/workloads/functional-query/queries/QueryTest/zipping-unnest-in-from-clause.test
new file mode 100644
index 0000000..64e436f
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/zipping-unnest-in-from-clause.test
@@ -0,0 +1,411 @@
+====
+---- QUERY
+# Zipping unnest for a single array works the same way as the traditional unnest.
+select id, int_array.item from complextypestbl t, unnest(t.int_array);
+---- RESULTS
+1,1
+1,2
+1,3
+2,NULL
+2,1
+2,2
+2,NULL
+2,3
+2,NULL
+8,-1
+---- TYPES
+BIGINT,INT
+====
+---- QUERY
+# Zipping unnest on multiple arrays performs zipping instead of joining, or in other
+# words it puts the i'th items of the arrays next to each other.
+select id, a1.item, a2.item
+from complextypestbl t, unnest(t.int_array, t.int_array) as (a1, a2);
+---- RESULTS
+1,1,1
+1,2,2
+1,3,3
+2,NULL,NULL
+2,1,1
+2,2,2
+2,NULL,NULL
+2,3,3
+2,NULL,NULL
+8,-1,-1
+---- TYPES
+BIGINT,INT,INT
+====
+---- QUERY
+# Unnest 2 arrays of the same length.
+select arr1.item, arr2.item
+from complextypes_arrays t, unnest(t.arr1, t.arr2)
+where id = 1;
+---- RESULTS
+1,'one'
+2,'two'
+3,'three'
+4,'four'
+5,'five'
+---- TYPES
+INT,STRING
+====
+---- QUERY
+# Unnest 2 arrays of the same length that have some NULL item(s).
+select arr1.item, arr2.item
+from complextypes_arrays t, unnest(t.arr1, t.arr2)
+where id = 2;
+---- RESULTS
+1,'one'
+NULL,'two'
+3,'three'
+4,'NULL'
+5,'five'
+---- TYPES
+INT,STRING
+====
+---- QUERY
+# Unnest 2 arrays where the second one has less items.
+select arr1.item, arr2.item
+from complextypes_arrays t, unnest(t.arr1, t.arr2)
+where id = 3;
+---- RESULTS
+10,'ten'
+9,'NULL'
+8,'NULL'
+---- TYPES
+INT,STRING
+====
+---- QUERY
+# Unnest 2 arrays where the first one has less items.
+select arr1.item, arr2.item
+from complextypes_arrays t, unnest(t.arr1, t.arr2)
+where id = 4;
+---- RESULTS
+10,'ten'
+NULL,'nine'
+NULL,'eight'
+---- TYPES
+INT,STRING
+====
+---- QUERY
+# Unnest 2 arrays of different lengths where the first one has some NULL item(s).
+select arr1.item, arr2.item
+from complextypes_arrays t, unnest(t.arr1, t.arr2)
+where id = 5;
+---- RESULTS
+10,'ten'
+NULL,'eleven'
+12,'twelve'
+NULL,'thirteen'
+---- TYPES
+INT,STRING
+====
+---- QUERY
+# Unnest 2 arrays where the first one is empty.
+select arr1.item, arr2.item
+from complextypes_arrays t, unnest(t.arr1, t.arr2)
+where id = 6;
+---- RESULTS
+NULL,'str1'
+NULL,'str2'
+---- TYPES
+INT,STRING
+====
+---- QUERY
+# Unnest 2 arrays where the second one is empty.
+select arr1.item, arr2.item
+from complextypes_arrays t, unnest(t.arr1, t.arr2)
+where id = 7;
+---- RESULTS
+1,'NULL'
+2,'NULL'
+---- TYPES
+INT,STRING
+====
+---- QUERY
+# Unnest a single array that is empty.
+select arr1.item
+from complextypes_arrays t, unnest(t.arr1)
+where id = 6;
+---- RESULTS
+---- TYPES
+INT
+====
+---- QUERY
+# Unnest 2 empty arrays.
+select arr1.item, arr2.item
+from complextypes_arrays t, unnest(t.arr1, t.arr2)
+where id = 8;
+---- RESULTS
+---- TYPES
+INT,STRING
+====
+---- QUERY
+# Unnest 2 arrays where the first one is NULL.
+select arr1.item, arr2.item
+from complextypes_arrays t, unnest(t.arr1, t.arr2)
+where id = 9;
+---- RESULTS
+NULL,'str1'
+NULL,'str2'
+---- TYPES
+INT,STRING
+====
+---- QUERY
+# Unnest 2 arrays where the second one is NULL.
+select arr1.item, arr2.item
+from complextypes_arrays t, unnest(t.arr1, t.arr2)
+where id = 10;
+---- RESULTS
+1,'NULL'
+2,'NULL'
+3,'NULL'
+---- TYPES
+INT,STRING
+====
+---- QUERY
+# Unnest a single array that is NULL.
+select arr2.item
+from complextypes_arrays t, unnest(t.arr2)
+where id = 10;
+---- RESULTS
+---- TYPES
+STRING
+====
+---- QUERY
+# Unnest 2 NULL arrays.
+select arr1.item, arr2.item
+from complextypes_arrays t, unnest(t.arr1, t.arr2)
+where id = 11;
+---- RESULTS
+---- TYPES
+INT,STRING
+====
+---- QUERY
+# A non-collection field is given to UNNEST().
+select arr1.item
+from complextypes_arrays t, unnest(t.arr1, t.id);
+---- CATCH
+AnalysisException: Illegal table reference to non-collection type: 't.id'
+====
+---- QUERY
+# Multiple UNNEST()s in the FROM clause are not allowed.
+select arr1.item
+from complextypes_arrays t, unnest(t.arr1), unnest(t.arr2);
+---- CATCH
+Providing multiple UNNEST() in the FROM clause is not supported.
+====
+---- QUERY
+# Zipping unnest with joining unnest together is not allowed.
+select arr1.item, arr2.item
+from complextypes_arrays t, unnest(t.arr1), t.arr2;
+---- CATCH
+Providing zipping and joining unnests together is not supported.
+====
+---- QUERY
+# Zipping unnest with joining unnest together is not allowed.
+select id, arr1.item
+from complextypes_arrays t, t.arr1, unnest(t.arr2);
+---- CATCH
+Providing zipping and joining unnests together is not supported.
+====
+---- QUERY
+# Arrays in the zipping unnest are from different tables.
+select arr1.item, int_array.item
+from complextypes_arrays t1,
+     complextypestbl t2,
+     unnest(t1.arr1, t2.int_array);
+---- CATCH
+AnalysisException: Not supported to do zipping unnest on arrays from different tables.
+====
+---- QUERY
+# Zipping unnest with absolute paths doesn't work at the moment. See IMPALA-10977.
+select arr1.item, arr2.item
+from unnest(functional_parquet.complextypes_arrays.arr1,
+            functional_parquet.complextypes_arrays.arr2);
+---- CATCH
+IllegalStateException
+====
+---- QUERY
+# Zipping unnest for a single array with absolute path works well as it's in fact not an
+# unnest but a single scan.
+select arr1.item
+from unnest(functional_parquet.complextypes_arrays.arr1)
+where arr1.item < 2;
+---- RESULTS
+1
+1
+1
+1
+---- TYPES
+INT
+====
+---- QUERY
+# Zipping unnest for a single array should also work well even if it goes through an
+# UNNEST node.
+select arr1.item
+from functional_parquet.complextypes_arrays t, unnest(t.arr1)
+where arr1.item < 2;
+---- RESULTS
+1
+1
+1
+1
+---- TYPES
+INT
+====
+---- QUERY
+# Do an unnest on an array that is not in the select list, i.e. there won't be any top
+# level tuples for it.
+select id from complextypes_arrays t, unnest(t.arr1);
+---- RESULTS
+1
+1
+1
+1
+1
+2
+2
+2
+2
+2
+3
+3
+3
+4
+5
+5
+5
+7
+7
+10
+10
+10
+---- TYPES
+INT
+====
+---- QUERY
+# Similar to the above but there is a where clause on a non-array field.
+select id from complextypes_arrays t, unnest(t.arr1) where id = 7;
+---- RESULTS
+7
+7
+---- TYPES
+INT
+====
+---- QUERY
+# Similar to the above but now more than one array is being zipping unnested.
+select id from complextypes_arrays t, unnest(t.arr1, t.arr2) where id = 7;
+---- RESULTS
+7
+7
+---- TYPES
+INT
+====
+---- QUERY
+# WHERE filter on an unnested array in the same SELECT statement is not allowed.
+select arr1.item, arr2.item
+from complextypes_arrays t, unnest(t.arr1, t.arr2)
+where arr1.item < 3;
+---- CATCH
+AnalysisException: Not allowed to add a filter on an unnested array under the same select statement: arr1.item
+====
+---- QUERY
+# Similar to the above but the array's item is not present in the select list.
+select id
+from complextypes_arrays t, unnest(t.arr1, t.arr2)
+where arr1.item = 2;
+---- CATCH
+AnalysisException: Not allowed to add a filter on an unnested array under the same select statement: arr1.item
+====
+---- QUERY
+# Only arrays are allowed in a FROM clause unnest.
+select m.key, m.value from complextypestbl t, unnest(t.int_map) as (m);
+---- CATCH
+AnalysisException: Unnest operator is only supported for arrays. t.int_map
+====
+---- QUERY
+# The number of aliases is bigger than the number of arrays in an unnest.
+select a.item from functional_parquet.arr_tbl t, UNNEST(t.arr1) AS (a,b);
+---- CATCH
+The number of arrays doesn't match with the number of aliases
+====
+---- QUERY
+# The number of arrays in an unnest is bigger than the number of aliases.
+select a.item from functional_parquet.arr_tbl t, UNNEST(t.arr1, t.arr2) AS (a);
+---- CATCH
+The number of arrays doesn't match with the number of aliases
+====
+---- QUERY
+# WHERE filter on an unnested array is allowed if it's provided in an outer SELECT
+# statement. The filter predicate is not being pushed down to the scanner in this case.
+select id, a1, a2
+from (select id, arr1.item as a1, arr2.item as a2
+      from complextypes_arrays t, unnest(t.arr1, t.arr2))
+      as X
+where a1 < 3;
+---- RESULTS
+1,1,'one'
+1,2,'two'
+2,1,'one'
+7,1,'NULL'
+7,2,'NULL'
+10,1,'NULL'
+10,2,'NULL'
+---- TYPES
+INT,INT,STRING
+====
+---- QUERY
+# Similar to the above but here only one array is being unnested. In this case the
+# filter predicate is pushed down to the scanner.
+select id, a1
+from (select id, arr1.item as a1
+      from complextypes_arrays t, unnest(t.arr1))
+      as X
+where a1 < 3;
+---- RESULTS
+1,1
+1,2
+2,1
+7,1
+7,2
+10,1
+10,2
+---- TYPES
+INT,INT
+---- RUNTIME_PROFILE
+predicates on arr1: arr1.item < CAST(3 AS INT)
+====
+---- QUERY
+# After doing a zipping unnest on two arrays, do a regular join to another table. The
+# first array has 2 items while the second array is empty.
+select t1.id, t2.id, arr1.item, arr2.item
+from complextypes_arrays t1, unnest(t1.arr1, t1.arr2), alltypestiny t2
+where t1.id = 7 and t2.id < 3;
+---- RESULTS
+7,0,1,'NULL'
+7,0,2,'NULL'
+7,1,1,'NULL'
+7,1,2,'NULL'
+7,2,1,'NULL'
+7,2,2,'NULL'
+---- TYPES
+INT,INT,INT,STRING
+====
+---- QUERY
+# Similar to the above but here the first array has 3 items while the second has one.
+select t1.id, t2.id, arr1.item, arr2.item
+from complextypes_arrays t1, unnest(t1.arr1, t1.arr2), alltypestiny t2
+where t1.id = 3 and t2.id < 3;
+---- RESULTS
+3,0,10,'ten'
+3,0,9,'NULL'
+3,0,8,'NULL'
+3,1,10,'ten'
+3,1,9,'NULL'
+3,1,8,'NULL'
+3,2,10,'ten'
+3,2,9,'NULL'
+3,2,8,'NULL'
+---- TYPES
+INT,INT,INT,STRING
diff --git a/testdata/workloads/functional-query/queries/QueryTest/zipping-unnest-in-select-list.test b/testdata/workloads/functional-query/queries/QueryTest/zipping-unnest-in-select-list.test
new file mode 100644
index 0000000..66927ec
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/zipping-unnest-in-select-list.test
@@ -0,0 +1,187 @@
+====
+---- QUERY
+# The table ref is given with an alias and the unnested arrays' paths contain this
+# alias.
+select unnest(t.arr1), unnest(t.arr2) from complextypes_arrays t where id = 5;
+---- RESULTS
+10,'ten'
+NULL,'eleven'
+12,'twelve'
+NULL,'thirteen'
+---- TYPES
+INT,STRING
+====
+---- QUERY
+# Similar to the above but one array is given multiple times.
+select unnest(t.arr1), unnest(t.arr2), unnest(t.arr1)
+from complextypes_arrays t
+where id = 5;
+---- RESULTS
+10,'ten',10
+NULL,'eleven',NULL
+12,'twelve',12
+NULL,'thirteen',NULL
+---- TYPES
+INT,STRING,INT
+====
+---- QUERY
+# No alias is given for the table ref but the unnested arrays' path contains a table
+# alias.
+select unnest(t.arr1) from complextypes_arrays;
+---- CATCH
+AnalysisException: Could not resolve column/field reference: 't.arr1'
+====
+---- QUERY
+# Giving arr.item to an unnest is not allowed if the item of that array is not an array
+# itself.
+select unnest(arr1.item) from complextypes_arrays;
+---- CATCH
+AnalysisException: Illegal column/field reference 'arr1.item' with intermediate collection 'arr1' of type 'ARRAY<INT>'
+====
+---- QUERY
+# No alias is given for the table ref and the unnested arrays' path doesn't contain a
+# table alias.
+select unnest(arr1) from complextypes_arrays where id = 5;
+---- RESULTS
+10
+NULL
+12
+---- TYPES
+INT
+====
+---- QUERY
+# The unnested array is given with an absolute path.
+select unnest(complextypes_arrays.arr1) from complextypes_arrays where id = 5;
+---- RESULTS
+10
+NULL
+12
+---- TYPES
+INT
+====
+---- QUERY
+# Multiple arrays are queried, with different paths.
+select unnest(complextypes_arrays.arr1), unnest(arr2)
+from complextypes_arrays
+where id = 3;
+---- RESULTS
+10,'ten'
+9,'NULL'
+8,'NULL'
+---- TYPES
+INT,STRING
+====
+---- QUERY
+# Unnest a single array that is empty.
+select unnest(arr1)
+from complextypes_arrays
+where id = 6;
+---- RESULTS
+---- TYPES
+INT
+====
+---- QUERY
+# Use aliases for the unnests and use these aliases in an outer select.
+select id, a1, a2
+from (
+    select id, unnest(arr1) a1, unnest(arr2) a2
+    from complextypes_arrays
+    where id = 3 or id = 4) x;
+---- RESULTS
+3,10,'ten'
+3,9,'NULL'
+3,8,'NULL'
+4,10,'ten'
+4,NULL,'nine'
+4,NULL,'eight'
+---- TYPES
+INT,INT,STRING
+====
+---- QUERY
+# Similar to the above but there is a WHERE filter on the outer select filtering by the
+# aliases.
+select id, a1, a2
+from (
+    select id, unnest(arr1) a1, unnest(arr2) a2
+    from complextypes_arrays
+    where id = 3 or id = 4) x
+where a1 > 8 and a2 = 'ten';
+---- RESULTS
+3,10,'ten'
+4,10,'ten'
+---- TYPES
+INT,INT,STRING
+====
+---- QUERY
+# WHERE filter on an unnested array is allowed if it's provided in an outer SELECT
+# statement. The filter predicate is not being pushed down to the scanner in this case.
+select id, a1, a2
+from (select id, unnest(arr1) as a1, unnest(arr2) as a2
+      from complextypes_arrays) x
+where a1 < 3;
+---- RESULTS
+1,1,'one'
+1,2,'two'
+2,1,'one'
+7,1,'NULL'
+7,2,'NULL'
+10,1,'NULL'
+10,2,'NULL'
+---- TYPES
+INT,INT,STRING
+====
+---- QUERY
+# Primitive type is given for the unnest operator.
+select unnest(complextypes_arrays.id) from complextypes_arrays;
+---- CATCH
+AnalysisException: Unnest operator is only supported for arrays. complextypes_arrays.id
+====
+---- QUERY
+# Struct type is given for the unnest operator.
+select unnest(t.alltypes) from complextypes_structs t;
+---- CATCH
+AnalysisException: Unnest operator is only supported for arrays. t.alltypes
+====
+---- QUERY
+# Map type is given for the unnest operator.
+select unnest(int_map) from complextypestbl;
+---- CATCH
+AnalysisException: Unnest operator is only supported for arrays. int_map
+====
+---- QUERY
+# Zipping and joining unnests are given together.
+select unnest(arr1) from complextypes_arrays t, t.arr2;
+---- CATCH
+AnalysisException: Providing zipping and joining unnests together is not supported.
+====
+---- QUERY
+# Zipping and joining unnests are given together, where a single table ref with an
+# absolute path is in the FROM clause.
+select unnest(a) from complextypestbl.int_array_array;
+---- CATCH
+AnalysisException: Providing zipping and joining unnests together is not supported.
+====
+---- QUERY
+# Zipping unnest given in both select list and from clause.
+select unnest(arr1) from complextypes_arrays t, unnest(t.arr1);
+---- CATCH
+AnalysisException: Providing zipping unnest both in the SELECT list and in the FROM clause is not supported.
+====
+---- QUERY
+# Zipping unnest given in both select list and from clause.
+select unnest(arr1) from complextypes_arrays t, unnest(t.arr2);
+---- CATCH
+AnalysisException: Providing zipping unnest both in the SELECT list and in the FROM clause is not supported.
+====
+---- QUERY
+# Do an unnest on the outer layer of a nested array.
+# IMPALA-9498 might fix this as it will allow querying arrays in the select list.
+select unnest(int_array_array) from complextypestbl;
+---- CATCH
+AnalysisException: Expr 'UNNEST(int_array_array)' in select list returns a collection type 'ARRAY<INT>'.
+====
+---- QUERY
+# Do an unnest on the inner layer of a nested array. This will give an error because the
+# inner array can't be referenced directly without having the outer array as a table ref.
+select unnest(int_array_array.item) from complextypestbl;
+---- CATCH
+AnalysisException: Illegal column/field reference 'int_array_array.item' with intermediate collection 'int_array_array' of type 'ARRAY<ARRAY<INT>>'
diff --git a/tests/query_test/test_nested_types.py b/tests/query_test/test_nested_types.py
index f202843..364e063 100644
--- a/tests/query_test/test_nested_types.py
+++ b/tests/query_test/test_nested_types.py
@@ -193,6 +193,36 @@ class TestComputeStatsWithNestedTypes(ImpalaTestSuite):
     """COMPUTE STATS and SHOW COLUMN STATS for tables with structs"""
     self.run_test_case('QueryTest/compute-stats-with-structs', vector)
 
+
+class TestZippingUnnest(ImpalaTestSuite):
+  """Functional tests for zipping unnest functionality."""
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestZippingUnnest, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_constraint(lambda v:
+        v.get_value('table_format').file_format in ['parquet', 'orc'])
+
+  def test_zipping_unnest_in_from_clause(self, vector):
+    """Queries where zipping unnest is executed by providing UNNEST() in the from clause.
+    """
+    self.run_test_case('QueryTest/zipping-unnest-in-from-clause', vector)
+
+  def test_zipping_unnest_in_select_list(self, vector):
+    """Queries where zipping unnest is executed by providing UNNEST() in the select list.
+    """
+    self.run_test_case('QueryTest/zipping-unnest-in-select-list', vector)
+
+  def test_zipping_unnest_from_view(self, vector, unique_database):
+    """Zipping unnest queries where views are involved."""
+    if vector.get_value('table_format').file_format == 'orc':
+      pytest.skip('No need to run this test for multiple file formats.')
+    self.run_test_case('QueryTest/zipping-unnest-from-view', vector,
+        use_db=unique_database)
+
+
 class TestNestedTypesNoMtDop(ImpalaTestSuite):
   """Functional tests for nested types that do not need to be run with mt_dop > 0."""
   @classmethod