You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2016/12/22 23:46:34 UTC

[01/15] incubator-impala git commit: IMPALA-4636: Correct Suse Linux distro string

Repository: incubator-impala
Updated Branches:
  refs/heads/hadoop-next fe33f181e -> 76ebe3cc6


IMPALA-4636: Correct Suse Linux distro string

The string should be suselinux12, not sles12.

I tested this by sourcing the config script on a SLES12 SP1 instance.
Before the change, KUDU_IS_SUPPORTED == false. With the change, the
value is set to true.

Change-Id: I28897eb4e0bbac77e1e542c3db4834a987348f7a
Reviewed-on: http://gerrit.cloudera.org:8080/5519
Reviewed-by: Jim Apple <jb...@apache.org>
Reviewed-by: Michael Brown <mi...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/d6eb1b10
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/d6eb1b10
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/d6eb1b10

Branch: refs/heads/hadoop-next
Commit: d6eb1b107d584cd181468fa8096d7f2ad59dd708
Parents: 44ae9fc
Author: David Knupp <dk...@cloudera.com>
Authored: Wed Dec 14 20:01:21 2016 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Dec 15 21:51:00 2016 +0000

----------------------------------------------------------------------
 bin/impala-config.sh | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6eb1b10/bin/impala-config.sh
----------------------------------------------------------------------
diff --git a/bin/impala-config.sh b/bin/impala-config.sh
index c68d413..a9dd90b 100755
--- a/bin/impala-config.sh
+++ b/bin/impala-config.sh
@@ -141,7 +141,7 @@ if [[ -z "${KUDU_IS_SUPPORTED-}" ]]; then
       # Remove spaces, trim minor versions, and convert to lowercase.
       DISTRO_VERSION="$(tr -d ' \n' <<< "$DISTRO_VERSION" | cut -d. -f1 | tr "A-Z" "a-z")"
       case "$DISTRO_VERSION" in
-        centos6 | centos7 | debian7 | debian8 | sles12 | ubuntu* )
+        centos6 | centos7 | debian7 | debian8 | suselinux12 | ubuntu* )
             KUDU_IS_SUPPORTED=true;;
       esac
     fi


[14/15] incubator-impala git commit: IMPALA-4649: addendum - avoid overwriting MAKE_ARGS

Posted by ta...@apache.org.
IMPALA-4649: addendum - avoid overwriting MAKE_ARGS

Re-tested to make sure that IMPALA_MAKE_FLAGS is effective

Change-Id: If57dc60e16bab591bce3646d43de8d45444719a7
Reviewed-on: http://gerrit.cloudera.org:8080/5558
Tested-by: Impala Public Jenkins
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Tim Armstrong <ta...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/5eef1bf8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/5eef1bf8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/5eef1bf8

Branch: refs/heads/hadoop-next
Commit: 5eef1bf82e08134cd85d916a208bf51450ac2b2a
Parents: b3636c9
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Wed Dec 21 06:28:57 2016 -0800
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Thu Dec 22 23:41:27 2016 +0000

----------------------------------------------------------------------
 bin/make_impala.sh | 1 -
 1 file changed, 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5eef1bf8/bin/make_impala.sh
----------------------------------------------------------------------
diff --git a/bin/make_impala.sh b/bin/make_impala.sh
index 97525c3..70d088f 100755
--- a/bin/make_impala.sh
+++ b/bin/make_impala.sh
@@ -170,7 +170,6 @@ if [ $CMAKE_ONLY -eq 1 ]; then
   exit 0
 fi
 
-MAKE_ARGS=-j${IMPALA_BUILD_THREADS:-4}
 if [ $BUILD_FE_ONLY -eq 1 ]; then
   ${MAKE_CMD} ${MAKE_ARGS} fe
 elif [ $BUILD_EVERYTHING -eq 1 ]; then


[05/15] incubator-impala git commit: IMPALA-4647: fix full data load with ninja

Posted by ta...@apache.org.
IMPALA-4647: fix full data load with ninja

This issues is that MAKE_CMD wasn't exported, so
testdata/bin/copy-udfs-udas.sh tried to use "make" despite Makefiles not
being generated.

Testing:
Was able to do a full data load locally after applying this fix.

Change-Id: Iba00d0ffbb6a93f26f4e2d1d311167d5e4dfa99f
Reviewed-on: http://gerrit.cloudera.org:8080/5476
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/26f0b461
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/26f0b461
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/26f0b461

Branch: refs/heads/hadoop-next
Commit: 26f0b4612c0d30e45a5850fdf576b4fc4ce86daf
Parents: c40958f
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Mon Dec 12 10:18:11 2016 -0800
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Sat Dec 17 00:26:12 2016 +0000

----------------------------------------------------------------------
 buildall.sh | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/26f0b461/buildall.sh
----------------------------------------------------------------------
diff --git a/buildall.sh b/buildall.sh
index fc66acb..291f19e 100755
--- a/buildall.sh
+++ b/buildall.sh
@@ -59,7 +59,8 @@ CODE_COVERAGE=0
 BUILD_ASAN=0
 BUILD_FE_ONLY=0
 BUILD_TIDY=0
-MAKE_CMD=make
+# Export MAKE_CMD so it is visible in scripts that invoke make, e.g. copy-udfs-udas.sh
+export MAKE_CMD=make
 LZO_CMAKE_ARGS=
 
 # Defaults that can be picked up from the environment, but are overridable through the


[15/15] incubator-impala git commit: Merge remote-tracking branch 'origin/master' into hadoop-next

Posted by ta...@apache.org.
Merge remote-tracking branch 'origin/master' into hadoop-next

Change-Id: Ib37056d1f273f6efb5b87c9b48512f8b14ab497b


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/76ebe3cc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/76ebe3cc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/76ebe3cc

Branch: refs/heads/hadoop-next
Commit: 76ebe3cc636dffdfb3d592aa7893cf2b629d0d76
Parents: fe33f18 5eef1bf
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Thu Dec 22 15:44:09 2016 -0800
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Thu Dec 22 15:44:58 2016 -0800

----------------------------------------------------------------------
 be/src/gutil/walltime.cc                        |   12 -
 be/src/gutil/walltime.h                         |   20 +-
 be/src/util/parquet-reader.cc                   |   48 +-
 be/src/util/stopwatch.h                         |    2 +-
 be/src/util/time.h                              |    6 +-
 be/src/util/webserver.cc                        |    4 +-
 bin/impala-config.sh                            |    2 +-
 bin/make_impala.sh                              |    1 -
 buildall.sh                                     |    3 +-
 fe/src/main/cup/sql-parser.cup                  |   65 +-
 .../org/apache/impala/analysis/InsertStmt.java  |   90 +-
 .../apache/impala/analysis/PartitionSet.java    |    8 +-
 .../org/apache/impala/analysis/PlanHint.java    |   75 ++
 .../org/apache/impala/analysis/SelectList.java  |   24 +-
 .../org/apache/impala/analysis/TableRef.java    |   51 +-
 .../org/apache/impala/analysis/ToSqlUtils.java  |    5 +-
 .../org/apache/impala/catalog/HdfsTable.java    |    4 +-
 .../org/apache/impala/planner/KuduScanNode.java |   11 +-
 .../java/org/apache/impala/planner/Planner.java |   29 +-
 fe/src/main/jflex/sql-scanner.flex              |   51 +-
 .../apache/impala/analysis/AnalyzeDDLTest.java  |    2 +-
 .../impala/analysis/AnalyzeStmtsTest.java       |   51 +-
 .../org/apache/impala/analysis/ParserTest.java  |   84 +-
 .../org/apache/impala/analysis/ToSqlTest.java   |    4 +-
 infra/python/deps/requirements.txt              |    2 +-
 testdata/bin/check-hbase-nodes.py               |   32 +-
 .../queries/PlannerTest/insert.test             |   73 ++
 .../queries/PlannerTest/kudu.test               |   12 +
 .../queries/QueryTest/insert.test               |   26 +
 .../partition-ddl-predicates-all-fs.test        |   20 +
 tests/stress/concurrent_select.py               | 1005 +++++++++++++-----
 tests/util/parse_util.py                        |    2 +-
 32 files changed, 1328 insertions(+), 496 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/76ebe3cc/bin/impala-config.sh
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/76ebe3cc/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/76ebe3cc/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
----------------------------------------------------------------------


[11/15] incubator-impala git commit: IMPALA-4702: Fix command line help for webserver_private_key_file

Posted by ta...@apache.org.
IMPALA-4702: Fix command line help for webserver_private_key_file

The command line help for 'webserver_private_key_file' should refer to
'webserver_certificate_file' instead of 'ssl_server_certificate'.

Change-Id: Ie2820951b381240d5dab7c98e07e92332234462a
Reviewed-on: http://gerrit.cloudera.org:8080/5552
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/ba8c135e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/ba8c135e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/ba8c135e

Branch: refs/heads/hadoop-next
Commit: ba8c135ef8a107eca6744ec063e4366d55374e07
Parents: 73146a0
Author: Lars Volker <lv...@cloudera.com>
Authored: Tue Dec 20 14:21:32 2016 -0800
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Wed Dec 21 04:08:38 2016 +0000

----------------------------------------------------------------------
 be/src/util/webserver.cc | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ba8c135e/be/src/util/webserver.cc
----------------------------------------------------------------------
diff --git a/be/src/util/webserver.cc b/be/src/util/webserver.cc
index 7cdd026..d971852 100644
--- a/be/src/util/webserver.cc
+++ b/be/src/util/webserver.cc
@@ -80,8 +80,8 @@ DEFINE_string(webserver_certificate_file, "",
     "The location of the debug webserver's SSL certificate file, in .pem format. If "
     "empty, webserver SSL support is not enabled");
 DEFINE_string(webserver_private_key_file, "", "The full path to the private key used as a"
-    " counterpart to the public key contained in --ssl_server_certificate. If "
-    "--ssl_server_certificate is set, this option must be set as well.");
+    " counterpart to the public key contained in --webserver_certificate_file. If "
+    "--webserver_certificate_file is set, this option must be set as well.");
 DEFINE_string(webserver_private_key_password_cmd, "", "A Unix command whose output "
     "returns the password used to decrypt the Webserver's certificate private key file "
     "specified in --webserver_private_key_file. If the .PEM key file is not "


[13/15] incubator-impala git commit: IMPALA-4033: Treat string-partition key values as case sensitive.

Posted by ta...@apache.org.
IMPALA-4033: Treat string-partition key values as case sensitive.

This commit makes ADD PARTITION operations treat string partition-key
values as case sensitive in consistent with other related partition DDL
operations.

Change-Id: I6fbe67d99df8a50a16a18456fde85d03d622c7a1
Reviewed-on: http://gerrit.cloudera.org:8080/5535
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/b3636c97
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/b3636c97
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/b3636c97

Branch: refs/heads/hadoop-next
Commit: b3636c97d4b872e1640955974409c57459d655e0
Parents: 226a2e6
Author: Amos Bird <am...@gmail.com>
Authored: Fri Dec 16 14:23:22 2016 +0800
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Thu Dec 22 10:45:39 2016 +0000

----------------------------------------------------------------------
 .../apache/impala/analysis/PartitionSet.java    |  8 ++------
 .../org/apache/impala/catalog/HdfsTable.java    |  4 ++--
 .../apache/impala/analysis/AnalyzeDDLTest.java  |  2 +-
 .../partition-ddl-predicates-all-fs.test        | 20 ++++++++++++++++++++
 4 files changed, 25 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b3636c97/fe/src/main/java/org/apache/impala/analysis/PartitionSet.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/PartitionSet.java b/fe/src/main/java/org/apache/impala/analysis/PartitionSet.java
index 3ba2ad2..d5f0e70 100644
--- a/fe/src/main/java/org/apache/impala/analysis/PartitionSet.java
+++ b/fe/src/main/java/org/apache/impala/analysis/PartitionSet.java
@@ -137,12 +137,11 @@ public class PartitionSet extends PartitionSpecBase {
     }
   }
 
-  // Transform <COL> = NULL into IsNull expr; <String COL> = '' into IsNull expr and
-  // <String COL> = 'String Value' into lower case.
+  // Transform <COL> = NULL into IsNull expr; <String COL> = '' into IsNull expr.
   // The reason is that COL = NULL is allowed for selecting the NULL
   // partition, but a COL = NULL predicate can never be true, so we
   // need to transform such predicates before feeding them into the
