Posted to commits@impala.apache.org by ta...@apache.org on 2016/05/13 06:09:32 UTC

[01/10] incubator-impala git commit: IMPALA-3527: use codegen'd ProcessProbeBatch() when spilling.

Repository: incubator-impala
Updated Branches:
  refs/heads/master 14cdb0497 -> 46c3e43ed


IMPALA-3527: use codegen'd ProcessProbeBatch() when spilling.

Change-Id: I92ebfb01e370d0a842270771c9e5f1a4610dc16a
Reviewed-on: http://gerrit.cloudera.org:8080/3035
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/6910f497
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/6910f497
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/6910f497

Branch: refs/heads/master
Commit: 6910f4975ad7969f5b6ba67e341f68dddf6608fa
Parents: a2e88f0
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Wed May 11 17:56:40 2016 -0700
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Thu May 12 23:06:35 2016 -0700

----------------------------------------------------------------------
 be/src/exec/partitioned-hash-join-node.cc | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6910f497/be/src/exec/partitioned-hash-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc
index 0ecec1e..47c118a 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -952,7 +952,7 @@ Status PartitionedHashJoinNode::GetNext(RuntimeState* state, RowBatch* out_batch
       // in the xcompiled function, so call it here instead.
       int rows_added = 0;
       SCOPED_TIMER(probe_timer_);
-      if (process_probe_batch_fn_ == NULL || ht_ctx_->level() != 0) {
+      if (process_probe_batch_fn_ == NULL) {
         rows_added = ProcessProbeBatch(join_op_, out_batch, ht_ctx_.get(), &status);
       } else {
         DCHECK(process_probe_batch_fn_level0_ != NULL);


[06/10] incubator-impala git commit: IMPALA-3528: Transfer scratch tuple memory in Close() of Parquet scanner.

Posted by ta...@apache.org.
IMPALA-3528: Transfer scratch tuple memory in Close() of Parquet scanner.

The lifetime of a scanner thread is decoupled from that of row batches that
it produces. That means that all resources associated with row batches
produced by the scanner thread should be transferred to those batches.

The bug was that we were not transferring the ownership of memory from the
scratch batch to the final row batch returned in HdfsParquetScanner::Close().
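The ownership transfer described above can be sketched with a toy pool. This is only an illustration under stated assumptions: ToyPool and its methods are hypothetical stand-ins, not Impala's MemPool or RowBatch API. The point it shows is that once the batch's pool has acquired the scratch pool's chunks, the memory stays valid after the scratch pool goes away, and the scratch pool reports zero allocated bytes (the condition the new DCHECK verifies).

```cpp
#include <cstddef>
#include <memory>
#include <utility>
#include <vector>

// Toy stand-in for a memory pool (hypothetical, not Impala's MemPool).
class ToyPool {
 public:
  // Allocates n bytes owned by this pool.
  char* Allocate(std::size_t n) {
    chunks_.emplace_back(new char[n]);
    total_ += n;
    return chunks_.back().get();
  }

  // Moves every chunk from src into this pool. Afterwards src owns nothing,
  // so destroying src cannot free memory that this pool's rows still use.
  void AcquireData(ToyPool* src) {
    for (auto& chunk : src->chunks_) chunks_.push_back(std::move(chunk));
    total_ += src->total_;
    src->chunks_.clear();
    src->total_ = 0;
  }

  std::size_t total_allocated_bytes() const { return total_; }

 private:
  std::vector<std::unique_ptr<char[]>> chunks_;
  std::size_t total_ = 0;
};
```

In this sketch, skipping the AcquireData() call would leave the chunks owned by the scratch pool, which is the kind of dangling reference the commit message describes.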

Triggering an event that would cause the freed memory to be dereferenced is
possible, but very difficult. My understanding is that it is only possible
in exceptional non-deterministic scenarios, e.g., a query is cancelled just
at the right time, or the scanner hits a parse/decoding error.

Testing: I tested this change locally by running the scanner and nested
types tests as well as TPCH, nested TPCH, and TPC-DS.

Change-Id: Ic34d32c9a41ea66b2b2d8f5e187cc84d4cb569b2
Reviewed-on: http://gerrit.cloudera.org:8080/3041
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/e96b4635
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/e96b4635
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/e96b4635

Branch: refs/heads/master
Commit: e96b463587cf5e7e5211741bcbc70237d1b1b2a6
Parents: 7e0cbaf
Author: Alex Behm <al...@cloudera.com>
Authored: Wed May 11 18:22:41 2016 -0700
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Thu May 12 23:06:36 2016 -0700

----------------------------------------------------------------------
 be/src/exec/hdfs-parquet-scanner.cc | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e96b4635/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index 47e19a6..3fcb693 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -1156,10 +1156,12 @@ void HdfsParquetScanner::Close() {
   }
   if (batch_ != NULL) {
     AttachPool(dictionary_pool_.get(), false);
+    AttachPool(scratch_batch_->mem_pool(), false);
     AddFinalRowBatch();
   }
   // Verify all resources (if any) have been transferred.
   DCHECK_EQ(dictionary_pool_.get()->total_allocated_bytes(), 0);
+  DCHECK_EQ(scratch_batch_->mem_pool()->total_allocated_bytes(), 0);
   DCHECK_EQ(context_->num_completed_io_buffers(), 0);
   // If this was a metadata only read (i.e. count(*)), there are no columns.
   if (compression_types.empty()) compression_types.push_back(THdfsCompression::NONE);


[05/10] incubator-impala git commit: IMPALA-3459: Add test for DROP TABLE PURGE for S3

Posted by ta...@apache.org.
IMPALA-3459: Add test for DROP TABLE PURGE for S3

It was previously thought that PURGE had no effect on S3. However,
the Hive Metastore actually created a .Trash directory and copied the
files there when a DROP TABLE was conducted from Impala.

This patch just enables the existing PURGE tests for S3. There were a
few reasons this wasn't working before. The paths given to the S3
client (boto3) should not have a leading "/". The tests now omit it,
since it makes no difference for HDFS whether the leading "/" is present.
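The path rule above can be sketched as a small normalization helper. This is a hypothetical illustration, not code from the patch: the patch itself simply edits the literal paths in the tests, and StripLeadingSlashes is an assumed name.

```cpp
#include <cstddef>
#include <string>

// Hypothetical helper: returns the path with any leading '/' characters
// removed, so the same relative key can be handed to either filesystem client.
std::string StripLeadingSlashes(const std::string& path) {
  const std::size_t first = path.find_first_not_of('/');
  if (first == std::string::npos) return std::string();  // empty or all slashes
  return path.substr(first);
}
```

A path that already has no leading "/" passes through unchanged, so applying the helper to every path would be safe in this sketch.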

Also, PURGE is a pure delete whereas a regular DROP is a copy. A copy
is consistent whereas a delete is only eventually consistent, so when
we PURGE a table or partition, the files will still be visible for
some time after the query has completed. The tests have been modified
to accommodate this case as well.
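One general way to cope with eventually consistent deletes is to poll until a condition holds or a deadline passes. Note that this is not what the patch does: the patch skips the affected assertions on S3. The sketch below is a hypothetical alternative, WaitUntil is an assumed name, and because the visibility window is described as unbounded, any timeout chosen here would be a heuristic.

```cpp
#include <chrono>
#include <functional>
#include <thread>

// Hypothetical helper: re-evaluates `condition` every `interval` until it
// returns true or `timeout` has elapsed. Returns whether it ever held.
bool WaitUntil(const std::function<bool()>& condition,
               std::chrono::milliseconds timeout,
               std::chrono::milliseconds interval) {
  const auto deadline = std::chrono::steady_clock::now() + timeout;
  while (true) {
    if (condition()) return true;
    if (std::chrono::steady_clock::now() >= deadline) return false;
    std::this_thread::sleep_for(interval);
  }
}
```

A test could pass a lambda that checks whether a file has disappeared, at the cost of making the test slower and still potentially flaky if the delete takes longer than the timeout.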

Change-Id: I52d2451e090b00ae2fd9a879c28defa6c940047c
Reviewed-on: http://gerrit.cloudera.org:8080/3036
Reviewed-by: Sailesh Mukil <sa...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/7e0cbaf1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/7e0cbaf1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/7e0cbaf1

Branch: refs/heads/master
Commit: 7e0cbaf1a06da075639b36290b1ec09ef82122e0
Parents: 6910f49
Author: Sailesh Mukil <sa...@cloudera.com>
Authored: Wed May 11 18:22:17 2016 -0700
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Thu May 12 23:06:36 2016 -0700

----------------------------------------------------------------------
 tests/common/skip.py       |  2 -
 tests/metadata/test_ddl.py | 94 +++++++++++++++++++++++------------------
 2 files changed, 52 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7e0cbaf1/tests/common/skip.py
----------------------------------------------------------------------
diff --git a/tests/common/skip.py b/tests/common/skip.py
index 3c4fe27..b2f52ba 100644
--- a/tests/common/skip.py
+++ b/tests/common/skip.py
@@ -34,8 +34,6 @@ class SkipIfS3:
   jira = partial(pytest.mark.skipif, IS_S3)
   hdfs_encryption = pytest.mark.skipif(IS_S3,
       reason="HDFS encryption is not supported with S3")
-  hdfs_purge = pytest.mark.skipif(IS_S3,
-      reason="PURGE has no effect on S3")
 
   # These ones need test infra work to re-enable.
   udfs = pytest.mark.skipif(IS_S3, reason="udas/udfs not copied to S3")

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7e0cbaf1/tests/metadata/test_ddl.py
----------------------------------------------------------------------
diff --git a/tests/metadata/test_ddl.py b/tests/metadata/test_ddl.py
index 791d68d..0a1900c 100644
--- a/tests/metadata/test_ddl.py
+++ b/tests/metadata/test_ddl.py
@@ -68,7 +68,6 @@ class TestDdlStatements(ImpalaTestSuite):
     for dir_ in ['part_data', 't1_tmp1', 't_part_tmp']:
       self.filesystem_client.delete_file_dir('test-warehouse/%s' % dir_, recursive=True)
 
-  @SkipIfS3.hdfs_purge
   @SkipIfLocal.hdfs_client
   @pytest.mark.execute_serially
   def test_drop_table_with_purge(self):
@@ -80,41 +79,48 @@ class TestDdlStatements(ImpalaTestSuite):
     self.client.execute("create table {0}.t1(i int)".format(DDL_PURGE_DB))
     self.client.execute("create table {0}.t2(i int)".format(DDL_PURGE_DB))
     # Create sample test data files under the table directories
-    self.hdfs_client.create_file("test-warehouse/{0}.db/t1/t1.txt".format(DDL_PURGE_DB),\
-        file_data='t1')
-    self.hdfs_client.create_file("test-warehouse/{0}.db/t2/t2.txt".format(DDL_PURGE_DB),\
-        file_data='t2')
+    self.filesystem_client.create_file("test-warehouse/{0}.db/t1/t1.txt".\
+        format(DDL_PURGE_DB), file_data='t1')
+    self.filesystem_client.create_file("test-warehouse/{0}.db/t2/t2.txt".\
+        format(DDL_PURGE_DB), file_data='t2')
     # Drop the table (without purge) and make sure it exists in trash
     self.client.execute("drop table {0}.t1".format(DDL_PURGE_DB))
-    assert not self.hdfs_client.exists("test-warehouse/{0}.db/t1/t1.txt".\
+    assert not self.filesystem_client.exists("test-warehouse/{0}.db/t1/t1.txt".\
         format(DDL_PURGE_DB))
-    assert not self.hdfs_client.exists("test-warehouse/{0}.db/t1/".format(DDL_PURGE_DB))
-    assert self.hdfs_client.exists(\
-        "/user/{0}/.Trash/Current/test-warehouse/{1}.db/t1/t1.txt".\
+    assert not self.filesystem_client.exists("test-warehouse/{0}.db/t1/".\
+        format(DDL_PURGE_DB))
+    assert self.filesystem_client.exists(\
+        "user/{0}/.Trash/Current/test-warehouse/{1}.db/t1/t1.txt".\
         format(getpass.getuser(), DDL_PURGE_DB))
-    assert self.hdfs_client.exists(\
-        "/user/{0}/.Trash/Current/test-warehouse/{1}.db/t1".\
+    assert self.filesystem_client.exists(\
+        "user/{0}/.Trash/Current/test-warehouse/{1}.db/t1".\
         format(getpass.getuser(), DDL_PURGE_DB))
     # Drop the table (with purge) and make sure it doesn't exist in trash
     self.client.execute("drop table {0}.t2 purge".format(DDL_PURGE_DB))
-    assert not self.hdfs_client.exists("test-warehouse/{0}.db/t2/".format(DDL_PURGE_DB))
-    assert not self.hdfs_client.exists("test-warehouse/{0}.db/t2/t2.txt".\
-        format(DDL_PURGE_DB))
-    assert not self.hdfs_client.exists(\
-        "/user/{0}/.Trash/Current/test-warehouse/{1}.db/t2/t2.txt".\
+    if not IS_S3:
+      # In S3, deletes are eventual. So even though we dropped the table, the files
+      # belonging to this table will still be visible for some unbounded time. This
+      # happens only with PURGE. A regular DROP TABLE is just a copy of files which is
+      # consistent.
+      assert not self.filesystem_client.exists("test-warehouse/{0}.db/t2/".\
+          format(DDL_PURGE_DB))
+      assert not self.filesystem_client.exists("test-warehouse/{0}.db/t2/t2.txt".\
+          format(DDL_PURGE_DB))
+    assert not self.filesystem_client.exists(\
+        "user/{0}/.Trash/Current/test-warehouse/{1}.db/t2/t2.txt".\
         format(getpass.getuser(), DDL_PURGE_DB))
-    assert not self.hdfs_client.exists(\
-        "/user/{0}/.Trash/Current/test-warehouse/{1}.db/t2".\
+    assert not self.filesystem_client.exists(\
+        "user/{0}/.Trash/Current/test-warehouse/{1}.db/t2".\
         format(getpass.getuser(), DDL_PURGE_DB))
     # Create an external table t3 and run the same test as above. Make
     # sure the data is not deleted
-    self.hdfs_client.make_dir("test-warehouse/data_t3/", permission=777)
-    self.hdfs_client.create_file("test-warehouse/data_t3/data.txt", file_data='100')
+    self.filesystem_client.make_dir("test-warehouse/data_t3/", permission=777)
+    self.filesystem_client.create_file("test-warehouse/data_t3/data.txt", file_data='100')
     self.client.execute("create external table {0}.t3(i int) stored as \
       textfile location \'/test-warehouse/data_t3\'" .format(DDL_PURGE_DB))
     self.client.execute("drop table {0}.t3 purge".format(DDL_PURGE_DB))
-    assert self.hdfs_client.exists("test-warehouse/data_t3/data.txt")
-    self.hdfs_client.delete_file_dir("test-warehouse/data_t3", recursive=True)
+    assert self.filesystem_client.exists("test-warehouse/data_t3/data.txt")
+    self.filesystem_client.delete_file_dir("test-warehouse/data_t3", recursive=True)
 
   @SkipIfLocal.hdfs_client
   @pytest.mark.execute_serially
@@ -306,7 +312,6 @@ class TestDdlStatements(ImpalaTestSuite):
     self.run_test_case('QueryTest/alter-table', vector, use_db='alter_table_test_db',
         multiple_impalad=self._use_multiple_impalad(vector))
 
-  @SkipIfS3.hdfs_purge # S3: missing coverage: alter table drop partition
   @SkipIfLocal.hdfs_client
   @pytest.mark.execute_serially
   def test_drop_partition_with_purge(self, vector):
@@ -315,38 +320,43 @@ class TestDdlStatements(ImpalaTestSuite):
     # Create a sample database alter_purge_db and table t1 in it
     self._create_db(ALTER_PURGE_DB)
     self.client.execute("create table {0}.t1(i int) partitioned\
-      by (j int)".format(ALTER_PURGE_DB))
+        by (j int)".format(ALTER_PURGE_DB))
     # Add two partitions (j=1) and (j=2) to table t1
     self.client.execute("alter table {0}.t1 add partition(j=1)".format(ALTER_PURGE_DB))
     self.client.execute("alter table {0}.t1 add partition(j=2)".format(ALTER_PURGE_DB))
-    self.hdfs_client.create_file(\
-            "test-warehouse/{0}.db/t1/j=1/j1.txt".format(ALTER_PURGE_DB), file_data='j1')
-    self.hdfs_client.create_file(\
-            "test-warehouse/{0}.db/t1/j=2/j2.txt".format(ALTER_PURGE_DB), file_data='j2')
+    self.filesystem_client.create_file(\
+        "test-warehouse/{0}.db/t1/j=1/j1.txt".format(ALTER_PURGE_DB), file_data='j1')
+    self.filesystem_client.create_file(\
+        "test-warehouse/{0}.db/t1/j=2/j2.txt".format(ALTER_PURGE_DB), file_data='j2')
     # Drop the partition (j=1) without purge and make sure it exists in trash
     self.client.execute("alter table {0}.t1 drop partition(j=1)".format(ALTER_PURGE_DB));
-    assert not self.hdfs_client.exists("test-warehouse/{0}.db/t1/j=1/j1.txt".\
+    assert not self.filesystem_client.exists("test-warehouse/{0}.db/t1/j=1/j1.txt".\
         format(ALTER_PURGE_DB))
-    assert not self.hdfs_client.exists("test-warehouse/{0}.db/t1/j=1".\
+    assert not self.filesystem_client.exists("test-warehouse/{0}.db/t1/j=1".\
         format(ALTER_PURGE_DB))
-    assert self.hdfs_client.exists(\
-        "/user/{0}/.Trash/Current/test-warehouse/{1}.db/t1/j=1/j1.txt".\
+    assert self.filesystem_client.exists(\
+        "user/{0}/.Trash/Current/test-warehouse/{1}.db/t1/j=1/j1.txt".\
         format(getpass.getuser(), ALTER_PURGE_DB))
-    assert self.hdfs_client.exists(\
-        "/user/{0}/.Trash/Current/test-warehouse/{1}.db/t1/j=1".\
+    assert self.filesystem_client.exists(\
+        "user/{0}/.Trash/Current/test-warehouse/{1}.db/t1/j=1".\
         format(getpass.getuser(), ALTER_PURGE_DB))
     # Drop the partition (with purge) and make sure it doesn't exist in trash
     self.client.execute("alter table {0}.t1 drop partition(j=2) purge".\
         format(ALTER_PURGE_DB));
-    assert not self.hdfs_client.exists("test-warehouse/{0}.db/t1/j=2/j2.txt".\
-        format(ALTER_PURGE_DB))
-    assert not self.hdfs_client.exists("test-warehouse/{0}.db/t1/j=2".\
-        format(ALTER_PURGE_DB))
-    assert not self.hdfs_client.exists(\
-        "/user/{0}/.Trash/Current/test-warehouse/{1}.db/t1/j=2".\
+    if not IS_S3:
+      # In S3, deletes are eventual. So even though we dropped the partition, the files
+      # belonging to this partition will still be visible for some unbounded time. This
+      # happens only with PURGE. A regular DROP TABLE is just a copy of files which is
+      # consistent.
+      assert not self.filesystem_client.exists("test-warehouse/{0}.db/t1/j=2/j2.txt".\
+          format(ALTER_PURGE_DB))
+      assert not self.filesystem_client.exists("test-warehouse/{0}.db/t1/j=2".\
+          format(ALTER_PURGE_DB))
+    assert not self.filesystem_client.exists(\
+        "user/{0}/.Trash/Current/test-warehouse/{1}.db/t1/j=2".\
         format(getpass.getuser(), ALTER_PURGE_DB))
-    assert not self.hdfs_client.exists(
-        "/user/{0}/.Trash/Current/test-warehouse/{1}.db/t1/j=2/j2.txt".\
+    assert not self.filesystem_client.exists(
+        "user/{0}/.Trash/Current/test-warehouse/{1}.db/t1/j=2/j2.txt".\
         format(getpass.getuser(), ALTER_PURGE_DB))
 
   @pytest.mark.execute_serially


[02/10] incubator-impala git commit: IMPALA-3495: incorrect join result due to implicit cast in Murmur hash

Posted by ta...@apache.org.
IMPALA-3495: incorrect join result due to implicit cast in Murmur hash

We observed that some spilling joins started returning incorrect
results. The behaviour seems to happen when a codegen'd insert and a
non-codegen'd probe function are used (or vice-versa). This only seems to
happen in a subset of cases.

The bug appears to be a result of the implicit cast of the uint32_t seed
value to the int32_t hash argument to HashTable::Hash(). The behaviour
is unspecified if the uint32_t does not fit in the int32_t. In Murmur
hash, this value is subsequently cast to a uint64_t, so we have a chain
of uint32_t->int32_t->uint64_t conversions. It would require a very
careful reading of the C++ standard to understand what the expected
result is, and whether we're seeing a compiler bug or just unspecified
behaviour, but we can avoid it entirely by keeping the values unsigned.
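The conversion chain described above can be illustrated with a minimal sketch. The helper names WidenViaSigned and WidenDirect are hypothetical and are not part of Impala. The sketch assumes a two's-complement platform, where a seed with the top bit set becomes negative as an int32_t (before C++20 the result of this narrowing is implementation-defined) and is then sign-extended when converted to uint64_t.

```cpp
#include <cstdint>

// Hypothetical: mimics passing a uint32_t seed through an int32_t parameter
// and then widening it to 64 bits, as in the chain described above.
uint64_t WidenViaSigned(uint32_t seed) {
  // Implementation-defined before C++20 when seed > INT32_MAX.
  int32_t narrowed = static_cast<int32_t>(seed);
  // A negative value converts modulo 2^64, which sets the upper 32 bits.
  return static_cast<uint64_t>(narrowed);
}

// Hypothetical: keeps the seed unsigned throughout, so the upper 32 bits
// of the widened value are always zero.
uint64_t WidenDirect(uint32_t seed) {
  return static_cast<uint64_t>(seed);
}
```

For a seed such as 0x80000000 the two paths yield different 64-bit values on common platforms, which would be consistent with insert and probe disagreeing if only one of them went through the signed parameter. Small seeds are unaffected, which may be why only a subset of cases misbehaved.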

Testing:
I was able to reproduce the issue under a very specific set of circumstances,
listed below. Before this change it consistently returned 0 rows. After the
change it consistently returned the correct results. I haven't had much
luck creating a suitable regression test.

* 1 impalad
* --disable_mem_pools=true
* use tpch_20_parquet;
* set mem_limit=1275mb;
* TPC-H query 7:

select
  supp_nation,
  cust_nation,
  l_year,
  sum(volume) as revenue
from (
  select
    n1.n_name as supp_nation,
    n2.n_name as cust_nation,
    year(l_shipdate) as l_year,
    l_extendedprice * (1 - l_discount) as volume
  from
    supplier,
    lineitem,
    orders,
    customer,
    nation n1,
    nation n2
  where
    s_suppkey = l_suppkey
    and o_orderkey = l_orderkey
    and c_custkey = o_custkey
    and s_nationkey = n1.n_nationkey
    and c_nationkey = n2.n_nationkey
    and (
      (n1.n_name = 'FRANCE' and n2.n_name = 'GERMANY')
      or (n1.n_name = 'GERMANY' and n2.n_name = 'FRANCE')
    )
    and l_shipdate between '1995-01-01' and '1996-12-31'
  ) as shipping
group by
  supp_nation,
  cust_nation,
  l_year
order by
  supp_nation,
  cust_nation,
  l_year

Change-Id: I952638dc94119a4bc93126ea94cc6a3edf438956
Reviewed-on: http://gerrit.cloudera.org:8080/3034
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/a2e88f0e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/a2e88f0e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/a2e88f0e

Branch: refs/heads/master
Commit: a2e88f0e6c7f017a3fe64bebbf5f05913aed4bc7
Parents: df1412c
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Wed May 11 17:40:07 2016 -0700
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Thu May 12 23:06:35 2016 -0700

----------------------------------------------------------------------
 be/src/exec/hash-table.h | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a2e88f0e/be/src/exec/hash-table.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table.h b/be/src/exec/hash-table.h
index 3d090a8..673822e 100644
--- a/be/src/exec/hash-table.h
+++ b/be/src/exec/hash-table.h
@@ -202,7 +202,7 @@ class HashTableCtx {
   }
 
   /// Wrapper function for calling correct HashUtil function in non-codegen'd case.
-  uint32_t inline Hash(const void* input, int len, int32_t hash) const {
+  uint32_t inline Hash(const void* input, int len, uint32_t hash) const {
     /// Use CRC hash at first level for better performance. Switch to murmur hash at
     /// subsequent levels since CRC doesn't randomize well with different seed inputs.
     if (level_ == 0) return HashUtil::Hash(input, len, hash);


[04/10] incubator-impala git commit: Remove replica_preference query option

Posted by ta...@apache.org.
Remove replica_preference query option

Change-Id: I5a3134b874a53241706d850d186acbfed768f5ee
Reviewed-on: http://gerrit.cloudera.org:8080/2323
Reviewed-by: Marcel Kornacker <ma...@cloudera.com>
Reviewed-by: Silvius Rus <sr...@cloudera.com>
Tested-by: Internal Jenkins
Reviewed-on: http://gerrit.cloudera.org:8080/3030
Reviewed-by: Lars Volker <lv...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/cb377741
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/cb377741
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/cb377741

Branch: refs/heads/master
Commit: cb377741eca8b5e5aa218e39518136676e535561
Parents: 9174dee
Author: Lars Volker <lv...@cloudera.com>
Authored: Thu Feb 25 22:07:32 2016 -0800
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Thu May 12 23:06:36 2016 -0700

----------------------------------------------------------------------
 be/src/scheduling/simple-scheduler-test.cc      |  6 +----
 be/src/scheduling/simple-scheduler.cc           |  7 ++---
 be/src/service/query-options.cc                 | 28 ++------------------
 be/src/service/query-options.h                  |  3 +--
 common/thrift/ImpalaInternalService.thrift      |  7 +----
 common/thrift/ImpalaService.thrift              |  7 +----
 .../com/cloudera/impala/analysis/TableRef.java  | 11 +-------
 .../impala/analysis/AnalyzeStmtsTest.java       | 26 +++++-------------
 .../cloudera/impala/analysis/ParserTest.java    | 21 ++++-----------
 .../com/cloudera/impala/analysis/ToSqlTest.java | 17 +++++-------
 10 files changed, 30 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cb377741/be/src/scheduling/simple-scheduler-test.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/simple-scheduler-test.cc b/be/src/scheduling/simple-scheduler-test.cc
index d7786bd..bebb1fb 100644
--- a/be/src/scheduling/simple-scheduler-test.cc
+++ b/be/src/scheduling/simple-scheduler-test.cc
@@ -365,11 +365,7 @@ class Plan {
 
   const TQueryOptions& query_options() const { return query_options_; }
 
-  void SetReplicaPreference(TReplicaPreference::type p) {
-    query_options_.replica_preference = p;
-  }
-
-  void SetRandomReplica(bool b) { query_options_.random_replica = b; }
+  void SetRandomReplica(bool b) { query_options_.schedule_random_replica = b; }
   void SetDisableCachedReads(bool b) { query_options_.disable_cached_reads = b; }
   const Cluster& cluster() const { return schema_.cluster(); }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cb377741/be/src/scheduling/simple-scheduler.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/simple-scheduler.cc b/be/src/scheduling/simple-scheduler.cc
index 265ca10..5074071 100644
--- a/be/src/scheduling/simple-scheduler.cc
+++ b/be/src/scheduling/simple-scheduler.cc
@@ -451,8 +451,8 @@ Status SimpleScheduler::ComputeScanRangeAssignment(
     const TQueryOptions& query_options, FragmentScanRangeAssignment* assignment) {
   // We adjust all replicas with memory distance less than base_distance to base_distance
   // and view all replicas with equal or better distance as the same. For a full list of
-  // memory distance classes see TReplicaPreference in ImpalaInternalService.thrift.
-  TReplicaPreference::type base_distance = query_options.replica_preference;
+  // memory distance classes see TReplicaPreference in PlanNodes.thrift.
+  TReplicaPreference::type base_distance = TReplicaPreference::CACHE_LOCAL;
   // The query option to disable cached reads adjusts the memory base distance to view
   // all replicas as disk_local or worse.
   // TODO remove in CDH6
@@ -467,7 +467,8 @@ Status SimpleScheduler::ComputeScanRangeAssignment(
   // On otherwise equivalent disk replicas we either pick the first one, or we pick a
   // random one. Picking random ones helps with preventing hot spots across several
   // queries. On cached replica we will always break ties randomly.
-  bool random_non_cached_tiebreak = node_random_replica || query_options.random_replica;
+  bool random_non_cached_tiebreak = node_random_replica
+      || query_options.schedule_random_replica;
 
   // map from datanode host to total assigned bytes.
   unordered_map<TNetworkAddress, uint64_t> assigned_bytes_per_host;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cb377741/be/src/service/query-options.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 4617256..274776c 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -239,14 +239,6 @@ Status impala::SetQueryOption(const string& key, const string& value,
         break;
       case TImpalaQueryOptions::DISABLE_CACHED_READS:
         if (iequals(value, "true") || iequals(value, "1")) {
-          if (query_options->replica_preference == TReplicaPreference::CACHE_LOCAL ||
-              query_options->replica_preference == TReplicaPreference::CACHE_RACK) {
-            stringstream ss;
-            ss << "Conflicting settings: DISABLE_CACHED_READS = true and"
-               << " REPLICA_PREFERENCE = " << _TReplicaPreference_VALUES_TO_NAMES.at(
-                query_options->replica_preference);
-            return Status(ss.str());
-          }
           query_options->__set_disable_cached_reads(true);
         }
         break;
@@ -286,24 +278,8 @@ Status impala::SetQueryOption(const string& key, const string& value,
         query_options->__set_optimize_partition_key_scans(
             iequals(value, "true") || iequals(value, "1"));
         break;
-      case TImpalaQueryOptions::REPLICA_PREFERENCE:
-        if (iequals(value, "cache_local") || iequals(value, "0")) {
-          if (query_options->disable_cached_reads) {
-            return Status("Conflicting settings: DISABLE_CACHED_READS = true and"
-                " REPLICA_PREFERENCE = CACHE_LOCAL");
-          }
-          query_options->__set_replica_preference(TReplicaPreference::CACHE_LOCAL);
-        } else if (iequals(value, "disk_local") || iequals(value, "2")) {
-          query_options->__set_replica_preference(TReplicaPreference::DISK_LOCAL);
-        } else if (iequals(value, "remote") || iequals(value, "4")) {
-          query_options->__set_replica_preference(TReplicaPreference::REMOTE);
-        } else {
-          return Status(Substitute("Invalid replica memory distance preference '$0'."
-              "Valid values are CACHE_LOCAL(0), DISK_LOCAL(2), REMOTE(4)", value));
-        }
-        break;
-      case TImpalaQueryOptions::RANDOM_REPLICA:
-        query_options->__set_random_replica(
+      case TImpalaQueryOptions::SCHEDULE_RANDOM_REPLICA:
+        query_options->__set_schedule_random_replica(
             iequals(value, "true") || iequals(value, "1"));
         break;
       case TImpalaQueryOptions::SCAN_NODE_CODEGEN_THRESHOLD:

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cb377741/be/src/service/query-options.h
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 6f4c214..fb24530 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -65,8 +65,7 @@ class TQueryOptions;
   QUERY_OPT_FN(seq_compression_mode, SEQ_COMPRESSION_MODE)\
   QUERY_OPT_FN(exec_single_node_rows_threshold, EXEC_SINGLE_NODE_ROWS_THRESHOLD)\
   QUERY_OPT_FN(optimize_partition_key_scans, OPTIMIZE_PARTITION_KEY_SCANS)\
-  QUERY_OPT_FN(replica_preference, REPLICA_PREFERENCE)\
-  QUERY_OPT_FN(random_replica, RANDOM_REPLICA)\
+  QUERY_OPT_FN(schedule_random_replica, SCHEDULE_RANDOM_REPLICA)\
   QUERY_OPT_FN(scan_node_codegen_threshold, SCAN_NODE_CODEGEN_THRESHOLD)\
   QUERY_OPT_FN(disable_streaming_preaggregations, DISABLE_STREAMING_PREAGGREGATIONS)\
   QUERY_OPT_FN(runtime_filter_mode, RUNTIME_FILTER_MODE)\

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cb377741/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index b9892e6..74bef28 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -133,17 +133,12 @@ struct TQueryOptions {
   // produce different results than the scan based approach in some edge cases.
   32: optional bool optimize_partition_key_scans = 0
 
-  // Specify the prefered locality level of replicas during scan scheduling.
-  // Replicas with an equal or better locality will be preferred.
-  33: optional PlanNodes.TReplicaPreference replica_preference =
-      PlanNodes.TReplicaPreference.CACHE_LOCAL
-
   // Configure whether scheduling of scans over multiple non-cached replicas will break
   // ties between multiple, otherwise equivalent locations at random or deterministically.
   // The former will pick a random replica, the latter will use the replica order from the
   // metastore. This setting will not affect tie-breaking for cached replicas. Instead,
   // they will always break ties randomly.
-  34: optional bool random_replica = 0
+  34: optional bool schedule_random_replica = 0
 
   // For scan nodes with any conjuncts, use codegen to evaluate the conjuncts if
   // the number of rows * number of operators in the conjuncts exceeds this threshold.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cb377741/common/thrift/ImpalaService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 68647df..9421ec4 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -172,13 +172,8 @@ enum TImpalaQueryOptions {
   // produce different results than the scan based approach in some edge cases.
   OPTIMIZE_PARTITION_KEY_SCANS,
 
-  // Prefered memory distance of replicas. This parameter determines the pool of replicas
-  // among which scans will be scheduled in terms of the distance of the replica storage
-  // from the impalad.
-  REPLICA_PREFERENCE,
-
   // Determines tie breaking policy when picking locations.
-  RANDOM_REPLICA,
+  SCHEDULE_RANDOM_REPLICA,
 
   // For scan nodes with any conjuncts, use codegen to evaluate the conjuncts if
   // the number of rows * number of operators in the conjuncts exceeds this threshold.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cb377741/fe/src/main/java/com/cloudera/impala/analysis/TableRef.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/analysis/TableRef.java b/fe/src/main/java/com/cloudera/impala/analysis/TableRef.java
index e53f1b1..6f9ac85 100644
--- a/fe/src/main/java/com/cloudera/impala/analysis/TableRef.java
+++ b/fe/src/main/java/com/cloudera/impala/analysis/TableRef.java
@@ -394,16 +394,7 @@ public class TableRef implements ParseNode {
       analyzer.addWarning("Table hints only supported for Hdfs tables");
     }
     for (String hint: tableHints_) {
-      if (hint.equalsIgnoreCase("SCHEDULE_CACHE_LOCAL")) {
-        analyzer.setHasPlanHints();
-        replicaPreference_ = TReplicaPreference.CACHE_LOCAL;
-      } else if (hint.equalsIgnoreCase("SCHEDULE_DISK_LOCAL")) {
-        analyzer.setHasPlanHints();
-        replicaPreference_ = TReplicaPreference.DISK_LOCAL;
-      } else if (hint.equalsIgnoreCase("SCHEDULE_REMOTE")) {
-        analyzer.setHasPlanHints();
-        replicaPreference_ = TReplicaPreference.REMOTE;
-      } else if (hint.equalsIgnoreCase("RANDOM_REPLICA")) {
+      if (hint.equalsIgnoreCase("SCHEDULE_RANDOM_REPLICA")) {
         analyzer.setHasPlanHints();
         randomReplica_ = true;
       } else {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cb377741/fe/src/test/java/com/cloudera/impala/analysis/AnalyzeStmtsTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/com/cloudera/impala/analysis/AnalyzeStmtsTest.java b/fe/src/test/java/com/cloudera/impala/analysis/AnalyzeStmtsTest.java
index 08d86a6..72c58d5 100644
--- a/fe/src/test/java/com/cloudera/impala/analysis/AnalyzeStmtsTest.java
+++ b/fe/src/test/java/com/cloudera/impala/analysis/AnalyzeStmtsTest.java
@@ -1611,20 +1611,8 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
       String suffix = hintStyle[1];
       for (String alias : new String[] { "", "a" }) {
         AnalyzesOk(
-            String.format("select * from functional.alltypes %s %sschedule_cache_local%s",
-            alias, prefix, suffix));
-        AnalyzesOk(
-            String.format("select * from functional.alltypes %s %sschedule_disk_local%s",
-            alias, prefix, suffix));
-        AnalyzesOk(
-            String.format("select * from functional.alltypes %s %sschedule_remote%s",
-            alias, prefix, suffix));
-        AnalyzesOk(
-            String.format("select * from functional.alltypes %s %sschedule_remote," +
-            "random_replica%s", alias, prefix, suffix));
-        AnalyzesOk(
-            String.format("select * from functional.alltypes %s %srandom_replica," +
-            "schedule_remote%s", alias, prefix, suffix));
+            String.format("select * from functional.alltypes %s " +
+            "%sschedule_random_replica%s", alias, prefix, suffix));
 
         String name = alias.isEmpty() ? "functional.alltypes" : alias;
         AnalyzesOk(String.format("select * from functional.alltypes %s %sFOO%s", alias,
@@ -1633,24 +1621,24 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
 
         // Table hints not supported for HBase tables
         AnalyzesOk(String.format("select * from functional_hbase.alltypes %s " +
-              "%srandom_replica%s", alias, prefix, suffix),
+              "%sschedule_random_replica%s", alias, prefix, suffix),
             "Table hints only supported for Hdfs tables");
         // Table hints not supported for catalog views
         AnalyzesOk(String.format("select * from functional.alltypes_view %s " +
-              "%srandom_replica%s", alias, prefix, suffix),
+              "%sschedule_random_replica%s", alias, prefix, suffix),
             "Table hints not supported for inline view and collections");
         // Table hints not supported for with clauses
         AnalyzesOk(String.format("with t as (select 1) select * from t %s " +
-              "%srandom_replica%s", alias, prefix, suffix),
+              "%sschedule_random_replica%s", alias, prefix, suffix),
             "Table hints not supported for inline view and collections");
       }
       // Table hints not supported for inline views
       AnalyzesOk(String.format("select * from (select tinyint_col * 2 as c1 from " +
-          "functional.alltypes) as v1 %srandom_replica%s", prefix, suffix),
+          "functional.alltypes) as v1 %sschedule_random_replica%s", prefix, suffix),
           "Table hints not supported for inline view and collections");
       // Table hints not supported for collection tables
       AnalyzesOk(String.format("select item from functional.allcomplextypes, " +
-          "allcomplextypes.int_array_col %srandom_replica%s", prefix, suffix),
+          "allcomplextypes.int_array_col %sschedule_random_replica%s", prefix, suffix),
           "Table hints not supported for inline view and collections");
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cb377741/fe/src/test/java/com/cloudera/impala/analysis/ParserTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/com/cloudera/impala/analysis/ParserTest.java b/fe/src/test/java/com/cloudera/impala/analysis/ParserTest.java
index b8bf54f..f503d69 100644
--- a/fe/src/test/java/com/cloudera/impala/analysis/ParserTest.java
+++ b/fe/src/test/java/com/cloudera/impala/analysis/ParserTest.java
@@ -392,25 +392,14 @@ public class ParserTest {
 
       // Test TableRef hints.
       TestTableHints(String.format(
-          "select * from functional.alltypes %sschedule_disk_local%s", prefix, suffix),
-          "schedule_disk_local");
-      TestTableHints(String.format(
-          "select * from functional.alltypes %sschedule_cache_local,random_replica%s",
-          prefix, suffix), "schedule_cache_local", "random_replica");
-      TestTableHints(String.format(
-          "select * from functional.alltypes a %sschedule_cache_local,random_replica%s",
-          prefix, suffix), "schedule_cache_local", "random_replica");
-      TestTableHints(String.format(
-          "select * from functional.alltypes a %sschedule_cache_local,random_replica%s" +
-          ", functional.alltypes b %sschedule_remote%s", prefix, suffix,
-          prefix, suffix), "schedule_cache_local", "random_replica", "schedule_remote");
+          "select * from functional.alltypes %sschedule_random_replica%s", prefix,
+          suffix), "schedule_random_replica");
 
       // Test both TableRef and join hints.
       TestTableAndJoinHints(String.format(
-          "select * from functional.alltypes a %sschedule_cache_local,random_replica%s" +
-          "join %sbroadcast%s functional.alltypes b %sschedule_remote%s using(id)",
-          prefix, suffix, prefix, suffix, prefix, suffix), "schedule_cache_local",
-          "random_replica", "broadcast", "schedule_remote");
+          "select * from functional.alltypes a %sschedule_random_replica%s join " +
+          "%sbroadcast%s functional.alltypes b using(id)", prefix, suffix, prefix, suffix,
+          prefix, suffix), "schedule_random_replica", "broadcast");
 
       // Test select-list hints (e.g., straight_join). The legacy-style hint has no
       // prefix and suffix.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cb377741/fe/src/test/java/com/cloudera/impala/analysis/ToSqlTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/com/cloudera/impala/analysis/ToSqlTest.java b/fe/src/test/java/com/cloudera/impala/analysis/ToSqlTest.java
index 521b19f..6005d35 100644
--- a/fe/src/test/java/com/cloudera/impala/analysis/ToSqlTest.java
+++ b/fe/src/test/java/com/cloudera/impala/analysis/ToSqlTest.java
@@ -468,20 +468,17 @@ public class ToSqlTest extends AnalyzerTest {
 
       // Table hint
       testToSql(String.format(
-          "select * from functional.alltypes at %srandom_replica%s", prefix, suffix),
-          "SELECT * FROM functional.alltypes at \n-- +random_replica\n");
+          "select * from functional.alltypes at %sschedule_random_replica%s", prefix,
+          suffix), "SELECT * FROM functional.alltypes at \n-- +schedule_random_replica\n"
+          );
       testToSql(String.format(
-          "select * from functional.alltypes %srandom_replica%s", prefix, suffix),
-          "SELECT * FROM functional.alltypes \n-- +random_replica\n");
-      testToSql(String.format(
-          "select * from functional.alltypes %srandom_replica,schedule_disk_local%s",
-          prefix, suffix), "SELECT * FROM functional.alltypes \n-- +random_replica," +
-          "schedule_disk_local\n");
+          "select * from functional.alltypes %sschedule_random_replica%s", prefix,
+          suffix), "SELECT * FROM functional.alltypes \n-- +schedule_random_replica\n");
       testToSql(String.format(
           "select c1 from (select at.tinyint_col as c1 from functional.alltypes at " +
-          "%srandom_replica%s) s1", prefix, suffix),
+          "%sschedule_random_replica%s) s1", prefix, suffix),
           "SELECT c1 FROM (SELECT at.tinyint_col c1 FROM functional.alltypes at \n-- +" +
-          "random_replica\n) s1");
+          "schedule_random_replica\n) s1");
 
       // Select-list hint. The legacy-style hint has no prefix and suffix.
       if (prefix.contains("[")) {


[10/10] incubator-impala git commit: IMPALA-3232: Allow not-exists uncorrelated subqueries

Posted by ta...@apache.org.
IMPALA-3232: Allow not-exists uncorrelated subqueries

Before this patch, correlated exists and not exists subqueries were
rewritten as left semi and anti joins respectively. Uncorrelated
exists subqueries were rewritten as cross joins, and uncorrelated
not-exists subqueries were not supported at all. This patch takes
advantage of the nested loop join that was recently introduced, which
allows us to rewrite both correlated and uncorrelated exists subqueries
as left semi joins and both correlated and uncorrelated not-exists
subqueries as anti joins.
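
The rewrite described above can be illustrated with a small, self-contained sketch. It is not Impala code: the class and method names below are hypothetical, and it only models the semantics of a semi or anti join whose ON clause is the constant TRUE (as set by the patch for uncorrelated subqueries), with the subquery limited to one row.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; none of these names exist in Impala.
final class UncorrelatedExistsSketch {
  // Models "subqueryStmt.setLimit(1)": one row is enough to decide EXISTS.
  static <T> List<T> limitOne(List<T> rows) {
    return rows.isEmpty() ? new ArrayList<T>() : new ArrayList<T>(rows.subList(0, 1));
  }

  // LEFT SEMI JOIN with ON TRUE: every outer row matches iff the inner side
  // has at least one row, so the result is either all outer rows or none.
  static <T> List<T> leftSemiJoinOnTrue(List<T> outer, List<?> inner) {
    return inner.isEmpty() ? new ArrayList<T>() : new ArrayList<T>(outer);
  }

  // LEFT ANTI JOIN with ON TRUE: the complement of the semi join above.
  // Outer rows survive only when the inner side is empty.
  static <T> List<T> leftAntiJoinOnTrue(List<T> outer, List<?> inner) {
    return inner.isEmpty() ? new ArrayList<T>(outer) : new ArrayList<T>();
  }
}
```

Under these assumptions, an uncorrelated NOT EXISTS returns all outer rows when the subquery is empty and no rows otherwise, which matches the expected results added to subquery.test in this commit.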

Change-Id: I52ae12f116d026190f3a2a7575cda855317d11e8
Reviewed-on: http://gerrit.cloudera.org:8080/2792
Reviewed-by: Taras Bobrovytsky <tb...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/46c3e43e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/46c3e43e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/46c3e43e

Branch: refs/heads/master
Commit: 46c3e43edb753b432b104a1953fbde779e70d2b4
Parents: 7767d30
Author: Taras Bobrovytsky <tb...@cloudera.com>
Authored: Thu Apr 14 17:04:27 2016 -0700
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Thu May 12 23:06:36 2016 -0700

----------------------------------------------------------------------
 .../cloudera/impala/analysis/StmtRewriter.java  | 32 +++-------
 .../impala/analysis/AnalyzeStmtsTest.java       | 10 ++-
 .../impala/analysis/AnalyzeSubqueriesTest.java  | 10 ++-
 .../queries/PlannerTest/analytic-fns.test       |  2 +-
 .../queries/PlannerTest/subquery-rewrite.test   | 66 +++++++++++++++++++-
 .../queries/QueryTest/subquery.test             | 27 ++++++++
 6 files changed, 113 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/46c3e43e/fe/src/main/java/com/cloudera/impala/analysis/StmtRewriter.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/analysis/StmtRewriter.java b/fe/src/main/java/com/cloudera/impala/analysis/StmtRewriter.java
index d682ea9..c3fbc20 100644
--- a/fe/src/main/java/com/cloudera/impala/analysis/StmtRewriter.java
+++ b/fe/src/main/java/com/cloudera/impala/analysis/StmtRewriter.java
@@ -420,34 +420,22 @@ public class StmtRewriter {
     if (onClausePredicate == null) {
       Preconditions.checkState(expr instanceof ExistsPredicate);
       ExistsPredicate existsPred = (ExistsPredicate) expr;
+      // TODO This is very expensive if uncorrelated. Remove it when we implement
+      // independent subquery evaluation.
+      if (existsPred.isNotExists()) {
+        inlineView.setJoinOp(JoinOperator.LEFT_ANTI_JOIN);
+      } else {
+        inlineView.setJoinOp(JoinOperator.LEFT_SEMI_JOIN);
+      }
       // Note that the concept of a 'correlated inline view' is similar but not the same
       // as a 'correlated subquery', i.e., a subquery with a correlated predicate.
-      if (inlineView.isCorrelated()) {
-        if (existsPred.isNotExists()) {
-          inlineView.setJoinOp(JoinOperator.LEFT_ANTI_JOIN);
-        } else {
-          inlineView.setJoinOp(JoinOperator.LEFT_SEMI_JOIN);
-        }
-        // No visible tuples added.
-        return false;
-      } else {
-        // TODO: Remove this when we support independent subquery evaluation.
-        if (existsPred.isNotExists()) {
-          throw new AnalysisException("Unsupported uncorrelated NOT EXISTS subquery: " +
-              subqueryStmt.toSql());
-        }
+      if (!inlineView.isCorrelated()) {
         // For uncorrelated subqueries, we limit the number of rows returned by the
         // subquery.
         subqueryStmt.setLimit(1);
-        // We don't have an ON clause predicate to create an equi-join. Rewrite the
-        // subquery using a CROSS JOIN.
-        // TODO This is very expensive. Remove it when we implement independent
-        // subquery evaluation.
-        inlineView.setJoinOp(JoinOperator.CROSS_JOIN);
-        LOG.warn("uncorrelated subquery rewritten using a cross join");
-        // Indicate that new visible tuples may be added in stmt's select list.
-        return true;
+        inlineView.setOnClause(new BoolLiteral(true));
       }
+      return false;
     }
 
     // Create an smap from the original select-list exprs of the select list to

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/46c3e43e/fe/src/test/java/com/cloudera/impala/analysis/AnalyzeStmtsTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/com/cloudera/impala/analysis/AnalyzeStmtsTest.java b/fe/src/test/java/com/cloudera/impala/analysis/AnalyzeStmtsTest.java
index 72c58d5..80120b9 100644
--- a/fe/src/test/java/com/cloudera/impala/analysis/AnalyzeStmtsTest.java
+++ b/fe/src/test/java/com/cloudera/impala/analysis/AnalyzeStmtsTest.java
@@ -1129,7 +1129,7 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
     AnalyzesOk("select key, av from functional.allcomplextypes t, " +
         "(select a1.key, av from t.array_map_col a1, " +
         "(select avg(item) av from a1.value a2) v1) v2");
-    // TOOD: Enable once we support complex-typed exprs in the select list.
+    // TODO: Enable once we support complex-typed exprs in the select list.
     //AnalyzesOk("select key, av from functional.allcomplextypes t, " +
     //    "(select a1.key, a1.value from t.array_map_col a1) v1, " +
     //    "(select avg(item) av from v1.value) v2");
@@ -1152,7 +1152,7 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
         "(select a1.key, av from t.array_map_col a1, " +
         "(select avg(item) av from a1.value a2) v1) v2) " +
         "select * from w");
-    // TOOD: Enable once we support complex-typed exprs in the select list.
+    // TODO: Enable once we support complex-typed exprs in the select list.
     //AnalyzesOk("with w as (select key, av from functional.allcomplextypes t, " +
     //    "(select a1.key, a1.value from t.array_map_col a1) v1, " +
     //    "(select avg(item) av from v1.value) v2) " +
@@ -1196,7 +1196,7 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
         "uncorrelated one 'functional.alltypes':\n" +
         "SELECT * FROM functional.alltypes, (SELECT count(1) cnt " +
         "FROM t.int_array_col) v1");
-    // TOOD: Enable once we support complex-typed exprs in the select list.
+    // TODO: Enable once we support complex-typed exprs in the select list.
     // Correlated table ref has correlated inline view as parent.
     //AnalysisError("select cnt from functional.allcomplextypes t, " +
     //    "(select value arr from t.array_map_col) v1, " +
@@ -2741,6 +2741,10 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
         "where timestamp_col between cast('2001-01-01' as timestamp) and " +
         "(cast('2001-01-01' as timestamp) + interval 10 days)) " +
         "select * from with_1");
+    AnalyzesOk("with with_1 as (select 1 as col_name), " +
+        "with_2 as (select 1 as col_name) " +
+        "select a.tinyint_col from functional.alltypes a " +
+        "where not exists (select 1 from with_1) ");
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/46c3e43e/fe/src/test/java/com/cloudera/impala/analysis/AnalyzeSubqueriesTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/com/cloudera/impala/analysis/AnalyzeSubqueriesTest.java b/fe/src/test/java/com/cloudera/impala/analysis/AnalyzeSubqueriesTest.java
index 206c245..a5b4fe2 100644
--- a/fe/src/test/java/com/cloudera/impala/analysis/AnalyzeSubqueriesTest.java
+++ b/fe/src/test/java/com/cloudera/impala/analysis/AnalyzeSubqueriesTest.java
@@ -635,15 +635,13 @@ public class AnalyzeSubqueriesTest extends AnalyzerTest {
     AnalyzesOk("select id from functional.alltypes where exists " +
         "(select id from functional.alltypestiny where int_col < 10 and exists (" +
         "select id from functional.alltypessmall where bool_col = true))");
-    // Uncorrelated NOT EXISTS subquery is illegal with only relative table refs
+    // Uncorrelated NOT EXISTS with relative table ref
     AnalyzesOk(String.format(
         "select id from functional.allcomplextypes t where not exists " +
         "(select item from t.int_array_col a where item < 10)"));
-    // Uncorrelated NOT EXISTS subquery is illegal with absolute table refs
-    AnalysisError("select * from functional.alltypestiny where not exists " +
-        "(select 1 from functional.alltypessmall where bool_col = false)",
-        "Unsupported uncorrelated NOT EXISTS subquery: SELECT 1 FROM " +
-        "functional.alltypessmall WHERE bool_col = FALSE");
+    // Uncorrelated NOT EXISTS subquery
+    AnalyzesOk("select * from functional.alltypestiny where not exists " +
+        "(select 1 from functional.alltypessmall where bool_col = false)");
 
     // Subquery references an explicit alias from the outer block in the FROM
     // clause

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/46c3e43e/testdata/workloads/functional-planner/queries/PlannerTest/analytic-fns.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/analytic-fns.test b/testdata/workloads/functional-planner/queries/PlannerTest/analytic-fns.test
index bbabef0..0f9b2d0 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/analytic-fns.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/analytic-fns.test
@@ -1653,7 +1653,7 @@ WHERE EXISTS (SELECT t1.year AS int_col_1 FROM functional.alltypesagg t1)
 03:SORT
 |  order by: day ASC
 |
-02:NESTED LOOP JOIN [CROSS JOIN]
+02:NESTED LOOP JOIN [LEFT SEMI JOIN]
 |
 |--01:SCAN HDFS [functional.alltypesagg t1]
 |     partitions=11/11 files=11 size=814.73KB

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/46c3e43e/testdata/workloads/functional-planner/queries/PlannerTest/subquery-rewrite.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/subquery-rewrite.test b/testdata/workloads/functional-planner/queries/PlannerTest/subquery-rewrite.test
index bbff166..8115502 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/subquery-rewrite.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/subquery-rewrite.test
@@ -640,7 +640,7 @@ select *
 from functional.alltypestiny t
 where exists (select * from functional.alltypessmall s where s.id < 5)
 ---- PLAN
-02:NESTED LOOP JOIN [CROSS JOIN]
+02:NESTED LOOP JOIN [LEFT SEMI JOIN]
 |
 |--01:SCAN HDFS [functional.alltypessmall s]
 |     partitions=4/4 files=4 size=6.32KB
@@ -658,7 +658,7 @@ where exists
    from functional.alltypesagg where tinyint_col = 10
    group by id, int_col, bigint_col)
 ---- PLAN
-03:NESTED LOOP JOIN [CROSS JOIN]
+03:NESTED LOOP JOIN [RIGHT SEMI JOIN]
 |
 |--00:SCAN HDFS [functional.alltypestiny t]
 |     partitions=4/4 files=4 size=460B
@@ -678,6 +678,68 @@ where exists (select * from functional.alltypessmall limit 0)
 ---- PLAN
 00:EMPTYSET
 ====
+# Uncorrelated NOT EXISTS
+select *
+from functional.alltypestiny t
+where not exists (select * from functional.alltypessmall s where s.id < 5)
+---- PLAN
+02:NESTED LOOP JOIN [LEFT ANTI JOIN]
+|
+|--01:SCAN HDFS [functional.alltypessmall s]
+|     partitions=4/4 files=4 size=6.32KB
+|     predicates: s.id < 5
+|     limit: 1
+|
+00:SCAN HDFS [functional.alltypestiny t]
+   partitions=4/4 files=4 size=460B
+====
+# Uncorrelated NOT exists referencing a WITH clause
+with
+  w1 as (select * from functional.alltypestiny t),
+  w2 as (select * from functional.alltypessmall s where s.id < 0)
+select *
+from w1 t
+where not exists (select 1 from w2)
+---- PLAN
+02:NESTED LOOP JOIN [LEFT ANTI JOIN]
+|
+|--01:SCAN HDFS [functional.alltypessmall s]
+|     partitions=4/4 files=4 size=6.32KB
+|     predicates: s.id < 0
+|     limit: 1
+|
+00:SCAN HDFS [functional.alltypestiny t]
+   partitions=4/4 files=4 size=460B
+====
+# Uncorrelated NOT EXISTS with an analytic function and a group by clause
+select 1
+from functional.alltypestiny t
+where not exists
+  (select id, max(int_col) over (partition by bigint_col)
+   from functional.alltypesagg where tinyint_col = 10
+   group by id, int_col, bigint_col)
+---- PLAN
+03:NESTED LOOP JOIN [RIGHT ANTI JOIN]
+|
+|--00:SCAN HDFS [functional.alltypestiny t]
+|     partitions=4/4 files=4 size=460B
+|
+02:AGGREGATE [FINALIZE]
+|  group by: id, int_col, bigint_col
+|  limit: 1
+|
+01:SCAN HDFS [functional.alltypesagg]
+   partitions=11/11 files=11 size=814.73KB
+   predicates: tinyint_col = 10
+====
+# Uncorrelated NOT EXISTS with a LIMIT 0 clause
+select 1
+from functional.alltypestiny t
+where not exists (select * from functional.alltypessmall limit 0)
+---- PLAN
+00:SCAN HDFS [functional.alltypestiny t]
+   partitions=4/4 files=4 size=460B
+====
 # Multiple nesting levels
 select count(*)
 from functional.alltypes a

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/46c3e43e/testdata/workloads/functional-query/queries/QueryTest/subquery.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/subquery.test b/testdata/workloads/functional-query/queries/QueryTest/subquery.test
index af587f0..32b74d3 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/subquery.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/subquery.test
@@ -330,6 +330,33 @@ and t.id > 0
 TINYINT
 ====
 ---- QUERY
+# Uncorrelated NOT EXISTS
+select id
+from functional.alltypestiny t
+where not exists (select 1 from functional.alltypessmall where bool_col = false)
+and bool_col = true
+---- RESULTS
+---- TYPES
+INT
+====
+---- QUERY
+# Uncorrelated NOT EXISTS that returns an empty set
+select 1
+from functional.alltypestiny t
+where not exists (select null from functional.alltypessmall where id < 0)
+and t.id > 0
+---- RESULTS
+1
+1
+1
+1
+1
+1
+1
+---- TYPES
+TINYINT
+====
+---- QUERY
 # Uncorrelated aggregate subquery
 select count(*) from
 functional.alltypessmall t


[09/10] incubator-impala git commit: IMPALA-3534: allow overriding of CMAKE_CXX_COMPILER for ASAN

Posted by ta...@apache.org.
IMPALA-3534: allow overriding of CMAKE_CXX_COMPILER for ASAN

This makes it consistent with the regular toolchain and makes it easier
to use wrapper scripts like distcc.

Change-Id: I3ab488182c46f9ccb1850a0a2b064653e7e3da26
Reviewed-on: http://gerrit.cloudera.org:8080/3050
Reviewed-by: Jim Apple <jb...@cloudera.com>
Reviewed-by: Casey Ching <ca...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/2b61ae7f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/2b61ae7f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/2b61ae7f

Branch: refs/heads/master
Commit: 2b61ae7f2a31cdddc73d2a8eb5a6fc3d3272c2b2
Parents: e96b463
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Thu May 12 13:51:03 2016 -0700
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Thu May 12 23:06:36 2016 -0700

----------------------------------------------------------------------
 cmake_modules/asan_toolchain.cmake | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2b61ae7f/cmake_modules/asan_toolchain.cmake
----------------------------------------------------------------------
diff --git a/cmake_modules/asan_toolchain.cmake b/cmake_modules/asan_toolchain.cmake
index bdc6297..05997bf 100644
--- a/cmake_modules/asan_toolchain.cmake
+++ b/cmake_modules/asan_toolchain.cmake
@@ -24,7 +24,13 @@ set(GCC_ROOT $ENV{IMPALA_TOOLCHAIN}/gcc-$ENV{IMPALA_GCC_VERSION})
 set(LLVM_ASAN_ROOT $ENV{IMPALA_TOOLCHAIN}/llvm-$ENV{IMPALA_LLVM_ASAN_VERSION})
 
 set(CMAKE_C_COMPILER ${LLVM_ASAN_ROOT}/bin/clang)
-set(CMAKE_CXX_COMPILER ${LLVM_ASAN_ROOT}/bin/clang++)
+
+# Use clang to build unless overridden by environment.
+if($ENV{IMPALA_CXX_COMPILER} STREQUAL "default")
+  set(CMAKE_CXX_COMPILER ${LLVM_ASAN_ROOT}/bin/clang++)
+else()
+  set(CMAKE_CXX_COMPILER $ENV{IMPALA_CXX_COMPILER})
+endif()
 
 # Add the GCC root location to the compiler flags
 set(CXX_COMMON_FLAGS "--gcc-toolchain=${GCC_ROOT}")


[03/10] incubator-impala git commit: IMPALA-3480: Add query options for min/max filter sizes

Posted by ta...@apache.org.
IMPALA-3480: Add query options for min/max filter sizes

This patch adds two query options for runtime filters:

  RUNTIME_FILTER_MAX_SIZE
  RUNTIME_FILTER_MIN_SIZE

These options define the minimum and maximum filter sizes for a filter,
no matter what the estimates produced by the planner are. Filter sizes
are rounded up to the nearest power of two.
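
As a rough sketch of the behaviour described above, the following shows one way a planner estimate could be bounded by the two options. The class and method names are hypothetical, the actual change is in the C++ backend (be/src/runtime/runtime-filter.cc), and the order of rounding and clamping here is an assumption rather than something the commit message states.

```java
// Hypothetical illustration only; not the Impala implementation.
final class FilterSizeSketch {
  // Rounds v up to the nearest power of two (values <= 1 map to 1).
  // Assumes v is well below 2^62, so the shift cannot overflow.
  static long roundUpToPowerOfTwo(long v) {
    if (v <= 1) return 1;
    return Long.highestOneBit(v - 1) << 1;
  }

  // Assumed order: round each size to a power of two, then clamp the
  // rounded estimate into [min, max]. Assumes minSize <= maxSize.
  static long clampedFilterSize(long estimate, long minSize, long maxSize) {
    long lo = roundUpToPowerOfTwo(minSize);
    long hi = roundUpToPowerOfTwo(maxSize);
    long size = roundUpToPowerOfTwo(estimate);
    return Math.max(lo, Math.min(hi, size));
  }
}
```

With illustrative bounds of 4096 and 16777216 bytes (not defaults taken from the source), an estimate of 5000 would become 8192, an estimate of 100 would be raised to 4096, and an estimate of 2^30 would be capped at 16777216.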

Change-Id: I5c13c200a0f1855f38a5da50ca34a737e741868b
Reviewed-on: http://gerrit.cloudera.org:8080/2966
Tested-by: Internal Jenkins
Reviewed-by: Henry Robinson <he...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/df1412c9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/df1412c9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/df1412c9

Branch: refs/heads/master
Commit: df1412c962945fe6e69591e80354fad692413ba3
Parents: 14cdb04
Author: Henry Robinson <he...@cloudera.com>
Authored: Thu May 5 10:00:29 2016 -0700
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Thu May 12 23:06:35 2016 -0700

----------------------------------------------------------------------
 be/src/exec/hash-table-test.cc                  |  6 +--
 be/src/exec/hash-table.h                        |  2 +-
 be/src/exec/nested-loop-join-node.cc            | 12 ++---
 be/src/exec/partitioned-aggregation-node.cc     |  2 +-
 be/src/runtime/runtime-filter.cc                | 34 ++++++++----
 be/src/runtime/runtime-filter.h                 | 14 +++--
 be/src/service/query-options-test.cc            | 56 +++++++++++---------
 be/src/service/query-options.cc                 | 19 +++++--
 be/src/service/query-options.h                  |  6 ++-
 be/src/util/bit-util.h                          |  7 ++-
 be/src/util/debug-util.cc                       |  5 +-
 be/src/util/debug-util.h                        |  1 +
 be/src/util/fixed-size-hash-table.h             |  4 +-
 be/src/util/parse-util-test.cc                  | 12 ++++-
 be/src/util/parse-util.cc                       |  6 +++
 be/src/util/parse-util.h                        |  1 +
 common/thrift/ImpalaInternalService.thrift      |  6 +++
 common/thrift/ImpalaService.thrift              |  6 +++
 .../queries/QueryTest/runtime_filters.test      | 46 ++++++++++++++++
 .../queries/QueryTest/runtime_filters_wait.test |  1 +
 20 files changed, 183 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/be/src/exec/hash-table-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table-test.cc b/be/src/exec/hash-table-test.cc
index bef3f1e..25cd4f1 100644
--- a/be/src/exec/hash-table-test.cc
+++ b/be/src/exec/hash-table-test.cc
@@ -189,7 +189,7 @@ class HashTableTest : public testing::Test {
         &tracker_, runtime_state_, &client).ok());
 
     // Initial_num_buckets must be a power of two.
-    EXPECT_EQ(initial_num_buckets, BitUtil::NextPowerOfTwo(initial_num_buckets));
+    EXPECT_EQ(initial_num_buckets, BitUtil::RoundUpToPowerOfTwo(initial_num_buckets));
     int64_t max_num_buckets = 1L << 31;
     table->reset(new HashTable(quadratic, runtime_state_, client, 1, NULL,
           max_num_buckets, initial_num_buckets));
@@ -354,12 +354,12 @@ class HashTableTest : public testing::Test {
     ProbeTest(hash_table.get(), &ht_ctx, probe_rows, total_rows, true);
 
     // Resize and try again.
-    int target_size = BitUtil::NextPowerOfTwo(2 * total_rows);
+    int target_size = BitUtil::RoundUpToPowerOfTwo(2 * total_rows);
     ResizeTable(hash_table.get(), target_size, &ht_ctx);
     EXPECT_EQ(hash_table->num_buckets(), target_size);
     ProbeTest(hash_table.get(), &ht_ctx, probe_rows, total_rows, true);
 
-    target_size = BitUtil::NextPowerOfTwo(total_rows + 1);
+    target_size = BitUtil::RoundUpToPowerOfTwo(total_rows + 1);
     ResizeTable(hash_table.get(), target_size, &ht_ctx);
     EXPECT_EQ(hash_table->num_buckets(), target_size);
     ProbeTest(hash_table.get(), &ht_ctx, probe_rows, total_rows, true);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/be/src/exec/hash-table.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table.h b/be/src/exec/hash-table.h
index e496cde..3d090a8 100644
--- a/be/src/exec/hash-table.h
+++ b/be/src/exec/hash-table.h
@@ -423,7 +423,7 @@ class HashTable {
   /// rounded up to a power of two, and also assumes that there are no duplicates.
   static int64_t EstimateNumBuckets(int64_t num_rows) {
     /// Assume max 66% fill factor and no duplicates.
-    return BitUtil::NextPowerOfTwo(3 * num_rows / 2);
+    return BitUtil::RoundUpToPowerOfTwo(3 * num_rows / 2);
   }
   static int64_t EstimateSize(int64_t num_rows) {
     int64_t num_buckets = EstimateNumBuckets(num_rows);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/be/src/exec/nested-loop-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/nested-loop-join-node.cc b/be/src/exec/nested-loop-join-node.cc
index 5789534..646c089 100644
--- a/be/src/exec/nested-loop-join-node.cc
+++ b/be/src/exec/nested-loop-join-node.cc
@@ -316,7 +316,7 @@ Status NestedLoopJoinNode::GetNextLeftSemiJoin(RuntimeState* state,
     RowBatch* output_batch) {
   ExprContext* const* join_conjunct_ctxs = &join_conjunct_ctxs_[0];
   size_t num_join_ctxs = join_conjunct_ctxs_.size();
-  const int N = BitUtil::NextPowerOfTwo(state->batch_size());
+  const int N = BitUtil::RoundUpToPowerOfTwo(state->batch_size());
 
   while (!eos_) {
     DCHECK(HasValidProbeRow());
@@ -361,7 +361,7 @@ Status NestedLoopJoinNode::GetNextLeftAntiJoin(RuntimeState* state,
     RowBatch* output_batch) {
   ExprContext* const* join_conjunct_ctxs = &join_conjunct_ctxs_[0];
   size_t num_join_ctxs = join_conjunct_ctxs_.size();
-  const int N = BitUtil::NextPowerOfTwo(state->batch_size());
+  const int N = BitUtil::RoundUpToPowerOfTwo(state->batch_size());
 
   while (!eos_) {
     DCHECK(HasValidProbeRow());
@@ -414,7 +414,7 @@ Status NestedLoopJoinNode::GetNextRightSemiJoin(RuntimeState* state,
   ExprContext* const* join_conjunct_ctxs = &join_conjunct_ctxs_[0];
   size_t num_join_ctxs = join_conjunct_ctxs_.size();
   DCHECK(matching_build_rows_ != NULL);
-  const int N = BitUtil::NextPowerOfTwo(state->batch_size());
+  const int N = BitUtil::RoundUpToPowerOfTwo(state->batch_size());
 
   while (!eos_) {
     DCHECK(HasValidProbeRow());
@@ -471,7 +471,7 @@ Status NestedLoopJoinNode::GetNextRightAntiJoin(RuntimeState* state,
   ExprContext* const* join_conjunct_ctxs = &join_conjunct_ctxs_[0];
   size_t num_join_ctxs = join_conjunct_ctxs_.size();
   DCHECK(matching_build_rows_ != NULL);
-  const int N = BitUtil::NextPowerOfTwo(state->batch_size());
+  const int N = BitUtil::RoundUpToPowerOfTwo(state->batch_size());
 
   while (!eos_ && HasMoreProbeRows()) {
     DCHECK(HasValidProbeRow());
@@ -557,7 +557,7 @@ Status NestedLoopJoinNode::ProcessUnmatchedBuildRows(
   size_t num_ctxs = conjunct_ctxs_.size();
   DCHECK(matching_build_rows_ != NULL);
 
-  const int N = BitUtil::NextPowerOfTwo(state->batch_size());
+  const int N = BitUtil::RoundUpToPowerOfTwo(state->batch_size());
   while (!build_row_iterator_.AtEnd()) {
     // This loop can go on for a long time if the conjuncts are very selective. Do query
     // maintenance every N iterations.
@@ -612,7 +612,7 @@ Status NestedLoopJoinNode::FindBuildMatches(
   ExprContext* const* conjunct_ctxs = &conjunct_ctxs_[0];
   size_t num_ctxs = conjunct_ctxs_.size();
 
-  const int N = BitUtil::NextPowerOfTwo(state->batch_size());
+  const int N = BitUtil::RoundUpToPowerOfTwo(state->batch_size());
   while (!build_row_iterator_.AtEnd()) {
     DCHECK(current_probe_row_ != NULL);
     TupleRow* output_row = output_batch->GetRow(output_batch->AddRow());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/be/src/exec/partitioned-aggregation-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node.cc b/be/src/exec/partitioned-aggregation-node.cc
index a1fef0f..0bf51a9 100644
--- a/be/src/exec/partitioned-aggregation-node.cc
+++ b/be/src/exec/partitioned-aggregation-node.cc
@@ -432,7 +432,7 @@ Status PartitionedAggregationNode::GetRowsFromPartition(RuntimeState* state,
 
   SCOPED_TIMER(get_results_timer_);
   int count = 0;
-  const int N = BitUtil::NextPowerOfTwo(state->batch_size());
+  const int N = BitUtil::RoundUpToPowerOfTwo(state->batch_size());
   // Keeping returning rows from the current partition.
   while (!output_iterator_.AtEnd()) {
     // This loop can go on for a long time if the conjuncts are very selective. Do query

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/be/src/runtime/runtime-filter.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-filter.cc b/be/src/runtime/runtime-filter.cc
index 2659125..7616320 100644
--- a/be/src/runtime/runtime-filter.cc
+++ b/be/src/runtime/runtime-filter.cc
@@ -32,8 +32,8 @@ DEFINE_double(max_filter_error_rate, 0.75, "(Advanced) The maximum probability o
 
 const int RuntimeFilter::SLEEP_PERIOD_MS = 20;
 
-const int32_t RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE;
-const int32_t RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE;
+const int64_t RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE;
+const int64_t RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE;
 
 RuntimeFilterBank::RuntimeFilterBank(const TQueryCtx& query_ctx, RuntimeState* state)
     : query_ctx_(query_ctx), state_(state), closed_(false) {
@@ -41,10 +41,26 @@ RuntimeFilterBank::RuntimeFilterBank(const TQueryCtx& query_ctx, RuntimeState* s
       state->runtime_profile()->AddCounter("BloomFilterBytes", TUnit::BYTES);
 
   // Clamp bloom filter size down to the limits {MIN,MAX}_BLOOM_FILTER_SIZE
-  int32_t bloom_filter_size = query_ctx_.request.query_options.runtime_bloom_filter_size;
-  bloom_filter_size = std::max(bloom_filter_size, MIN_BLOOM_FILTER_SIZE);
-  bloom_filter_size = std::min(bloom_filter_size, MAX_BLOOM_FILTER_SIZE);
-  default_log_filter_size_ = Bits::Log2Ceiling64(bloom_filter_size);
+  max_filter_size_ = query_ctx_.request.query_options.runtime_filter_max_size;
+  max_filter_size_ = max<int64_t>(max_filter_size_, MIN_BLOOM_FILTER_SIZE);
+  max_filter_size_ =
+      BitUtil::RoundUpToPowerOfTwo(min<int64_t>(max_filter_size_, MAX_BLOOM_FILTER_SIZE));
+
+  min_filter_size_ = query_ctx_.request.query_options.runtime_filter_min_size;
+  min_filter_size_ = max<int64_t>(min_filter_size_, MIN_BLOOM_FILTER_SIZE);
+  min_filter_size_ =
+      BitUtil::RoundUpToPowerOfTwo(min<int64_t>(min_filter_size_, MAX_BLOOM_FILTER_SIZE));
+
+  // Make sure that min <= max
+  min_filter_size_ = min<int64_t>(min_filter_size_, max_filter_size_);
+
+  DCHECK_GT(min_filter_size_, 0);
+  DCHECK_GT(max_filter_size_, 0);
+
+  default_filter_size_ = query_ctx_.request.query_options.runtime_bloom_filter_size;
+  default_filter_size_ = max<int64_t>(default_filter_size_, min_filter_size_);
+  default_filter_size_ =
+      BitUtil::RoundUpToPowerOfTwo(min<int64_t>(default_filter_size_, max_filter_size_));
 }
 
 RuntimeFilter* RuntimeFilterBank::RegisterFilter(const TRuntimeFilterDesc& filter_desc,
@@ -172,11 +188,11 @@ BloomFilter* RuntimeFilterBank::AllocateScratchBloomFilter(int32_t filter_id) {
 }
 
 int64_t RuntimeFilterBank::GetFilterSizeForNdv(int64_t ndv) {
-  if (ndv == -1) return 1LL << default_log_filter_size_;
+  if (ndv == -1) return default_filter_size_;
   int64_t required_space =
       1LL << BloomFilter::MinLogSpace(ndv, FLAGS_max_filter_error_rate);
-  if (required_space > MAX_BLOOM_FILTER_SIZE) required_space = MAX_BLOOM_FILTER_SIZE;
-  if (required_space < MIN_BLOOM_FILTER_SIZE) required_space = MIN_BLOOM_FILTER_SIZE;
+  required_space = max<int64_t>(required_space, min_filter_size_);
+  required_space = min<int64_t>(required_space, max_filter_size_);
   return required_space;
 }
 

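Note on the runtime-filter.cc change above: the constructor clamps each size option to [MIN_BLOOM_FILTER_SIZE, MAX_BLOOM_FILTER_SIZE], rounds it up to a power of two, forces min <= max, and then clamps the default size (and later any NDV-derived size) into [min, max]. The following is a minimal standalone sketch of that arithmetic, not the Impala code itself. The names FilterSizeLimits, ComputeLimits and ClampRequiredSpace are hypothetical and exist only for illustration; the two constants mirror the values in runtime-filter.h, and the rounding helper assumes a positive input.

```cpp
#include <algorithm>
#include <cstdint>

// Mirrors RuntimeFilterBank::{MIN,MAX}_BLOOM_FILTER_SIZE from the diff.
constexpr int64_t kMinBloomFilterSize = 4 * 1024;          // 4KB
constexpr int64_t kMaxBloomFilterSize = 16 * 1024 * 1024;  // 16MB

// Smallest power of two >= v; returns v if it is already a power of two.
// Assumes v > 0. Same bit trick as BitUtil::RoundUpToPowerOfTwo.
int64_t RoundUpToPowerOfTwo(int64_t v) {
  --v;
  v |= v >> 1;
  v |= v >> 2;
  v |= v >> 4;
  v |= v >> 8;
  v |= v >> 16;
  v |= v >> 32;
  return v + 1;
}

// Hypothetical container for the three precomputed sizes (in bytes).
struct FilterSizeLimits {
  int64_t min_size;
  int64_t max_size;
  int64_t default_size;
};

// Hypothetical helper: same order of operations as the patched constructor.
FilterSizeLimits ComputeLimits(int64_t min_opt, int64_t max_opt, int64_t default_opt) {
  FilterSizeLimits limits;
  limits.max_size = std::max(max_opt, kMinBloomFilterSize);
  limits.max_size = RoundUpToPowerOfTwo(std::min(limits.max_size, kMaxBloomFilterSize));

  limits.min_size = std::max(min_opt, kMinBloomFilterSize);
  limits.min_size = RoundUpToPowerOfTwo(std::min(limits.min_size, kMaxBloomFilterSize));
  // Make sure that min <= max.
  limits.min_size = std::min(limits.min_size, limits.max_size);

  limits.default_size = std::max(default_opt, limits.min_size);
  limits.default_size =
      RoundUpToPowerOfTwo(std::min(limits.default_size, limits.max_size));
  return limits;
}

// Hypothetical helper: the clamping step applied to an NDV-derived size.
int64_t ClampRequiredSpace(int64_t required_space, const FilterSizeLimits& limits) {
  required_space = std::max(required_space, limits.min_size);
  return std::min(required_space, limits.max_size);
}
```

With both RUNTIME_FILTER_MIN_SIZE and RUNTIME_FILTER_MAX_SIZE set to 6000 bytes, this sketch yields 8192 for every size, which agrees with the "Filter 0 (8.00 KB)" expectation in the runtime_filters.test case added by this patch.
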
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/be/src/runtime/runtime-filter.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-filter.h b/be/src/runtime/runtime-filter.h
index 50c77b0..178c03f 100644
--- a/be/src/runtime/runtime-filter.h
+++ b/be/src/runtime/runtime-filter.h
@@ -100,8 +100,8 @@ class RuntimeFilterBank {
   /// Releases all memory allocated for BloomFilters.
   void Close();
 
-  static const int32_t MIN_BLOOM_FILTER_SIZE = 4 * 1024;           // 4KB
-  static const int32_t MAX_BLOOM_FILTER_SIZE = 16 * 1024 * 1024;   // 16MB
+  static const int64_t MIN_BLOOM_FILTER_SIZE = 4 * 1024;           // 4KB
+  static const int64_t MAX_BLOOM_FILTER_SIZE = 16 * 1024 * 1024;   // 16MB
 
  private:
   /// Returns the the space (in bytes) required for a filter to achieve the configured
@@ -136,8 +136,14 @@ class RuntimeFilterBank {
   /// Total amount of memory allocated to Bloom Filters
   RuntimeProfile::Counter* memory_allocated_;
 
-  /// Precomputed logarithm of default BloomFilter heap size.
-  int default_log_filter_size_;
+  /// Precomputed default BloomFilter size.
+  int64_t default_filter_size_;
+
+  /// Maximum filter size, in bytes, rounded up to a power of two.
+  int64_t max_filter_size_;
+
+  /// Minimum filter size, in bytes, rounded up to a power of two.
+  int64_t min_filter_size_;
 };
 
 /// RuntimeFilters represent set-membership predicates (implemented with bloom filters)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/be/src/service/query-options-test.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-options-test.cc b/be/src/service/query-options-test.cc
index 83a5770..53767d9 100644
--- a/be/src/service/query-options-test.cc
+++ b/be/src/service/query-options-test.cc
@@ -29,30 +29,38 @@ using namespace std;
 
 TEST(QueryOptions, SetBloomSize) {
   TQueryOptions options;
-
-  // The upper and lower bound of the allowed values:
-  EXPECT_FALSE(SetQueryOption("RUNTIME_BLOOM_FILTER_SIZE",
-     lexical_cast<string>(RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE - 1), &options, NULL)
-     .ok());
-
-  EXPECT_FALSE(SetQueryOption("RUNTIME_BLOOM_FILTER_SIZE",
-      lexical_cast<string>(RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE + 1), &options, NULL)
-      .ok());
-
-  EXPECT_OK(SetQueryOption("RUNTIME_BLOOM_FILTER_SIZE",
-      lexical_cast<string>(RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE), &options, NULL));
-  EXPECT_EQ(RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE, options.runtime_bloom_filter_size);
-
-  EXPECT_OK(SetQueryOption("RUNTIME_BLOOM_FILTER_SIZE",
-      lexical_cast<string>(RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE), &options, NULL));
-  EXPECT_EQ(RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE, options.runtime_bloom_filter_size);
-
-  // Parsing memory values works in a reasonable way:
-  EXPECT_OK(SetQueryOption("RUNTIME_BLOOM_FILTER_SIZE", "1MB", &options, NULL));
-  EXPECT_EQ(1 << 20, options.runtime_bloom_filter_size);
-
-  // Bloom filters cannot occupy a percentage of memory:
-  EXPECT_FALSE(SetQueryOption("RUNTIME_BLOOM_FILTER_SIZE", "10%", &options, NULL).ok());
+  vector<pair<string, int*>> option_list = {
+    {"RUNTIME_BLOOM_FILTER_SIZE", &options.runtime_bloom_filter_size},
+    {"RUNTIME_FILTER_MAX_SIZE", &options.runtime_filter_max_size},
+    {"RUNTIME_FILTER_MIN_SIZE", &options.runtime_filter_min_size}};
+  for (const auto& option: option_list) {
+
+    // The upper and lower bound of the allowed values:
+    EXPECT_FALSE(SetQueryOption(option.first,
+        lexical_cast<string>(RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE - 1), &options,
+        NULL)
+        .ok());
+
+    EXPECT_FALSE(SetQueryOption(option.first,
+        lexical_cast<string>(RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE + 1), &options,
+        NULL)
+        .ok());
+
+    EXPECT_OK(SetQueryOption(option.first,
+        lexical_cast<string>(RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE), &options, NULL));
+    EXPECT_EQ(RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE, *option.second);
+
+    EXPECT_OK(SetQueryOption(option.first,
+        lexical_cast<string>(RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE), &options, NULL));
+    EXPECT_EQ(RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE, *option.second);
+
+    // Parsing memory values works in a reasonable way:
+    EXPECT_OK(SetQueryOption(option.first, "1MB", &options, NULL));
+    EXPECT_EQ(1 << 20, *option.second);
+
+    // Bloom filters cannot occupy a percentage of memory:
+    EXPECT_FALSE(SetQueryOption(option.first, "10%", &options, NULL).ok());
+  }
 }
 
 TEST(QueryOptions, SetFilterWait) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/be/src/service/query-options.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index ce538bf..4617256 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -325,17 +325,26 @@ Status impala::SetQueryOption(const string& key, const string& value,
               " OFF(0), LOCAL(1) or GLOBAL(2).", value));
         }
         break;
+      case TImpalaQueryOptions::RUNTIME_FILTER_MAX_SIZE:
+      case TImpalaQueryOptions::RUNTIME_FILTER_MIN_SIZE:
       case TImpalaQueryOptions::RUNTIME_BLOOM_FILTER_SIZE: {
         int64_t size;
         RETURN_IF_ERROR(ParseMemValue(value, "Bloom filter size", &size));
         if (size < RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE ||
             size > RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE) {
-          return Status(Substitute(
-              "$0 is not a valid Bloom filter size. Valid sizes are in [$1, $2].", value,
-              RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE,
-              RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE));
+          return Status(Substitute("$0 is not a valid Bloom filter size for $1. "
+                  "Valid sizes are in [$2, $3].", value, PrintTImpalaQueryOptions(
+                      static_cast<TImpalaQueryOptions::type>(option)),
+                  RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE,
+                  RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE));
+        }
+        if (option == TImpalaQueryOptions::RUNTIME_BLOOM_FILTER_SIZE) {
+          query_options->__set_runtime_bloom_filter_size(size);
+        } else if (option == TImpalaQueryOptions::RUNTIME_FILTER_MIN_SIZE) {
+          query_options->__set_runtime_filter_min_size(size);
+        } else if (option == TImpalaQueryOptions::RUNTIME_FILTER_MAX_SIZE) {
+          query_options->__set_runtime_filter_max_size(size);
         }
-        query_options->__set_runtime_bloom_filter_size(size);
         break;
       }
       case TImpalaQueryOptions::RUNTIME_FILTER_WAIT_TIME_MS: {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/be/src/service/query-options.h
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 56e2e1a..6f4c214 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -32,7 +32,7 @@ class TQueryOptions;
 // the DCHECK.
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::S3_SKIP_INSERT_STAGING + 1);\
+      TImpalaQueryOptions::RUNTIME_FILTER_MIN_SIZE + 1);\
   QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR)\
   QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
@@ -77,7 +77,9 @@ class TQueryOptions;
   QUERY_OPT_FN(parquet_annotate_strings_utf8, PARQUET_ANNOTATE_STRINGS_UTF8)\
   QUERY_OPT_FN(parquet_fallback_schema_resolution, PARQUET_FALLBACK_SCHEMA_RESOLUTION)\
   QUERY_OPT_FN(mt_num_cores, MT_NUM_CORES)\
-  QUERY_OPT_FN(s3_skip_insert_staging, S3_SKIP_INSERT_STAGING);
+  QUERY_OPT_FN(s3_skip_insert_staging, S3_SKIP_INSERT_STAGING)\
+  QUERY_OPT_FN(runtime_filter_min_size, RUNTIME_FILTER_MIN_SIZE)\
+  QUERY_OPT_FN(runtime_filter_max_size, RUNTIME_FILTER_MAX_SIZE);
 
 /// Converts a TQueryOptions struct into a map of key, value pairs.
 void TQueryOptionsToMap(const TQueryOptions& query_options,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/be/src/util/bit-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/bit-util.h b/be/src/util/bit-util.h
index e255f0c..eed6df3 100644
--- a/be/src/util/bit-util.h
+++ b/be/src/util/bit-util.h
@@ -51,11 +51,10 @@ class BitUtil {
     return (value / factor) * factor;
   }
 
-  /// Returns the smallest power of two that contains v. Taken from
+  /// Returns the smallest power of two that contains v. If v is a power of two, v is
+  /// returned. Taken from
   /// http://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2
-  /// TODO: Pick a better name, as it is not clear what happens when the input is
-  /// already a power of two.
-  static inline int64_t NextPowerOfTwo(int64_t v) {
+  static inline int64_t RoundUpToPowerOfTwo(int64_t v) {
     --v;
     v |= v >> 1;
     v |= v >> 2;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/be/src/util/debug-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/debug-util.cc b/be/src/util/debug-util.cc
index 7e6a290..4cf606b 100644
--- a/be/src/util/debug-util.cc
+++ b/be/src/util/debug-util.cc
@@ -56,7 +56,8 @@ namespace impala {
 // Macro to stamp out operator<< for thrift enums.  Why doesn't thrift do this?
 #define THRIFT_ENUM_OUTPUT_FN(E) THRIFT_ENUM_OUTPUT_FN_IMPL(E , _##E##_VALUES_TO_NAMES)
 
-// Macro to implement Print function that returns string for thrift enums
+// Macro to implement Print function that returns string for thrift enums. Make sure you
+// define a corresponding THRIFT_ENUM_OUTPUT_FN.
 #define THRIFT_ENUM_PRINT_FN(E) \
   string Print##E(const E::type& e) {\
     stringstream ss;\
@@ -78,6 +79,7 @@ THRIFT_ENUM_OUTPUT_FN(CompressionCodec);
 THRIFT_ENUM_OUTPUT_FN(Type);
 THRIFT_ENUM_OUTPUT_FN(TMetricKind);
 THRIFT_ENUM_OUTPUT_FN(TUnit);
+THRIFT_ENUM_OUTPUT_FN(TImpalaQueryOptions);
 
 THRIFT_ENUM_PRINT_FN(TCatalogObjectType);
 THRIFT_ENUM_PRINT_FN(TDdlType);
@@ -88,6 +90,7 @@ THRIFT_ENUM_PRINT_FN(QueryState);
 THRIFT_ENUM_PRINT_FN(Encoding);
 THRIFT_ENUM_PRINT_FN(TMetricKind);
 THRIFT_ENUM_PRINT_FN(TUnit);
+THRIFT_ENUM_PRINT_FN(TImpalaQueryOptions);
 
 
 ostream& operator<<(ostream& os, const TUniqueId& id) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/be/src/util/debug-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/debug-util.h b/be/src/util/debug-util.h
index 6872e66..c9550dc 100644
--- a/be/src/util/debug-util.h
+++ b/be/src/util/debug-util.h
@@ -69,6 +69,7 @@ std::string PrintEncoding(const parquet::Encoding::type& type);
 std::string PrintAsHex(const char* bytes, int64_t len);
 std::string PrintTMetricKind(const TMetricKind::type& type);
 std::string PrintTUnit(const TUnit::type& type);
+std::string PrintTImpalaQueryOptions(const TImpalaQueryOptions::type& type);
 /// Returns the fully qualified path, e.g. "database.table.array_col.item.field"
 std::string PrintPath(const TableDescriptor& tbl_desc, const SchemaPath& path);
 /// Returns the numeric path without column/field names, e.g. "[0,1,2]"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/be/src/util/fixed-size-hash-table.h
----------------------------------------------------------------------
diff --git a/be/src/util/fixed-size-hash-table.h b/be/src/util/fixed-size-hash-table.h
index 8ecb328..828769e 100644
--- a/be/src/util/fixed-size-hash-table.h
+++ b/be/src/util/fixed-size-hash-table.h
@@ -46,8 +46,8 @@ class FixedSizeHashTable {
     DCHECK_GT(min_capacity, 0);
     // Capacity cannot be greater than largest uint32_t power of two.
     capacity_ = static_cast<uint32_t>(std::min(static_cast<int64_t>(1) << 31,
-        BitUtil::NextPowerOfTwo(min_capacity)));
-    DCHECK_EQ(capacity_, BitUtil::NextPowerOfTwo(capacity_));
+        BitUtil::RoundUpToPowerOfTwo(min_capacity)));
+    DCHECK_EQ(capacity_, BitUtil::RoundUpToPowerOfTwo(capacity_));
     if (tbl_ != NULL) free(tbl_);
     int64_t tbl_byte_size = capacity_ * sizeof(Entry);
     tbl_ = reinterpret_cast<Entry*>(malloc(tbl_byte_size));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/be/src/util/parse-util-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/parse-util-test.cc b/be/src/util/parse-util-test.cc
index a6726fd..bd49371 100644
--- a/be/src/util/parse-util-test.cc
+++ b/be/src/util/parse-util-test.cc
@@ -31,7 +31,8 @@ TEST(ParseMemSpecs, Basic) {
   bool is_percent;
   int64_t bytes;
 
-  int64_t megabytes = 1024 * 1024;
+  int64_t kilobytes = 1024;
+  int64_t megabytes = 1024 * kilobytes;
   int64_t gigabytes = 1024 * megabytes;
 
   bytes = ParseUtil::ParseMemSpec("1", &is_percent, MemInfo::physical_mem());
@@ -42,6 +43,14 @@ TEST(ParseMemSpecs, Basic) {
   ASSERT_EQ(100, bytes);
   ASSERT_FALSE(is_percent);
 
+  bytes = ParseUtil::ParseMemSpec("100kb", &is_percent, MemInfo::physical_mem());
+  ASSERT_EQ(100 * 1024, bytes);
+  ASSERT_FALSE(is_percent);
+
+  bytes = ParseUtil::ParseMemSpec("5KB", &is_percent, MemInfo::physical_mem());
+  ASSERT_EQ(5 * 1024, bytes);
+  ASSERT_FALSE(is_percent);
+
   bytes = ParseUtil::ParseMemSpec("4MB", &is_percent, MemInfo::physical_mem());
   ASSERT_EQ(4 * megabytes, bytes);
   ASSERT_FALSE(is_percent);
@@ -77,6 +86,7 @@ TEST(ParseMemSpecs, Basic) {
   bad_values.push_back("gb");
   bad_values.push_back("1GMb");
   bad_values.push_back("1b1Mb");
+  bad_values.push_back("1kib");
   bad_values.push_back("1Bb");
   bad_values.push_back("1%%");
   bad_values.push_back("1.1");

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/be/src/util/parse-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/parse-util.cc b/be/src/util/parse-util.cc
index 60cf3ac..1a0af85 100644
--- a/be/src/util/parse-util.cc
+++ b/be/src/util/parse-util.cc
@@ -49,6 +49,12 @@ int64_t ParseUtil::ParseMemSpec(const string& mem_spec_str, bool* is_percent,
       number_str_len--;
       multiplier = 1024L * 1024L;
       break;
+    case 'k':
+    case 'K':
+      // Kilobytes
+      number_str_len--;
+      multiplier = 1024L;
+      break;
     case '%':
       // Don't allow a suffix of "%B".
       if (suffix_char != mem_spec_str.rbegin()) return -1;

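Note on the parse-util.cc change above: the new 'k'/'K' case lets memory specs such as "100kb" or "5KB" parse as kilobytes. The sketch below is a deliberately simplified illustration of suffix handling and is not ParseUtil::ParseMemSpec: the function name ParseSimpleMemSpec is hypothetical, it accepts only non-negative integers (no floats, no '%'), and it does not guard against overflow.

```cpp
#include <cctype>
#include <cstddef>
#include <cstdint>
#include <string>

// Hypothetical, simplified parser for "<int>[kKmMgG]?[bB]?".
// Returns the size in bytes, or -1 if the spec is malformed.
int64_t ParseSimpleMemSpec(const std::string& spec) {
  size_t number_len = spec.size();
  if (number_len == 0) return -1;

  // Strip one optional trailing 'b' or 'B'.
  char last = spec[number_len - 1];
  if (last == 'b' || last == 'B') --number_len;
  if (number_len == 0) return -1;

  // Strip one optional unit character and pick the multiplier.
  int64_t multiplier = 1;
  switch (spec[number_len - 1]) {
    case 'g':
    case 'G':
      multiplier = static_cast<int64_t>(1) << 30;  // Gigabytes
      --number_len;
      break;
    case 'm':
    case 'M':
      multiplier = static_cast<int64_t>(1) << 20;  // Megabytes
      --number_len;
      break;
    case 'k':
    case 'K':
      multiplier = static_cast<int64_t>(1) << 10;  // Kilobytes
      --number_len;
      break;
    default:
      break;
  }
  if (number_len == 0) return -1;

  // Everything that remains must be decimal digits.
  int64_t value = 0;
  for (size_t i = 0; i < number_len; ++i) {
    unsigned char c = static_cast<unsigned char>(spec[i]);
    if (!std::isdigit(c)) return -1;
    value = value * 10 + (c - '0');
  }
  return value * multiplier;
}
```

Because only a single unit character is stripped before the digits are validated, inputs such as "1kib", "1Bb" and "1GMb" are rejected in this sketch, matching the bad_values entries in parse-util-test.cc.
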
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/be/src/util/parse-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/parse-util.h b/be/src/util/parse-util.h
index 5b13137..02ce120 100644
--- a/be/src/util/parse-util.h
+++ b/be/src/util/parse-util.h
@@ -27,6 +27,7 @@ class ParseUtil {
   /// Sets *is_percent to indicate whether the given spec is in percent.
   /// Accepted formats:
   /// '<int>[bB]?'  -> bytes (default if no unit given)
+  /// '<float>[kK(bB)]' -> kilobytes
   /// '<float>[mM(bB)]' -> megabytes
   /// '<float>[gG(bB)]' -> in gigabytes
   /// '<int>%'      -> in percent of relative_reference

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 611155c..b9892e6 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -192,6 +192,12 @@ struct TQueryOptions {
   // those queries, the coordinator deletes all files in the final location before copying
   // the files there.
   45: optional bool s3_skip_insert_staging = true
+
+  // Minimum runtime filter size, in bytes
+  46: optional i32 runtime_filter_min_size = 1048576
+
+  // Maximum runtime filter size, in bytes
+  47: optional i32 runtime_filter_max_size = 16777216
 }
 
 // Impala currently has two types of sessions: Beeswax and HiveServer2

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/common/thrift/ImpalaService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 0a030ad..68647df 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -219,6 +219,12 @@ enum TImpalaQueryOptions {
   // the files there.
   // TODO: Find a way to get this working for INSERT OVERWRITEs too.
   S3_SKIP_INSERT_STAGING
+
+  // Maximum runtime filter size, in bytes.
+  RUNTIME_FILTER_MAX_SIZE,
+
+  // Minimum runtime filter size, in bytes.
+  RUNTIME_FILTER_MIN_SIZE
 }
 
 // The summary of an insert.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/testdata/workloads/functional-query/queries/QueryTest/runtime_filters.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/runtime_filters.test b/testdata/workloads/functional-query/queries/QueryTest/runtime_filters.test
index 4432810..2d32064 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/runtime_filters.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/runtime_filters.test
@@ -259,6 +259,7 @@ row_regex: .*RowsReturned: 2.43K .*
 
 SET RUNTIME_FILTER_WAIT_TIME_MS=15000;
 SET RUNTIME_FILTER_MODE=GLOBAL;
+SET RUNTIME_FILTER_MAX_SIZE=4K;
 select STRAIGHT_JOIN count(*) from alltypes a
     join [BROADCAST]
     # Build-side needs to be sufficiently large to trigger FP check.
@@ -335,6 +336,7 @@ select STRAIGHT_JOIN count(a.id) from alltypes a
 ####################################################
 SET RUNTIME_FILTER_MODE=GLOBAL;
 SET RUNTIME_FILTER_WAIT_TIME_MS=30000;
+SET RUNTIME_FILTER_MIN_SIZE=4KB;
 with l as (select * from tpch.lineitem UNION ALL select * from tpch.lineitem)
 select STRAIGHT_JOIN count(*) from (select * from tpch.lineitem a LIMIT 1) a
     join (select * from l LIMIT 1) b on a.l_orderkey = -b.l_orderkey;
@@ -347,6 +349,7 @@ row_regex: .*Filter 0 \(4.00 KB\).*
 ---- QUERY
 SET RUNTIME_FILTER_MODE=GLOBAL;
 SET RUNTIME_FILTER_WAIT_TIME_MS=30000;
+SET RUNTIME_FILTER_MIN_SIZE=4KB;
 with l as (select * from tpch.lineitem UNION ALL select * from tpch.lineitem)
 select STRAIGHT_JOIN count(*) from (select * from tpch.lineitem a LIMIT 1) a
     join (select * from l LIMIT 500000) b on a.l_orderkey = -b.l_orderkey;
@@ -359,6 +362,7 @@ row_regex: .*Filter 0 \(256.00 KB\).*
 ---- QUERY
 SET RUNTIME_FILTER_MODE=GLOBAL;
 SET RUNTIME_FILTER_WAIT_TIME_MS=30000;
+SET RUNTIME_FILTER_MIN_SIZE=4KB;
 with l as (select * from tpch.lineitem UNION ALL select * from tpch.lineitem)
 select STRAIGHT_JOIN count(*) from (select * from tpch.lineitem a LIMIT 1) a
     join (select * from l LIMIT 1000000) b on a.l_orderkey = -b.l_orderkey;
@@ -371,6 +375,7 @@ row_regex: .*Filter 0 \(512.00 KB\).*
 ---- QUERY
 SET RUNTIME_FILTER_MODE=GLOBAL;
 SET RUNTIME_FILTER_WAIT_TIME_MS=30000;
+SET RUNTIME_FILTER_MIN_SIZE=4KB;
 with l as (select * from tpch.lineitem UNION ALL select * from tpch.lineitem)
 select STRAIGHT_JOIN count(*) from (select * from tpch.lineitem a LIMIT 1) a
     join (select * from l LIMIT 2000000) b on a.l_orderkey = -b.l_orderkey;
@@ -380,3 +385,44 @@ select STRAIGHT_JOIN count(*) from (select * from tpch.lineitem a LIMIT 1) a
 row_regex: .*1 of 1 Runtime Filter Published.*
 row_regex: .*Filter 0 \(1.00 MB\).*
 ====
+
+
+---- QUERY
+####################################################
+# Test case 16: Filter sizes respect query options
+####################################################
+SET RUNTIME_FILTER_MODE=GLOBAL;
+SET RUNTIME_FILTER_WAIT_TIME_MS=30000;
+SET RUNTIME_FILTER_MIN_SIZE=8KB;
+SET RUNTIME_FILTER_MAX_SIZE=8KB;
+# This query would produce a 4KB filter without setting the minimum size.
+select STRAIGHT_JOIN count(*) from alltypes a join [SHUFFLE] alltypes b on a.id = b.id;
+---- RESULTS
+7300
+---- RUNTIME_PROFILE
+row_regex: .*1 of 1 Runtime Filter Published.*
+row_regex: .*Filter 0 \(8.00 KB\).*
+====
+---- QUERY
+# Check that filter sizes are rounded up to power-of-two
+SET RUNTIME_FILTER_MIN_SIZE=6000B;
+SET RUNTIME_FILTER_MAX_SIZE=6000B;
+select STRAIGHT_JOIN count(*) from alltypes a join [SHUFFLE] alltypes b on a.id = b.id;
+---- RESULTS
+7300
+---- RUNTIME_PROFILE
+row_regex: .*1 of 1 Runtime Filter Published.*
+row_regex: .*Filter 0 \(8.00 KB\).*
+====
+---- QUERY
+SET RUNTIME_FILTER_MODE=GLOBAL;
+SET RUNTIME_FILTER_WAIT_TIME_MS=30000;
+SET RUNTIME_FILTER_MAX_SIZE=8192;
+# Query would produce a 512KB filter without setting the max
+with l as (select * from tpch.lineitem UNION ALL select * from tpch.lineitem)
+select STRAIGHT_JOIN count(*) from (select * from tpch.lineitem a LIMIT 1) a
+    join (select * from l LIMIT 1000000) b on a.l_orderkey = -b.l_orderkey;
+---- RUNTIME_PROFILE
+row_regex: .*0 of 1 Runtime Filter Published.*
+row_regex: .*Filter 0 \(8.00 KB\).*
+====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/testdata/workloads/functional-query/queries/QueryTest/runtime_filters_wait.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/runtime_filters_wait.test b/testdata/workloads/functional-query/queries/QueryTest/runtime_filters_wait.test
index 324eb1c..4743f3e 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/runtime_filters_wait.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/runtime_filters_wait.test
@@ -24,6 +24,7 @@ on p.month = b.int_col and b.month = 1 and b.string_col = "1"
 
 SET RUNTIME_FILTER_WAIT_TIME_MS=600000;
 SET RUNTIME_FILTER_MODE=GLOBAL;
+SET RUNTIME_FILTER_MAX_SIZE=4096;
 select STRAIGHT_JOIN count(*) from alltypes a
     join [BROADCAST]
     # Build-side needs to be sufficiently large to trigger FP check.


[07/10] incubator-impala git commit: IMPALA-1578: fix text scanner to handle "\r\n" delimiters split across blocks

Posted by ta...@apache.org.
IMPALA-1578: fix text scanner to handle "\r\n" delimiters split across blocks

This patch modifies HdfsTextScanner to specifically check for split
"\r\n" delimiters when the scan range ends with '\r'. If there does
turn out to be a split delimiter, the next tuple is considered the
responsibility of the next scan range's scanner, as if the delimiter
appeared fully in the second scan range. This should not affect the
overall performance characteristics of the text scanner since it
already must do a remote read past the end of the scan range to read
the last tuple.
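
The boundary condition described above can be sketched as follows. This is a minimal illustration over an in-memory string, assuming the whole file is available at once; the real scanner reads past the scan range through its I/O layer via CheckForSplitDelimiter(). The function name EndsWithSplitDelimiter and its parameters are hypothetical and are not part of HdfsTextScanner.

```cpp
#include <cstddef>
#include <string>

// Hypothetical helper: returns true if the scan range [offset, offset + len)
// of 'file' ends with '\r' and the first byte after the range is '\n', i.e.
// a "\r\n" row delimiter straddles the scan range boundary.
bool EndsWithSplitDelimiter(const std::string& file, size_t offset, size_t len) {
  if (len == 0) return false;
  size_t end = offset + len;
  // No byte after the range to inspect, so the delimiter cannot be split.
  if (end >= file.size()) return false;
  return file[end - 1] == '\r' && file[end] == '\n';
}
```

When this returns true for a range, the tuple that starts after the "\r\n" belongs to the next range's scanner, so the current scanner has nothing left to parse.
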

Change-Id: Id42b441674bb21517ad2788b99942a4b5dc55420
Reviewed-on: http://gerrit.cloudera.org:8080/2803
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/9174dee3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/9174dee3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/9174dee3

Branch: refs/heads/master
Commit: 9174dee395bd1391b54224804aaab18254910cd0
Parents: 2b61ae7
Author: Skye Wanderman-Milne <sk...@cloudera.com>
Authored: Thu May 12 16:12:03 2016 -0700
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Thu May 12 23:06:36 2016 -0700

----------------------------------------------------------------------
 be/src/exec/delimited-text-parser.cc |  5 --
 be/src/exec/hdfs-text-scanner.cc     | 84 +++++++++++++++++++++++++++----
 be/src/exec/hdfs-text-scanner.h      | 21 ++++++++
 tests/query_test/test_scanners.py    | 77 ++++++++++++++++++++++++++++
 4 files changed, 171 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9174dee3/be/src/exec/delimited-text-parser.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/delimited-text-parser.cc b/be/src/exec/delimited-text-parser.cc
index f1185a4..950eae4 100644
--- a/be/src/exec/delimited-text-parser.cc
+++ b/be/src/exec/delimited-text-parser.cc
@@ -284,11 +284,6 @@ restart:
     if (num_escape_chars % 2 != 0) goto restart;
   }
 
-  if (tuple_start == len - 1 && buffer_start[tuple_start] == '\r') {
-    // If \r is the last char we need to wait to see if the next one is \n or not.
-    last_row_delim_offset_ = 0;
-    return -1;
-  }
   if (tuple_start < len && buffer_start[tuple_start] == '\n' &&
       buffer_start[tuple_start - 1] == '\r') {
     // We have \r\n, move to the next character.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9174dee3/be/src/exec/hdfs-text-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index dee34dc..dd0d524 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -249,6 +249,20 @@ Status HdfsTextScanner::ResetScanner() {
 Status HdfsTextScanner::FinishScanRange() {
   if (scan_node_->ReachedLimit()) return Status::OK();
 
+  DCHECK_EQ(byte_buffer_ptr_, byte_buffer_end_);
+  bool split_delimiter;
+  RETURN_IF_ERROR(CheckForSplitDelimiter(&split_delimiter));
+  if (split_delimiter) {
+    // If the scan range ends on the '\r' of a "\r\n", the next tuple is considered part
+    // of the next scan range. Nothing to do since we already fully parsed the previous
+    // tuple.
+    DCHECK(!delimited_text_parser_->HasUnfinishedTuple());
+    DCHECK(partial_tuple_empty_);
+    DCHECK(boundary_column_.Empty());
+    DCHECK(boundary_row_.Empty());
+    return Status::OK();
+  }
+
   // For text we always need to scan past the scan range to find the next delimiter
   while (true) {
     bool eosr = true;
@@ -576,29 +590,47 @@ Status HdfsTextScanner::FindFirstTuple(bool* tuple_found) {
   if (num_rows_to_skip > 0) {
     int num_skipped_rows = 0;
     *tuple_found = false;
-    while (true) {
-      bool eosr = false;
-      RETURN_IF_ERROR(FillByteBuffer(&eosr));  // updates byte_buffer_read_size_
+    bool eosr = false;
+    // Offset may not point to a tuple boundary; skip ahead to the first tuple start in
+    // this scan range (if one exists).
+    do {
+      RETURN_IF_ERROR(FillByteBuffer(&eosr));
 
       delimited_text_parser_->ParserReset();
       SCOPED_TIMER(parse_delimiter_timer_);
       int next_tuple_offset = 0;
+      int bytes_left = byte_buffer_read_size_;
       while (num_skipped_rows < num_rows_to_skip) {
         next_tuple_offset = delimited_text_parser_->FindFirstInstance(byte_buffer_ptr_,
-          byte_buffer_read_size_);
+          bytes_left);
         if (next_tuple_offset == -1) break;
         byte_buffer_ptr_ += next_tuple_offset;
-        byte_buffer_read_size_ -= next_tuple_offset;
+        bytes_left -= next_tuple_offset;
         ++num_skipped_rows;
       }
 
-      if (next_tuple_offset == -1) {
-        // Didn't find enough new tuples in this buffer, continue with the next one.
-        if (!eosr) continue;
-      } else {
-        *tuple_found = true;
+      if (next_tuple_offset != -1) *tuple_found = true;
+    } while (!*tuple_found && !eosr);
+
+    // Special case: if the first delimiter is at the end of the current buffer, it's
+    // possible it's a split "\r\n" delimiter.
+    if (*tuple_found && byte_buffer_ptr_ == byte_buffer_end_) {
+      bool split_delimiter;
+      RETURN_IF_ERROR(CheckForSplitDelimiter(&split_delimiter));
+      if (split_delimiter) {
+        if (eosr) {
+          // Split delimiter at the end of the scan range. The next tuple is considered
+          // part of the next scan range, so we report no tuple found.
+          *tuple_found = false;
+        } else {
+          // Split delimiter at the end of the current buffer, but not eosr. Advance to
+          // the correct position in the next buffer.
+          RETURN_IF_ERROR(FillByteBuffer(&eosr));
+          DCHECK_GT(byte_buffer_read_size_, 0);
+          DCHECK_EQ(*byte_buffer_ptr_, '\n');
+          byte_buffer_ptr_ += 1;
+        }
       }
-      break;
     }
     if (num_rows_to_skip > 1 && num_skipped_rows != num_rows_to_skip) {
       DCHECK(!*tuple_found);
@@ -613,6 +645,36 @@ Status HdfsTextScanner::FindFirstTuple(bool* tuple_found) {
   return Status::OK();
 }
 
+Status HdfsTextScanner::CheckForSplitDelimiter(bool* split_delimiter) {
+  DCHECK_EQ(byte_buffer_ptr_, byte_buffer_end_);
+  *split_delimiter = false;
+
+  // Nothing in buffer
+  if (byte_buffer_read_size_ == 0) return Status::OK();
+
+  // If the line delimiter is "\n" (meaning we also accept "\r" and "\r\n" as delimiters)
+  // and the current buffer ends with '\r', this could be a "\r\n" delimiter.
+  bool split_delimiter_possible = context_->partition_descriptor()->line_delim() == '\n'
+      && *(byte_buffer_end_ - 1) == '\r';
+  if (!split_delimiter_possible) return Status::OK();
+
+  // The '\r' may be escaped. If it's not, the text parser will report a complete tuple.
+  if (delimited_text_parser_->HasUnfinishedTuple()) return Status::OK();
+
+  // Peek ahead one byte to see if the '\r' is followed by '\n'.
+  Status status;
+  uint8_t* next_byte;
+  int64_t out_len;
+  stream_->GetBytes(1, &next_byte, &out_len, &status, /*peek*/ true);
+  RETURN_IF_ERROR(status);
+
+  // No more bytes after current buffer
+  if (out_len == 0) return Status::OK();
+
+  *split_delimiter = *next_byte == '\n';
+  return Status::OK();
+}
+
 // Codegen for materializing parsed data into tuples.  The function WriteCompleteTuple is
 // codegen'd using the IRBuilder for the specific tuple description.  This function
 // is then injected into the cross-compiled driving function, WriteAlignedTuples().

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9174dee3/be/src/exec/hdfs-text-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.h b/be/src/exec/hdfs-text-scanner.h
index 74ce2e7..03b8343 100644
--- a/be/src/exec/hdfs-text-scanner.h
+++ b/be/src/exec/hdfs-text-scanner.h
@@ -27,6 +27,20 @@ struct HdfsFileDesc;
 
 /// HdfsScanner implementation that understands text-formatted records.
 /// Uses SSE instructions, if available, for performance.
+///
+/// Splitting text files:
+/// This scanner handles text files split across multiple blocks/scan ranges. Note that
+/// the split can occur anywhere in the file, e.g. in the middle of a row. Each scanner
+/// starts materializing tuples right after the first row delimiter found in the scan
+/// range, and stops at the first row delimiter occurring past the end of the scan
+/// range. If no delimiter is found in the scan range, the scanner doesn't materialize
+/// anything. This scheme ensures that every row is materialized by exactly one scanner.
+///
+/// A special case is a "\r\n" row delimiter split across two scan ranges. (When the row
+/// delimiter is '\n', we also consider '\r' and "\r\n" row delimiters.) In this case, the
+/// delimiter is considered part of the second scan range, i.e., the first scan range's
+/// scanner is responsible for the tuple directly before it, and the second scan range's
+/// scanner for the tuple directly after it.
 class HdfsTextScanner : public HdfsScanner {
  public:
   HdfsTextScanner(HdfsScanNode* scan_node, RuntimeState* state);
@@ -115,6 +129,13 @@ class HdfsTextScanner : public HdfsScanner {
   Status DecompressBufferStream(int64_t bytes_to_read, uint8_t** decompressed_buffer,
       int64_t* decompressed_len, bool *eosr);
 
+  /// Checks if the current buffer ends with a row delimiter spanning this and the next
+  /// buffer (i.e. a "\r\n" delimiter). Does not modify byte_buffer_ptr_, etc. Always
+  /// returns false if the table's row delimiter is not '\n'. This can only be called
+  /// after the buffer has been fully parsed, i.e. when byte_buffer_ptr_ ==
+  /// byte_buffer_end_.
+  Status CheckForSplitDelimiter(bool* split_delimiter);
+
   /// Prepends field data that was from the previous file buffer (This field straddled two
   /// file buffers). 'data' already contains the pointer/len from the current file buffer,
   /// boundary_column_ contains the beginning of the data from the previous file buffer.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9174dee3/tests/query_test/test_scanners.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index 6936765..216d3df 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -9,6 +9,7 @@
 import logging
 import pytest
 import random
+import tempfile
 from copy import deepcopy
 from subprocess import call, check_call
 
@@ -417,6 +418,82 @@ class TestTextScanRangeLengths(ImpalaTestSuite):
       result = self.client.execute("select count(*) from " + t)
       assert result.data == expected_result.data
 
+# Tests behavior of split "\r\n" delimiters.
+class TestTextSplitDelimiters(ImpalaTestSuite):
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestTextSplitDelimiters, cls).add_test_dimensions()
+    cls.TestMatrix.add_constraint(lambda v:\
+        v.get_value('table_format').file_format == 'text' and\
+        v.get_value('table_format').compression_codec == 'none')
+
+  def test_text_split_delimiters(self, vector, unique_database):
+    """Creates and queries a datafile that exercises interesting edge cases around split
+    "\r\n" delimiters. The data file contains the following 4-byte scan ranges:
+
+    abc\r   First scan range, ends with split \r\n
+            - materializes (abc)
+    \nde\r  Initial delimiter found, scan range ends with split \r\n
+            - materializes (de)
+    \nfg\r  Initial delimiter found, scan range ends with \r
+            - materializes (fg),(hij)
+    hij\r   Initial delimiter is \r at end
+            - materializes (klm)
+    klm\r   Initial delimiter is split \r\n
+            - materializes nothing
+    \nno\r  Final scan range, initial delimiter found, ends with \r
+            - materializes (no)
+    """
+    DATA = "abc\r\nde\r\nfg\rhij\rklm\r\nno\r"
+    max_scan_range_length = 4
+    expected_result = ['abc', 'de', 'fg', 'hij', 'klm', 'no']
+
+    self._create_and_query_test_table(
+      vector, unique_database, DATA, max_scan_range_length, expected_result)
+
+  def test_text_split_across_buffers_delimiter(self, vector, unique_database):
+    """Creates and queries a datafile that exercises a split "\r\n" across io buffers (but
+    within a single scan range). We use a 32MB file and 16MB scan ranges, so there are two
+    scan ranges of two io buffers each. The first scan range exercises a split delimiter
+    in the main text parsing algorithm. The second scan range exercises correctly
+    identifying a split delimiter as the first in a scan range."""
+    DEFAULT_IO_BUFFER_SIZE = 8 * 1024 * 1024
+    data = ('a' * (DEFAULT_IO_BUFFER_SIZE - 1) + "\r\n" + # first scan range
+            'b' * (DEFAULT_IO_BUFFER_SIZE - 3) + "\r\n" +
+            'a' * (DEFAULT_IO_BUFFER_SIZE - 1) + "\r\n" +     # second scan range
+            'b' * (DEFAULT_IO_BUFFER_SIZE - 1))
+    assert len(data) == DEFAULT_IO_BUFFER_SIZE * 4
+
+    max_scan_range_length = DEFAULT_IO_BUFFER_SIZE * 2
+    expected_result = data.split("\r\n")
+
+    self._create_and_query_test_table(
+      vector, unique_database, data, max_scan_range_length, expected_result)
+
+  def _create_and_query_test_table(self, vector, unique_database, data,
+        max_scan_range_length, expected_result):
+    TABLE_NAME = "test_text_split_delimiters"
+    qualified_table_name = "%s.%s" % (unique_database, TABLE_NAME)
+    location = get_fs_path("/test-warehouse/%s_%s" % (unique_database, TABLE_NAME))
+    query = "create table %s (s string) location '%s'" % (qualified_table_name, location)
+    self.client.execute(query)
+
+    with tempfile.NamedTemporaryFile() as f:
+      f.write(data)
+      f.flush()
+      check_call(['hadoop', 'fs', '-copyFromLocal', f.name, location])
+    self.client.execute("refresh %s" % qualified_table_name)
+
+    vector.get_value('exec_option')['max_scan_range_length'] = max_scan_range_length
+    query = "select * from %s" % qualified_table_name
+    result = self.execute_query_expect_success(
+      self.client, query, vector.get_value('exec_option'))
+
+    assert sorted(result.data) == sorted(expected_result)
 
 # Test for IMPALA-1740: Support for skip.header.line.count
 class TestTextScanRangeLengths(ImpalaTestSuite):

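The scan-range ownership rule described in the hdfs-text-scanner.h comment above can be checked with a small simulation. This is only a sketch, not Impala code: find_delimiter(), scan_range() and scan_file() are hypothetical helpers, the row delimiter is assumed to be '\n' (so '\r' and "\r\n" are also accepted), and escape characters and I/O buffers are ignored.

```python
def find_delimiter(data, pos):
    """Return (first, past_last) offsets of the first row delimiter at or after pos."""
    for i in range(pos, len(data)):
        if data[i] == "\n":
            return i, i + 1
        if data[i] == "\r":
            # "\r\n" counts as a single two-byte delimiter.
            if data[i + 1:i + 2] == "\n":
                return i, i + 2
            return i, i + 1
    return None


def scan_range(data, start, end):
    """Rows materialized by the (hypothetical) scanner that owns bytes [start, end)."""
    if start == 0:
        pos = 0  # The first range starts at the first byte of the file.
    else:
        delim = find_delimiter(data, start)
        if delim is None or delim[0] >= end:
            return []  # No row delimiter in this range.
        if delim[1] > end:
            return []  # Split "\r\n": the delimiter belongs to the next range.
        pos = delim[1]
    rows = []
    while pos < len(data):
        delim = find_delimiter(data, pos)
        if delim is None:
            rows.append(data[pos:])
            break
        rows.append(data[pos:delim[0]])
        if delim[1] > end:
            break  # Delimiter ends past this range; the next row is not ours.
        pos = delim[1]
    return rows


def scan_file(data, max_scan_range_length):
    """Concatenate the rows produced by every scan range of the file."""
    rows = []
    for start in range(0, len(data), max_scan_range_length):
        rows += scan_range(data, start, start + max_scan_range_length)
    return rows


DATA = "abc\r\nde\r\nfg\rhij\rklm\r\nno\r"
print(scan_file(DATA, 4))
```

With the 4-byte ranges used in test_text_split_delimiters, this should print the same six rows the test expects, each exactly once. The range "klm\r" contributes nothing because its only delimiter is a split "\r\n".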

[08/10] incubator-impala git commit: IMPALA-3311: fix string data coming out of aggs in subplans

Posted by ta...@apache.org.
IMPALA-3311: fix string data coming out of aggs in subplans

The problem: varlen data (e.g. strings) produced by aggregations is
freed by FreeLocalAllocations() after passing up the output
batch. This works for streaming operators or blocking operators that
copy their input, but results in memory corruption when the output
reaches non-copying blocking operators, e.g. SubplanNode and
NestedLoopJoinNode.

The fix: this patch makes the PartitionedAggregationNode copy out
produced string data if the node is in a subplan. Otherwise it calls
MarkNeedToReturn() on the output batch. Marking the batch would work
in the subplan case as well, but would likely be less efficient since
it would result in many small batches coming out of the subplan.

The patch includes a test case. However, this test only exposes the
problem with an ASAN build and the --disable_mem_pools flag, which we
don't currently have automated testing for.
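
The lifetime problem described above can be illustrated with a short sketch. It is not Impala code: LocalAllocations is a hypothetical stand-in for the agg function contexts' local allocations, and because Python cannot read freed memory, buffer reuse plays the role of the free. A consumer that keeps only a reference sees its string change after the next call, while a consumer that copied the bytes does not.

```python
class LocalAllocations(object):
    """Hypothetical pool whose memory is reused after free_all()."""

    def __init__(self, size):
        self.buf = bytearray(size)
        self.used = 0

    def allocate(self, data):
        # Write 'data' into the pool and return a view of it (no copy).
        view = memoryview(self.buf)[self.used:self.used + len(data)]
        view[:] = data
        self.used += len(data)
        return view

    def free_all(self):
        # Stands in for FreeLocalAllocations(): old views now alias reusable memory.
        self.used = 0


pool = LocalAllocations(16)

# First GetNext()-like call: the aggregation produces a string value.
value = pool.allocate(b"min=1")
held_reference = value    # Non-copying blocking operator keeps the pointer.
held_copy = bytes(value)  # Copying the data out, as the fix does in a subplan.

# Second call: local allocations are freed and the memory is reused.
pool.free_all()
pool.allocate(b"min=9")

print(bytes(held_reference))  # Contents changed underneath the holder.
print(held_copy)              # The copy is unaffected.
```

In the real node the copy goes into the output batch's own pool, so the string lives as long as the batch's tuples.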

Change-Id: Iada891504c261ba54f4eb8c9d7e4e5223668d7b9
Reviewed-on: http://gerrit.cloudera.org:8080/2929
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/7767d300
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/7767d300
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/7767d300

Branch: refs/heads/master
Commit: 7767d300a3f018c8c8b32fa72abe5c126900a2be
Parents: cb37774
Author: Skye Wanderman-Milne <sk...@cloudera.com>
Authored: Thu May 12 17:03:12 2016 -0700
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Thu May 12 23:06:36 2016 -0700

----------------------------------------------------------------------
 be/src/exec/partitioned-aggregation-node.cc     | 55 ++++++++++++++++++++
 be/src/exec/partitioned-aggregation-node.h      | 14 +++++
 be/src/exprs/agg-fn-evaluator.h                 |  1 +
 .../queries/QueryTest/nested-types-runtime.test | 16 ++++++
 .../queries/subplan_aggregation.test            | 11 ++++
 5 files changed, 97 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7767d300/be/src/exec/partitioned-aggregation-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node.cc b/be/src/exec/partitioned-aggregation-node.cc
index 0bf51a9..b7dca61 100644
--- a/be/src/exec/partitioned-aggregation-node.cc
+++ b/be/src/exec/partitioned-aggregation-node.cc
@@ -360,6 +360,61 @@ Status PartitionedAggregationNode::Open(RuntimeState* state) {
 
 Status PartitionedAggregationNode::GetNext(RuntimeState* state, RowBatch* row_batch,
     bool* eos) {
+  int first_row_idx = row_batch->num_rows();
+  RETURN_IF_ERROR(GetNextInternal(state, row_batch, eos));
+  RETURN_IF_ERROR(HandleOutputStrings(row_batch, first_row_idx));
+  return Status::OK();
+}
+
+Status PartitionedAggregationNode::HandleOutputStrings(RowBatch* row_batch,
+    int first_row_idx) {
+  if (!needs_finalize_ && !needs_serialize_) return Status::OK();
+  // String data returned by Serialize() or Finalize() is from local expr allocations in
+  // the agg function contexts, and will be freed on the next GetNext() call by
+  // FreeLocalAllocations(). The data either needs to be copied out or sent up the plan
+  // tree via MarkNeedToReturn(). (See IMPALA-3311)
+  for (int i = 0; i < aggregate_evaluators_.size(); ++i) {
+    const SlotDescriptor* slot_desc = aggregate_evaluators_[i]->output_slot_desc();
+    DCHECK(!slot_desc->type().IsCollectionType()) << "producing collections NYI";
+    if (!slot_desc->type().IsVarLenStringType()) continue;
+    if (IsInSubplan()) {
+      // Copy string data to the row batch's pool. This is more efficient than
+      // MarkNeedToReturn() in a subplan since we are likely producing many small batches.
+      RETURN_IF_ERROR(CopyStringData(slot_desc, row_batch, first_row_idx,
+              row_batch->tuple_data_pool()));
+    } else {
+      row_batch->MarkNeedToReturn();
+      break;
+    }
+  }
+  return Status::OK();
+}
+
+Status PartitionedAggregationNode::CopyStringData(const SlotDescriptor* slot_desc,
+    RowBatch* row_batch, int first_row_idx, MemPool* pool) {
+  DCHECK(slot_desc->type().IsVarLenStringType());
+  DCHECK_EQ(row_batch->row_desc().tuple_descriptors().size(), 1);
+  FOREACH_ROW(row_batch, first_row_idx, batch_iter) {
+    Tuple* tuple = batch_iter.Get()->GetTuple(0);
+    StringValue* sv = reinterpret_cast<StringValue*>(
+        tuple->GetSlot(slot_desc->tuple_offset()));
+    if (sv == NULL || sv->len == 0) continue;
+    char* new_ptr = reinterpret_cast<char*>(pool->TryAllocate(sv->len));
+    if (new_ptr == NULL) {
+      Status s = Status::MemLimitExceeded();
+      s.AddDetail(Substitute("Cannot perform aggregation at node with id $0."
+              " Failed to allocate $1 output bytes.", id_, sv->len));
+      state_->SetMemLimitExceeded();
+      return s;
+    }
+    memcpy(new_ptr, sv->ptr, sv->len);
+    sv->ptr = new_ptr;
+  }
+  return Status::OK();
+}
+
+Status PartitionedAggregationNode::GetNextInternal(RuntimeState* state,
+    RowBatch* row_batch, bool* eos) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
   RETURN_IF_CANCELLED(state);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7767d300/be/src/exec/partitioned-aggregation-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node.h b/be/src/exec/partitioned-aggregation-node.h
index 0b94511..ab560c5 100644
--- a/be/src/exec/partitioned-aggregation-node.h
+++ b/be/src/exec/partitioned-aggregation-node.h
@@ -390,6 +390,20 @@ class PartitionedAggregationNode : public ExecNode {
   /// a temporary buffer.
   boost::scoped_ptr<BufferedTupleStream> serialize_stream_;
 
+  /// Materializes 'row_batch' in either grouping or non-grouping case.
+  Status GetNextInternal(RuntimeState* state, RowBatch* row_batch, bool* eos);
+
+  /// Helper function called by GetNext() to ensure that string data referenced in
+  /// 'row_batch' will live as long as 'row_batch's tuples. 'first_row_idx' indexes the
+  /// first row that should be processed in 'row_batch'.
+  Status HandleOutputStrings(RowBatch* row_batch, int first_row_idx);
+
+  /// Copies string data from the specified slot into 'pool', and sets the StringValues'
+  /// ptrs to the copied data. Copies data from all tuples in 'row_batch' from
+  /// 'first_row_idx' onwards. 'slot_desc' must have a var-len string type.
+  Status CopyStringData(const SlotDescriptor* slot_desc, RowBatch* row_batch,
+      int first_row_idx, MemPool* pool);
+
   /// Constructs singleton output tuple, allocating memory from pool.
   Tuple* ConstructSingletonOutputTuple(
       const std::vector<impala_udf::FunctionContext*>& agg_fn_ctxs, MemPool* pool);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7767d300/be/src/exprs/agg-fn-evaluator.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/agg-fn-evaluator.h b/be/src/exprs/agg-fn-evaluator.h
index 98fb4a1..e9598ea 100644
--- a/be/src/exprs/agg-fn-evaluator.h
+++ b/be/src/exprs/agg-fn-evaluator.h
@@ -118,6 +118,7 @@ class AggFnEvaluator {
   const std::string& fn_name() const { return fn_.name.function_name; }
   const std::string& update_symbol() const { return fn_.aggregate_fn.update_fn_symbol; }
   const std::string& merge_symbol() const { return fn_.aggregate_fn.merge_fn_symbol; }
+  const SlotDescriptor* output_slot_desc() const { return output_slot_desc_; }
 
   static std::string DebugString(const std::vector<AggFnEvaluator*>& exprs);
   std::string DebugString() const;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7767d300/testdata/workloads/functional-query/queries/QueryTest/nested-types-runtime.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/nested-types-runtime.test b/testdata/workloads/functional-query/queries/QueryTest/nested-types-runtime.test
index 2e38d1d..35e27c5 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/nested-types-runtime.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/nested-types-runtime.test
@@ -450,3 +450,19 @@ inner join t2.int_array a
 ---- TYPES
 bigint
 ====
+---- QUERY
+# IMPALA-3311: test string data coming out of an agg in a subplan
+select id, m from complextypestbl t,
+(select min(cast(item as string)) m from t.int_array) v
+---- RESULTS
+1,'1'
+2,'1'
+3,'NULL'
+4,'NULL'
+5,'NULL'
+6,'NULL'
+7,'NULL'
+8,'-1'
+---- TYPES
+BIGINT,STRING
+====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7767d300/testdata/workloads/perf-regression/queries/subplan_aggregation.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/perf-regression/queries/subplan_aggregation.test b/testdata/workloads/perf-regression/queries/subplan_aggregation.test
new file mode 100644
index 0000000..b9ea894
--- /dev/null
+++ b/testdata/workloads/perf-regression/queries/subplan_aggregation.test
@@ -0,0 +1,11 @@
+====
+---- QUERY: subplan_aggregation
+-- Description: Agg in subplan produces string output that's fed to non-trivial parent
+-- plan
+-- Target test case: Regression test for IMPALA-3311
+select c_custkey, max(m) from customer c,
+(select max(o_orderstatus) m from c.c_orders) v
+group by c_custkey order by 1 limit 1
+---- RESULTS
+---- TYPES
+====