You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/02/06 21:11:03 UTC

[1/7] incubator-impala git commit: [DOCS] Genericize HBase page

Repository: incubator-impala
Updated Branches:
  refs/heads/master 1d933919e -> 6a9df5409


[DOCS] Genericize HBase page

2x links that point to archive.cloudera.com mirror of HBase
docs, point to Apache HBase doc site instead.

Remove a little Cloudera-specific verbiage, esp. in banners of
hive / impala-shell output.

Change-Id: I1b832d23be7182d88c99ee169976ad9aeed746b1
Reviewed-on: http://gerrit.cloudera.org:8080/5900
Reviewed-by: Ambreen Kazi <am...@cloudera.com>
Reviewed-by: John Russell <jr...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/7704b64d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/7704b64d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/7704b64d

Branch: refs/heads/master
Commit: 7704b64df0bce05a73b33825c404498ef9fbba48
Parents: 1d93391
Author: John Russell <jr...@cloudera.com>
Authored: Fri Feb 3 14:57:08 2017 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Sat Feb 4 00:23:19 2017 +0000

----------------------------------------------------------------------
 docs/impala_keydefs.ditamap  | 12 +++++++++---
 docs/topics/impala_hbase.xml | 25 ++++++-------------------
 2 files changed, 15 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7704b64d/docs/impala_keydefs.ditamap
----------------------------------------------------------------------
diff --git a/docs/impala_keydefs.ditamap b/docs/impala_keydefs.ditamap
index da62dd1..232d3d1 100644
--- a/docs/impala_keydefs.ditamap
+++ b/docs/impala_keydefs.ditamap
@@ -26,6 +26,15 @@ under the License.
   <keydef keys="support_org"><topicmeta><keywords><keyword>Cloudera Support</keyword></keywords></topicmeta></keydef>
 
 <!-- URLs used in external links. Many, perhaps most, to be turned into keydefs to genericize and optionally re-point. -->
+
+  <keydef keys="upstream_hbase_docs" href="https://hbase.apache.org/book.html" scope="external" format="html">
+    <topicmeta><linktext>the Apache HBase documentation</linktext></topicmeta>
+  </keydef>
+
+  <keydef keys="upstream_hbase_security_docs" href="https://hbase.apache.org/book.html#security" scope="external" format="html">
+    <topicmeta><linktext>the Security chapter in the Apache HBase documentation</linktext></topicmeta>
+  </keydef>
+
 <!--
 http://aws.amazon.com/cli/
 http://aws.amazon.com/s3/
@@ -38,7 +47,6 @@ http://google-glog.googlecode.com/svn/trunk/doc/glog.html
 http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/FairScheduler.html#Configuration
 http://hadoop.apache.org/docs/r0.19.0/distcp.html
 http://haproxy.1wt.eu/
-http://hbase.apache.org/book/ch08s04.html
 http://jdbc.postgresql.org/download.html
 http://radar.oreilly.com/2013/10/cloudera-impala-bringing-the-sql-and-hadoop-worlds-together.html
 http://s3tools.org/s3cmd
@@ -69,10 +77,8 @@ http://www.tpc.org/tpcds/
 https://archive.cloudera.com/cdh/3/hadoop/hdfs_permissions_guide.html
 https://archive.cloudera.com/cdh4/cdh/4/hadoop/hadoop-project-dist/hadoop-common/FileSystemShell.html
 https://archive.cloudera.com/cdh4/cdh/4/hadoop/hadoop-project-dist/hadoop-hdfs/HdfsDesign.html
-https://archive.cloudera.com/cdh4/cdh/4/hbase/
 https://archive.cloudera.com/cdh5/
 https://archive.cloudera.com/cdh5/cdh/5/hadoop/hadoop-yarn/hadoop-yarn-site/FairScheduler.html#Configuration
-https://archive.cloudera.com/cdh5/cdh/5/hbase/
 https://archive.cloudera.com/gplextras/debian/squeeze/amd64/gplextras/cloudera.list
 https://archive.cloudera.com/gplextras/redhat/5/x86_64/gplextras/
 https://archive.cloudera.com/gplextras/redhat/5/x86_64/gplextras/cloudera-gplextras4.repo

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7704b64d/docs/topics/impala_hbase.xml
----------------------------------------------------------------------
diff --git a/docs/topics/impala_hbase.xml b/docs/topics/impala_hbase.xml
index 429d828..0068e48 100644
--- a/docs/topics/impala_hbase.xml
+++ b/docs/topics/impala_hbase.xml
@@ -51,11 +51,7 @@ under the License.
     </p>
 
     <p>
-      For background information on HBase, see the snapshot of the Apache HBase site (including documentation) for
-      the level of HBase that comes with
-      <xref href="https://archive.cloudera.com/cdh5/cdh/5/hbase/" scope="external" format="html">CDH 5</xref>. To
-      install HBase on a CDH cluster, see the installation instructions for
-      <xref href="http://www.cloudera.com/documentation/enterprise/latest/topics/cdh_ig_hbase_installation.html" scope="external" format="html">CDH 5</xref>.
+      For background information on HBase, see <xref keyref="upstream_hbase_docs"/>.
     </p>
 
     <p outputclass="toc inpage"/>
@@ -123,9 +119,7 @@ under the License.
       <p>
         To work with an HBase table from Impala, ensure that the <codeph>impala</codeph> user has read/write
         privileges for the HBase table, using the <codeph>GRANT</codeph> command in the HBase shell. For details
-        about HBase security, see the
-        <xref href="http://hbase.apache.org/book/ch08s04.html" format="html" scope="external">Security chapter in
-        the HBase Reference Guide</xref>.
+        about HBase security, see <xref keyref="upstream_hbase_security_docs"/>.
       </p>
     </conbody>
   </concept>
@@ -148,8 +142,8 @@ under the License.
 
       <p>
         To avoid delays if HBase is unavailable during Impala startup or after an <codeph>INVALIDATE
-        METADATA</codeph> statement, <ph rev="upstream">Cloudera</ph> recommends setting timeout values as follows in
-        <filepath>/etc/impala/conf/hbase-site.xml</filepath> (for environments not managed by Cloudera Manager):
+        METADATA</codeph> statement, set timeout values similar to the following in
+        <filepath>/etc/impala/conf/hbase-site.xml</filepath>:
       </p>
 
 <codeblock>&lt;property&gt;
@@ -832,8 +826,7 @@ hbase(main):006:0> quit
         </note>
 
 <codeblock>$ hive
-Logging initialized using configuration in file:/etc/hive/conf.dist/hive-log4j.properties
-Hive history file=/tmp/cloudera/hive_job_log_cloudera_201502101610_1980712808.txt
+...
 hive> use hbase;
 OK
 Time taken: 4.095 seconds
@@ -876,12 +869,7 @@ hive> quit;
 <codeblock>$ impala-shell -i localhost -d hbase
 Starting Impala Shell without Kerberos authentication
 Connected to localhost:21000
-Server version: impalad version 2.1.0-cdh4 RELEASE (build d520a9cdea2fc97e8d5da9fbb0244e60ee416bfa)
-Welcome to the Impala shell. Press TAB twice to see a list of available commands.
-
-Copyright (c) 2012 Cloudera, Inc. All rights reserved.
-
-(Shell build version: Impala Shell v2.1.0-cdh4 (d520a9c) built on Mon Dec  8 21:41:17 PST 2014)
+...
 Query: use `hbase`
 [localhost:21000] > invalidate metadata hbasestringids;
 Fetched 0 row(s) in 0.09s
@@ -916,7 +904,6 @@ Fetched 1 row(s) in 0.54s
 </codeblock>
 
         <note conref="../shared/impala_common.xml#common/invalidate_metadata_hbase"/>
-<!--      </section> -->
     </conbody>
   </concept>
 </concept>


[5/7] incubator-impala git commit: IMPALA-4359: qgen: add UPSERT support

Posted by ta...@apache.org.
IMPALA-4359: qgen: add UPSERT support

UPSERTs are very similar to INSERTs, so the UPSERT support is simply
folded into that of INSERT. We do this by adding another "conflict
action", CONFLICT_ACTION_UPDATE. The object responsible for holding the
conflict_action attribute is now the InsertClause. This is needed here
because the SqlWriter now needs to know the conflict_action both when
writing the InsertClause (Impala) and at the tail end of the
InsertStatement (PostgreSQL). We also add a few properties to the
InsertStatement interface so that the PostgresqlSqlWriter can form the
correct "DO UPDATE" conflict action, in which primary key columns and
updatable columns must be known. More information on that here:

https://www.postgresql.org/docs/9.5/static/sql-insert.html

By default, we will tend to generate 3 UPSERTs for every 1 INSERT.

In addition to adding unit tests to make sure UPSERTs are properly
written, I used discrepancy_searcher.py --profile dmlonly, both with and
without --explain-only, to run tests. I made sure we were generating
syntactically valid UPSERT statements, and that the INSERT/UPSERT ratio
was roughly 1/3 after 100 statements.

Change-Id: I6382f6ab22ba29c117e39a5d90592d3637df4b25
Reviewed-on: http://gerrit.cloudera.org:8080/5795
Reviewed-by: Taras Bobrovytsky <tb...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/0154ace6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/0154ace6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/0154ace6

Branch: refs/heads/master
Commit: 0154ace61feff65917388dd08591cb9d0d4369ed
Parents: 39987d9
Author: Michael Brown <mi...@cloudera.com>
Authored: Mon Jan 23 10:55:47 2017 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Sat Feb 4 06:13:34 2017 +0000

----------------------------------------------------------------------
 tests/comparison/discrepancy_searcher.py        |  18 +--
 tests/comparison/model_translator.py            |  29 +++-
 tests/comparison/query.py                       | 104 +++++++++----
 tests/comparison/query_profile.py               |  25 ++-
 tests/comparison/statement_generator.py         |  35 +++--
 tests/comparison/tests/query_object_testdata.py | 155 ++++++++++++++++++-
 6 files changed, 296 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0154ace6/tests/comparison/discrepancy_searcher.py
----------------------------------------------------------------------
diff --git a/tests/comparison/discrepancy_searcher.py b/tests/comparison/discrepancy_searcher.py
index 4b0bce9..6534e4d 100755
--- a/tests/comparison/discrepancy_searcher.py
+++ b/tests/comparison/discrepancy_searcher.py
@@ -660,13 +660,13 @@ class QueryResultDiffSearcher(object):
     copy_select_query.from_clause = FromClause(src_table)
 
     if new_table.primary_keys:
-      conflict_action = InsertStatement.CONFLICT_ACTION_IGNORE
+      conflict_action = InsertClause.CONFLICT_ACTION_IGNORE
     else:
-      conflict_action = InsertStatement.CONFLICT_ACTION_DEFAULT
+      conflict_action = InsertClause.CONFLICT_ACTION_DEFAULT
 
     table_copy_statement = InsertStatement(
-        insert_clause=InsertClause(new_table), select_query=copy_select_query,
-        conflict_action=conflict_action, execution=StatementExecutionMode.DML_SETUP)
+        insert_clause=InsertClause(new_table, conflict_action=conflict_action),
+        select_query=copy_select_query, execution=StatementExecutionMode.DML_SETUP)
 
     result = self.query_result_comparator.compare_query_results(table_copy_statement)
     if result.error:
@@ -703,15 +703,15 @@ class QueryResultDiffSearcher(object):
       dml_table = None
       if issubclass(statement_type, (InsertStatement,)):
         dml_choice_src_table = self.query_profile.choose_table(self.common_tables)
-        # Copy the table we want to INSERT INTO. Do this for the following reasons:
+        # Copy the table we want to INSERT/UPSERT INTO. Do this for the following reasons:
         #
         # 1. If we don't copy, the tables will get larger and larger
         # 2. If we want to avoid tables getting larger and larger, we have to come up
         # with some threshold about when to cut and start over.
-        # 3. If we keep INSERTing into tables and finally find a crash, we have to
-        # replay all previous INSERTs again. Those INSERTs may not produce the same rows
-        # as before. To maximize the chance of bug reproduction, run every INSERT on a
-        # pristine table.
+        # 3. If we keep INSERT/UPSERTing into tables and finally find a crash, we have to
+        # replay all previous INSERT/UPSERTs again. Those INSERTs may not produce the
+        # same rows as before. To maximize the chance of bug reproduction, run every
+        # INSERT/UPSERT on a pristine table.
         dml_table = self._concurrently_copy_table(dml_choice_src_table)
       statement = statement_generator.generate_statement(
           self.common_tables, dml_table=dml_table)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0154ace6/tests/comparison/model_translator.py
----------------------------------------------------------------------
diff --git a/tests/comparison/model_translator.py b/tests/comparison/model_translator.py
index ef87bed..7064d5c 100644
--- a/tests/comparison/model_translator.py
+++ b/tests/comparison/model_translator.py
@@ -31,7 +31,7 @@ from tests.comparison.db_types import (
     Timestamp,
     TinyInt,
     VarChar)
-from tests.comparison.query import InsertStatement, Query
+from tests.comparison.query import InsertClause, Query
 from tests.comparison.query_flattener import QueryFlattener
 
 LOG = getLogger(__name__)
@@ -528,6 +528,20 @@ class ImpalaSqlWriter(SqlWriter):
       result = 'TRIM(%s)' % result
     return result
 
+  def _write_insert_clause(self, insert_clause):
+    sql = super(ImpalaSqlWriter, self)._write_insert_clause(insert_clause)
+    if insert_clause.conflict_action == InsertClause.CONFLICT_ACTION_UPDATE:
+      # The value of sql at this point would be something like:
+      #
+      # INSERT INTO <table name> [(column list)]
+      #
+      # If it happens that the table name or column list contains the text INSERT in an
+      # identifier, we want to ensure that the replace() call below does not alter their
+      # names but instead only modifies the INSERT keyword to UPSERT.
+      return sql.replace('INSERT', 'UPSERT', 1)
+    else:
+      return sql
+
 
 class OracleSqlWriter(SqlWriter):
 
@@ -643,10 +657,19 @@ class PostgresqlSqlWriter(SqlWriter):
 
   def _write_insert_statement(self, insert_statement):
     sql = SqlWriter._write_insert_statement(self, insert_statement)
-    if insert_statement.conflict_action == InsertStatement.CONFLICT_ACTION_DEFAULT:
+    if insert_statement.conflict_action == InsertClause.CONFLICT_ACTION_DEFAULT:
       pass
-    elif insert_statement.conflict_action == InsertStatement.CONFLICT_ACTION_IGNORE:
+    elif insert_statement.conflict_action == InsertClause.CONFLICT_ACTION_IGNORE:
       sql += '\nON CONFLICT DO NOTHING'
+    elif insert_statement.conflict_action == InsertClause.CONFLICT_ACTION_UPDATE:
+      if insert_statement.updatable_column_names:
+        primary_keys = insert_statement.primary_key_string
+        columns = ',\n'.join('{name} = EXCLUDED.{name}'.format(name=name) for name in
+                             insert_statement.updatable_column_names)
+        sql += '\nON CONFLICT {primary_keys}\nDO UPDATE SET\n{columns}'.format(
+            primary_keys=primary_keys, columns=columns)
+      else:
+        sql += '\nON CONFLICT DO NOTHING'
     else:
       raise Exception('InsertStatement has unsupported conflict_action: {0}'.format(
           insert_statement.conflict_action))

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0154ace6/tests/comparison/query.py
----------------------------------------------------------------------
diff --git a/tests/comparison/query.py b/tests/comparison/query.py
index 022dd9c..f7bd593 100644
--- a/tests/comparison/query.py
+++ b/tests/comparison/query.py
@@ -702,19 +702,69 @@ class LimitClause(object):
 
 class InsertClause(object):
 
-  def __init__(self, table, column_list=None):
+  # This enum represents possibilities for different types of INSERTs. A user of this
+  # object, like StatementGenerator, is responsible for setting the conflict_action
+  # value appropriately. These values are valid for the conflict_action parameter.
+  # Because an InsertStatement is a single piece of data shared across multiple SQL
+  # dialects, this setting can alter the written SQL in multiple dialects.
+  #
+  # CONFLICT_ACTION_DEFAULT
+  #
+  # For Impala, this is a statement like INSERT INTO hdfs_table SELECT * FROM foo
+  # For PostgreSQL, this is a statement like INSERT INTO hdfs_table SELECT * FROM foo
+  #
+  # Example use cases: inserting into tables that do not have primary keys, or
+  # inserting into PostgreSQL tables where you want to error if there are attempts to
+  # insert duplicate primary keys
+  #
+  # CONFLICT_ACTION_IGNORE
+  #
+  # For Impala, this is a statement like INSERT INTO kudu_table SELECT * FROM foo
+  # For PostgreSQL, this is a statement like INSERT INTO kudu_table SELECT * FROM foo
+  #                                          ON CONFLICT DO NOTHING
+  #
+  # Example use case: inserting into Kudu tables, where attempts to insert duplicate
+  # primary key rows are ignored by Impala, so they must also be ignored by PostgreSQL.
+  # Note that the *syntax* for INSERT doesn't change with Impala, but because it's a
+  # Kudu table, the behavior differs.
+  #
+  # CONFLICT_ACTION_UPDATE
+  #
+  # For Impala, this is a statement like UPSERT INTO kudu_table SELECT * FROM foo
+  # For PostgreSQL, this is a statement like INSERT INTO kudu_table SELECT * FROM foo
+  #                                          ON CONFLICT DO UPDATE SET
+  #                                          (col1 = EXCLUDED.col1, ...)
+  #
+  # Example use case: upserting into Kudu tables, where attempts to insert duplicate
+  # primary key rows will either insert a single row, or update a single row already
+  # there, without error. In PostgreSQL, UPSERT is written via this "ON CONFLICT DO
+  # UPDATE" clause.
+  #
+  # More on PostgreSQL INSERT/UPSERT syntax here:
+  # https://www.postgresql.org/docs/9.5/static/sql-insert.html
+
+  (CONFLICT_ACTION_DEFAULT,
+   CONFLICT_ACTION_IGNORE,
+   CONFLICT_ACTION_UPDATE) = range(3)
+
+  def __init__(self, table, column_list=None, conflict_action=CONFLICT_ACTION_DEFAULT):
     """
-    Represent an INSERT clause, which is the first half of an INSERT statement. The
-    table is a Table object.
+    Represent an INSERT/UPSERT clause, which is the first half of an INSERT/UPSERT
+    statement. Note that UPSERTs are very similar to INSERTs, so this data structure can
+    easily deal with both.
 
-    column_list is an optional list, tuple, or other sequence of
-    tests.comparison.common.Column objects.
+    The table is a Table object.
 
-    In an INSERT statement, it's a sequence of column names. See
+    column_list is an optional list, tuple, or other sequence of
+    tests.comparison.common.Column objects. In an Impala INSERT/UPSERT SQL statement,
+    it's a sequence of column names. See
     http://www.cloudera.com/documentation/enterprise/latest/topics/impala_insert.html
+
+    conflict_action takes in one of the CONFLICT_ACTION_* class attributes. See above.
     """
     self.table = table
     self.column_list = column_list