-  // partition pruner. Same logic goes to String transformation.
+  // partition pruner.
   private List<Expr> transformPartitionConjuncts(Analyzer analyzer, List<Expr> conjuncts)
       throws AnalysisException {
     List<Expr> transformedConjuncts = Lists.newArrayList();
@@ -162,9 +161,6 @@ public class PartitionSet extends PartitionSpecBase {
           } else if (leftChild != null && stringChild != null) {
             if (stringChild.getStringValue().isEmpty()) {
               result = new IsNullPredicate(leftChild, false);
-            } else {
-              stringChild = new StringLiteral(stringChild.getStringValue().toLowerCase());
-              result.setChild(1, stringChild);
             }
           }
         }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b3636c97/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index c2c569f..904c90f 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -570,7 +570,7 @@ public class HdfsTable extends Table {
     for (FieldSchema fs: getMetaStoreTable().getPartitionKeys()) {
       for (TPartitionKeyValue kv: partitionSpec) {
         if (fs.getName().toLowerCase().equals(kv.getName().toLowerCase())) {
-          targetValues.add(kv.getValue().toLowerCase());
+          targetValues.add(kv.getValue());
           // Same key was specified twice
           if (!keys.add(kv.getName().toLowerCase())) {
             return null;
@@ -604,7 +604,7 @@ public class HdfsTable extends Table {
           // backwards compatibility with Hive, and is clearly broken.
           if (value.isEmpty()) value = getNullPartitionKeyValue();
         }
-        if (!targetValues.get(i).equals(value.toLowerCase())) {
+        if (!targetValues.get(i).equals(value)) {
           matchFound = false;
           break;
         }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b3636c97/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
index 935edc5..5a08f98 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
@@ -424,7 +424,7 @@ public class AnalyzeDDLTest extends FrontendTestBase {
     AnalyzesOk("alter table functional.stringpartitionkey PARTITION " +
                "(string_col='partition1') set fileformat parquet");
     AnalyzesOk("alter table functional.stringpartitionkey PARTITION " +
-               "(string_col='PaRtiTion1') set location '/a/b/c'");
+               "(string_col='partition1') set location '/a/b/c'");
     AnalyzesOk("alter table functional.alltypes PARTITION (year=2010, month=11) " +
                "set tblproperties('a'='1')");
     AnalyzesOk("alter table functional.alltypes PARTITION (year<=2010, month=11) " +

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b3636c97/testdata/workloads/functional-query/queries/QueryTest/partition-ddl-predicates-all-fs.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/partition-ddl-predicates-all-fs.test b/testdata/workloads/functional-query/queries/QueryTest/partition-ddl-predicates-all-fs.test
index 36104c3..1aa5c51 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/partition-ddl-predicates-all-fs.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/partition-ddl-predicates-all-fs.test
@@ -133,3 +133,23 @@ show partitions p1
 ---- TYPES
 STRING, STRING, BIGINT, BIGINT, STRING, STRING, STRING, STRING, STRING, STRING
 ====
+---- QUERY
+# Tests case-sensitivity of string-typed partition columns.
+alter table p1 add partition (j=2,k="D");
+alter table p1 add partition (j=2,k="E");
+alter table p1 add partition (j=2,k="F");
+====
+---- QUERY
+show partitions p1
+---- RESULTS
+'NULL','g',-1,0,regex:.+,regex:.+,regex:.+,regex:.+,regex:.+,regex:.*/test-warehouse/.+/p1/j=__HIVE_DEFAULT_PARTITION__/k=g
+'2','D',-1,0,regex:.+,regex:.+,regex:.+,'TEXT',regex:.+,regex:.*/test-warehouse/.+/p1/j=2/k=D
+'2','E',-1,0,regex:.+,regex:.+,regex:.+,'TEXT',regex:.+,regex:.*/test-warehouse/.+/p1/j=2/k=E
+'2','F',-1,0,regex:.+,regex:.+,regex:.+,'TEXT',regex:.+,regex:.*/test-warehouse/.+/p1/j=2/k=F
+'2','d',-1,0,regex:.+,regex:.+,regex:.+,'PARQUET',regex:.+,regex:.*/test-warehouse/.+/p1/j=2/k=d
+'2','e',-1,0,regex:.+,regex:.+,regex:.+,'PARQUET',regex:.+,regex:.*/test-warehouse/.+/p1/j=2/k=e
+'2','f',-1,0,regex:.+,regex:.+,regex:.+,'PARQUET',regex:.+,regex:.*/test-warehouse/.+/p1/j=2/k=f
+'Total','',0,0,regex:.+,regex:.+,'','','',''
+---- TYPES
+STRING, STRING, BIGINT, BIGINT, STRING, STRING, STRING, STRING, STRING, STRING
+====


[06/15] incubator-impala git commit: IMPALA-4686: Fix schema output for INT96 columns in parquet-reader tool

Posted by ta...@apache.org.
IMPALA-4686: Fix schema output for INT96 columns in parquet-reader tool

Instead of manually mapping the types we can just look them up in the
thrift map.

Testing: I tested this change manually by compiling the tool and running
it on a parquet file that had a INT96 column. Here is the relevant
output:

Schema:
id  INT32
bool_col  BOOLEAN
tinyint_col  INT32
smallint_col  INT32
int_col  INT32
bigint_col  INT64
float_col  FLOAT
double_col  DOUBLE
date_string_col  BYTE_ARRAY
string_col  BYTE_ARRAY
timestamp_col  INT96
year  INT32
month  INT32

We only use this tool in one test currently, which calls it to make sure
that a parquet-file can be parsed by it. This implies that we have tests
that it compiles, but we don't make use of its output currently.

Change-Id: I5d92f5556554c71461a93fe0d598bb69f91cce51
Reviewed-on: http://gerrit.cloudera.org:8080/5536
Reviewed-by: Lars Volker <lv...@cloudera.com>
Reviewed-by: Matthew Jacobs <mj...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/68131b31
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/68131b31
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/68131b31

Branch: refs/heads/hadoop-next
Commit: 68131b311577e6e16bfe5ef08b17a2312cc65296
Parents: 26f0b46
Author: Lars Volker <lv...@cloudera.com>
Authored: Fri Dec 16 11:54:40 2016 -0800
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Sat Dec 17 00:32:46 2016 +0000

----------------------------------------------------------------------
 be/src/util/parquet-reader.cc | 19 +++----------------
 1 file changed, 3 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/68131b31/be/src/util/parquet-reader.cc
----------------------------------------------------------------------
diff --git a/be/src/util/parquet-reader.cc b/be/src/util/parquet-reader.cc
index d48ef16..afbb33d 100644
--- a/be/src/util/parquet-reader.cc
+++ b/be/src/util/parquet-reader.cc
@@ -86,22 +86,9 @@ bool DeserializeThriftMsg(uint8_t* buf, uint32_t* len, bool compact,
 }
 
 string TypeMapping(Type::type t) {
-  switch (t) {
-    case Type::BOOLEAN:
-      return "BOOLEAN";
-    case Type::INT32:
-      return "INT32";
-    case Type::INT64:
-      return "INT64";
-    case Type::FLOAT:
-      return "FLOAT";
-    case Type::DOUBLE:
-      return "DOUBLE";
-    case Type::BYTE_ARRAY:
-      return "BYTE_ARRAY";
-    default:
-      return "UNKNOWN";
-  }
+  auto it = _Type_VALUES_TO_NAMES.find(t);
+  if (it != _Type_VALUES_TO_NAMES.end()) return it->second;
+  return "UNKNOWN";
 }
 
 void AppendSchema(const vector<SchemaElement>& schema, int level,


[03/15] incubator-impala git commit: IMPALA-4662: Fix NULL literal handling in Kudu IN list predicates

Posted by ta...@apache.org.
IMPALA-4662: Fix NULL literal handling in Kudu IN list predicates

The KuduScanNode attempts to push IN list predicates to the
Kudu scan, but NULL literals cannot be pushed. The code in
KuduScanNode needed to check if the Literals in the
InPredicate is a NullLiteral, in which case the entire IN
list should not be pushed to Kudu.

The same handling is already in place for binary predicate
pushdown.

Change-Id: Iaf2c10a326373ad80aef51a85cec64071daefa7b
Reviewed-on: http://gerrit.cloudera.org:8080/5505
Reviewed-by: Michael Brown <mi...@cloudera.com>
Reviewed-by: Matthew Jacobs <mj...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/c2faf4a8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/c2faf4a8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/c2faf4a8

Branch: refs/heads/hadoop-next
Commit: c2faf4a8a13ba89f2f4f2bc4fbd878cffda53d1f
Parents: 54194af
Author: Matthew Jacobs <mj...@cloudera.com>
Authored: Wed Dec 14 15:58:59 2016 -0800
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Thu Dec 15 23:00:24 2016 +0000

----------------------------------------------------------------------
 .../java/org/apache/impala/planner/KuduScanNode.java    | 11 ++++++++---
 .../functional-planner/queries/PlannerTest/kudu.test    | 12 ++++++++++++
 2 files changed, 20 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c2faf4a8/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
index 580f5e0..cdb620c 100644
--- a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
@@ -309,7 +309,7 @@ public class KuduScanNode extends ScanNode {
     SlotRef ref = (SlotRef) predicate.getChild(0);
     LiteralExpr literal = (LiteralExpr) predicate.getChild(1);
 
-    // Cannot push prediates with null literal values (KUDU-1595).
+    // Cannot push predicates with null literal values (KUDU-1595).
     if (literal instanceof NullLiteral) return false;
 
     String colName = ref.getDesc().getColumn().getName();
@@ -379,8 +379,13 @@ public class KuduScanNode extends ScanNode {
     List<Object> values = Lists.newArrayList();
     for (int i = 1; i < predicate.getChildren().size(); ++i) {
       if (!(predicate.getChild(i).isLiteral())) return false;
-      Object value = getKuduInListValue((LiteralExpr) predicate.getChild(i));
-      Preconditions.checkNotNull(value == null);
+      LiteralExpr literal = (LiteralExpr) predicate.getChild(i);
+
+      // Cannot push predicates with null literal values (KUDU-1595).
+      if (literal instanceof NullLiteral) return false;
+
+      Object value = getKuduInListValue(literal);
+      Preconditions.checkNotNull(value);
       values.add(value);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c2faf4a8/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test b/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
index 776882d..9f2270f 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
@@ -314,3 +314,15 @@ PLAN-ROOT SINK
    predicates: CAST(a.id AS STRING) > '123'
    kudu predicates: a.id > 10
 ====
+# IMPALA-4662: Kudu analysis failure for NULL literal in IN list
+# NULL literal in values list results in applying predicate at scan node
+select id from functional_kudu.alltypestiny where
+id in (1, null) and string_col in (null) and bool_col in (null) and double_col in (null)
+and float_col in (null) and tinyint_col in (null) and smallint_col in (null) and
+bigint_col in (null)
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN KUDU [functional_kudu.alltypestiny]
+   predicates: id IN (1, NULL), bigint_col IN (NULL), bool_col IN (NULL), double_col IN (NULL), float_col IN (NULL), smallint_col IN (NULL), string_col IN (NULL), tinyint_col IN (NULL)
+====


[12/15] incubator-impala git commit: IMPALA-4684: Handle Zookeeper ConnentionLoss exceptions

Posted by ta...@apache.org.
IMPALA-4684: Handle Zookeeper ConnentionLoss exceptions

This is the second patch to address IMPALA-4684. The first patch exposed
a transient Zookeeper connection error on RHEL7. This patch introduces a
retry (up to 3 times), and somewhat better logging.

Tested by running tests against an RHEL7 instance and confirming that
all HBase nodes start up.

Change-Id: I44b4eec342addcfe489f94c332bbe14225c9968c
Reviewed-on: http://gerrit.cloudera.org:8080/5554
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/226a2e63
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/226a2e63
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/226a2e63

Branch: refs/heads/hadoop-next
Commit: 226a2e63321e9bcf4ba906512fc40e35b98db252
Parents: ba8c135
Author: David Knupp <dk...@cloudera.com>
Authored: Tue Dec 20 15:06:15 2016 -0800
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Thu Dec 22 01:18:56 2016 +0000

----------------------------------------------------------------------
 testdata/bin/check-hbase-nodes.py | 34 ++++++++++++++++++++++++++--------
 1 file changed, 26 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/226a2e63/testdata/bin/check-hbase-nodes.py
----------------------------------------------------------------------
diff --git a/testdata/bin/check-hbase-nodes.py b/testdata/bin/check-hbase-nodes.py
index 28ac0c1..ffe7a7c 100755
--- a/testdata/bin/check-hbase-nodes.py
+++ b/testdata/bin/check-hbase-nodes.py
@@ -30,7 +30,7 @@ import time
 
 from contextlib import closing
 from kazoo.client import KazooClient
-from kazoo.exceptions import NoNodeError
+from kazoo.exceptions import NoNodeError, ConnectionLoss
 from kazoo.handlers.threading import KazooTimeoutError
 
 LOGGER = logging.getLogger('hbase_check')
@@ -43,6 +43,7 @@ HDFS_HOST = '127.0.0.1:5070'
 ZK_HOSTS = '127.0.0.1:2181'
 HBASE_NODES = ['/hbase/master', '/hbase/rs']
 ADMIN_USER = 'admin'
+MAX_ZOOKEEPER_CONNECTION_RETRIES = 3
 
 
 def parse_args():
@@ -128,14 +129,31 @@ def check_znodes_list_for_errors(nodes, zookeeper_hosts, timeout):
         timeout_seconds: Number of seconds to attempt to get node
 
     Returns:
-        0 success, or else the number of unresponsive nodes
+        0 success, or else the number of errors
     """
-    with closing(connect_to_zookeeper(zookeeper_hosts, timeout)) as zk_client:
-        try:
-            errors = sum([check_znode(node, zk_client, timeout) for node in nodes])
-        finally:
-            zk_client.stop()
-    return errors
+    connection_retries = 0
+
+    while True:
+        with closing(connect_to_zookeeper(zookeeper_hosts, timeout)) as zk_client:
+            try:
+                return sum([check_znode(node, zk_client, timeout) for node in nodes])
+            except ConnectionLoss as e:
+                connection_retries += 1
+                if connection_retries > MAX_ZOOKEEPER_CONNECTION_RETRIES:
+                    LOGGER.error("Max connection retries exceeded: {0}".format(str(e)))
+                    raise
+                else:
+                    err_msg = ("Zookeeper connection loss: retrying connection "
+                               "({0} of {1} attempts)")
+                    LOGGER.warn(err_msg.format(connection_retries,
+                                               MAX_ZOOKEEPER_CONNECTION_RETRIES))
+                    time.sleep(1)
+            except Exception as e:
+                LOGGER.error("Unexpected error checking HBase node: {0}".format(str(e)))
+                raise
+            finally:
+                LOGGER.info("Stopping Zookeeper client")
+                zk_client.stop()
 
 
 def is_hdfs_running(host, admin_user):


[08/15] incubator-impala git commit: IMPALA-4467: Add support for DML statements in stress test

Posted by ta...@apache.org.
IMPALA-4467: Add support for DML statements in stress test

- Add support for insert, upsert, update and and delete statements.
- Add support for compute stats with mt_dop query options.
- Update impyla version in order to be able to have access to query
  error text for DML queries.
- Made flake8 fixes. flake8 on this file is clean.

For every Kudu table in the databases, we make a copy and add a
'_original' suffix to the table name. The DML queries will only make
modifications to the non original table, the original table will never
be modified. The orignal tables could be used to bring the non-original
table to the inital state. Two flags were added for doing this:
--reset-databases-before-binary-search and
--reset-databases-after-binary-search.

The DML queries are generated based on the mod values passed in with the
following flag: --dml-mod-values 11 13 17. For each mod value 4 DML
queries are generated. The DML operations will touch table rows where
primary_key % mod_value = 0. So, the larger the mod value, the more rows
would be affected. The DML queries are generated in such a way that the
data for the insert, upsert, and update queries is taken from the table
with the _original suffix. The stress test generates DML queries for
only kudu databases. For example, --tpch-kudu-db=tpch_100_kudu
--tpch-db=tpch_100 --generate-dml-queries would only generate queries
for the tpch_100_kudu database.

Here's an example of a full call with the new options that runs the
stress test on the local mini cluster:
./concurrent_select.py \
    --tpch-kudu-db=tpch_kudu \
    --generate-dml-queries \
    --dml-mod-values 11 13 17 \
    --generate-compute-stats-queries \
    --select-probability=0.5 \
    --mem-limit-padding-pct=25 \
    --mem-limit-padding-abs=50 \
    --reset-databases-before-binary-search \
    --reset-databases-after-binary-search

Change-Id: Ia2aafdc6851cc0e1677a3c668d3350e47c4bfe40
Reviewed-on: http://gerrit.cloudera.org:8080/5093
Reviewed-by: Taras Bobrovytsky <tb...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/2159beee
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/2159beee
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/2159beee

Branch: refs/heads/hadoop-next
Commit: 2159beee89d463be9b69886c95ad73271db49280
Parents: ce9b332
Author: Taras Bobrovytsky <tb...@cloudera.com>
Authored: Mon Nov 14 17:07:06 2016 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Tue Dec 20 01:33:01 2016 +0000

----------------------------------------------------------------------
 infra/python/deps/requirements.txt |    2 +-
 tests/stress/concurrent_select.py  | 1005 ++++++++++++++++++++++---------
 tests/util/parse_util.py           |    2 +-
 3 files changed, 725 insertions(+), 284 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2159beee/infra/python/deps/requirements.txt
----------------------------------------------------------------------
diff --git a/infra/python/deps/requirements.txt b/infra/python/deps/requirements.txt
index c3b8648..9831d21 100644
--- a/infra/python/deps/requirements.txt
+++ b/infra/python/deps/requirements.txt
@@ -46,7 +46,7 @@ Flask == 0.10.1
 hdfs == 2.0.2
   docopt == 0.6.2
   execnet == 1.4.0
-impyla == 0.11.2
+impyla == 0.14.0
   bitarray == 0.8.1
   sasl == 0.1.3
   six == 1.9.0

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2159beee/tests/stress/concurrent_select.py
----------------------------------------------------------------------
diff --git a/tests/stress/concurrent_select.py b/tests/stress/concurrent_select.py
index fb24f65..690c999 100755
--- a/tests/stress/concurrent_select.py
+++ b/tests/stress/concurrent_select.py
@@ -48,7 +48,8 @@
 #  7) If a query errored, verify that memory was overcommitted during execution and the
 #     error is a mem limit exceeded error. There is no other reason a query should error
 #     and any such error will cause the stress test to stop.
-#  8) Verify the result set hash of successful queries.
+#  8) Verify the result set hash of successful queries if there are no DML queries in the
+#     current run.
 
 from __future__ import print_function
 
@@ -74,6 +75,7 @@ from time import sleep, time
 
 import tests.util.test_file_parser as test_file_parser
 from tests.comparison.cluster import Timeout
+from tests.comparison.db_types import Int, TinyInt, SmallInt, BigInt
 from tests.comparison.model_translator import SqlWriter
 from tests.comparison.query_generator import QueryGenerator
 from tests.comparison.query_profile import DefaultProfile
@@ -91,7 +93,7 @@ MEM_LIMIT_EQ_THRESHOLD_MB = 50
 MEM_ESTIMATE_PATTERN = re.compile(r"Estimated.*Memory=(\d+.?\d*)(T|G|M|K)?B")
 
 # The version of the file format containing the collected query runtime info.
-RUNTIME_INFO_FILE_VERSION = 2
+RUNTIME_INFO_FILE_VERSION = 3
 
 
 def create_and_start_daemon_thread(fn, name):
@@ -112,7 +114,8 @@ def print_stacks(*_):
   thread_names = dict([(t.ident, t.name) for t in threading.enumerate()])
   stacks = list()
   for thread_id, stack in sys._current_frames().items():
-    stacks.append("\n# Thread: %s(%d)"
+    stacks.append(
+        "\n# Thread: %s(%d)"
         % (thread_names.get(thread_id, "No name"), thread_id))
     for filename, lineno, name, line in traceback.extract_stack(stack):
       stacks.append('File: "%s", line %d, in %s' % (filename, lineno, name))
@@ -128,8 +131,8 @@ signal.signal(signal.SIGUSR1, print_stacks)
 
 def print_crash_info_if_exists(impala, start_time):
   """If any impalads are found not running, they will assumed to have crashed and an
-     error message will be printed to stderr for each stopped impalad. Returns a value
-     that evaluates to True if any impalads are stopped.
+  error message will be printed to stderr for each stopped impalad. Returns a value
+  that evaluates to True if any impalads are stopped.
   """
   max_attempts = 5
   for remaining_attempts in xrange(max_attempts - 1, -1, -1):
@@ -137,11 +140,12 @@ def print_crash_info_if_exists(impala, start_time):
       crashed_impalads = impala.find_crashed_impalads(start_time)
       break
     except Timeout as e:
-      LOG.info("Timeout checking if impalads crashed: %s." % e +
-          (" Will retry." if remaining_attempts else ""))
+      LOG.info(
+          "Timeout checking if impalads crashed: %s."
+          % e + (" Will retry." if remaining_attempts else ""))
   else:
-    LOG.error("Aborting after %s failed attempts to check if impalads crashed",
-        max_attempts)
+    LOG.error(
+        "Aborting after %s failed attempts to check if impalads crashed", max_attempts)
     raise e
   for message in crashed_impalads.itervalues():
     print(message, file=sys.stderr)
@@ -164,20 +168,20 @@ class QueryReport(object):
 
 class MemBroker(object):
   """Provides memory usage coordination for clients running in different processes.
-     The broker fulfills reservation requests by blocking as needed so total memory
-     used by clients never exceeds the total available memory (including an
-     'overcommitable' amount).
+  The broker fulfills reservation requests by blocking as needed so total memory
+  used by clients never exceeds the total available memory (including an
+  'overcommitable' amount).
 
-     The lock built in to _available is also used to protect access to other members.
+  The lock built in to _available is also used to protect access to other members.
 
-     The state stored in this class is actually an encapsulation of part of the state
-     of the StressRunner class below. The state here is separated for clarity.
+  The state stored in this class is actually an encapsulation of part of the state
+  of the StressRunner class below. The state here is separated for clarity.
   """
 
   def __init__(self, real_mem_mb, overcommitable_mem_mb):
     """'real_mem_mb' memory should be the amount of memory that each impalad is able
-       to use. 'overcommitable_mem_mb' is the amount of memory that will be dispensed
-       over the 'real' amount.
+    to use. 'overcommitable_mem_mb' is the amount of memory that will be dispensed
+    over the 'real' amount.
     """
     self._total_mem_mb = real_mem_mb + overcommitable_mem_mb
     self._available = Value("i", self._total_mem_mb)
@@ -210,16 +214,16 @@ class MemBroker(object):
   @contextmanager
   def reserve_mem_mb(self, mem_mb):
     """Blocks until the requested amount of memory is available and taken for the caller.
-       This function should be used in a 'with' block. The taken memory will
-       automatically be released when the 'with' context exits. A numeric id is returned
-       so clients can compare against 'last_overcommitted_reservation_id' to see if
-       memory was overcommitted since the reservation was obtained.
-
-       with broker.reserve_mem_mb(100) as reservation_id:
-         # Run query using 100 MB of memory
-         if <query failed>:
-           # Immediately check broker.was_overcommitted(reservation_id) to see if
-           # memory was overcommitted.
+    This function should be used in a 'with' block. The taken memory will
+    automatically be released when the 'with' context exits. A numeric id is returned
+    so clients can compare against 'last_overcommitted_reservation_id' to see if
+    memory was overcommitted since the reservation was obtained.
+
+    with broker.reserve_mem_mb(100) as reservation_id:
+      # Run query using 100 MB of memory
+      if <query failed>:
+        # Immediately check broker.was_overcommitted(reservation_id) to see if
+        # memory was overcommitted.
     """
     reservation_id = self._wait_until_reserved(mem_mb)
     try:
@@ -232,8 +236,9 @@ class MemBroker(object):
       with self._available.get_lock():
         if req <= self._available.value:
           self._available.value -= req
-          LOG.debug("Reserved %s MB; %s MB available; %s MB overcommitted", req,
-              self._available.value, self.overcommitted_mem_mb)
+          LOG.debug(
+              "Reserved %s MB; %s MB available; %s MB overcommitted",
+              req, self._available.value, self.overcommitted_mem_mb)
           reservation_id = self._next_reservation_id.value
           increment(self._next_reservation_id)
           if self.overcommitted_mem_mb > 0:
@@ -244,23 +249,24 @@ class MemBroker(object):
   def _release(self, req):
     with self._available.get_lock():
       self._available.value += req
-      LOG.debug("Released %s MB; %s MB available; %s MB overcommitted", req,
-          self._available.value, self.overcommitted_mem_mb)
+      LOG.debug(
+          "Released %s MB; %s MB available; %s MB overcommitted",
+          req, self._available.value, self.overcommitted_mem_mb)
 
   def was_overcommitted(self, reservation_id):
     """Returns True if memory was overcommitted since the given reservation was made.
-       For an accurate return value, this should be called just after the query ends
-       or while the query is still running.
+    For an accurate return value, this should be called just after the query ends
+    or while the query is still running.
     """
     return reservation_id <= self._last_overcommitted_reservation_id.value
 
 
 class StressRunner(object):
   """This class contains functionality related to producing/consuming queries for the
-     purpose of stress testing Impala.
+  purpose of stress testing Impala.
 
-     Queries will be executed in separate processes since python threading is limited
-     to the use of a single CPU.
+  Queries will be executed in separate processes since python threading is limited
+  to the use of a single CPU.
   """
 
   # This is the point at which the work queue will block because it is full.
@@ -270,6 +276,8 @@ class StressRunner(object):
     self.use_kerberos = False
     self.common_query_options = {}
     self._mem_broker = None
+    self._verify_results = True
+    self._select_probability = None
 
     # Synchronized blocking work queue for producer/consumers.
     self._query_queue = Queue(self.WORK_QUEUE_CAPACITY)
@@ -306,7 +314,8 @@ class StressRunner(object):
     self._num_successive_errors = Value("i", 0)
     self.result_hash_log_dir = gettempdir()
 
-    self._status_headers = [" Done", "Running", "Mem Lmt Ex", "Time Out", "Cancel",
+    self._status_headers = [
+        " Done", "Running", "Mem Lmt Ex", "Time Out", "Cancel",
         "Err", "Next Qry Mem Lmt", "Tot Qry Mem Lmt", "Tracked Mem", "RSS Mem"]
 
     self._num_queries_to_run = None
@@ -315,25 +324,31 @@ class StressRunner(object):
     self._query_consumer_thread = None
     self._mem_polling_thread = None
 
-  def run_queries(self, queries, impala, num_queries_to_run, mem_overcommit_pct,
-      should_print_status):
+  def run_queries(
+      self, queries, impala, num_queries_to_run, mem_overcommit_pct, should_print_status,
+      verify_results, select_probability
+  ):
     """Runs queries randomly chosen from 'queries' and stops after 'num_queries_to_run'
-       queries have completed.
-
-       Before a query is run, a mem limit will be chosen. 'spill_probability' determines
-       the likelihood of choosing a mem limit that will cause spilling. To induce
-       spilling, a value is randomly chosen below the min memory needed to avoid spilling
-       but above the min memory needed with spilling. So the min/max query memory
-       requirements must be determined before calling this method.
-
-       If 'mem_overcommit_pct' is zero, an exception will be raised if any queries
-       fail for any reason other than cancellation (controlled by the 'cancel_probability'
-       property), since each query should have enough memory to run successfully. If
-       non-zero, failures due to insufficient memory will be ignored if memory was
-       overcommitted at any time during execution.
-
-       If a query completes without error, the result will be verified. An error
-       will be raised upon a result mismatch.
+    queries have completed. 'select_probability' should be float between 0 and 1, it
+    determines the likelihood of choosing a select query (as opposed to a DML query,
+    for example).
+
+    Before a query is run, a mem limit will be chosen. 'spill_probability' determines
+    the likelihood of choosing a mem limit that will cause spilling. To induce
+    spilling, a value is randomly chosen below the min memory needed to avoid spilling
+    but above the min memory needed with spilling. So the min/max query memory
+    requirements must be determined before calling this method.
+
+    If 'mem_overcommit_pct' is zero, an exception will be raised if any queries
+    fail for any reason other than cancellation (controlled by the 'cancel_probability'
+    property), since each query should have enough memory to run successfully. If
+    non-zero, failures due to insufficient memory will be ignored if memory was
+    overcommitted at any time during execution.
+
+    If a query completes without error, the result will be verified if 'verify_results'
+    is True. An error will be raised upon a result mismatch. 'verify_results' should be
+    false for the case where the expected results are not known in advance, if we are
+    running DML queries, for example.
     """
     # TODO: The state from a previous run should be cleared out. This isn't really a
     #       problem now because the one caller (main()) never calls a second time.
@@ -346,9 +361,13 @@ class StressRunner(object):
     # If there is a crash, start looking for errors starting from this time.
     start_time = datetime.now()
 
-    self._mem_broker = MemBroker(impala.min_impalad_mem_mb,
+    self._mem_broker = MemBroker(
+        impala.min_impalad_mem_mb,
         int(impala.min_impalad_mem_mb * mem_overcommit_pct / 100))
 
+    self._verify_results = verify_results
+    self._select_probability = select_probability
+
     # Print the status to show the state before starting.
     if should_print_status:
       self._print_status_header()
@@ -363,9 +382,11 @@ class StressRunner(object):
 
     # Wait for everything to finish.
     sleep_secs = 0.1
-    while self._query_producer_thread.is_alive() \
-        or self._query_consumer_thread.is_alive() \
-        or self._query_runners:
+    while (
+        self._query_producer_thread.is_alive() or
+        self._query_consumer_thread.is_alive() or
+        self._query_runners
+    ):
       if self._query_producer_thread.error or self._query_consumer_thread.error:
         # This is bad enough to abort early. A failure here probably means there's a
         # bug in this script. The mem poller could be checked for an error too. It is
@@ -382,9 +403,12 @@ class StressRunner(object):
                 sys.exit(runner.exitcode)
               LOG.info("No crashes detected")
               checked_for_crashes = True
-            if self._num_successive_errors.value \
-                >= self.num_successive_errors_needed_to_abort:
-              print("Aborting due to %s successive errors encountered"
+            if (
+                self._num_successive_errors.value >=
+                self.num_successive_errors_needed_to_abort
+            ):
+              print(
+                  "Aborting due to %s successive errors encountered"
                   % self._num_successive_errors.value, file=sys.stderr)
               sys.exit(1)
           del self._query_runners[idx]
@@ -392,9 +416,11 @@ class StressRunner(object):
       if should_print_status:
         last_report_secs += sleep_secs
         if last_report_secs > 5:
-          if not self._query_producer_thread.is_alive() \
-              or not self._query_consumer_thread.is_alive() \
-              or not self._query_runners:
+          if (
+              not self._query_producer_thread.is_alive() or
+              not self._query_consumer_thread.is_alive() or
+              not self._query_runners
+          ):
             LOG.debug("Producer is alive: %s" % self._query_producer_thread.is_alive())
             LOG.debug("Consumer is alive: %s" % self._query_consumer_thread.is_alive())
             LOG.debug("Queue size: %s" % self._query_queue.qsize())
@@ -412,15 +438,32 @@ class StressRunner(object):
 
   def _start_producing_queries(self, queries):
     def enqueue_queries():
+      # Generate a dict(query type -> list of queries).
+      queries_by_type = {}
+      for query in queries:
+        if query.query_type not in queries_by_type:
+          queries_by_type[query.query_type] = []
+        queries_by_type[query.query_type].append(query)
       try:
         for _ in xrange(self._num_queries_to_run):
-          self._query_queue.put(choice(queries))
+          # First randomly determine a query type, then choose a random query of that
+          # type.
+          if (
+              QueryType.SELECT in queries_by_type and
+              (len(queries_by_type.keys()) == 1 or random() < self._select_probability)
+          ):
+            result = choice(queries_by_type[QueryType.SELECT])
+          else:
+            query_type = choice([
+                key for key in queries_by_type if key != QueryType.SELECT])
+            result = choice(queries_by_type[query_type])
+          self._query_queue.put(result)
       except Exception as e:
         LOG.error("Error producing queries: %s", e)
         current_thread().error = e
         raise e
-    self._query_producer_thread = create_and_start_daemon_thread(enqueue_queries,
-        "Query Producer")
+    self._query_producer_thread = create_and_start_daemon_thread(
+        enqueue_queries, "Query Producer")
 
   def _start_consuming_queries(self, impala):
     def start_additional_runners_if_needed():
@@ -467,9 +510,11 @@ class StressRunner(object):
             query_sumbission_is_locked = False
             ready_to_unlock = None
 
-          if not query_sumbission_is_locked \
-              and self.leak_check_interval_mins \
-              and time() > self._next_leak_check_unix_time.value:
+          if (
+              not query_sumbission_is_locked and
+              self.leak_check_interval_mins and
+              time() > self._next_leak_check_unix_time.value
+          ):
             assert self._num_queries_running <= len(self._query_runners), \
                 "Each running query should belong to a runner"
             LOG.debug("Stopping query submission")
@@ -505,8 +550,8 @@ class StressRunner(object):
         if query_sumbission_is_locked:
           LOG.debug("Resuming query submission")
           self._submit_query_lock.release()
-    self._mem_polling_thread = create_and_start_daemon_thread(poll_mem_usage,
-        "Mem Usage Poller")
+    self._mem_polling_thread = create_and_start_daemon_thread(
+        poll_mem_usage, "Mem Usage Poller")
 
   def _get_mem_usage_values(self, reset=False):
     reported = None
@@ -534,7 +579,7 @@ class StressRunner(object):
 
   def _start_single_runner(self, impalad):
     """Consumer function to take a query of the queue and run it. This is intended to
-       run in a separate process so validating the result set can use a full CPU.
+    run in a separate process so validating the result set can use a full CPU.
     """
     LOG.debug("New query runner started")
     runner = QueryRunner()
@@ -564,7 +609,8 @@ class StressRunner(object):
         mem_limit = query.required_mem_mb_without_spilling
         solo_runtime = query.solo_runtime_secs_without_spilling
       else:
-        mem_limit = randrange(query.required_mem_mb_with_spilling,
+        mem_limit = randrange(
+            query.required_mem_mb_with_spilling,
             query.required_mem_mb_without_spilling + 1)
         solo_runtime = query.solo_runtime_secs_with_spilling
 
@@ -583,8 +629,8 @@ class StressRunner(object):
         if should_cancel:
           timeout = randrange(1, max(int(solo_runtime), 2))
         else:
-          timeout = solo_runtime * max(10, self._num_queries_started.value
-              - self._num_queries_finished.value)
+          timeout = solo_runtime * max(
+              10, self._num_queries_started.value - self._num_queries_finished.value)
         report = runner.run_query(query, timeout, mem_limit)
         LOG.debug("Got execution report for query")
         if report.timed_out and should_cancel:
@@ -609,26 +655,34 @@ class StressRunner(object):
           # The server may fail to respond to clients if the load is high. An error
           # message with "connect()...Connection timed out" comes from the impalad so
           # that will not be ignored.
-          if ("Connection timed out" in error_msg and "connect()" not in error_msg) \
-              or "ECONNRESET" in error_msg \
-              or "couldn't get a client" in error_msg \
-              or "timeout: timed out" in error_msg:
+          if (
+              ("Connection timed out" in error_msg and "connect()" not in error_msg) or
+              "ECONNRESET" in error_msg or
+              "couldn't get a client" in error_msg or
+              "timeout: timed out" in error_msg
+          ):
             self._num_successive_errors.value = 0
             continue
           increment(self._num_successive_errors)
           increment(self._num_other_errors)
           raise Exception("Query failed: %s" % str(report.non_mem_limit_error))
-        if report.mem_limit_exceeded \
-            and not self._mem_broker.was_overcommitted(reservation_id):
+        if (
+            report.mem_limit_exceeded and
+            not self._mem_broker.was_overcommitted(reservation_id)
+        ):
           increment(self._num_successive_errors)
-          raise Exception("Unexpected mem limit exceeded; mem was not overcommitted\n"
+          raise Exception(
+              "Unexpected mem limit exceeded; mem was not overcommitted\n"
               "Profile: %s" % report.profile)
-        if not report.mem_limit_exceeded \
-            and not report.timed_out \
-            and report.result_hash != query.result_hash:
+        if (
+            not report.mem_limit_exceeded and
+            not report.timed_out and
+            (self._verify_results and report.result_hash != query.result_hash)
+        ):
           increment(self._num_successive_errors)
           increment(self._num_result_mismatches)
-          raise Exception("Result hash mismatch; expected %s, got %s\nQuery: %s"
+          raise Exception(
+              "Result hash mismatch; expected %s, got %s\nQuery: %s"
               % (query.result_hash, report.result_hash, query.sql))
         self._num_successive_errors.value = 0
 
@@ -665,18 +719,34 @@ class QueryTimeout(Exception):
   pass
 
 
+class QueryType(object):
+  COMPUTE_STATS, DELETE, INSERT, SELECT, UPDATE, UPSERT = range(6)
+
+
 class Query(object):
   """Contains a SQL statement along with expected runtime information."""
 
   def __init__(self):
     self.name = None
     self.sql = None
+    # In order to be able to make good estimates for DML queries in the binary search,
+    # we need to bring the table to a good initial state before excuting the sql. Running
+    # set_up_sql accomplishes this task.
+    self.set_up_sql = None
     self.db_name = None
     self.result_hash = None
     self.required_mem_mb_with_spilling = None
     self.required_mem_mb_without_spilling = None
     self.solo_runtime_secs_with_spilling = None
     self.solo_runtime_secs_without_spilling = None
+    # Query options to set before running the query.
+    self.options = {}
+    # Determines the order in which we will populate query runtime info. Queries with the
+    # lowest population_order property will be handled first.
+    self.population_order = 0
+    # Type of query. Can have the following values: SELECT, COMPUTE_STATS, INSERT, UPDATE,
+    # UPSERT, DELETE.
+    self.query_type = QueryType.SELECT
 
   def __repr__(self):
     return dedent("""
@@ -686,13 +756,17 @@ class Query(object):
         Solo Runtime: %(solo_runtime_secs_with_spilling)s
         Solo Runtime no-spilling: %(solo_runtime_secs_without_spilling)s
         DB: %(db_name)s
-        SQL: %(sql)s>""".strip() % self.__dict__)
+        Options: %(options)s
+        Set up SQL: %(set_up_sql)s>
+        SQL: %(sql)s>
+        Population order: %(population_order)r>
+        """.strip() % self.__dict__)
 
 
 class QueryRunner(object):
   """Encapsulates functionality to run a query and provide a runtime report."""
 
-  SPILLED_PATTERNS= [re.compile("ExecOption:.*Spilled"), re.compile("SpilledRuns: [^0]")]
+  SPILLED_PATTERNS = [re.compile("ExecOption:.*Spilled"), re.compile("SpilledRuns: [^0]")]
   BATCH_SIZE = 1024
 
   def __init__(self):
@@ -711,8 +785,11 @@ class QueryRunner(object):
       self.impalad_conn.close()
       self.impalad_conn = None
 
-  def run_query(self, query, timeout_secs, mem_limit_mb):
-    """Run a query and return an execution report."""
+  def run_query(self, query, timeout_secs, mem_limit_mb, run_set_up=False):
+    """Run a query and return an execution report. If 'run_set_up' is True, set up sql
+    will be executed before the main query. This should be the case during the binary
+    search phase of the stress test.
+    """
     if not self.impalad_conn:
       raise Exception("connect() must first be called")
 
@@ -721,23 +798,32 @@ class QueryRunner(object):
     try:
       with self.impalad_conn.cursor() as cursor:
         start_time = time()
+        if query.db_name:
+          LOG.debug("Using %s database", query.db_name)
+          cursor.execute("USE %s" % query.db_name)
+        if run_set_up and query.set_up_sql:
+          LOG.debug("Running set up query:\n%s", self.set_up_sql)
+          cursor.execute(query.set_up_sql)
         for query_option, value in self.common_query_options.iteritems():
           cursor.execute(
               "SET {query_option}={value}".format(query_option=query_option, value=value))
+        for query_option, value in query.options.iteritems():
+          cursor.execute(
+              "SET {query_option}={value}".format(query_option=query_option, value=value))
         cursor.execute("SET ABORT_ON_ERROR=1")
         LOG.debug("Setting mem limit to %s MB", mem_limit_mb)
         cursor.execute("SET MEM_LIMIT=%sM" % mem_limit_mb)
-        if query.db_name:
-          LOG.debug("Using %s database", query.db_name)
-          cursor.execute("USE %s" % query.db_name)
-        LOG.debug("Running query with %s MB mem limit at %s with timeout secs %s:\n%s",
+        LOG.debug(
+            "Running query with %s MB mem limit at %s with timeout secs %s:\n%s",
             mem_limit_mb, self.impalad.host_name, timeout_secs, query.sql)
         error = None
         try:
-          cursor.execute_async("/* Mem: %s MB. Coordinator: %s. */\n"
+          cursor.execute_async(
+              "/* Mem: %s MB. Coordinator: %s. */\n"
               % (mem_limit_mb, self.impalad.host_name) + query.sql)
-          LOG.debug("Query id is %s",
-              op_handle_to_query_id(cursor._last_operation_handle))
+          LOG.debug(
+              "Query id is %s", op_handle_to_query_id(cursor._last_operation.handle if
+                                                      cursor._last_operation else None))
           sleep_secs = 0.1
           secs_since_log = 0
           while cursor.is_executing():
@@ -749,24 +835,32 @@ class QueryRunner(object):
               LOG.debug("Waiting for query to execute")
             sleep(sleep_secs)
             secs_since_log += sleep_secs
-          try:
-            report.result_hash = self._hash_result(cursor, timeout_unix_time, query)
-          except QueryTimeout:
-            self._cancel(cursor, report)
-            return report
+          if query.query_type == QueryType.SELECT:
+            try:
+              report.result_hash = self._hash_result(cursor, timeout_unix_time, query)
+            except QueryTimeout:
+              self._cancel(cursor, report)
+              return report
+          else:
+            # If query is in error state, this will raise an exception
+            cursor._wait_to_finish()
         except Exception as error:
-          LOG.debug("Error running query with id %s: %s",
-              op_handle_to_query_id(cursor._last_operation_handle), error)
+          LOG.debug(
+              "Error running query with id %s: %s",
+              op_handle_to_query_id(cursor._last_operation.handle if
+                                    cursor._last_operation else None), error)
           self._check_for_mem_limit_exceeded(report, cursor, error)
         if report.non_mem_limit_error or report.mem_limit_exceeded:
           return report
         report.runtime_secs = time() - start_time
-        if self.check_if_mem_was_spilled:
+        if cursor.execution_failed() or self.check_if_mem_was_spilled:
           # Producing a query profile can be somewhat expensive. A v-tune profile of
           # impalad showed 10% of cpu time spent generating query profiles.
           report.profile = cursor.get_profile()
-          report.mem_was_spilled = any([pattern.search(report.profile) is not None
-              for pattern in  QueryRunner.SPILLED_PATTERNS])
+          report.mem_was_spilled = any([
+              pattern.search(report.profile) is not None
+              for pattern in QueryRunner.SPILLED_PATTERNS])
+          report.mem_limit_exceeded = "Memory limit exceeded" in report.profile
     except Exception as error:
       # A mem limit error would have been caught above, no need to check for that here.
       report.non_mem_limit_error = error
@@ -776,7 +870,7 @@ class QueryRunner(object):
     report.timed_out = True
 
     # Copy the operation handle in case another thread causes the handle to be reset.
-    operation_handle = cursor._last_operation_handle
+    operation_handle = cursor._last_operation.handle if cursor._last_operation else None
     if not operation_handle:
       return
 
@@ -795,14 +889,15 @@ class QueryRunner(object):
 
   def _check_for_mem_limit_exceeded(self, report, cursor, caught_exception):
     """To be called after a query failure to check for signs of failed due to a
-       mem limit. The report will be updated accordingly.
+    mem limit. The report will be updated accordingly.
     """
-    if cursor._last_operation_handle:
+    if cursor._last_operation:
       try:
         report.profile = cursor.get_profile()
       except Exception as e:
-        LOG.debug("Error getting profile for query with id %s: %s",
-            op_handle_to_query_id(cursor._last_operation_handle), e)
+        LOG.debug(
+            "Error getting profile for query with id %s: %s",
+            op_handle_to_query_id(cursor._last_operation.handle), e)
     caught_msg = str(caught_exception).lower().strip()
 
     # Exceeding a mem limit may result in the message "cancelled".
@@ -817,25 +912,32 @@ class QueryRunner(object):
     # exceeding the mem_limit could be something like:
     #   Metadata states that in group hdfs://<node>:8020<path> there are <X> rows,
     #   but only <Y> rows were read.
-    if "metadata states that in group" in caught_msg \
-        and "rows were read" in caught_msg:
+    if (
+        "metadata states that in group" in caught_msg and
+        "rows were read" in caught_msg
+    ):
       report.mem_limit_exceeded = True
       return
 
-    LOG.debug("Non-mem limit error for query with id %s: %s",
-        op_handle_to_query_id(cursor._last_operation_handle), caught_exception,
+    LOG.debug(
+        "Non-mem limit error for query with id %s: %s",
+        op_handle_to_query_id(
+            cursor._last_operation.handle if cursor._last_operation else None),
+        caught_exception,
         exc_info=True)
     report.non_mem_limit_error = caught_exception
 
   def _hash_result(self, cursor, timeout_unix_time, query):
     """Returns a hash that is independent of row order. 'query' is only used for debug
-       logging purposes (if the result is not as expected a log file will be left for
-       investigations).
+    logging purposes (if the result is not as expected a log file will be left for
+    investigations).
     """
-    query_id = op_handle_to_query_id(cursor._last_operation_handle)
+    query_id = op_handle_to_query_id(cursor._last_operation.handle if
+                                     cursor._last_operation else None)
 
     # A value of 1 indicates that the hash thread should continue to work.
     should_continue = Value("i", 1)
+
     def hash_result_impl():
       result_log = None
       try:
@@ -848,12 +950,16 @@ class QueryRunner(object):
         result_log.write("\n")
         current_thread().result = 1
         while should_continue.value:
-          LOG.debug("Fetching result for query with id %s",
-              op_handle_to_query_id(cursor._last_operation_handle))
+          LOG.debug(
+              "Fetching result for query with id %s",
+              op_handle_to_query_id(
+                  cursor._last_operation.handle if cursor._last_operation else None))
           rows = cursor.fetchmany(self.BATCH_SIZE)
           if not rows:
-            LOG.debug("No more results for query with id %s",
-                op_handle_to_query_id(cursor._last_operation_handle))
+            LOG.debug(
+                "No more results for query with id %s",
+                op_handle_to_query_id(
+                    cursor._last_operation.handle if cursor._last_operation else None))
             return
           for row in rows:
             for idx, val in enumerate(row):
@@ -882,12 +988,14 @@ class QueryRunner(object):
       finally:
         if result_log is not None:
           result_log.close()
-          if current_thread().error is not None \
-              and current_thread().result == query.result_hash:
+          if (
+              current_thread().error is not None and
+              current_thread().result == query.result_hash
+          ):
             os.remove(result_log.name)
 
-    hash_thread = create_and_start_daemon_thread(hash_result_impl,
-        "Fetch Results %s" % query_id)
+    hash_thread = create_and_start_daemon_thread(
+        hash_result_impl, "Fetch Results %s" % query_id)
     hash_thread.join(max(timeout_unix_time - time(), 0))
     if hash_thread.is_alive():
       should_continue.value = 0
@@ -900,11 +1008,12 @@ class QueryRunner(object):
 def load_tpc_queries(workload, load_in_kudu=False):
   """Returns a list of TPC queries. 'workload' should either be 'tpch' or 'tpcds'.
   If 'load_in_kudu' is True, it loads only queries specified for the Kudu storage
-  engine."""
+  engine.
+  """
   LOG.info("Loading %s queries", workload)
   queries = list()
-  query_dir = os.path.join(os.path.dirname(__file__), "..", "..",
-      "testdata", "workloads", workload, "queries")
+  query_dir = os.path.join(
+      os.path.dirname(__file__), "..", "..", "testdata", "workloads", workload, "queries")
   engine = 'kudu-' if load_in_kudu else ''
   file_name_pattern = re.compile(r"%s-%s(q\d+).test$" % (workload, engine))
   for query_file in os.listdir(query_dir):
@@ -914,7 +1023,8 @@ def load_tpc_queries(workload, load_in_kudu=False):
     file_path = os.path.join(query_dir, query_file)
     file_queries = load_queries_from_test_file(file_path)
     if len(file_queries) != 1:
-      raise Exception("Expected exactly 1 query to be in file %s but got %s"
+      raise Exception(
+          "Expected exactly 1 query to be in file %s but got %s"
           % (file_path, len(file_queries)))
     query = file_queries[0]
     query.name = match.group(1)
@@ -934,13 +1044,15 @@ def load_queries_from_test_file(file_path, db_name=None):
   return queries
 
 
-def load_random_queries_and_populate_runtime_info(query_generator, model_translator,
-    tables, db_name, impala, use_kerberos, query_count, query_timeout_secs,
-    result_hash_log_dir):
+def load_random_queries_and_populate_runtime_info(
+    query_generator, model_translator, tables, db_name, impala, use_kerberos, query_count,
+    query_timeout_secs, result_hash_log_dir
+):
   """Returns a list of random queries. Each query will also have its runtime info
-     populated. The runtime info population also serves to validate the query.
+  populated. The runtime info population also serves to validate the query.
   """
   LOG.info("Generating random queries")
+
   def generate_candidates():
     while True:
       query_model = query_generator.create_query(tables)
@@ -949,20 +1061,26 @@ def load_random_queries_and_populate_runtime_info(query_generator, model_transla
       query.sql = sql
       query.db_name = db_name
       yield query
-  return populate_runtime_info_for_random_queries(impala, use_kerberos,
-      generate_candidates(), query_count, query_timeout_secs, result_hash_log_dir)
+  return populate_runtime_info_for_random_queries(
+      impala, use_kerberos, generate_candidates(), query_count, query_timeout_secs,
+      result_hash_log_dir)
 
 
-def populate_runtime_info_for_random_queries(impala, use_kerberos, candidate_queries,
-    query_count, query_timeout_secs, result_hash_log_dir):
+def populate_runtime_info_for_random_queries(
+    impala, use_kerberos, candidate_queries,
+    query_count, query_timeout_secs, result_hash_log_dir
+):
   """Returns a list of random queries. Each query will also have its runtime info
-     populated. The runtime info population also serves to validate the query.
+  populated. The runtime info population also serves to validate the query.
   """
   start_time = datetime.now()
   queries = list()
+  # TODO(IMPALA-4632): Consider running reset_databases() here if we want to extend DML
+  #                    functionality to random stress queries as well.
   for query in candidate_queries:
     try:
-      populate_runtime_info(query, impala, use_kerberos, result_hash_log_dir,
+      populate_runtime_info(
+          query, impala, use_kerberos, result_hash_log_dir,
           timeout_secs=query_timeout_secs)
       queries.append(query)
     except Exception as e:
@@ -970,27 +1088,30 @@ def populate_runtime_info_for_random_queries(impala, use_kerberos, candidate_que
       # query generator bugs).
       if print_crash_info_if_exists(impala, start_time):
         raise e
-      LOG.warn("Error running query (the test will continue)\n%s\n%s", e, query.sql,
-          exc_info=True)
+      LOG.warn(
+          "Error running query (the test will continue)\n%s\n%s",
+          e, query.sql, exc_info=True)
     if len(queries) == query_count:
       break
   return queries
 
 
-def populate_runtime_info(query, impala, use_kerberos, result_hash_log_dir,
-    timeout_secs=maxint, samples=1, max_conflicting_samples=0):
+def populate_runtime_info(
+    query, impala, use_kerberos, result_hash_log_dir,
+    timeout_secs=maxint, samples=1, max_conflicting_samples=0
+):
   """Runs the given query by itself repeatedly until the minimum memory is determined
-     with and without spilling. Potentially all fields in the Query class (except
-     'sql') will be populated by this method. 'required_mem_mb_without_spilling' and
-     the corresponding runtime field may still be None if the query could not be run
-     without spilling.
-
-     'samples' and 'max_conflicting_samples' control the reliability of the collected
-     information. The problem is that memory spilling or usage may differ (by a large
-     amount) from run to run due to races during execution. The parameters provide a way
-     to express "X out of Y runs must have resulted in the same outcome". Increasing the
-     number of samples and decreasing the tolerance (max conflicts) increases confidence
-     but also increases the time to collect the data.
+  with and without spilling. Potentially all fields in the Query class (except
+  'sql') will be populated by this method. 'required_mem_mb_without_spilling' and
+  the corresponding runtime field may still be None if the query could not be run
+  without spilling.
+
+  'samples' and 'max_conflicting_samples' control the reliability of the collected
+  information. The problem is that memory spilling or usage may differ (by a large
+  amount) from run to run due to races during execution. The parameters provide a way
+  to express "X out of Y runs must have resulted in the same outcome". Increasing the
+  number of samples and decreasing the tolerance (max conflicts) increases confidence
+  but also increases the time to collect the data.
   """
   LOG.info("Collecting runtime info for query %s: \n%s", query.name, query.sql)
   runner = QueryRunner()
@@ -1015,12 +1136,16 @@ def populate_runtime_info(query, impala, use_kerberos, result_hash_log_dir,
   def update_runtime_info():
     required_mem = min(mem_limit, impala.min_impalad_mem_mb)
     if report.mem_was_spilled:
-      if query.required_mem_mb_with_spilling is None \
-          or required_mem < query.required_mem_mb_with_spilling:
+      if (
+          query.required_mem_mb_with_spilling is None or
+          required_mem < query.required_mem_mb_with_spilling
+      ):
         query.required_mem_mb_with_spilling = required_mem
         query.solo_runtime_secs_with_spilling = report.runtime_secs
-    elif query.required_mem_mb_without_spilling is None \
-        or required_mem < query.required_mem_mb_without_spilling:
+    elif (
+        query.required_mem_mb_without_spilling is None or
+        required_mem < query.required_mem_mb_without_spilling
+    ):
       query.required_mem_mb_without_spilling = required_mem
       query.solo_runtime_secs_without_spilling = report.runtime_secs
 
@@ -1028,7 +1153,7 @@ def populate_runtime_info(query, impala, use_kerberos, result_hash_log_dir,
     reports_by_outcome = defaultdict(list)
     leading_outcome = None
     for remaining_samples in xrange(samples - 1, -1, -1):
-      report = runner.run_query(query, timeout_secs, mem_limit)
+      report = runner.run_query(query, timeout_secs, mem_limit, run_set_up=True)
       if report.timed_out:
         raise QueryTimeout()
       if report.non_mem_limit_error:
@@ -1038,8 +1163,9 @@ def populate_runtime_info(query, impala, use_kerberos, result_hash_log_dir,
         if query.result_hash is None:
           query.result_hash = report.result_hash
         elif query.result_hash != report.result_hash:
-          raise Exception("Result hash mismatch; expected %s, got %s"
-              % (query.result_hash, report.result_hash))
+          raise Exception(
+              "Result hash mismatch; expected %s, got %s" %
+              (query.result_hash, report.result_hash))
 
       if report.mem_limit_exceeded:
         outcome = "EXCEEDED"
@@ -1055,8 +1181,10 @@ def populate_runtime_info(query, impala, use_kerberos, result_hash_log_dir,
         leading_outcome = outcome
       if len(reports_by_outcome[leading_outcome]) + max_conflicting_samples == samples:
         break
-      if len(reports_by_outcome[leading_outcome]) + remaining_samples \
-          < samples - max_conflicting_samples:
+      if (
+          len(reports_by_outcome[leading_outcome]) + remaining_samples <
+          samples - max_conflicting_samples
+      ):
         return
       if desired_outcome \
           and len(reports_by_outcome[desired_outcome]) + remaining_samples \
@@ -1076,8 +1204,8 @@ def populate_runtime_info(query, impala, use_kerberos, result_hash_log_dir,
         if report and report.mem_limit_exceeded:
           limit_exceeded_mem = mem_limit
         if mem_limit == impala.min_impalad_mem_mb:
-          LOG.warn("Query could not be run even when using all available memory\n%s",
-              query.sql)
+          LOG.warn(
+              "Query couldn't be run even when using all available memory\n%s", query.sql)
           return
         mem_limit = min(2 * mem_limit, impala.min_impalad_mem_mb)
         continue
@@ -1097,8 +1225,8 @@ def populate_runtime_info(query, impala, use_kerberos, result_hash_log_dir,
       old_required_mem_mb_without_spilling = None
     else:
       mem_limit = (lower_bound + upper_bound) / 2
-    should_break = mem_limit / float(upper_bound) > MEM_LIMIT_EQ_THRESHOLD_PC \
-            or upper_bound - mem_limit < MEM_LIMIT_EQ_THRESHOLD_MB
+    should_break = mem_limit / float(upper_bound) > MEM_LIMIT_EQ_THRESHOLD_PC or \
+        upper_bound - mem_limit < MEM_LIMIT_EQ_THRESHOLD_MB
     report = get_report(desired_outcome=("NOT_SPILLED" if spill_mem else None))
     if not report:
       lower_bound = mem_limit
@@ -1120,13 +1248,13 @@ def populate_runtime_info(query, impala, use_kerberos, result_hash_log_dir,
         break
       lower_bound = upper_bound = impala.min_impalad_mem_mb
   # This value may be updated during the search for the absolute minimum.
-  LOG.info("Minimum memory to avoid spilling is %s MB"
-      % query.required_mem_mb_without_spilling)
+  LOG.info(
+      "Minimum memory to avoid spilling: %s MB" % query.required_mem_mb_without_spilling)
 
   LOG.info("Finding absolute minimum memory required")
   lower_bound = limit_exceeded_mem
-  upper_bound = min(spill_mem or maxint, non_spill_mem or maxint,
-      impala.min_impalad_mem_mb)
+  upper_bound = min(
+      spill_mem or maxint, non_spill_mem or maxint, impala.min_impalad_mem_mb)
   while True:
     if old_required_mem_mb_with_spilling:
       mem_limit = old_required_mem_mb_with_spilling
@@ -1147,13 +1275,16 @@ def populate_runtime_info(query, impala, use_kerberos, result_hash_log_dir,
         query.solo_runtime_secs_with_spilling = query.solo_runtime_secs_without_spilling
       break
   LOG.info("Minimum memory is %s MB" % query.required_mem_mb_with_spilling)
-  if query.required_mem_mb_without_spilling is not None \
-      and query.required_mem_mb_without_spilling is not None \
-      and query.required_mem_mb_without_spilling < query.required_mem_mb_with_spilling:
+  if (
+      query.required_mem_mb_without_spilling is not None and
+      query.required_mem_mb_without_spilling is not None and
+      query.required_mem_mb_without_spilling < query.required_mem_mb_with_spilling
+  ):
     # Query execution is not deterministic and sometimes a query will run without spilling
     # at a lower mem limit than it did with spilling. In that case, just use the lower
     # value.
-    LOG.info("A lower memory limit to avoid spilling was found while searching for"
+    LOG.info(
+        "A lower memory limit to avoid spilling was found while searching for"
         " the absolute minimum memory.")
     query.required_mem_mb_with_spilling = query.required_mem_mb_without_spilling
     query.solo_runtime_secs_with_spilling = query.solo_runtime_secs_without_spilling
@@ -1162,12 +1293,15 @@ def populate_runtime_info(query, impala, use_kerberos, result_hash_log_dir,
 
 def estimate_query_mem_mb_usage(query, query_runner):
   """Runs an explain plan then extracts and returns the estimated memory needed to run
-     the query.
+  the query.
   """
   with query_runner.impalad_conn.cursor() as cursor:
     LOG.debug("Using %s database", query.db_name)
     if query.db_name:
       cursor.execute('USE ' + query.db_name)
+    if query.query_type == QueryType.COMPUTE_STATS:
+      # Running "explain" on compute stats is not supported by Impala.
+      return
     LOG.debug("Explaining query\n%s", query.sql)
     cursor.execute('EXPLAIN ' + query.sql)
     first_val = cursor.fetchone()[0]
@@ -1186,13 +1320,16 @@ def save_runtime_info(path, query, impala):
       store = json.load(file)
     _check_store_version(store)
   if not store:
-    store = {"host_names": list(), "db_names": dict(),
-        "version": RUNTIME_INFO_FILE_VERSION}
+    store = {
+        "host_names": list(), "db_names": dict(), "version": RUNTIME_INFO_FILE_VERSION}
   with open(path, "w+") as file:
     store["host_names"] = sorted([i.host_name for i in impala.impalads])
     queries = store["db_names"].get(query.db_name, dict())
-    queries[query.sql] = query
+    query_by_options = queries.get(query.sql, dict())
+    query_by_options[str(sorted(query.options.items()))] = query
+    queries[query.sql] = query_by_options
     store["db_names"][query.db_name] = queries
+
     class JsonEncoder(json.JSONEncoder):
       def default(self, obj):
         data = dict(obj.__dict__)
@@ -1200,61 +1337,66 @@ def save_runtime_info(path, query, impala):
         if "sql" in data:
           del data["sql"]
         return data
-    json.dump(store, file, cls=JsonEncoder, sort_keys=True, indent=2,
-        separators=(',', ': '))
+    json.dump(
+        store, file, cls=JsonEncoder, sort_keys=True, indent=2, separators=(',', ': '))
 
 
 def load_runtime_info(path, impala=None):
   """Reads the query runtime information at 'path' and returns a
-     dict<db_name, dict<sql, Query>>. Returns an empty dict if the hosts in the 'impala'
-     instance do not match the data in 'path'.
+  dict<db_name, dict<sql, Query>>. Returns an empty dict if the hosts in the 'impala'
+  instance do not match the data in 'path'.
   """
-  queries_by_db_and_sql = defaultdict(dict)
+  queries_by_db_and_sql = defaultdict(lambda: defaultdict(dict))
   if not os.path.exists(path):
     return queries_by_db_and_sql
   with open(path) as file:
     store = json.load(file)
     _check_store_version(store)
-    if impala and \
-        store.get("host_names") != sorted([i.host_name for i in impala.impalads]):
+    if (
+        impala and
+        store.get("host_names") != sorted([i.host_name for i in impala.impalads])
+    ):
       return queries_by_db_and_sql
     for db_name, queries_by_sql in store["db_names"].iteritems():
-      for sql, json_query in queries_by_sql.iteritems():
-        query = Query()
-        query.__dict__.update(json_query)
-        query.sql = sql
-        queries_by_db_and_sql[db_name][sql] = query
+      for sql, queries_by_options in queries_by_sql.iteritems():
+        for options, json_query in queries_by_options.iteritems():
+          query = Query()
+          query.__dict__.update(json_query)
+          query.sql = sql
+          queries_by_db_and_sql[db_name][sql][options] = query
   return queries_by_db_and_sql
 
 
 def _check_store_version(store):
   """Clears 'store' if the version is too old or raises an error if the version is too
-     new.
+  new.
   """
   if store["version"] < RUNTIME_INFO_FILE_VERSION:
     LOG.warn("Runtime file info version is old and will be ignored")
     store.clear()
   elif store["version"] > RUNTIME_INFO_FILE_VERSION:
-    raise Exception("Unexpected runtime file info version %s expected %s"
+    raise Exception(
+        "Unexpected runtime file info version %s expected %s"
         % (store["version"], RUNTIME_INFO_FILE_VERSION))
 
 
 def print_runtime_info_comparison(old_runtime_info, new_runtime_info):
   # TODO: Provide a way to call this from the CLI. This was hard coded to run from main()
   #       when it was used.
-  print(",".join(["Database", "Query",
-    "Old Mem MB w/Spilling",
-    "New Mem MB w/Spilling",
-    "Diff %",
-    "Old Runtime w/Spilling",
-    "New Runtime w/Spilling",
-    "Diff %",
-    "Old Mem MB wout/Spilling",
-    "New Mem MB wout/Spilling",
-    "Diff %",
-    "Old Runtime wout/Spilling",
-    "New Runtime wout/Spilling",
-    "Diff %"]))
+  print(",".join([
+      "Database", "Query",
+      "Old Mem MB w/Spilling",
+      "New Mem MB w/Spilling",
+      "Diff %",
+      "Old Runtime w/Spilling",
+      "New Runtime w/Spilling",
+      "Diff %",
+      "Old Mem MB wout/Spilling",
+      "New Mem MB wout/Spilling",
+      "Diff %",
+      "Old Runtime wout/Spilling",
+      "New Runtime wout/Spilling",
+      "Diff %"]))
   for db_name, old_queries in old_runtime_info.iteritems():
     new_queries = new_runtime_info.get(db_name)
     if not new_queries:
@@ -1267,8 +1409,10 @@ def print_runtime_info_comparison(old_runtime_info, new_runtime_info):
       sys.stdout.write(",")
       sys.stdout.write(old_query["name"])
       sys.stdout.write(",")
-      for attr in ["required_mem_mb_with_spilling", "solo_runtime_secs_with_spilling",
-          "required_mem_mb_without_spilling", "solo_runtime_secs_without_spilling"]:
+      for attr in [
+          "required_mem_mb_with_spilling", "solo_runtime_secs_with_spilling",
+          "required_mem_mb_without_spilling", "solo_runtime_secs_without_spilling"
+      ]:
         old_value = old_query[attr]
         sys.stdout.write(str(old_value))
         sys.stdout.write(",")
@@ -1283,90 +1427,345 @@ def print_runtime_info_comparison(old_runtime_info, new_runtime_info):
       print()
 
 
+def generate_DML_queries(cursor, dml_mod_values):
+  """Generate insert, upsert, update, delete DML statements.
+
+  For each table in the database that cursor is connected to, create 4 DML queries
+  (insert, upsert, update, delete) for each mod value in 'dml_mod_values'. This value
+  controls which rows will be affected. The generated queries assume that for each table
+  in the database, there exists a table with a '_original' suffix that is never modified.
+
+  This function has some limitations:
+  1. Only generates DML statements against Kudu tables, and ignores non-Kudu tables.
+  2. Requires that the type of the first column of the primary key is an integer type.
+  """
+  LOG.info("Generating DML queries")
+  tables = [cursor.describe_table(t) for t in cursor.list_table_names()
+            if not t.endswith("_original")]
+  result = []
+  for table in tables:
+    if not table.primary_keys:
+      # Skip non-Kudu tables. If a table has no primary keys, then it cannot be a Kudu
+      # table.
+      LOG.debug("Skipping table '{0}' because it has no primary keys.".format(table.name))
+      continue
+    if len(table.primary_keys) > 1:
+      # TODO(IMPALA-4665): Add support for tables with multiple primary keys.
+      LOG.debug("Skipping table '{0}' because it has more than "
+                "1 primary key column.".format(table.name))
+      continue
+    primary_key = table.primary_keys[0]
+    if primary_key.exact_type not in (Int, TinyInt, SmallInt, BigInt):
+      # We want to be able to apply the modulo operation on the primary key. If the
+      # the first primary key column happens to not be an integer, we will skip
+      # generating queries for this table
+      LOG.debug("Skipping table '{0}' because the first column '{1}' in the "
+                "primary key is not an integer.".format(table.name, primary_key.name))
+      continue
+    for mod_value in dml_mod_values:
+      # Insert
+      insert_query = Query()
+      # Populate runtime info for Insert and Upsert queries before Update and Delete
+      # queries because tables remain in original state after running the Insert and
+      # Upsert queries. During the binary search in runtime info population for the
+      # Insert query, we first delete some rows and then reinsert them, so the table
+      # remains in the original state. For the delete, the order is reversed, so the table
+      # is not in the original state after running the the delete (or update) query. This
+      # is why population_order is smaller for Insert and Upsert queries.
+      insert_query.population_order = 1
+      insert_query.query_type = QueryType.INSERT
+      insert_query.name = "insert_{0}".format(table.name)
+      insert_query.db_name = cursor.db_name
+      insert_query.sql = (
+          "INSERT INTO TABLE {0} SELECT * FROM {0}_original "
+          "WHERE {1} % {2} = 0").format(table.name, primary_key.name, mod_value)
+      # Upsert
+      upsert_query = Query()
+      upsert_query.population_order = 1
+      upsert_query.query_type = QueryType.UPSERT
+      upsert_query.name = "upsert_{0}".format(table.name)
+      upsert_query.db_name = cursor.db_name
+      upsert_query.sql = (
+          "UPSERT INTO TABLE {0} SELECT * "
+          "FROM {0}_original WHERE {1} % {2} = 0").format(
+              table.name, primary_key.name, mod_value)
+      # Update
+      update_query = Query()
+      update_query.population_order = 2
+      update_query.query_type = QueryType.UPDATE
+      update_query.name = "update_{0}".format(table.name)
+      update_query.db_name = cursor.db_name
+      update_list = ', '.join(
+          'a.{0} = b.{0}'.format(col.name)
+          for col in table.cols if not col.is_primary_key)
+      update_query.sql = (
+          "UPDATE a SET {update_list} FROM {table_name} a JOIN {table_name}_original b "
+          "ON a.{pk} = b.{pk} + 1 WHERE a.{pk} % {mod_value} = 0").format(
+              table_name=table.name, pk=primary_key.name, mod_value=mod_value,
+              update_list=update_list)
+      # Delete
+      delete_query = Query()
+      delete_query.population_order = 2
+      delete_query.query_type = QueryType.DELETE
+      delete_query.name = "delete_{0}".format(table.name)
+      delete_query.db_name = cursor.db_name
+      delete_query.sql = ("DELETE FROM {0} WHERE {1} % {2} = 0").format(
+          table.name, primary_key.name, mod_value)
+
+      if table.name + "_original" in set(table.name for table in tables):
+        insert_query.set_up_sql = "DELETE FROM {0} WHERE {1} % {2} = 0".format(
+            table.name, primary_key.name, mod_value)
+        upsert_query.set_up_sql = insert_query.set_up_sql
+        update_query.set_up_sql = (
+            "UPSERT INTO TABLE {0} SELECT * FROM {0}_original "
+            "WHERE {1} % {2} = 0").format(table.name, primary_key.name, mod_value)
+        delete_query.set_up_sql = update_query.set_up_sql
+
+      result.append(insert_query)
+      LOG.debug("Added insert query: {0}".format(insert_query))
+      result.append(update_query)
+      LOG.debug("Added update query: {0}".format(update_query))
+      result.append(upsert_query)
+      LOG.debug("Added upsert query: {0}".format(upsert_query))
+      result.append(delete_query)
+      LOG.debug("Added delete query: {0}".format(delete_query))
+  assert len(result) > 0, "No DML queries were added."
+  return result
+
+
+def generate_compute_stats_queries(cursor):
+  """For each table in the database that cursor is connected to, generate several compute
+  stats queries. Each query will have a different value for the MT_DOP query option.
+  """
+  LOG.info("Generating Compute Stats queries")
+  tables = [cursor.describe_table(t) for t in cursor.list_table_names()
+            if not t.endswith("_original")]
+  result = []
+  mt_dop_values = [str(2**k) for k in range(5)]
+  for table in tables:
+    for mt_dop_value in mt_dop_values:
+      compute_query = Query()
+      compute_query.population_order = 1
+      compute_query.query_type = QueryType.COMPUTE_STATS
+      compute_query.sql = "COMPUTE STATS {0}".format(table.name)
+      compute_query.options["MT_DOP"] = mt_dop_value
+      compute_query.db_name = cursor.db_name
+      compute_query.name = "compute_stats_{0}_mt_dop_{1}".format(
+          table.name, compute_query.options["MT_DOP"])
+      result.append(compute_query)
+      LOG.debug("Added compute stats query: {0}".format(compute_query))
+  return result
+
+
+def prepare_database(cursor):
+  """For each table in the database that cursor is connected to, create an identical copy
+  with '_original' suffix. This function is idempotent.
+
+  Note: At this time we only support Kudu tables with a simple hash partitioning based on
+  the primary key. (SHOW CREATE TABLE would not work otherwise.)
+  """
+  tables = {t: cursor.describe_table(t) for t in cursor.list_table_names()}
+  for table_name in tables:
+    if not table_name.endswith("_original") and table_name + "_original" not in tables:
+      LOG.debug("Creating original table: {0}".format(table_name))
+      cursor.execute("SHOW CREATE TABLE " + table_name)
+      create_sql = cursor.fetchone()[0]
+      search_pattern = r"CREATE TABLE (\w*)\.(.*) \("
+      replacement = "CREATE TABLE {tbl} (".format(tbl=table_name + "_original")
+      create_original_sql = re.sub(
+          search_pattern, replacement, create_sql, count=1)
+      LOG.debug("Create original SQL:\n{0}".format(create_original_sql))
+      cursor.execute(create_original_sql)
+      cursor.execute("INSERT INTO {0}_original SELECT * FROM {0}".format(table_name))
+      cursor.execute("COMPUTE STATS {0}".format(table_name + "_original"))
+
+
+def reset_databases(cursor):
+  """Reset the database to the initial state. This is done by overwriting tables which
+  don't have the _original suffix with data from tables with the _original suffix.
+
+  Note: At this time we only support Kudu tables with a simple hash partitioning based on
+  the primary key. (SHOW CREATE TABLE would not work otherwise.)
+  """
+  LOG.info("Resetting {0} database".format(cursor.db_name))
+  tables = {t: cursor.describe_table(t) for t in cursor.list_table_names()}
+  for table_name in tables:
+    if not table_name.endswith("_original"):
+      if table_name + "_original" in tables:
+        cursor.execute("SHOW CREATE TABLE " + table_name)
+        create_table_command = cursor.fetchone()[0]
+        cursor.execute("DROP TABLE {0}".format(table_name))
+        cursor.execute(create_table_command)
+        cursor.execute("INSERT INTO {0} SELECT * FROM {0}_original".format(table_name))
+        cursor.execute("COMPUTE STATS {0}".format(table_name))
+      else:
+        LOG.debug("Table '{0}' cannot be reset because '{0}_original' does not"
+                  " exist in '{1}' database.".format(table_name, cursor.db_name))
+
+
+def populate_all_queries(queries, impala, args, runtime_info_path,
+                         queries_with_runtime_info_by_db_sql_and_options):
+  """Populate runtime info for all queries, ordered by the population_order property."""
+  result = []
+  queries_by_order = {}
+  for query in queries:
+    if query.population_order not in queries_by_order:
+      queries_by_order[query.population_order] = []
+    queries_by_order[query.population_order].append(query)
+  for population_order in sorted(queries_by_order.keys()):
+    for query in queries_by_order[population_order]:
+      if (
+          query.sql in
+          queries_with_runtime_info_by_db_sql_and_options[query.db_name] and
+          str(sorted(query.options.items())) in
+          queries_with_runtime_info_by_db_sql_and_options[query.db_name][query.sql]
+      ):
+        LOG.debug("Reusing previous runtime data for query: " + query.sql)
+        result.append(queries_with_runtime_info_by_db_sql_and_options[
+            query.db_name][query.sql][str(sorted(query.options.items()))])
+      else:
+        populate_runtime_info(
+            query, impala, args.use_kerberos, args.result_hash_log_dir,
+            samples=args.samples, max_conflicting_samples=args.max_conflicting_samples)
+        save_runtime_info(runtime_info_path, query, impala)
+        result.append(query)
+  return result
+
+
 def main():
   from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser
   from random import shuffle
   import tests.comparison.cli_options as cli_options
 
-  parser = ArgumentParser(epilog=dedent("""
-         Before running this script a CM cluster must be setup and any needed data
-         such as TPC-H/DS must be loaded. The first time this script is run it will
-         find memory limits and runtimes for each query and save the data to disk (since
-         collecting the data is slow) at --runtime-info-path then run the stress test.
-         Later runs will reuse the saved memory limits and timings. If the cluster changes
-         significantly the memory limits should be re-measured (deleting the file at
-         --runtime-info-path will cause re-measuring to happen).""").strip(),
-         formatter_class=ArgumentDefaultsHelpFormatter)
+  parser = ArgumentParser(
+      epilog=dedent("""
+      Before running this script a CM cluster must be setup and any needed data
+      such as TPC-H/DS must be loaded. The first time this script is run it will
+      find memory limits and runtimes for each query and save the data to disk (since
+      collecting the data is slow) at --runtime-info-path then run the stress test.
+      Later runs will reuse the saved memory limits and timings. If the cluster changes
+      significantly the memory limits should be re-measured (deleting the file at
+      --runtime-info-path will cause re-measuring to happen).""").strip(),
+      formatter_class=ArgumentDefaultsHelpFormatter)
   cli_options.add_logging_options(parser)
   cli_options.add_cluster_options(parser)
   cli_options.add_kerberos_options(parser)
-  parser.add_argument("--runtime-info-path",
+  parser.add_argument(
+      "--runtime-info-path",
       default=os.path.join(gettempdir(), "{cm_host}_query_runtime_info.json"),
       help="The path to store query runtime info at. '{cm_host}' will be replaced with"
       " the actual host name from --cm-host.")
-  parser.add_argument("--samples", default=1, type=int,
+  parser.add_argument(
+      "--samples", default=1, type=int,
       help='Used when collecting "runtime info" - the number of samples to collect when'
       ' testing a particular mem limit value.')
-  parser.add_argument("--max-conflicting-samples", default=0, type=int,
+  parser.add_argument(
+      "--max-conflicting-samples", default=0, type=int,
       help='Used when collecting "runtime info" - the number of samples outcomes that'
       ' can disagree when deciding to accept a particular mem limit. Ex, when trying to'
       ' determine the mem limit that avoids spilling with samples=5 and'
       ' max-conflicting-samples=1, then 4/5 queries must not spill at a particular mem'
       ' limit.')
-  parser.add_argument("--result-hash-log-dir", default=gettempdir(),
+  parser.add_argument(
+      "--result-hash-log-dir", default=gettempdir(),
       help="If query results do not match, a log file will be left in this dir. The log"
       " file is also created during the first run when runtime info is collected for"
       " each query.")
-  parser.add_argument("--no-status", action="store_true",
-      help="Do not print the status table.")
-  parser.add_argument("--cancel-current-queries", action="store_true",
+  parser.add_argument(
+      "--no-status", action="store_true", help="Do not print the status table.")
+  parser.add_argument(
+      "--cancel-current-queries", action="store_true",
       help="Cancel any queries running on the cluster before beginning.")
-  parser.add_argument("--filter-query-mem-ratio", type=float, default=0.333,
+  parser.add_argument(
+      "--filter-query-mem-ratio", type=float, default=0.333,
       help="Queries that require this ratio of total available memory will be filtered.")
-  parser.add_argument("--startup-queries-per-second", type=float, default=2.0,
+  parser.add_argument(
+      "--startup-queries-per-second", type=float, default=2.0,
       help="Adjust this depending on the cluster size and workload. This determines"
       " the minimum amount of time between successive query submissions when"
       " the workload is initially ramping up.")
-  parser.add_argument("--fail-upon-successive-errors", type=int, default=1,
+  parser.add_argument(
+      "--fail-upon-successive-errors", type=int, default=1,
       help="Continue running until N query errors are encountered in a row. Set"
       " this to a high number to only stop when something catastrophic happens. A"
       " value of 1 stops upon the first error.")
-  parser.add_argument("--mem-limit-padding-pct", type=int, default=25,
+  parser.add_argument(
+      "--mem-limit-padding-pct", type=int, default=25,
       help="Pad query mem limits found by solo execution with this percentage when"
       " running concurrently. After padding queries will not be expected to fail"
       " due to mem limit exceeded.")
-  parser.add_argument("--timeout-multiplier", type=float, default=1.0,
+  parser.add_argument(
+      "--mem-limit-padding-abs", type=int, default=0,
+      help="Pad query mem limits found by solo execution with this value (in megabytes)"
+      " running concurrently. After padding queries will not be expected to fail"
+      " due to mem limit exceeded. This is useful if we want to be able to add the same"
+      " amount of memory to smaller queries as to the big ones.")
+  parser.add_argument(
+      "--timeout-multiplier", type=float, default=1.0,
       help="Query timeouts will be multiplied by this value.")
   parser.add_argument("--max-queries", type=int, default=100)
+  parser.add_argument(
+      "--reset-databases-before-binary-search", action="store_true",
+      help="If True, databases will be reset to their original state before the binary"
+      " search.")
+  parser.add_argument(
+      "--reset-databases-after-binary-search", action="store_true",
+      help="If True, databases will be reset to their original state after the binary"
+      " search and before starting the stress test. The primary intent of this option is"
+      " to undo the changes made to the databases by the binary search. This option can"
+      " also be used to reset the databases before running other (non stress) tests on"
+      " the same data.")
+  parser.add_argument(
+      "--generate-dml-queries", action="store_true",
+      help="If True, DML queries will be generated for Kudu databases.")
+  parser.add_argument(
+      "--dml-mod-values", nargs="+", type=int, default=[11],
+      help="List of mod values to use for the DML queries. There will be 4 DML (delete,"
+      " insert, update, upsert) queries generated per mod value per table. The smaller"
+      " the value, the more rows the DML query would touch (the query should touch about"
+      " 1/mod_value rows.)")
+  parser.add_argument(
+      "--generate-compute-stats-queries", action="store_true",
+      help="If True, Compute Stats queries will be generated.")
+  parser.add_argument(
+      "--select-probability", type=float, default=0.5,
+      help="Probability of choosing a select query (as opposed to a DML query).")
   parser.add_argument("--tpcds-db", help="If provided, TPC-DS queries will be used.")
   parser.add_argument("--tpch-db", help="If provided, TPC-H queries will be used.")
-  parser.add_argument("--tpch-nested-db",
-      help="If provided, nested TPC-H queries will be used.")
-  parser.add_argument("--tpch-kudu-db",
-      help="If provided, TPC-H queries for Kudu will be used.")
-  parser.add_argument("--tpcds-kudu-db",
-      help="If provided, TPC-DS queries for Kudu will be used.")
-  parser.add_argument("--random-db",
-      help="If provided, random queries will be used.")
-  parser.add_argument("--random-query-count", type=int, default=50,
+  parser.add_argument(
+      "--tpch-nested-db", help="If provided, nested TPC-H queries will be used.")
+  parser.add_argument(
+      "--tpch-kudu-db", help="If provided, TPC-H queries for Kudu will be used.")
+  parser.add_argument(
+      "--tpcds-kudu-db", help="If provided, TPC-DS queries for Kudu will be used.")
+  parser.add_argument(
+      "--random-db", help="If provided, random queries will be used.")
+  parser.add_argument(
+      "--random-query-count", type=int, default=50,
       help="The number of random queries to generate.")
-  parser.add_argument("--random-query-timeout-seconds", type=int, default=(5 * 60),
+  parser.add_argument(
+      "--random-query-timeout-seconds", type=int, default=(5 * 60),
       help="A random query that runs longer than this time when running alone will"
       " be discarded.")
-  parser.add_argument("--query-file-path", help="Use queries in the given file. The file"
+  parser.add_argument(
+      "--query-file-path", help="Use queries in the given file. The file"
       " format must be the same as standard test case format. Queries are expected to "
       " be randomly generated and will be validated before running in stress mode.")
-  parser.add_argument("--query-file-db", help="The name of the database to use with the "
-      "queries from --query-file-path.")
+  parser.add_argument(
+      "--query-file-db",
+      help="The name of the database to use with the queries from --query-file-path.")
   parser.add_argument("--mem-overcommit-pct", type=float, default=0)
-  parser.add_argument("--mem-spill-probability", type=float, default=0.33,
-      dest="spill_probability",
+  parser.add_argument(
+      "--mem-spill-probability", type=float, default=0.33, dest="spill_probability",
       help="The probability that a mem limit will be set low enough to induce spilling.")
-  parser.add_argument("--mem-leak-check-interval-mins", type=int, default=None,
+  parser.add_argument(
+      "--mem-leak-check-interval-mins", type=int, default=None,
       help="Periodically stop query execution and check that memory levels have reset.")
-  parser.add_argument("--cancel-probability", type=float, default=0.1,
+  parser.add_argument(
+      "--cancel-probability", type=float, default=0.1,
       help="The probability a query will be cancelled.")
-  parser.add_argument("--nlj-filter", choices=("in", "out", None),
+  parser.add_argument(
+      "--nlj-filter", choices=("in", "out", None),
       help="'in' means only nested-loop queries will be used, 'out' means no NLJ queries"
       " will be used. The default is to not filter either way.")
   parser.add_argument(
@@ -1377,14 +1776,18 @@ def main():
       "DISABLE_CODEGEN=true RUNTIME_FILTER_MODE=1")
   args = parser.parse_args()
 
-  cli_options.configure_logging(args.log_level, debug_log_file=args.debug_log_file,
-      log_thread_id=True, log_process_id=True)
+  cli_options.configure_logging(
+      args.log_level, debug_log_file=args.debug_log_file, log_thread_id=True,
+      log_process_id=True)
   LOG.debug("CLI args: %s" % (args, ))
 
-  if not args.tpcds_db and not args.tpch_db and not args.random_db \
-      and not args.tpch_nested_db and not args.tpch_kudu_db \
-      and not args.tpcds_kudu_db and not args.query_file_path:
-    raise Exception("At least one of --tpcds-db, --tpch-db, --tpch-kudu-db,"
+  if (
+      not args.tpcds_db and not args.tpch_db and not args.random_db and not
+      args.tpch_nested_db and not args.tpch_kudu_db and not
+      args.tpcds_kudu_db and not args.query_file_path
+  ):
+    raise Exception(
+        "At least one of --tpcds-db, --tpch-db, --tpch-kudu-db,"
         "--tpcds-kudu-db, --tpch-nested-db, --random-db, --query-file-path is required")
 
   # The stress test sets these, so callers cannot override them.
@@ -1437,7 +1840,8 @@ def main():
   runtime_info_path = args.runtime_info_path
   if "{cm_host}" in runtime_info_path:
     runtime_info_path = runtime_info_path.format(cm_host=args.cm_host)
-  queries_with_runtime_info_by_db_and_sql = load_runtime_info(runtime_info_path, impala)
+  queries_with_runtime_info_by_db_sql_and_options = load_runtime_info(
+      runtime_info_path, impala)
 
   # Start loading the test queries.
   queries = list()
@@ -1449,36 +1853,57 @@ def main():
     for query in tpcds_queries:
       query.db_name = args.tpcds_db
     queries.extend(tpcds_queries)
+    if args.generate_compute_stats_queries:
+      with impala.cursor(db_name=args.tpcds_db) as cursor:
+        queries.extend(generate_compute_stats_queries(cursor))
   if args.tpch_db:
     tpch_queries = load_tpc_queries("tpch")
     for query in tpch_queries:
       query.db_name = args.tpch_db
     queries.extend(tpch_queries)
+    if args.generate_compute_stats_queries:
+      with impala.cursor(db_name=args.tpch_db) as cursor:
+        queries.extend(generate_compute_stats_queries(cursor))
   if args.tpch_nested_db:
     tpch_nested_queries = load_tpc_queries("tpch_nested")
     for query in tpch_nested_queries:
       query.db_name = args.tpch_nested_db
     queries.extend(tpch_nested_queries)
+    if args.generate_compute_stats_queries:
+      with impala.cursor(db_name=args.tpch_nested_db) as cursor:
+        queries.extend(generate_compute_stats_queries(cursor))
   if args.tpch_kudu_db:
     tpch_kudu_queries = load_tpc_queries("tpch", load_in_kudu=True)
     for query in tpch_kudu_queries:
       query.db_name = args.tpch_kudu_db
     queries.extend(tpch_kudu_queries)
+    if args.generate_compute_stats_queries:
+      with impala.cursor(db_name=args.tpch_kudu_db) as cursor:
+        queries.extend(generate_compute_stats_queries(cursor))
+    if args.generate_dml_queries:
+      with impala.cursor(db_name=args.tpch_kudu_db) as cursor:
+        prepare_database(cursor)
+        queries.extend(generate_DML_queries(cursor, args.dml_mod_values))
   if args.tpcds_kudu_db:
     tpcds_kudu_queries = load_tpc_queries("tpcds", load_in_kudu=True)
     for query in tpcds_kudu_queries:
       query.db_name = args.tpcds_kudu_db
     queries.extend(tpcds_kudu_queries)
-  for idx in xrange(len(queries) - 1, -1, -1):
-    query = queries[idx]
-    if query.sql in queries_with_runtime_info_by_db_and_sql[query.db_name]:
-      query = queries_with_runtime_info_by_db_and_sql[query.db_name][query.sql]
-      LOG.debug("Reusing previous runtime data for query: " + query.sql)
-      queries[idx] = query
-    else:
-      populate_runtime_info(query, impala, args.use_kerberos, args.result_hash_log_dir,
-          samples=args.samples, max_conflicting_samples=args.max_conflicting_samples)
-      save_runtime_info(runtime_info_path, query, impala)
+    if args.generate_compute_stats_queries:
+      with impala.cursor(db_name=args.tpcds_kudu_db) as cursor:
+        queries.extend(generate_compute_stats_queries(cursor))
+    if args.generate_dml_queries:
+      with impala.cursor(db_name=args.tpcds_kudu_db) as cursor:
+        prepare_database(cursor)
+        queries.extend(generate_DML_queries(cursor, args.dml_mod_values))
+
+  if args.reset_databases_before_binary_search:
+    for database in set(query.db_name for query in queries):
+      with impala.cursor(db_name=database) as cursor:
+        reset_databases(cursor)
+
+  queries = populate_all_queries(queries, impala, args, runtime_info_path,
+                                 queries_with_runtime_info_by_db_sql_and_options)
 
   # A particular random query may either fail (due to a generator or Impala bug) or
   # take a really long time to complete. So the queries needs to be validated. Since the
@@ -1487,28 +1912,30 @@ def main():
     query_generator = QueryGenerator(DefaultProfile())
     with impala.cursor(db_name=args.random_db) as cursor:
       tables = [cursor.describe_table(t) for t in cursor.list_table_names()]
-    queries.extend(load_random_queries_and_populate_runtime_info(query_generator,
-        SqlWriter.create(), tables, args.random_db, impala, args.use_kerberos,
-        args.random_query_count, args.random_query_timeout_seconds,
+    queries.extend(load_random_queries_and_populate_runtime_info(
+        query_generator, SqlWriter.create(), tables, args.random_db, impala,
+        args.use_kerberos, args.random_query_count, args.random_query_timeout_seconds,
         args.result_hash_log_dir))
 
   if args.query_file_path:
-    file_queries = load_queries_from_test_file(args.query_file_path,
-        db_name=args.query_file_db)
+    file_queries = load_queries_from_test_file(
+        args.query_file_path, db_name=args.query_file_db)
     shuffle(file_queries)
-    queries.extend(populate_runtime_info_for_random_queries(impala, args.use_kerberos,
-        file_queries, args.random_query_count, args.random_query_timeout_seconds,
-        args.result_hash_log_dir))
+    queries.extend(populate_runtime_info_for_random_queries(
+        impala, args.use_kerberos, file_queries, args.random_query_count,
+        args.random_query_timeout_seconds, args.result_hash_log_dir))
 
   # Apply tweaks to the query's runtime info as requested by CLI options.
   for idx in xrange(len(queries) - 1, -1, -1):
     query = queries[idx]
     if query.required_mem_mb_with_spilling:
-      query.required_mem_mb_with_spilling += int(query.required_mem_mb_with_spilling
-          * args.mem_limit_padding_pct / 100.0)
+      query.required_mem_mb_with_spilling += int(
+          query.required_mem_mb_with_spilling * args.mem_limit_padding_pct / 100.0) + \
+          args.mem_limit_padding_abs
     if query.required_mem_mb_without_spilling:
-      query.required_mem_mb_without_spilling += int(query.required_mem_mb_without_spilling
-          * args.mem_limit_padding_pct / 100.0)
+      query.required_mem_mb_without_spilling += int(
+          query.required_mem_mb_without_spilling * args.mem_limit_padding_pct / 100.0) + \
+          args.mem_limit_padding_abs
     if query.solo_runtime_secs_with_spilling:
       query.solo_runtime_secs_with_spilling *= args.timeout_multiplier
     if query.solo_runtime_secs_without_spilling:
@@ -1522,6 +1949,7 @@ def main():
       LOG.debug("Filtered query due to mem ratio option: " + query.sql)
       del queries[idx]
 
+  # Remove queries that have a nested loop join in the plan.
   if args.nlj_filter:
     with impala.cursor(db_name=args.random_db) as cursor:
       for idx in xrange(len(queries) - 1, -1, -1):
@@ -1548,6 +1976,15 @@ def main():
     raise Exception("All queries were filtered")
   print("Using %s queries" % len(queries))
 
+  # After the binary search phase finishes, it may be a good idea to reset the database
+  # again to start the stress test from a clean state.
+  if args.reset_databases_after_binary_search:
+    for database in set(query.db_name for query in queries):
+      with impala.cursor(db_name=database) as cursor:
+        reset_databases(cursor)
+
+  LOG.info("Number of queries in the list: {0}".format(len(queries)))
+
   stress_runner = StressRunner()
   stress_runner.result_hash_log_dir = args.result_hash_log_dir
   stress_runner.startup_queries_per_sec = args.startup_queries_per_second
@@ -1557,8 +1994,12 @@ def main():
   stress_runner.spill_probability = args.spill_probability
   stress_runner.leak_check_interval_mins = args.mem_leak_check_interval_mins
   stress_runner.common_query_options = common_query_options
-  stress_runner.run_queries(queries, impala, args.max_queries, args.mem_overcommit_pct,
-      not args.no_status)   # This is the value of 'should_print_status'.
+  stress_runner.run_queries(
+      queries, impala, args.max_queries, args.mem_overcommit_pct,
+      should_print_status=not args.no_status,
+      verify_results=not args.generate_dml_queries,
+      select_probability=args.select_probability)
+
 
 if __name__ == "__main__":
   main()

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2159beee/tests/util/parse_util.py
----------------------------------------------------------------------
diff --git a/tests/util/parse_util.py b/tests/util/parse_util.py
index 207f4b5..ad40b68 100644
--- a/tests/util/parse_util.py
+++ b/tests/util/parse_util.py
@@ -52,7 +52,7 @@ def parse_mem_to_mb(mem, units):
   mem = float(mem)
   if mem <= 0:
     return
-  units = units.strip().upper()
+  units = units.strip().upper() if units else ""
   if units.endswith("B"):
     units = units[:-1]
   if not units:


[04/15] incubator-impala git commit: IMPALA-4640: Fix number of rows displayed by parquet-reader tool

Posted by ta...@apache.org.
IMPALA-4640: Fix number of rows displayed by parquet-reader tool

The variable just never got updated in the code. This change also adds
verification that all columns contain the same number of rows.

Change-Id: I281a784a0aa2df4ed1852dfb864587a0c1aa4d9a
Reviewed-on: http://gerrit.cloudera.org:8080/5453
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/c40958f3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/c40958f3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/c40958f3

Branch: refs/heads/hadoop-next
Commit: c40958f3ff2bbc27b17c9e6acb3c2a69442f6aeb
Parents: c2faf4a
Author: Lars Volker <lv...@cloudera.com>
Authored: Fri Dec 9 16:13:07 2016 -0800
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Fri Dec 16 08:07:07 2016 +0000

----------------------------------------------------------------------
 be/src/util/parquet-reader.cc | 29 +++++++++++++++++++----------
 1 file changed, 19 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c40958f3/be/src/util/parquet-reader.cc
----------------------------------------------------------------------
diff --git a/be/src/util/parquet-reader.cc b/be/src/util/parquet-reader.cc
index e6a9084..d48ef16 100644
--- a/be/src/util/parquet-reader.cc
+++ b/be/src/util/parquet-reader.cc
@@ -148,8 +148,8 @@ class ParquetLevelReader : public impala::RleDecoder {
 //     def levels - with our RLE scheme it is not possible to determine how many values
 //     were actually written if the final run is a literal run, only if the final run is
 //     a repeated run (see util/rle-encoding.h for more details).
-void CheckDataPage(const ColumnChunk& col, const PageHeader& header,
-    const uint8_t* page) {
+// Returns the number of rows specified by the header.
+int CheckDataPage(const ColumnChunk& col, const PageHeader& header, const uint8_t* page) {
   const uint8_t* data = page;
   std::vector<uint8_t> decompressed_buffer;
   if (col.meta_data.codec != parquet::CompressionCodec::UNCOMPRESSED) {
@@ -191,6 +191,8 @@ void CheckDataPage(const ColumnChunk& col, const PageHeader& header,
       }
     }
   }
+
+  return header.data_page_header.num_values;
 }
 
 // Simple utility to read parquet files on local disk.  This utility validates the
@@ -248,12 +250,14 @@ int main(int argc, char** argv) {
   int total_page_header_size = 0;
   int total_compressed_data_size = 0;
   int total_uncompressed_data_size = 0;
-  vector<int> column_sizes;
+  vector<int> column_byte_sizes;
+  vector<int> column_num_rows;
 
   for (int i = 0; i < file_metadata.row_groups.size(); ++i) {
     cerr << "Reading row group " << i << endl;
     RowGroup& rg = file_metadata.row_groups[i];
-    column_sizes.resize(rg.columns.size());
+    column_byte_sizes.resize(rg.columns.size());
+    column_num_rows.resize(rg.columns.size());
 
     for (int c = 0; c < rg.columns.size(); ++c) {
       cerr << "  Reading column " << c << endl;
@@ -278,18 +282,23 @@ int main(int argc, char** argv) {
         }
 
         data += header_size;
-        if (header.__isset.data_page_header) CheckDataPage(col, header, data);
+        if (header.__isset.data_page_header) {
+          column_num_rows[c] += CheckDataPage(col, header, data);
+        }
 
         total_page_header_size += header_size;
-        column_sizes[c] += header.compressed_page_size;
+        column_byte_sizes[c] += header.compressed_page_size;
         total_compressed_data_size += header.compressed_page_size;
         total_uncompressed_data_size += header.uncompressed_page_size;
         data += header.compressed_page_size;
         ++pages_read;
       }
-      // Check that we ended exactly where we should have
+      // Check that we ended exactly where we should have.
       assert(data == col_end);
+      // Check that all cols have the same number of rows.
+      assert(column_num_rows[0] == column_num_rows[c]);
     }
+    num_rows += column_num_rows[0];
   }
   double compression_ratio =
       (double)total_uncompressed_data_size / total_compressed_data_size;
@@ -306,9 +315,9 @@ int main(int argc, char** argv) {
      << "(" << (total_compressed_data_size / (double)file_len) << ")" << endl;
   ss << "  Column uncompressed size: " << total_uncompressed_data_size
      << "(" << compression_ratio << ")" << endl;
-  for (int i = 0; i < column_sizes.size(); ++i) {
-    ss << "    " << "Col " << i << ": " << column_sizes[i]
-       << "(" << (column_sizes[i] / (double)file_len) << ")" << endl;
+  for (int i = 0; i < column_byte_sizes.size(); ++i) {
+    ss << "    " << "Col " << i << ": " << column_byte_sizes[i]
+       << "(" << (column_byte_sizes[i] / (double)file_len) << ")" << endl;
   }
   cerr << ss.str() << endl;
 


[09/15] incubator-impala git commit: Set version of CDH dependencies to cdh5.11.0-SNAPSHOT.

Posted by ta...@apache.org.
Set version of CDH dependencies to cdh5.11.0-SNAPSHOT.

This change enables us to run our integration Jenkins job.

The latest binaries have been uploaded to S3.
A private build with a full data load succeeded.

Change-Id: I6b23fb1e4c041c377725a22e313d8bf747205e31
Reviewed-on: http://gerrit.cloudera.org:8080/5540
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/c03d398d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/c03d398d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/c03d398d

Branch: refs/heads/hadoop-next
Commit: c03d398da63139b2448fb67a9721fb3a8a571ded
Parents: 2159bee
Author: Alex Behm <al...@cloudera.com>
Authored: Thu Dec 15 17:46:49 2016 -0800
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Tue Dec 20 05:58:36 2016 +0000

----------------------------------------------------------------------
 bin/impala-config.sh | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c03d398d/bin/impala-config.sh
----------------------------------------------------------------------
diff --git a/bin/impala-config.sh b/bin/impala-config.sh
index a9dd90b..c9b822d 100755
--- a/bin/impala-config.sh
+++ b/bin/impala-config.sh
@@ -294,11 +294,11 @@ if [[ $OSTYPE == "darwin"* ]]; then
   IMPALA_THRIFT_JAVA_VERSION=0.9.2
 fi
 
-export IMPALA_HADOOP_VERSION=${IMPALA_HADOOP_VERSION:-2.6.0-cdh5.10.0-SNAPSHOT}
-export IMPALA_HBASE_VERSION=${IMPALA_HBASE_VERSION:-1.2.0-cdh5.10.0-SNAPSHOT}
-export IMPALA_HIVE_VERSION=${IMPALA_HIVE_VERSION:-1.1.0-cdh5.10.0-SNAPSHOT}
-export IMPALA_SENTRY_VERSION=${IMPALA_SENTRY_VERSION:-1.5.1-cdh5.10.0-SNAPSHOT}
-export IMPALA_PARQUET_VERSION=${IMPALA_PARQUET_VERSION:-1.5.0-cdh5.10.0-SNAPSHOT}
+export IMPALA_HADOOP_VERSION=${IMPALA_HADOOP_VERSION:-2.6.0-cdh5.11.0-SNAPSHOT}
+export IMPALA_HBASE_VERSION=${IMPALA_HBASE_VERSION:-1.2.0-cdh5.11.0-SNAPSHOT}
+export IMPALA_HIVE_VERSION=${IMPALA_HIVE_VERSION:-1.1.0-cdh5.11.0-SNAPSHOT}
+export IMPALA_SENTRY_VERSION=${IMPALA_SENTRY_VERSION:-1.5.1-cdh5.11.0-SNAPSHOT}
+export IMPALA_PARQUET_VERSION=${IMPALA_PARQUET_VERSION:-1.5.0-cdh5.11.0-SNAPSHOT}
 export IMPALA_LLAMA_MINIKDC_VERSION=${IMPALA_LLAMA_MINIKDC_VERSION:-1.0.0}
 
 export IMPALA_FE_DIR="$IMPALA_HOME/fe"


[07/15] incubator-impala git commit: IMPALA-4163: Add sortby() query hint

Posted by ta...@apache.org.
IMPALA-4163: Add sortby() query hint

This change introduces the sortby() query plan hint for insert
statements. When specified, sortby(a, b) will add an additional sort
step to the plan to order data by columns a, b before inserting it into
the target table.

Change-Id: I37a3ffab99aaa5d5a4fd1ac674b3e8b394a3c4c0
Reviewed-on: http://gerrit.cloudera.org:8080/5051
Reviewed-by: Marcel Kornacker <ma...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/ce9b332e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/ce9b332e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/ce9b332e

Branch: refs/heads/hadoop-next
Commit: ce9b332ee9e640d79c8ae35e7abb8c7d787ddf78
Parents: 68131b3
Author: Lars Volker <lv...@cloudera.com>
Authored: Tue Nov 8 14:03:59 2016 +0100
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Sat Dec 17 05:37:43 2016 +0000

----------------------------------------------------------------------
 fe/src/main/cup/sql-parser.cup                  | 65 ++++++++------
 .../org/apache/impala/analysis/InsertStmt.java  | 90 ++++++++++++++++----
 .../org/apache/impala/analysis/PlanHint.java    | 75 ++++++++++++++++
 .../org/apache/impala/analysis/SelectList.java  | 24 +++---
 .../org/apache/impala/analysis/TableRef.java    | 51 ++++++-----
 .../org/apache/impala/analysis/ToSqlUtils.java  |  5 +-
 .../java/org/apache/impala/planner/Planner.java | 29 ++++---
 fe/src/main/jflex/sql-scanner.flex              | 51 +++++++----
 .../impala/analysis/AnalyzeStmtsTest.java       | 51 +++++++++--
 .../org/apache/impala/analysis/ParserTest.java  | 84 ++++++++++++------
 .../org/apache/impala/analysis/ToSqlTest.java   |  4 +-
 .../queries/PlannerTest/insert.test             | 73 ++++++++++++++++
 .../queries/QueryTest/insert.test               | 26 ++++++
 13 files changed, 489 insertions(+), 139 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ce9b332e/fe/src/main/cup/sql-parser.cup
----------------------------------------------------------------------
diff --git a/fe/src/main/cup/sql-parser.cup b/fe/src/main/cup/sql-parser.cup
index 012da42..33f0591 100644
--- a/fe/src/main/cup/sql-parser.cup
+++ b/fe/src/main/cup/sql-parser.cup
@@ -276,10 +276,10 @@ terminal COLON, SEMICOLON, COMMA, DOT, DOTDOTDOT, STAR, LPAREN, RPAREN, LBRACKET
 terminal BITAND, BITOR, BITXOR, BITNOT;
 terminal EQUAL, NOT, NOTEQUAL, LESSTHAN, GREATERTHAN;
 terminal FACTORIAL; // Placeholder terminal for postfix factorial operator
+terminal COMMENTED_PLAN_HINT_START, COMMENTED_PLAN_HINT_END;
 terminal String IDENT;
 terminal String EMPTY_IDENT;
 terminal String NUMERIC_OVERFLOW;
-terminal String COMMENTED_PLAN_HINTS;
 terminal BigDecimal INTEGER_LITERAL;
 terminal BigDecimal DECIMAL_LITERAL;
 terminal String STRING_LITERAL;
@@ -367,7 +367,8 @@ nonterminal TableRef table_ref;
 nonterminal Subquery subquery;
 nonterminal JoinOperator join_operator;
 nonterminal opt_inner, opt_outer;
-nonterminal ArrayList<String> opt_plan_hints;
+nonterminal PlanHint plan_hint;
+nonterminal List<PlanHint> opt_plan_hints, plan_hint_list;
 nonterminal TypeDef type_def;
 nonterminal Type type;
 nonterminal Expr sign_chain_expr;
@@ -2300,41 +2301,41 @@ table_ref_list ::=
     list.add(table);
     RESULT = list;
   :}
-  | table_ref_list:list KW_CROSS KW_JOIN opt_plan_hints:hints table_ref:table
+  | table_ref_list:list KW_CROSS KW_JOIN opt_plan_hints:join_hints table_ref:table
     opt_plan_hints:table_hints
   {:
     table.setJoinOp(JoinOperator.CROSS_JOIN);
     // We will throw an AnalysisException if there are join hints so that we can provide
     // a better error message than a parser exception.
-    table.setJoinHints(hints);
+    table.setJoinHints(join_hints);
     table.setTableHints(table_hints);
     list.add(table);
     RESULT = list;
   :}
-  | table_ref_list:list join_operator:op opt_plan_hints:hints table_ref:table
+  | table_ref_list:list join_operator:op opt_plan_hints:join_hints table_ref:table
     opt_plan_hints:table_hints
   {:
     table.setJoinOp((JoinOperator) op);
-    table.setJoinHints(hints);
+    table.setJoinHints(join_hints);
     table.setTableHints(table_hints);
     list.add(table);
     RESULT = list;
   :}
-  | table_ref_list:list join_operator:op opt_plan_hints:hints table_ref:table
+  | table_ref_list:list join_operator:op opt_plan_hints:join_hints table_ref:table
     opt_plan_hints:table_hints KW_ON expr:e
   {:
     table.setJoinOp((JoinOperator) op);
-    table.setJoinHints(hints);
+    table.setJoinHints(join_hints);
     table.setTableHints(table_hints);
     table.setOnClause(e);
     list.add(table);
     RESULT = list;
   :}
-  | table_ref_list:list join_operator:op opt_plan_hints:hints table_ref:table
+  | table_ref_list:list join_operator:op opt_plan_hints:join_hints table_ref:table
     opt_plan_hints:table_hints KW_USING LPAREN ident_list:colNames RPAREN
   {:
     table.setJoinOp((JoinOperator) op);
-    table.setJoinHints(hints);
+    table.setJoinHints(join_hints);
     table.setTableHints(table_hints);
     table.setUsingClause(colNames);
     list.add(table);
@@ -2381,28 +2382,40 @@ opt_outer ::=
   ;
 
 opt_plan_hints ::=
-  COMMENTED_PLAN_HINTS:l
+  COMMENTED_PLAN_HINT_START plan_hint_list:hints COMMENTED_PLAN_HINT_END
+  {: RESULT = hints; :}
+  /* legacy straight_join hint style */
+  | KW_STRAIGHT_JOIN
+  {: RESULT = Lists.newArrayList(new PlanHint("straight_join")); :}
+  /* legacy plan-hint style */
+  | LBRACKET plan_hint_list:hints RBRACKET
+  {: RESULT = hints; :}
+  | /* empty */
+  {: RESULT = Lists.newArrayList(); :}
+  ;
+
+plan_hint ::=
+  KW_STRAIGHT_JOIN
+  {: RESULT = new PlanHint("straight_join"); :}
+  | IDENT:name
+  {: RESULT = new PlanHint(name); :}
+  | IDENT:name LPAREN ident_list:args RPAREN
+  {: RESULT = new PlanHint(name, args); :}
+  | /* empty */
+  {: RESULT = null; :}
+  ;
+
+plan_hint_list ::=
+  plan_hint:hint
   {:
-    ArrayList<String> hints = new ArrayList<String>();
-    String[] tokens = l.split(",");
-    for (String token: tokens) {
-      String trimmedToken = token.trim();
-      if (trimmedToken.length() > 0) hints.add(trimmedToken);
-    }
+    ArrayList<PlanHint> hints = Lists.newArrayList(hint);
     RESULT = hints;
   :}
-  /* legacy straight_join hint style */
-  | KW_STRAIGHT_JOIN
+  | plan_hint_list:hints COMMA plan_hint:hint
   {:
-    ArrayList<String> hints = new ArrayList<String>();
-    hints.add("straight_join");
+    if (hint != null) hints.add(hint);
     RESULT = hints;
   :}
-  /* legacy plan-hint style */
-  | LBRACKET ident_list:l RBRACKET
-  {: RESULT = l; :}
-  | /* empty */
-  {: RESULT = null; :}
   ;
 
 ident_list ::=

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ce9b332e/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
index 1dacf48..902d100 100644
--- a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
@@ -64,7 +64,7 @@ public class InsertStmt extends StatementBase {
   private final List<PartitionKeyValue> partitionKeyValues_;
 
   // User-supplied hints to control hash partitioning before the table sink in the plan.
-  private final List<String> planHints_;
+  private List<PlanHint> planHints_ = Lists.newArrayList();
 
   // False if the original insert statement had a query statement, true if we need to
   // auto-generate one (for insert into tbl()) during analysis.
@@ -124,6 +124,14 @@ public class InsertStmt extends StatementBase {
   // clustering step.
   private boolean hasClusteredHint_ = false;
 
+  // For every column of the target table that is referenced in the optional 'sortby()'
+  // hint, this list will contain the corresponding result expr from 'resultExprs_'.
+  // Before insertion, all rows will be sorted by these exprs. If the list is empty, no
+  // additional sorting by non-partitioning columns will be performed. For Hdfs tables,
+  // the 'sortby()' hint must not contain partition columns. For Kudu tables, it must not
+  // contain primary key columns.
+  private List<Expr> sortByExprs_ = Lists.newArrayList();
+
   // Output expressions that produce the final results to write to the target table. May
   // include casts. Set in prepareExpressions().
   // If this is an INSERT on a non-Kudu table, it will contain one Expr for all
@@ -153,19 +161,19 @@ public class InsertStmt extends StatementBase {
 
   public static InsertStmt createInsert(WithClause withClause, TableName targetTable,
       boolean overwrite, List<PartitionKeyValue> partitionKeyValues,
-      List<String> planHints, QueryStmt queryStmt, List<String> columnPermutation) {
+      List<PlanHint> planHints, QueryStmt queryStmt, List<String> columnPermutation) {
     return new InsertStmt(withClause, targetTable, overwrite, partitionKeyValues,
         planHints, queryStmt, columnPermutation, false);
   }
 
   public static InsertStmt createUpsert(WithClause withClause, TableName targetTable,
-      List<String> planHints, QueryStmt queryStmt, List<String> columnPermutation) {
+      List<PlanHint> planHints, QueryStmt queryStmt, List<String> columnPermutation) {
     return new InsertStmt(withClause, targetTable, false, null, planHints, queryStmt,
         columnPermutation, true);
   }
 
   protected InsertStmt(WithClause withClause, TableName targetTable, boolean overwrite,
-      List<PartitionKeyValue> partitionKeyValues, List<String> planHints,
+      List<PartitionKeyValue> partitionKeyValues, List<PlanHint> planHints,
       QueryStmt queryStmt, List<String> columnPermutation, boolean isUpsert) {
     Preconditions.checkState(!isUpsert || (!overwrite && partitionKeyValues == null));
     withClause_ = withClause;
@@ -173,7 +181,7 @@ public class InsertStmt extends StatementBase {
     originalTableName_ = targetTableName_;
     overwrite_ = overwrite;
     partitionKeyValues_ = partitionKeyValues;
-    planHints_ = planHints;
+    planHints_ = (planHints != null) ? planHints : new ArrayList<PlanHint>();
     queryStmt_ = queryStmt;
     needsGeneratedQueryStatement_ = (queryStmt == null);
     columnPermutation_ = columnPermutation;
@@ -210,6 +218,7 @@ public class InsertStmt extends StatementBase {
     hasShuffleHint_ = false;
     hasNoShuffleHint_ = false;
     hasClusteredHint_ = false;
+    sortByExprs_.clear();
     resultExprs_.clear();
     mentionedColumns_.clear();
     primaryKeyExprs_.clear();
@@ -729,25 +738,27 @@ public class InsertStmt extends StatementBase {
   }
 
   private void analyzePlanHints(Analyzer analyzer) throws AnalysisException {
-    if (planHints_ == null) return;
     if (!planHints_.isEmpty() && table_ instanceof HBaseTable) {
-      throw new AnalysisException("INSERT hints are only supported for inserting into " +
-          "Hdfs and Kudu tables.");
+      throw new AnalysisException(String.format("INSERT hints are only supported for " +
+          "inserting into Hdfs and Kudu tables: %s", getTargetTableName()));
     }
     boolean hasNoClusteredHint = false;
-    for (String hint: planHints_) {
-      if (hint.equalsIgnoreCase("SHUFFLE")) {
+    for (PlanHint hint: planHints_) {
+      if (hint.is("SHUFFLE")) {
         hasShuffleHint_ = true;
         analyzer.setHasPlanHints();
-      } else if (hint.equalsIgnoreCase("NOSHUFFLE")) {
+      } else if (hint.is("NOSHUFFLE")) {
         hasNoShuffleHint_ = true;
         analyzer.setHasPlanHints();
-      } else if (hint.equalsIgnoreCase("CLUSTERED")) {
+      } else if (hint.is("CLUSTERED")) {
         hasClusteredHint_ = true;
         analyzer.setHasPlanHints();
-      } else if (hint.equalsIgnoreCase("NOCLUSTERED")) {
+      } else if (hint.is("NOCLUSTERED")) {
         hasNoClusteredHint = true;
         analyzer.setHasPlanHints();
+      } else if (hint.is("SORTBY")) {
+        analyzeSortByHint(hint);
+        analyzer.setHasPlanHints();
       } else {
         analyzer.addWarning("INSERT hint not recognized: " + hint);
       }
@@ -761,6 +772,51 @@ public class InsertStmt extends StatementBase {
     }
   }
 
+  private void analyzeSortByHint(PlanHint hint) throws AnalysisException {
+    // HBase tables don't support insert hints at all (must be enforced by the caller).
+    Preconditions.checkState(!(table_ instanceof HBaseTable));
+
+    if (isUpsert_) {
+      throw new AnalysisException("SORTBY hint is not supported in UPSERT statements.");
+    }
+
+    List<String> columnNames = hint.getArgs();
+    Preconditions.checkState(!columnNames.isEmpty());
+    for (String columnName: columnNames) {
+      // Make sure it's not a Kudu primary key column or Hdfs partition column.
+      if (table_ instanceof KuduTable) {
+        KuduTable kuduTable = (KuduTable) table_;
+        if (kuduTable.isPrimaryKeyColumn(columnName)) {
+          throw new AnalysisException(String.format("SORTBY hint column list must not " +
+              "contain Kudu primary key column: '%s'", columnName));
+        }
+      } else {
+        for (Column tableColumn: table_.getClusteringColumns()) {
+          if (tableColumn.getName().equals(columnName)) {
+            throw new AnalysisException(String.format("SORTBY hint column list must " +
+                "not contain Hdfs partition column: '%s'", columnName));
+          }
+        }
+      }
+
+      // Find the matching column in the target table's column list (by name) and store
+      // the corresponding result expr in sortByExprs_.
+      boolean foundColumn = false;
+      List<Column> columns = table_.getNonClusteringColumns();
+      for (int i = 0; i < columns.size(); ++i) {
+        if (columns.get(i).getName().equals(columnName)) {
+          sortByExprs_.add(resultExprs_.get(i));
+          foundColumn = true;
+          break;
+        }
+      }
+      if (!foundColumn) {
+        throw new AnalysisException(String.format("Could not find SORTBY hint column " +
+            "'%s' in table.", columnName));
+      }
+    }
+  }
+
   @Override
   public ArrayList<Expr> getResultExprs() { return resultExprs_; }
 
@@ -772,7 +828,7 @@ public class InsertStmt extends StatementBase {
 
   private String getOpName() { return isUpsert_ ? "UPSERT" : "INSERT"; }
 
-  public List<String> getPlanHints() { return planHints_; }
+  public List<PlanHint> getPlanHints() { return planHints_; }
   public TableName getTargetTableName() { return targetTableName_; }
   public Table getTargetTable() { return table_; }
   public void setTargetTable(Table table) { this.table_ = table; }
@@ -788,6 +844,7 @@ public class InsertStmt extends StatementBase {
   public boolean hasNoShuffleHint() { return hasNoShuffleHint_; }
   public boolean hasClusteredHint() { return hasClusteredHint_; }
   public ArrayList<Expr> getPrimaryKeyExprs() { return primaryKeyExprs_; }
+  public List<Expr> getSortByExprs() { return sortByExprs_; }
 
   public List<String> getMentionedColumns() {
     List<String> result = Lists.newArrayList();
@@ -812,6 +869,7 @@ public class InsertStmt extends StatementBase {
     resultExprs_ = Expr.substituteList(resultExprs_, smap, analyzer, true);
     partitionKeyExprs_ = Expr.substituteList(partitionKeyExprs_, smap, analyzer, true);
     primaryKeyExprs_ = Expr.substituteList(primaryKeyExprs_, smap, analyzer, true);
+    sortByExprs_ = Expr.substituteList(sortByExprs_, smap, analyzer, true);
   }
 
   @Override
@@ -840,8 +898,8 @@ public class InsertStmt extends StatementBase {
       }
       strBuilder.append(" PARTITION (" + Joiner.on(", ").join(values) + ")");
     }
-    if (planHints_ != null) {
-      strBuilder.append(" " + ToSqlUtils.getPlanHintsSql(planHints_));
+    if (!planHints_.isEmpty()) {
+      strBuilder.append(" " + ToSqlUtils.getPlanHintsSql(getPlanHints()));
     }
     if (!needsGeneratedQueryStatement_) {
       strBuilder.append(" " + queryStmt_.toSql());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ce9b332e/fe/src/main/java/org/apache/impala/analysis/PlanHint.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/PlanHint.java b/fe/src/main/java/org/apache/impala/analysis/PlanHint.java
new file mode 100644
index 0000000..d16919f
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/PlanHint.java
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.impala.analysis;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * Class to parse and store query plan hints, which can occur in various places inside SQL
+ * query statements. A hint consist of a name and an optional list of arguments.
+ */
+public class PlanHint {
+  /// The plan hint name.
+  private final String name_;
+
+  /// Optional list of arguments.
+  private final List<String> args_;
+
+  public PlanHint(String name) {
+    name_ = name;
+    args_ = Lists.newArrayList();
+  }
+
+  public PlanHint(String name, List<String> args) {
+    name_ = name;
+    args_ = args;
+  }
+
+  /// Check whether this hint equals to a given string, ignoring case.
+  public boolean is(String s) { return name_.equalsIgnoreCase(s); }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null) return false;
+    if (getClass() != o.getClass()) return false;
+    PlanHint oh = (PlanHint) o;
+    return name_.equals(oh.name_) && args_.equals(oh.args_);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append(name_);
+    if (!args_.isEmpty()) {
+      sb.append("(");
+      sb.append(Joiner.on(",").join(args_));
+      sb.append(")");
+    }
+    return sb.toString();
+  }
+
+  public List<String> getArgs() { return args_; }
+  public String toSql() { return toString(); }
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ce9b332e/fe/src/main/java/org/apache/impala/analysis/SelectList.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/SelectList.java b/fe/src/main/java/org/apache/impala/analysis/SelectList.java
index 4c504bf..d7f12ff 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SelectList.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SelectList.java
@@ -22,13 +22,14 @@ import java.util.List;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.rewrite.ExprRewriter;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
 /**
  * Select list items plus optional distinct clause and optional plan hints.
  */
 public class SelectList {
-  private List<String> planHints_;
+  private List<PlanHint> planHints_ = Lists.newArrayList();;
   private boolean isDistinct_;
 
   /////////////////////////////////////////
@@ -50,7 +51,7 @@ public class SelectList {
   }
 
   public SelectList(List<SelectListItem> items, boolean isDistinct,
-      List<String> planHints) {
+      List<PlanHint> planHints) {
     isDistinct_ = isDistinct;
     items_ = items;
     planHints_ = planHints;
@@ -60,8 +61,7 @@ public class SelectList {
    * C'tor for cloning.
    */
   public SelectList(SelectList other) {
-    planHints_ =
-        (other.planHints_ != null) ? Lists.newArrayList(other.planHints_) : null;
+    planHints_ = Lists.newArrayList(other.planHints_);
     items_ = Lists.newArrayList();
     for (SelectListItem item: other.items_) {
       items_.add(item.clone());
@@ -70,16 +70,20 @@ public class SelectList {
   }
 
   public List<SelectListItem> getItems() { return items_; }
-  public void setPlanHints(List<String> planHints) { planHints_ = planHints; }
-  public List<String> getPlanHints() { return planHints_; }
+
+  public void setPlanHints(List<PlanHint> planHints) {
+    Preconditions.checkNotNull(planHints);
+    planHints_ = planHints;
+  }
+
+  public List<PlanHint> getPlanHints() { return planHints_; }
   public boolean isDistinct() { return isDistinct_; }
   public void setIsDistinct(boolean value) { isDistinct_ = value; }
-  public boolean hasPlanHints() { return planHints_ != null; }
+  public boolean hasPlanHints() { return !planHints_.isEmpty(); }
 
   public void analyzePlanHints(Analyzer analyzer) {
-    if (planHints_ == null) return;
-    for (String hint: planHints_) {
-      if (!hint.equalsIgnoreCase("straight_join")) {
+    for (PlanHint hint: planHints_) {
+      if (!hint.is("straight_join")) {
         analyzer.addWarning("PLAN hint not recognized: " + hint);
       }
       analyzer.setIsStraightJoin();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ce9b332e/fe/src/main/java/org/apache/impala/analysis/TableRef.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/TableRef.java b/fe/src/main/java/org/apache/impala/analysis/TableRef.java
index d6bbfd2..975b70b 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TableRef.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TableRef.java
@@ -82,10 +82,10 @@ public class TableRef implements ParseNode {
   protected final Privilege priv_;
 
   protected JoinOperator joinOp_;
-  protected ArrayList<String> joinHints_;
+  protected List<PlanHint> joinHints_ = Lists.newArrayList();
   protected List<String> usingColNames_;
 
-  protected ArrayList<String> tableHints_;
+  protected List<PlanHint> tableHints_ = Lists.newArrayList();
   protected TReplicaPreference replicaPreference_;
   protected boolean randomReplica_;
 
@@ -156,13 +156,11 @@ public class TableRef implements ParseNode {
     hasExplicitAlias_ = other.hasExplicitAlias_;
     priv_ = other.priv_;
     joinOp_ = other.joinOp_;
-    joinHints_ =
-        (other.joinHints_ != null) ? Lists.newArrayList(other.joinHints_) : null;
+    joinHints_ = Lists.newArrayList(other.joinHints_);
     onClause_ = (other.onClause_ != null) ? other.onClause_.clone() : null;
     usingColNames_ =
         (other.usingColNames_ != null) ? Lists.newArrayList(other.usingColNames_) : null;
-    tableHints_ =
-        (other.tableHints_ != null) ? Lists.newArrayList(other.tableHints_) : null;
+    tableHints_ = Lists.newArrayList(other.tableHints_);
     replicaPreference_ = other.replicaPreference_;
     randomReplica_ = other.randomReplica_;
     distrMode_ = other.distrMode_;
@@ -262,8 +260,8 @@ public class TableRef implements ParseNode {
     return resolvedPath_.getRootTable();
   }
   public Privilege getPrivilege() { return priv_; }
-  public ArrayList<String> getJoinHints() { return joinHints_; }
-  public ArrayList<String> getTableHints() { return tableHints_; }
+  public List<PlanHint> getJoinHints() { return joinHints_; }
+  public List<PlanHint> getTableHints() { return tableHints_; }
   public Expr getOnClause() { return onClause_; }
   public List<String> getUsingClause() { return usingColNames_; }
   public void setJoinOp(JoinOperator op) { this.joinOp_ = op; }
@@ -271,12 +269,23 @@ public class TableRef implements ParseNode {
   public void setUsingClause(List<String> colNames) { this.usingColNames_ = colNames; }
   public TableRef getLeftTblRef() { return leftTblRef_; }
   public void setLeftTblRef(TableRef leftTblRef) { this.leftTblRef_ = leftTblRef; }
-  public void setJoinHints(ArrayList<String> hints) { this.joinHints_ = hints; }
-  public void setTableHints(ArrayList<String> hints) { this.tableHints_ = hints; }
+
+  public void setJoinHints(List<PlanHint> hints) {
+    Preconditions.checkNotNull(hints);
+    joinHints_ = hints;
+  }
+
+  public void setTableHints(List<PlanHint> hints) {
+    Preconditions.checkNotNull(hints);
+    tableHints_ = hints;
+  }
+
   public boolean isBroadcastJoin() { return distrMode_ == DistributionMode.BROADCAST; }
+
   public boolean isPartitionedJoin() {
     return distrMode_ == DistributionMode.PARTITIONED;
   }
+
   public DistributionMode getDistributionMode() { return distrMode_; }
   public List<TupleId> getCorrelatedTupleIds() { return correlatedTupleIds_; }
   public boolean isAnalyzed() { return isAnalyzed_; }
@@ -336,7 +345,7 @@ public class TableRef implements ParseNode {
   }
 
   private void analyzeTableHints(Analyzer analyzer) {
-    if (tableHints_ == null) return;
+    if (tableHints_.isEmpty()) return;
     if (!(this instanceof BaseTableRef)) {
       analyzer.addWarning("Table hints not supported for inline view and collections");
       return;
@@ -347,17 +356,17 @@ public class TableRef implements ParseNode {
         !(getResolvedPath().destTable() instanceof HdfsTable)) {
       analyzer.addWarning("Table hints only supported for Hdfs tables");
     }
-    for (String hint: tableHints_) {
-      if (hint.equalsIgnoreCase("SCHEDULE_CACHE_LOCAL")) {
+    for (PlanHint hint: tableHints_) {
+      if (hint.is("SCHEDULE_CACHE_LOCAL")) {
         analyzer.setHasPlanHints();
         replicaPreference_ = TReplicaPreference.CACHE_LOCAL;
-      } else if (hint.equalsIgnoreCase("SCHEDULE_DISK_LOCAL")) {
+      } else if (hint.is("SCHEDULE_DISK_LOCAL")) {
         analyzer.setHasPlanHints();
         replicaPreference_ = TReplicaPreference.DISK_LOCAL;
-      } else if (hint.equalsIgnoreCase("SCHEDULE_REMOTE")) {
+      } else if (hint.is("SCHEDULE_REMOTE")) {
         analyzer.setHasPlanHints();
         replicaPreference_ = TReplicaPreference.REMOTE;
-      } else if (hint.equalsIgnoreCase("SCHEDULE_RANDOM_REPLICA")) {
+      } else if (hint.is("SCHEDULE_RANDOM_REPLICA")) {
         analyzer.setHasPlanHints();
         randomReplica_ = true;
       } else {
@@ -369,9 +378,9 @@ public class TableRef implements ParseNode {
   }
 
   private void analyzeJoinHints(Analyzer analyzer) throws AnalysisException {
-    if (joinHints_ == null) return;
-    for (String hint: joinHints_) {
-      if (hint.equalsIgnoreCase("BROADCAST")) {
+    if (joinHints_.isEmpty()) return;
+    for (PlanHint hint: joinHints_) {
+      if (hint.is("BROADCAST")) {
         if (joinOp_ == JoinOperator.RIGHT_OUTER_JOIN
             || joinOp_ == JoinOperator.FULL_OUTER_JOIN
             || joinOp_ == JoinOperator.RIGHT_SEMI_JOIN
@@ -384,7 +393,7 @@ public class TableRef implements ParseNode {
         }
         distrMode_ = DistributionMode.BROADCAST;
         analyzer.setHasPlanHints();
-      } else if (hint.equalsIgnoreCase("SHUFFLE")) {
+      } else if (hint.is("SHUFFLE")) {
         if (joinOp_ == JoinOperator.CROSS_JOIN) {
           throw new AnalysisException("CROSS JOIN does not support SHUFFLE.");
         }
@@ -545,7 +554,7 @@ public class TableRef implements ParseNode {
     }
 
     StringBuilder output = new StringBuilder(" " + joinOp_.toString() + " ");
-    if(joinHints_ != null) output.append(ToSqlUtils.getPlanHintsSql(joinHints_) + " ");
+    if(!joinHints_.isEmpty()) output.append(ToSqlUtils.getPlanHintsSql(joinHints_) + " ");
     output.append(tableRefToSql());
     if (usingColNames_ != null) {
       output.append(" USING (").append(Joiner.on(", ").join(usingColNames_)).append(")");

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ce9b332e/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
index be4ab6f..35f7e79 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
@@ -389,8 +389,9 @@ public class ToSqlUtils {
    * commented plan hint style such that hinted views created by Impala are readable by
    * Hive (parsed as a comment by Hive).
    */
-  public static String getPlanHintsSql(List<String> hints) {
-    if (hints == null || hints.isEmpty()) return "";
+  public static String getPlanHintsSql(List<PlanHint> hints) {
+    Preconditions.checkNotNull(hints);
+    if (hints.isEmpty()) return "";
     StringBuilder sb = new StringBuilder();
     sb.append("\n-- +");
     sb.append(Joiner.on(",").join(hints));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ce9b332e/fe/src/main/java/org/apache/impala/planner/Planner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java
index 3369686..297e9b2 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -144,7 +144,7 @@ public class Planner {
             rootFragment, insertStmt, ctx_.getRootAnalyzer(), fragments);
       }
       // Add optional sort node to the plan, based on clustered/noclustered plan hint.
-      createClusteringSort(insertStmt, rootFragment, ctx_.getRootAnalyzer());
+      createPreInsertSort(insertStmt, rootFragment, ctx_.getRootAnalyzer());
       // set up table sink for root fragment
       rootFragment.setSink(insertStmt.createDataSink());
       resultExprs = insertStmt.getResultExprs();
@@ -512,21 +512,26 @@ public class Planner {
   }
 
   /**
-   * Insert a sort node on top of the plan, depending on the clustered/noclustered plan
-   * hint. This will sort the data produced by 'inputFragment' by the clustering columns
-   * (key columns for Kudu tables), so that partitions can be written sequentially in the
-   * table sink.
+   * Insert a sort node on top of the plan, depending on the clustered/noclustered/sortby
+   * plan hint. If clustering is enabled in insertStmt, then the ordering columns will
+   * start with the clustering columns (key columns for Kudu tables), so that partitions
+   * can be written sequentially in the table sink. Any additional non-clustering columns
+   * specified by the sortby hint will be added to the ordering columns and after any
+   * clustering columns. If neither clustering nor a sortby hint are specified, then no
+   * sort node will be added to the plan.
    */
-  public void createClusteringSort(InsertStmt insertStmt, PlanFragment inputFragment,
+  public void createPreInsertSort(InsertStmt insertStmt, PlanFragment inputFragment,
        Analyzer analyzer) throws ImpalaException {
-    if (!insertStmt.hasClusteredHint()) return;
+    List<Expr> orderingExprs = Lists.newArrayList();
 
-    List<Expr> orderingExprs;
-    if (insertStmt.getTargetTable() instanceof KuduTable) {
-      orderingExprs = Lists.newArrayList(insertStmt.getPrimaryKeyExprs());
-    } else {
-      orderingExprs = Lists.newArrayList(insertStmt.getPartitionKeyExprs());
+    if (insertStmt.hasClusteredHint()) {
+      if (insertStmt.getTargetTable() instanceof KuduTable) {
+        orderingExprs.addAll(insertStmt.getPrimaryKeyExprs());
+      } else {
+        orderingExprs.addAll(insertStmt.getPartitionKeyExprs());
+      }
     }
+    orderingExprs.addAll(insertStmt.getSortByExprs());
     // Ignore constants for the sake of clustering.
     Expr.removeConstants(orderingExprs);
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ce9b332e/fe/src/main/jflex/sql-scanner.flex
----------------------------------------------------------------------
diff --git a/fe/src/main/jflex/sql-scanner.flex b/fe/src/main/jflex/sql-scanner.flex
index 982e9a2..6d1a773 100644
--- a/fe/src/main/jflex/sql-scanner.flex
+++ b/fe/src/main/jflex/sql-scanner.flex
@@ -301,7 +301,6 @@ import org.apache.impala.analysis.SqlParserSymbols;
 %}
 
 LineTerminator = \r|\n|\r\n
-NonTerminator = [^\r\n]
 Whitespace = {LineTerminator} | [ \t\f]
 
 // Order of rules to resolve ambiguity:
@@ -321,14 +320,30 @@ QuotedIdentifier = \`(\\.|[^\\\`])*\`
 SingleQuoteStringLiteral = \'(\\.|[^\\\'])*\'
 DoubleQuoteStringLiteral = \"(\\.|[^\\\"])*\"
 
+EolHintBegin = "--" " "* "+"
+CommentedHintBegin = "/*" " "* "+"
+CommentedHintEnd = "*/"
+
 // Both types of plan hints must appear within a single line.
-TraditionalCommentedPlanHints = "/*" [ ]* "+" [^\r\n*]* "*/"
-// Must end with a line terminator.
-EndOfLineCommentedPlanHints = "--" [ ]* "+" {NonTerminator}* {LineTerminator}
+HintContent = " "* "+" [^\r\n]*
 
 Comment = {TraditionalComment} | {EndOfLineComment}
-TraditionalComment = "/*" ~"*/"
-EndOfLineComment = "--" {NonTerminator}* {LineTerminator}?
+
+// Match anything that has a comment end (*/) in it.
+ContainsCommentEnd = [^]* "*/" [^]*
+// Match anything that has a line terminator in it.
+ContainsLineTerminator = [^]* {LineTerminator} [^]*
+
+// A traditional comment is anything that starts and ends like a comment and has neither a
+// plan hint inside nor a CommentEnd (*/).
+TraditionalComment = "/*" !({HintContent}|{ContainsCommentEnd}) "*/"
+// Similar for a end-of-line comment.
+EndOfLineComment = "--" !({HintContent}|{ContainsLineTerminator}) {LineTerminator}?
+
+// This additional state is needed because newlines signal the end of a end-of-line hint
+// if one has been started earlier. Hence we need to discern between newlines within and
+// outside of end-of-line hints.
+%state EOLHINT
 
 %%
 // Put '...' before '.'
@@ -412,18 +427,22 @@ EndOfLineComment = "--" {NonTerminator}* {LineTerminator}?
   return newToken(SqlParserSymbols.STRING_LITERAL, yytext().substring(1, yytext().length()-1));
 }
 
-{TraditionalCommentedPlanHints} {
-  String text = yytext();
-  // Remove everything before the first '+' as well as the trailing "*/"
-  String hintStr = text.substring(text.indexOf('+') + 1, text.length() - 2);
-  return newToken(SqlParserSymbols.COMMENTED_PLAN_HINTS, hintStr.trim());
+{CommentedHintBegin} {
+  return newToken(SqlParserSymbols.COMMENTED_PLAN_HINT_START, null);
 }
 
-{EndOfLineCommentedPlanHints} {
-  String text = yytext();
-  // Remove everything before the first '+'
-  String hintStr = text.substring(text.indexOf('+') + 1);
-  return newToken(SqlParserSymbols.COMMENTED_PLAN_HINTS, hintStr.trim());
+{CommentedHintEnd} {
+  return newToken(SqlParserSymbols.COMMENTED_PLAN_HINT_END, null);
+}
+
+{EolHintBegin} {
+  yybegin(EOLHINT);
+  return newToken(SqlParserSymbols.COMMENTED_PLAN_HINT_START, null);
+}
+
+<EOLHINT> {LineTerminator} {
+  yybegin(YYINITIAL);
+  return newToken(SqlParserSymbols.COMMENTED_PLAN_HINT_END, null);
 }
 
 {Comment} { /* ignore */ }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ce9b332e/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
index 11f6842..9b641a0 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
@@ -1604,14 +1604,6 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
           String.format("select * from functional.alltypes a join %sbadhint%s " +
               "functional.alltypes b using (int_col)", prefix, suffix),
           "JOIN hint not recognized: badhint");
-      // Hints must be comma separated. Legacy-style hint does not parse because
-      // of space-separated identifiers.
-      if (!prefix.contains("[")) {
-        AnalyzesOk(String.format(
-            "select * from functional.alltypes a join %sbroadcast broadcast%s " +
-                "functional.alltypes b using (int_col)", prefix, suffix),
-            "JOIN hint not recognized: broadcast broadcast");
-      }
       AnalysisError(
           String.format("select * from functional.alltypes a cross join %sshuffle%s " +
           "functional.alltypes b", prefix, suffix),
@@ -1744,7 +1736,8 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
       AnalysisError(String.format(
           "insert into table functional_hbase.alltypes %sshuffle%s " +
           "select * from functional_hbase.alltypes", prefix, suffix),
-          "INSERT hints are only supported for inserting into Hdfs and Kudu tables.");
+          "INSERT hints are only supported for inserting into Hdfs and Kudu tables: " +
+          "functional_hbase.alltypes");
       // Conflicting plan hints.
       AnalysisError("insert into table functional.alltypessmall " +
           "partition (year, month) /* +shuffle,noshuffle */ " +
@@ -1766,6 +1759,46 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
           "insert into table functional.alltypessmall partition (year, month) " +
           "/* +clustered,noclustered */ select * from functional.alltypes", prefix,
           suffix), "Conflicting INSERT hints: clustered and noclustered");
+
+      // Below are tests for hints that are not supported by the legacy syntax.
+      if (prefix == "[") continue;
+
+      // Tests for sortby hint
+      AnalyzesOk(String.format("insert into functional.alltypessmall " +
+          "partition (year, month) %ssortby(int_col)%s select * from functional.alltypes",
+          prefix, suffix));
+      AnalyzesOk(String.format("insert into functional.alltypessmall " +
+          "partition (year, month) %sshuffle,clustered,sortby(int_col)%s select * from " +
+          "functional.alltypes", prefix, suffix));
+      AnalyzesOk(String.format("insert into functional.alltypessmall " +
+          "partition (year, month) %ssortby(int_col, bool_col)%s select * from " +
+          "functional.alltypes", prefix, suffix));
+      AnalyzesOk(String.format("insert into functional.alltypessmall " +
+          "partition (year, month) %sshuffle,clustered,sortby(int_col,bool_col)%s " +
+          "select * from functional.alltypes", prefix, suffix));
+      AnalyzesOk(String.format("insert into functional.alltypessmall " +
+          "partition (year, month) %sshuffle,sortby(int_col,bool_col),clustered%s " +
+          "select * from functional.alltypes", prefix, suffix));
+      AnalyzesOk(String.format("insert into functional.alltypessmall " +
+          "partition (year, month) %ssortby(int_col,bool_col),shuffle,clustered%s " +
+          "select * from functional.alltypes", prefix, suffix));
+      // Column in sortby hint must exist.
+      AnalysisError(String.format("insert into functional.alltypessmall " +
+          "partition (year, month) %ssortby(foo)%s select * from functional.alltypes",
+          prefix, suffix), "Could not find SORTBY hint column 'foo' in table.");
+      // Column in sortby hint must not be a Hdfs partition column.
+      AnalysisError(String.format("insert into functional.alltypessmall " +
+          "partition (year, month) %ssortby(year)%s select * from " +
+          "functional.alltypes", prefix, suffix),
+          "SORTBY hint column list must not contain Hdfs partition column: 'year'");
+      // Column in sortby hint must not be a Kudu primary key column.
+      AnalysisError(String.format("insert into functional_kudu.alltypessmall " +
+          "%ssortby(id)%s select * from functional_kudu.alltypes", prefix, suffix),
+          "SORTBY hint column list must not contain Kudu primary key column: 'id'");
+      // sortby() hint is not supported in UPSERT queries
+      AnalysisError(String.format("upsert into functional_kudu.alltypessmall " +
+          "%ssortby(id)%s select * from functional_kudu.alltypes", prefix, suffix),
+          "SORTBY hint is not supported in UPSERT statements.");
     }
 
     // Multiple non-conflicting hints and case insensitivity of hints.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ce9b332e/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
index 180eabd..2b37bf3 100644
--- a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
@@ -209,6 +209,13 @@ public class ParserTest extends FrontendTestBase {
     ParsesOk("/* select 1; */ select 1");
     ParsesOk("/** select 1; */ select 1");
     ParsesOk("/* select */ select 1 /* 1 */");
+    ParsesOk("select 1 /* sortby(() */");
+    // Empty columns list in sortby hint
+    ParserError("select 1 /*+ sortby() */");
+    // Mismatching parentheses
+    ParserError("select 1 /*+ sortby(() */");
+    ParserError("select 1 /*+ sortby(a) \n");
+    ParserError("select 1 --+ sortby(a) */\n from t");
   }
 
   /**
@@ -230,6 +237,9 @@ public class ParserTest extends FrontendTestBase {
     ParserError("-- baz /*\nselect 1*/");
     ParsesOk("select -- blah\n 1");
     ParsesOk("select -- select 1\n 1");
+    ParsesOk("select 1 -- sortby(()");
+    // Mismatching parentheses
+    ParserError("select 1 -- +sortby(()\n");
   }
 
   /**
@@ -241,10 +251,10 @@ public class ParserTest extends FrontendTestBase {
     SelectStmt selectStmt = (SelectStmt) ParsesOk(stmt);
     Preconditions.checkState(selectStmt.getTableRefs().size() > 1);
     List<String> actualHints = Lists.newArrayList();
-    assertEquals(null, selectStmt.getTableRefs().get(0).getJoinHints());
+    assertTrue(selectStmt.getTableRefs().get(0).getJoinHints().isEmpty());
     for (int i = 1; i < selectStmt.getTableRefs().size(); ++i) {
-      List<String> hints = selectStmt.getTableRefs().get(i).getJoinHints();
-      if (hints != null) actualHints.addAll(hints);
+      List<PlanHint> hints = selectStmt.getTableRefs().get(i).getJoinHints();
+      for (PlanHint hint: hints) actualHints.add(hint.toString());
     }
     if (actualHints.isEmpty()) actualHints = Lists.newArrayList((String) null);
     assertEquals(Lists.newArrayList(expectedHints), actualHints);
@@ -255,8 +265,8 @@ public class ParserTest extends FrontendTestBase {
     Preconditions.checkState(selectStmt.getTableRefs().size() > 0);
     List<String> actualHints = Lists.newArrayList();
     for (int i = 0; i < selectStmt.getTableRefs().size(); ++i) {
-      List<String> hints = selectStmt.getTableRefs().get(i).getTableHints();
-      if (hints != null) actualHints.addAll(hints);
+      List<PlanHint> hints = selectStmt.getTableRefs().get(i).getTableHints();
+      for (PlanHint hint: hints) actualHints.add(hint.toString());
     }
     if (actualHints.isEmpty()) actualHints = Lists.newArrayList((String) null);
     assertEquals(Lists.newArrayList(expectedHints), actualHints);
@@ -267,10 +277,10 @@ public class ParserTest extends FrontendTestBase {
     Preconditions.checkState(selectStmt.getTableRefs().size() > 0);
     List<String> actualHints = Lists.newArrayList();
     for (int i = 0; i < selectStmt.getTableRefs().size(); ++i) {
-      List<String> joinHints = selectStmt.getTableRefs().get(i).getJoinHints();
-      if (joinHints != null) actualHints.addAll(joinHints);
-      List<String> tableHints = selectStmt.getTableRefs().get(i).getTableHints();
-      if (tableHints != null) actualHints.addAll(tableHints);
+      List<PlanHint> joinHints = selectStmt.getTableRefs().get(i).getJoinHints();
+      for (PlanHint hint: joinHints) actualHints.add(hint.toString());
+      List<PlanHint> tableHints = selectStmt.getTableRefs().get(i).getTableHints();
+      for (PlanHint hint: tableHints) actualHints.add(hint.toString());
     }
     if (actualHints.isEmpty()) actualHints = Lists.newArrayList((String) null);
     assertEquals(Lists.newArrayList(expectedHints), actualHints);
@@ -282,8 +292,10 @@ public class ParserTest extends FrontendTestBase {
    */
   private void TestSelectListHints(String stmt, String... expectedHints) {
     SelectStmt selectStmt = (SelectStmt) ParsesOk(stmt);
-    List<String> actualHints = selectStmt.getSelectList().getPlanHints();
-    if (actualHints == null) actualHints = Lists.newArrayList((String) null);
+    List<String> actualHints = Lists.newArrayList();
+    List<PlanHint> hints = selectStmt.getSelectList().getPlanHints();
+    for (PlanHint hint: hints) actualHints.add(hint.toString());
+    if (actualHints.isEmpty()) actualHints = Lists.newArrayList((String) null);
     assertEquals(Lists.newArrayList(expectedHints), actualHints);
   }
 
@@ -292,8 +304,10 @@ public class ParserTest extends FrontendTestBase {
    */
   private void TestInsertHints(String stmt, String... expectedHints) {
     InsertStmt insertStmt = (InsertStmt) ParsesOk(stmt);
-    List<String> actualHints = insertStmt.getPlanHints();
-    if (actualHints == null) actualHints = Lists.newArrayList((String) null);
+    List<String> actualHints = Lists.newArrayList();
+    List<PlanHint> hints = insertStmt.getPlanHints();
+    for (PlanHint hint: hints) actualHints.add(hint.toString());
+    if (actualHints.isEmpty()) actualHints = Lists.newArrayList((String) null);
     assertEquals(Lists.newArrayList(expectedHints), actualHints);
   }
 
@@ -407,22 +421,26 @@ public class ParserTest extends FrontendTestBase {
           suffix), "schedule_cache_local", "schedule_random_replica", "broadcast",
           "schedule_remote");
 
+      TestSelectListHints(String.format(
+          "select %sfoo,bar,baz%s * from functional.alltypes a", prefix, suffix),
+          "foo", "bar", "baz");
+
       // Test select-list hints (e.g., straight_join). The legacy-style hint has no
       // prefix and suffix.
-      if (prefix.contains("[")) {
-        prefix = "";
-        suffix = "";
-      }
-      TestSelectListHints(String.format(
-          "select %sstraight_join%s * from functional.alltypes a", prefix, suffix),
-          "straight_join");
-      // Only the new hint-style is recognized
-      if (!prefix.equals("")) {
+      {
+        String localPrefix = prefix;
+        String localSuffix = suffix;
+        if (prefix == "[") {
+          localPrefix = "";
+          localSuffix = "";
+        }
         TestSelectListHints(String.format(
-            "select %sfoo,bar,baz%s * from functional.alltypes a", prefix, suffix),
-            "foo", "bar", "baz");
+            "select %sstraight_join%s * from functional.alltypes a", localPrefix,
+            localSuffix), "straight_join");
       }
-      if (prefix.isEmpty()) continue;
+
+      // Below are tests for hints that are not supported by the legacy syntax.
+      if (prefix == "[") continue;
 
       // Test mixing commented hints and comments.
       for (String[] commentStyle: commentStyles) {
@@ -439,6 +457,22 @@ public class ParserTest extends FrontendTestBase {
         TestSelectListHints(query, "straight_join");
         TestJoinHints(query, "shuffle");
       }
+
+      // Tests for hints with arguments.
+      TestInsertHints(String.format(
+          "insert into t %ssortby(a)%s select * from t", prefix, suffix),
+          "sortby(a)");
+      TestInsertHints(String.format(
+          "insert into t %sclustered,shuffle,sortby(a)%s select * from t", prefix,
+          suffix), "clustered", "shuffle", "sortby(a)");
+      TestInsertHints(String.format(
+          "insert into t %ssortby(a,b)%s select * from t", prefix, suffix),
+          "sortby(a,b)");
+      TestInsertHints(String.format(
+          "insert into t %ssortby(a  , b)%s select * from t", prefix, suffix),
+          "sortby(a,b)");
+      ParserError(String.format(
+          "insert into t %ssortby(  a  ,  , ,,, b  )%s select * from t", prefix, suffix));
     }
     // No "+" at the beginning so the comment is not recognized as a hint.
     TestJoinHints("select * from functional.alltypes a join /* comment */" +

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ce9b332e/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java b/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
index b5cf446..9514cc6 100644
--- a/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
@@ -463,11 +463,11 @@ public class ToSqlTest extends FrontendTestBase {
       // Insert hint.
       testToSql(String.format(
           "insert into functional.alltypes(int_col, bool_col) " +
-          "partition(year, month) %snoshuffle%s " +
+          "partition(year, month) %snoshuffle,sortby(int_col)%s " +
           "select int_col, bool_col, year, month from functional.alltypes",
           prefix, suffix),
           "INSERT INTO TABLE functional.alltypes(int_col, bool_col) " +
-              "PARTITION (year, month) \n-- +noshuffle\n " +
+              "PARTITION (year, month) \n-- +noshuffle,sortby(int_col)\n " +
           "SELECT int_col, bool_col, year, month FROM functional.alltypes");
 
       // Table hint

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ce9b332e/testdata/workloads/functional-planner/queries/PlannerTest/insert.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/insert.test b/testdata/workloads/functional-planner/queries/PlannerTest/insert.test
index f201d0a..8e3adb9 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/insert.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/insert.test
@@ -705,3 +705,76 @@ WRITE TO HDFS [functional.alltypesnopart, OVERWRITE=false]
 00:SCAN HDFS [functional.alltypesnopart]
    partitions=1/1 files=0 size=0B
 ====
+# IMPALA-4163: sortby hint in insert statement adds sort node.
+insert into table functional.alltypes partition(year, month)
+       /*+ sortby(int_col, bool_col),shuffle */
+       select * from functional.alltypes
+---- PLAN
+WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)]
+|  partitions=24
+|
+01:SORT
+|  order by: int_col DESC NULLS LAST, bool_col DESC NULLS LAST
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+---- DISTRIBUTEDPLAN
+WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)]
+|  partitions=24
+|
+02:SORT
+|  order by: int_col DESC NULLS LAST, bool_col DESC NULLS LAST
+|
+01:EXCHANGE [HASH(functional.alltypes.year,functional.alltypes.month)]
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+====
+# IMPALA-4163: sortby hint in insert statement with noshuffle adds sort node.
+insert into table functional.alltypes partition(year, month)
+       /*+ sortby(int_col, bool_col),noshuffle */
+       select * from functional.alltypes
+---- PLAN
+WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)]
+|  partitions=24
+|
+01:SORT
+|  order by: int_col DESC NULLS LAST, bool_col DESC NULLS LAST
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+---- DISTRIBUTEDPLAN
+WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)]
+|  partitions=24
+|
+01:SORT
+|  order by: int_col DESC NULLS LAST, bool_col DESC NULLS LAST
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+====
+# IMPALA-4163: sortby hint in insert statement adds ordering columns to clustering sort.
+insert into table functional.alltypes partition(year, month)
+       /*+ sortby(int_col, bool_col),clustered */
+       select * from functional.alltypes
+---- PLAN
+WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)]
+|  partitions=24
+|
+01:SORT
+|  order by: year DESC NULLS LAST, month DESC NULLS LAST, int_col DESC NULLS LAST, bool_col DESC NULLS LAST
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+---- DISTRIBUTEDPLAN
+WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)]
+|  partitions=24
+|
+02:SORT
+|  order by: year DESC NULLS LAST, month DESC NULLS LAST, int_col DESC NULLS LAST, bool_col DESC NULLS LAST
+|
+01:EXCHANGE [HASH(functional.alltypes.year,functional.alltypes.month)]
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ce9b332e/testdata/workloads/functional-query/queries/QueryTest/insert.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/insert.test b/testdata/workloads/functional-query/queries/QueryTest/insert.test
index 5601b35..9dfaf34 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/insert.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/insert.test
@@ -945,3 +945,29 @@ RESET alltypesnopart_insert
 ---- RESULTS
 : 100
 ====
+---- QUERY
+# IMPALA-4163: insert into table sortby() plan hint
+insert into table alltypesinsert
+partition (year, month) /*+ clustered,shuffle,sortby(int_col, bool_col) */
+select * from alltypestiny;
+---- SETUP
+DROP PARTITIONS alltypesinsert
+RESET alltypesinsert
+---- RESULTS
+year=2009/month=1/: 2
+year=2009/month=2/: 2
+year=2009/month=3/: 2
+year=2009/month=4/: 2
+====
+---- QUERY
+# IMPALA-4163: insert into table sortby() plan hint
+insert into table alltypesnopart_insert
+/*+ clustered,shuffle,sortby(int_col, bool_col) */
+select id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col,
+double_col, date_string_col, string_col, timestamp_col from alltypestiny;
+---- SETUP
+DROP PARTITIONS alltypesinsert
+RESET alltypesinsert
+---- RESULTS
+: 8
+====


[02/15] incubator-impala git commit: IMPALA-4631: don't use floating point operations for time unit conversions

Posted by ta...@apache.org.
IMPALA-4631: don't use floating point operations for time unit conversions

This was leading to the PlanFragmentExecutor::Close() DCHECK because
with floating point we can have c * a + c * b > c * (a + b).  Also note
this is much more likely to happen when using the MONOTONIC_COARSE since
that will result in the nested scoped timers ending up starting/stopping
at exactly the same time.  Additionally, the new code is faster.

Change-Id: I7237f579b201f5bd3930f66e9c2c8d700c37ffeb
Reviewed-on: http://gerrit.cloudera.org:8080/5434
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Tested-by: Jim Apple <jb...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/54194af6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/54194af6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/54194af6

Branch: refs/heads/hadoop-next
Commit: 54194af6ef08048a1ae367f29350228dafd8f2aa
Parents: d6eb1b1
Author: Dan Hecht <dh...@cloudera.com>
Authored: Thu Dec 8 15:29:47 2016 -0800
Committer: Dan Hecht <dh...@cloudera.com>
Committed: Thu Dec 15 22:37:47 2016 +0000

----------------------------------------------------------------------
 be/src/gutil/walltime.cc | 12 ------------
 be/src/gutil/walltime.h  | 20 ++++++++++++--------
 be/src/util/stopwatch.h  |  2 +-
 be/src/util/time.h       |  6 +++---
 4 files changed, 16 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/54194af6/be/src/gutil/walltime.cc
----------------------------------------------------------------------
diff --git a/be/src/gutil/walltime.cc b/be/src/gutil/walltime.cc
index b497500..04d7f4b 100644
--- a/be/src/gutil/walltime.cc
+++ b/be/src/gutil/walltime.cc
@@ -166,18 +166,6 @@ bool WallTime_Parse_Timezone(const char* time_spec,
   return true;
 }
 
-WallTime WallTime_Now() {
-#if defined(__APPLE__)
-  mach_timespec_t ts;
-  walltime_internal::GetCurrentTime(&ts);
-  return ts.tv_sec + ts.tv_nsec / static_cast<double>(1e9);
-#else
-  timespec ts;
-  clock_gettime(CLOCK_REALTIME, &ts);
-  return ts.tv_sec + ts.tv_nsec / static_cast<double>(1e9);
-#endif  // defined(__APPLE__)
-}
-
 void StringAppendStrftime(string* dst,
                           const char* format,
                           time_t when,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/54194af6/be/src/gutil/walltime.h
----------------------------------------------------------------------
diff --git a/be/src/gutil/walltime.h b/be/src/gutil/walltime.h
index 8d31627..2f04ebe 100644
--- a/be/src/gutil/walltime.h
+++ b/be/src/gutil/walltime.h
@@ -30,6 +30,12 @@ using std::string;
 #include "common/logging.h"
 #include "gutil/integral_types.h"
 
+#define NANOS_PER_SEC  1000000000ll
+#define NANOS_PER_MICRO      1000ll
+#define MICROS_PER_SEC    1000000ll
+#define MICROS_PER_MILLI     1000ll
+#define MILLIS_PER_SEC       1000ll
+
 typedef double WallTime;
 
 // Append result to a supplied string.
@@ -52,9 +58,6 @@ bool WallTime_Parse_Timezone(const char* time_spec,
                                     bool local,
                                     WallTime* result);
 
-// Return current time in seconds as a WallTime.
-WallTime WallTime_Now();
-
 typedef int64 MicrosecondsInt64;
 typedef int64 NanosecondsInt64;
 
@@ -76,7 +79,7 @@ inline void GetCurrentTime(mach_timespec_t* ts) {
 inline MicrosecondsInt64 GetCurrentTimeMicros() {
   mach_timespec_t ts;
   GetCurrentTime(&ts);
-  return ts.tv_sec * 1e6 + ts.tv_nsec / 1e3;
+  return ts.tv_sec * MICROS_PER_SEC + ts.tv_nsec / NANOS_PER_MICRO;
 }
 
 inline int64_t GetMonoTimeNanos() {
@@ -91,7 +94,7 @@ inline int64_t GetMonoTimeNanos() {
 }
 
 inline MicrosecondsInt64 GetMonoTimeMicros() {
-  return GetMonoTimeNanos() / 1e3;
+  return GetMonoTimeNanos() / NANOS_PER_MICRO;
 }
 
 inline MicrosecondsInt64 GetThreadCpuTimeMicros() {
@@ -117,7 +120,8 @@ inline MicrosecondsInt64 GetThreadCpuTimeMicros() {
     return 0;
   }
 
-  return thread_info_data.user_time.seconds * 1e6 + thread_info_data.user_time.microseconds;
+  return thread_info_data.user_time.seconds * MICROS_PER_SEC +
+      thread_info_data.user_time.microseconds;
 }
 
 #else
@@ -125,13 +129,13 @@ inline MicrosecondsInt64 GetThreadCpuTimeMicros() {
 inline MicrosecondsInt64 GetClockTimeMicros(clockid_t clock) {
   timespec ts;
   clock_gettime(clock, &ts);
-  return ts.tv_sec * 1e6 + ts.tv_nsec / 1e3;
+  return ts.tv_sec * MICROS_PER_SEC + ts.tv_nsec / NANOS_PER_MICRO;
 }
 
 inline NanosecondsInt64 GetClockTimeNanos(clockid_t clock) {
   timespec ts;
   clock_gettime(clock, &ts);
-  return ts.tv_sec * 1e9 + ts.tv_nsec;
+  return ts.tv_sec * NANOS_PER_SEC + ts.tv_nsec;
 }
 
 #endif // defined(__APPLE__)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/54194af6/be/src/util/stopwatch.h
----------------------------------------------------------------------
diff --git a/be/src/util/stopwatch.h b/be/src/util/stopwatch.h
index 0e73b6a..c1f85aa 100644
--- a/be/src/util/stopwatch.h
+++ b/be/src/util/stopwatch.h
@@ -170,7 +170,7 @@ class MonotonicStopWatch {
     // Now() can be called frequently (IMPALA-2407).
     timespec ts;
     clock_gettime(OsInfo::fast_clock(), &ts);
-    return ts.tv_sec * 1e9 + ts.tv_nsec;
+    return ts.tv_sec * NANOS_PER_SEC + ts.tv_nsec;
 #endif
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/54194af6/be/src/util/time.h
----------------------------------------------------------------------
diff --git a/be/src/util/time.h b/be/src/util/time.h
index 28a0f66..efe6a3b 100644
--- a/be/src/util/time.h
+++ b/be/src/util/time.h
@@ -39,11 +39,11 @@ inline int64_t MonotonicMicros() {  // 63 bits ~= 5K years uptime
 }
 
 inline int64_t MonotonicMillis() {
-  return GetMonoTimeMicros() / 1e3;
+  return GetMonoTimeMicros() / MICROS_PER_MILLI;
 }
 
 inline int64_t MonotonicSeconds() {
-  return GetMonoTimeMicros() / 1e6;
+  return GetMonoTimeMicros() / MICROS_PER_SEC;
 }
 
 
@@ -52,7 +52,7 @@ inline int64_t MonotonicSeconds() {
 /// a cluster. For more accurate timings on the local host use the monotonic functions
 /// above.
 inline int64_t UnixMillis() {
-  return GetCurrentTimeMicros() / 1e3;
+  return GetCurrentTimeMicros() / MICROS_PER_MILLI;
 }
 
 /// Sleeps the current thread for at least duration_ms milliseconds.


[10/15] incubator-impala git commit: IMPALA-4684: Wrap checking for HBase nodes ina try/finally block

Posted by ta...@apache.org.
IMPALA-4684: Wrap checking for HBase nodes ina  try/finally block

If an exception (other than NoNodeError) was raised while checking for
HBase nodes, we weren't cleanly stopping the ZooKeeper client, which
in turn created a second exception when the the connection was closed.
The second exception masked the original error condition.

Tested by forcibly raising unexpected errors while checking for HBase
nodes.

Change-Id: I46a74d018f9169385a9f10a85718044c31a24dbc
Reviewed-on: http://gerrit.cloudera.org:8080/5547
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/73146a0a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/73146a0a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/73146a0a

Branch: refs/heads/hadoop-next
Commit: 73146a0a46817660505fa37957d44d73f020bc65
Parents: c03d398
Author: David Knupp <dk...@cloudera.com>
Authored: Mon Dec 19 14:47:38 2016 -0800
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Wed Dec 21 00:22:16 2016 +0000

----------------------------------------------------------------------
 testdata/bin/check-hbase-nodes.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/73146a0a/testdata/bin/check-hbase-nodes.py
----------------------------------------------------------------------
diff --git a/testdata/bin/check-hbase-nodes.py b/testdata/bin/check-hbase-nodes.py
index 999a657..28ac0c1 100755
--- a/testdata/bin/check-hbase-nodes.py
+++ b/testdata/bin/check-hbase-nodes.py
@@ -131,8 +131,10 @@ def check_znodes_list_for_errors(nodes, zookeeper_hosts, timeout):
         0 success, or else the number of unresponsive nodes
     """
     with closing(connect_to_zookeeper(zookeeper_hosts, timeout)) as zk_client:
-        errors = sum([check_znode(node, zk_client, timeout) for node in nodes])
-        zk_client.stop()
+        try:
+            errors = sum([check_znode(node, zk_client, timeout) for node in nodes])
+        finally:
+            zk_client.stop()
     return errors