Posted to commits@impala.apache.org by ta...@apache.org on 2018/08/08 21:07:51 UTC

[1/2] impala git commit: IMPALA-7400: [DOCS] Updated outdated contents from impala_porting

Repository: impala
Updated Branches:
  refs/heads/master bf24a814c -> a6c356850


IMPALA-7400: [DOCS] Updated outdated contents from impala_porting

Change-Id: I3a1edb1ec5076354df9301ce12e7618d7cf607a6
Reviewed-on: http://gerrit.cloudera.org:8080/11154
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-by: Tim Armstrong <ta...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/dd8a25d4
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/dd8a25d4
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/dd8a25d4

Branch: refs/heads/master
Commit: dd8a25d4c2976bddbe40cf7b37a2b69447855f83
Parents: bf24a81
Author: Alex Rodoni <ar...@cloudera.com>
Authored: Tue Aug 7 17:33:20 2018 -0700
Committer: Alex Rodoni <ar...@cloudera.com>
Committed: Wed Aug 8 20:33:08 2018 +0000

----------------------------------------------------------------------
 docs/topics/impala_porting.xml | 399 ++++++++++++++++--------------------
 1 file changed, 179 insertions(+), 220 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/dd8a25d4/docs/topics/impala_porting.xml
----------------------------------------------------------------------
diff --git a/docs/topics/impala_porting.xml b/docs/topics/impala_porting.xml
index f9f2081..63e4070 100644
--- a/docs/topics/impala_porting.xml
+++ b/docs/topics/impala_porting.xml
@@ -40,13 +40,11 @@ under the License.
 
   <conbody>
 
-    <p>
-      <indexterm audience="hidden">porting</indexterm>
-      Although Impala uses standard SQL for queries, you might need to modify SQL source when bringing applications
-      to Impala, due to variations in data types, built-in functions, vendor language extensions, and
-      Hadoop-specific syntax. Even when SQL is working correctly, you might make further minor modifications for
-      best performance.
-    </p>
+    <p> Although Impala uses standard SQL for queries, you might need to modify
+      SQL source when bringing applications to Impala, due to variations in data
+      types, built-in functions, vendor language extensions, and Hadoop-specific
+      syntax. Even when SQL is working correctly, you might make further minor
+      modifications for best performance. </p>
 
     <p outputclass="toc inpage"/>
   </conbody>
@@ -351,152 +349,130 @@ select count(*) from (select * from t1);
 
     <conbody>
 
-      <p>
-        Some SQL statements or clauses that you might be familiar with are not currently supported in Impala:
-      </p>
+      <p> The following SQL statements or clauses are either not currently
+        supported in Impala or are supported only with limitations: </p>
 
       <ul>
         <li>
-          <p>
-            Impala has no <codeph>DELETE</codeph> statement. Impala is intended for data warehouse-style operations
-            where you do bulk moves and transforms of large quantities of data. Instead of using
-            <codeph>DELETE</codeph>, use <codeph>INSERT OVERWRITE</codeph> to entirely replace the contents of a
-            table or partition, or use <codeph>INSERT ... SELECT</codeph> to copy a subset of data (everything but
-            the rows you intended to delete) from one table to another. See <xref href="impala_dml.xml#dml"/> for
-            an overview of Impala DML statements.
-          </p>
-        </li>
-
-        <li>
-          <p>
-            Impala has no <codeph>UPDATE</codeph> statement. Impala is intended for data warehouse-style operations
-            where you do bulk moves and transforms of large quantities of data. Instead of using
-            <codeph>UPDATE</codeph>, do all necessary transformations early in the ETL process, such as in the job
-            that generates the original data, or when copying from one table to another to convert to a particular
-            file format or partitioning scheme. See <xref href="impala_dml.xml#dml"/> for an overview of Impala DML
-            statements.
-          </p>
-        </li>
-
-        <li>
-          <p>
-            Impala has no transactional statements, such as <codeph>COMMIT</codeph> or <codeph>ROLLBACK</codeph>.
-            Impala effectively works like the <codeph>AUTOCOMMIT</codeph> mode in some database systems, where
-            changes take effect as soon as they are made.
-          </p>
-        </li>
-
-        <li>
-          <p>
-            If your database, table, column, or other names conflict with Impala reserved words, use different
-            names or quote the names with backticks. See <xref href="impala_reserved_words.xml#reserved_words"/>
-            for the current list of Impala reserved words.
-          </p>
-          <p>
-            Conversely, if you use a keyword that Impala does not recognize, it might be interpreted as a table or
-            column alias. For example, in <codeph>SELECT * FROM t1 NATURAL JOIN t2</codeph>, Impala does not
-            recognize the <codeph>NATURAL</codeph> keyword and interprets it as an alias for the table
-            <codeph>t1</codeph>. If you experience any unexpected behavior with queries, check the list of reserved
-            words to make sure all keywords in join and <codeph>WHERE</codeph> clauses are recognized.
-          </p>
-        </li>
-
-        <li>
-          <p>
-            Impala supports subqueries only in the <codeph>FROM</codeph> clause of a query, not within the
-            <codeph>WHERE</codeph> clauses. Therefore, you cannot use clauses such as <codeph>WHERE
-            <varname>column</varname> IN (<varname>subquery</varname>)</codeph>. Also, Impala does not allow
-            <codeph>EXISTS</codeph> or <codeph>NOT EXISTS</codeph> clauses (although <codeph>EXISTS</codeph> is a
-            reserved keyword).
-          </p>
-        </li>
-
-        <li>
-          <p>
-            Impala supports <codeph>UNION</codeph> and <codeph>UNION ALL</codeph> set operators, but not
-            <codeph>INTERSECT</codeph>. <ph conref="../shared/impala_common.xml#common/union_all_vs_union"/>
-          </p>
-        </li>
-
-        <li>
-          <p>
-            Within queries, Impala requires query aliases for any subqueries:
-          </p>
-<codeblock>-- Without the alias 'contents_of_t1' at the end, query gives syntax error.
-select count(*) from (select * from t1) contents_of_t1;</codeblock>
-        </li>
-
-        <li>
-          <p>
-            When an alias is declared for an expression in a query, that alias cannot be referenced again within
-            the same query block:
-          </p>
-<codeblock>-- Can't reference AVERAGE twice in the SELECT list where it's defined.
-select avg(x) as average, average+1 from t1 group by x;
-ERROR: AnalysisException: couldn't resolve column reference: 'average'
-
--- Although it can be referenced again later in the same query.
-select avg(x) as average from t1 group by x having average &gt; 3;</codeblock>
-          <p>
-            For Impala, either repeat the expression again, or abstract the expression into a <codeph>WITH</codeph>
-            clause, creating named columns that can be referenced multiple times anywhere in the base query:
-          </p>
-<codeblock>-- The following 2 query forms are equivalent.
-select avg(x) as average, avg(x)+1 from t1 group by x;
-with avg_t as (select avg(x) average from t1 group by x) select average, average+1 from avg_t;</codeblock>
-<!-- An alternative bunch of queries to use in the example above.
-[localhost:21000] > select x*x as x_squared from t1;
-
-[localhost:21000] > select x*x as x_squared from t1 where x_squared < 100;
-ERROR: AnalysisException: couldn't resolve column reference: 'x_squared'
-[localhost:21000] > select x*x as x_squared, x_squared * pi() as pi_x_squared from t1;
-ERROR: AnalysisException: couldn't resolve column reference: 'x_squared'
-[localhost:21000] > select x*x as x_squared from t1 group by x_squared;
-
-[localhost:21000] > select x*x as x_squared from t1 group by x_squared having x_squared < 100;
--->
-        </li>
-
-        <li>
-          <p>
-            Impala does not support certain rarely used join types that are less appropriate for high-volume tables
-            used for data warehousing. In some cases, Impala supports join types but requires explicit syntax to
-            ensure you do not do inefficient joins of huge tables by accident. For example, Impala does not support
-            natural joins or anti-joins, and requires the <codeph>CROSS JOIN</codeph> operator for Cartesian
-            products. See <xref href="impala_joins.xml#joins"/> for details on the syntax for Impala join clauses.
-          </p>
-        </li>
-
-        <li>
-          <p>
-            Impala has a limited choice of partitioning types. Partitions are defined based on each distinct
-            combination of values for one or more partition key columns. Impala does not redistribute or check data
-            to create evenly distributed partitions; you must choose partition key columns based on your knowledge
-            of the data volume and distribution. Adapt any tables that use range, list, hash, or key partitioning
-            to use the Impala partition syntax for <codeph>CREATE TABLE</codeph> and <codeph>ALTER TABLE</codeph>
-            statements. Impala partitioning is similar to range partitioning where every range has exactly one
-            value, or key partitioning where the hash function produces a separate bucket for every combination of
-            key values. See <xref href="impala_partitioning.xml#partitioning"/> for usage details, and
-            <xref href="impala_create_table.xml#create_table"/> and
-            <xref href="impala_alter_table.xml#alter_table"/> for syntax.
-          </p>
-          <note>
-            Because the number of separate partitions is potentially higher than in other database systems, keep a
-            close eye on the number of partitions and the volume of data in each one; scale back the number of
-            partition key columns if you end up with too many partitions with a small volume of data in each one.
-            Remember, to distribute work for a query across a cluster, you need at least one HDFS block per node.
-            HDFS blocks are typically multiple megabytes, <ph rev="parquet_block_size">especially</ph> for Parquet
-            files. Therefore, if each partition holds only a few megabytes of data, you are unlikely to see much
-            parallelism in the query because such a small amount of data is typically processed by a single node.
-          </note>
-        </li>
-
-        <li>
-          <p>
-            For <q>top-N</q> queries, Impala uses the <codeph>LIMIT</codeph> clause rather than comparing against a
-            pseudocolumn named <codeph>ROWNUM</codeph> or <codeph>ROW_NUM</codeph>. See
-            <xref href="impala_limit.xml#limit"/> for details.
-          </p>
+          <p> Impala supports the <codeph>DELETE</codeph> statement only for
+            Kudu tables. </p>
+          <p>Impala is intended for data warehouse-style operations where you do
+            bulk moves and transforms of large quantities of data. For tables
+            other than Kudu tables, use <codeph>INSERT OVERWRITE</codeph>
+            instead of <codeph>DELETE</codeph> to entirely replace the contents
+            of a table or partition, or use <codeph>INSERT ... SELECT</codeph>
+            to copy a subset of data (everything but the rows you intended to
+            delete) from one table to another. See <xref
+              href="impala_dml.xml#dml"/> for an overview of Impala DML
+            statements. </p>
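+          <p>For example, everything except the rows to be deleted can be
+            copied into another table; the table and column names here are
+            illustrative:</p>
+          <codeblock>-- Copy every row except the ones that would have been deleted.
+INSERT OVERWRITE t2 SELECT * FROM t1 WHERE c1 != 'obsolete';</codeblock>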
+        </li>
+        <li>
+          <p> Impala supports the <codeph>UPDATE</codeph> statement only for
+            Kudu tables.</p>
+          <p>For tables other than Kudu tables, instead of
+              <codeph>UPDATE</codeph>, do all necessary transformations early
+            in the ETL process, such as in
+            the job that generates the original data, or when copying from one
+            table to another to convert to a particular file format or
+            partitioning scheme. See <xref href="impala_dml.xml#dml"/> for an
+            overview of Impala DML statements. </p>
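+          <p>For example, a change that would have been an in-place
+              <codeph>UPDATE</codeph> can be applied while copying from one
+            table to another; the names here are illustrative:</p>
+          <codeblock>-- Apply the transformation during the copy instead of updating in place.
+INSERT INTO t2 SELECT id, UPPER(c1) FROM t1;</codeblock>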
+        </li>
+        <li>
+          <p> Impala has no transactional statements, such as
+              <codeph>COMMIT</codeph> or <codeph>ROLLBACK</codeph>. </p>
+          <p>Impala effectively works like the <codeph>AUTOCOMMIT</codeph> mode
+            in some database systems, where changes take effect as soon as they
+            are made. </p>
+        </li>
+        <li>
+          <p> If your database, table, column, or other names conflict with
+            Impala reserved words, use different names or quote the names with
+            backticks. </p>
+          <p>See <xref href="impala_reserved_words.xml#reserved_words"/> for the
+            current list of Impala reserved words. </p>
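+          <p>For example, to query a hypothetical column whose name conflicts
+            with a reserved word, such as a column named
+              <codeph>table</codeph>, quote the name with backticks:</p>
+          <codeblock>SELECT `table` FROM t1;</codeblock>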
+          <p> Conversely, if you use a keyword that Impala does not recognize,
+            it might be interpreted as a table or column alias. </p>
+          <p>For example, in <codeph>SELECT * FROM t1 NATURAL JOIN t2</codeph>,
+            Impala does not recognize the <codeph>NATURAL</codeph> keyword and
+            interprets it as an alias for the table <codeph>t1</codeph>. If you
+            experience any unexpected behavior with queries, check the list of
+            reserved words to make sure all keywords in join and
+              <codeph>WHERE</codeph> clauses are recognized by Impala.
+          </p>
+        </li>
+        <li>
+          <p>Impala has some restrictions on subquery support. See <keyword
+              keyref="subqueries"/> for details.</p>
+        </li>
+        <li>
+          <p> Impala supports <codeph>UNION</codeph> and <codeph>UNION
+              ALL</codeph> set operators, but not <codeph>INTERSECT</codeph>. </p>
+          <p><ph conref="../shared/impala_common.xml#common/union_all_vs_union"
+            />
+          </p>
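+          <p>Where <codeph>INTERSECT</codeph> logic is needed, one possible
+            rewrite (setting aside differences in <codeph>NULL</codeph>
+            matching) uses a <codeph>LEFT SEMI JOIN</codeph>; the table and
+            column names below are illustrative:</p>
+          <codeblock>-- Distinct values of c1 present in both t1 and t2, similar to INTERSECT.
+SELECT DISTINCT t1.c1 FROM t1 LEFT SEMI JOIN t2 ON t1.c1 = t2.c1;</codeblock>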
+        </li>
+        <li><p>Impala requires query aliases for the subqueries used as inline
+            views in the <codeph>FROM</codeph> clause. </p><p>For example,
+            without the alias <codeph>contents_of_t1</codeph> at the end, the
+            following query gives a syntax
+            error:<codeblock>SELECT COUNT(*) FROM (SELECT * FROM t1) contents_of_t1;</codeblock></p>Aliases
+          are not required for the subqueries used in other parts of queries.
+          For
+          example:<codeblock>SELECT * FROM functional.alltypes WHERE id = (SELECT MIN(id) FROM functional.alltypes);
+</codeblock></li>
+        <li>
+          <p> When an alias is declared for an expression in a query, that alias
+            cannot be referenced again within the same <codeph>SELECT</codeph>
+            list.</p>
+          <p>For example, the <codeph>average</codeph> alias defined below
+            cannot be referenced again in the same <codeph>SELECT</codeph>
+            list; doing so results in an error:</p>
+          <codeblock>SELECT AVG(x) AS average, average+1 FROM t1 GROUP BY x;</codeblock>
+          <p>An alias can be referenced later in the same query if it is not in
+            the <codeph>SELECT</codeph> list. For example, the
+              <codeph>average</codeph> alias can be referenced from the
+              <codeph>HAVING</codeph> clause as shown
+            below:<codeblock>SELECT AVG(x) AS average FROM t1 GROUP BY x HAVING average &gt; 3;</codeblock></p>
+        </li>
+        <li>
+          <p> Impala does not support <codeph>NATURAL JOIN</codeph>, and it does
+            not support the <codeph>USING</codeph> clause in joins. See <xref
+              href="impala_joins.xml#joins"/> for details on the syntax for
+            Impala join clauses. </p>
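+          <p>For example, a join written with <codeph>USING</codeph> or as a
+              <codeph>NATURAL JOIN</codeph> can be rewritten with an explicit
+              <codeph>ON</codeph> clause; the names here are illustrative:</p>
+          <codeblock>-- Instead of: SELECT * FROM t1 JOIN t2 USING (id)
+SELECT t1.c1, t2.c2 FROM t1 JOIN t2 ON t1.id = t2.id;</codeblock>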
+        </li>
+        <li>
+          <p> Impala supports a limited choice of partitioning types. </p>
+          <p>Partitions are defined based on each distinct combination of values
+            for one or more partition key columns. Impala does not redistribute
+            or check data to create evenly distributed partitions. You must
+            choose partition key columns based on your knowledge of the data
+            volume and distribution. Adapt any tables that use range, list,
+            hash, or key partitioning to use the Impala partition syntax for
+              <codeph>CREATE TABLE</codeph> and <codeph>ALTER TABLE</codeph>
+            statements. </p>
+          <p>Impala partitioning is similar to range partitioning where every
+            range has exactly one value, or key partitioning where the hash
+            function produces a separate bucket for every combination of key
+            values. See <xref href="impala_partitioning.xml#partitioning"/> for
+            usage details, and <xref href="impala_create_table.xml#create_table"
+            /> and <xref href="impala_alter_table.xml#alter_table"/> for syntax. </p>
+          <note> Because the number of separate partitions is potentially higher
+            than in other database systems, keep a close eye on the number of
+            partitions and the volume of data in each one; scale back the number
+            of partition key columns if you end up with too many partitions with
+            a small volume of data in each one. <p>To distribute work for a
+              query across a cluster, you need at least one HDFS block per node.
+              HDFS blocks are typically multiple megabytes, <ph
+                rev="parquet_block_size">especially</ph> for Parquet files.
+              Therefore, if each partition holds only a few megabytes of data,
+              you are unlikely to see much parallelism in the query because such
+              a small amount of data is typically processed by a single node.
+            </p></note>
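+          <p>For example, a table partitioned by date columns might be declared
+            as follows; the table and column names are illustrative:</p>
+          <codeblock>CREATE TABLE sales (id BIGINT, amount DECIMAL(10,2))
+  PARTITIONED BY (year INT, month INT) STORED AS PARQUET;</codeblock>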
+        </li>
+        <li>
+          <p> For <q>top-N</q> queries, Impala uses the
+              <codeph>LIMIT</codeph> clause rather than comparing against a
+            pseudocolumn named <codeph>ROWNUM</codeph> or
+              <codeph>ROW_NUM</codeph>. </p>
+          <p>See <xref href="impala_limit.xml#limit"/> for details. </p>
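+          <p>For example, a top-10 query might look like the following, with
+            illustrative table and column names:</p>
+          <codeblock>-- Instead of filtering on a ROWNUM pseudocolumn:
+SELECT c1, c2 FROM t1 ORDER BY c2 DESC LIMIT 10;</codeblock>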
         </li>
       </ul>
     </conbody>
@@ -504,16 +480,15 @@ ERROR: AnalysisException: couldn't resolve column reference: 'x_squared'
 
   <concept id="porting_antipatterns">
 
-    <title>SQL Constructs to Doublecheck</title>
+    <title>SQL Constructs to Double-check</title>
 
     <conbody>
 
-      <p>
-        Some SQL constructs that are supported have behavior or defaults more oriented towards convenience than
-        optimal performance. Also, sometimes machine-generated SQL, perhaps issued through JDBC or ODBC
-        applications, might have inefficiencies or exceed internal Impala limits. As you port SQL code, be alert
-        and change these things where appropriate:
-      </p>
+      <p> Some SQL constructs that are supported have behavior or defaults more
+        oriented towards convenience than optimal performance. Also, sometimes
+        machine-generated SQL, perhaps issued through JDBC or ODBC applications,
+        might have inefficiencies or exceed internal Impala limits. As you port
+        SQL code, examine and update the following where appropriate: </p>
 
       <ul>
         <li>
@@ -528,16 +503,10 @@ ERROR: AnalysisException: couldn't resolve column reference: 'x_squared'
         </li>
 
         <li>
-          <p>
-            A <codeph>CREATE TABLE</codeph> statement with no <codeph>PARTITIONED BY</codeph> clause stores all the
-            data files in the same physical location, which can lead to scalability problems when the data volume
-            becomes large.
-          </p>
-          <p>
-            On the other hand, adapting tables that were already partitioned in a different database system could
-            produce an Impala table with a high number of partitions and not enough data in each one, leading to
-            underutilization of Impala's parallel query features.
-          </p>
+          <p> Adapting tables that were already partitioned in a different
+            database system could produce an Impala table with a high number of
+            partitions and not enough data in each one, leading to
+            underutilization of Impala's parallel query features. </p>
           <p>
             See <xref href="impala_partitioning.xml#partitioning"/> for details about setting up partitioning and
             tuning the performance of queries on partitioned tables.
@@ -545,17 +514,22 @@ ERROR: AnalysisException: couldn't resolve column reference: 'x_squared'
         </li>
 
         <li>
-          <p>
-            The <codeph>INSERT ... VALUES</codeph> syntax is suitable for setting up toy tables with a few rows for
-            functional testing, but because each such statement creates a separate tiny file in HDFS, it is not a
-            scalable technique for loading megabytes or gigabytes (let alone petabytes) of data. Consider revising
-            your data load process to produce raw data files outside of Impala, then setting up Impala external
-            tables or using the <codeph>LOAD DATA</codeph> statement to use those data files instantly in Impala
-            tables, with no conversion or indexing stage. See <xref href="impala_tables.xml#external_tables"/> and
-            <xref href="impala_load_data.xml#load_data"/> for details about the Impala techniques for working with
-            data files produced outside of Impala; see <xref href="impala_tutorial.xml#tutorial_etl"/> for examples
-            of ETL workflow for Impala.
-          </p>
+          <p> When used with HDFS, the <codeph>INSERT ... VALUES</codeph>
+            syntax is suitable only for setting up toy tables with a few rows
+            for functional testing. Each such statement creates a separate tiny
+            file in HDFS, so it is not a scalable technique for loading
+            megabytes or gigabytes (let alone petabytes) of data. </p>
+          <p>Consider revising your data load process to produce raw data files
+            outside of Impala, then setting up Impala external tables or using
+            the <codeph>LOAD DATA</codeph> statement to use those data files
+            instantly in Impala tables, with no conversion or indexing stage.
+            See <xref href="impala_tables.xml#external_tables"/> and <xref
+              href="impala_load_data.xml#load_data"/> for details about the
+            Impala techniques for working with data files produced outside of
+            Impala; see <xref href="impala_tutorial.xml#tutorial_etl"/> for
+            examples of ETL workflow for Impala. </p>
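+          <p>For example, assuming data files were already produced in a
+            hypothetical HDFS staging directory:</p>
+          <codeblock>-- Make the prepared files immediately queryable, with no conversion step.
+LOAD DATA INPATH '/user/etl/staging/t1' INTO TABLE t1;</codeblock>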
+          <p><codeph>INSERT</codeph> works fine for Kudu tables, although it is
+            not particularly fast.</p>
         </li>
 
         <li>
@@ -566,22 +540,18 @@ ERROR: AnalysisException: couldn't resolve column reference: 'x_squared'
             new table and reorganize into a more efficient layout in the same operation. See
             <xref href="impala_insert.xml#insert"/> for details about the <codeph>INSERT</codeph> statement.
           </p>
-          <p>
-            You can do <codeph>INSERT ... SELECT</codeph> into a table with a more efficient file format (see
-            <xref href="impala_file_formats.xml#file_formats"/>) or from an unpartitioned table into a partitioned
-            one (see <xref href="impala_partitioning.xml#partitioning"/>).
-          </p>
+          <p> You can do <codeph>INSERT ... SELECT</codeph> into a table with a
+            more efficient file format (see <xref
+              href="impala_file_formats.xml#file_formats"/>) or from an
+            unpartitioned table into a partitioned one. See <xref
+              href="impala_partitioning.xml#partitioning"/>. </p>
         </li>
 
         <li>
-          <p>
-            The number of expressions allowed in an Impala query might be smaller than for some other database
-            systems, causing failures for very complicated queries (typically produced by automated SQL
-            generators). Where practical, keep the number of expressions in the <codeph>WHERE</codeph> clauses to
-            approximately 2000 or fewer. As a workaround, set the query option
-            <codeph>DISABLE_CODEGEN=true</codeph> if queries fail for this reason. See
-            <xref href="impala_disable_codegen.xml#disable_codegen"/> for details.
-          </p>
+          <p> Complex queries may have high codegen time. As a workaround, set
+            the query option <codeph>DISABLE_CODEGEN=true</codeph> if queries
+            fail for this reason. See <xref
+              href="impala_disable_codegen.xml#disable_codegen"/> for details. </p>
         </li>
 
         <li>
@@ -600,42 +570,31 @@ ERROR: AnalysisException: couldn't resolve column reference: 'x_squared'
 
     <conbody>
 
-      <p>
-        Throughout this section, some of the decisions you make during the porting process also have a substantial
-        impact on performance. After your SQL code is ported and working correctly, doublecheck the
-        performance-related aspects of your schema design, physical layout, and queries to make sure that the
-        ported application is taking full advantage of Impala's parallelism, performance-related SQL features, and
-        integration with Hadoop components.
-      </p>
+      <p> Some of the decisions you make during the porting process can have an
+        impact on performance. After your SQL code is ported and working
+        correctly, examine the performance-related aspects of your schema
+        design, physical layout, and queries to make sure that the ported
+        application is taking full advantage of Impala's parallelism,
+        performance-related SQL features, and integration with Hadoop
+        components. The following are a few of the areas you should examine:</p>
 
       <ul>
-        <li>
-          Have you run the <codeph>COMPUTE STATS</codeph> statement on each table involved in join queries? Have
-          you also run <codeph>COMPUTE STATS</codeph> for each table used as the source table in an <codeph>INSERT
-          ... SELECT</codeph> or <codeph>CREATE TABLE AS SELECT</codeph> statement?
-        </li>
+        <li> For optimal performance, we recommend that you run
+            <codeph>COMPUTE STATS</codeph> on all tables, as shown in the
+          example after this list.</li>
 
-        <li>
-          Are you using the most efficient file format for your data volumes, table structure, and query
-          characteristics?
-        </li>
+        <li> Use the most efficient file format for your data volumes, table
+          structure, and query characteristics.</li>
 
-        <li>
-          Are you using partitioning effectively? That is, have you partitioned on columns that are often used for
-          filtering in <codeph>WHERE</codeph> clauses? Have you partitioned at the right granularity so that there
-          is enough data in each partition to parallelize the work for each query?
-        </li>
+        <li> Partition on columns that are often used for filtering in
+            <codeph>WHERE</codeph> clauses. See the example after this
+          list.</li>
 
-        <li>
-          Does your ETL process produce a relatively small number of multi-megabyte data files (good) rather than a
-          huge number of small files (bad)?
-        </li>
+        <li> Your ETL process should produce a relatively small number of
+          multi-megabyte data files rather than a huge number of small
+          files.</li>
       </ul>
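+      <p>For example, assuming a table <codeph>t1</codeph> partitioned by
+        illustrative <codeph>year</codeph> and <codeph>month</codeph> columns,
+        those checks translate into statements such as:</p>
+      <codeblock>-- Compute statistics so the planner can optimize joins and inserts.
+COMPUTE STATS t1;
+-- Filtering on the partition key columns lets Impala prune partitions.
+SELECT COUNT(*) FROM t1 WHERE year = 2018 AND month = 8;</codeblock>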
 
-      <p>
-        See <xref href="impala_performance.xml#performance"/> for details about the whole performance tuning
-        process.
-      </p>
+      <p> See <xref href="impala_performance.xml#performance"/> for details
+        about the performance tuning process. </p>
     </conbody>
   </concept>
 </concept>


[2/2] impala git commit: IMPALA-7403: fix child batch mem mgmt in analytic

Posted by ta...@apache.org.
IMPALA-7403: fix child batch mem mgmt in analytic

The core of the fix is in ProcessChildBatches(), where we copy
'prev_input_tuple_' to 'prev_input_tuple_pool_' and reset
the child batch *before* the call to child(0)->GetNext(). This
solves a couple of problems:
* prev_input_tuple_ may be referencing memory from the child
  that had the needs_deep_copy() flag set and therefore will
  be freed or recycled when calling child(0)->GetNext() again.
* 'prev_child_batch_' may have been holding onto resources that
  the child had flushed, which means they need to be freed before
  the next GetNext() call.

Also refactors the logic around child_tuple_cmp_row_ to make the variable
lifetime and data flow clearer.

Testing:
Add regression test. The test passes both with this patch alone and with
IMPALA-7333 reapplied.

Change-Id: I09eb6213d47287f2addb72f8c1304085d2d48c55
Reviewed-on: http://gerrit.cloudera.org:8080/11155
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/a6c35685
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/a6c35685
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/a6c35685

Branch: refs/heads/master
Commit: a6c356850bd4606f8506fa763d5f543229dce780
Parents: dd8a25d
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Tue Aug 7 14:17:10 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Wed Aug 8 20:51:45 2018 +0000

----------------------------------------------------------------------
 be/src/exec/analytic-eval-node.cc               | 81 ++++++++++----------
 be/src/exec/analytic-eval-node.h                | 61 ++++++++-------
 be/src/exprs/scalar-expr-evaluator.cc           | 22 +++---
 be/src/exprs/scalar-expr-evaluator.h            | 22 +++---
 .../queries/QueryTest/analytic-fns-tpcds.test   | 46 ++++++++++-
 5 files changed, 137 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/a6c35685/be/src/exec/analytic-eval-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/analytic-eval-node.cc b/be/src/exec/analytic-eval-node.cc
index c153b64..f51a5b5 100644
--- a/be/src/exec/analytic-eval-node.cc
+++ b/be/src/exec/analytic-eval-node.cc
@@ -53,7 +53,6 @@ AnalyticEvalNode::AnalyticEvalNode(
     rows_end_offset_(0),
     has_first_val_null_offset_(false),
     first_val_null_offset_(0),
-    child_tuple_cmp_row_(nullptr),
     last_result_idx_(-1),
     prev_pool_last_result_idx_(-1),
     prev_pool_last_window_idx_(-1),
@@ -161,6 +160,7 @@ Status AnalyticEvalNode::Prepare(RuntimeState* state) {
   curr_tuple_pool_.reset(new MemPool(mem_tracker()));
   prev_tuple_pool_.reset(new MemPool(mem_tracker()));
   mem_pool_.reset(new MemPool(mem_tracker()));
+  prev_input_tuple_pool_.reset(new MemPool(mem_tracker()));
   evaluation_timer_ = ADD_TIMER(runtime_profile(), "EvaluationTime");
 
   DCHECK_EQ(result_tuple_desc_->slots().size(), analytic_fns_.size());
@@ -222,13 +222,6 @@ Status AnalyticEvalNode::Open(RuntimeState* state) {
     RETURN_IF_ERROR(order_by_eq_expr_eval_->Open(state));
   }
 
-  if (buffered_tuple_desc_ != nullptr) {
-    // The backing mem_pool_ is freed in Reset(), so we need to allocate
-    // a new row every time we Open().
-    child_tuple_cmp_row_ = reinterpret_cast<TupleRow*>(
-        mem_pool_->Allocate(sizeof(Tuple*) * 2));
-  }
-
   // An intermediate tuple is only allocated once and is reused.
   curr_tuple_ = Tuple::Create(intermediate_tuple_desc_->byte_size(), mem_pool_.get());
   AggFnEvaluator::Init(analytic_fn_evals_, curr_tuple_);
@@ -241,8 +234,6 @@ Status AnalyticEvalNode::Open(RuntimeState* state) {
 
   // Initialize state for the first partition.
   RETURN_IF_ERROR(InitNextPartition(state, 0));
-  prev_child_batch_.reset(new RowBatch(child(0)->row_desc(), state->batch_size(),
-      mem_tracker()));
   curr_child_batch_.reset(new RowBatch(child(0)->row_desc(), state->batch_size(),
       mem_tracker()));
   return Status::OK();
@@ -403,22 +394,22 @@ Status AnalyticEvalNode::AddResultTuple(int64_t stream_idx) {
   return Status::OK();
 }
 
-inline Status AnalyticEvalNode::TryAddResultTupleForPrevRow(bool next_partition,
-    int64_t stream_idx, TupleRow* row) {
+inline Status AnalyticEvalNode::TryAddResultTupleForPrevRow(
+    const TupleRow* child_tuple_cmp_row, bool next_partition, int64_t stream_idx) {
   // The analytic fns are finalized after the previous row if we found a new partition
   // or the window is a RANGE and the order by exprs changed. For ROWS windows we do not
   // need to compare the current row to the previous row.
   VLOG_ROW << id() << " TryAddResultTupleForPrevRow partition=" << next_partition
            << " idx=" << stream_idx;
-  if (fn_scope_ != ROWS && (next_partition || (fn_scope_ == RANGE &&
-      window_.__isset.window_end && !PrevRowCompare(order_by_eq_expr_eval_)))) {
+  if (fn_scope_ != ROWS && (next_partition
+        || (fn_scope_ == RANGE && window_.__isset.window_end
+            && !PrevRowCompare(order_by_eq_expr_eval_, child_tuple_cmp_row)))) {
     RETURN_IF_ERROR(AddResultTuple(stream_idx - 1));
   }
   return Status::OK();
 }
 
-inline Status AnalyticEvalNode::TryAddResultTupleForCurrRow(int64_t stream_idx,
-    TupleRow* row) {
+inline Status AnalyticEvalNode::TryAddResultTupleForCurrRow(int64_t stream_idx) {
   VLOG_ROW << id() << " TryAddResultTupleForCurrRow idx=" << stream_idx;
   // We only add results at this point for ROWS windows (unless unbounded following)
   // Nothing to add if the end offset is before the start of the partition.
@@ -586,9 +577,10 @@ inline Status AnalyticEvalNode::InitNextPartition(RuntimeState* state,
   return Status::OK();
 }
 
-inline bool AnalyticEvalNode::PrevRowCompare(ScalarExprEvaluator* pred_eval) {
+inline bool AnalyticEvalNode::PrevRowCompare(
+   ScalarExprEvaluator* pred_eval, const TupleRow* child_tuple_cmp_row) {
   DCHECK(pred_eval != nullptr);
-  BooleanVal result = pred_eval->GetBooleanVal(child_tuple_cmp_row_);
+  BooleanVal result = pred_eval->GetBooleanVal(child_tuple_cmp_row);
   DCHECK(!result.is_null);
   return result.val;
 }
@@ -599,24 +591,31 @@ Status AnalyticEvalNode::ProcessChildBatches(RuntimeState* state) {
   // allows us to simplify the logic dealing with last_result_idx_ and result_tuples_.
   while (!input_eos_ && NumOutputRowsReady() < state->batch_size() + 1) {
     RETURN_IF_CANCELLED(state);
+    if (has_partition_or_order_by_expr_eval() && prev_input_tuple_ != nullptr
+        && curr_child_batch_->num_rows() > 0) {
+      // 'prev_input_tuple_' is from the last row in 'curr_child_batch_' and is needed
+      // by subsequent calls to ProcessChildBatch(). Deep copy it so that we can safely
+      // free the memory backing it. At this point 'prev_input_tuple_pool_' is not
+      // backing the previous input tuple any more - the tuple is from the last row in
+      // 'curr_child_batch_' and therefore backed by 'curr_child_batch_'.
+      prev_input_tuple_pool_->Clear();
+      prev_input_tuple_ = prev_input_tuple_->DeepCopy(
+          *child(0)->row_desc()->tuple_descriptors()[0], prev_input_tuple_pool_.get());
+    }
+    curr_child_batch_->Reset();
     RETURN_IF_ERROR(child(0)->GetNext(state, curr_child_batch_.get(), &input_eos_));
     RETURN_IF_ERROR(QueryMaintenance(state));
     RETURN_IF_ERROR(ProcessChildBatch(state));
     // TODO: DCHECK that the size of result_tuples_ is bounded. It shouldn't be larger
     // than 2x the batch size unless the end bound has an offset preceding, in which
     // case it may be slightly larger (proportional to the offset but still bounded).
-    prev_child_batch_->Reset();
-    prev_child_batch_.swap(curr_child_batch_);
-  }
-  if (input_eos_) {
-    curr_child_batch_.reset();
-    prev_child_batch_.reset();
   }
+  if (input_eos_) curr_child_batch_.reset();
   return Status::OK();
 }
 
 Status AnalyticEvalNode::ProcessChildBatch(RuntimeState* state) {
-  // TODO: DCHECK input is sorted (even just first row vs prev_input_row_)
+  // TODO: DCHECK input is sorted (even just first row vs prev_input_tuple_)
   VLOG_FILE << id() << " ProcessChildBatch: " << DebugStateString()
             << " input batch size:" << curr_child_batch_->num_rows()
             << " tuple pool size:" << curr_tuple_pool_->total_allocated_bytes();
@@ -634,23 +633,24 @@ Status AnalyticEvalNode::ProcessChildBatch(RuntimeState* state) {
   if (UNLIKELY(stream_idx == 0 && curr_child_batch_->num_rows() > 0)) {
     TupleRow* row = curr_child_batch_->GetRow(0);
     RETURN_IF_ERROR(AddRow(0, row));
-    RETURN_IF_ERROR(TryAddResultTupleForCurrRow(0, row));
-    prev_input_row_ = row;
+    RETURN_IF_ERROR(TryAddResultTupleForCurrRow(0));
+    prev_input_tuple_ = row->GetTuple(0);
     ++batch_idx;
     ++stream_idx;
   }
-
+  Tuple* child_tuple_cmp_row_tuples[2] = {nullptr, nullptr};
+  TupleRow* child_tuple_cmp_row =
+      reinterpret_cast<TupleRow*>(child_tuple_cmp_row_tuples);
   for (; batch_idx < curr_child_batch_->num_rows(); ++batch_idx, ++stream_idx) {
     TupleRow* row = curr_child_batch_->GetRow(batch_idx);
-    if (partition_by_eq_expr_eval_ != nullptr ||
-        order_by_eq_expr_eval_ != nullptr) {
-      // Only set the tuples in child_tuple_cmp_row_ if there are partition exprs or
+    if (has_partition_or_order_by_expr_eval()) {
+      // Only set the tuples in child_tuple_cmp_row if there are partition exprs or
       // order by exprs that require comparing the current and previous rows. If there
       // aren't partition or order by exprs (i.e. empty OVER() clause), there was no
       // sort and there could be nullable tuples (whereas the sort node does not produce
       // them), see IMPALA-1562.
-      child_tuple_cmp_row_->SetTuple(0, prev_input_row_->GetTuple(0));
-      child_tuple_cmp_row_->SetTuple(1, row->GetTuple(0));
+      child_tuple_cmp_row->SetTuple(0, prev_input_tuple_);
+      child_tuple_cmp_row->SetTuple(1, row->GetTuple(0));
     }
     TryRemoveRowsBeforeWindow(stream_idx);
 
@@ -667,16 +667,17 @@ Status AnalyticEvalNode::ProcessChildBatch(RuntimeState* state) {
     bool next_partition = false;
     if (partition_by_eq_expr_eval_ != nullptr) {
       // partition_by_eq_expr_eval_ checks equality over the predicate exprs
-      next_partition = !PrevRowCompare(partition_by_eq_expr_eval_);
+      next_partition = !PrevRowCompare(partition_by_eq_expr_eval_, child_tuple_cmp_row);
     }
-    RETURN_IF_ERROR(TryAddResultTupleForPrevRow(next_partition, stream_idx, row));
+    RETURN_IF_ERROR(TryAddResultTupleForPrevRow(
+          child_tuple_cmp_row, next_partition, stream_idx));
     if (next_partition) RETURN_IF_ERROR(InitNextPartition(state, stream_idx));
 
     // The analytic_fn_evals_ are updated with the current row.
     RETURN_IF_ERROR(AddRow(stream_idx, row));
 
-    RETURN_IF_ERROR(TryAddResultTupleForCurrRow(stream_idx, row));
-    prev_input_row_ = row;
+    RETURN_IF_ERROR(TryAddResultTupleForCurrRow(stream_idx));
+    prev_input_tuple_ = row->GetTuple(0);
   }
 
   if (UNLIKELY(input_eos_ && stream_idx > curr_partition_idx_)) {
@@ -842,10 +843,8 @@ Status AnalyticEvalNode::Reset(RuntimeState* state) {
   DCHECK(input_stream_ == nullptr || input_stream_->is_closed());
   input_stream_.reset();
   curr_tuple_ = nullptr;
-  child_tuple_cmp_row_ = nullptr;
   dummy_result_tuple_ = nullptr;
-  prev_input_row_ = nullptr;
-  prev_child_batch_.reset();
+  prev_input_tuple_ = nullptr;
   curr_child_batch_.reset();
   return ExecNode::Reset(state);
 }
@@ -882,11 +881,11 @@ void AnalyticEvalNode::Close(RuntimeState* state) {
     if (order_by_eq_expr_eval_ != nullptr) order_by_eq_expr_eval_->Close(state);
     order_by_eq_expr_->Close();
   }
-  if (prev_child_batch_.get() != nullptr) prev_child_batch_.reset();
   if (curr_child_batch_.get() != nullptr) curr_child_batch_.reset();
   if (curr_tuple_pool_.get() != nullptr) curr_tuple_pool_->FreeAll();
   if (prev_tuple_pool_.get() != nullptr) prev_tuple_pool_->FreeAll();
   if (mem_pool_.get() != nullptr) mem_pool_->FreeAll();
+  if (prev_input_tuple_pool_.get() != nullptr) prev_input_tuple_pool_->FreeAll();
   ExecNode::Close(state);
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/a6c35685/be/src/exec/analytic-eval-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/analytic-eval-node.h b/be/src/exec/analytic-eval-node.h
index c7e7873..bf15ebf 100644
--- a/be/src/exec/analytic-eval-node.h
+++ b/be/src/exec/analytic-eval-node.h
@@ -18,6 +18,8 @@
 #ifndef IMPALA_EXEC_ANALYTIC_EVAL_NODE_H
 #define IMPALA_EXEC_ANALYTIC_EVAL_NODE_H
 
+#include <memory>
+
 #include "exec/exec-node.h"
 #include "runtime/buffered-tuple-stream.h"
 #include "runtime/tuple.h"
@@ -128,19 +130,20 @@ class AnalyticEvalNode : public ExecNode {
   /// Adds the row to the evaluators and the tuple stream.
   Status AddRow(int64_t stream_idx, TupleRow* row);
 
-  /// Determines if there is a window ending at the previous row, and if so, calls
-  /// AddResultTuple() with the index of the previous row in 'input_stream_'.
-  /// 'next_partition' indicates if the current row is the start of a new partition.
-  /// 'stream_idx' is the index of the current input row from 'input_stream_'.
-  /// Returns an error when memory limit is exceeded.
-  Status TryAddResultTupleForPrevRow(bool next_partition, int64_t stream_idx,
-      TupleRow* row);
+  /// Determines if there is a window ending at the previous row by evaluating
+  /// 'child_tuple_cmp_row', and if so, calls AddResultTuple() with the index
+  /// of the previous row in 'input_stream_'. 'next_partition' indicates if
+  /// the current row is the start of a new partition. 'stream_idx' is the
+  /// index of the current input row from 'input_stream_'.  Returns an error
+  /// when memory limit is exceeded.
+  Status TryAddResultTupleForPrevRow(
+      const TupleRow* child_tuple_cmp_row, bool next_partition, int64_t stream_idx);
 
   /// Determines if there is a window ending at the current row, and if so, calls
   /// AddResultTuple() with the index of the current row in 'input_stream_'.
   /// 'stream_idx' is the index of the current input row from 'input_stream_'.
   /// Returns an error when memory limit is exceeded.
-  Status TryAddResultTupleForCurrRow(int64_t stream_idx, TupleRow* row);
+  Status TryAddResultTupleForCurrRow(int64_t stream_idx);
 
   /// Adds additional result tuples at the end of a partition, e.g. if the end bound is
   /// FOLLOWING. partition_idx is the index into input_stream_ of the new partition,
@@ -172,10 +175,17 @@ class AnalyticEvalNode : public ExecNode {
   /// This is necessary to produce the default value (set by Init()).
   void ResetLeadFnSlots();
 
-  /// Evaluates the predicate pred_eval over child_tuple_cmp_row_, which is
-  /// a TupleRow* containing the previous row and the current row set during
-  /// ProcessChildBatch().
-  bool PrevRowCompare(ScalarExprEvaluator* pred_eval);
+  /// Evaluates the predicate pred_eval over child_tuple_cmp_row, which is
+  /// a TupleRow* containing the previous row and the current row.
+  bool PrevRowCompare(
+      ScalarExprEvaluator* pred_eval, const TupleRow* child_tuple_cmp_row);
+
+  /// Return true if there are partition or order by expression evaluators. These
+  /// evaluators are created in Prepare() if PARTITION BY or ORDER BY clauses exist
+  /// for the analytics.
+  bool has_partition_or_order_by_expr_eval() const {
+    return partition_by_eq_expr_eval_ != nullptr || order_by_eq_expr_eval_ != nullptr;
+  }
 
   /// Debug string containing current state. If 'detailed', per-row state is included.
   std::string DebugStateString(bool detailed) const;
@@ -252,11 +262,6 @@ class AnalyticEvalNode : public ExecNode {
   /////////////////////////////////////////
   /// BEGIN: Members that must be Reset()
 
-  /// TupleRow* composed of the first child tuple and the buffered tuple, used by
-  /// partition_by_eq_expr_eval_ and order_by_eq_expr_eval_. Set in Open() if
-  /// buffered_tuple_desc_ is not NULL, allocated from mem_pool_.
-  TupleRow* child_tuple_cmp_row_ = nullptr;
-
   /// Queue of tuples which are ready to be set in output rows, with the index into
   /// the input_stream_ stream of the last TupleRow that gets the Tuple, i.e. this is a
   /// sparse structure. For example, if result_tuples_ contains tuples with indexes x1 and
@@ -308,17 +313,17 @@ class AnalyticEvalNode : public ExecNode {
   /// Index of the row in input_stream_ at which the current partition started.
   int64_t curr_partition_idx_;
 
-  /// Previous input row used to compare partition boundaries and to determine when the
-  /// order-by expressions change.
-  TupleRow* prev_input_row_ = nullptr;
-
-  /// Current and previous input row batches from the child. RowBatches are allocated
-  /// once and reused. Previous input row batch owns prev_input_row_ between calls to
-  /// ProcessChildBatch(). The prev batch is Reset() after calling ProcessChildBatch()
-  /// and then swapped with the curr batch so the RowBatch owning prev_input_row_ is
-  /// stored in prev_child_batch_ for the next call to ProcessChildBatch().
-  boost::scoped_ptr<RowBatch> prev_child_batch_;
-  boost::scoped_ptr<RowBatch> curr_child_batch_;
+  /// Previous input tuple used to compare partition boundaries and to determine when the
+  /// order-by expressions change. We only need to store the first tuple of the row
+  /// because all the partitioning and ordering columns are in the first tuple. Initially
+  /// this points to the first tuple of the last row processed from 'curr_child_batch_',
+  /// but it is later deep copied into 'prev_input_tuple_pool_' before 'curr_child_batch_'
+  /// is reset.
+  Tuple* prev_input_tuple_ = nullptr;
+  std::unique_ptr<MemPool> prev_input_tuple_pool_;
+
+  /// Current input row batch from the child. Allocated once and reused.
+  std::unique_ptr<RowBatch> curr_child_batch_;
 
   /// Buffers input rows added in ProcessChildBatch() until enough rows are able to
   /// be returned by GetNextOutputBatch(), in which case row batches are returned from

http://git-wip-us.apache.org/repos/asf/impala/blob/a6c35685/be/src/exprs/scalar-expr-evaluator.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/scalar-expr-evaluator.cc b/be/src/exprs/scalar-expr-evaluator.cc
index 0cadcc2..342b8e1 100644
--- a/be/src/exprs/scalar-expr-evaluator.cc
+++ b/be/src/exprs/scalar-expr-evaluator.cc
@@ -364,47 +364,47 @@ void ScalarExprEvaluator::PrintValue(const TupleRow* row, stringstream* stream)
   RawValue::PrintValue(GetValue(row), root_.type(), output_scale_, stream);
 }
 
-BooleanVal ScalarExprEvaluator::GetBooleanVal(TupleRow* row) {
+BooleanVal ScalarExprEvaluator::GetBooleanVal(const TupleRow* row) {
   return root_.GetBooleanVal(this, row);
 }
 
-TinyIntVal ScalarExprEvaluator::GetTinyIntVal(TupleRow* row) {
+TinyIntVal ScalarExprEvaluator::GetTinyIntVal(const TupleRow* row) {
   return root_.GetTinyIntVal(this, row);
 }
 
-SmallIntVal ScalarExprEvaluator::GetSmallIntVal(TupleRow* row) {
+SmallIntVal ScalarExprEvaluator::GetSmallIntVal(const TupleRow* row) {
   return root_.GetSmallIntVal(this, row);
 }
 
-IntVal ScalarExprEvaluator::GetIntVal(TupleRow* row) {
+IntVal ScalarExprEvaluator::GetIntVal(const TupleRow* row) {
   return root_.GetIntVal(this, row);
 }
 
-BigIntVal ScalarExprEvaluator::GetBigIntVal(TupleRow* row) {
+BigIntVal ScalarExprEvaluator::GetBigIntVal(const TupleRow* row) {
   return root_.GetBigIntVal(this, row);
 }
 
-FloatVal ScalarExprEvaluator::GetFloatVal(TupleRow* row) {
+FloatVal ScalarExprEvaluator::GetFloatVal(const TupleRow* row) {
   return root_.GetFloatVal(this, row);
 }
 
-DoubleVal ScalarExprEvaluator::GetDoubleVal(TupleRow* row) {
+DoubleVal ScalarExprEvaluator::GetDoubleVal(const TupleRow* row) {
   return root_.GetDoubleVal(this, row);
 }
 
-StringVal ScalarExprEvaluator::GetStringVal(TupleRow* row) {
+StringVal ScalarExprEvaluator::GetStringVal(const TupleRow* row) {
   return root_.GetStringVal(this, row);
 }
 
-CollectionVal ScalarExprEvaluator::GetCollectionVal(TupleRow* row) {
+CollectionVal ScalarExprEvaluator::GetCollectionVal(const TupleRow* row) {
   return root_.GetCollectionVal(this, row);
 }
 
-TimestampVal ScalarExprEvaluator::GetTimestampVal(TupleRow* row) {
+TimestampVal ScalarExprEvaluator::GetTimestampVal(const TupleRow* row) {
   return root_.GetTimestampVal(this, row);
 }
 
-DecimalVal ScalarExprEvaluator::GetDecimalVal(TupleRow* row) {
+DecimalVal ScalarExprEvaluator::GetDecimalVal(const TupleRow* row) {
   return root_.GetDecimalVal(this, row);
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/a6c35685/be/src/exprs/scalar-expr-evaluator.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/scalar-expr-evaluator.h b/be/src/exprs/scalar-expr-evaluator.h
index 9a27951..ba8cf17 100644
--- a/be/src/exprs/scalar-expr-evaluator.h
+++ b/be/src/exprs/scalar-expr-evaluator.h
@@ -151,17 +151,17 @@ class ScalarExprEvaluator {
 
   /// Evaluates the expression of this evaluator on tuple row 'row' and returns
   /// the results. One function for each data type implemented.
-  BooleanVal GetBooleanVal(TupleRow* row);
-  TinyIntVal GetTinyIntVal(TupleRow* row);
-  SmallIntVal GetSmallIntVal(TupleRow* row);
-  IntVal GetIntVal(TupleRow* row);
-  BigIntVal GetBigIntVal(TupleRow* row);
-  FloatVal GetFloatVal(TupleRow* row);
-  DoubleVal GetDoubleVal(TupleRow* row);
-  StringVal GetStringVal(TupleRow* row);
-  CollectionVal GetCollectionVal(TupleRow* row);
-  TimestampVal GetTimestampVal(TupleRow* row);
-  DecimalVal GetDecimalVal(TupleRow* row);
+  BooleanVal GetBooleanVal(const TupleRow* row);
+  TinyIntVal GetTinyIntVal(const TupleRow* row);
+  SmallIntVal GetSmallIntVal(const TupleRow* row);
+  IntVal GetIntVal(const TupleRow* row);
+  BigIntVal GetBigIntVal(const TupleRow* row);
+  FloatVal GetFloatVal(const TupleRow* row);
+  DoubleVal GetDoubleVal(const TupleRow* row);
+  StringVal GetStringVal(const TupleRow* row);
+  CollectionVal GetCollectionVal(const TupleRow* row);
+  TimestampVal GetTimestampVal(const TupleRow* row);
+  DecimalVal GetDecimalVal(const TupleRow* row);
 
   /// Returns an error status if there was any error in evaluating the expression
   /// or its sub-expressions. 'start_idx' and 'end_idx' correspond to the range

http://git-wip-us.apache.org/repos/asf/impala/blob/a6c35685/testdata/workloads/functional-query/queries/QueryTest/analytic-fns-tpcds.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/analytic-fns-tpcds.test b/testdata/workloads/functional-query/queries/QueryTest/analytic-fns-tpcds.test
index 9d98dec..5a440c6 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/analytic-fns-tpcds.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/analytic-fns-tpcds.test
@@ -1,6 +1,6 @@
 ====
 ---- QUERY
-# Analyic function with no partition
+# Analytic function with no partition
 select i_item_sk, i_current_price,
    SUM (i_current_price)
   OVER (ORDER BY i_item_sk) running_total
@@ -72,7 +72,7 @@ limit 10
 BIGINT, STRING, DECIMAL, DECIMAL
 ====
 ---- QUERY
-# Nested analyic functions
+# Nested analytic functions
 select i_item_sk, i_brand, running_total,
    SUM (running_total)
   OVER (partition by i_manufact_id ORDER BY running_total) running_total2
@@ -104,7 +104,7 @@ limit 10
 BIGINT, STRING, DECIMAL, DECIMAL
 ====
 ---- QUERY
-# 2 analyic functions on different partition and order by columns
+# 2 analytic functions on different partition and order by columns
 select i_item_sk, i_brand, i_current_price, i_manufact_id,
    SUM (i_current_price)
   OVER (partition by i_brand ORDER BY i_current_price) running_total,
@@ -173,4 +173,42 @@ from (
 2815.26
 ---- TYPES
 DECIMAL
-====
\ No newline at end of file
+====
+---- QUERY
+# Regression test for IMPALA-7403 that reproduces a memory management bug.
+# The plan has two analytic operators with no intervening sort. A low NDV
+# sort key seems to help with reproducing the bug.
+SELECT c_birth_country, a, b, count(*)
+FROM (
+  SELECT c_birth_country,
+         LAG(c_birth_country) OVER (ORDER BY c_birth_country) a,
+         MIN(c_birth_country) OVER (ORDER BY c_birth_country) b
+  FROM customer
+) v
+GROUP BY c_birth_country, a, b
+ORDER BY count(*), c_birth_country, a, b
+LIMIT 20
+---- RESULTS
+'AFGHANISTAN','NULL','AFGHANISTAN',1
+'ALAND ISLANDS','AFGHANISTAN','AFGHANISTAN',1
+'ALBANIA','ALAND ISLANDS','AFGHANISTAN',1
+'ALGERIA','ALBANIA','AFGHANISTAN',1
+'AMERICAN SAMOA','ALGERIA','AFGHANISTAN',1
+'ANDORRA','AMERICAN SAMOA','AFGHANISTAN',1
+'ANGOLA','ANDORRA','AFGHANISTAN',1
+'ANGUILLA','ANGOLA','AFGHANISTAN',1
+'ANTARCTICA','ANGUILLA','AFGHANISTAN',1
+'ANTIGUA AND BARBUDA','ANTARCTICA','AFGHANISTAN',1
+'ARGENTINA','ANTIGUA AND BARBUDA','AFGHANISTAN',1
+'ARMENIA','ARGENTINA','AFGHANISTAN',1
+'ARUBA','ARMENIA','AFGHANISTAN',1
+'AUSTRALIA','ARUBA','AFGHANISTAN',1
+'AUSTRIA','AUSTRALIA','AFGHANISTAN',1
+'AZERBAIJAN','AUSTRIA','AFGHANISTAN',1
+'BAHAMAS','AZERBAIJAN','AFGHANISTAN',1
+'BAHRAIN','BAHAMAS','AFGHANISTAN',1
+'BANGLADESH','BAHRAIN','AFGHANISTAN',1
+'BARBADOS','BANGLADESH','AFGHANISTAN',1
+---- TYPES
+STRING,STRING,STRING,BIGINT
+====