+    self.conflict_action = conflict_action
 
 
 class ValuesRow(object):
@@ -728,32 +778,22 @@ class ValuesRow(object):
 class ValuesClause(object):
   def __init__(self, values_rows):
     """
-    Represent the VALUES clause of an INSERT statement. The values_rows is a sequence of
-    ValuesRow objects.
+    Represent the VALUES clause of an INSERT/UPSERT statement. The values_rows is a
+    sequence of ValuesRow objects.
     """
     self.values_rows = values_rows
 
 
 class InsertStatement(AbstractStatement):
 
-  (CONFLICT_ACTION_DEFAULT,
-   CONFLICT_ACTION_IGNORE) = range(2)
-
   def __init__(self, with_clause=None, insert_clause=None, select_query=None,
-               values_clause=None, conflict_action=CONFLICT_ACTION_DEFAULT,
-               execution=None):
+               values_clause=None, execution=None):
     """
-    Represent an INSERT statement. The INSERT may have an optional WithClause, and then
-    either a SELECT query (Query) object from whose rows we INSERT, or a VALUES clause,
-    but not both.
+    Represent an INSERT/UPSERT statement. Note that UPSERTs are very similar to INSERTs,
+    so this data structure can easily deal with both.
 
-    conflict_action takes in one of the CONFLICT_ACTION_* class attributes. On INSERT if
-    the conflict_action is CONFLICT_ACTION_DEFAULT, we write standard INSERT queries.
-
-    If CONFLICT_ACTION_IGNORE is chosen instead, PostgreSQL INSERTs will use "ON
-    CONFLICT DO NOTHING". The syntax doesn't change for Impala, but the implied
-    semantics are needed: if we are INSERTing a Kudu table, conflict_action must be
-    CONFLICT_ACTION_IGNORE.
+    The INSERT/UPSERT may have an optional WithClause, and then either a SELECT query
+    (Query) object from whose rows we INSERT, or a VALUES clause, but not both.
 
     The execution attribute is used by the discrepancy_searcher to track whether this
     InsertStatement is some sort of setup operation or a true random statement test.
@@ -766,7 +806,6 @@ class InsertStatement(AbstractStatement):
     self.values_clause = values_clause
     self.with_clause = with_clause
     self.insert_clause = insert_clause
-    self.conflict_action = conflict_action
 
   @property
   def select_query(self):
@@ -777,7 +816,7 @@ class InsertStatement(AbstractStatement):
     if self.values_clause is None or select_query is None:
       self._select_query = select_query
     else:
-      raise Exception('An INSERT statement may not have both the select_query and '
+      raise Exception('An INSERT/UPSERT statement may not have both the select_query and '
                       'values_clause set: {select}; {values}'.format(
                           select=select_query, values=self.values_clause))
 
@@ -790,7 +829,7 @@ class InsertStatement(AbstractStatement):
     if self.select_query is None or values_clause is None:
       self._values_clause = values_clause
     else:
-      raise Exception('An INSERT statement may not have both the select_query and '
+      raise Exception('An INSERT/UPSERT statement may not have both the select_query and '
                       'values_clause set: {select}; {values}'.format(
                           select=self.select_query, values=values_clause))
 
@@ -815,3 +854,16 @@ class InsertStatement(AbstractStatement):
   @property
   def dml_table(self):
     return self.insert_clause.table
+
+  @property
+  def conflict_action(self):
+    return self.insert_clause.conflict_action
+
+  @property
+  def primary_key_string(self):
+    return '({primary_key_list})'.format(
+        primary_key_list=', '.join(self.insert_clause.table.primary_key_names))
+
+  @property
+  def updatable_column_names(self):
+    return self.insert_clause.table.updatable_column_names

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0154ace6/tests/comparison/query_profile.py
----------------------------------------------------------------------
diff --git a/tests/comparison/query_profile.py b/tests/comparison/query_profile.py
index 06b8815..81a98a1 100644
--- a/tests/comparison/query_profile.py
+++ b/tests/comparison/query_profile.py
@@ -27,6 +27,7 @@ from tests.comparison.db_types import (
     TYPES,
     Timestamp)
 from tests.comparison.query import (
+    InsertClause,
     InsertStatement,
     Query,
     StatementExecutionMode,
@@ -211,7 +212,10 @@ class DefaultProfile(object):
             'none': 1},
         'VALUES_ITEM_EXPR': {
             'constant': 1,
-            'function': 2}}
+            'function': 2},
+        'INSERT_UPSERT': {
+            InsertClause.CONFLICT_ACTION_IGNORE: 1,
+            InsertClause.CONFLICT_ACTION_UPDATE: 3}}
 
     # On/off switches
     self._flags = {
@@ -631,16 +635,16 @@ class DefaultProfile(object):
 
   def choose_insert_source_clause(self):
     """
-    Returns whether we generate an INSERT SELECT or an INSERT VALUES
+    Returns whether we generate an INSERT/UPSERT SELECT or an INSERT/UPSERT VALUES
     """
     return self._choose_from_weights('INSERT_SOURCE_CLAUSE')
 
   def choose_insert_column_list(self, table):
     """
-    Decide whether or not an INSERT will be in the form of:
-    INSERT INTO table SELECT|VALUES ...
+    Decide whether or not an INSERT/UPSERT will be in the form of:
+    INSERT/UPSERT INTO table SELECT|VALUES ...
     or
-    INSERT INTO table (col1, col2, ...) SELECT|VALUES ...
+    INSERT/UPSERT INTO table (col1, col2, ...) SELECT|VALUES ...
     If the second form, the column list is shuffled. The column list will always contain
     the primary key columns and between 0 and all additional columns.
     """
@@ -649,7 +653,8 @@ class DefaultProfile(object):
       min_additional_insert_cols = 0 if columns_to_insert else 1
       remaining_columns = [col for col in table.cols if not col.is_primary_key]
       shuffle(remaining_columns)
-      additional_column_count = randint(min_additional_insert_cols, len(remaining_columns))
+      additional_column_count = randint(min_additional_insert_cols,
+                                        len(remaining_columns))
       columns_to_insert.extend(remaining_columns[:additional_column_count])
       shuffle(columns_to_insert)
       return columns_to_insert
@@ -658,7 +663,7 @@ class DefaultProfile(object):
 
   def choose_insert_values_row_count(self):
     """
-    Choose the number of rows to insert in an INSERT VALUES
+    Choose the number of rows to insert in an INSERT/UPSERT VALUES
     """
     return self._choose_from_bounds('INSERT_VALUES_ROWS')
 
@@ -669,6 +674,12 @@ class DefaultProfile(object):
     """
     return self._choose_from_weights('VALUES_ITEM_EXPR')
 
+  def choose_insert_vs_upsert(self):
+    """
+    Choose whether a particular insertion-type statement will be INSERT or UPSERT.
+    """
+    return self._choose_from_weights('INSERT_UPSERT')
+
 
 class ImpalaNestedTypesProfile(DefaultProfile):
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0154ace6/tests/comparison/statement_generator.py
----------------------------------------------------------------------
diff --git a/tests/comparison/statement_generator.py b/tests/comparison/statement_generator.py
index 55496ac..3784257 100644
--- a/tests/comparison/statement_generator.py
+++ b/tests/comparison/statement_generator.py
@@ -33,20 +33,21 @@ class InsertStatementGenerator(object):
   def __init__(self, profile):
     # QueryProfile-like object
     self.profile = profile
-    # used to generate SELECT queries for INSERT ... SELECT statements;
+    # used to generate SELECT queries for INSERT/UPSERT ... SELECT statements;
     # to ensure state is completely reset, this is created anew with each call to
     # generate_statement()
     self.select_stmt_generator = None
 
   def generate_statement(self, tables, dml_table):
     """
-    Return a randomly generated INSERT statement.
+    Return a randomly generated INSERT or UPSERT statement. Note that UPSERTs are very
+    similar to INSERTs, which is why this generator handles both.
 
     tables should be a list of Table objects. A typical source of such a list comes from
     db_connection.DbCursor.describe_common_tables(). This list describes the possible
-    "sources" of the INSERT's WITH and FROM/WHERE clauses.
+    "sources" of the INSERT/UPSERT's WITH and FROM/WHERE clauses.
 
-    dml_table is a required Table object. The INSERT will be into this table.
+    dml_table is a required Table object. The INSERT/UPSERT will be into this table.
     """
     if not (isinstance(tables, list) and len(tables) > 0 and
             all((isinstance(t, Table) for t in tables))):
@@ -57,23 +58,23 @@ class InsertStatementGenerator(object):
 
     self.select_stmt_generator = QueryGenerator(self.profile)
 
-    if dml_table.primary_keys:
-      insert_statement = InsertStatement(
-          conflict_action=InsertStatement.CONFLICT_ACTION_IGNORE)
-    else:
-      insert_statement = InsertStatement(
-          conflict_action=InsertStatement.CONFLICT_ACTION_DEFAULT)
-
-    insert_statement.execution = StatementExecutionMode.DML_TEST
+    insert_statement = InsertStatement(execution=StatementExecutionMode.DML_TEST)
 
     # Choose whether this is a
-    #   INSERT INTO table SELECT/VALUES
+    #   INSERT/UPSERT INTO table SELECT/VALUES
     # or
-    #   INSERT INTO table (col1, col2, ...) SELECT/VALUES
+    #   INSERT/UPSERT INTO table (col1, col2, ...) SELECT/VALUES
     # If the method returns None, it's the former.
     insert_column_list = self.profile.choose_insert_column_list(dml_table)
+
+    if dml_table.primary_keys:
+      # Having primary keys implies the table is a Kudu table, which makes it subject to
+      # both INSERTs (with automatic ignoring of primary key duplicates) and UPSERTs.
+      conflict_action = self.profile.choose_insert_vs_upsert()
+    else:
+      conflict_action = InsertClause.CONFLICT_ACTION_DEFAULT
     insert_statement.insert_clause = InsertClause(
-        dml_table, column_list=insert_column_list)
+        dml_table, column_list=insert_column_list, conflict_action=conflict_action)
     # We still need to internally track the columns we're inserting. Keep in mind None
     # means "all" without an explicit column list. Since we've already created the
     # InsertClause object though, we can fill this in for ourselves.
@@ -81,7 +82,7 @@ class InsertStatementGenerator(object):
       insert_column_list = dml_table.cols
     insert_item_data_types = [col.type for col in insert_column_list]
 
-    # Decide whether this is INSERT VALUES or INSERT SELECT
+    # Decide whether this is INSERT/UPSERT VALUES or INSERT/UPSERT SELECT
     insert_source_clause = self.profile.choose_insert_source_clause()
 
     if issubclass(insert_source_clause, Query):
@@ -99,7 +100,7 @@ class InsertStatementGenerator(object):
     elif issubclass(insert_source_clause, ValuesClause):
       insert_statement.values_clause = self._generate_values_clause(insert_column_list)
     else:
-      raise Exception('unsupported INSERT source clause: {0}'.format(
+      raise Exception('unsupported INSERT/UPSERT source clause: {0}'.format(
           insert_source_clause))
     return insert_statement
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0154ace6/tests/comparison/tests/query_object_testdata.py
----------------------------------------------------------------------
diff --git a/tests/comparison/tests/query_object_testdata.py b/tests/comparison/tests/query_object_testdata.py
index 4861781..154c8b1 100644
--- a/tests/comparison/tests/query_object_testdata.py
+++ b/tests/comparison/tests/query_object_testdata.py
@@ -87,6 +87,24 @@ KUDU_TABLE = FakeTable(
     ]
 )
 
+FOUR_COL_KUDU_TABLE = FakeTable(
+    'four_col_kudu_table',
+    [
+        FakeColumn('int_col1', Int, is_primary_key=True),
+        FakeColumn('char_col1', Char, is_primary_key=True),
+        FakeColumn('int_col2', Int),
+        FakeColumn('char_col2', Char),
+    ]
+)
+
+
+ONE_COL_KUDU_TABLE = FakeTable(
+    'one_col_kudu_table',
+    [
+        FakeColumn('int_col', Int, is_primary_key=True),
+    ]
+)
+
 # This can't be used inline because we need its table expressions later.
 SIMPLE_WITH_CLAUSE = WithClause(
     TableExprList([
@@ -388,12 +406,13 @@ INSERT_QUERY_TEST_CASES = [
     InsertStatementTest(
         testid='insert into table select cols ignore conflicts',
         query=InsertStatement(
-            insert_clause=InsertClause(KUDU_TABLE),
+            insert_clause=InsertClause(
+                KUDU_TABLE,
+                conflict_action=InsertClause.CONFLICT_ACTION_IGNORE),
             select_query=FakeQuery(
                 select_clause=FakeSelectClause(*SIMPLE_TABLE.cols),
                 from_clause=FromClause(SIMPLE_TABLE)
             ),
-            conflict_action=InsertStatement.CONFLICT_ACTION_IGNORE
         ),
         impala_query_string=(
             'INSERT INTO kudu_table\n'
@@ -414,12 +433,14 @@ INSERT_QUERY_TEST_CASES = [
     InsertStatementTest(
         testid='insert 2 value rows ignore conflicts',
         query=InsertStatement(
-            insert_clause=InsertClause(KUDU_TABLE),
+            insert_clause=InsertClause(
+                KUDU_TABLE,
+                conflict_action=InsertClause.CONFLICT_ACTION_IGNORE,
+            ),
             values_clause=ValuesClause((
                 ValuesRow((Int(1), Char('a'))),
                 ValuesRow((Int(2), Char('b'))),
             )),
-            conflict_action=InsertStatement.CONFLICT_ACTION_IGNORE
         ),
         impala_query_string=(
             'INSERT INTO kudu_table\n'
@@ -439,13 +460,15 @@ INSERT_QUERY_TEST_CASES = [
         testid='insert values seleted from with clause ignore conflicts',
         query=InsertStatement(
             with_clause=SIMPLE_WITH_CLAUSE,
-            insert_clause=InsertClause(KUDU_TABLE,
-                                       column_list=(KUDU_TABLE.cols[0],)),
+            insert_clause=InsertClause(
+                KUDU_TABLE,
+                column_list=(KUDU_TABLE.cols[0],),
+                conflict_action=InsertClause.CONFLICT_ACTION_IGNORE,
+            ),
             select_query=FakeQuery(
                 select_clause=FakeSelectClause(*SIMPLE_WITH_CLAUSE.table_exprs[0].cols),
                 from_clause=FromClause(SIMPLE_WITH_CLAUSE.table_exprs[0])
             ),
-            conflict_action=InsertStatement.CONFLICT_ACTION_IGNORE
         ),
         impala_query_string=(
             'WITH with_view AS (SELECT\n'
@@ -466,5 +489,121 @@ INSERT_QUERY_TEST_CASES = [
             'FROM with_view\n'
             'ON CONFLICT DO NOTHING'
         ),
-    )
+    ),
+    InsertStatementTest(
+        testid='upsert into table select cols',
+        query=InsertStatement(
+            insert_clause=InsertClause(
+                KUDU_TABLE,
+                conflict_action=InsertClause.CONFLICT_ACTION_UPDATE),
+            select_query=FakeQuery(
+                select_clause=FakeSelectClause(*SIMPLE_TABLE.cols),
+                from_clause=FromClause(SIMPLE_TABLE)
+            ),
+        ),
+        impala_query_string=(
+            'UPSERT INTO kudu_table\n'
+            'SELECT\n'
+            'fake_table.int_col,\n'
+            'TRIM(fake_table.char_col)\n'
+            'FROM fake_table'
+        ),
+        postgres_query_string=(
+            'INSERT INTO kudu_table\n'
+            'SELECT\n'
+            'fake_table.int_col,\n'
+            'fake_table.char_col\n'
+            'FROM fake_table\n'
+            'ON CONFLICT (int_col)\n'
+            'DO UPDATE SET\n'
+            'char_col = EXCLUDED.char_col'
+        ),
+    ),
+    InsertStatementTest(
+        testid='upsert 2 value rows',
+        query=InsertStatement(
+            insert_clause=InsertClause(
+                KUDU_TABLE,
+                conflict_action=InsertClause.CONFLICT_ACTION_UPDATE,
+            ),
+            values_clause=ValuesClause((
+                ValuesRow((Int(1), Char('a'))),
+                ValuesRow((Int(2), Char('b'))),
+            )),
+        ),
+        impala_query_string=(
+            'UPSERT INTO kudu_table\n'
+            'VALUES\n'
+            "(1, 'a'),\n"
+            "(2, 'b')"
+        ),
+        postgres_query_string=(
+            'INSERT INTO kudu_table\n'
+            'VALUES\n'
+            "(1, 'a' || ''),\n"
+            "(2, 'b' || '')\n"
+            'ON CONFLICT (int_col)\n'
+            'DO UPDATE SET\n'
+            'char_col = EXCLUDED.char_col'
+        ),
+    ),
+    InsertStatementTest(
+        testid='upsert select into table with multiple pk / updatable columns',
+        query=InsertStatement(
+            insert_clause=InsertClause(
+                FOUR_COL_KUDU_TABLE,
+                conflict_action=InsertClause.CONFLICT_ACTION_UPDATE),
+            select_query=FakeQuery(
+                select_clause=FakeSelectClause(*FOUR_COL_KUDU_TABLE.cols),
+                from_clause=FromClause(FOUR_COL_KUDU_TABLE)
+            ),
+        ),
+        impala_query_string=(
+            'UPSERT INTO four_col_kudu_table\n'
+            'SELECT\n'
+            'four_col_kudu_table.int_col1,\n'
+            'TRIM(four_col_kudu_table.char_col1),\n'
+            'four_col_kudu_table.int_col2,\n'
+            'TRIM(four_col_kudu_table.char_col2)\n'
+            'FROM four_col_kudu_table'
+        ),
+        postgres_query_string=(
+            'INSERT INTO four_col_kudu_table\n'
+            'SELECT\n'
+            'four_col_kudu_table.int_col1,\n'
+            'four_col_kudu_table.char_col1,\n'
+            'four_col_kudu_table.int_col2,\n'
+            'four_col_kudu_table.char_col2\n'
+            'FROM four_col_kudu_table\n'
+            'ON CONFLICT (int_col1, char_col1)\n'
+            'DO UPDATE SET\n'
+            'int_col2 = EXCLUDED.int_col2,\n'
+            'char_col2 = EXCLUDED.char_col2'
+        ),
+    ),
+    InsertStatementTest(
+        testid='upsert select into table with no updatable columns',
+        query=InsertStatement(
+            insert_clause=InsertClause(
+                ONE_COL_KUDU_TABLE,
+                conflict_action=InsertClause.CONFLICT_ACTION_UPDATE),
+            select_query=FakeQuery(
+                select_clause=FakeSelectClause(SIMPLE_TABLE.cols[0]),
+                from_clause=FromClause(SIMPLE_TABLE)
+            ),
+        ),
+        impala_query_string=(
+            'UPSERT INTO one_col_kudu_table\n'
+            'SELECT\n'
+            'fake_table.int_col\n'
+            'FROM fake_table'
+        ),
+        postgres_query_string=(
+            'INSERT INTO one_col_kudu_table\n'
+            'SELECT\n'
+            'fake_table.int_col\n'
+            'FROM fake_table\n'
+            'ON CONFLICT DO NOTHING'
+        ),
+    ),
 ]


[2/7] incubator-impala git commit: IMPALA-4810: Add DECIMAL_V2 query option

Posted by ta...@apache.org.
IMPALA-4810: Add DECIMAL_V2 query option

This query option will be used both to change rounding semantics
and to establish more appropriate output types for multiply / divide.
The option won't be supported until all the changes are complete,
but this will eventually become the default behavior for DECIMAL.

Change-Id: I6cf25fe5c1766c86ebfe196d49f646e81e50a24e
Reviewed-on: http://gerrit.cloudera.org:8080/5889
Reviewed-by: Michael Ho <kw...@cloudera.com>
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/0f8ae355
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/0f8ae355
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/0f8ae355

Branch: refs/heads/master
Commit: 0f8ae355cee00e1d7b0ee47814d31f7337926bcc
Parents: 7704b64
Author: Zach Amsden <za...@cloudera.com>
Authored: Fri Feb 3 03:40:53 2017 +0000
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Sat Feb 4 01:13:35 2017 +0000

----------------------------------------------------------------------
 be/src/service/query-options.cc            | 4 ++++
 be/src/service/query-options.h             | 6 ++++--
 common/thrift/ImpalaInternalService.thrift | 3 +++
 common/thrift/ImpalaService.thrift         | 8 ++++++--
 4 files changed, 17 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0f8ae355/be/src/service/query-options.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 8be6b19..7227387 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -446,6 +446,10 @@ Status impala::SetQueryOption(const string& key, const string& value,
             iequals(value, "true") || iequals(value, "1"));
         break;
       }
+      case TImpalaQueryOptions::DECIMAL_V2: {
+        query_options->__set_decimal_v2(iequals(value, "true") || iequals(value, "1"));
+        break;
+      }
       default:
         // We hit this DCHECK(false) if we forgot to add the corresponding entry here
         // when we add a new query option.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0f8ae355/be/src/service/query-options.h
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 6531dcb..d4fcd96 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -35,7 +35,7 @@ class TQueryOptions;
 // the DCHECK.
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::ENABLE_EXPR_REWRITES + 1);\
+      TImpalaQueryOptions::DECIMAL_V2 + 1);\
   QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR)\
   QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
@@ -86,7 +86,9 @@ class TQueryOptions;
   QUERY_OPT_FN(prefetch_mode, PREFETCH_MODE)\
   QUERY_OPT_FN(strict_mode, STRICT_MODE)\
   QUERY_OPT_FN(scratch_limit, SCRATCH_LIMIT)\
-  QUERY_OPT_FN(enable_expr_rewrites, ENABLE_EXPR_REWRITES);
+  QUERY_OPT_FN(enable_expr_rewrites, ENABLE_EXPR_REWRITES)\
+  QUERY_OPT_FN(decimal_v2, DECIMAL_V2)\
+  ;
 
 
 /// Converts a TQueryOptions struct into a map of key, value pairs.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0f8ae355/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 5088a1b..e9b962e 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -218,6 +218,9 @@ struct TQueryOptions {
   // Indicates whether the FE should rewrite Exprs for optimization purposes.
   // It's sometimes useful to disable rewrites for testing, e.g., expr-test.cc.
   51: optional bool enable_expr_rewrites = true
+
+  // Indicates whether to use the new decimal semantics.
+  52: optional bool decimal_v2 = false
 }
 
 // Impala currently has two types of sessions: Beeswax and HiveServer2

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0f8ae355/common/thrift/ImpalaService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index ac1cf4a..98c671d 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -242,11 +242,15 @@ enum TImpalaQueryOptions {
   // A limit on the amount of scratch directory space that can be used;
   // Unspecified or a limit of -1 means no limit;
   // Otherwise specified in the same way as MEM_LIMIT.
-  SCRATCH_LIMIT
+  SCRATCH_LIMIT,
 
   // Indicates whether the FE should rewrite Exprs for optimization purposes.
   // It's sometimes useful to disable rewrites for testing, e.g., expr-test.cc.
-  ENABLE_EXPR_REWRITES
+  ENABLE_EXPR_REWRITES,
+
+  // Indicates whether to use the new decimal semantics, which includes better
+  // rounding and output types for multiply / divide
+  DECIMAL_V2,
 }
 
 // The summary of a DML statement.


[7/7] incubator-impala git commit: IMPALA-3524: Don't process spilled partitions with 0 probe rows

Posted by ta...@apache.org.
IMPALA-3524: Don't process spilled partitions with 0 probe rows

In the partitioned hash join node, if a spilled partition has no probe
rows, building the hash table is unnecessary.

For some join types (right outer, right anti, and full outer), we still
need to process the build side to output unmatched rows (in this case, all
rows, since there were no probe rows to match).

Testing: Added some cases to spilling.test. Manually tested these cases
for performance, and they all show around a 6% improvement.

Change-Id: I175b32dd9031e51218b38c37693ac3e31dfab47b
Reviewed-on: http://gerrit.cloudera.org:8080/5389
Reviewed-by: Jim Apple <jb...@apache.org>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/6a9df540
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/6a9df540
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/6a9df540

Branch: refs/heads/master
Commit: 6a9df540967e07b09524268d0cc52b7d10835676
Parents: bdd39f6
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
Authored: Mon Dec 5 15:37:06 2016 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Mon Feb 6 20:22:33 2017 +0000

----------------------------------------------------------------------
 be/src/exec/partitioned-hash-join-node.cc       | 156 +++++++++++++++----
 be/src/exec/partitioned-hash-join-node.h        |  52 +++++--
 .../queries/QueryTest/spilling.test             |  62 ++++++++
 .../tpch/queries/tpch-outer-joins.test          |   3 -
 4 files changed, 233 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6a9df540/be/src/exec/partitioned-hash-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc
index 6073486..0c91b47 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -135,6 +135,8 @@ Status PartitionedHashJoinNode::Prepare(RuntimeState* state) {
 
   num_probe_rows_partitioned_ =
       ADD_COUNTER(runtime_profile(), "ProbeRowsPartitioned", TUnit::UNIT);
+  num_hash_table_builds_skipped_ =
+      ADD_COUNTER(runtime_profile(), "NumHashTableBuildsSkipped", TUnit::UNIT);
   AddCodegenDisabledMessage(state);
   return Status::OK();
 }
@@ -201,6 +203,8 @@ Status PartitionedHashJoinNode::Reset(RuntimeState* state) {
   CloseAndDeletePartitions();
   builder_->Reset();
   memset(hash_tbls_, 0, sizeof(HashTable*) * PARTITION_FANOUT);
+  output_unmatched_batch_.reset();
+  output_unmatched_batch_iter_.reset();
   return ExecNode::Reset(state);
 }
 
@@ -238,6 +242,8 @@ void PartitionedHashJoinNode::Close(RuntimeState* state) {
   if (is_closed()) return;
   if (ht_ctx_ != NULL) ht_ctx_->Close();
   nulls_build_batch_.reset();
+  output_unmatched_batch_.reset();
+  output_unmatched_batch_iter_.reset();
   CloseAndDeletePartitions();
   if (builder_ != NULL) builder_->Close(state);
   Expr::Close(build_expr_ctxs_, state);
@@ -330,13 +336,17 @@ Status PartitionedHashJoinNode::NextSpilledProbeRowBatch(
       // In case of right-outer, right-anti and full-outer joins, we move this partition
       // to the list of partitions that we need to output their unmatched build rows.
       DCHECK(output_build_partitions_.empty());
-      DCHECK(input_partition_->build_partition()->hash_tbl() != NULL)
-          << " id: " << id_
-          << " Build: " << input_partition_->build_partition()->build_rows()->num_rows()
-          << " Probe: " << probe_rows->num_rows() << endl
-          << GetStackTrace();
-      hash_tbl_iterator_ =
-          input_partition_->build_partition()->hash_tbl()->FirstUnmatched(ht_ctx_.get());
+      DCHECK(output_unmatched_batch_iter_.get() == NULL);
+      if (input_partition_->build_partition()->hash_tbl() != NULL) {
+        hash_tbl_iterator_ =
+            input_partition_->build_partition()->hash_tbl()->FirstUnmatched(
+                ht_ctx_.get());
+      } else {
+        output_unmatched_batch_.reset(new RowBatch(
+            child(1)->row_desc(), runtime_state_->batch_size(), builder_->mem_tracker()));
+        output_unmatched_batch_iter_.reset(
+            new RowBatch::Iterator(output_unmatched_batch_.get(), 0));
+      }
       output_build_partitions_.push_back(input_partition_->build_partition());
     } else {
       // In any other case, just close the input build partition.
@@ -365,6 +375,24 @@ Status PartitionedHashJoinNode::PrepareSpilledPartitionForProbe(
   spilled_partitions_.pop_front();
   PhjBuilder::Partition* build_partition = input_partition_->build_partition();
   DCHECK(build_partition->is_spilled());
+  if (input_partition_->probe_rows()->num_rows() == 0) {
+    // If there are no probe rows, there's no need to build the hash table, and
+    // only partitions with NeedToProcessUnmatchedBuildRows() will have been added
+    // to 'spilled_partitions_' in CleanUpHashPartitions().
+    DCHECK(NeedToProcessUnmatchedBuildRows());
+    bool got_read_buffer = false;
+    RETURN_IF_ERROR(input_partition_->build_partition()->build_rows()->PrepareForRead(
+        false, &got_read_buffer));
+    if (!got_read_buffer) {
+      return mem_tracker()->MemLimitExceeded(
+          runtime_state_, Substitute(PREPARE_FOR_READ_FAILED_ERROR_MSG, id_));
+    }
+
+    *got_partition = true;
+    UpdateState(PROBING_SPILLED_PARTITION);
+    COUNTER_ADD(num_hash_table_builds_skipped_, 1);
+    return Status::OK();
+  }
 
   // Make sure we have a buffer to read the probe rows before we build the hash table.
   RETURN_IF_ERROR(input_partition_->PrepareForRead());
@@ -614,27 +642,82 @@ Status PartitionedHashJoinNode::GetNext(RuntimeState* state, RowBatch* out_batch
   return Status::OK();
 }
 
-void PartitionedHashJoinNode::OutputUnmatchedBuild(RowBatch* out_batch) {
+Status PartitionedHashJoinNode::OutputUnmatchedBuild(RowBatch* out_batch) {
   SCOPED_TIMER(probe_timer_);
   DCHECK(NeedToProcessUnmatchedBuildRows());
   DCHECK(!output_build_partitions_.empty());
+  const int start_num_rows = out_batch->num_rows();
+
+  if (output_unmatched_batch_iter_.get() != NULL) {
+    // There were no probe rows so we skipped building the hash table. In this case, all
+    // build rows of the partition are unmatched.
+    RETURN_IF_ERROR(OutputAllBuild(out_batch));
+  } else {
+    // We built and processed the hash table, so sweep over it and output unmatched rows.
+    RETURN_IF_ERROR(OutputUnmatchedBuildFromHashTable(out_batch));
+  }
+
+  num_rows_returned_ += out_batch->num_rows() - start_num_rows;
+  COUNTER_SET(rows_returned_counter_, num_rows_returned_);
+  return Status::OK();
+}
+
+Status PartitionedHashJoinNode::OutputAllBuild(RowBatch* out_batch) {
+  // This will only be called for partitions that are added to 'output_build_partitions_'
+  // in NextSpilledProbeRowBatch(), which adds one partition that is then processed until
+  // it is done by the loop in GetNext(). So, there must be exactly one partition in
+  // 'output_build_partitions_' here.
+  DCHECK_EQ(output_build_partitions_.size(), 1);
+  ExprContext* const* conjunct_ctxs = &conjunct_ctxs_[0];
+  const int num_conjuncts = conjunct_ctxs_.size();
+  RowBatch::Iterator out_batch_iterator(out_batch, out_batch->num_rows());
+
+  bool eos = false;
+  while (!eos && !out_batch->AtCapacity()) {
+    if (output_unmatched_batch_iter_->AtEnd()) {
+      output_unmatched_batch_->TransferResourceOwnership(out_batch);
+      output_unmatched_batch_->Reset();
+      RETURN_IF_ERROR(output_build_partitions_.front()->build_rows()->GetNext(
+          output_unmatched_batch_.get(), &eos));
+      output_unmatched_batch_iter_.reset(
+          new RowBatch::Iterator(output_unmatched_batch_.get(), 0));
+    }
+
+    for (; !output_unmatched_batch_iter_->AtEnd() && !out_batch->AtCapacity();
+         output_unmatched_batch_iter_->Next()) {
+      OutputBuildRow(out_batch, output_unmatched_batch_iter_->Get(), &out_batch_iterator);
+      if (ExecNode::EvalConjuncts(
+              conjunct_ctxs, num_conjuncts, out_batch_iterator.Get())) {
+        out_batch->CommitLastRow();
+        out_batch_iterator.Next();
+      }
+    }
+  }
+
+  // If we reached eos and finished the last batch, then there are no other unmatched
+  // build rows for this partition. In that case we need to close the partition.
+  // Otherwise, we reached out_batch capacity and we need to continue to output
+  // unmatched build rows, without closing the partition.
+  if (eos && output_unmatched_batch_iter_->AtEnd()) {
+    output_build_partitions_.front()->Close(out_batch);
+    output_build_partitions_.pop_front();
+    DCHECK(output_build_partitions_.empty());
+    output_unmatched_batch_iter_.reset();
+  }
+  return Status::OK();
+}
+
+Status PartitionedHashJoinNode::OutputUnmatchedBuildFromHashTable(RowBatch* out_batch) {
   ExprContext* const* conjunct_ctxs = &conjunct_ctxs_[0];
   const int num_conjuncts = conjunct_ctxs_.size();
   RowBatch::Iterator out_batch_iterator(out_batch, out_batch->num_rows());
-  const int start_num_rows = out_batch->num_rows();
 
   while (!out_batch->AtCapacity() && !hash_tbl_iterator_.AtEnd()) {
     // Output remaining unmatched build rows.
     if (!hash_tbl_iterator_.IsMatched()) {
-      TupleRow* build_row = hash_tbl_iterator_.GetRow();
-      DCHECK(build_row != NULL);
-      if (join_op_ == TJoinOp::RIGHT_ANTI_JOIN) {
-        out_batch->CopyRow(build_row, out_batch_iterator.Get());
-      } else {
-        CreateOutputRow(out_batch_iterator.Get(), NULL, build_row);
-      }
-      if (ExecNode::EvalConjuncts(conjunct_ctxs, num_conjuncts,
-          out_batch_iterator.Get())) {
+      OutputBuildRow(out_batch, hash_tbl_iterator_.GetRow(), &out_batch_iterator);
+      if (ExecNode::EvalConjuncts(
+              conjunct_ctxs, num_conjuncts, out_batch_iterator.Get())) {
         out_batch->CommitLastRow();
         out_batch_iterator.Next();
       }
@@ -646,9 +729,9 @@ void PartitionedHashJoinNode::OutputUnmatchedBuild(RowBatch* out_batch) {
 
   // If we reached the end of the hash table, then there are no other unmatched build
   // rows for this partition. In that case we need to close the partition, and move to
-  // the next. If we have not reached the end of the hash table, it means that we reached
-  // out_batch capacity and we need to continue to output unmatched build rows, without
-  // closing the partition.
+  // the next. If we have not reached the end of the hash table, it means that we
+  // reached out_batch capacity and we need to continue to output unmatched build rows,
+  // without closing the partition.
   if (hash_tbl_iterator_.AtEnd()) {
     output_build_partitions_.front()->Close(out_batch);
     output_build_partitions_.pop_front();
@@ -658,9 +741,17 @@ void PartitionedHashJoinNode::OutputUnmatchedBuild(RowBatch* out_batch) {
           output_build_partitions_.front()->hash_tbl()->FirstUnmatched(ht_ctx_.get());
     }
   }
+  return Status::OK();
+}
 
-  num_rows_returned_ += out_batch->num_rows() - start_num_rows;
-  COUNTER_SET(rows_returned_counter_, num_rows_returned_);
+void PartitionedHashJoinNode::OutputBuildRow(
+    RowBatch* out_batch, TupleRow* build_row, RowBatch::Iterator* out_batch_iterator) {
+  DCHECK(build_row != NULL);
+  if (join_op_ == TJoinOp::RIGHT_ANTI_JOIN) {
+    out_batch->CopyRow(build_row, out_batch_iterator->Get());
+  } else {
+    CreateOutputRow(out_batch_iterator->Get(), NULL, build_row);
+  }
 }
 
 Status PartitionedHashJoinNode::PrepareNullAwareNullProbe() {
@@ -970,13 +1061,24 @@ Status PartitionedHashJoinNode::CleanUpHashPartitions(RowBatch* batch) {
       RETURN_IF_ERROR(
           probe_partition->probe_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL));
 
-      // Push newly created partitions at the front. This means a depth first walk
-      // (more finely partitioned partitions are processed first). This allows us
-      // to delete blocks earlier and bottom out the recursion earlier.
-      spilled_partitions_.push_front(std::move(probe_hash_partitions_[i]));
+      if (probe_partition->probe_rows()->num_rows() != 0
+          || NeedToProcessUnmatchedBuildRows()) {
+        // Push newly created partitions at the front. This means a depth first walk
+        // (more finely partitioned partitions are processed first). This allows us
+        // to delete blocks earlier and bottom out the recursion earlier.
+        spilled_partitions_.push_front(std::move(probe_hash_partitions_[i]));
+      } else {
+        // There's no more processing to do for this partition, and since there were no
+        // probe rows we didn't return any rows that reference memory from these
+        // partitions, so just free the resources.
+        build_partition->Close(NULL);
+        probe_partition->Close(NULL);
+        COUNTER_ADD(num_hash_table_builds_skipped_, 1);
+      }
     } else {
       DCHECK(probe_partition == NULL);
       if (NeedToProcessUnmatchedBuildRows()) {
+        DCHECK(output_unmatched_batch_iter_.get() == NULL);
         if (output_build_partitions_.empty()) {
           hash_tbl_iterator_ = build_partition->hash_tbl()->FirstUnmatched(ht_ctx_.get());
         }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6a9df540/be/src/exec/partitioned-hash-join-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.h b/be/src/exec/partitioned-hash-join-node.h
index 504dc7b..6d4e7f4 100644
--- a/be/src/exec/partitioned-hash-join-node.h
+++ b/be/src/exec/partitioned-hash-join-node.h
@@ -282,12 +282,29 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   int ProcessProbeBatch(const TJoinOp::type join_op, TPrefetchMode::type,
       RowBatch* out_batch, HashTableCtx* ht_ctx, Status* status);
 
-  /// Sweep the hash_tbl_ of the partition that is at the front of
-  /// output_build_partitions_, using hash_tbl_iterator_ and output any unmatched build
-  /// rows. If reaches the end of the hash table it closes that partition, removes it from
-  /// output_build_partitions_ and moves hash_tbl_iterator_ to the beginning of the
-  /// new partition at the front of output_build_partitions_.
-  void OutputUnmatchedBuild(RowBatch* out_batch);
+  /// Used when NeedToProcessUnmatchedBuildRows() is true. Writes all unmatched rows from
+  /// 'output_build_partitions_' to 'out_batch', up to 'out_batch' capacity.
+  Status OutputUnmatchedBuild(RowBatch* out_batch);
+
+  /// Called by OutputUnmatchedBuild() when there isn't a hash table built, which happens
+  /// when a spilled partition had 0 probe rows. In this case, all of the build rows are
+  /// unmatched and we can iterate over the entire build side of the partition, which will
+  /// be the only partition in 'output_build_partitions_'. If it reaches the end of the
+  /// partition, it closes that partition and removes it from 'output_build_partitions_'.
+  Status OutputAllBuild(RowBatch* out_batch);
+
+  /// Called by OutputUnmatchedBuild when there is a hash table built. Sweeps the
+  /// 'hash_tbl_' of the partition that is at the front of 'output_build_partitions_',
+  /// using 'hash_tbl_iterator_' and outputs any unmatched build rows. If it reaches the
+  /// end of the hash table it closes that partition, removes it from
+  /// 'output_build_partitions_' and moves 'hash_tbl_iterator_' to the beginning of the
+  /// new partition at the front of 'output_build_partitions_'.
+  Status OutputUnmatchedBuildFromHashTable(RowBatch* out_batch);
+
+  /// Writes 'build_row' to 'out_batch' at the position of 'out_batch_iterator' in a
+  /// 'join_op_' specific way.
+  void OutputBuildRow(
+      RowBatch* out_batch, TupleRow* build_row, RowBatch::Iterator* out_batch_iterator);
 
   /// Initializes 'null_aware_probe_partition_' and prepares its probe stream for writing.
   Status InitNullAwareProbePartition();
@@ -338,10 +355,12 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   /// Moves onto the next spilled partition and initializes 'input_partition_'. This
   /// function processes the entire build side of 'input_partition_' and when this
   /// function returns, we are ready to consume the probe side of 'input_partition_'.
-  /// If the build side's hash table fits in memory, we will construct input_partition_'s
-  /// hash table. If it does not, meaning we need to repartition, this function will
-  /// repartition the build rows into 'builder->hash_partitions_' and prepare for
-  /// repartitioning the partition's probe rows.
+  /// If the build side's hash table fits in memory and there are probe rows, we will
+  /// construct input_partition_'s hash table. If it does not fit, meaning we need to
+  /// repartition, this function will repartition the build rows into
+  /// 'builder->hash_partitions_' and prepare for repartitioning the partition's probe
+  /// rows. If there are no probe rows, we just prepare the build side to be read by
+  /// OutputUnmatchedBuild().
   Status PrepareSpilledPartitionForProbe(RuntimeState* state, bool* got_partition);
 
   /// Calls Close() on every probe partition, destroys the partitions and cleans up any
@@ -389,6 +408,10 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   /// Time spent evaluating other_join_conjuncts for NAAJ.
   RuntimeProfile::Counter* null_aware_eval_timer_;
 
+  /// Number of partitions which had zero probe rows and we therefore didn't build the
+  /// hash table.
+  RuntimeProfile::Counter* num_hash_table_builds_skipped_;
+
   /////////////////////////////////////////
   /// BEGIN: Members that must be Reset()
 
@@ -451,6 +474,15 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   /// outputting.
   int64_t null_probe_output_idx_;
 
+  /// Used by OutputAllBuild() to iterate over the entire build side tuple stream of the
+  /// current partition.
+  std::unique_ptr<RowBatch> output_unmatched_batch_;
+
+  /// Stores an iterator into 'output_unmatched_batch_' to start from on the next call to
+  /// OutputAllBuild(), or NULL if there are no partitions without hash tables needing to
+  /// be processed by OutputUnmatchedBuild().
+  std::unique_ptr<RowBatch::Iterator> output_unmatched_batch_iter_;
+
   /// END: Members that must be Reset()
   /////////////////////////////////////////
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6a9df540/testdata/workloads/functional-query/queries/QueryTest/spilling.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/spilling.test b/testdata/workloads/functional-query/queries/QueryTest/spilling.test
index 91b425e..89668e8 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/spilling.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/spilling.test
@@ -628,3 +628,65 @@ BIGINT,BIGINT,BIGINT,INT,DECIMAL,DECIMAL,DECIMAL,DECIMAL,STRING,STRING,STRING,ST
 row_regex: .*SpilledPartitions: .* \([1-9][0-9]*\)
 row_regex: .*TotalMergesPerformed: .* \([1-9][0-9]*\)
 ====
+---- QUERY
+# Tests for the case where a spilled partition has 0 probe rows and so we don't build the
+# hash table in a partitioned hash join.
+# INNER JOIN
+set max_block_mgr_memory=10m;
+select straight_join count(*)
+from
+lineitem a, lineitem b
+where
+a.l_partkey = 1 and
+a.l_orderkey = b.l_orderkey;
+---- TYPES
+BIGINT
+---- RESULTS
+173
+---- RUNTIME_PROFILE
+row_regex: .*NumHashTableBuildsSkipped: .* \([1-9][0-9]*\)
+====
+---- QUERY
+# spilled partition with 0 probe rows, NULL AWARE LEFT ANTI JOIN
+set max_block_mgr_memory=10m;
+select straight_join count(*)
+from
+lineitem a
+where
+a.l_partkey not in (select l_partkey from lineitem where l_partkey > 10)
+and a.l_partkey < 1000;
+---- TYPES
+BIGINT
+---- RESULTS
+287
+---- RUNTIME_PROFILE
+row_regex: .*NumHashTableBuildsSkipped: .* \([1-9][0-9]*\)
+====
+---- QUERY
+# spilled partition with 0 probe rows, RIGHT OUTER JOIN
+set max_block_mgr_memory=100m;
+select straight_join count(*)
+from
+supplier right outer join lineitem on s_suppkey = l_suppkey
+where s_acctbal > 0 and s_acctbal < 10;
+---- TYPES
+BIGINT
+---- RESULTS
+12138
+---- RUNTIME_PROFILE
+row_regex: .*NumHashTableBuildsSkipped: .* \([1-9][0-9]*\)
+====
+---- QUERY
+# spilled partition with 0 probe rows, RIGHT ANTI JOIN
+set max_block_mgr_memory=30m;
+with x as (select * from supplier limit 10)
+select straight_join count(*)
+from
+x right anti join lineitem on s_suppkey + 100 = l_suppkey;
+---- TYPES
+BIGINT
+---- RESULTS
+5995258
+---- RUNTIME_PROFILE
+row_regex: .*NumHashTableBuildsSkipped: .* \([1-9][0-9]*\)
+====
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6a9df540/testdata/workloads/tpch/queries/tpch-outer-joins.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/tpch/queries/tpch-outer-joins.test b/testdata/workloads/tpch/queries/tpch-outer-joins.test
index 0da2850..a189a7a 100644
--- a/testdata/workloads/tpch/queries/tpch-outer-joins.test
+++ b/testdata/workloads/tpch/queries/tpch-outer-joins.test
@@ -29,9 +29,6 @@ SELECT straight_join * FROM orders o
 RIGHT OUTER JOIN lineitem l ON o.o_orderkey =  if(l.l_orderkey % 2 = 0, 0, l.l_orderkey)
 ORDER BY l_receiptdate, l_orderkey, l_shipdate
 limit 10
----- CATCH: ANY_OF
-Repartitioning did not reduce the size of a spilled partition
-Memory limit exceeded
 ====
 ---- QUERY
 # Regression test for IMPALA-2612. The following query will cause CastToChar


[6/7] incubator-impala git commit: IMPALA-4887: Skip parquet stats writer test for unsupported filesystems

Posted by ta...@apache.org.
IMPALA-4887: Skip parquet stats writer test for unsupported filesystems

The parquet writer test needs a running Hive instance to compare the
results to. This change disables the test for configurations where we do
not have a local Hive instance.

Change-Id: I0baf14d6f0466d11539c5ada1a1cc1ab7ca11cd6
Reviewed-on: http://gerrit.cloudera.org:8080/5910
Reviewed-by: Jim Apple <jb...@apache.org>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/bdd39f6a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/bdd39f6a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/bdd39f6a

Branch: refs/heads/master
Commit: bdd39f6a86a582669e6aea4b55b4222700861041
Parents: 0154ace
Author: Lars Volker <lv...@cloudera.com>
Authored: Mon Feb 6 00:22:37 2017 +0100
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Mon Feb 6 04:20:58 2017 +0000

----------------------------------------------------------------------
 tests/query_test/test_insert_parquet.py | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/bdd39f6a/tests/query_test/test_insert_parquet.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_insert_parquet.py b/tests/query_test/test_insert_parquet.py
index 58a3d74..73f1aae 100644
--- a/tests/query_test/test_insert_parquet.py
+++ b/tests/query_test/test_insert_parquet.py
@@ -27,7 +27,7 @@ from tempfile import mkdtemp as make_tmp_dir
 from tests.common.environ import impalad_basedir
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.parametrize import UniqueDatabase
-from tests.common.skip import SkipIfIsilon, SkipIfLocal
+from tests.common.skip import SkipIfIsilon, SkipIfLocal, SkipIfS3
 from tests.common.test_dimensions import create_exec_option_dimension
 from tests.common.test_vector import ImpalaTestDimension
 from tests.util.filesystem_utils import get_fs_path
@@ -214,6 +214,9 @@ class TestHdfsParquetTableWriter(ImpalaTestSuite):
       rmtree(tmp_dir)
 
 
+@SkipIfIsilon.hive
+@SkipIfLocal.hive
+@SkipIfS3.hive
 class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
 
   @classmethod


[3/7] incubator-impala git commit: IMPALA-1670, IMPALA-4141: Support multiple partitions in ALTER TABLE ADD PARTITION

Posted by ta...@apache.org.
IMPALA-1670, IMPALA-4141: Support multiple partitions in ALTER TABLE
ADD PARTITION

Just like Hive, Impala should support multiple partitions in ALTER
TABLE ADD PARTITION statements. The syntax is as follows:

ALTER TABLE table_name ADD [IF NOT EXISTS]
    PARTITION partition_spec1 [location_spec1] [cache_spec1]
    PARTITION partition_spec2 [location_spec2] [cache_spec2]
    ...

Grammar was modified to handle the new syntax. Introduced PartitionDef
class to capture the repeatable part of the statement. TPartitionDef
is the name of the corresponding thrift class.

AlterTableAddPartitionStmt and CatalogOpExecutor classes were also
modified to work with a list of partitions. Duplicate partition specs
are rejected in AlterTableAddPartitionStmt.analyze().

Added FE, E2E and integration tests.

Change-Id: Iddbc951f2931f488f7048c9780260f6b49100750
Reviewed-on: http://gerrit.cloudera.org:8080/4144
Reviewed-by: Dimitris Tsirogiannis <dt...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/c452595b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/c452595b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/c452595b

Branch: refs/heads/master
Commit: c452595bff5840f86b35f44d677558d3a940ceab
Parents: 0f8ae35
Author: Attila Jeges <at...@cloudera.com>
Authored: Tue Aug 16 01:05:42 2016 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Sat Feb 4 01:47:23 2017 +0000

----------------------------------------------------------------------
 common/thrift/JniCatalog.thrift                 |  22 +-
 fe/src/main/cup/sql-parser.cup                  |  27 ++-
 .../analysis/AlterTableAddPartitionStmt.java    |  93 ++++----
 .../apache/impala/analysis/PartitionDef.java    | 111 +++++++++
 .../impala/analysis/PartitionKeyValue.java      |  12 +-
 .../apache/impala/analysis/PartitionSpec.java   |  30 ++-
 .../impala/analysis/PartitionSpecBase.java      |   1 +
 .../impala/service/CatalogOpExecutor.java       | 237 +++++++++++++------
 .../apache/impala/analysis/AnalyzeDDLTest.java  |  72 ++++++
 .../impala/analysis/AuthorizationTest.java      |  13 +-
 .../org/apache/impala/analysis/ParserTest.java  |  22 +-
 .../org/apache/impala/analysis/ToSqlTest.java   |  42 ++++
 .../queries/QueryTest/alter-table.test          | 104 ++++++++
 .../queries/QueryTest/grant_revoke.test         |  27 +++
 tests/common/impala_test_suite.py               |  44 ++++
 tests/metadata/test_hms_integration.py          | 169 ++++++++++---
 tests/metadata/test_refresh_partition.py        |  39 +--
 17 files changed, 858 insertions(+), 207 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c452595b/common/thrift/JniCatalog.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/JniCatalog.thrift b/common/thrift/JniCatalog.thrift
index d224658..96b2e00 100644
--- a/common/thrift/JniCatalog.thrift
+++ b/common/thrift/JniCatalog.thrift
@@ -171,20 +171,28 @@ struct TAlterTableAddReplaceColsParams {
   2: required bool replace_existing_cols
 }
 
-// Parameters for ALTER TABLE ADD PARTITION commands
-struct TAlterTableAddPartitionParams {
+// Parameters for specifying a single partition in ALTER TABLE ADD PARTITION
+struct TPartitionDef {
   // The partition spec (list of keys and values) to add.
   1: required list<CatalogObjects.TPartitionKeyValue> partition_spec
 
-  // If true, no error is raised if a partition with the same spec already exists.
-  2: required bool if_not_exists
-
   // Optional HDFS storage location for the Partition. If not specified the
   // default storage location is used.
-  3: optional string location
+  2: optional string location
 
   // Optional caching operation to perform on the newly added partition.
-  4: optional THdfsCachingOp cache_op
+  3: optional THdfsCachingOp cache_op
+}
+
+// Parameters for ALTER TABLE ADD PARTITION commands
+struct TAlterTableAddPartitionParams {
+  // If 'if_not_exists' is true, no error is raised when a partition with the same spec
+  // already exists. If multiple partitions are specified, the statement will ignore
+  // those that exist and add the rest.
+  1: required bool if_not_exists
+
+  // The list of partitions to add
+  2: required list<TPartitionDef> partitions
 }
 
 enum TRangePartitionOperationType {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c452595b/fe/src/main/cup/sql-parser.cup
----------------------------------------------------------------------
diff --git a/fe/src/main/cup/sql-parser.cup b/fe/src/main/cup/sql-parser.cup
index 33f0591..adef592 100644
--- a/fe/src/main/cup/sql-parser.cup
+++ b/fe/src/main/cup/sql-parser.cup
@@ -392,6 +392,8 @@ nonterminal PartitionKeyValue partition_key_value;
 nonterminal PartitionKeyValue static_partition_key_value;
 nonterminal Qualifier union_op;
 
+nonterminal PartitionDef partition_def;
+nonterminal List<PartitionDef> partition_def_list;
 nonterminal AlterTableStmt alter_tbl_stmt;
 nonterminal StatementBase alter_view_stmt;
 nonterminal ComputeStatsStmt compute_stats_stmt;
@@ -914,16 +916,31 @@ opt_kw_role ::=
   | /* empty */
   ;
 
+partition_def ::=
+  partition_spec:partition location_val:location cache_op_val:cache_op
+  {: RESULT = new PartitionDef(partition, location, cache_op); :}
+  ;
+
+partition_def_list ::=
+  partition_def:item
+  {:
+    List<PartitionDef> list = Lists.newArrayList(item);
+    RESULT = list;
+  :}
+  | partition_def_list:list partition_def:item
+  {:
+    list.add(item);
+    RESULT = list;
+  :}
+  ;
+
 alter_tbl_stmt ::=
   KW_ALTER KW_TABLE table_name:table replace_existing_cols_val:replace KW_COLUMNS
   LPAREN column_def_list:col_defs RPAREN
   {: RESULT = new AlterTableAddReplaceColsStmt(table, col_defs, replace); :}
   | KW_ALTER KW_TABLE table_name:table KW_ADD if_not_exists_val:if_not_exists
-    partition_spec:partition location_val:location cache_op_val:cache_op
-  {:
-    RESULT = new AlterTableAddPartitionStmt(table, partition,
-        location, if_not_exists, cache_op);
-  :}
+    partition_def_list:partitions
+  {: RESULT = new AlterTableAddPartitionStmt(table, if_not_exists, partitions); :}
   | KW_ALTER KW_TABLE table_name:table KW_DROP opt_kw_column ident_or_default:col_name
   {: RESULT = new AlterTableDropColStmt(table, col_name); :}
   | KW_ALTER KW_TABLE table_name:table KW_ADD if_not_exists_val:if_not_exists

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c452595b/fe/src/main/java/org/apache/impala/analysis/AlterTableAddPartitionStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableAddPartitionStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableAddPartitionStmt.java
index b946436..151f245 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AlterTableAddPartitionStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableAddPartitionStmt.java
@@ -17,16 +17,20 @@
 
 package org.apache.impala.analysis;
 
-import org.apache.impala.authorization.Privilege;
-import org.apache.impala.catalog.HdfsTable;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Sets;
+
 import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.catalog.Table;
 import org.apache.impala.common.AnalysisException;
-import org.apache.impala.common.FileSystemUtil;
+import org.apache.impala.service.CatalogOpExecutor;
 import org.apache.impala.thrift.TAlterTableAddPartitionParams;
 import org.apache.impala.thrift.TAlterTableParams;
 import org.apache.impala.thrift.TAlterTableType;
-import org.apache.hadoop.fs.permission.FsAction;
+
+import java.util.List;
+import java.util.Set;
 
 import com.google.common.base.Preconditions;
 
@@ -34,48 +38,44 @@ import com.google.common.base.Preconditions;
  * Represents an ALTER TABLE ADD PARTITION statement.
  */
 public class AlterTableAddPartitionStmt extends AlterTableStmt {
-  private final HdfsUri location_;
   private final boolean ifNotExists_;
-  private final PartitionSpec partitionSpec_;
-  private final HdfsCachingOp cacheOp_;
+  private final List<PartitionDef> partitions_;
 
   public AlterTableAddPartitionStmt(TableName tableName,
-      PartitionSpec partitionSpec, HdfsUri location, boolean ifNotExists,
-      HdfsCachingOp cacheOp) {
+      boolean ifNotExists, List<PartitionDef> partitions) {
     super(tableName);
-    Preconditions.checkNotNull(partitionSpec);
-    location_ = location;
+    Preconditions.checkNotNull(partitions);
+    Preconditions.checkState(!partitions.isEmpty());
+    partitions_ = partitions;
+    // If 'ifNotExists' is true, no error is raised if a partition with the same spec
+    // already exists. If multiple partitions are specified, the statement will ignore
+    // those that exist and add the rest.
     ifNotExists_ = ifNotExists;
-    partitionSpec_ = partitionSpec;
-    partitionSpec_.setTableName(tableName);
-    cacheOp_ = cacheOp;
+    for (PartitionDef p: partitions_) {
+      p.setTableName(tableName);
+      if (!ifNotExists_) p.setPartitionShouldNotExist();
+    }
   }
 
   public boolean getIfNotExists() { return ifNotExists_; }
-  public HdfsUri getLocation() { return location_; }
 
   @Override
   public String toSql() {
-    StringBuilder sb = new StringBuilder("ALTER TABLE " + getTbl());
-    sb.append(" ADD ");
-    if (ifNotExists_) {
-      sb.append("IF NOT EXISTS ");
-    }
-    sb.append(" " + partitionSpec_.toSql());
-    if (location_ != null) sb.append(String.format(" LOCATION '%s'", location_));
-    if (cacheOp_ != null) sb.append(cacheOp_.toSql());
+    StringBuilder sb = new StringBuilder("ALTER TABLE ");
+    if (getDb() != null) sb.append(getDb() + ".");
+    sb.append(getTbl()).append(" ADD");
+    if (ifNotExists_) sb.append(" IF NOT EXISTS");
+    for (PartitionDef p: partitions_) sb.append(" " + p.toSql());
     return sb.toString();
   }
 
   @Override
   public TAlterTableParams toThrift() {
-    TAlterTableParams params = super.toThrift();
-    params.setAlter_type(TAlterTableType.ADD_PARTITION);
     TAlterTableAddPartitionParams addPartParams = new TAlterTableAddPartitionParams();
-    addPartParams.setPartition_spec(partitionSpec_.toThrift());
-    addPartParams.setLocation(location_ == null ? null : location_.toString());
     addPartParams.setIf_not_exists(ifNotExists_);
-    if (cacheOp_ != null) addPartParams.setCache_op(cacheOp_.toThrift());
+    for (PartitionDef p: partitions_) addPartParams.addToPartitions(p.toThrift());
+    TAlterTableParams params = super.toThrift();
+    params.setAlter_type(TAlterTableType.ADD_PARTITION);
     params.setAdd_partition_params(addPartParams);
     return params;
   }
@@ -86,34 +86,21 @@ public class AlterTableAddPartitionStmt extends AlterTableStmt {
     Table table = getTargetTable();
     if (table instanceof KuduTable) {
       throw new AnalysisException("ALTER TABLE ADD PARTITION is not supported for " +
-          "Kudu tables: " + partitionSpec_.toSql());
+          "Kudu tables: " + table.getTableName());
     }
-    if (!ifNotExists_) partitionSpec_.setPartitionShouldNotExist();
-    partitionSpec_.setPrivilegeRequirement(Privilege.ALTER);
-    partitionSpec_.analyze(analyzer);
-    if (location_ != null) {
-      location_.analyze(analyzer, Privilege.ALL, FsAction.READ_WRITE);
+    if (partitions_.size() > CatalogOpExecutor.MAX_PARTITION_UPDATES_PER_RPC) {
+      throw new AnalysisException(
+          String.format("One ALTER TABLE ADD PARTITION cannot add more than %d " +
+          "partitions.", CatalogOpExecutor.MAX_PARTITION_UPDATES_PER_RPC));
     }
+    Set<String> partitionSpecs = Sets.newHashSet();
+    for (PartitionDef p: partitions_) {
+      p.analyze(analyzer);
 
-    boolean shouldCache = false;
-    if (cacheOp_ != null) {
-      cacheOp_.analyze(analyzer);
-      shouldCache = cacheOp_.shouldCache();
-    } else if (table instanceof HdfsTable) {
-      shouldCache = ((HdfsTable)table).isMarkedCached();
-    }
-    if (shouldCache) {
-      if (!(table instanceof HdfsTable)) {
-        throw new AnalysisException("Caching must target a HDFS table: " +
-            table.getFullName());
-      }
-      HdfsTable hdfsTable = (HdfsTable)table;
-      if ((location_ != null && !FileSystemUtil.isPathCacheable(location_.getPath())) ||
-          (location_ == null && !hdfsTable.isLocationCacheable())) {
-        throw new AnalysisException(String.format("Location '%s' cannot be cached. " +
-            "Please retry without caching: ALTER TABLE %s ADD PARTITION ... UNCACHED",
-            (location_ != null) ? location_.toString() : hdfsTable.getLocation(),
-            table.getFullName()));
+      // Make sure no duplicate partition specs are specified
+      if (!partitionSpecs.add(p.getPartitionSpec().toCanonicalString())) {
+        throw new AnalysisException(String.format("Duplicate partition spec: (%s)",
+            Joiner.on(", ").join(p.getPartitionSpec().getPartitionSpecKeyValues())));
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c452595b/fe/src/main/java/org/apache/impala/analysis/PartitionDef.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/PartitionDef.java b/fe/src/main/java/org/apache/impala/analysis/PartitionDef.java
new file mode 100644
index 0000000..6a1175d
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/PartitionDef.java
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.analysis;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.impala.authorization.Privilege;
+import org.apache.impala.catalog.HdfsTable;
+import org.apache.impala.catalog.Table;
+import org.apache.impala.catalog.TableLoadingException;
+import org.apache.impala.common.AnalysisException;
+import org.apache.impala.common.FileSystemUtil;
+import org.apache.impala.thrift.TPartitionDef;
+
+/**
+ * Represents a partition definition used in ALTER TABLE ADD PARTITION consisting of
+ * partition key-value pairs and an optional location and optional caching options.
+ */
+public class PartitionDef implements ParseNode {
+  private final PartitionSpec partitionSpec_;
+  private final HdfsUri location_;
+  private final HdfsCachingOp cacheOp_;
+
+  public PartitionDef(PartitionSpec partitionSpec, HdfsUri location,
+      HdfsCachingOp cacheOp) {
+    Preconditions.checkNotNull(partitionSpec);
+    partitionSpec_ = partitionSpec;
+    location_ = location;
+    cacheOp_ = cacheOp;
+  }
+
+  public void setTableName(TableName tableName) {
+    partitionSpec_.setTableName(tableName);
+  }
+  public void setPartitionShouldNotExist() {
+    partitionSpec_.setPartitionShouldNotExist();
+  }
+
+  public HdfsUri getLocation() { return location_; }
+  public PartitionSpec getPartitionSpec() { return partitionSpec_; }
+
+  @Override
+  public String toSql() {
+    StringBuilder sb = new StringBuilder(partitionSpec_.toSql());
+    if (location_ != null) sb.append(String.format(" LOCATION '%s'", location_));
+    if (cacheOp_ != null) sb.append(" " + cacheOp_.toSql());
+    return sb.toString();
+  }
+
+  public TPartitionDef toThrift() {
+    TPartitionDef params = new TPartitionDef();
+    params.setPartition_spec(partitionSpec_.toThrift());
+    if (location_ != null) params.setLocation(location_.toString());
+    if (cacheOp_ != null) params.setCache_op(cacheOp_.toThrift());
+    return params;
+  }
+
+  @Override
+  public void analyze(Analyzer analyzer) throws AnalysisException {
+    partitionSpec_.setPrivilegeRequirement(Privilege.ALTER);
+    partitionSpec_.analyze(analyzer);
+
+    if (location_ != null) {
+      location_.analyze(analyzer, Privilege.ALL, FsAction.READ_WRITE);
+    }
+
+    Table table;
+    try {
+      table = analyzer.getTable(partitionSpec_.getTableName(), Privilege.ALTER,
+          false);
+    } catch (TableLoadingException e) {
+      throw new AnalysisException(e.getMessage(), e);
+    }
+
+    Preconditions.checkState(table instanceof HdfsTable);
+    HdfsTable hdfsTable = (HdfsTable)table;
+
+    boolean shouldCache;
+    if (cacheOp_ != null) {
+      cacheOp_.analyze(analyzer);
+      shouldCache = cacheOp_.shouldCache();
+    } else {
+      shouldCache = hdfsTable.isMarkedCached();
+    }
+    if (shouldCache) {
+      if ((location_ != null && !FileSystemUtil.isPathCacheable(location_.getPath())) ||
+          (location_ == null && !hdfsTable.isLocationCacheable())) {
+        throw new AnalysisException(String.format("Location '%s' cannot be cached. " +
+            "Please retry without caching: ALTER TABLE %s ADD PARTITION ... UNCACHED",
+            (location_ != null) ? location_.toString() : hdfsTable.getLocation(),
+            hdfsTable.getFullName()));
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c452595b/fe/src/main/java/org/apache/impala/analysis/PartitionKeyValue.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/PartitionKeyValue.java b/fe/src/main/java/org/apache/impala/analysis/PartitionKeyValue.java
index 4289108..bc387f0 100644
--- a/fe/src/main/java/org/apache/impala/analysis/PartitionKeyValue.java
+++ b/fe/src/main/java/org/apache/impala/analysis/PartitionKeyValue.java
@@ -17,8 +17,9 @@
 
 package org.apache.impala.analysis;
 
-import org.apache.impala.common.AnalysisException;
 import com.google.common.base.Preconditions;
+import org.apache.impala.common.AnalysisException;
+import java.util.Comparator;
 
 /**
  * Representation of a single column:value element in the PARTITION (...) clause of an
@@ -85,4 +86,13 @@ public class PartitionKeyValue {
     }
     return literalValue.getStringValue();
   }
+
+  public static Comparator<PartitionKeyValue> getColNameComparator() {
+    return new Comparator<PartitionKeyValue>() {
+      @Override
+      public int compare(PartitionKeyValue t, PartitionKeyValue o) {
+        return t.colName_.compareTo(o.colName_);
+      }
+    };
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c452595b/fe/src/main/java/org/apache/impala/analysis/PartitionSpec.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/PartitionSpec.java b/fe/src/main/java/org/apache/impala/analysis/PartitionSpec.java
index c131f84..2ea53c4 100644
--- a/fe/src/main/java/org/apache/impala/analysis/PartitionSpec.java
+++ b/fe/src/main/java/org/apache/impala/analysis/PartitionSpec.java
@@ -17,8 +17,11 @@
 
 package org.apache.impala.analysis;
 
-import java.util.List;
-import java.util.Set;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.impala.catalog.Column;
@@ -26,11 +29,9 @@ import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.thrift.TPartitionKeyValue;
 
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
 
 /**
  * Represents a partition spec - a collection of partition key/values.
@@ -46,9 +47,7 @@ public class PartitionSpec extends PartitionSpecBase {
     this.partitionSpec_ = ImmutableList.copyOf(partitionSpec);
   }
 
-  public List<PartitionKeyValue> getPartitionSpecKeyValues() {
-    return partitionSpec_;
-  }
+  public List<PartitionKeyValue> getPartitionSpecKeyValues() { return partitionSpec_; }
 
   public boolean partitionExists() {
     Preconditions.checkNotNull(partitionExists_);
@@ -149,4 +148,15 @@ public class PartitionSpec extends PartitionSpecBase {
     }
     return String.format("PARTITION (%s)", Joiner.on(", ").join(partitionSpecStr));
   }
+
+  /**
+   * Utility method that returns the concatenated string of key=value pairs ordered by
+   * key. Since analyze() ensures that there are no duplicate keys in partition specs,
+   * this method provides a uniquely comparable string representation for this object.
+   */
+  public String toCanonicalString() {
+    List<PartitionKeyValue> sortedPartitionSpec = Lists.newArrayList(partitionSpec_);
+    Collections.sort(sortedPartitionSpec, PartitionKeyValue.getColNameComparator());
+    return Joiner.on(",").join(sortedPartitionSpec);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c452595b/fe/src/main/java/org/apache/impala/analysis/PartitionSpecBase.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/PartitionSpecBase.java b/fe/src/main/java/org/apache/impala/analysis/PartitionSpecBase.java
index e76936e..5ead2d8 100644
--- a/fe/src/main/java/org/apache/impala/analysis/PartitionSpecBase.java
+++ b/fe/src/main/java/org/apache/impala/analysis/PartitionSpecBase.java
@@ -40,6 +40,7 @@ public abstract class PartitionSpecBase implements ParseNode {
   public String getTbl() { return tableName_.getTbl(); }
 
   public void setTableName(TableName tableName) {this.tableName_ = tableName; }
+  public TableName getTableName() { return tableName_; }
 
   // The value Hive is configured to use for NULL partition key values.
   // Set during analysis.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c452595b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 76571cc..c11f990 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -17,7 +17,14 @@
 
 package org.apache.impala.service;
 
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -27,6 +34,7 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
@@ -129,6 +137,7 @@ import org.apache.impala.thrift.TGrantRevokeRoleParams;
 import org.apache.impala.thrift.THdfsCachingOp;
 import org.apache.impala.thrift.THdfsFileFormat;
 import org.apache.impala.thrift.TPartitionKeyValue;
+import org.apache.impala.thrift.TPartitionDef;
 import org.apache.impala.thrift.TPartitionStats;
 import org.apache.impala.thrift.TPrivilege;
 import org.apache.impala.thrift.TResetMetadataRequest;
@@ -147,12 +156,6 @@ import org.apache.impala.util.HdfsCachingUtil;
 import org.apache.log4j.Logger;
 import org.apache.thrift.TException;
 
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
 /**
  * Class used to execute Catalog Operations, including DDL and refresh/invalidate
  * metadata requests. Acts as a bridge between the Thrift catalog operation requests
@@ -234,7 +237,9 @@ public class CatalogOpExecutor {
 
   // The maximum number of partitions to update in one Hive Metastore RPC.
   // Used when persisting the results of COMPUTE STATS statements.
-  private final static short MAX_PARTITION_UPDATES_PER_RPC = 500;
+  // It is also used as an upper limit for the number of partitions allowed in one ADD
+  // PARTITION statement.
+  public final static short MAX_PARTITION_UPDATES_PER_RPC = 500;
 
   public CatalogOpExecutor(CatalogServiceCatalog catalog) {
     catalog_ = catalog;
@@ -378,6 +383,8 @@ public class CatalogOpExecutor {
           catalog_.getLock().writeLock().unlock();
         }
       }
+
+      Table refreshedTable = null;
       // Get a new catalog version to assign to the table being altered.
       long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
       boolean reloadMetadata = true;
@@ -396,15 +403,10 @@ public class CatalogOpExecutor {
           reloadTableSchema = true;
           break;
         case ADD_PARTITION:
-          TAlterTableAddPartitionParams addPartParams =
-              params.getAdd_partition_params();
-          // Create and add HdfsPartition object to the corresponding HdfsTable and
-          // load its block metadata. Get the new table object with an updated catalog
-          // version. If the partition already exists in Hive and "IfNotExists" is
-          // true, then return without populating the response object.
-          Table refreshedTable = alterTableAddPartition(tbl,
-              addPartParams.getPartition_spec(), addPartParams.isIf_not_exists(),
-              addPartParams.getLocation(), addPartParams.getCache_op());
+          // Create and add HdfsPartition objects to the corresponding HdfsTable and load
+          // their block metadata. Get the new table object with an updated catalog
+          // version.
+          refreshedTable = alterTableAddPartitions(tbl, params.getAdd_partition_params());
           if (refreshedTable != null) {
             refreshedTable.setCatalogVersion(newCatalogVersion);
             addTableToCatalogUpdate(refreshedTable, response.result);
@@ -1905,38 +1907,145 @@ public class CatalogOpExecutor {
   }
 
   /**
-   * Adds a new partition to the given table in Hive. Also creates and adds
-   * a new HdfsPartition to the corresponding HdfsTable.
-   * If cacheOp is not null, the partition's location will be cached according
-   * to the cacheOp. If cacheOp is null, the new partition will inherit the
-   * the caching properties of the parent table.
-   * Returns null if the partition already exists in Hive and "IfNotExists"
-   * is true. Otherwise, returns the table object with an updated catalog version.
+   * Adds new partitions to the given table in HMS. Also creates and adds new
+   * HdfsPartitions to the corresponding HdfsTable. Returns the table object with an
+   * updated catalog version or null if the table is not altered because all the
+   * partitions already exist and IF NOT EXISTS is specified.
+   * If IF NOT EXISTS is not used and there is a conflict with the partitions that already
+   * exist in HMS or catalog cache, then:
+   * - HMS and catalog cache are left intact, and
+   * - ImpalaRuntimeException is thrown.
+   * If IF NOT EXISTS is used, conflicts are handled as follows:
+   * 1. If a partition exists in catalog cache, ignore it.
+   * 2. If a partition exists in HMS but not in catalog cache, reload partition from HMS.
+   * Caching directives are only applied to new partitions that were absent from both the
+   * catalog cache and the HMS.
    */
-  private Table alterTableAddPartition(Table tbl, List<TPartitionKeyValue> partitionSpec,
-      boolean ifNotExists, String location, THdfsCachingOp cacheOp)
-      throws ImpalaException {
+  private Table alterTableAddPartitions(Table tbl,
+      TAlterTableAddPartitionParams addPartParams) throws ImpalaException {
     Preconditions.checkState(tbl.getLock().isHeldByCurrentThread());
-    TableName tableName = tbl.getTableName();
-    if (ifNotExists && catalog_.containsHdfsPartition(tableName.getDb(),
-        tableName.getTbl(), partitionSpec)) {
-      LOG.trace(String.format("Skipping partition creation because (%s) already exists" +
-          " and ifNotExists is true.", Joiner.on(", ").join(partitionSpec)));
-      return null;
-    }
 
-    org.apache.hadoop.hive.metastore.api.Partition partition = null;
-    Table result = null;
-    List<Long> cacheIds = null;
+    TableName tableName = tbl.getTableName();
     org.apache.hadoop.hive.metastore.api.Table msTbl = tbl.getMetaStoreTable().deepCopy();
-    Long parentTblCacheDirId =
-        HdfsCachingUtil.getCacheDirectiveId(msTbl.getParameters());
+    boolean ifNotExists = addPartParams.isIf_not_exists();
+    List<Partition> hmsPartitionsToAdd = Lists.newArrayList();
+    Map<List<String>, THdfsCachingOp> partitionCachingOpMap = Maps.newHashMap();
+    for (TPartitionDef partParams: addPartParams.getPartitions()) {
+      List<TPartitionKeyValue> partitionSpec = partParams.getPartition_spec();
+      if (catalog_.containsHdfsPartition(tableName.getDb(), tableName.getTbl(),
+          partitionSpec)) {
+        String partitionSpecStr = Joiner.on(", ").join(partitionSpec);
+        if (!ifNotExists) {
+          throw new ImpalaRuntimeException(String.format("Partition already " +
+              "exists: (%s)", partitionSpecStr));
+        }
+        LOG.trace(String.format("Skipping partition creation because (%s) already " +
+            "exists and IF NOT EXISTS was specified.", partitionSpecStr));
+        continue;
+      }
+
+      Partition hmsPartition = createHmsPartition(partitionSpec, msTbl, tableName,
+          partParams.getLocation());
+      hmsPartitionsToAdd.add(hmsPartition);
+
+      THdfsCachingOp cacheOp = partParams.getCache_op();
+      if (cacheOp != null) partitionCachingOpMap.put(hmsPartition.getValues(), cacheOp);
+    }
 
-    partition = createHmsPartition(partitionSpec, msTbl, tableName, location);
+    if (hmsPartitionsToAdd.isEmpty()) return null;
 
     try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
-      // Add the new partition.
-      partition = msClient.getHiveClient().add_partition(partition);
+      // Add partitions in bulk
+      List<Partition> addedHmsPartitions = null;
+      try {
+        addedHmsPartitions = msClient.getHiveClient().add_partitions(hmsPartitionsToAdd,
+            ifNotExists, true);
+      } catch (TException e) {
+        throw new ImpalaRuntimeException(
+            String.format(HMS_RPC_ERROR_FORMAT_STR, "add_partitions"), e);
+      }
+
+      // Handle HDFS cache. This is done in a separate round because we have to apply
+      // caching only to newly added partitions.
+      alterTableCachePartitions(msTbl, msClient, tableName, addedHmsPartitions,
+          partitionCachingOpMap);
+
+      // If 'ifNotExists' is true, add_partitions() may fail to add all the partitions to
+      // HMS because some of them may already exist there. In that case, we load in the
+      // catalog the partitions that already exist in HMS but aren't in the catalog yet.
+      if (hmsPartitionsToAdd.size() != addedHmsPartitions.size()) {
+        List<Partition> difference = computeDifference(hmsPartitionsToAdd,
+            addedHmsPartitions);
+        addedHmsPartitions.addAll(
+            getPartitionsFromHms(msTbl, msClient, tableName, difference));
+      }
+
+      for (Partition partition: addedHmsPartitions) {
+        // Create and add the HdfsPartition to catalog. Return the table object with an
+        // updated catalog version.
+        addHdfsPartition(tbl, partition);
+      }
+      return tbl;
+    }
+  }
+
+  /**
+   * Returns the list of Partition objects from 'aList' that cannot be found in 'bList'.
+   * Partition objects are distinguished by partition values only.
+   */
+  private List<Partition> computeDifference(List<Partition> aList,
+      List<Partition> bList) {
+    Set<List<String>> bSet = Sets.newHashSet();
+    for (Partition b: bList) bSet.add(b.getValues());
+
+    List<Partition> diffList = Lists.newArrayList();
+    for (Partition a: aList) {
+      if (!bSet.contains(a.getValues())) diffList.add(a);
+    }
+    return diffList;
+  }
+
+  /**
+   * Returns a list of partitions retrieved from HMS for each 'hmsPartitions' element.
+   */
+  private List<Partition> getPartitionsFromHms(
+      org.apache.hadoop.hive.metastore.api.Table msTbl, MetaStoreClient msClient,
+      TableName tableName, List<Partition> hmsPartitions)
+      throws ImpalaException {
+    List<String> partitionCols = Lists.newArrayList();
+    for (FieldSchema fs: msTbl.getPartitionKeys()) partitionCols.add(fs.getName());
+
+    List<String> partitionNames = Lists.newArrayListWithCapacity(hmsPartitions.size());
+    for (Partition part: hmsPartitions) {
+      String partName = org.apache.hadoop.hive.common.FileUtils.makePartName(
+          partitionCols, part.getValues());
+      partitionNames.add(partName);
+    }
+    try {
+      return msClient.getHiveClient().getPartitionsByNames(tableName.getDb(),
+          tableName.getTbl(), partitionNames);
+    } catch (TException e) {
+      throw new ImpalaRuntimeException("Metadata inconsistency has occured. Please run "
+          + "'invalidate metadata <tablename>' to resolve the problem.", e);
+    }
+  }
+
+  /**
+   * Applies HDFS caching ops on 'hmsPartitions' and updates their metadata in Hive
+   * Metastore.
+   * 'partitionCachingOpMap' maps partitions (identified by their partition values) to
+   * their corresponding HDFS caching ops.
+   */
+  private void alterTableCachePartitions(org.apache.hadoop.hive.metastore.api.Table msTbl,
+      MetaStoreClient msClient, TableName tableName, List<Partition> hmsPartitions,
+      Map<List<String>, THdfsCachingOp> partitionCachingOpMap)
+      throws ImpalaException {
+    // Handle HDFS cache
+    List<Long> cacheIds = Lists.newArrayList();
+    List<Partition> hmsPartitionsToCache = Lists.newArrayList();
+    Long parentTblCacheDirId = HdfsCachingUtil.getCacheDirectiveId(msTbl.getParameters());
+    for (Partition partition: hmsPartitions) {
+      THdfsCachingOp cacheOp = partitionCachingOpMap.get(partition.getValues());
       String cachePoolName = null;
       Short replication = null;
       if (cacheOp == null && parentTblCacheDirId != null) {
@@ -1964,28 +2073,16 @@ public class CatalogOpExecutor {
       if (cachePoolName != null) {
         long id = HdfsCachingUtil.submitCachePartitionDirective(partition,
             cachePoolName, replication);
-        cacheIds = Lists.<Long>newArrayList(id);
-        // Update the partition metadata to include the cache directive id.
-        msClient.getHiveClient().alter_partition(partition.getDbName(),
-            partition.getTableName(), partition);
-      }
-      updateLastDdlTime(msTbl, msClient);
-    } catch (AlreadyExistsException e) {
-      if (!ifNotExists) {
-        throw new ImpalaRuntimeException(
-            String.format(HMS_RPC_ERROR_FORMAT_STR, "add_partition"), e);
+        cacheIds.add(id);
+        hmsPartitionsToCache.add(partition);
       }
-      LOG.trace(String.format("Ignoring '%s' when adding partition to %s because" +
-          " ifNotExists is true.", e, tableName));
-    } catch (TException e) {
-      throw new ImpalaRuntimeException(
-          String.format(HMS_RPC_ERROR_FORMAT_STR, "add_partition"), e);
     }
-    if (cacheIds != null) catalog_.watchCacheDirs(cacheIds, tableName.toThrift());
-    // Return the table object with an updated catalog version after creating the
-    // partition.
-    result = addHdfsPartition(tbl, partition);
-    return result;
+
+    // Update the partition metadata to include the cache directive id.
+    if (!cacheIds.isEmpty()) {
+      applyAlterHmsPartitions(msTbl, msClient, tableName, hmsPartitionsToCache);
+      catalog_.watchCacheDirs(cacheIds, tableName.toThrift());
+    }
   }
 
   /**
@@ -2699,15 +2796,21 @@ public class CatalogOpExecutor {
   private void applyAlterPartition(Table tbl, HdfsPartition partition)
       throws ImpalaException {
     try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
-      TableName tableName = tbl.getTableName();
-      msClient.getHiveClient().alter_partition(
-          tableName.getDb(), tableName.getTbl(), partition.toHmsPartition());
-      org.apache.hadoop.hive.metastore.api.Table msTbl =
-          tbl.getMetaStoreTable().deepCopy();
+      applyAlterHmsPartitions(tbl.getMetaStoreTable().deepCopy(), msClient,
+          tbl.getTableName(), Arrays.asList(partition.toHmsPartition()));
+    }
+  }
+
+  private void applyAlterHmsPartitions(org.apache.hadoop.hive.metastore.api.Table msTbl,
+      MetaStoreClient msClient, TableName tableName, List<Partition> hmsPartitions)
+      throws ImpalaException {
+    try {
+      msClient.getHiveClient().alter_partitions(tableName.getDb(), tableName.getTbl(),
+          hmsPartitions);
       updateLastDdlTime(msTbl, msClient);
     } catch (TException e) {
       throw new ImpalaRuntimeException(
-          String.format(HMS_RPC_ERROR_FORMAT_STR, "alter_partition"), e);
+          String.format(HMS_RPC_ERROR_FORMAT_STR, "alter_partitions"), e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c452595b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
index 5a08f98..29c59e8 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
@@ -44,6 +44,7 @@ import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.FrontendTestBase;
 import org.apache.impala.common.RuntimeEnv;
+import org.apache.impala.service.CatalogOpExecutor;
 import org.apache.impala.testutil.TestUtils;
 import org.apache.impala.thrift.TDescribeTableParams;
 import org.apache.impala.util.MetaStoreUtil;
@@ -260,6 +261,77 @@ public class AnalyzeDDLTest extends FrontendTestBase {
   }
 
   @Test
+  public void TestAlterTableAddMultiplePartitions() {
+    for (String cl: new String[]{"if not exists", ""}) {
+      // Add multiple partitions.
+      AnalyzesOk("alter table functional.alltypes add " + cl +
+          " partition(year=2050, month=10)" +
+          " partition(year=2050, month=11)" +
+          " partition(year=2050, month=12)");
+      // Duplicate partition specifications.
+      AnalysisError("alter table functional.alltypes add " + cl +
+          " partition(year=2050, month=10)" +
+          " partition(year=2050, month=11)" +
+          " partition(Month=10, YEAR=2050)",
+          "Duplicate partition spec: (month=10, year=2050)");
+
+      // Multiple partitions with locations and caching.
+      AnalyzesOk("alter table functional.alltypes add " + cl +
+          " partition(year=2050, month=10) location" +
+          " '/test-warehouse/alltypes/y2050m10' cached in 'testPool'" +
+          " partition(year=2050, month=11) location" +
+          " 'hdfs://localhost:20500/test-warehouse/alltypes/y2050m11'" +
+          " cached in 'testPool' with replication = 7" +
+          " partition(year=2050, month=12) location" +
+          " 'file:///test-warehouse/alltypes/y2050m12' uncached");
+      // One of the partitions points to an invalid URI.
+      AnalysisError("alter table functional.alltypes add " + cl +
+          " partition(year=2050, month=10) location" +
+          " '/test-warehouse/alltypes/y2050m10' cached in 'testPool'" +
+          " partition(year=2050, month=11) location" +
+          " 'hdfs://localhost:20500/test-warehouse/alltypes/y2050m11'" +
+          " cached in 'testPool' with replication = 7" +
+          " partition(year=2050, month=12) location" +
+          " 'fil:///test-warehouse/alltypes/y2050m12' uncached",
+          "No FileSystem for scheme: fil");
+      // One of the partitions is cached in a non-existent pool.
+      AnalysisError("alter table functional.alltypes add " + cl +
+          " partition(year=2050, month=10) location" +
+          " '/test-warehouse/alltypes/y2050m10' cached in 'testPool'" +
+          " partition(year=2050, month=11) location" +
+          " 'hdfs://localhost:20500/test-warehouse/alltypes/y2050m11'" +
+          " cached in 'nonExistentTestPool' with replication = 7" +
+          " partition(year=2050, month=12) location" +
+          " 'file:///test-warehouse/alltypes/y2050m12' uncached",
+          "The specified cache pool does not exist: nonExistentTestPool");
+    }
+
+    // Test the limit for the number of partitions
+    StringBuilder stmt = new StringBuilder("alter table functional.alltypes add");
+    int year;
+    int month;
+    for (int i = 0; i < CatalogOpExecutor.MAX_PARTITION_UPDATES_PER_RPC; ++i) {
+      year = i/12 + 2050;
+      month = i%12 + 1;
+      stmt.append(String.format(" partition(year=%d, month=%d)", year, month));
+    }
+    AnalyzesOk(stmt.toString());
+    // Over the limit by one partition
+    year = CatalogOpExecutor.MAX_PARTITION_UPDATES_PER_RPC/12 + 2050;
+    month = CatalogOpExecutor.MAX_PARTITION_UPDATES_PER_RPC%12 + 1;
+    stmt.append(String.format(" partition(year=%d, month=%d)", year, month));
+    AnalysisError(stmt.toString(),
+        String.format("One ALTER TABLE ADD PARTITION cannot add more than %d partitions.",
+        CatalogOpExecutor.MAX_PARTITION_UPDATES_PER_RPC));
+
+    // If 'IF NOT EXISTS' is not used, ALTER TABLE ADD PARTITION cannot add a preexisting
+    // partition to a table.
+    AnalysisError("alter table functional.alltypes add partition(year=2050, month=1)" +
+        "partition(year=2010, month=1) partition(year=2050, month=2)",
+        "Partition spec already exists: (year=2010, month=1)");
+  }
+
+  @Test
   public void TestAlterTableAddReplaceColumns() throws AnalysisException {
     AnalyzesOk("alter table functional.alltypes add columns (new_col int)");
     AnalyzesOk("alter table functional.alltypes add columns (c1 string comment 'hi')");

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c452595b/fe/src/test/java/org/apache/impala/analysis/AuthorizationTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/AuthorizationTest.java b/fe/src/test/java/org/apache/impala/analysis/AuthorizationTest.java
index 9821694..25ec45c 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AuthorizationTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AuthorizationTest.java
@@ -1219,7 +1219,6 @@ public class AuthorizationTest {
     AuthzOk("ALTER TABLE functional_seq_snap.alltypes SET CACHED IN 'testPool'");
     AuthzOk("ALTER TABLE functional_seq_snap.alltypes RECOVER PARTITIONS");
 
-
     // Alter table and set location to a path the user does not have access to.
     AuthzError("ALTER TABLE functional_seq_snap.alltypes SET LOCATION " +
         "'hdfs://localhost:20500/test-warehouse/no_access'",
@@ -1234,6 +1233,18 @@ public class AuthorizationTest {
         "User '%s' does not have privileges to access: " +
         "hdfs://localhost:20500/test-warehouse/no_access");
 
+    // Add multiple partitions. User has access to location path.
+    AuthzOk("ALTER TABLE functional_seq_snap.alltypes ADD " +
+        "PARTITION(year=2011, month=1) " +
+        "PARTITION(year=2011, month=2) " +
+        "LOCATION 'hdfs://localhost:20500/test-warehouse/new_table'");
+    // For one new partition location is set to a path the user does not have access to.
+    AuthzError("ALTER TABLE functional_seq_snap.alltypes ADD " +
+        "PARTITION(year=2011, month=3) " +
+        "PARTITION(year=2011, month=4) LOCATION '/test-warehouse/no_access'",
+        "User '%s' does not have privileges to access: " +
+        "hdfs://localhost:20500/test-warehouse/no_access");
+
     // Different filesystem, user has permission to base path.
     AuthzError("ALTER TABLE functional_seq_snap.alltypes SET LOCATION " +
         "'hdfs://localhost:20510/test-warehouse/new_table'",

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c452595b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
index 2b37bf3..14cd036 100644
--- a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
@@ -2046,12 +2046,25 @@ public class ParserTest extends FrontendTestBase {
     ParsesOk("ALTER TABLE Foo ADD PARTITION (i=NULL, j=2, k=NULL)");
     ParsesOk("ALTER TABLE Foo ADD PARTITION (i=abc, j=(5*8+10), k=!true and false)");
 
+    // Multiple partition specs
+    ParsesOk("ALTER TABLE Foo ADD PARTITION (i=1, s='one') " +
+        "PARTITION (i=2, s='two') PARTITION (i=3, s='three')");
+    ParsesOk("ALTER TABLE TestDb.Foo ADD PARTITION (i=1, s='one') LOCATION 'a/b' " +
+        "PARTITION (i=2, s='two') LOCATION 'c/d' " +
+        "PARTITION (i=3, s='three') " +
+        "PARTITION (i=4, s='four') LOCATION 'e/f'");
+    ParsesOk("ALTER TABLE TestDb.Foo ADD IF NOT EXISTS " +
+        "PARTITION (i=1, s='one') " +
+        "PARTITION (i=2, s='two') LOCATION 'c/d'");
+    ParserError("ALTER TABLE TestDb.Foo ADD " +
+        "PARTITION (i=1, s='one') " +
+        "IF NOT EXISTS PARTITION (i=2, s='two') LOCATION 'c/d'");
+
     // Location needs to be a string literal
     ParserError("ALTER TABLE TestDb.Foo ADD PARTITION (i=1, s='Hello') LOCATION a/b");
 
     // Caching ops
     ParsesOk("ALTER TABLE Foo ADD PARTITION (j=2) CACHED IN 'pool'");
-    ParsesOk("ALTER TABLE Foo ADD PARTITION (j=2) CACHED IN 'pool'");
     ParserError("ALTER TABLE Foo ADD PARTITION (j=2) CACHED 'pool'");
     ParserError("ALTER TABLE Foo ADD PARTITION (j=2) CACHED IN");
     ParserError("ALTER TABLE Foo ADD PARTITION (j=2) CACHED");
@@ -2066,6 +2079,13 @@ public class ParserTest extends FrontendTestBase {
     ParserError("ALTER TABLE Foo ADD PARTITION (j=2) CACHED IN 'pool' LOCATION 'a/b'");
     ParserError("ALTER TABLE Foo ADD PARTITION (j=2) UNCACHED LOCATION 'a/b'");
 
+    // Multiple partition specs with caching ops
+    ParsesOk("ALTER TABLE Foo ADD PARTITION (j=2) CACHED IN 'pool' " +
+        "PARTITION (j=3) UNCACHED " +
+        "PARTITION (j=4) CACHED IN 'pool' WITH replication = 3 " +
+        "PARTITION (j=5) LOCATION 'a/b' CACHED IN 'pool' " +
+        "PARTITION (j=5) LOCATION 'c/d' CACHED IN 'pool' with replication = 3");
+
     ParserError("ALTER TABLE Foo ADD IF EXISTS PARTITION (i=1, s='Hello')");
     ParserError("ALTER TABLE TestDb.Foo ADD (i=1, s='Hello')");
     ParserError("ALTER TABLE TestDb.Foo ADD (i=1)");

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c452595b/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java b/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
index 2b52a68..98b55f9 100644
--- a/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
@@ -1048,6 +1048,48 @@ public class ToSqlTest extends FrontendTestBase {
   }
 
   @Test
+  public void alterTableAddPartitionTest() {
+    // Add partition
+    testToSql(
+        "alter table functional.alltypes add partition (year=2050, month=1)",
+        "ALTER TABLE functional.alltypes ADD PARTITION (year=2050, month=1)");
+    // Add multiple partitions
+    testToSql(
+        "alter table functional.alltypes add partition (year=2050, month=1) " +
+        "partition (year=2050, month=2)",
+        "ALTER TABLE functional.alltypes ADD PARTITION (year=2050, month=1) " +
+        "PARTITION (year=2050, month=2)");
+    // with IF NOT EXISTS
+    testToSql(
+        "alter table functional.alltypes add if not exists " +
+        "partition (year=2050, month=1) " +
+        "partition (year=2050, month=2)",
+        "ALTER TABLE functional.alltypes ADD IF NOT EXISTS " +
+        "PARTITION (year=2050, month=1) " +
+        "PARTITION (year=2050, month=2)");
+    // with location
+    testToSql(
+        "alter table functional.alltypes add if not exists " +
+        "partition (year=2050, month=1) location 'hdfs://localhost:20500/y2050m1' " +
+        "partition (year=2050, month=2) location '/y2050m2'",
+        "ALTER TABLE functional.alltypes ADD IF NOT EXISTS "+
+        "PARTITION (year=2050, month=1) LOCATION 'hdfs://localhost:20500/y2050m1' " +
+        "PARTITION (year=2050, month=2) LOCATION 'hdfs://localhost:20500/y2050m2'");
+    // and caching
+    testToSql(
+        "alter table functional.alltypes add if not exists " +
+        "partition (year=2050, month=1) location 'hdfs://localhost:20500/y2050m1' " +
+        "cached in 'testPool' with replication=3 " +
+        "partition (year=2050, month=2) location '/y2050m2' " +
+        "uncached",
+        "ALTER TABLE functional.alltypes ADD IF NOT EXISTS "+
+        "PARTITION (year=2050, month=1) LOCATION 'hdfs://localhost:20500/y2050m1' " +
+        "CACHED IN 'testPool' WITH REPLICATION = 3 " +
+        "PARTITION (year=2050, month=2) LOCATION 'hdfs://localhost:20500/y2050m2' " +
+        "UNCACHED");
+  }
+
+  @Test
   public void testAnalyticExprs() {
     testToSql(
         "select sum(int_col) over (partition by id order by tinyint_col "

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c452595b/testdata/workloads/functional-query/queries/QueryTest/alter-table.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/alter-table.test b/testdata/workloads/functional-query/queries/QueryTest/alter-table.test
index 3230b9e..e067bc1 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/alter-table.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/alter-table.test
@@ -1003,3 +1003,107 @@ select * from i4155_alter;
 ---- TYPES
 INT, STRING
 ====
+---- QUERY
+# IMPALA-1670: Support adding multiple partitions in ALTER TABLE ADD PARTITION
+create table i1670A_alter (s string) partitioned by (i integer);
+alter table i1670A_alter add
+partition (i=1) location '/i1' cached in 'testPool' with replication=3
+partition (i=2) location '/i2'
+partition (i=3) uncached;
+show partitions i1670A_alter;
+---- RESULTS
+'1',-1,0,'0B','0B','3','TEXT','false',regex:.*/i1
+'2',-1,0,'0B','NOT CACHED','NOT CACHED','TEXT','false',regex:.*/i2
+'3',-1,0,'0B','NOT CACHED','NOT CACHED','TEXT','false',regex:.*/i=3
+'Total',-1,0,'0B','0B','','','',''
+---- TYPES
+STRING, BIGINT, BIGINT, STRING, STRING, STRING, STRING, STRING, STRING
+====
+---- QUERY
+# IMPALA-1670: Set up i1670C_alter table for the next test case.
+create table i1670C_alter (s string) partitioned by (i integer);
+alter table i1670C_alter add
+partition (i=2) location '/i2A' cached in 'testPool' with replication=2
+partition (i=4) location '/i4A' uncached;
+show partitions i1670C_alter;
+---- RESULTS
+'2',-1,0,'0B','0B','2','TEXT','false',regex:.*/i2A
+'4',-1,0,'0B','NOT CACHED','NOT CACHED','TEXT','false',regex:.*/i4A
+'Total',-1,0,'0B','0B','','','',''
+---- TYPES
+STRING, BIGINT, BIGINT, STRING, STRING, STRING, STRING, STRING, STRING
+====
+---- QUERY
+# IMPALA-1670: If 'IF NOT EXISTS' is used ALTER TABLE ADD PARTITION works with preexisting
+# partitions. Location and caching options of existing partitions are not modified.
+alter table i1670C_alter add if not exists
+partition (i=1) location '/i1B'
+partition (i=2) location '/i2B' uncached
+partition (i=3) location '/i3B' cached in 'testPool' with replication=3
+partition (i=4) location '/i4B' cached in 'testPool' with replication=4;
+show partitions i1670C_alter;
+---- RESULTS
+'1',-1,0,'0B','NOT CACHED','NOT CACHED','TEXT','false',regex:.*/i1B
+'2',-1,0,'0B','0B','2','TEXT','false',regex:.*/i2A
+'3',-1,0,'0B','0B','3','TEXT','false',regex:.*/i3B
+'4',-1,0,'0B','NOT CACHED','NOT CACHED','TEXT','false',regex:.*/i4A
+'Total',-1,0,'0B','0B','','','',''
+---- TYPES
+STRING, BIGINT, BIGINT, STRING, STRING, STRING, STRING, STRING, STRING
+====
+---- QUERY
+# IMPALA-1670: Partitions without explicit CACHED IN/UNCACHED clause inherit cacheop from
+# the parent table
+create table i1670D_alter (s string) partitioned by (i integer)
+cached in 'testPool' with replication=7;
+alter table i1670D_alter add
+partition (i=1) cached in 'testPool' with replication=5
+partition (i=2)
+partition (i=3) uncached
+partition (i=4);
+show partitions i1670D_alter;
+---- RESULTS
+'1',-1,0,'0B','0B','5','TEXT','false',regex:.*/i=1
+'2',-1,0,'0B','0B','7','TEXT','false',regex:.*/i=2
+'3',-1,0,'0B','NOT CACHED','NOT CACHED','TEXT','false',regex:.*/i=3
+'4',-1,0,'0B','0B','7','TEXT','false',regex:.*/i=4
+'Total',-1,0,'0B','0B','','','',''
+---- TYPES
+STRING, BIGINT, BIGINT, STRING, STRING, STRING, STRING, STRING, STRING
+====
+---- QUERY
+# IMPALA-1670: After INVALIDATE METADATA Impala can access previously added partitions and
+# partition data.
+create table i1670E_alter (a int) partitioned by (x int);
+alter table i1670E_alter add partition (x=1)
+partition (x=2) uncached
+partition (x=3) location '/x3' cached in 'testPool' with replication=7;
+insert into i1670E_alter partition(x=1) values (1), (2), (3);
+insert into i1670E_alter partition(x=2) values (1), (2), (3), (4);
+insert into i1670E_alter partition(x=3) values (1);
+invalidate metadata i1670E_alter;
+====
+---- QUERY
+show partitions i1670E_alter;
+---- RESULTS
+'1',-1,1,regex:.*,'NOT CACHED','NOT CACHED','TEXT','false',regex:.*/x=1
+'2',-1,1,regex:.*,'NOT CACHED','NOT CACHED','TEXT','false',regex:.*/x=2
+'3',-1,1,regex:.*,regex:.*,'7','TEXT','false',regex:.*/x3
+'Total',-1,3,regex:.*,regex:.*,'','','',''
+---- TYPES
+STRING, BIGINT, BIGINT, STRING, STRING, STRING, STRING, STRING, STRING
+====
+---- QUERY
+select x, a from i1670E_alter order by x, a;
+---- RESULTS
+1,1
+1,2
+1,3
+2,1
+2,2
+2,3
+2,4
+3,1
+---- TYPES
+INT, INT
+====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c452595b/testdata/workloads/functional-query/queries/QueryTest/grant_revoke.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/grant_revoke.test b/testdata/workloads/functional-query/queries/QueryTest/grant_revoke.test
index fe340c2..8beb04f 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/grant_revoke.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/grant_revoke.test
@@ -234,10 +234,37 @@ show tables in grant_rev_db
 STRING
 ====
 ---- QUERY
+# IMPALA-1670: User does not have privileges to access URI when adding partitions
+create table grant_rev_db.test_tbl_partitioned(i int) partitioned by (j int);
+alter table grant_rev_db.test_tbl_partitioned add
+partition (j=1)
+partition (j=2) location '$FILESYSTEM_PREFIX/test-warehouse/grant_rev_test_prt';
+---- CATCH
+does not have privileges to access: $NAMENODE/test-warehouse/grant_rev_test_prt
+====
+---- QUERY
+grant all on uri '$FILESYSTEM_PREFIX/test-warehouse/grant_rev_test_prt'
+to grant_revoke_test_ALL_URI;
+====
+---- QUERY
+# Should now have privileges to add partitions
+alter table grant_rev_db.test_tbl_partitioned add
+partition (j=1)
+partition (j=2) location '$FILESYSTEM_PREFIX/test-warehouse/grant_rev_test_prt';
+show partitions grant_rev_db.test_tbl_partitioned;
+---- RESULTS
+'1',-1,0,'0B','NOT CACHED','NOT CACHED','TEXT','false',regex:.*/j=1
+'2',-1,0,'0B','NOT CACHED','NOT CACHED','TEXT','false','$NAMENODE/test-warehouse/grant_rev_test_prt'
+'Total',-1,0,'0B','0B','','','',''
+---- TYPES
+STRING, BIGINT, BIGINT, STRING, STRING, STRING, STRING, STRING, STRING
+====
+---- QUERY
 show grant role grant_revoke_test_ALL_URI
 ---- RESULTS
 'URI','','','','$NAMENODE/test-warehouse/grant_rev_test_tbl2','ALL',FALSE,regex:.+
 'URI','','','','$NAMENODE/test-warehouse/GRANT_REV_TEST_TBL3','ALL',FALSE,regex:.+
+'URI','','','','$NAMENODE/test-warehouse/grant_rev_test_prt','ALL',FALSE,regex:.+
 ---- LABELS
 scope, database, table, column, uri, privilege, grant_option, create_time
 ---- TYPES

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c452595b/tests/common/impala_test_suite.py
----------------------------------------------------------------------
diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py
index e3b98a4..e90c5ac 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -197,6 +197,41 @@ class ImpalaTestSuite(BaseTestSuite):
       except Exception as e:
         LOG.info('Unexpected exception when executing ' + query_str + ' : ' + str(e))
 
+  def get_impala_partition_info(self, table_name, *include_fields):
+    """
+    Find information about partitions of a table, as returned by a SHOW PARTITIONS
+    statement. Return a list that contains one tuple for each partition.
+
+    If 'include_fields' is not specified, the tuples will contain all the fields returned
+    by SHOW PARTITIONS. Otherwise, return only those fields whose names are listed in
+    'include_fields'. Field names are compared case-insensitively.
+    """
+    exec_result = self.client.execute('show partitions %s' % table_name)
+    fieldSchemas = exec_result.schema.fieldSchemas
+    fields_dict = {}
+    for idx, fs in enumerate(fieldSchemas):
+      fields_dict[fs.name.lower()] = idx
+
+    rows = exec_result.get_data().split('\n')
+    rows.pop()
+    fields_idx = []
+    for fn in include_fields:
+      fn = fn.lower()
+      assert fn in fields_dict, 'Invalid field: %s' % fn
+      fields_idx.append(fields_dict[fn])
+
+    result = []
+    for row in rows:
+      fields = row.split('\t')
+      if not fields_idx:
+        result_fields = fields
+      else:
+        result_fields = []
+        for i in fields_idx:
+          result_fields.append(fields[i])
+      result.append(tuple(result_fields))
+    return result
+
   def __verify_exceptions(self, expected_strs, actual_str, use_db):
     """
     Verifies that at least one of the strings in 'expected_str' is a substring of the
@@ -574,6 +609,15 @@ class ImpalaTestSuite(BaseTestSuite):
       raise RuntimeError(stderr)
     return stdout
 
+  def hive_partition_names(self, table_name):
+    """Find the names of the partitions of a table, as Hive sees them.
+
+    The return format is a list of strings. Each string represents a partition
+    value of a given column in a format like 'column1=7/column2=8'.
+    """
+    return self.run_stmt_in_hive(
+        'show partitions %s' % table_name).split('\n')[1:-1]
+
   @classmethod
   def create_table_info_dimension(cls, exploration_strategy):
     # If the user has specified a specific set of table formats to run against, then

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c452595b/tests/metadata/test_hms_integration.py
----------------------------------------------------------------------
diff --git a/tests/metadata/test_hms_integration.py b/tests/metadata/test_hms_integration.py
index 8dca2f6..05bacfc 100644
--- a/tests/metadata/test_hms_integration.py
+++ b/tests/metadata/test_hms_integration.py
@@ -233,31 +233,6 @@ class TestHmsIntegration(ImpalaTestSuite):
       result[stat_names[i]] = stat_values[i]
     return result
 
-  def impala_partition_names(self, table_name):
-    """Find the names of the partitions of a table, as Impala sees them.
-
-    The return format is a list of lists of strings. Each string represents
-    a partition value of a given column.
-    """
-    rows = self.client.execute('show partitions %s' %
-                               table_name).get_data().split('\n')
-    rows.pop()
-    result = []
-    for row in rows:
-      fields = row.split('\t')
-      name = fields[0:-8]
-      result.append(name)
-    return result
-
-  def hive_partition_names(self, table_name):
-    """Find the names of the partitions of a table, as Hive sees them.
-
-    The return format is a list of strings. Each string represents a partition
-    value of a given column in a format like 'column1=7/column2=8'.
-    """
-    return self.run_stmt_in_hive(
-        'show partitions %s' % table_name).split('\n')[1:-1]
-
   def impala_columns(self, table_name):
     """
     Returns a dict with column names as the keys and dicts of type and comments
@@ -290,14 +265,22 @@ class TestHmsIntegration(ImpalaTestSuite):
                     for i in range(0, 16)])
 
   def assert_sql_error(self, engine, command, *strs_in_error):
-    reached_unreachable = False
+    """
+    Passes 'command' to 'engine' callable (e.g. execute method of a BeeswaxConnection
+    object) and makes sure that it raises an exception.
+    It also verifies that the string representation of the exception contains all the
+    strings listed in 'strs_in_error'.
+
+    If the call doesn't raise an exception or the exception doesn't contain one of the
+    strings in 'strs_in_error', it raises an AssertionError.
+    """
+
     try:
       engine(command)
-      reached_unreachable = True
     except Exception as e:
       for str_in_error in strs_in_error:
         assert str_in_error in str(e)
-    if reached_unreachable:
+    else:
       assert False, '%s should have triggered an error containing %s' % (
           command, strs_in_error)
 
@@ -352,7 +335,7 @@ class TestHmsIntegration(ImpalaTestSuite):
             table_name)
         self.client.execute('compute incremental stats %s' % table_name)
         # Impala can see the partition's name
-        assert [['333', '5309']] == self.impala_partition_names(table_name)
+        assert [('333', '5309')] == self.get_impala_partition_info(table_name, 'y', 'z')
         # Impala's compute stats didn't alter Hive's knowledge of the partition
         assert ['y=333/z=5309'] == self.hive_partition_names(table_name)
     self.add_hive_partition_table_stats_helper(vector, DbWrapper, TableWrapper)
@@ -393,7 +376,7 @@ class TestHmsIntegration(ImpalaTestSuite):
         self.client.execute(
             'insert into table %s partition (y=42, z=867) values (2)'
             % table_name)
-        assert [['42', '867']] == self.impala_partition_names(table_name)
+        assert [('42', '867')] == self.get_impala_partition_info(table_name, 'y', 'z')
         assert ['y=42/z=867'] == self.hive_partition_names(table_name)
 
   @pytest.mark.execute_serially
@@ -447,7 +430,7 @@ class TestHmsIntegration(ImpalaTestSuite):
 
       with self.ImpalaTableWrapper(self, db_name + '.' + self.unique_string(),
                                    '(x int) partitioned by (y int)') as table_name:
-        assert [] == self.impala_partition_names(table_name)
+        assert [] == self.get_impala_partition_info(table_name, 'y')
         self.run_stmt_in_hive(
             'insert into table %s partition (y=33) values (44)'
             % table_name)
@@ -691,3 +674,127 @@ class TestHmsIntegration(ImpalaTestSuite):
         self.assert_sql_error(self.client.execute,
                               'describe %s' % table_name,
                               'Could not resolve path')
+
+  @pytest.mark.execute_serially
+  def test_add_overlapping_partitions(self, vector):
+    """
+    IMPALA-1670, IMPALA-4141: Test interoperability with Hive when adding overlapping
+    partitions to a table
+    """
+    with self.ImpalaDbWrapper(self, self.unique_string()) as db_name:
+      # Create a table in Impala.
+      with self.ImpalaTableWrapper(self, db_name + '.' + self.unique_string(),
+                                   '(a int) partitioned by (x int)') as table_name:
+        # Trigger metadata load. No partitions exist yet in Impala.
+        assert [] == self.get_impala_partition_info(table_name, 'x')
+
+        # Add partition in Hive.
+        self.run_stmt_in_hive("alter table %s add partition (x=2)" % table_name)
+        # Impala is not aware of the new partition.
+        assert [] == self.get_impala_partition_info(table_name, 'x')
+
+        # Try to add partitions with caching in Impala, one of them (x=2) exists in HMS.
+        self.assert_sql_error(self.client.execute,
+            "alter table %s add partition (x=1) uncached "
+            "partition (x=2) cached in 'testPool' with replication=2 "
+            "partition (x=3) cached in 'testPool' with replication=3" % table_name,
+            "Partition already exists")
+        # No partitions were added in Impala.
+        assert [] == self.get_impala_partition_info(table_name, 'x')
+
+        # It should succeed with IF NOT EXISTS.
+        self.client.execute("alter table %s add if not exists partition (x=1) uncached "
+            "partition (x=2) cached in 'testPool' with replication=2 "
+            "partition (x=3) cached in 'testPool' with replication=3" % table_name)
+
+        # Hive sees all the partitions.
+        assert ['x=1', 'x=2', 'x=3'] == self.hive_partition_names(table_name)
+
+        # Impala sees the partition that has already existed in HMS (x=2) and the newly
+        # added partitions (x=1) and (x=3).
+        # Caching has been applied only to newly added partitions (x=1) and (x=3), the
+        # preexisting partition (x=2) was not modified.
+        partitions = self.get_impala_partition_info(table_name, 'x', 'Bytes Cached',
+            'Cache Replication')
+        assert [('1', 'NOT CACHED', 'NOT CACHED'),
+            ('2', 'NOT CACHED', 'NOT CACHED'),
+            ('3', '0B', '3')] == partitions
+
+        # Try to add location to a partition that is already in catalog cache (x=1).
+        self.client.execute("alter table %s add if not exists "\
+            "partition (x=1) location '/_X_1'" % table_name)
+        # (x=1) partition's location hasn't changed
+        (x1_value, x1_location) = self.get_impala_partition_info(table_name, 'x',
+            'Location')[0]
+        assert '1' == x1_value
+        assert x1_location.endswith("/x=1");
+
+  @pytest.mark.execute_serially
+  def test_add_preexisting_partitions_with_data(self, vector):
+    """
+    IMPALA-1670, IMPALA-4141: After adding partitions that already exist in HMS, Impala
+    can access the partition data.
+    """
+    with self.ImpalaDbWrapper(self, self.unique_string()) as db_name:
+      # Create a table in Impala.
+      with self.ImpalaTableWrapper(self, db_name + '.' + self.unique_string(),
+                                   '(a int) partitioned by (x int)') as table_name:
+        # Trigger metadata load. No partitions exist yet in Impala.
+        assert [] == self.get_impala_partition_info(table_name, 'x')
+
+        # Add partitions in Hive.
+        self.run_stmt_in_hive("alter table %s add partition (x=1) "
+            "partition (x=2) "
+            "partition (x=3)" % table_name)
+        # Insert rows in Hive
+        self.run_stmt_in_hive("insert into %s partition(x=1) values (1), (2), (3)"
+            % table_name)
+        self.run_stmt_in_hive("insert into %s partition(x=2) values (1), (2), (3), (4)"
+            % table_name)
+        self.run_stmt_in_hive("insert into %s partition(x=3) values (1)"
+            % table_name)
+        # No partitions exist yet in Impala.
+        assert [] == self.get_impala_partition_info(table_name, 'x')
+
+        # Add the same partitions in Impala with IF NOT EXISTS.
+        self.client.execute("alter table %s add if not exists partition (x=1) "\
+            "partition (x=2) "
+            "partition (x=3)" % table_name)
+        # Impala sees the partitions
+        assert [('1',), ('2',), ('3',)] == self.get_impala_partition_info(table_name, 'x')
+        # Data exists in Impala
+        assert ['1\t1', '1\t2', '1\t3',
+            '2\t1', '2\t2', '2\t3', '2\t4',
+            '3\t1'] ==\
+            self.client.execute('select x, a from %s order by x, a' %
+            table_name).get_data().split('\n')
+
+  @pytest.mark.execute_serially
+  def test_impala_partitions_accessible_in_hive(self, vector):
+    """
+    IMPALA-1670, IMPALA-4141: Partitions added in Impala are accessible through Hive
+    """
+    with self.ImpalaDbWrapper(self, self.unique_string()) as db_name:
+      # Create a table in Impala.
+      with self.ImpalaTableWrapper(self, db_name + '.' + self.unique_string(),
+                                   '(a int) partitioned by (x int)') as table_name:
+        # Add partitions in Impala.
+        self.client.execute("alter table %s add partition (x=1) "
+            "partition (x=2) "
+            "partition (x=3)" % table_name)
+        # Insert rows in Impala
+        self.client.execute("insert into %s partition(x=1) values (1), (2), (3)"
+            % table_name)
+        self.client.execute("insert into %s partition(x=2) values (1), (2), (3), (4)"
+            % table_name)
+        self.client.execute("insert into %s partition(x=3) values (1)"
+            % table_name)
+
+        # Hive sees the partitions
+        assert ['x=1', 'x=2', 'x=3'] == self.hive_partition_names(table_name)
+        # Data exists in Hive
+        data = self.run_stmt_in_hive('select x, a from %s order by x, a' % table_name)
+        assert ['x,a',
+            '1,1', '1,2', '1,3',
+            '2,1', '2,2', '2,3', '2,4',
+            '3,1'] == data.strip().split('\n')

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c452595b/tests/metadata/test_refresh_partition.py
----------------------------------------------------------------------
diff --git a/tests/metadata/test_refresh_partition.py b/tests/metadata/test_refresh_partition.py
index 2489c81..4602ebc 100644
--- a/tests/metadata/test_refresh_partition.py
+++ b/tests/metadata/test_refresh_partition.py
@@ -43,29 +43,6 @@ class TestRefreshPartition(ImpalaTestSuite):
     cls.ImpalaTestMatrix.add_dimension(
         create_uncompressed_text_dimension(cls.get_workload()))
 
-  def impala_partition_names(self, table_name):
-    """
-    Find the names of the partitions of a table, as Impala sees them.
-    The return format is a list of lists of strings. Each string represents
-    a partition value of a given column.
-    """
-    rows = self.client.execute('show partitions %s' %
-                               table_name).get_data().split('\n')
-    """
-    According to the output of 'show partitions' query, the first (n-8)
-    columns are the columns on which the table is partitioned
-    """
-    return [row.split('\t')[0:-8] for row in rows[:-1]]
-
-  def hive_partition_names(self, table_name):
-    """
-    Find the names of the partitions of a table, as Hive sees them.
-    The return format is a list of strings. Each string represents a partition
-    value of a given column in a format like 'column1=7/column2=8'.
-    """
-    return self.run_stmt_in_hive(
-        'show partitions %s' % table_name).split('\n')[1:-1]
-
   def test_add_hive_partition_and_refresh(self, vector, unique_database):
     """
     Partition added in Hive can be viewed in Impala after refreshing
@@ -75,14 +52,14 @@ class TestRefreshPartition(ImpalaTestSuite):
     self.client.execute(
         'create table %s (x int) partitioned by (y int, z int)' %
         table_name)
-    assert [] == self.impala_partition_names(table_name)
+    assert [] == self.get_impala_partition_info(table_name, 'y', 'z')
     self.run_stmt_in_hive(
         'alter table %s add partition (y=333, z=5309)' % table_name)
     # Make sure Impala can't see the partition yet
-    assert [] == self.impala_partition_names(table_name)
+    assert [] == self.get_impala_partition_info(table_name, 'y', 'z')
     self.client.execute('refresh %s partition (y=333, z=5309)' % table_name)
     # Impala can see the partition
-    assert [['333', '5309']] == self.impala_partition_names(table_name)
+    assert [('333', '5309')] == self.get_impala_partition_info(table_name, 'y', 'z')
     # Impala's refresh didn't alter Hive's knowledge of the partition
     assert ['y=333/z=5309'] == self.hive_partition_names(table_name)
 
@@ -97,14 +74,14 @@ class TestRefreshPartition(ImpalaTestSuite):
         table_name)
     self.client.execute(
         'alter table %s add partition (y=333, z=5309)' % table_name)
-    assert [['333', '5309']] == self.impala_partition_names(table_name)
+    assert [('333', '5309')] == self.get_impala_partition_info(table_name, 'y', 'z')
     self.run_stmt_in_hive(
         'alter table %s drop partition (y=333, z=5309)' % table_name)
     # Make sure Impala can still see the partition
-    assert [['333', '5309']] == self.impala_partition_names(table_name)
+    assert [('333', '5309')] == self.get_impala_partition_info(table_name, 'y', 'z')
     self.client.execute('refresh %s partition (y=333, z=5309)' % table_name)
     # Impala can see the partition is not there anymore
-    assert [] == self.impala_partition_names(table_name)
+    assert [] == self.get_impala_partition_info(table_name, 'y', 'z')
     # Impala's refresh didn't alter Hive's knowledge of the partition
     assert [] == self.hive_partition_names(table_name)
 
@@ -142,10 +119,10 @@ class TestRefreshPartition(ImpalaTestSuite):
         table_name)
     self.client.execute(
         'alter table %s add partition (y=333, z=5309)' % table_name)
-    assert [['333', '5309']] == self.impala_partition_names(table_name)
+    assert [('333', '5309')] == self.get_impala_partition_info(table_name, 'y', 'z')
     assert ['y=333/z=5309'] == self.hive_partition_names(table_name)
     self.client.execute('refresh %s partition (y=71, z=8857)' % table_name)
-    assert [['333', '5309']] == self.impala_partition_names(table_name)
+    assert [('333', '5309')] == self.get_impala_partition_info(table_name, 'y', 'z')
     assert ['y=333/z=5309'] == self.hive_partition_names(table_name)
 
   def test_remove_data_and_refresh(self, vector, unique_database):


[4/7] incubator-impala git commit: IMPALA-3785: Record query handle for invalid handle

Posted by ta...@apache.org.
IMPALA-3785: Record query handle for invalid handle

Add the query handle to error messages for Invalid Query Handle
for beeswax and HS2 interfaces.

Change-Id: Ibc113b3673e1b90f81e80e841740b8006bfd31ba
Reviewed-on: http://gerrit.cloudera.org:8080/5748
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/39987d9b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/39987d9b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/39987d9b

Branch: refs/heads/master
Commit: 39987d9bd130cb83483215d31197ec3c9a719b9d
Parents: c452595
Author: Zachary Amsden <za...@cloudera.com>
Authored: Thu Jan 19 21:38:56 2017 +0000
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Sat Feb 4 04:17:06 2017 +0000

----------------------------------------------------------------------
 be/src/service/impala-beeswax-server.cc | 17 +++++++++----
 be/src/service/impala-hs2-server.cc     | 36 ++++++++++++++++++----------
 be/src/util/debug-util-test.cc          | 11 +++++++++
 tests/comparison/leopard/report.py      |  2 +-
 tests/hs2/test_hs2.py                   | 17 +++++++++++++
 5 files changed, 64 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/39987d9b/be/src/service/impala-beeswax-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-beeswax-server.cc b/be/src/service/impala-beeswax-server.cc
index ad4963b..bd4ee67 100644
--- a/be/src/service/impala-beeswax-server.cc
+++ b/be/src/service/impala-beeswax-server.cc
@@ -189,8 +189,9 @@ void ImpalaServer::get_results_metadata(ResultsMetadata& results_metadata,
   QueryHandleToTUniqueId(handle, &query_id);
   VLOG_QUERY << "get_results_metadata(): query_id=" << PrintId(query_id);
   shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, true);
-  if (exec_state.get() == NULL) {
-    RaiseBeeswaxException("Invalid query handle", SQLSTATE_GENERAL_ERROR);
+  if (UNLIKELY(exec_state.get() == nullptr)) {
+    RaiseBeeswaxException(Substitute("Invalid query handle: $0", PrintId(query_id)),
+      SQLSTATE_GENERAL_ERROR);
   }
 
   {
@@ -249,7 +250,8 @@ beeswax::QueryState::type ImpalaServer::get_state(const QueryHandle& handle) {
     return entry->second->query_state();
   } else {
     VLOG_QUERY << "ImpalaServer::get_state invalid handle";
-    RaiseBeeswaxException("Invalid query handle", SQLSTATE_GENERAL_ERROR);
+    RaiseBeeswaxException(Substitute("Invalid query handle: $0", PrintId(query_id)),
+      SQLSTATE_GENERAL_ERROR);
   }
   // dummy to keep compiler happy
   return beeswax::QueryState::FINISHED;
@@ -454,7 +456,9 @@ inline void ImpalaServer::QueryHandleToTUniqueId(const QueryHandle& handle,
 Status ImpalaServer::FetchInternal(const TUniqueId& query_id,
     const bool start_over, const int32_t fetch_size, beeswax::Results* query_results) {
   shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, false);
-  if (exec_state == NULL) return Status("Invalid query handle");
+  if (UNLIKELY(exec_state == nullptr)) {
+    return Status(Substitute("Invalid query handle: $0", PrintId(query_id)));
+  }
 
   // Make sure QueryExecState::Wait() has completed before fetching rows. Wait() ensures
   // that rows are ready to be fetched (e.g., Wait() opens QueryExecState::output_exprs_,
@@ -512,7 +516,10 @@ Status ImpalaServer::FetchInternal(const TUniqueId& query_id,
 Status ImpalaServer::CloseInsertInternal(const TUniqueId& query_id,
     TInsertResult* insert_result) {
   shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, true);
-  if (exec_state == NULL) return Status("Invalid query handle");
+  if (UNLIKELY(exec_state == nullptr)) {
+    return Status(Substitute("Invalid query handle: $0", PrintId(query_id)));
+  }
+
   Status query_status;
   {
     lock_guard<mutex> l(*exec_state->lock(), adopt_lock_t());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/39987d9b/be/src/service/impala-hs2-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-hs2-server.cc b/be/src/service/impala-hs2-server.cc
index 5313b7d..51a04f2 100644
--- a/be/src/service/impala-hs2-server.cc
+++ b/be/src/service/impala-hs2-server.cc
@@ -180,7 +180,9 @@ void ImpalaServer::ExecuteMetadataOp(const THandleIdentifier& session_handle,
 Status ImpalaServer::FetchInternal(const TUniqueId& query_id, int32_t fetch_size,
     bool fetch_first, TFetchResultsResp* fetch_results) {
   shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, false);
-  if (exec_state == NULL) return Status("Invalid query handle");
+  if (UNLIKELY(exec_state == nullptr)) {
+    return Status(Substitute("Invalid query handle: $0", PrintId(query_id)));
+  }
 
   // FetchResults doesn't have an associated session handle, so we presume that this
   // request should keep alive the same session that orignated the query.
@@ -632,9 +634,10 @@ void ImpalaServer::GetOperationStatus(TGetOperationStatusResp& return_val,
   VLOG_ROW << "GetOperationStatus(): query_id=" << PrintId(query_id);
 
   shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, false);
-  if (exec_state.get() == nullptr) {
+  if (UNLIKELY(exec_state.get() == nullptr)) {
     // No handle was found
-    HS2_RETURN_ERROR(return_val, "Invalid query handle", SQLSTATE_GENERAL_ERROR);
+    HS2_RETURN_ERROR(return_val,
+      Substitute("Invalid query handle: $0", PrintId(query_id)), SQLSTATE_GENERAL_ERROR);
   }
 
   ScopedSessionState session_handle(this);
@@ -667,9 +670,10 @@ void ImpalaServer::CancelOperation(TCancelOperationResp& return_val,
   VLOG_QUERY << "CancelOperation(): query_id=" << PrintId(query_id);
 
   shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, false);
-  if (exec_state.get() == NULL) {
+  if (UNLIKELY(exec_state.get() == nullptr)) {
     // No handle was found
-    HS2_RETURN_ERROR(return_val, "Invalid query handle", SQLSTATE_GENERAL_ERROR);
+    HS2_RETURN_ERROR(return_val,
+      Substitute("Invalid query handle: $0", PrintId(query_id)), SQLSTATE_GENERAL_ERROR);
   }
   ScopedSessionState session_handle(this);
   const TUniqueId session_id = exec_state->session_id();
@@ -688,9 +692,10 @@ void ImpalaServer::CloseOperation(TCloseOperationResp& return_val,
   VLOG_QUERY << "CloseOperation(): query_id=" << PrintId(query_id);
 
   shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, false);
-  if (exec_state.get() == NULL) {
+  if (UNLIKELY(exec_state.get() == nullptr)) {
     // No handle was found
-    HS2_RETURN_ERROR(return_val, "Invalid query handle", SQLSTATE_GENERAL_ERROR);
+    HS2_RETURN_ERROR(return_val,
+      Substitute("Invalid query handle: $0", PrintId(query_id)), SQLSTATE_GENERAL_ERROR);
   }
   ScopedSessionState session_handle(this);
   const TUniqueId session_id = exec_state->session_id();
@@ -715,18 +720,22 @@ void ImpalaServer::GetResultSetMetadata(TGetResultSetMetadataResp& return_val,
   // Look up the session ID (which takes session_state_map_lock_) before taking the query
   // exec state lock.
   TUniqueId session_id;
-  if (!GetSessionIdForQuery(query_id, &session_id)) {
-    HS2_RETURN_ERROR(return_val, "Invalid query handle", SQLSTATE_GENERAL_ERROR);
+  if (UNLIKELY(!GetSessionIdForQuery(query_id, &session_id))) {
+    // No handle was found
+    HS2_RETURN_ERROR(return_val,
+      Substitute("Unable to find session ID for query handle: $0", PrintId(query_id)),
+      SQLSTATE_GENERAL_ERROR);
   }
   ScopedSessionState session_handle(this);
   HS2_RETURN_IF_ERROR(return_val, session_handle.WithSession(session_id),
       SQLSTATE_GENERAL_ERROR);
 
   shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, true);
-  if (exec_state.get() == NULL) {
+  if (UNLIKELY(exec_state.get() == nullptr)) {
     VLOG_QUERY << "GetResultSetMetadata(): invalid query handle";
     // No handle was found
-    HS2_RETURN_ERROR(return_val, "Invalid query handle", SQLSTATE_GENERAL_ERROR);
+    HS2_RETURN_ERROR(return_val,
+      Substitute("Invalid query handle: $0", PrintId(query_id)), SQLSTATE_GENERAL_ERROR);
   }
   {
     // make sure we release the lock on exec_state if we see any error
@@ -798,9 +807,10 @@ void ImpalaServer::GetLog(TGetLogResp& return_val, const TGetLogReq& request) {
       request.operationHandle.operationId, &query_id, &secret), SQLSTATE_GENERAL_ERROR);
 
   shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, false);
-  if (exec_state.get() == NULL) {
+  if (UNLIKELY(exec_state.get() == nullptr)) {
     // No handle was found
-    HS2_RETURN_ERROR(return_val, "Invalid query handle", SQLSTATE_GENERAL_ERROR);
+    HS2_RETURN_ERROR(return_val,
+      Substitute("Invalid query handle: $0", PrintId(query_id)), SQLSTATE_GENERAL_ERROR);
   }
 
   // GetLog doesn't have an associated session handle, so we presume that this request

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/39987d9b/be/src/util/debug-util-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/debug-util-test.cc b/be/src/util/debug-util-test.cc
index 002d628..636d9d4 100644
--- a/be/src/util/debug-util-test.cc
+++ b/be/src/util/debug-util-test.cc
@@ -20,12 +20,23 @@
 #include <iostream>
 
 #include "testutil/gtest-util.h"
+#include "util/benchmark.h"
 #include "util/debug-util.h"
 
 #include "common/names.h"
 
 namespace impala {
 
+TEST(DebugUtil, UniqueID) {
+  TUniqueId unique_id;
+  unique_id.hi = 0xfeedbeeff00d7777ULL;
+  unique_id.lo = 0x2020202020202020ULL;
+  std::string str("feedbeeff00d7777:2020202020202020");
+  EXPECT_EQ(str, PrintId(unique_id));
+  unique_id.lo = 0x20ULL;
+  EXPECT_EQ("feedbeeff00d7777:20", PrintId(unique_id));
+}
+
 string RecursionStack(int level) {
   if (level == 0) return GetStackTrace();
   return RecursionStack(level - 1);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/39987d9b/tests/comparison/leopard/report.py
----------------------------------------------------------------------
diff --git a/tests/comparison/leopard/report.py b/tests/comparison/leopard/report.py
index 68487c3..2fd4d77 100644
--- a/tests/comparison/leopard/report.py
+++ b/tests/comparison/leopard/report.py
@@ -55,7 +55,7 @@ class Report(object):
     ur'^Column \d+ in row \d+ does not match': 'mismatch',
     ur'^Could not connect': 'could_not_connect',
     ur'^IllegalStateException': 'IllegalStateException',
-    ur'^Invalid query handle': 'invalid_query_handle',
+    ur'^Invalid query handle: ': 'invalid_query_handle',
     ur'^Known issue:': 'known_issue',
     ur'^Operation is in ERROR_STATE': 'error_state',
     ur'^Query timed out after \d+ seconds': 'timeout',

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/39987d9b/tests/hs2/test_hs2.py
----------------------------------------------------------------------
diff --git a/tests/hs2/test_hs2.py b/tests/hs2/test_hs2.py
index 0ed9cd3..c183001 100644
--- a/tests/hs2/test_hs2.py
+++ b/tests/hs2/test_hs2.py
@@ -227,6 +227,23 @@ class TestHS2(HS2TestSuite):
            len(operation_handle.operationId.secret))
     assert err_msg in get_operation_status_resp.status.errorMessage
 
+  @needs_session()
+  def test_invalid_query_handle(self):
+    operation_handle = TCLIService.TOperationHandle()
+    operation_handle.operationId = TCLIService.THandleIdentifier()
+    operation_handle.operationId.guid = "\x01\x23\x45\x67\x89\xab\xcd\xef76543210"
+    operation_handle.operationId.secret = "PasswordIsPencil"
+    operation_handle.operationType = TCLIService.TOperationType.EXECUTE_STATEMENT
+    operation_handle.hasResultSet = False
+
+    get_operation_status_resp = self.get_operation_status(operation_handle)
+    TestHS2.check_response(get_operation_status_resp, \
+        TCLIService.TStatusCode.ERROR_STATUS)
+
+    print get_operation_status_resp.status.errorMessage
+    err_msg = "Invalid query handle: efcdab8967452301:3031323334353637"
+    assert err_msg in get_operation_status_resp.status.errorMessage
+
   @pytest.mark.execute_serially
   def test_socket_close_forces_session_close(self):
     """Test that closing the underlying socket forces the associated session to close.