Posted to commits@impala.apache.org by ta...@apache.org on 2018/11/14 20:23:01 UTC

[1/9] impala git commit: IMPALA-7848: Enable ParserTest.TestAdminFns

Repository: impala
Updated Branches:
  refs/heads/master cd26e807f -> 60095a4c6


IMPALA-7848: Enable ParserTest.TestAdminFns

This patch enables ParserTest.TestAdminFns.
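
For context, a minimal JUnit 4 sketch (hypothetical class names; the FE tests
use JUnit 4, as the org.junit imports elsewhere in this series show) of why a
test method can be silently skipped. The one-line diff below adds the missing
@Test annotation to TestAdminFns:

import org.junit.Test;

public class AnnotationDemo {
  // Without @Test the JUnit 4 runner ignores this method entirely.
  public void notRun() {
    throw new AssertionError("never executed");
  }

  // With @Test the runner picks the method up and executes it.
  @Test
  public void isRun() {
    // assertions go here
  }
}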

Testing:
- Ran all FE tests

Change-Id: If4ade49cf7c40de77c39b6996cf9b9797b5d32b3
Reviewed-on: http://gerrit.cloudera.org:8080/11925
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/3f647577
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/3f647577
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/3f647577

Branch: refs/heads/master
Commit: 3f647577b471c3d4d954890a12b5ff829516e08c
Parents: cd26e80
Author: Fredy Wijaya <fw...@cloudera.com>
Authored: Tue Nov 13 13:13:01 2018 -0800
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Wed Nov 14 02:59:58 2018 +0000

----------------------------------------------------------------------
 fe/src/test/java/org/apache/impala/analysis/ParserTest.java | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/3f647577/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
index f7d6bda..7231faf 100644
--- a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
@@ -3841,6 +3841,7 @@ public class ParserTest extends FrontendTestBase {
     }
   }
 
+  @Test
   public void TestAdminFns() {
     // Any combination of whitespace is ok.
     ParsesOk(":foobar()");


[3/9] impala git commit: IMPALA-941: Fix support for an identifier that starts with a number

Posted by ta...@apache.org.
IMPALA-941: Fix support for an identifier that starts with a number

Impala has supported identifiers that start with a number for quite
some time. However, the implementation was broken, especially when such
an identifier was used in a fully-qualified name, because IDENTIFIER DOT
IDENTIFIER was lexed as a DECIMAL_LITERAL. This patch fixes the scanner
so that identifiers starting with a number are handled correctly.
For example:

Before the fix:
> select * from 123_bar; --> this is OK
> select * from foo. 123_bar; --> this is OK
> select * from foo .123_bar; --> this is not OK
> select * from foo.123_bar; --> this is not OK

After the fix:
> select * from 123_bar; --> this is OK
> select * from foo. 123_bar; --> this is OK
> select * from foo .123_bar; --> this is OK
> select * from foo.123_bar; --> this is OK
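
To illustrate why the pre-fix scanner got this wrong: with longest-match
lexing, the DecimalLiteral rule (FLit2) claims ".123" out of ".123_bar",
leaving "_bar" behind, which the parser cannot recover from. The new
"\. {Identifier}" alternative lets ".123_bar" match as one token, and the
scanner action then splits it back into DOT plus IDENTIFIER via yypushback().
A small illustrative Java sketch of the two competing matches (simplified
stand-in patterns, not the real JFlex rules):

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class DotIdentifierDemo {
  // Simplified approximations of the sql-scanner.flex definitions.
  private static final Pattern DECIMAL_LITERAL = Pattern.compile("\\.[0-9]+");
  private static final Pattern DOT_IDENTIFIER =
      Pattern.compile("\\.[0-9]*[A-Za-z_][A-Za-z0-9_]*");

  public static void main(String[] args) {
    String input = ".123_bar";
    Matcher decimal = DECIMAL_LITERAL.matcher(input);
    Matcher dotIdent = DOT_IDENTIFIER.matcher(input);
    if (decimal.lookingAt()) {
      System.out.println("decimal rule consumes:        " + decimal.group()); // .123
    }
    if (dotIdent.lookingAt()) {
      System.out.println("dot-identifier rule consumes: " + dotIdent.group()); // .123_bar
    }
  }
}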

Testing:
- Added a new parser test
- Ran all FE tests

Change-Id: I8a715be73553247ce80a1ba841712191d64f9730
Reviewed-on: http://gerrit.cloudera.org:8080/11927
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/c4c1eba6
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/c4c1eba6
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/c4c1eba6

Branch: refs/heads/master
Commit: c4c1eba6b74de1e3902005e56141839833592954
Parents: c66d44e
Author: Fredy Wijaya <fw...@cloudera.com>
Authored: Tue Nov 13 14:28:43 2018 -0800
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Wed Nov 14 10:17:13 2018 +0000

----------------------------------------------------------------------
 fe/src/main/jflex/sql-scanner.flex              | 11 ++++++-
 .../org/apache/impala/analysis/ParserTest.java  | 30 ++++++++++++++++++++
 2 files changed, 40 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/c4c1eba6/fe/src/main/jflex/sql-scanner.flex
----------------------------------------------------------------------
diff --git a/fe/src/main/jflex/sql-scanner.flex b/fe/src/main/jflex/sql-scanner.flex
index 5536b16..3d3fea6 100644
--- a/fe/src/main/jflex/sql-scanner.flex
+++ b/fe/src/main/jflex/sql-scanner.flex
@@ -410,7 +410,10 @@ FLit3 = [0-9]+
 Exponent = [eE] [+-]? [0-9]+
 DecimalLiteral = ({FLit1}|{FLit2}|{FLit3}) {Exponent}?
 
-IdentifierOrKw = [:digit:]*[:jletter:][:jletterdigit:]* | "&&" | "||"
+Identifier = [:digit:]*[:jletter:][:jletterdigit:]*
+// Without \. {Identifier}, a dot followed by an identifier starting with digits will
+// always be lexed to FLit2.
+IdentifierOrKw =  {Identifier} | \. {Identifier} | "&&" | "||"
 QuotedIdentifier = \`(\\.|[^\\\`])*\`
 SingleQuoteStringLiteral = \'(\\.|[^\\\'])*\'
 DoubleQuoteStringLiteral = \"(\\.|[^\\\"])*\"
@@ -502,6 +505,12 @@ EndOfLineComment = "--" !({HintContent}|{ContainsLineTerminator}) {LineTerminato
 
 {IdentifierOrKw} {
   String text = yytext();
+  if (text.startsWith(".")) {
+    // If we see an identifier that starts with a dot, push the identifier
+    // minus the dot back into the input stream.
+    yypushback(text.length() - 1);
+    return newToken(SqlParserSymbols.DOT, yytext());
+  }
   Integer kw_id = keywordMap.get(text.toLowerCase());
   if (kw_id != null) {
     return newToken(kw_id, text);

http://git-wip-us.apache.org/repos/asf/impala/blob/c4c1eba6/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
index 7231faf..d7c3de9 100644
--- a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
@@ -3876,4 +3876,34 @@ public class ParserTest extends FrontendTestBase {
     ParserError(": shutdown() :other()");
     ParserError(": shutdown('hostA'); :shutdown('hostB');");
   }
+
+  @Test
+  public void TestIdentifier() {
+    ParsesOk("select * from foo.bar_123");
+    ParsesOk("select * from foo. bar_123");
+    ParsesOk("select * from foo .bar_123");
+    ParsesOk("select * from foo . bar_123");
+
+    ParsesOk("select * from foo.123_bar");
+    ParsesOk("select * from foo. 123_bar");
+    ParsesOk("select * from foo .123_bar");
+    ParsesOk("select * from foo . 123_bar");
+
+    ParsesOk("select * from 123_foo.bar");
+    ParsesOk("select * from 123_foo. bar");
+    ParsesOk("select * from 123_foo .bar");
+    ParsesOk("select * from 123_foo . bar");
+
+    ParsesOk("select * from 123_foo.123_bar");
+    ParsesOk("select * from 123_foo. 123_bar");
+    ParsesOk("select * from 123_foo .123_bar");
+    ParsesOk("select * from 123_foo . 123_bar");
+
+    // We do not allow identifiers that are ambiguous with exponential.
+    ParserError("select * from foo.4e1");
+    ParserError("select * from 4e1.bar");
+
+    ParserError("select * from .123_bar");
+    ParserError("select * from . 123_bar");
+  }
 }


[4/9] impala git commit: IMPALA-7805: Emit zero as "0" in toSql()

Posted by ta...@apache.org.
IMPALA-7805: Emit zero as "0" in toSql()

It turns out that Impala has a somewhat baroque way of representing the
numeric value 0. NumericLiteral.toSql() uses the Java BigDecimal class
to convert a numeric value to a string for use in explained plans and in
verifying expression rewrites.

The default Java behavior is to consider scale when rendering numbers,
including 0. Thus, depending on precision and scale, you may get:

0
0.0
0.00
0.000
...
0E-38

However, mathematically, zero is zero. Plans attach no special meaning
to the extra decimal points or trailing zeros.

To make testing easier, this patch changes the behavior to always emit "0"
when the value is zero, regardless of precision or scale.
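
A quick illustration of the behavior described above and of the fix
(hypothetical demo class; the ternary check mirrors the
NumericLiteral.getStringValue() change in the diff below):

import java.math.BigDecimal;

public class ZeroToSqlDemo {
  // Collapse every representation of zero to plain "0", as the patch does.
  static String toSqlValue(BigDecimal value) {
    return value.compareTo(BigDecimal.ZERO) == 0 ? "0" : value.toString();
  }

  public static void main(String[] args) {
    BigDecimal[] zeros = {
        new BigDecimal("0"), new BigDecimal("0.00"), new BigDecimal("0E-38")
    };
    for (BigDecimal z : zeros) {
      // BigDecimal.toString() keeps the scale: "0", "0.00", "0E-38".
      // toSqlValue() prints "0" for all three.
      System.out.println(z.toString() + " -> " + toSqlValue(z));
    }
  }
}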

Testing: Reran the planner tests and modified captured plans that had
the 0.0, 0.00 variations of zero.

Since this change affects only EXPLAIN output, it cannot affect the
operation of queries. It may impact other tests that compare EXPLAIN
output to a "golden" copy.

Change-Id: I0b2f2f34fe5e6003de407301310ccf433841b9f1
Reviewed-on: http://gerrit.cloudera.org:8080/11878
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/b2dbc0f0
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/b2dbc0f0
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/b2dbc0f0

Branch: refs/heads/master
Commit: b2dbc0f0bc106c89c5722ae909e067c4dabff4d1
Parents: c4c1eba
Author: Paul Rogers <pr...@cloudera.com>
Authored: Mon Nov 5 11:30:56 2018 -0800
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Wed Nov 14 10:30:30 2018 +0000

----------------------------------------------------------------------
 .../org/apache/impala/analysis/NumericLiteral.java  |  9 ++++++++-
 .../queries/PlannerTest/kudu-selectivity.test       |  2 +-
 .../queries/PlannerTest/tpch-all.test               | 16 ++++++++--------
 .../queries/PlannerTest/tpch-kudu.test              |  4 ++--
 .../queries/PlannerTest/tpch-nested.test            | 14 +++++++-------
 .../queries/PlannerTest/tpch-views.test             |  4 ++--
 6 files changed, 28 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/b2dbc0f0/fe/src/main/java/org/apache/impala/analysis/NumericLiteral.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/NumericLiteral.java b/fe/src/main/java/org/apache/impala/analysis/NumericLiteral.java
index f3fea9b..4444755 100644
--- a/fe/src/main/java/org/apache/impala/analysis/NumericLiteral.java
+++ b/fe/src/main/java/org/apache/impala/analysis/NumericLiteral.java
@@ -150,8 +150,15 @@ public class NumericLiteral extends LiteralExpr {
 
   @Override
   public String toSqlImpl() { return getStringValue(); }
+
   @Override
-  public String getStringValue() { return value_.toString(); }
+  public String getStringValue() {
+    // BigDecimal returns CAST(0, DECIMAL(38, 38))
+    // as 0E-38. We want just 0.
+    return value_.compareTo(BigDecimal.ZERO) == 0
+        ? "0" : value_.toString();
+  }
+
   public double getDoubleValue() { return value_.doubleValue(); }
   public long getLongValue() { return value_.longValue(); }
   public long getIntValue() { return value_.intValue(); }

http://git-wip-us.apache.org/repos/asf/impala/blob/b2dbc0f0/testdata/workloads/functional-planner/queries/PlannerTest/kudu-selectivity.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-selectivity.test b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-selectivity.test
index 6ae50f3..05b1c92 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-selectivity.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-selectivity.test
@@ -165,7 +165,7 @@ Per-Host Resources: mem-estimate=9.75MB mem-reservation=0B thread-reservation=2
   |
   00:SCAN KUDU [functional_kudu.alltypes]
      predicates: id IN (int_col), bigint_col IN (9999999999999999999), double_col IN (CAST('inf' AS DOUBLE)), float_col IN (CAST('NaN' AS FLOAT)), int_col IN (9999999999), smallint_col IN (99999, 2), tinyint_col IN (1, 999), bool_col IN (1), string_col NOT IN ('bar')
-     kudu predicates: double_col IN (0.0), float_col IN (0.0), bigint_col IN (1, 2), int_col IN (1, 2), smallint_col IN (0, 2), string_col IN ('foo', 'foo       '), tinyint_col IN (1, 2), bool_col IN (TRUE)
+     kudu predicates: double_col IN (0), float_col IN (0), bigint_col IN (1, 2), int_col IN (1, 2), smallint_col IN (0, 2), string_col IN ('foo', 'foo       '), tinyint_col IN (1, 2), bool_col IN (TRUE)
      mem-estimate=9.75MB mem-reservation=0B thread-reservation=1
      tuple-ids=0 row-size=97B cardinality=5
      in pipelines: 00(GETNEXT)

http://git-wip-us.apache.org/repos/asf/impala/blob/b2dbc0f0/testdata/workloads/functional-planner/queries/PlannerTest/tpch-all.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tpch-all.test b/testdata/workloads/functional-planner/queries/PlannerTest/tpch-all.test
index f6f7672..10efec9 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/tpch-all.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/tpch-all.test
@@ -2677,7 +2677,7 @@ Per-Host Resource Estimates: Memory=346MB
 PLAN-ROOT SINK
 |
 03:AGGREGATE [FINALIZE]
-|  output: sum(CASE WHEN p_type LIKE 'PROMO%' THEN l_extendedprice * (1 - l_discount) ELSE 0.0 END), sum(l_extendedprice * (1 - l_discount))
+|  output: sum(CASE WHEN p_type LIKE 'PROMO%' THEN l_extendedprice * (1 - l_discount) ELSE 0 END), sum(l_extendedprice * (1 - l_discount))
 |
 02:HASH JOIN [INNER JOIN]
 |  hash predicates: l_partkey = p_partkey
@@ -2696,12 +2696,12 @@ Per-Host Resource Estimates: Memory=364MB
 PLAN-ROOT SINK
 |
 07:AGGREGATE [FINALIZE]
-|  output: sum:merge(CASE WHEN p_type LIKE 'PROMO%' THEN l_extendedprice * (1 - l_discount) ELSE 0.0 END), sum:merge(l_extendedprice * (1 - l_discount))
+|  output: sum:merge(CASE WHEN p_type LIKE 'PROMO%' THEN l_extendedprice * (1 - l_discount) ELSE 0 END), sum:merge(l_extendedprice * (1 - l_discount))
 |
 06:EXCHANGE [UNPARTITIONED]
 |
 03:AGGREGATE
-|  output: sum(CASE WHEN p_type LIKE 'PROMO%' THEN l_extendedprice * (1 - l_discount) ELSE 0.0 END), sum(l_extendedprice * (1 - l_discount))
+|  output: sum(CASE WHEN p_type LIKE 'PROMO%' THEN l_extendedprice * (1 - l_discount) ELSE 0 END), sum(l_extendedprice * (1 - l_discount))
 |
 02:HASH JOIN [INNER JOIN, PARTITIONED]
 |  hash predicates: l_partkey = p_partkey
@@ -2724,12 +2724,12 @@ Per-Host Resource Estimates: Memory=299MB
 PLAN-ROOT SINK
 |
 07:AGGREGATE [FINALIZE]
-|  output: sum:merge(CASE WHEN p_type LIKE 'PROMO%' THEN l_extendedprice * (1 - l_discount) ELSE 0.0 END), sum:merge(l_extendedprice * (1 - l_discount))
+|  output: sum:merge(CASE WHEN p_type LIKE 'PROMO%' THEN l_extendedprice * (1 - l_discount) ELSE 0 END), sum:merge(l_extendedprice * (1 - l_discount))
 |
 06:EXCHANGE [UNPARTITIONED]
 |
 03:AGGREGATE
-|  output: sum(CASE WHEN p_type LIKE 'PROMO%' THEN l_extendedprice * (1 - l_discount) ELSE 0.0 END), sum(l_extendedprice * (1 - l_discount))
+|  output: sum(CASE WHEN p_type LIKE 'PROMO%' THEN l_extendedprice * (1 - l_discount) ELSE 0 END), sum(l_extendedprice * (1 - l_discount))
 |
 02:HASH JOIN [INNER JOIN, PARTITIONED]
 |  hash predicates: l_partkey = p_partkey
@@ -4197,7 +4197,7 @@ PLAN-ROOT SINK
 |  |  |
 |  |  01:SCAN HDFS [tpch.customer]
 |  |     partitions=1/1 files=1 size=23.08MB
-|  |     predicates: c_acctbal > 0.00, substr(c_phone, 1, 2) IN ('13', '31', '23', '29', '30', '18', '17')
+|  |     predicates: c_acctbal > 0, substr(c_phone, 1, 2) IN ('13', '31', '23', '29', '30', '18', '17')
 |  |
 |  00:SCAN HDFS [tpch.customer]
 |     partitions=1/1 files=1 size=23.08MB
@@ -4246,7 +4246,7 @@ PLAN-ROOT SINK
 |  |  |
 |  |  01:SCAN HDFS [tpch.customer]
 |  |     partitions=1/1 files=1 size=23.08MB
-|  |     predicates: c_acctbal > 0.00, substr(c_phone, 1, 2) IN ('13', '31', '23', '29', '30', '18', '17')
+|  |     predicates: c_acctbal > 0, substr(c_phone, 1, 2) IN ('13', '31', '23', '29', '30', '18', '17')
 |  |
 |  00:SCAN HDFS [tpch.customer]
 |     partitions=1/1 files=1 size=23.08MB
@@ -4305,7 +4305,7 @@ PLAN-ROOT SINK
 |  |  |
 |  |  01:SCAN HDFS [tpch.customer]
 |  |     partitions=1/1 files=1 size=23.08MB
-|  |     predicates: c_acctbal > 0.00, substr(c_phone, 1, 2) IN ('13', '31', '23', '29', '30', '18', '17')
+|  |     predicates: c_acctbal > 0, substr(c_phone, 1, 2) IN ('13', '31', '23', '29', '30', '18', '17')
 |  |
 |  00:SCAN HDFS [tpch.customer]
 |     partitions=1/1 files=1 size=23.08MB

http://git-wip-us.apache.org/repos/asf/impala/blob/b2dbc0f0/testdata/workloads/functional-planner/queries/PlannerTest/tpch-kudu.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tpch-kudu.test b/testdata/workloads/functional-planner/queries/PlannerTest/tpch-kudu.test
index 2a5cbd7..373fa87 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/tpch-kudu.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/tpch-kudu.test
@@ -895,7 +895,7 @@ Per-Host Resource Estimates: Memory=33MB
 PLAN-ROOT SINK
 |
 03:AGGREGATE [FINALIZE]
-|  output: sum(CASE WHEN p_type LIKE 'PROMO%' THEN l_extendedprice * (1 - l_discount) ELSE 0.0 END), sum(l_extendedprice * (1 - l_discount))
+|  output: sum(CASE WHEN p_type LIKE 'PROMO%' THEN l_extendedprice * (1 - l_discount) ELSE 0 END), sum(l_extendedprice * (1 - l_discount))
 |
 02:HASH JOIN [INNER JOIN]
 |  hash predicates: l_partkey = p_partkey
@@ -1447,7 +1447,7 @@ PLAN-ROOT SINK
 |  |  |
 |  |  01:SCAN KUDU [tpch_kudu.customer]
 |  |     predicates: substr(c_phone, 1, 2) IN ('13', '31', '23', '29', '30', '18', '17')
-|  |     kudu predicates: c_acctbal > 0.00
+|  |     kudu predicates: c_acctbal > 0
 |  |
 |  00:SCAN KUDU [tpch_kudu.customer]
 |     predicates: substr(c_phone, 1, 2) IN ('13', '31', '23', '29', '30', '18', '17')

http://git-wip-us.apache.org/repos/asf/impala/blob/b2dbc0f0/testdata/workloads/functional-planner/queries/PlannerTest/tpch-nested.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tpch-nested.test b/testdata/workloads/functional-planner/queries/PlannerTest/tpch-nested.test
index dfcdc4f..12bb1c9 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/tpch-nested.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/tpch-nested.test
@@ -1683,7 +1683,7 @@ Per-Host Resource Estimates: Memory=402MB
 PLAN-ROOT SINK
 |
 03:AGGREGATE [FINALIZE]
-|  output: sum(CASE WHEN p_type LIKE 'PROMO%' THEN l_extendedprice * (1 - l_discount) ELSE 0.0 END), sum(l_extendedprice * (1 - l_discount))
+|  output: sum(CASE WHEN p_type LIKE 'PROMO%' THEN l_extendedprice * (1 - l_discount) ELSE 0 END), sum(l_extendedprice * (1 - l_discount))
 |
 02:HASH JOIN [INNER JOIN]
 |  hash predicates: l_partkey = p_partkey
@@ -1702,12 +1702,12 @@ Per-Host Resource Estimates: Memory=422MB
 PLAN-ROOT SINK
 |
 06:AGGREGATE [FINALIZE]
-|  output: sum:merge(CASE WHEN p_type LIKE 'PROMO%' THEN l_extendedprice * (1 - l_discount) ELSE 0.0 END), sum:merge(l_extendedprice * (1 - l_discount))
+|  output: sum:merge(CASE WHEN p_type LIKE 'PROMO%' THEN l_extendedprice * (1 - l_discount) ELSE 0 END), sum:merge(l_extendedprice * (1 - l_discount))
 |
 05:EXCHANGE [UNPARTITIONED]
 |
 03:AGGREGATE
-|  output: sum(CASE WHEN p_type LIKE 'PROMO%' THEN l_extendedprice * (1 - l_discount) ELSE 0.0 END), sum(l_extendedprice * (1 - l_discount))
+|  output: sum(CASE WHEN p_type LIKE 'PROMO%' THEN l_extendedprice * (1 - l_discount) ELSE 0 END), sum(l_extendedprice * (1 - l_discount))
 |
 02:HASH JOIN [INNER JOIN, BROADCAST]
 |  hash predicates: l_partkey = p_partkey
@@ -2636,8 +2636,8 @@ PLAN-ROOT SINK
 |  |  output: avg(c_acctbal)
 |  |
 |  01:SCAN HDFS [tpch_nested_parquet.customer c]
-|     partitions=1/1 files=4 size=288.99MB
-|     predicates: c_acctbal > 0.00, substr(c_phone, 1, 2) IN ('13', '31', '23', '29', '30', '18', '17')
+|     partitions=1/1 files=4 size=289.00MB
+|     predicates: c_acctbal > 0, substr(c_phone, 1, 2) IN ('13', '31', '23', '29', '30', '18', '17')
 |
 00:SCAN HDFS [tpch_nested_parquet.customer c]
    partitions=1/1 files=4 size=288.99MB
@@ -2685,8 +2685,8 @@ PLAN-ROOT SINK
 |  |  output: avg(c_acctbal)
 |  |
 |  01:SCAN HDFS [tpch_nested_parquet.customer c]
-|     partitions=1/1 files=4 size=288.99MB
-|     predicates: c_acctbal > 0.00, substr(c_phone, 1, 2) IN ('13', '31', '23', '29', '30', '18', '17')
+|     partitions=1/1 files=4 size=289.00MB
+|     predicates: c_acctbal > 0, substr(c_phone, 1, 2) IN ('13', '31', '23', '29', '30', '18', '17')
 |
 00:SCAN HDFS [tpch_nested_parquet.customer c]
    partitions=1/1 files=4 size=288.99MB

http://git-wip-us.apache.org/repos/asf/impala/blob/b2dbc0f0/testdata/workloads/functional-planner/queries/PlannerTest/tpch-views.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tpch-views.test b/testdata/workloads/functional-planner/queries/PlannerTest/tpch-views.test
index 52af979..7bc22c3 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/tpch-views.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/tpch-views.test
@@ -935,7 +935,7 @@ where
 PLAN-ROOT SINK
 |
 03:AGGREGATE [FINALIZE]
-|  output: sum(CASE WHEN tpch.part.p_type LIKE 'PROMO%' THEN tpch.lineitem.l_extendedprice * (1 - tpch.lineitem.l_discount) ELSE 0.0 END), sum(tpch.lineitem.l_extendedprice * (1 - tpch.lineitem.l_discount))
+|  output: sum(CASE WHEN tpch.part.p_type LIKE 'PROMO%' THEN tpch.lineitem.l_extendedprice * (1 - tpch.lineitem.l_discount) ELSE 0 END), sum(tpch.lineitem.l_extendedprice * (1 - tpch.lineitem.l_discount))
 |
 02:HASH JOIN [INNER JOIN]
 |  hash predicates: tpch.lineitem.l_partkey = tpch.part.p_partkey
@@ -1507,7 +1507,7 @@ PLAN-ROOT SINK
 |  |  |
 |  |  01:SCAN HDFS [tpch.customer]
 |  |     partitions=1/1 files=1 size=23.08MB
-|  |     predicates: tpch.customer.c_acctbal > 0.00, substr(tpch.customer.c_phone, 1, 2) IN ('13', '31', '23', '29', '30', '18', '17')
+|  |     predicates: tpch.customer.c_acctbal > 0, substr(tpch.customer.c_phone, 1, 2) IN ('13', '31', '23', '29', '30', '18', '17')
 |  |
 |  00:SCAN HDFS [tpch.customer]
 |     partitions=1/1 files=1 size=23.08MB


[7/9] impala git commit: IMPALA-7634: [DOCS] Document the new SHUTDOWN statement

Posted by ta...@apache.org.
IMPALA-7634: [DOCS] Document the new SHUTDOWN statement

Change-Id: I0cfe4bae1b7966980cdeececa6b959bbecb4a24a
Reviewed-on: http://gerrit.cloudera.org:8080/11872
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-by: Tim Armstrong <ta...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/71f8d0eb
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/71f8d0eb
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/71f8d0eb

Branch: refs/heads/master
Commit: 71f8d0eba33c5d5d0a0cd2d2b617ac871dbcbb8a
Parents: 731254b
Author: Alex Rodoni <ar...@cloudera.com>
Authored: Fri Nov 2 17:17:40 2018 -0700
Committer: Alex Rodoni <ar...@cloudera.com>
Committed: Wed Nov 14 18:37:43 2018 +0000

----------------------------------------------------------------------
 docs/impala.ditamap             |   1 +
 docs/topics/impala_shutdown.xml | 222 +++++++++++++++++++++++++++++++++++
 2 files changed, 223 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/71f8d0eb/docs/impala.ditamap
----------------------------------------------------------------------
diff --git a/docs/impala.ditamap b/docs/impala.ditamap
index 8eecf06..e4c35a7 100644
--- a/docs/impala.ditamap
+++ b/docs/impala.ditamap
@@ -243,6 +243,7 @@ under the License.
         </topicref>
       </topicref>
       <topicref href="topics/impala_show.xml"/>
+      <topicref href="topics/impala_shutdown.xml"/>
       <topicref href="topics/impala_truncate_table.xml"/>
       <topicref href="topics/impala_update.xml"/>
       <topicref href="topics/impala_upsert.xml"/>

http://git-wip-us.apache.org/repos/asf/impala/blob/71f8d0eb/docs/topics/impala_shutdown.xml
----------------------------------------------------------------------
diff --git a/docs/topics/impala_shutdown.xml b/docs/topics/impala_shutdown.xml
new file mode 100644
index 0000000..1677fac
--- /dev/null
+++ b/docs/topics/impala_shutdown.xml
@@ -0,0 +1,222 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<!DOCTYPE concept PUBLIC "-//OASIS//DTD DITA Concept//EN" "concept.dtd">
+<concept id="shutdown">
+
+  <title>SHUTDOWN Statement</title>
+
+  <titlealts audience="PDF">
+
+    <navtitle>SHUTDOWN</navtitle>
+
+  </titlealts>
+
+  <prolog>
+    <metadata>
+      <data name="Category" value="Impala"/>
+      <data name="Category" value="SQL"/>
+      <data name="Category" value="Developers"/>
+    </metadata>
+  </prolog>
+
+  <conbody>
+
+    <p>
+      The <codeph>SHUTDOWN</codeph> statement performs a graceful shutdown of an Impala daemon. The
+      Impala daemon will notify other Impala daemons that it is shutting down, wait for a grace
+      period, then shut itself down once no more queries or fragments are executing on that
+      daemon. The <codeph>--shutdown_grace_period_s</codeph> flag determines the duration of the
+      grace period in seconds.
+    </p>
+
+    <p>
+      <b>Syntax:</b>
+    </p>
+
+<codeblock>:SHUTDOWN()
+:SHUTDOWN(<varname>host_name</varname>[:<varname>port_number</varname>])
+:SHUTDOWN(<varname>deadline</varname>)
+:SHUTDOWN(<varname>host_name</varname>[:<varname>port_number</varname>], <varname>deadline</varname>)</codeblock>
+
+    <p>
+      <b>Usage notes:</b>
+    </p>
+
+    <p>
+      All arguments are optional for <codeph>SHUTDOWN</codeph>.
+    </p>
+
+    <simpletable frame="all" id="simpletable_sly_wrf_rfb">
+
+      <sthead>
+
+        <stentry>Argument</stentry>
+
+        <stentry>Type</stentry>
+
+        <stentry>Default</stentry>
+
+        <stentry>Description</stentry>
+
+      </sthead>
+
+      <strow>
+
+        <stentry><codeph><varname>host_name</varname></codeph>
+
+        </stentry>
+
+        <stentry><codeph>STRING</codeph>
+
+        </stentry>
+
+        <stentry>The current <codeph>impalad</codeph> host</stentry>
+
+        <stentry>
+
+          <p>
+            Address of the backend to be shut down.
+          </p>
+
+        </stentry>
+
+      </strow>
+
+      <strow>
+
+        <stentry><codeph><varname>port_number</varname></codeph>
+
+        </stentry>
+
+        <stentry><codeph>INT</codeph>
+
+        </stentry>
+
+        <stentry><codeph>0</codeph>, which is treated as the same port as the current
+            <codeph>impalad</codeph>
+
+        </stentry>
+
+        <stentry>n/a</stentry>
+
+      </strow>
+
+      <strow>
+
+        <stentry><codeph><varname>deadline</varname></codeph>
+
+        </stentry>
+
+        <stentry><codeph>INT</codeph>
+
+        </stentry>
+
+        <stentry>The value of the <codeph>--shutdown_deadline_s</codeph>
+            flag, which defaults to 1 hour.
+
+        </stentry>
+
+        <stentry>
+
+          <p>
+            <codeph><varname>deadline</varname></codeph> must be a non-negative number,
+            specified in seconds.
+          </p>
+
+          <p>
+            A <varname>deadline</varname> value of 0 specifies an immediate shutdown.
+          </p>
+
+        </stentry>
+
+      </strow>
+
+    </simpletable>
+
+    <p>
+      Take the following points into consideration when running the <codeph>SHUTDOWN</codeph>
+      statement:
+    </p>
+
+    <ul>
+      <li>
+        A client can shut down the
+        <xref
+          href="impala_components.xml#intro_impalad">coordinator</xref>
+        <codeph>impalad</codeph> that it is connected to via <codeph>:SHUTDOWN()</codeph>.
+      </li>
+
+      <li>
+        A client can remotely shut down another non-coordinator <codeph>impalad</codeph> via
+        <codeph>:SHUTDOWN('<varname>hostname</varname>')</codeph>.
+      </li>
+
+      <li>
+        The shutdown time limit can be overridden to force a quicker or slower shutdown by
+        specifying a deadline. The default deadline is determined by the
+        <codeph>--shutdown_deadline_s</codeph> flag, which defaults to 1 hour.
+      </li>
+
+      <li>
+        <xref href="impala_components.xml#intro_impalad">Executors</xref> can be shut down
+        without disrupting running queries. Short-running queries will finish, and long-running
+        queries will continue until a threshold time limit is reached.
+      </li>
+
+      <li>
+        If queries are submitted to a coordinator after shutdown of that coordinator has
+        started, they will fail.
+      </li>
+
+      <li>
+        Long running queries or other issues, such as stuck fragments, will slow down but not
+        prevent eventual shutdown.
+      </li>
+    </ul>
+
+    <p>
+      <b>Security considerations:</b>
+    </p>
+
+    <p>
+      The <codeph>ALL</codeph> privilege is required on the server.
+    </p>
+
+    <p conref="../shared/impala_common.xml#common/cancel_blurb_no"/>
+
+    <p>
+      <b>Examples:</b>
+    </p>
+
+<codeblock>:SHUTDOWN(); -- Shut down the current impalad with the default deadline.
+:SHUTDOWN('hostname'); -- Shut down impalad running on hostname with the default deadline.
+:SHUTDOWN('hostname:1234'); -- Shut down impalad running on hostname at port 1234 with the default deadline.
+:SHUTDOWN(10); -- Shut down the current impalad after 10 seconds.
+:SHUTDOWN('hostname', 10); -- Shut down impalad running on hostname when all queries running on hostname finish, or after 10 seconds.
+:SHUTDOWN('hostname:11', 10 * 60); -- Shut down impalad running on hostname at port 11 when all queries running on hostname finish, or after 600 seconds.
+:SHUTDOWN(0); -- Perform an immediate shutdown of the current impalad.</codeblock>
+
+    <p>
+      <b>Added in:</b> <keyword keyref="impala31"/>
+    </p>
+
+  </conbody>
+
+</concept>


[2/9] impala git commit: IMPALA-7764: Improve SentryProxy test coverage

Posted by ta...@apache.org.
IMPALA-7764: Improve SentryProxy test coverage

This patch refactors SentryProxy to be more testable and adds tests for
SentryProxy to improve its test coverage.
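
The core of the refactoring, visible in the diff below, is to pull the
refresh logic out of the polling thread's instance methods into static
methods that receive their collaborators as parameters and are marked
@VisibleForTesting. A minimal sketch of that pattern (hypothetical class
and method names, not the real SentryProxy):

import com.google.common.annotations.VisibleForTesting;

public class PollingService {
  // Hypothetical collaborator, kept trivial so the sketch is self-contained.
  static class Catalog {}

  private final Catalog catalog_ = new Catalog();

  // The background thread still calls through the instance state...
  public void runOnce() {
    refresh(catalog_, /* resetVersions */ false);
  }

  // ...but the logic itself only sees its arguments, so a test can call it
  // directly with a test catalog and no background thread.
  @VisibleForTesting
  static void refresh(Catalog catalog, boolean resetVersions) {
    // refresh logic operating only on its parameters
  }
}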

Testing:
- Added a new FE test for SentryProxy
- Ran all FE tests
- Ran all E2E authorization tests

Change-Id: I1190638283f051cc37fda7ea5dd0c5e8a70d40db
Reviewed-on: http://gerrit.cloudera.org:8080/11880
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/c66d44ea
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/c66d44ea
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/c66d44ea

Branch: refs/heads/master
Commit: c66d44ea7539e821c3b9e2f4a815ecfcb94218c9
Parents: 3f64757
Author: Fredy Wijaya <fw...@cloudera.com>
Authored: Mon Nov 5 13:22:57 2018 -0800
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Wed Nov 14 07:40:53 2018 +0000

----------------------------------------------------------------------
 .../org/apache/impala/util/SentryProxy.java     | 324 +++++-----
 .../org/apache/impala/util/SentryProxyTest.java | 612 +++++++++++++++++++
 2 files changed, 784 insertions(+), 152 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/c66d44ea/fe/src/main/java/org/apache/impala/util/SentryProxy.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/util/SentryProxy.java b/fe/src/main/java/org/apache/impala/util/SentryProxy.java
index 2470bc3..0087652 100644
--- a/fe/src/main/java/org/apache/impala/util/SentryProxy.java
+++ b/fe/src/main/java/org/apache/impala/util/SentryProxy.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.impala.catalog.AuthorizationException;
 import org.apache.impala.catalog.CatalogException;
 import org.apache.impala.catalog.CatalogServiceCatalog;
@@ -142,173 +143,192 @@ public class SentryProxy {
 
     public void run() {
       synchronized (SentryProxy.this) {
-        Set<String> rolesToRemove;
-        Set<String> usersToRemove;
-        long startTime = System.currentTimeMillis();
-        try {
-          rolesToRemove = refreshRolePrivileges();
-          usersToRemove = refreshUserPrivileges();
-        } catch (Exception e) {
-          LOG.error("Error refreshing Sentry policy: ", e);
-          if (swallowException_) return;
-          // We need to differentiate between Sentry not available exception and
-          // any other exceptions.
-          if (e.getCause() != null && e.getCause() instanceof SentryUserException) {
-            Throwable sentryException = e.getCause();
-            if (sentryException.getCause() != null &&
-                sentryException.getCause() instanceof TTransportException) {
-              throw new SentryUnavailableException(e);
-            }
-          }
-          throw new SentryPolicyReaderException(e);
-        } finally {
-          LOG.debug("Refreshing Sentry policy took " +
-              (System.currentTimeMillis() - startTime) + "ms");
-        }
-
-        // Remove all the roles, incrementing the catalog version to indicate
-        // a change.
-        for (String roleName: rolesToRemove) {
-          catalog_.removeRole(roleName);
-        }
-        // Remove all the users, incrementing the catalog version to indicate
-        // a change.
-        for (String userName: usersToRemove) {
-          catalog_.removeUser(userName);
-        }
+        refreshSentryAuthorization(catalog_, sentryPolicyService_, processUser_,
+            resetVersions_, swallowException_);
       }
     }
+  }
 
-    /**
-     * Updates all roles and their associated privileges in the catalog by adding,
-     * removing, and replacing the catalog objects to match those in Sentry since
-     * the last sentry sync update. This method returns a list of roles to be removed.
-     */
-    private Set<String> refreshRolePrivileges() throws ImpalaException {
-      // Assume all roles should be removed. Then query the Policy Service and remove
-      // roles from this set that actually exist.
-      Set<String> rolesToRemove = catalog_.getAuthPolicy().getAllRoleNames();
-      // The keys (role names) in listAllRolesPrivileges here are always in lower case.
-      Map<String, Set<TSentryPrivilege>> allRolesPrivileges =
-          sentryPolicyService_.listAllRolesPrivileges(processUser_);
-      // Read the full policy, adding new/modified roles to "updatedRoles".
-      for (TSentryRole sentryRole:
-          sentryPolicyService_.listAllRoles(processUser_)) {
-        // This role exists and should not be removed, delete it from the
-        // rolesToRemove set.
-        rolesToRemove.remove(sentryRole.getRoleName().toLowerCase());
-
-        Set<String> grantGroups = Sets.newHashSet();
-        for (TSentryGroup group: sentryRole.getGroups()) {
-          grantGroups.add(group.getGroupName());
-        }
-        Role existingRole =
-            catalog_.getAuthPolicy().getRole(sentryRole.getRoleName());
-        Role role;
-        // These roles are the same, use the current role.
-        if (existingRole != null &&
-            existingRole.getGrantGroups().equals(grantGroups)) {
-          role = existingRole;
-          if (resetVersions_) {
-            role.setCatalogVersion(catalog_.incrementAndGetCatalogVersion());
-          }
-        } else {
-          role = catalog_.addRole(sentryRole.getRoleName(), grantGroups);
+  /**
+   * Refreshes Sentry authorization and updates the catalog.
+   */
+  @VisibleForTesting
+  static void refreshSentryAuthorization(CatalogServiceCatalog catalog,
+      SentryPolicyService sentryPolicyService, User processUser, boolean resetVersions,
+      boolean swallowException) {
+    Set<String> rolesToRemove;
+    Set<String> usersToRemove;
+    long startTime = System.currentTimeMillis();
+    try {
+      rolesToRemove = refreshRolePrivileges(catalog, sentryPolicyService, processUser,
+          resetVersions);
+      usersToRemove = refreshUserPrivileges(catalog, sentryPolicyService, processUser,
+          resetVersions);
+    } catch (Exception e) {
+      LOG.error("Error refreshing Sentry policy: ", e);
+      if (swallowException) return;
+      // We need to differentiate between Sentry not available exception and
+      // any other exceptions.
+      if (e.getCause() != null && e.getCause() instanceof SentryUserException) {
+        Throwable sentryException = e.getCause();
+        if (sentryException.getCause() != null &&
+            sentryException.getCause() instanceof TTransportException) {
+          throw new SentryUnavailableException(e);
         }
-        // allRolesPrivileges keys and sentryRole.getName() are used here since they both
-        // come from Sentry so they agree in case.
-        refreshPrivilegesInCatalog(sentryRole.getRoleName(), role, allRolesPrivileges);
       }
-      return rolesToRemove;
+      throw new SentryPolicyReaderException(e);
+    } finally {
+      LOG.debug("Refreshing Sentry policy took " +
+          (System.currentTimeMillis() - startTime) + "ms");
+    }
+
+    // Remove all the roles, incrementing the catalog version to indicate
+    // a change.
+    for (String roleName: rolesToRemove) {
+      catalog.removeRole(roleName);
     }
+    // Remove all the users, incrementing the catalog version to indicate
+    // a change.
+    for (String userName: usersToRemove) {
+      catalog.removeUser(userName);
+    }
+  }
 
-    /**
-     * Updates all users and their associated privileges in the catalog by adding,
-     * removing, and replacing the catalog objects to match those in Sentry since the
-     * last Sentry sync update. Take note that we only store the users with privileges
-     * stored in Sentry and not all available users in the system. This method returns a
-     * list of users to be removed. User privileges do not support grant groups.
-     */
-    private Set<String> refreshUserPrivileges() throws ImpalaException {
-      // Assume all users should be removed. Then query the Policy Service and remove
-      // users from this set that actually exist.
-      Set<String> usersToRemove = catalog_.getAuthPolicy().getAllUserNames();
-      // The keys (user names) in listAllUsersPrivileges here are always in lower case.
-      Map<String, Set<TSentryPrivilege>> allUsersPrivileges =
-          sentryPolicyService_.listAllUsersPrivileges(processUser_);
-      for (Map.Entry<String, Set<TSentryPrivilege>> userPrivilegesEntry:
-          allUsersPrivileges.entrySet()) {
-        String userName = userPrivilegesEntry.getKey();
-        // This user exists and should not be removed so remove it from the
-        // usersToRemove set.
-        usersToRemove.remove(userName);
-
-        Reference<Boolean> existingUser = new Reference<>();
-        org.apache.impala.catalog.User user = catalog_.addUserIfNotExists(userName,
-            existingUser);
-        if (existingUser.getRef() && resetVersions_) {
-          user.setCatalogVersion(catalog_.incrementAndGetCatalogVersion());
+  /**
+   * Updates all roles and their associated privileges in the catalog by adding,
+   * removing, and replacing the catalog objects to match those in Sentry since
+   * the last sentry sync update. This method returns a list of roles to be removed.
+   */
+  private static Set<String> refreshRolePrivileges(CatalogServiceCatalog catalog,
+      SentryPolicyService sentryPolicyService, User processUser, boolean resetVersions)
+      throws ImpalaException {
+    // Assume all roles should be removed. Then query the Policy Service and remove
+    // roles from this set that actually exist.
+    Set<String> rolesToRemove = catalog.getAuthPolicy().getAllRoleNames();
+    // The keys (role names) in listAllRolesPrivileges here are always in lower case.
+    Map<String, Set<TSentryPrivilege>> allRolesPrivileges =
+        sentryPolicyService.listAllRolesPrivileges(processUser);
+    // Read the full policy, adding new/modified roles to "updatedRoles".
+    for (TSentryRole sentryRole:
+        sentryPolicyService.listAllRoles(processUser)) {
+      // This role exists and should not be removed, delete it from the
+      // rolesToRemove set.
+      rolesToRemove.remove(sentryRole.getRoleName().toLowerCase());
+
+      Set<String> grantGroups = Sets.newHashSet();
+      for (TSentryGroup group: sentryRole.getGroups()) {
+        grantGroups.add(group.getGroupName());
+      }
+      Role existingRole =
+          catalog.getAuthPolicy().getRole(sentryRole.getRoleName());
+      Role role;
+      // These roles are the same, use the current role.
+      if (existingRole != null &&
+          existingRole.getGrantGroups().equals(grantGroups)) {
+        role = existingRole;
+        if (resetVersions) {
+          role.setCatalogVersion(catalog.incrementAndGetCatalogVersion());
         }
-        // allUsersPrivileges keys and userPrivilegesEntry.getKey() are used here since
-        // they both come from Sentry so they agree in case.
-        refreshPrivilegesInCatalog(userPrivilegesEntry.getKey(), user,
-            allUsersPrivileges);
+      } else {
+        role = catalog.addRole(sentryRole.getRoleName(), grantGroups);
       }
-      return usersToRemove;
+      // allRolesPrivileges keys and sentryRole.getName() are used here since they both
+      // come from Sentry so they agree in case.
+      refreshPrivilegesInCatalog(catalog, resetVersions, sentryRole.getRoleName(), role,
+          allRolesPrivileges);
     }
+    return rolesToRemove;
+  }
 
-    /**
-     * Updates the privileges for a given principal in the catalog since the last Sentry
-     * sync update. The sentryPrincipalName is used to match against the key in
-     * allPrincipalPrivileges, which both come from Sentry, so they should have the
-     * same case sensitivity.
-     */
-    private void refreshPrivilegesInCatalog(String sentryPrincipalName,
-        Principal principal, Map<String, Set<TSentryPrivilege>> allPrincipalPrivileges)
-        throws CatalogException {
-      // Assume all privileges should be removed. Privileges that still exist are
-      // deleted from this set and we are left with the set of privileges that need
-      // to be removed.
-      Set<String> privilegesToRemove = principal.getPrivilegeNames();
-      // It is important to get a set of privileges using sentryPrincipalName
-      // and not principal.getName() because principal.getName() may return a
-      // principal name with a different case than the principal names stored
-      // in allPrincipalPrivileges. See IMPALA-7729 for more information.
-      Set<TSentryPrivilege> sentryPrivileges = allPrincipalPrivileges.get(
-          sentryPrincipalName);
-      if (sentryPrivileges == null) {
-        sentryPrivileges = Sets.newHashSet();
+  /**
+   * Updates all users and their associated privileges in the catalog by adding,
+   * removing, and replacing the catalog objects to match those in Sentry since the
+   * last Sentry sync update. Take note that we only store the users with privileges
+   * stored in Sentry and not all available users in the system. This method returns a
+   * list of users to be removed. User privileges do not support grant groups.
+   */
+  private static Set<String> refreshUserPrivileges(CatalogServiceCatalog catalog,
+      SentryPolicyService sentryPolicyService, User processUser, boolean resetVersions)
+      throws ImpalaException {
+    // Assume all users should be removed. Then query the Policy Service and remove
+    // users from this set that actually exist.
+    Set<String> usersToRemove = catalog.getAuthPolicy().getAllUserNames();
+    // The keys (user names) in listAllUsersPrivileges here are always in lower case.
+    Map<String, Set<TSentryPrivilege>> allUsersPrivileges =
+        sentryPolicyService.listAllUsersPrivileges(processUser);
+    for (Map.Entry<String, Set<TSentryPrivilege>> userPrivilegesEntry:
+        allUsersPrivileges.entrySet()) {
+      String userName = userPrivilegesEntry.getKey();
+      // This user exists and should not be removed so remove it from the
+      // usersToRemove set.
+      usersToRemove.remove(userName);
+
+      Reference<Boolean> existingUser = new Reference<>();
+      org.apache.impala.catalog.User user = catalog.addUserIfNotExists(userName,
+          existingUser);
+      if (existingUser.getRef() && resetVersions) {
+        user.setCatalogVersion(catalog.incrementAndGetCatalogVersion());
       }
-      // Check all the privileges that are part of this principal.
-      for (TSentryPrivilege sentryPriv: sentryPrivileges) {
-        TPrivilege thriftPriv =
-            SentryPolicyService.sentryPrivilegeToTPrivilege(sentryPriv, principal);
-        String privilegeName = PrincipalPrivilege.buildPrivilegeName(thriftPriv);
-        privilegesToRemove.remove(privilegeName.toLowerCase());
-        PrincipalPrivilege existingPrincipalPriv = principal.getPrivilege(privilegeName);
-        // We already know about this privilege (privileges cannot be modified).
-        if (existingPrincipalPriv != null &&
-            existingPrincipalPriv.getCreateTimeMs() == sentryPriv.getCreateTime()) {
-          if (resetVersions_) {
-            existingPrincipalPriv.setCatalogVersion(
-                catalog_.incrementAndGetCatalogVersion());
-          }
-          continue;
-        }
-        if (principal.getPrincipalType() == TPrincipalType.ROLE) {
-          catalog_.addRolePrivilege(principal.getName(), thriftPriv);
-        } else {
-          catalog_.addUserPrivilege(principal.getName(), thriftPriv);
+      // allUsersPrivileges keys and userPrivilegesEntry.getKey() are used here since
+      // they both come from Sentry so they agree in case.
+      refreshPrivilegesInCatalog(catalog, resetVersions, userPrivilegesEntry.getKey(),
+          user, allUsersPrivileges);
+    }
+    return usersToRemove;
+  }
+
+  /**
+   * Updates the privileges for a given principal in the catalog since the last Sentry
+   * sync update. The sentryPrincipalName is used to match against the key in
+   * allPrincipalPrivileges, which both come from Sentry, so they should have the
+   * same case sensitivity.
+   */
+  private static void refreshPrivilegesInCatalog(CatalogServiceCatalog catalog,
+      boolean resetVersions, String sentryPrincipalName, Principal principal,
+      Map<String, Set<TSentryPrivilege>> allPrincipalPrivileges)
+      throws CatalogException {
+    // Assume all privileges should be removed. Privileges that still exist are
+    // deleted from this set and we are left with the set of privileges that need
+    // to be removed.
+    Set<String> privilegesToRemove = principal.getPrivilegeNames();
+    // It is important to get a set of privileges using sentryPrincipalName
+    // and not principal.getName() because principal.getName() may return a
+    // principal name with a different case than the principal names stored
+    // in allPrincipalPrivileges. See IMPALA-7729 for more information.
+    Set<TSentryPrivilege> sentryPrivileges = allPrincipalPrivileges.get(
+        sentryPrincipalName);
+    if (sentryPrivileges == null) {
+      sentryPrivileges = Sets.newHashSet();
+    }
+    // Check all the privileges that are part of this principal.
+    for (TSentryPrivilege sentryPriv: sentryPrivileges) {
+      TPrivilege thriftPriv =
+          SentryPolicyService.sentryPrivilegeToTPrivilege(sentryPriv, principal);
+      String privilegeName = PrincipalPrivilege.buildPrivilegeName(thriftPriv);
+      privilegesToRemove.remove(privilegeName.toLowerCase());
+      PrincipalPrivilege existingPrincipalPriv = principal.getPrivilege(privilegeName);
+      // We already know about this privilege (privileges cannot be modified).
+      if (existingPrincipalPriv != null &&
+          existingPrincipalPriv.getCreateTimeMs() == sentryPriv.getCreateTime()) {
+        if (resetVersions) {
+          existingPrincipalPriv.setCatalogVersion(
+              catalog.incrementAndGetCatalogVersion());
         }
+        continue;
+      }
+      if (principal.getPrincipalType() == TPrincipalType.ROLE) {
+        catalog.addRolePrivilege(principal.getName(), thriftPriv);
+      } else {
+        catalog.addUserPrivilege(principal.getName(), thriftPriv);
       }
+    }
 
-      // Remove the privileges that no longer exist.
-      for (String privilegeName: privilegesToRemove) {
-        if (principal.getPrincipalType() == TPrincipalType.ROLE) {
-          catalog_.removeRolePrivilege(principal.getName(), privilegeName);
-        } else {
-          catalog_.removeUserPrivilege(principal.getName(), privilegeName);
-        }
+    // Remove the privileges that no longer exist.
+    for (String privilegeName: privilegesToRemove) {
+      if (principal.getPrincipalType() == TPrincipalType.ROLE) {
+        catalog.removeRolePrivilege(principal.getName(), privilegeName);
+      } else {
+        catalog.removeUserPrivilege(principal.getName(), privilegeName);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/impala/blob/c66d44ea/fe/src/test/java/org/apache/impala/util/SentryProxyTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/util/SentryProxyTest.java b/fe/src/test/java/org/apache/impala/util/SentryProxyTest.java
new file mode 100644
index 0000000..a95bc7f
--- /dev/null
+++ b/fe/src/test/java/org/apache/impala/util/SentryProxyTest.java
@@ -0,0 +1,612 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.util;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.impala.authorization.AuthorizationConfig;
+import org.apache.impala.authorization.SentryConfig;
+import org.apache.impala.catalog.CatalogServiceCatalog;
+import org.apache.impala.catalog.Role;
+import org.apache.impala.catalog.User;
+import org.apache.impala.common.ImpalaException;
+import org.apache.impala.common.Pair;
+import org.apache.impala.testutil.CatalogServiceTestCatalog;
+import org.apache.impala.thrift.TPrincipalType;
+import org.apache.impala.thrift.TPrivilege;
+import org.apache.impala.thrift.TPrivilegeLevel;
+import org.apache.impala.thrift.TPrivilegeScope;
+import org.apache.sentry.api.service.thrift.TSentryPrivilege;
+import org.apache.sentry.api.service.thrift.TSentryRole;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Consumer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class SentryProxyTest {
+  private final String PRINCIPAL_NAME_PREFIX = "sentry_proxy";
+  private static final String SENTRY_SERVER = "server1";
+  private static final org.apache.impala.authorization.User USER =
+      new org.apache.impala.authorization.User(System.getProperty("user.name"));
+  private final SentryPolicyService sentryService_;
+  private final AuthorizationConfig authzConfig_;
+
+  public SentryProxyTest() {
+    authzConfig_ = AuthorizationConfig.createHadoopGroupAuthConfig(
+        SENTRY_SERVER, null, System.getenv("IMPALA_HOME") +
+            "/fe/src/test/resources/sentry-site.xml");
+    authzConfig_.validateConfig();
+    sentryService_ = new SentryPolicyService(authzConfig_.getSentryConfig());
+  }
+
+  @Before
+  public void setUp() throws ImpalaException {
+    cleanUpRoles();
+  }
+
+  @After
+  public void cleanUp() throws ImpalaException {
+    cleanUpRoles();
+  }
+
+  private void cleanUpRoles() throws ImpalaException {
+    for (TSentryRole role: sentryService_.listAllRoles(USER)) {
+      if (role.getRoleName().startsWith(PRINCIPAL_NAME_PREFIX)) {
+        sentryService_.dropRole(USER, role.getRoleName(), true);
+      }
+    }
+  }
+
+  @Test
+  public void testEmptyToNonEmptyCatalog() {
+    withAllPrincipalTypes(ctx -> {
+      String[] principalNames = new String[3];
+      for (int i = 0; i < principalNames.length; i++) {
+        principalNames[i] = String.format("%s_add_%d", PRINCIPAL_NAME_PREFIX, i);
+      }
+
+      for (String principalName: principalNames) {
+        addSentryPrincipalPrivileges(ctx.type_, ctx.sentryService_, principalName,
+            "functional");
+      }
+
+      CatalogState noReset = refreshSentryAuthorization(ctx.catalog_, ctx.sentryService_,
+          false);
+
+      for (String principalName: principalNames) {
+        checkCatalogPrincipalPrivileges(ctx.type_, ctx.catalog_, principalName,
+            "server=server1->db=functional->grantoption=false");
+      }
+
+      CatalogState reset = refreshSentryAuthorization(ctx.catalog_, ctx.sentryService_,
+          true);
+      checkCatalogState(noReset, reset);
+    });
+  }
+
+  @Test
+  public void testNonEmptyToEmptyCatalog() {
+    withAllPrincipalTypes(ctx -> {
+      String[] principalNames = new String[3];
+      for (int i = 0; i < principalNames.length; i++) {
+        principalNames[i] = String.format("%s_add_%d", PRINCIPAL_NAME_PREFIX, i);
+      }
+
+      for (String principalName: principalNames) {
+        addCatalogPrincipalPrivileges(ctx.type_, ctx.catalog_, principalName,
+            "functional");
+      }
+
+      CatalogState noReset = refreshSentryAuthorization(ctx.catalog_, ctx.sentryService_,
+          false);
+
+      for (String principalName: principalNames) {
+        checkNoCatalogPrincipalPrivileges(ctx.type_, ctx.catalog_, principalName);
+      }
+
+      CatalogState reset = refreshSentryAuthorization(ctx.catalog_, ctx.sentryService_,
+          true);
+      checkCatalogState(noReset, reset);
+    });
+  }
+
+  @Test
+  public void testAddCatalog() {
+    withAllPrincipalTypes(ctx -> {
+      String principalName = String.format("%s_add", PRINCIPAL_NAME_PREFIX);
+      addCatalogPrincipalPrivileges(ctx.type_, ctx.catalog_, principalName, "functional");
+      addSentryPrincipalPrivileges(ctx.type_, ctx.sentryService_, principalName,
+          "functional");
+
+      CatalogState noReset = refreshSentryAuthorization(ctx.catalog_, ctx.sentryService_,
+          false);
+
+      checkCatalogPrincipalPrivileges(ctx.type_, ctx.catalog_, principalName,
+          "server=server1->db=functional->grantoption=false");
+
+      CatalogState reset = refreshSentryAuthorization(ctx.catalog_, ctx.sentryService_,
+          true);
+      checkCatalogState(noReset, reset);
+    });
+  }
+
+  @Test
+  public void testUpdateCatalog() {
+    withAllPrincipalTypes(ctx -> {
+      String principalName = String.format("%s_update", PRINCIPAL_NAME_PREFIX);
+      addCatalogPrincipalPrivileges(ctx.type_, ctx.catalog_, principalName, "functional");
+      addSentryPrincipalPrivileges(ctx.type_, ctx.sentryService_, principalName,
+          "functional", "functional_kudu");
+
+      CatalogState noReset = refreshSentryAuthorization(ctx.catalog_, ctx.sentryService_,
+          false);
+
+      checkCatalogPrincipalPrivileges(ctx.type_, ctx.catalog_, principalName,
+          "server=server1->db=functional->grantoption=false",
+          "server=server1->db=functional_kudu->grantoption=false");
+
+      CatalogState reset = refreshSentryAuthorization(ctx.catalog_, ctx.sentryService_,
+          true);
+      checkCatalogState(noReset, reset);
+    });
+  }
+
+  @Test
+  public void testDropCatalog() {
+    withAllPrincipalTypes(ctx -> {
+      String principalName = String.format("%s_remove", PRINCIPAL_NAME_PREFIX);
+      addCatalogPrincipalPrivileges(ctx.type_, ctx.catalog_, principalName, "functional");
+      dropSentryPrincipalPrivileges(ctx.type_, ctx.sentryService_, principalName);
+
+      CatalogState noReset = refreshSentryAuthorization(ctx.catalog_, ctx.sentryService_,
+          false);
+
+      checkNoCatalogPrincipalPrivileges(ctx.type_, ctx.catalog_, principalName);
+
+      CatalogState reset = refreshSentryAuthorization(ctx.catalog_, ctx.sentryService_,
+          true);
+      checkCatalogState(noReset, reset);
+    });
+  }
+
+  @Test
+  public void testMergeCatalog() {
+    withAllPrincipalTypes(ctx -> {
+      String addPrincipal = String.format("%s_add", PRINCIPAL_NAME_PREFIX);
+      String updatePrincipal = String.format("%s_update", PRINCIPAL_NAME_PREFIX);
+      String removePrincipal = String.format("%s_remove", PRINCIPAL_NAME_PREFIX);
+
+      addCatalogPrincipalPrivileges(ctx.type_, ctx.catalog_, updatePrincipal,
+          "functional");
+      addCatalogPrincipalPrivileges(ctx.type_, ctx.catalog_, removePrincipal,
+          "functional");
+
+      addSentryPrincipalPrivileges(ctx.type_, ctx.sentryService_, addPrincipal,
+          "functional");
+      addSentryPrincipalPrivileges(ctx.type_, ctx.sentryService_, updatePrincipal,
+          "functional", "functional_kudu");
+      dropSentryPrincipalPrivileges(ctx.type_, ctx.sentryService_, removePrincipal);
+
+      CatalogState noReset = refreshSentryAuthorization(ctx.catalog_, ctx.sentryService_,
+          false);
+
+      checkCatalogPrincipalPrivileges(ctx.type_, ctx.catalog_, addPrincipal,
+          "server=server1->db=functional->grantoption=false");
+      checkCatalogPrincipalPrivileges(ctx.type_, ctx.catalog_, updatePrincipal,
+          "server=server1->db=functional->grantoption=false",
+          "server=server1->db=functional_kudu->grantoption=false");
+      checkNoCatalogPrincipalPrivileges(ctx.type_, ctx.catalog_, removePrincipal);
+
+      CatalogState reset = refreshSentryAuthorization(ctx.catalog_, ctx.sentryService_,
+          true);
+      checkCatalogState(noReset, reset);
+    });
+  }
+
+  @Test
+  public void testUpdateSentryNotInCatalog() {
+    withAllPrincipalTypes(ctx -> {
+      String principalName = String.format("%s_update", PRINCIPAL_NAME_PREFIX);
+      addSentryPrincipalPrivileges(ctx.type_, ctx.sentryService_, principalName,
+          "functional", "functional_kudu");
+
+      CatalogState noReset = refreshSentryAuthorization(ctx.catalog_, ctx.sentryService_,
+          false);
+
+      checkCatalogPrincipalPrivileges(ctx.type_, ctx.catalog_, principalName,
+          "server=server1->db=functional->grantoption=false",
+          "server=server1->db=functional_kudu->grantoption=false");
+
+      CatalogState reset = refreshSentryAuthorization(ctx.catalog_, ctx.sentryService_,
+          true);
+      checkCatalogState(noReset, reset);
+    });
+  }
+
+  @Test
+  public void testDropSentryNotInCatalog() {
+    withAllPrincipalTypes(ctx -> {
+      String principalName = String.format("%s_remove", PRINCIPAL_NAME_PREFIX);
+      dropSentryPrincipalPrivileges(ctx.type_, ctx.sentryService_, principalName);
+
+      CatalogState noReset = refreshSentryAuthorization(ctx.catalog_, ctx.sentryService_,
+          false);
+
+      checkNoCatalogPrincipalPrivileges(ctx.type_, ctx.catalog_, principalName);
+
+      CatalogState reset = refreshSentryAuthorization(ctx.catalog_, ctx.sentryService_,
+          true);
+      checkCatalogState(noReset, reset);
+    });
+  }
+
+  @Test
+  public void testRoleNameCaseInsensitive() throws ImpalaException {
+    String lowerCaseRoleName = String.format("%s_case_insensitive_role",
+        PRINCIPAL_NAME_PREFIX);
+    String upperCaseRoleName = lowerCaseRoleName.toUpperCase();
+    String mixedCaseRoleName = lowerCaseRoleName.substring(0, 1).toUpperCase() +
+        lowerCaseRoleName.substring(1);
+
+    CatalogServiceCatalog catalog = CatalogServiceTestCatalog.createWithAuth(
+        authzConfig_.getSentryConfig());
+
+    addSentryRolePrivileges(sentryService_, lowerCaseRoleName, "functional");
+    CatalogState noReset = refreshSentryAuthorization(catalog, sentryService_, false);
+
+    // Impala stores the role name in a case-insensitive way.
+    for (String roleName: new String[]{
+        lowerCaseRoleName, upperCaseRoleName, mixedCaseRoleName}) {
+      Role role = catalog.getAuthPolicy().getRole(roleName);
+      assertEquals(lowerCaseRoleName, role.getName());
+      assertEquals(1, role.getPrivileges().size());
+      assertNotNull(role.getPrivilege(
+          "server=server1->db=functional->grantoption=false"));
+    }
+
+    try {
+      sentryService_.createRole(USER, upperCaseRoleName, false);
+      fail("Exception should be thrown when creating a duplicate role name.");
+    } catch (Exception e) {
+      assertTrue(e.getMessage().startsWith("Error creating role"));
+    }
+
+    // No new role should be added.
+    for (String roleName: new String[]{
+        lowerCaseRoleName, upperCaseRoleName, mixedCaseRoleName}) {
+      Role role = catalog.getAuthPolicy().getRole(roleName);
+      assertEquals(lowerCaseRoleName, role.getName());
+      assertEquals(1, role.getPrivileges().size());
+      assertNotNull(role.getPrivilege(
+          "server=server1->db=functional->grantoption=false"));
+    }
+
+    CatalogState reset = refreshSentryAuthorization(catalog, sentryService_, true);
+    checkCatalogState(noReset, reset);
+  }
+
+  @Test
+  public void testUserNameCaseSensitive() throws ImpalaException {
+    String lowerCaseUserName = String.format("%s_case_sensitive_user",
+        PRINCIPAL_NAME_PREFIX);
+    String upperCaseUserName = lowerCaseUserName.toUpperCase();
+    String mixedCaseUserName = lowerCaseUserName.substring(0, 1).toUpperCase() +
+        lowerCaseUserName.substring(1);
+
+    CatalogServiceCatalog catalog = CatalogServiceTestCatalog.createWithAuth(
+        authzConfig_.getSentryConfig());
+
+    SentryPolicyServiceStub sentryService = createSentryPolicyServiceStub(
+        authzConfig_.getSentryConfig());
+    // We grant different privileges to different users to ensure each user is
+    // granted a distinct privilege.
+    addSentryUserPrivileges(sentryService, lowerCaseUserName, "functional");
+    addSentryUserPrivileges(sentryService, upperCaseUserName, "functional_kudu");
+    addSentryUserPrivileges(sentryService, mixedCaseUserName, "functional_parquet");
+
+    CatalogState noReset = refreshSentryAuthorization(catalog, sentryService, false);
+
+    // Impala stores the user name in a case-sensitive way.
+    for (Pair<String, String> userPrivilege: new Pair[]{
+        new Pair(lowerCaseUserName, "server=server1->db=functional->grantoption=false"),
+        new Pair(upperCaseUserName, "server=server1->db=functional_kudu" +
+            "->grantoption=false"),
+        new Pair(mixedCaseUserName, "server=server1->db=functional_parquet" +
+            "->grantoption=false")}) {
+      User user = catalog.getAuthPolicy().getUser(userPrivilege.first);
+      assertEquals(userPrivilege.first, user.getName());
+      assertEquals(1, user.getPrivileges().size());
+      assertNotNull(user.getPrivilege(userPrivilege.second));
+    }
+
+    CatalogState reset = refreshSentryAuthorization(catalog, sentryService, true);
+    checkCatalogState(noReset, reset);
+  }
+
+  private static void addCatalogPrincipalPrivileges(TPrincipalType type,
+      CatalogServiceCatalog catalog, String principalName, String... dbNames) {
+    try {
+      switch (type) {
+        case ROLE:
+          addCatalogRolePrivileges(catalog, principalName, dbNames);
+          break;
+        case USER:
+          addCatalogUserPrivileges(catalog, principalName, dbNames);
+          break;
+      }
+    } catch (ImpalaException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static void addCatalogRolePrivileges(CatalogServiceCatalog catalog,
+      String roleName, String... dbNames) throws ImpalaException {
+    Role role = catalog.addRole(roleName, Sets.newHashSet(USER.getShortName()));
+    for (String dbName: dbNames) {
+      catalog.addRolePrivilege(roleName, createRolePrivilege(role.getId(), dbName));
+    }
+  }
+
+  private static void addCatalogUserPrivileges(CatalogServiceCatalog catalog,
+      String userName, String... dbNames) throws ImpalaException {
+    User user = catalog.addUser(userName);
+    for (String dbName: dbNames) {
+      catalog.addUserPrivilege(userName, createUserPrivilege(user.getId(), dbName));
+    }
+  }
+
+  private static void addSentryPrincipalPrivileges(TPrincipalType type,
+      SentryPolicyService sentryService, String principalName, String... dbNames) {
+    try {
+      switch (type) {
+        case ROLE:
+          addSentryRolePrivileges(sentryService, principalName, dbNames);
+          break;
+        case USER:
+          addSentryUserPrivileges(sentryService, principalName, dbNames);
+          break;
+      }
+    } catch (ImpalaException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static void addSentryRolePrivileges(SentryPolicyService sentryService,
+      String roleName, String... dbNames) throws ImpalaException {
+    sentryService.createRole(USER, roleName, false);
+    for (String dbName: dbNames) {
+      sentryService.grantRolePrivilege(USER, roleName, createRolePrivilege(dbName));
+    }
+  }
+
+  private static void addSentryUserPrivileges(SentryPolicyService sentryService,
+      String userName, String... dbNames) throws ImpalaException {
+    Preconditions.checkArgument(sentryService instanceof SentryPolicyServiceStub);
+    SentryPolicyServiceStub stub = (SentryPolicyServiceStub) sentryService;
+    stub.createUser(userName);
+    for (String dbName: dbNames) {
+      stub.grantUserPrivilege(userName, createUserPrivilege(dbName));
+    }
+  }
+
+  private static void dropSentryPrincipalPrivileges(TPrincipalType type,
+      SentryPolicyService sentryService, String principalName) {
+    try {
+      switch (type) {
+        case ROLE:
+          dropSentryRolePrivileges(sentryService, principalName);
+          break;
+        case USER:
+          dropSentryUserPrivileges(sentryService, principalName);
+          break;
+      }
+    } catch (ImpalaException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static void dropSentryRolePrivileges(SentryPolicyService sentryService,
+      String roleName) throws ImpalaException {
+    sentryService.dropRole(USER, roleName, true);
+  }
+
+  private static void dropSentryUserPrivileges(SentryPolicyService sentryService,
+      String userName) throws ImpalaException {
+    Preconditions.checkArgument(sentryService instanceof SentryPolicyServiceStub);
+    SentryPolicyServiceStub stub = (SentryPolicyServiceStub) sentryService;
+    stub.dropRole(USER, userName, true);
+  }
+
+  private static void checkCatalogPrincipalPrivileges(TPrincipalType type,
+      CatalogServiceCatalog catalog, String principalName, String... privileges) {
+    switch (type) {
+      case ROLE:
+        checkCatalogRolePrivileges(catalog, principalName, privileges);
+        break;
+      case USER:
+        checkCatalogUserPrivileges(catalog, principalName, privileges);
+        break;
+    }
+  }
+
+  private static void checkCatalogRolePrivileges(CatalogServiceCatalog catalog,
+      String roleName, String... privileges) {
+    Role role = catalog.getAuthPolicy().getRole(roleName);
+    assertEquals(role.getName(), roleName);
+    assertEquals(privileges.length, role.getPrivileges().size());
+    for (String privilege: privileges) {
+      assertNotNull(role.getPrivilege(privilege));
+    }
+  }
+
+  private static void checkCatalogUserPrivileges(CatalogServiceCatalog catalog,
+      String userName, String... privileges) {
+    User user = catalog.getAuthPolicy().getUser(userName);
+    assertEquals(user.getName(), userName);
+    assertEquals(privileges.length, user.getPrivileges().size());
+    for (String privilege: privileges) {
+      assertNotNull(user.getPrivilege(privilege));
+    }
+  }
+
+  private static void checkNoCatalogPrincipalPrivileges(TPrincipalType type,
+      CatalogServiceCatalog catalog, String principalName) {
+    switch (type) {
+      case ROLE:
+        assertNull(catalog.getAuthPolicy().getRole(principalName));
+        break;
+      case USER:
+        assertNull(catalog.getAuthPolicy().getUser(principalName));
+        break;
+    }
+  }
+
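+  // Returns the total number of authorization catalog objects: all roles and users
+  // plus all of their privileges. checkCatalogState() uses this count as the expected
+  // version bump when catalog versions are reset.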
+  private static long getAuthCatalogSize(CatalogServiceCatalog catalog) {
+    return catalog.getAuthPolicy().getAllRoles().size() +
+        catalog.getAuthPolicy().getAllRoles().stream()
+            .mapToInt(p -> p.getPrivileges().size()).sum() +
+        catalog.getAuthPolicy().getAllUsers().size() +
+        catalog.getAuthPolicy().getAllUsers().stream()
+            .mapToInt(p -> p.getPrivileges().size()).sum();
+  }
+
+  private static void checkCatalogState(CatalogState noReset, CatalogState reset) {
+    // Resetting the versions means the new catalog version will be the current
+    // version plus the number of all authorization catalog objects.
+    assertEquals(noReset.catalogVersion_ + noReset.catalogSize_, reset.catalogVersion_);
+    // The catalog size should be the same regardless of whether the versions are reset.
+    assertEquals(noReset.catalogSize_, reset.catalogSize_);
+  }
+
+  private static class CatalogState {
+    private final long catalogVersion_;
+    private final long catalogSize_;
+
+    public CatalogState(long catalogVersion, long catalogSize) {
+      catalogVersion_ = catalogVersion;
+      catalogSize_ = catalogSize;
+    }
+  }
+
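+  // Runs a Sentry refresh against the given catalog and captures the resulting
+  // catalog version and authorization catalog size for later comparison.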
+  private static CatalogState refreshSentryAuthorization(CatalogServiceCatalog catalog,
+      SentryPolicyService sentryService, boolean resetVersions) {
+    SentryProxy.refreshSentryAuthorization(catalog, sentryService, USER, resetVersions,
+        false);
+    return new CatalogState(catalog.getCatalogVersion(), getAuthCatalogSize(catalog));
+  }
+
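+  // Test double that keeps user privileges in an in-memory map and serves them from
+  // listAllUsersPrivileges(), so user-privilege scenarios can be driven directly from
+  // the test.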
+  private static class SentryPolicyServiceStub extends SentryPolicyService {
+    private final Map<String, Set<TSentryPrivilege>> allUserPrivileges_ =
+        Maps.newHashMap();
+
+    public SentryPolicyServiceStub(SentryConfig sentryConfig) {
+      super(sentryConfig);
+    }
+
+    @Override
+    public Map<String, Set<TSentryPrivilege>> listAllUsersPrivileges(
+        org.apache.impala.authorization.User requestingUser) throws ImpalaException {
+      return allUserPrivileges_;
+    }
+
+    public void createUser(String userName) {
+      allUserPrivileges_.put(userName, Sets.newHashSet());
+    }
+
+    public void grantUserPrivilege(String userName, TPrivilege privilege) {
+      allUserPrivileges_.get(userName).add(createSentryPrivilege(privilege.getDb_name()));
+    }
+
+    public void dropUser(String userName) {
+      allUserPrivileges_.remove(userName);
+    }
+  }
+
+  private static SentryPolicyServiceStub createSentryPolicyServiceStub(
+      SentryConfig sentryConfig) {
+    return new SentryPolicyServiceStub(sentryConfig);
+  }
+
+  private static TPrivilege createRolePrivilege(int roleId, String dbName) {
+    return createPrincipalPrivilege(TPrincipalType.ROLE, roleId, dbName);
+  }
+
+  private static TPrivilege createRolePrivilege(String dbName) {
+    return createRolePrivilege(0, dbName);
+  }
+
+  private static TPrivilege createUserPrivilege(int userId, String dbName) {
+    return createPrincipalPrivilege(TPrincipalType.USER, userId, dbName);
+  }
+
+  private static TPrivilege createUserPrivilege(String dbName) {
+    return createPrincipalPrivilege(TPrincipalType.USER, 0, dbName);
+  }
+
+  private static TPrivilege createPrincipalPrivilege(TPrincipalType principalType,
+      int principalId, String dbName) {
+    TPrivilege privilege = new TPrivilege(TPrivilegeLevel.ALL, TPrivilegeScope.DATABASE,
+        false);
+    privilege.setPrincipal_id(principalId);
+    privilege.setPrincipal_type(principalType);
+    privilege.setServer_name("server1");
+    privilege.setDb_name(dbName);
+    return privilege;
+  }
+
+  private static TSentryPrivilege createSentryPrivilege(String dbName) {
+    TSentryPrivilege privilege = new TSentryPrivilege("DATABASE", "server1", "*");
+    privilege.setDbName(dbName);
+    return privilege;
+  }
+
+  private static class SentryProxyTestContext {
+    private final TPrincipalType type_;
+    private final CatalogServiceCatalog catalog_;
+    private final SentryPolicyService sentryService_;
+
+    public SentryProxyTestContext(TPrincipalType type, CatalogServiceCatalog catalog,
+        SentryPolicyService sentryService) {
+      type_ = type;
+      catalog_ = catalog;
+      sentryService_ = sentryService;
+    }
+  }
+
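+  // Runs the given test body once per principal type: ROLE against the real Sentry
+  // service and USER against SentryPolicyServiceStub.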
+  private void withAllPrincipalTypes(Consumer<SentryProxyTestContext> consumer) {
+    for (TPrincipalType type: TPrincipalType.values()) {
+      CatalogServiceCatalog catalog = CatalogServiceTestCatalog.createWithAuth(
+          authzConfig_.getSentryConfig());
+      SentryPolicyService sentryService = sentryService_;
+      if (type == TPrincipalType.USER) {
+        sentryService = createSentryPolicyServiceStub(authzConfig_.getSentryConfig());
+      }
+      consumer.accept(new SentryProxyTestContext(type, catalog, sentryService));
+    }
+  }
+}
\ No newline at end of file


[6/9] impala git commit: IMPALA-7836: [DOCS] Document TOPN_BYTES_LIMIT query option

Posted by ta...@apache.org.
IMPALA-7836: [DOCS] Document TOPN_BYTES_LIMIT query option

Change-Id: Ib7109c2949ee5137d8b4a748227948b79bd93f52
Reviewed-on: http://gerrit.cloudera.org:8080/11914
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-by: Tim Armstrong <ta...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/731254b5
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/731254b5
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/731254b5

Branch: refs/heads/master
Commit: 731254b52934c17d953da541df8bc4493beb037a
Parents: 74354a7
Author: Alex Rodoni <ar...@cloudera.com>
Authored: Thu Nov 8 14:55:32 2018 -0800
Committer: Alex Rodoni <ar...@cloudera.com>
Committed: Wed Nov 14 18:26:42 2018 +0000

----------------------------------------------------------------------
 docs/impala.ditamap                     |  1 +
 docs/topics/impala_topn_bytes_limit.xml | 84 ++++++++++++++++++++++++++++
 2 files changed, 85 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/731254b5/docs/impala.ditamap
----------------------------------------------------------------------
diff --git a/docs/impala.ditamap b/docs/impala.ditamap
index 051b838..8eecf06 100644
--- a/docs/impala.ditamap
+++ b/docs/impala.ditamap
@@ -239,6 +239,7 @@ under the License.
           <topicref href="topics/impala_thread_reservation_aggregate_limit.xml"/>
           <topicref href="topics/impala_thread_reservation_limit.xml"/>
           <topicref href="topics/impala_timezone.xml"/>
+          <topicref href="topics/impala_topn_bytes_limit.xml"/>
         </topicref>
       </topicref>
       <topicref href="topics/impala_show.xml"/>

http://git-wip-us.apache.org/repos/asf/impala/blob/731254b5/docs/topics/impala_topn_bytes_limit.xml
----------------------------------------------------------------------
diff --git a/docs/topics/impala_topn_bytes_limit.xml b/docs/topics/impala_topn_bytes_limit.xml
new file mode 100644
index 0000000..c6329f9
--- /dev/null
+++ b/docs/topics/impala_topn_bytes_limit.xml
@@ -0,0 +1,84 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<!DOCTYPE concept PUBLIC "-//OASIS//DTD DITA Concept//EN" "concept.dtd">
+<concept rev="3.1.0" id="topn_bytes_limit">
+
+  <title>TOPN_BYTES_LIMIT Query Option (<keyword keyref="impala31_full"/> or higher only)</title>
+
+  <titlealts audience="PDF">
+
+    <navtitle>TOPN_BYTES_LIMIT</navtitle>
+
+  </titlealts>
+
+  <prolog>
+    <metadata>
+      <data name="Category" value="Impala"/>
+      <data name="Category" value="Impala Query Options"/>
+      <data name="Category" value="Querying"/>
+      <data name="Category" value="Developers"/>
+      <data name="Category" value="Data Analysts"/>
+    </metadata>
+  </prolog>
+
+  <conbody>
+
+    <p>
+      The <codeph>TOPN_BYTES_LIMIT</codeph> query option places a limit on the amount of
+      estimated memory that Impala can process for <term>top-N</term> queries.
+    </p>
+
+    <p>
+      <term>top-N</term> queries are queries that include both <codeph>ORDER BY</codeph> and
+      <codeph>LIMIT</codeph> clauses. Because <term>top-N</term> queries do not spill to disk,
+      they must keep all of the rows they process in memory, which can cause out-of-memory
+      issues when running with a large limit and offset. If the Impala planner estimates that
+      a <term>top-N</term> operator will process more bytes than the
+      <codeph>TOPN_BYTES_LIMIT</codeph> value, it replaces the <term>top-N</term> operator
+      with the <term>sort</term> operator. Switching to the <term>sort</term> operator allows
+      Impala to spill to disk, requiring less memory than <term>top-N</term>, but
+      potentially with a performance penalty.
+    </p>
+
+    <p>
+      The option has no effect when set to 0 or -1.
+    </p>
+
+    <p>
+      <b>Syntax:</b>
+    </p>
+
+<codeblock>SET TOPN_BYTES_LIMIT=<varname>limit</varname></codeblock>
+
+    <p>
+      <b>Type:</b> Number
+    </p>
+
+    <p>
+      <b>Default:</b> 536870912 (512 MB)
+    </p>
+
+    <p>
+      <b>Added in:</b> <keyword keyref="impala31"/>
+    </p>
+
+  </conbody>
+
+</concept>
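
A worked example of the threshold, for illustration only (it assumes the
planner's byte estimate for a top-N node is roughly (LIMIT + OFFSET)
multiplied by the estimated row size, which is not stated above): with the
default TOPN_BYTES_LIMIT of 536870912 bytes and rows estimated at 100 bytes
each, a query with LIMIT 1000000 OFFSET 5000000 would be estimated at about
600000000 bytes and fall back to the spillable sort operator, while the same
query without the offset would be estimated at about 100000000 bytes and keep
the in-memory top-N operator.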


[5/9] impala git commit: IMPALA-7806: [DOCS] Updated Known Issues in 3.1

Posted by ta...@apache.org.
IMPALA-7806: [DOCS] Updated Known Issues in 3.1

- Reviewed and updated the currently known issues for the 3.1 release

Change-Id: Iae68b308f5c0d8bfe124054480d9b2333f70b249
Reviewed-on: http://gerrit.cloudera.org:8080/11921
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-by: Zoltan Borok-Nagy <bo...@cloudera.com>
Reviewed-by: Alex Rodoni <ar...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/74354a71
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/74354a71
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/74354a71

Branch: refs/heads/master
Commit: 74354a7189491625f3afa7e628eb4b04af7ecaed
Parents: b2dbc0f
Author: Alex Rodoni <ar...@cloudera.com>
Authored: Mon Nov 12 16:44:07 2018 -0800
Committer: Alex Rodoni <ar...@cloudera.com>
Committed: Wed Nov 14 17:47:45 2018 +0000

----------------------------------------------------------------------
 docs/topics/impala_known_issues.xml | 73 --------------------------------
 1 file changed, 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/74354a71/docs/topics/impala_known_issues.xml
----------------------------------------------------------------------
diff --git a/docs/topics/impala_known_issues.xml b/docs/topics/impala_known_issues.xml
index b7c439d..b498197 100644
--- a/docs/topics/impala_known_issues.xml
+++ b/docs/topics/impala_known_issues.xml
@@ -173,8 +173,6 @@ under the License.
 
     <concept id="IMPALA-1792" rev="IMPALA-1792">
 
-<!-- Not part of Alex's spreadsheet -->
-
       <title>ImpalaODBC: Can not get the value in the SQLGetData(m-x th column) after the SQLBindCol(m th column)</title>
 
       <conbody>
@@ -243,28 +241,6 @@ under the License.
       </p>
       </conbody>
     </concept>
-    <concept id="IMPALLA-7298">
-      <title>Kerberos authentication fails with the reverse DNS lookup
-        disabled</title>
-      <conbody>
-        <p> Kerberos authentication does not function correctly if <codeph>rdns
-            = false</codeph> is configured in <codeph>krb5.conf</codeph>. If the
-          flag <codeph>rdns = false</codeph>, when Impala tries to match
-          principals, it will fail because Kerberos receives a SPN (Service
-          Principal Name) with an IP address in it, but Impala expects a
-          principal with a FQDN in it.</p>
-        <p>
-          <b>Bug:</b>
-          <xref keyref="IMPALA-7298">IMPALA-7298</xref></p>
-        <p><b>Affected Versions:</b> Impala 2.12.0 and 3.0</p>
-        <p>
-          <b>Workaround:</b> Set the following flags in
-            <codeph>krb5.conf</codeph>: <ul>
-            <li><codeph>dns_canonicalize_hostname = true</codeph></li>
-            <li><codeph>rdns = true</codeph></li>
-          </ul></p>
-      </conbody>
-    </concept>
 </concept>
 
   <concept id="known_issues_resources">
@@ -279,27 +255,6 @@ under the License.
       </p>
 
     </conbody>
-    <!--AR: The workaround topic does not exist. Not sure if this was ever fully documented upstream.-->
-
-    <concept id="IMPALA-6028" audience="hidden">
-      <title>Handling large rows during upgrade to <keyword
-          keyref="impala210_full"/> or higher</title>
-      <conbody>
-        <p> After an upgrade to <keyword keyref="impala210_full"/> or higher,
-          users who process very large column values (long strings), or have
-          increased the <codeph>--read_size</codeph> configuration setting from
-          its default of 8 MB, might encounter capacity errors for some queries
-          that previously worked. </p>
-        <p>
-          <b>Resolution:</b> After the upgrade, follow the instructions in <xref
-            keyref="convert_read_size"/> to check if your queries are affected
-          by these changes and to modify your configuration settings if so. </p>
-        <p>
-          <b>Apache Issue:</b>
-          <xref keyref="IMPALA-6028">IMPALA-6028</xref>
-        </p>
-      </conbody>
-    </concept>
 
     <concept id="IMPALA-5605">
 
@@ -518,8 +473,6 @@ explain SELECT 1 FROM alltypestiny a1
 
     </conbody>
 
-<!-- Opened based on internal JIRA. Not part of Alex's spreadsheet AFAIK. -->
-
     <concept id="describe_formatted_avro">
 
       <title>DESCRIBE FORMATTED gives error on Avro table</title>
@@ -564,8 +517,6 @@ ALTER TABLE table_name SET TBLPROPERTIES('EXTERNAL'='TRUE');
 
     <concept id="IMP-175">
 
-<!-- Not part of Alex's spreadsheet. Perhaps it really is a permanent limitation and nobody is tracking it? -->
-
       <title>Deviation from Hive behavior: Out of range values float/double values are returned as maximum allowed value of type (Hive returns NULL)</title>
 
       <conbody>
@@ -585,8 +536,6 @@ ALTER TABLE table_name SET TBLPROPERTIES('EXTERNAL'='TRUE');
 
     <concept id="flume_writeformat_text">
 
-<!-- Not part of Alex's spreadsheet. From a non-public JIRA. -->
-
       <title>Configuration needed for Flume to be compatible with Impala</title>
 
       <conbody>
@@ -610,8 +559,6 @@ ALTER TABLE table_name SET TBLPROPERTIES('EXTERNAL'='TRUE');
 
     <concept id="IMPALA-635" rev="IMPALA-635">
 
-<!-- Not part of Alex's spreadsheet -->
-
       <title>Avro Scanner fails to parse some schemas</title>
 
       <conbody>
@@ -641,8 +588,6 @@ ALTER TABLE table_name SET TBLPROPERTIES('EXTERNAL'='TRUE');
 
     <concept id="IMPALA-1024" rev="IMPALA-1024">
 
-<!-- Not part of Alex's spreadsheet -->
-
       <title>Impala BE cannot parse Avro schema that contains a trailing semi-colon</title>
 
       <conbody>
@@ -666,8 +611,6 @@ ALTER TABLE table_name SET TBLPROPERTIES('EXTERNAL'='TRUE');
 
     <concept id="IMPALA-1652" rev="IMPALA-1652">
 
-<!-- To do: Isn't this more a correctness issue? -->
-
       <title>Incorrect results with basic predicate on CHAR typed column</title>
 
       <conbody>
@@ -725,10 +668,6 @@ ALTER TABLE table_name SET TBLPROPERTIES('EXTERNAL'='TRUE');
         </p>
 
         <p>
-          <b>Resolution:</b>
-        </p>
-
-        <p>
           <b>Workaround:</b> Avoid queries with extremely large expression trees. Setting the
           query option <codeph>disable_codegen=true</codeph> may reduce the impact, at a cost of
           longer query runtime.
@@ -740,8 +679,6 @@ ALTER TABLE table_name SET TBLPROPERTIES('EXTERNAL'='TRUE');
 
     <concept id="IMPALA-77" rev="IMPALA-77">
 
-<!-- Not part of Alex's spreadsheet. Perhaps it really is a permanent limitation and nobody is tracking it? -->
-
       <title>Impala does not support running on clusters with federated namespaces</title>
 
       <conbody>
@@ -785,8 +722,6 @@ ALTER TABLE table_name SET TBLPROPERTIES('EXTERNAL'='TRUE');
 
     <concept id="IMPALA-2005" rev="IMPALA-2005">
 
-<!-- Not part of Alex's spreadsheet -->
-
       <title>A failed CTAS does not drop the table if the insert fails</title>
 
       <conbody>
@@ -812,8 +747,6 @@ ALTER TABLE table_name SET TBLPROPERTIES('EXTERNAL'='TRUE');
 
     <concept id="IMPALA-1821" rev="IMPALA-1821">
 
-<!-- Not part of Alex's spreadsheet -->
-
       <title>Casting scenarios with invalid/inconsistent results</title>
 
       <conbody>
@@ -837,8 +770,6 @@ ALTER TABLE table_name SET TBLPROPERTIES('EXTERNAL'='TRUE');
 
     <concept id="IMPALA-941" rev="IMPALA-941">
 
-<!-- Not part of Alex's spreadsheet. Maybe this is interop? -->
-
       <title>Impala Parser issue when using fully qualified table names that start with a number</title>
 
       <conbody>
@@ -864,8 +795,6 @@ ALTER TABLE table_name SET TBLPROPERTIES('EXTERNAL'='TRUE');
 
     <concept id="IMPALA-532" rev="IMPALA-532">
 
-<!-- Not part of Alex's spreadsheet. Perhaps it really is a permanent limitation and nobody is tracking it? -->
-
       <title>Impala should tolerate bad locale settings</title>
 
       <conbody>
@@ -897,8 +826,6 @@ ALTER TABLE table_name SET TBLPROPERTIES('EXTERNAL'='TRUE');
 
     <concept id="IMP-1203">
 
-<!-- Not part of Alex's spreadsheet. Perhaps it really is a permanent limitation and nobody is tracking it? -->
-
       <title>Log Level 3 Not Recommended for Impala</title>
 
       <conbody>


[9/9] impala git commit: IMPALA-5050: Add support to read TIMESTAMP_MILLIS and TIMESTAMP_MICROS from Parquet

Posted by ta...@apache.org.
IMPALA-5050: Add support to read TIMESTAMP_MILLIS and TIMESTAMP_MICROS from Parquet

Changes:
- parquet.thrift is updated to a newer version which contains the
  timestamp logical type.
- INT64 columns with converted types TIMESTAMP_MILLIS and
  TIMESTAMP_MICROS can be read as TIMESTAMP.
- If the logical type is timestamp, then the type also carries whether
  the UTC->local conversion is necessary (a sketch of the decoding
  arithmetic follows the Testing notes below). This feature is only
  supported for the new timestamp types, so INT96 timestamps must
  still use the convert_legacy_hive_parquet_utc_timestamps flag.
- Min/max stat filtering is enabled again for columns that need
  UTC->local conversion. This was disabled in IMPALA-7559 because
  it could incorrectly drop column chunks.
- CREATE TABLE LIKE PARQUET converts these columns to TIMESTAMP;
  before this change, an error was returned instead.
- Bulk of the Parquet column stat logic was moved to a new class
  called "ColumnStatsReader".

Testing:
- Added unit tests for timezone conversion (this needed a new public
  function in timezone_db.h and adding CET to tzdb_tiny).
- Added parquet files (created with parquet-mr) with int64 timestamp
  columns.
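
A minimal sketch of the core decoding arithmetic (illustrative only: the
enum, function, and values below are made up for this note and are not
Impala's ParquetTimestampDecoder API; the real decoder additionally applies
the UTC->local conversion when the logical type requires it):

// Turns an INT64 TIMESTAMP_MILLIS / TIMESTAMP_MICROS value into a
// (seconds since the epoch, nanoseconds within that second) pair.
#include <cstdint>
#include <cstdio>
#include <utility>

enum class Int64TimestampUnit { MILLIS, MICROS };

static std::pair<int64_t, int32_t> ToSecondsAndNanos(int64_t value,
    Int64TimestampUnit unit) {
  const int64_t per_second =
      (unit == Int64TimestampUnit::MILLIS) ? 1000 : 1000000;
  const int64_t nanos_per_unit = 1000000000 / per_second;
  int64_t seconds = value / per_second;
  int64_t remainder = value % per_second;
  // Floor the division so pre-epoch (negative) values land in the right second.
  if (remainder < 0) { seconds -= 1; remainder += per_second; }
  return {seconds, static_cast<int32_t>(remainder * nanos_per_unit)};
}

int main() {
  // 1234567891234 ms -> 1234567891 s and 234000000 ns into that second.
  std::pair<int64_t, int32_t> p =
      ToSecondsAndNanos(1234567891234LL, Int64TimestampUnit::MILLIS);
  std::printf("%lld s, %d ns\n", static_cast<long long>(p.first), p.second);
  // Pre-epoch values: -1500 ms -> -2 s and 500000000 ns.
  p = ToSecondsAndNanos(-1500, Int64TimestampUnit::MILLIS);
  std::printf("%lld s, %d ns\n", static_cast<long long>(p.first), p.second);
  return 0;
}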

Change-Id: I4c7c01fffa31b3d2ca3480adf6ff851137dadac3
Reviewed-on: http://gerrit.cloudera.org:8080/11057
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/60095a4c
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/60095a4c
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/60095a4c

Branch: refs/heads/master
Commit: 60095a4c6bebc412a040d5b4a723e528ba0b2278
Parents: c75b371
Author: Csaba Ringhofer <cs...@cloudera.com>
Authored: Fri Jun 29 21:40:57 2018 +0200
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Wed Nov 14 20:16:14 2018 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-parquet-scanner.cc             |  49 ++---
 be/src/exec/hdfs-parquet-scanner.h              |   6 +-
 be/src/exec/parquet-column-readers.cc           | 150 +++++++++----
 be/src/exec/parquet-column-readers.h            |  10 +-
 be/src/exec/parquet-column-stats.cc             | 103 +++++----
 be/src/exec/parquet-column-stats.h              | 102 +++++----
 be/src/exec/parquet-column-stats.inline.h       |  17 +-
 be/src/exec/parquet-common.cc                   |  41 ++++
 be/src/exec/parquet-common.h                    |  82 ++++++-
 be/src/exec/parquet-metadata-utils.cc           |  32 ++-
 be/src/exprs/timezone_db.h                      |   6 +
 be/src/runtime/timestamp-test.cc                |  66 ++++++
 be/src/runtime/timestamp-value.cc               |  50 +++--
 be/src/runtime/timestamp-value.h                |  14 +-
 be/src/util/dict-encoding.h                     |  26 ++-
 common/thrift/parquet.thrift                    | 212 ++++++++++++++++---
 .../apache/impala/analysis/ParquetHelper.java   |   7 +
 testdata/data/README                            |  26 +++
 .../int64_timestamps_at_dst_changes.parquet     | Bin 0 -> 2301 bytes
 testdata/data/int64_timestamps_dict.parquet     | Bin 0 -> 1970 bytes
 testdata/data/int64_timestamps_plain.parquet    | Bin 0 -> 1046 bytes
 testdata/tzdb_tiny/CET                          | Bin 0 -> 2102 bytes
 .../QueryTest/parquet-int64-timestamps.test     |  89 ++++++++
 tests/query_test/test_scanners.py               |  48 +++++
 24 files changed, 927 insertions(+), 209 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/60095a4c/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index 0d5b0cc..7635f31 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -453,21 +453,6 @@ Status HdfsParquetScanner::EvaluateStatsConjuncts(
 
   if (!state_->query_options().parquet_read_statistics) return Status::OK();
 
-  // IMPALA-7559: if the values are converted from UTC to local time, then either the
-  // stats need to be converted from UTC to local, or the predicate's min/max values
-  // need to be converted from local to UTC. Doing this correctly is quite complex if
-  // the timestamps fall into timezone rules changes (DST change or historical rule
-  // change), so currently stat filtering is simply disabled for these columns.
-  //
-  // Note that parquet-mr only writes stats if min and max are equal, because it cannot
-  // order timestamps correctly, so the only case affected here is when every value is
-  // the same in the column chunk.
-  // TODO: This topic needs more investigation related to IMPALA-5050, which will add
-  // support for INT64 millisec/microsec timestamp columns, and also a metadata field
-  // whether utc->local conversion is necessary. I am not sure how parquet-mr handles
-  // stats for these types at the moment.
-  bool disable_min_max_filter_for_timestamps = IsTimezoneConversionNeededForTimestamps();
-
   const TupleDescriptor* min_max_tuple_desc = scan_node_->min_max_tuple_desc();
   if (!min_max_tuple_desc) return Status::OK();
 
@@ -516,30 +501,36 @@ Status HdfsParquetScanner::EvaluateStatsConjuncts(
     const parquet::ColumnChunk& col_chunk = row_group.columns[col_idx];
     const ColumnType& col_type = slot_desc->type();
 
+    DCHECK(node->element != nullptr);
+
+    ColumnStatsReader stat_reader(col_chunk, col_type, col_order,  *node->element);
+    if (col_type.IsTimestampType()) {
+      stat_reader.SetTimestampDecoder(CreateTimestampDecoder(*node->element));
+    }
+
     int64_t null_count = 0;
-    bool null_count_result = ColumnStatsBase::ReadNullCountStat(col_chunk, &null_count);
+    bool null_count_result = stat_reader.ReadNullCountStat(&null_count);
     if (null_count_result && null_count == col_chunk.meta_data.num_values) {
       *skip_row_group = true;
       break;
     }
 
-    if (col_type.IsTimestampType() && disable_min_max_filter_for_timestamps) continue;
-
-    bool stats_read = false;
-    void* slot = min_max_tuple_->GetSlot(slot_desc->tuple_offset());
     const string& fn_name = eval->root().function_name();
+    ColumnStatsReader::StatsField stats_field;
     if (fn_name == "lt" || fn_name == "le") {
       // We need to get min stats.
-      stats_read = ColumnStatsBase::ReadFromThrift(
-          col_chunk, col_type, col_order, ColumnStatsBase::StatsField::MIN, slot);
+      stats_field = ColumnStatsReader::StatsField::MIN;
     } else if (fn_name == "gt" || fn_name == "ge") {
       // We need to get max stats.
-      stats_read = ColumnStatsBase::ReadFromThrift(
-          col_chunk, col_type, col_order, ColumnStatsBase::StatsField::MAX, slot);
+      stats_field = ColumnStatsReader::StatsField::MAX;
     } else {
       DCHECK(false) << "Unsupported function name for statistics evaluation: " << fn_name;
+      continue;
     }
 
+    void* slot = min_max_tuple_->GetSlot(slot_desc->tuple_offset());
+    bool stats_read = stat_reader.ReadFromThrift(stats_field, slot);
+
     if (stats_read) {
       TupleRow row;
       row.SetTuple(0, min_max_tuple_);
@@ -1677,9 +1668,13 @@ Status HdfsParquetScanner::ValidateEndOfRowGroup(
   return Status::OK();
 }
 
-bool HdfsParquetScanner::IsTimezoneConversionNeededForTimestamps() {
-  return FLAGS_convert_legacy_hive_parquet_utc_timestamps &&
+ParquetTimestampDecoder HdfsParquetScanner::CreateTimestampDecoder(
+    const parquet::SchemaElement& element) {
+  bool timestamp_conversion_needed_for_int96_timestamps =
+      FLAGS_convert_legacy_hive_parquet_utc_timestamps &&
       file_version_.application == "parquet-mr";
-}
 
+  return ParquetTimestampDecoder(element, &state_->local_time_zone(),
+      timestamp_conversion_needed_for_int96_timestamps);
+}
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/60095a4c/be/src/exec/hdfs-parquet-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.h b/be/src/exec/hdfs-parquet-scanner.h
index 5493926..d559b8e 100644
--- a/be/src/exec/hdfs-parquet-scanner.h
+++ b/be/src/exec/hdfs-parquet-scanner.h
@@ -343,9 +343,9 @@ class HdfsParquetScanner : public HdfsScanner {
       llvm::Function** process_scratch_batch_fn)
       WARN_UNUSED_RESULT;
 
-  /// Returns true if the timestamps are expected to be in UTC and need to be
-  /// converted to local time.
-  bool IsTimezoneConversionNeededForTimestamps();
+  /// Initializes a ParquetTimestampDecoder depending on writer, timezone, and the schema
+  /// of the column.
+  ParquetTimestampDecoder CreateTimestampDecoder(const parquet::SchemaElement& element);
 
   /// The rep and def levels are set to this value to indicate the end of a row group.
   static const int16_t ROW_GROUP_END = numeric_limits<int16_t>::min();

http://git-wip-us.apache.org/repos/asf/impala/blob/60095a4c/be/src/exec/parquet-column-readers.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.cc b/be/src/exec/parquet-column-readers.cc
index 9a9d704..5bf6543 100644
--- a/be/src/exec/parquet-column-readers.cc
+++ b/be/src/exec/parquet-column-readers.cc
@@ -372,16 +372,15 @@ class ScalarColumnReader : public BaseScalarColumnReader {
   /// the max length for VARCHAR columns. Unused otherwise.
   int fixed_len_size_;
 
-  /// Query-global timezone used as local timezone when executing the query.
-  const Timezone& local_time_zone_;
+  /// Contains extra data needed for Timestamp decoding.
+  ParquetTimestampDecoder timestamp_decoder_;
 };
 
 template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
 ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ScalarColumnReader(
     HdfsParquetScanner* parent, const SchemaNode& node, const SlotDescriptor* slot_desc)
   : BaseScalarColumnReader(parent, node, slot_desc),
-    dict_decoder_(parent->scan_node_->mem_tracker()),
-    local_time_zone_(parent->state_->local_time_zone()) {
+    dict_decoder_(parent->scan_node_->mem_tracker()) {
   if (!MATERIALIZED) {
     // We're not materializing any values, just counting them. No need (or ability) to
     // initialize state used to materialize values.
@@ -399,9 +398,14 @@ ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ScalarColumnReader
   } else {
     fixed_len_size_ = -1;
   }
-  needs_conversion_ = slot_desc_->type().type == TYPE_CHAR ||
-      (slot_desc_->type().type == TYPE_TIMESTAMP &&
-      parent->IsTimezoneConversionNeededForTimestamps());
+
+  needs_conversion_ = slot_desc_->type().type == TYPE_CHAR;
+
+  if (slot_desc_->type().type == TYPE_TIMESTAMP) {
+    timestamp_decoder_ = parent->CreateTimestampDecoder(*node.element);
+    dict_decoder_.SetTimestampHelper(timestamp_decoder_);
+    needs_conversion_ = timestamp_decoder_.NeedsConversion();
+  }
 }
 
 template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
@@ -641,6 +645,30 @@ bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::DecodeValue(
   return true;
 }
 
+template <>
+template <Encoding::type ENCODING>
+bool ScalarColumnReader<TimestampValue, parquet::Type::INT64, true>::DecodeValue(
+    uint8_t** RESTRICT data, const uint8_t* RESTRICT data_end,
+    TimestampValue* RESTRICT val) RESTRICT {
+  DCHECK_EQ(page_encoding_, ENCODING);
+  if (ENCODING == Encoding::PLAIN_DICTIONARY) {
+    if (UNLIKELY(!dict_decoder_.GetNextValue(val))) {
+      SetDictDecodeError();
+      return false;
+    }
+  } else {
+    DCHECK_EQ(ENCODING, Encoding::PLAIN);
+    int encoded_len =
+        timestamp_decoder_.Decode<parquet::Type::INT64>(*data, data_end, val);
+    if (UNLIKELY(encoded_len < 0)) {
+      SetPlainDecodeError();
+      return false;
+    }
+    *data += encoded_len;
+  }
+  return true;
+}
+
 template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
 void ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>
     ::ReadPositionBatched(int16_t rep_level, int64_t* pos) {
@@ -675,13 +703,30 @@ inline bool ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>
 }
 
 template <>
+inline bool ScalarColumnReader<TimestampValue, parquet::Type::INT64, true>
+::NeedsConversionInline() const {
+  return needs_conversion_;
+}
+
+template <>
 bool ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>::ConvertSlot(
     const TimestampValue* src, void* slot) {
   // Conversion should only happen when this flag is enabled.
   DCHECK(FLAGS_convert_legacy_hive_parquet_utc_timestamps);
+  DCHECK(timestamp_decoder_.NeedsConversion());
   TimestampValue* dst_ts = reinterpret_cast<TimestampValue*>(slot);
   *dst_ts = *src;
-  if (dst_ts->HasDateAndTime()) dst_ts->UtcToLocal(local_time_zone_);
+  timestamp_decoder_.ConvertToLocalTime(dst_ts);
+  return true;
+}
+
+template <>
+bool ScalarColumnReader<TimestampValue, parquet::Type::INT64, true>::ConvertSlot(
+    const TimestampValue* src, void* slot) {
+  DCHECK(timestamp_decoder_.NeedsConversion());
+  TimestampValue* dst_ts = reinterpret_cast<TimestampValue*>(slot);
+  *dst_ts = *src;
+  timestamp_decoder_.ConvertToLocalTime(static_cast<TimestampValue*>(dst_ts));
   return true;
 }
 
@@ -692,6 +737,12 @@ inline bool ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>
 }
 
 template <>
+inline bool ScalarColumnReader<TimestampValue, parquet::Type::INT64, true>
+::NeedsValidationInline() const {
+  return true;
+}
+
+template <>
 bool ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>::ValidateValue(
     TimestampValue* val) const {
   if (UNLIKELY(!TimestampValue::IsValidDate(val->date())
@@ -711,6 +762,23 @@ bool ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>::ValidateVal
   return true;
 }
 
+template <>
+bool ScalarColumnReader<TimestampValue, parquet::Type::INT64, true>::ValidateValue(
+    TimestampValue* val) const {
+  // The range was already checked during the int64_t->TimestampValue conversion, which
+  // sets the date to invalid if it was out of range.
+  if (UNLIKELY(!val->HasDate())) {
+    ErrorMsg msg(TErrorCode::PARQUET_TIMESTAMP_OUT_OF_RANGE,
+        filename(), node_.element->name);
+    Status status = parent_->state_->LogOrReturnError(msg);
+    if (!status.ok()) parent_->parse_status_ = status;
+    return false;
+  }
+  DCHECK(TimestampValue::IsValidDate(val->date()));
+  DCHECK(TimestampValue::IsValidTime(val->time()));
+  return true;
+}
+
 class BoolColumnReader : public BaseScalarColumnReader {
  public:
   BoolColumnReader(HdfsParquetScanner* parent, const SchemaNode& node,
@@ -1507,7 +1575,7 @@ void CollectionColumnReader::UpdateDerivedState() {
 }
 
 /// Returns a column reader for decimal types based on its size and parquet type.
-static ParquetColumnReader* GetDecimalColumnReader(const SchemaNode& node,
+static ParquetColumnReader* CreateDecimalColumnReader(const SchemaNode& node,
     const SlotDescriptor* slot_desc, HdfsParquetScanner* parent) {
   switch (node.element->type) {
     case parquet::Type::FIXED_LEN_BYTE_ARRAY:
@@ -1554,84 +1622,82 @@ static ParquetColumnReader* GetDecimalColumnReader(const SchemaNode& node,
 ParquetColumnReader* ParquetColumnReader::Create(const SchemaNode& node,
     bool is_collection_field, const SlotDescriptor* slot_desc,
     HdfsParquetScanner* parent) {
-  ParquetColumnReader* reader = nullptr;
   if (is_collection_field) {
     // Create collection reader (note this handles both NULL and non-NULL 'slot_desc')
-    reader = new CollectionColumnReader(parent, node, slot_desc);
+    return new CollectionColumnReader(parent, node, slot_desc);
   } else if (slot_desc != nullptr) {
     // Create the appropriate ScalarColumnReader type to read values into 'slot_desc'
     switch (slot_desc->type().type) {
       case TYPE_BOOLEAN:
-        reader = new BoolColumnReader(parent, node, slot_desc);
-        break;
+        return new BoolColumnReader(parent, node, slot_desc);
       case TYPE_TINYINT:
-        reader = new ScalarColumnReader<int8_t, parquet::Type::INT32, true>(parent, node,
+        return new ScalarColumnReader<int8_t, parquet::Type::INT32, true>(parent, node,
             slot_desc);
-        break;
       case TYPE_SMALLINT:
-        reader = new ScalarColumnReader<int16_t, parquet::Type::INT32, true>(parent, node,
+        return new ScalarColumnReader<int16_t, parquet::Type::INT32, true>(parent, node,
             slot_desc);
-        break;
       case TYPE_INT:
-        reader = new ScalarColumnReader<int32_t, parquet::Type::INT32, true>(parent, node,
+        return new ScalarColumnReader<int32_t, parquet::Type::INT32, true>(parent, node,
             slot_desc);
-        break;
       case TYPE_BIGINT:
         switch (node.element->type) {
           case parquet::Type::INT32:
-            reader = new ScalarColumnReader<int64_t, parquet::Type::INT32, true>(parent,
+            return new ScalarColumnReader<int64_t, parquet::Type::INT32, true>(parent,
                 node, slot_desc);
-            break;
           default:
-            reader = new ScalarColumnReader<int64_t, parquet::Type::INT64, true>(parent,
+            return new ScalarColumnReader<int64_t, parquet::Type::INT64, true>(parent,
                 node, slot_desc);
-            break;
         }
-        break;
       case TYPE_FLOAT:
-        reader = new ScalarColumnReader<float, parquet::Type::FLOAT, true>(parent, node,
+        return new ScalarColumnReader<float, parquet::Type::FLOAT, true>(parent, node,
             slot_desc);
-        break;
       case TYPE_DOUBLE:
         switch (node.element->type) {
           case parquet::Type::INT32:
-            reader = new ScalarColumnReader<double , parquet::Type::INT32, true>(parent,
+            return new ScalarColumnReader<double , parquet::Type::INT32, true>(parent,
                 node, slot_desc);
-            break;
           case parquet::Type::FLOAT:
-            reader = new ScalarColumnReader<double, parquet::Type::FLOAT, true>(parent,
+            return new ScalarColumnReader<double, parquet::Type::FLOAT, true>(parent,
                 node, slot_desc);
-            break;
           default:
-            reader = new ScalarColumnReader<double, parquet::Type::DOUBLE, true>(parent,
+            return new ScalarColumnReader<double, parquet::Type::DOUBLE, true>(parent,
                 node, slot_desc);
-            break;
         }
-        break;
       case TYPE_TIMESTAMP:
-        reader = new ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>(
-            parent, node, slot_desc);
-        break;
+        return CreateTimestampColumnReader(node, slot_desc, parent);
       case TYPE_STRING:
       case TYPE_VARCHAR:
       case TYPE_CHAR:
-        reader = new ScalarColumnReader<StringValue, parquet::Type::BYTE_ARRAY, true>(
+        return new ScalarColumnReader<StringValue, parquet::Type::BYTE_ARRAY, true>(
             parent, node, slot_desc);
-        break;
       case TYPE_DECIMAL:
-        reader = GetDecimalColumnReader(node, slot_desc, parent);
-        break;
+        return CreateDecimalColumnReader(node, slot_desc, parent);
       default:
         DCHECK(false) << slot_desc->type().DebugString();
+        return nullptr;
     }
   } else {
     // Special case for counting scalar values (e.g. count(*), no materialized columns in
     // the file, only materializing a position slot). We won't actually read any values,
     // only the rep and def levels, so it doesn't matter what kind of reader we make.
-    reader = new ScalarColumnReader<int8_t, parquet::Type::INT32, false>(parent, node,
+    return new ScalarColumnReader<int8_t, parquet::Type::INT32, false>(parent, node,
         slot_desc);
   }
-  return parent->obj_pool_.Add(reader);
+}
+
+ParquetColumnReader* ParquetColumnReader::CreateTimestampColumnReader(
+    const SchemaNode& node, const SlotDescriptor* slot_desc,
+    HdfsParquetScanner* parent) {
+  if (node.element->type == parquet::Type::INT96) {
+    return new ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>(
+        parent, node, slot_desc);
+  }
+  else if (node.element->type == parquet::Type::INT64) {
+    return new ScalarColumnReader<TimestampValue, parquet::Type::INT64, true>(
+        parent, node, slot_desc);
+  }
+  DCHECK(false) << slot_desc->type().DebugString();
+  return nullptr;
 }
 
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/60095a4c/be/src/exec/parquet-column-readers.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.h b/be/src/exec/parquet-column-readers.h
index 790bde4..bce114a 100644
--- a/be/src/exec/parquet-column-readers.h
+++ b/be/src/exec/parquet-column-readers.h
@@ -132,10 +132,12 @@ class ParquetLevelDecoder {
 /// level pair at a time. The current def and rep level are exposed to the user, and the
 /// corresponding value (if defined) can optionally be copied into a slot via
 /// ReadValue(). Can also write position slots.
+///
+/// The constructor adds the object to the obj_pool of the parent HdfsParquetScanner.
 class ParquetColumnReader {
  public:
   /// Creates a column reader for 'node' and associates it with the given parent scanner.
-  /// Adds the new column reader to the parent's object pool.
+  /// The constructors of column readers add the new object to the parent's object pool.
   /// 'slot_desc' may be NULL, in which case the returned column reader can only be used
   /// to read def/rep levels.
   /// 'is_collection_field' should be set to true if the returned reader is reading a
@@ -155,6 +157,9 @@ class ParquetColumnReader {
   static ParquetColumnReader* Create(const SchemaNode& node, bool is_collection_field,
       const SlotDescriptor* slot_desc, HdfsParquetScanner* parent);
 
+  static ParquetColumnReader* CreateTimestampColumnReader(const SchemaNode& node,
+      const SlotDescriptor* slot_desc, HdfsParquetScanner* parent);
+
   virtual ~ParquetColumnReader() { }
 
   int def_level() const { return def_level_; }
@@ -303,6 +308,9 @@ class ParquetColumnReader {
       tuple_offset_(slot_desc == NULL ? -1 : slot_desc->tuple_offset()),
       null_indicator_offset_(slot_desc == NULL ? NullIndicatorOffset() :
           slot_desc->null_indicator_offset()) {
+    DCHECK(parent != nullptr);
+    parent->obj_pool_.Add(this);
+
     DCHECK_GE(node_.max_rep_level, 0);
     DCHECK_LE(node_.max_rep_level, std::numeric_limits<int16_t>::max());
     DCHECK_GE(node_.max_def_level, 0);

http://git-wip-us.apache.org/repos/asf/impala/blob/60095a4c/be/src/exec/parquet-column-stats.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-stats.cc b/be/src/exec/parquet-column-stats.cc
index 2f6f7fc..478bba4 100644
--- a/be/src/exec/parquet-column-stats.cc
+++ b/be/src/exec/parquet-column-stats.cc
@@ -25,33 +25,31 @@
 
 namespace impala {
 
-bool ColumnStatsBase::ReadFromThrift(const parquet::ColumnChunk& col_chunk,
-    const ColumnType& col_type, const parquet::ColumnOrder* col_order,
-    StatsField stats_field, void* slot) {
-  if (!(col_chunk.__isset.meta_data && col_chunk.meta_data.__isset.statistics)) {
+bool ColumnStatsReader::ReadFromThrift(StatsField stats_field, void* slot) const {
+  if (!(col_chunk_.__isset.meta_data && col_chunk_.meta_data.__isset.statistics)) {
     return false;
   }
-  const parquet::Statistics& stats = col_chunk.meta_data.statistics;
+  const parquet::Statistics& stats = col_chunk_.meta_data.statistics;
 
   // Try to read the requested stats field. If it is not set, we may fall back to reading
   // the old stats, based on the column type.
   const string* stat_value = nullptr;
   switch (stats_field) {
     case StatsField::MIN:
-      if (stats.__isset.min_value && CanUseStats(col_type, col_order)) {
+      if (stats.__isset.min_value && CanUseStats()) {
         stat_value = &stats.min_value;
         break;
       }
-      if (stats.__isset.min && CanUseDeprecatedStats(col_type, col_order)) {
+      if (stats.__isset.min && CanUseDeprecatedStats()) {
         stat_value = &stats.min;
       }
       break;
     case StatsField::MAX:
-      if (stats.__isset.max_value && CanUseStats(col_type, col_order)) {
+      if (stats.__isset.max_value && CanUseStats()) {
         stat_value = &stats.max_value;
         break;
       }
-      if (stats.__isset.max && CanUseDeprecatedStats(col_type, col_order)) {
+      if (stats.__isset.max && CanUseDeprecatedStats()) {
         stat_value = &stats.max;
       }
       break;
@@ -60,7 +58,7 @@ bool ColumnStatsBase::ReadFromThrift(const parquet::ColumnChunk& col_chunk,
   }
   if (stat_value == nullptr) return false;
 
-  switch (col_type.type) {
+  switch (col_type_.type) {
     case TYPE_BOOLEAN:
       return ColumnStats<bool>::DecodePlainValue(*stat_value, slot,
           parquet::Type::BOOLEAN);
@@ -89,55 +87,76 @@ bool ColumnStatsBase::ReadFromThrift(const parquet::ColumnChunk& col_chunk,
       return true;
     }
     case TYPE_INT:
-      return ColumnStats<int32_t>::DecodePlainValue(*stat_value, slot,
-          col_chunk.meta_data.type);
+      return ColumnStats<int32_t>::DecodePlainValue(*stat_value, slot, element_.type);
     case TYPE_BIGINT:
-      return ColumnStats<int64_t>::DecodePlainValue(*stat_value, slot,
-          col_chunk.meta_data.type);
+      return ColumnStats<int64_t>::DecodePlainValue(*stat_value, slot, element_.type);
     case TYPE_FLOAT:
       // IMPALA-6527, IMPALA-6538: ignore min/max stats if NaN
-      return ColumnStats<float>::DecodePlainValue(*stat_value, slot,
-          col_chunk.meta_data.type) && !std::isnan(*reinterpret_cast<float*>(slot));
+      return ColumnStats<float>::DecodePlainValue(*stat_value, slot, element_.type)
+          && !std::isnan(*reinterpret_cast<float*>(slot));
     case TYPE_DOUBLE:
       // IMPALA-6527, IMPALA-6538: ignore min/max stats if NaN
-      return ColumnStats<double>::DecodePlainValue(*stat_value, slot,
-          col_chunk.meta_data.type) && !std::isnan(*reinterpret_cast<double*>(slot));
+      return ColumnStats<double>::DecodePlainValue(*stat_value, slot, element_.type)
+          && !std::isnan(*reinterpret_cast<double*>(slot));
     case TYPE_TIMESTAMP:
-      return ColumnStats<TimestampValue>::DecodePlainValue(*stat_value, slot,
-          col_chunk.meta_data.type);
+      return DecodeTimestamp(*stat_value, stats_field,
+          static_cast<TimestampValue*>(slot));
     case TYPE_STRING:
     case TYPE_VARCHAR:
-      return ColumnStats<StringValue>::DecodePlainValue(*stat_value, slot,
-          col_chunk.meta_data.type);
+      return ColumnStats<StringValue>::DecodePlainValue(*stat_value, slot, element_.type);
     case TYPE_CHAR:
       /// We don't read statistics for CHAR columns, since CHAR support is broken in
       /// Impala (IMPALA-1652).
       return false;
     case TYPE_DECIMAL:
-      switch (col_type.GetByteSize()) {
+      switch (col_type_.GetByteSize()) {
         case 4:
           return ColumnStats<Decimal4Value>::DecodePlainValue(*stat_value, slot,
-              col_chunk.meta_data.type);
+              element_.type);
         case 8:
           return ColumnStats<Decimal8Value>::DecodePlainValue(*stat_value, slot,
-              col_chunk.meta_data.type);
+              element_.type);
         case 16:
           return ColumnStats<Decimal16Value>::DecodePlainValue(*stat_value, slot,
-              col_chunk.meta_data.type);
+              element_.type);
         }
-      DCHECK(false) << "Unknown decimal byte size: " << col_type.GetByteSize();
+      DCHECK(false) << "Unknown decimal byte size: " << col_type_.GetByteSize();
     default:
-      DCHECK(false) << col_type.DebugString();
+      DCHECK(false) << col_type_.DebugString();
   }
   return false;
 }
 
-bool ColumnStatsBase::ReadNullCountStat(const parquet::ColumnChunk& col_chunk,
-    int64_t* null_count) {
-  if (!(col_chunk.__isset.meta_data && col_chunk.meta_data.__isset.statistics)) {
+bool ColumnStatsReader::DecodeTimestamp(const std::string& stat_value,
+    ColumnStatsReader::StatsField stats_field, TimestampValue* slot) const {
+  bool stats_read = false;
+  if (element_.type == parquet::Type::INT96) {
+    stats_read =
+        ColumnStats<TimestampValue>::DecodePlainValue(stat_value, slot, element_.type);
+  } else if (element_.type == parquet::Type::INT64) {
+    int64_t tmp;
+    stats_read = ColumnStats<int64_t>::DecodePlainValue(stat_value, &tmp, element_.type);
+    if (stats_read) *slot = timestamp_decoder_.Int64ToTimestampValue(tmp);
+  } else {
+    DCHECK(false) << element_.name;
     return false;
   }
-  const parquet::Statistics& stats = col_chunk.meta_data.statistics;
+
+  if (stats_read && timestamp_decoder_.NeedsConversion()) {
+    if (stats_field == ColumnStatsReader::StatsField::MIN) {
+      timestamp_decoder_.ConvertMinStatToLocalTime(slot);
+    } else {
+      timestamp_decoder_.ConvertMaxStatToLocalTime(slot);
+    }
+  }
+  return stats_read && slot->HasDateAndTime();
+}
+
+bool ColumnStatsReader::ReadNullCountStat(int64_t* null_count) const {
+  if (!(col_chunk_.__isset.meta_data && col_chunk_.meta_data.__isset.statistics)) {
+    return false;
+  }
+  const parquet::Statistics& stats = col_chunk_.meta_data.statistics;
   if (stats.__isset.null_count) {
     *null_count = stats.null_count;
     return true;
@@ -153,24 +172,22 @@ Status ColumnStatsBase::CopyToBuffer(StringBuffer* buffer, StringValue* value) {
   return Status::OK();
 }
 
-bool ColumnStatsBase::CanUseStats(
-    const ColumnType& col_type, const parquet::ColumnOrder* col_order) {
+bool ColumnStatsReader::CanUseStats() const {
   // If column order is not set, only statistics for numeric types can be trusted.
-  if (col_order == nullptr) {
-    return col_type.IsBooleanType() || col_type.IsIntegerType()
-        || col_type.IsFloatingPointType();
+  if (col_order_ == nullptr) {
+    return col_type_.IsBooleanType() || col_type_.IsIntegerType()
+        || col_type_.IsFloatingPointType();
   }
   // Stats can be used if the column order is TypeDefinedOrder (see parquet.thrift).
-  return col_order->__isset.TYPE_ORDER;
+  return col_order_->__isset.TYPE_ORDER;
 }
 
-bool ColumnStatsBase::CanUseDeprecatedStats(
-    const ColumnType& col_type, const parquet::ColumnOrder* col_order) {
+bool ColumnStatsReader::CanUseDeprecatedStats() const {
   // If column order is set to something other than TypeDefinedOrder, we shall not use the
   // stats (see parquet.thrift).
-  if (col_order != nullptr && !col_order->__isset.TYPE_ORDER) return false;
-  return col_type.IsBooleanType() || col_type.IsIntegerType()
-      || col_type.IsFloatingPointType();
+  if (col_order_ != nullptr && !col_order_->__isset.TYPE_ORDER) return false;
+  return col_type_.IsBooleanType() || col_type_.IsIntegerType()
+      || col_type_.IsFloatingPointType();
 }
 
 }  // end ns impala

http://git-wip-us.apache.org/repos/asf/impala/blob/60095a4c/be/src/exec/parquet-column-stats.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-stats.h b/be/src/exec/parquet-column-stats.h
index 6d2743a..fc880f9 100644
--- a/be/src/exec/parquet-column-stats.h
+++ b/be/src/exec/parquet-column-stats.h
@@ -21,6 +21,7 @@
 #include <string>
 #include <type_traits>
 
+#include "exec/parquet-common.h"
 #include "runtime/decimal-value.h"
 #include "runtime/string-buffer.h"
 #include "runtime/timestamp-value.h"
@@ -59,11 +60,6 @@ namespace impala {
 /// TODO: Populate null_count and distinct_count.
 class ColumnStatsBase {
  public:
-  /// Enum to select whether to read minimum or maximum statistics. Values do not
-  /// correspond to fields in parquet::Statistics, but instead select between retrieving
-  /// the minimum or maximum value.
-  enum class StatsField { MIN, MAX };
-
   /// min and max functions for types that are not floating point numbers
   template <typename T, typename Enable = void>
   struct MinMaxTrait {
@@ -93,20 +89,6 @@ class ColumnStatsBase {
   ColumnStatsBase() : has_min_max_values_(false), null_count_(0) {}
   virtual ~ColumnStatsBase() {}
 
-  /// Decodes the parquet::Statistics from 'col_chunk' and writes the value selected by
-  /// 'stats_field' into the buffer pointed to by 'slot', based on 'col_type'. Returns
-  /// true if reading statistics for columns of type 'col_type' is supported and decoding
-  /// was successful, false otherwise.
-  static bool ReadFromThrift(const parquet::ColumnChunk& col_chunk,
-      const ColumnType& col_type, const parquet::ColumnOrder* col_order,
-      StatsField stats_field, void* slot);
-
-  // Gets the null_count statistics from the given column chunk's metadata and returns
-  // it via an output parameter.
-  // Returns true if the null_count stats were read successfully, false otherwise.
-  static bool ReadNullCountStat(const parquet::ColumnChunk& col_chunk,
-      int64_t* null_count);
-
   /// Merges this statistics object with values from 'other'. If other has not been
   /// initialized, then this object will not be changed. It maintains internal state that
   /// tracks whether the min/max values are ordered.
@@ -166,20 +148,6 @@ class ColumnStatsBase {
   // If true, min/max values are descending.
   // See description of 'ascending_boundary_order_'.
   bool descending_boundary_order_ = true;
-
- private:
-  /// Returns true if we support reading statistics stored in the fields 'min_value' and
-  /// 'max_value' in parquet::Statistics for the type 'col_type' and the column order
-  /// 'col_order'. Otherwise, returns false. If 'col_order' is nullptr, only primitive
-  /// numeric types are supported.
-  static bool CanUseStats(
-      const ColumnType& col_type, const parquet::ColumnOrder* col_order);
-
-  /// Returns true if we consider statistics stored in the deprecated fields 'min' and
-  /// 'max' in parquet::Statistics to be correct for the type 'col_type' and the column
-  /// order 'col_order'. Otherwise, returns false.
-  static bool CanUseDeprecatedStats(
-      const ColumnType& col_type, const parquet::ColumnOrder* col_order);
 };
 
 /// This class contains behavior specific to our in-memory formats for different types.
@@ -231,18 +199,18 @@ class ColumnStats : public ColumnStatsBase {
   virtual int64_t BytesNeeded() const override;
   virtual void EncodeToThrift(parquet::Statistics* out) const override;
 
- protected:
-  /// Encodes a single value using parquet's plain encoding and stores it into the binary
-  /// string 'out'. String values are stored without additional encoding. 'bytes_needed'
-  /// must be positive.
-  static void EncodePlainValue(const T& v, int64_t bytes_needed, std::string* out);
-
   /// Decodes the plain encoded stats value from 'buffer' and writes the result into the
   /// buffer pointed to by 'slot'. Returns true if decoding was successful, false
   /// otherwise. For timestamps, an additional validation will be performed.
   static bool DecodePlainValue(const std::string& buffer, void* slot,
       parquet::Type::type parquet_type);
 
+ protected:
+  /// Encodes a single value using parquet's plain encoding and stores it into the binary
+  /// string 'out'. String values are stored without additional encoding. 'bytes_needed'
+  /// must be positive.
+  static void EncodePlainValue(const T& v, int64_t bytes_needed, std::string* out);
+
   /// Returns the number of bytes needed to encode value 'v'.
   int64_t BytesNeeded(const T& v) const;
 
@@ -271,5 +239,61 @@ class ColumnStats : public ColumnStatsBase {
   StringBuffer prev_page_max_buffer_;
 };
 
+/// Class that handles the decoding of Parquet stats (min/max/null_count) for a given
+/// column chunk.
+class ColumnStatsReader {
+public:
+  /// Enum to select whether to read minimum or maximum statistics. Values do not
+  /// correspond to fields in parquet::Statistics, but instead select between retrieving
+  /// the minimum or maximum value.
+  enum class StatsField { MIN, MAX };
+
+  ColumnStatsReader(const parquet::ColumnChunk& col_chunk, const ColumnType& col_type,
+      const parquet::ColumnOrder* col_order, const parquet::SchemaElement& element)
+  : col_chunk_(col_chunk),
+    col_type_(col_type),
+    col_order_(col_order),
+    element_(element) {}
+
+  /// Sets extra information that is only needed for decoding TIMESTAMP stats.
+  void SetTimestampDecoder(ParquetTimestampDecoder timestamp_decoder) {
+    timestamp_decoder_ = timestamp_decoder;
+  }
+
+  /// Decodes the parquet::Statistics from 'col_chunk_' and writes the value selected by
+  /// 'stats_field' into the buffer pointed to by 'slot', based on 'col_type_'. Returns
+  /// true if reading statistics for columns of type 'col_type_' is supported and decoding
+  /// was successful, false otherwise.
+  bool ReadFromThrift(StatsField stats_field, void* slot) const;
+
+  // Gets the null_count statistics from the column chunk's metadata and returns
+  // it via an output parameter.
+  // Returns true if the null_count stats were read successfully, false otherwise.
+  bool ReadNullCountStat(int64_t* null_count) const;
+
+private:
+  /// Returns true if we support reading statistics stored in the fields 'min_value' and
+  /// 'max_value' in parquet::Statistics for the type 'col_type_' and the column order
+  /// 'col_order_'. Otherwise, returns false. If 'col_order_' is nullptr, only primitive
+  /// numeric types are supported.
+  bool CanUseStats() const;
+
+  /// Returns true if we consider statistics stored in the deprecated fields 'min' and
+  /// 'max' in parquet::Statistics to be correct for the type 'col_type_' and the column
+  /// order 'col_order_'. Otherwise, returns false.
+  bool CanUseDeprecatedStats() const;
+
+  /// Decodes 'stat_value' and does INT64->TimestampValue and timezone conversions if
+  /// necessary. Returns true if the decoding and conversions were successful.
+  bool DecodeTimestamp(const std::string& stat_value,
+      ColumnStatsReader::StatsField stats_field,
+      TimestampValue* slot) const;
+
+  const parquet::ColumnChunk& col_chunk_;
+  const ColumnType& col_type_;
+  const parquet::ColumnOrder* col_order_;
+  const parquet::SchemaElement& element_;
+  ParquetTimestampDecoder timestamp_decoder_;
+};
 } // end ns impala
 #endif
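
The header above only declares the new per-chunk reader. As a rough usage sketch (not part of
the patch; the 'col_chunk', 'col_type', 'col_order', 'element' and 'timestamp_decoder' locals
are assumed to come from the row-group and file metadata handling in the scanner), reading a
min value and the null count would look like this:

  // Build a reader for one column chunk; 'col_order' may be nullptr if the file does
  // not carry column_orders.
  ColumnStatsReader stats_reader(col_chunk, col_type, col_order, element);
  // TIMESTAMP columns additionally need the column's decoder so INT64 stats can be
  // decoded and, if required, converted:
  //   stats_reader.SetTimestampDecoder(timestamp_decoder);

  // Read the minimum value of, e.g., a BIGINT column into an int64_t slot.
  int64_t min_val;
  bool has_min = stats_reader.ReadFromThrift(ColumnStatsReader::StatsField::MIN, &min_val);

  // Read the null count, if the writer recorded one.
  int64_t null_count;
  bool has_null_count = stats_reader.ReadNullCountStat(&null_count);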

http://git-wip-us.apache.org/repos/asf/impala/blob/60095a4c/be/src/exec/parquet-column-stats.inline.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-stats.inline.h b/be/src/exec/parquet-column-stats.inline.h
index 094fadd..6e78b82 100644
--- a/be/src/exec/parquet-column-stats.inline.h
+++ b/be/src/exec/parquet-column-stats.inline.h
@@ -148,14 +148,19 @@ inline bool ColumnStats<TimestampValue>::DecodePlainValue(
   TimestampValue* result = reinterpret_cast<TimestampValue*>(slot);
   int size = buffer.size();
   const uint8_t* data = reinterpret_cast<const uint8_t*>(buffer.data());
-  if (ParquetPlainEncoder::Decode<TimestampValue, parquet::Type::INT96>(data, data + size,
-      size, result) == -1) {
+  if (parquet_type == parquet::Type::INT96) {
+    if (ParquetPlainEncoder::Decode<TimestampValue, parquet::Type::INT96>(data,
+        data + size, size, result) == -1) {
+      return false;
+    }
+  } else {
+    DCHECK(false);
     return false;
   }
-  // We don't need to convert the value here, since we don't support reading timestamp
-  // statistics written by Hive / old versions of parquet-mr. Should Hive add support for
-  // writing new statistics for the deprecated timestamp type, we will have to add support
-  // for conversion here.
+
+  // We don't need to convert the value here, because it is done by the caller.
+  // If this function were not static, then it would be possible to store the information
+  // needed for timezone conversion in the object and do the conversion here.
   return TimestampValue::IsValidDate(result->date());
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/60095a4c/be/src/exec/parquet-common.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-common.cc b/be/src/exec/parquet-common.cc
index 651e7fd..f967f9f 100644
--- a/be/src/exec/parquet-common.cc
+++ b/be/src/exec/parquet-common.cc
@@ -88,4 +88,45 @@ parquet::CompressionCodec::type ConvertImpalaToParquetCodec(
   DCHECK_LT(codec, IMPALA_TO_PARQUET_CODEC_SIZE);
   return IMPALA_TO_PARQUET_CODEC[codec];
 }
+
+ParquetTimestampDecoder::ParquetTimestampDecoder(const parquet::SchemaElement& e,
+    const Timezone* timezone, bool convert_int96_timestamps) {
+  bool needs_conversion = false;
+  if (e.__isset.logicalType) {
+    DCHECK(e.logicalType.__isset.TIMESTAMP);
+    needs_conversion = e.logicalType.TIMESTAMP.isAdjustedToUTC;
+    precision_ = e.logicalType.TIMESTAMP.unit.__isset.MILLIS
+        ? ParquetTimestampDecoder::MILLI : ParquetTimestampDecoder::MICRO;
+  } else {
+    if (e.__isset.converted_type) {
+      // Timestamp with converted type but without logical type are/were never written
+      // by Impala, so it is assumed that the writer is Parquet-mr and that timezone
+      // conversion is needed.
+      needs_conversion = true;
+      precision_ = e.converted_type == parquet::ConvertedType::TIMESTAMP_MILLIS
+          ? ParquetTimestampDecoder::MILLI : ParquetTimestampDecoder::MICRO;
+    } else {
+      // INT96 timestamps need conversion depending on the writer.
+      needs_conversion = convert_int96_timestamps;
+      precision_ = ParquetTimestampDecoder::NANO;
+    }
+  }
+  if (needs_conversion) timezone_ = timezone;
+}
+
+void ParquetTimestampDecoder::ConvertMinStatToLocalTime(TimestampValue* v) const {
+  DCHECK(timezone_ != nullptr);
+  if (!v->HasDateAndTime()) return;
+  TimestampValue repeated_period_start;
+  v->UtcToLocal(*timezone_, &repeated_period_start);
+  if (repeated_period_start.HasDateAndTime()) *v = repeated_period_start;
+}
+
+void ParquetTimestampDecoder::ConvertMaxStatToLocalTime(TimestampValue* v) const {
+  DCHECK(timezone_ != nullptr);
+  if (!v->HasDateAndTime()) return;
+  TimestampValue repeated_period_end;
+  v->UtcToLocal(*timezone_, nullptr, &repeated_period_end);
+  if (repeated_period_end.HasDateAndTime()) *v = repeated_period_end;
+}
 }
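
To make the widening in ConvertMinStatToLocalTime() concrete, here is a minimal sketch using
the CET Summer->Winter change that also appears in the test data of this patch. It assumes the
CET rules have been loaded (the BE test does this via TimezoneDatabase::LoadZoneInfoBeTestOnly());
the values are purely illustrative:

  const Timezone* cet = TimezoneDatabase::FindTimezone("CET");
  // 2017-10-29 00:45:00 UTC is the min stat; a row at 01:00:00 UTC sorts after it in
  // UTC but converts to 02:00:00 CET, i.e. before the naively converted min
  // (02:45:00 CET). Falling back to the start of the repeated period avoids that.
  TimestampValue min_stat = TimestampValue::Parse("2017-10-29 00:45:00");
  TimestampValue repeated_period_start;
  min_stat.UtcToLocal(*cet, &repeated_period_start);
  // min_stat is now 02:45:00 local time and repeated_period_start is 02:00:00, the
  // first instant of the repeated 02:00:00..02:59:59.999999999 interval.
  if (repeated_period_start.HasDateAndTime()) min_stat = repeated_period_start;
  // Any value that is >= 00:45:00 in UTC is guaranteed to be >= 02:00:00 in CET.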

http://git-wip-us.apache.org/repos/asf/impala/blob/60095a4c/be/src/exec/parquet-common.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-common.h b/be/src/exec/parquet-common.h
index 24aafae..a42ac35 100644
--- a/be/src/exec/parquet-common.h
+++ b/be/src/exec/parquet-common.h
@@ -24,14 +24,13 @@
 #include "gen-cpp/parquet_types.h"
 #include "runtime/decimal-value.h"
 #include "runtime/string-value.h"
+#include "runtime/timestamp-value.inline.h"
 #include "util/bit-util.h"
 #include "util/decimal-util.h"
 
 /// This file contains common elements between the parquet Writer and Scanner.
 namespace impala {
 
-class TimestampValue;
-
 const uint8_t PARQUET_VERSION_NUMBER[4] = {'P', 'A', 'R', '1'};
 const uint32_t PARQUET_CURRENT_VERSION = 1;
 
@@ -177,6 +176,7 @@ class ParquetPlainEncoder {
             parquet::Type::FIXED_LEN_BYTE_ARRAY>(buffer, buffer_end, fixed_len_size, v);
       default:
         DCHECK(false) << "Unexpected physical type";
+        return -1;
     }
   }
 
@@ -428,5 +428,83 @@ inline int ParquetPlainEncoder::Decode<Decimal16Value, parquet::Type::BYTE_ARRAY
   return DecodeDecimalByteArray(buffer, buffer_end, fixed_len_size, v);
 }
 
+/// Helper class that contains the parameters needed for Timestamp decoding.
+/// Can be safely passed by value.
+class ParquetTimestampDecoder {
+public:
+  ParquetTimestampDecoder() {}
+
+  ParquetTimestampDecoder(const parquet::SchemaElement& e, const Timezone* timezone,
+      bool convert_int96_timestamps);
+
+  bool NeedsConversion() const { return timezone_ != nullptr; }
+
+  /// Decodes next PARQUET_TYPE from 'buffer', reading up to the byte before 'buffer_end'
+  /// and converts it to a TimestampValue. 'buffer' need not be aligned.
+  template <parquet::Type::type PARQUET_TYPE>
+  int Decode(const uint8_t* buffer, const uint8_t* buffer_end, TimestampValue* v) const;
+
+  TimestampValue Int64ToTimestampValue(int64_t unix_time) const {
+    DCHECK(precision_ == MILLI || precision_ == MICRO);
+    return precision_ == MILLI
+        ? TimestampValue::UtcFromUnixTimeMillis(unix_time)
+        : TimestampValue::UtcFromUnixTimeMicros(unix_time);
+  }
+
+  void ConvertToLocalTime(TimestampValue* v) const {
+    DCHECK(timezone_ != nullptr);
+    if (v->HasDateAndTime()) v->UtcToLocal(*timezone_);
+  }
+
+  /// Timezone conversion of min/max stats needs some extra logic because UTC->local
+  /// conversion can change ordering near timezone rule changes. The max value is
+  /// increased and min value is decreased to avoid incorrectly dropping column chunks
+  /// (or pages once IMPALA-5843 is ready).
+
+  /// If timestamp t >= v before conversion, then this function converts v in such a
+  /// way that the same will be true after t is converted.
+  void ConvertMinStatToLocalTime(TimestampValue* v) const;
+
+  /// If timestamp t <= v before conversion, then this function converts v in such a
+  /// way that the same will be true after t is converted.
+  void ConvertMaxStatToLocalTime(TimestampValue* v) const;
+
+private:
+  enum Precision { MILLI, MICRO, NANO };
+
+  /// Timezone used for UTC->Local conversions. If nullptr, no conversion is needed.
+  const Timezone* timezone_ = nullptr;
+
+  /// Unit of the encoded timestamp. Used to decide between milli and microseconds during
+  /// INT64 decoding. INT64 with nanosecond precision (and reduced range) is also planned
+  /// to be implemented once it is added in Parquet (PARQUET-1387).
+  Precision precision_ = NANO;
+};
+
+template <>
+inline int ParquetTimestampDecoder::Decode<parquet::Type::INT64>(
+    const uint8_t* buffer, const uint8_t* buffer_end, TimestampValue* v) const {
+  DCHECK(precision_ == MILLI || precision_ == MICRO);
+  int64_t unix_time;
+  int bytes_read = ParquetPlainEncoder::Decode<int64_t, parquet::Type::INT64>(
+      buffer, buffer_end, 0, &unix_time);
+  if (UNLIKELY(bytes_read < 0)) {
+    return bytes_read;
+  }
+  *v = Int64ToTimestampValue(unix_time);
+  // TODO: It would be more efficient to do the timezone conversion in the same step
+  //       as the int64_t -> TimestampValue conversion. This would be also needed to
+  //       move conversion/validation to dictionary construction (IMPALA-4994) and to
+  //       implement dictionary filtering for TimestampValues.
+  return bytes_read;
+}
+
+template <>
+inline int ParquetTimestampDecoder::Decode<parquet::Type::INT96>(
+    const uint8_t* buffer, const uint8_t* buffer_end, TimestampValue* v) const {
+  DCHECK_EQ(precision_, NANO);
+  return ParquetPlainEncoder::Decode<TimestampValue, parquet::Type::INT96>(
+      buffer, buffer_end, 0, v);
+}
 }
 #endif
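
The only difference between the MILLI and MICRO paths is which TimestampValue helper
Int64ToTimestampValue() ends up calling. A small illustration (the literal is the
2017-10-29 00:45:00 UTC instant that also shows up in the DST test data):

  int64_t unix_millis = 1509237900000LL;  // 2017-10-29 00:45:00 UTC
  TimestampValue from_millis = TimestampValue::UtcFromUnixTimeMillis(unix_millis);
  TimestampValue from_micros = TimestampValue::UtcFromUnixTimeMicros(unix_millis * 1000);
  // Both represent the same instant; which helper runs is decided by precision_,
  // i.e. by the column's TimeUnit (MILLIS vs MICROS).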

http://git-wip-us.apache.org/repos/asf/impala/blob/60095a4c/be/src/exec/parquet-metadata-utils.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-metadata-utils.cc b/be/src/exec/parquet-metadata-utils.cc
index 26dea5f..de3a2c5 100644
--- a/be/src/exec/parquet-metadata-utils.cc
+++ b/be/src/exec/parquet-metadata-utils.cc
@@ -64,16 +64,34 @@ const map<PrimitiveType, set<parquet::Type::type>> SUPPORTED_PHYSICAL_TYPES = {
     {PrimitiveType::TYPE_VARCHAR, {parquet::Type::BYTE_ARRAY}},
 };
 
+/// Physical types that are only supported with specific converted types.
+const map<PrimitiveType, set<pair<parquet::Type::type, parquet::ConvertedType::type>>>
+    SUPPORTED_CONVERTED_TYPES = {
+    {PrimitiveType::TYPE_TIMESTAMP,
+        {{parquet::Type::INT64, parquet::ConvertedType::TIMESTAMP_MICROS},
+         {parquet::Type::INT64, parquet::ConvertedType::TIMESTAMP_MILLIS}}}};
+};
+
 /// Returns true if 'parquet_type' is a supported physical encoding for the Impala
-/// primitive type, false otherwise.
-bool IsSupportedPhysicalType(PrimitiveType impala_type,
-    parquet::Type::type parquet_type) {
+/// primitive type, false otherwise. Some physical types are accepted only for certain
+/// converted types.
+bool IsSupportedType(PrimitiveType impala_type,
+    const parquet::SchemaElement& element) {
   auto encodings = SUPPORTED_PHYSICAL_TYPES.find(impala_type);
   DCHECK(encodings != SUPPORTED_PHYSICAL_TYPES.end());
-  return encodings->second.find(parquet_type) != encodings->second.end();
-}
+  parquet::Type::type parquet_type = element.type;
+  if (encodings->second.find(parquet_type) != encodings->second.end()) return true;
 
+  if (!element.__isset.converted_type) return false;
+  parquet::ConvertedType::type converted_type = element.converted_type;
+  auto converted_types = SUPPORTED_CONVERTED_TYPES.find(impala_type);
+  if (converted_types == SUPPORTED_CONVERTED_TYPES.end()) return false;
+  if (converted_types->second.find({parquet_type, converted_type})
+      != converted_types->second.end()) return true;
+
+  return false;
 }
+
 // Needs to be in sync with the order of enum values declared in TParquetArrayResolution.
 const std::vector<ParquetSchemaResolver::ArrayEncoding>
     ParquetSchemaResolver::ORDERED_ARRAY_ENCODINGS[] =
@@ -181,7 +199,7 @@ Status ParquetMetadataUtils::ValidateColumn(const char* filename,
   // Following validation logic is only for non-complex types.
   if (slot_desc->type().IsComplexType()) return Status::OK();
 
-  if (UNLIKELY(!IsSupportedPhysicalType(slot_desc->type().type, schema_element.type))) {
+  if (UNLIKELY(!IsSupportedType(slot_desc->type().type, schema_element))) {
     return Status(Substitute("Unsupported Parquet type in file '$0' metadata. Logical "
         "type: $1, physical type: $2. File may be corrupt.",
         filename, slot_desc->type().type, schema_element.type));
@@ -703,7 +721,7 @@ Status ParquetSchemaResolver::ValidateScalarNode(const SchemaNode& node,
         PrintSubPath(tbl_desc_, path, idx), col_type.DebugString(), node.DebugString());
     return Status::Expected(msg);
   }
-  if (!IsSupportedPhysicalType(col_type.type, node.element->type)) {
+  if (!IsSupportedType(col_type.type, *node.element)) {
     ErrorMsg msg(TErrorCode::PARQUET_UNRECOGNIZED_SCHEMA, filename_,
         PrintSubPath(tbl_desc_, path, idx), col_type.DebugString(), node.DebugString());
     return Status::Expected(msg);
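
In effect, the relaxed check accepts INT64 for an Impala TIMESTAMP slot only when the
converted type marks the column as a timestamp. A hedged illustration ('element' stands
for the column's parquet::SchemaElement):

  // True for INT96, and with this patch also for INT64 annotated with
  // TIMESTAMP_MILLIS or TIMESTAMP_MICROS.
  bool ok = IsSupportedType(PrimitiveType::TYPE_TIMESTAMP, element);
  // An INT64 column without a timestamp converted type is still rejected here and
  // can only be resolved against BIGINT.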

http://git-wip-us.apache.org/repos/asf/impala/blob/60095a4c/be/src/exprs/timezone_db.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/timezone_db.h b/be/src/exprs/timezone_db.h
index 1a39344..f30a7b1 100644
--- a/be/src/exprs/timezone_db.h
+++ b/be/src/exprs/timezone_db.h
@@ -75,6 +75,12 @@ class TimezoneDatabase {
 
   static const Timezone& GetUtcTimezone() { return UTC_TIMEZONE_; }
 
+  /// Public proxy for LoadZoneInfo. Should be only used in BE tests.
+  static Status LoadZoneInfoBeTestOnly(
+      const std::string& zone_info_dir) WARN_UNUSED_RESULT {
+    return LoadZoneInfo(zone_info_dir);
+  }
+
  private:
   // For BE tests
   friend class TimezoneDbNamesTest;

http://git-wip-us.apache.org/repos/asf/impala/blob/60095a4c/be/src/runtime/timestamp-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/timestamp-test.cc b/be/src/runtime/timestamp-test.cc
index 4e5b4b9..6389402 100644
--- a/be/src/runtime/timestamp-test.cc
+++ b/be/src/runtime/timestamp-test.cc
@@ -896,6 +896,72 @@ TEST(TimestampTest, SubSecond) {
       TimestampValue::FromUnixTimeNanos(0, -1, tz).ToString());
 }
 
+// Convenience function to create TimestampValues from strings.
+TimestampValue StrToTs(const char * str) {
+  return TimestampValue::Parse(str);
+}
+
+TEST(TimestampTest, TimezoneConversions) {
+  const string& path = Substitute("$0/testdata/tzdb_tiny", getenv("IMPALA_HOME"));
+  Status status = TimezoneDatabase::LoadZoneInfoBeTestOnly(path);
+  ASSERT_TRUE(status.ok());
+
+  const Timezone* tz = TimezoneDatabase::FindTimezone("CET");
+  ASSERT_NE(tz, nullptr);
+
+  // Timestamp that does not fall to DST change.
+  const TimestampValue unique_utc = StrToTs("2017-01-01 00:00:00");
+  const TimestampValue unique_local = StrToTs("2017-01-01 01:00:00");
+  {
+    // UtcToLocal / LocalToUtc changes the TimestampValue, so an extra copy is needed
+    // to avoid changing the original.
+    TimestampValue tmp = unique_utc;
+    TimestampValue repeated_period_start, repeated_period_end;
+    tmp.UtcToLocal(*tz, &repeated_period_start, &repeated_period_end);
+    EXPECT_EQ(tmp, unique_local);
+    EXPECT_FALSE(repeated_period_start.HasDate());
+    EXPECT_FALSE(repeated_period_end.HasDate());
+    tmp.LocalToUtc(*tz);
+    EXPECT_EQ(tmp, unique_utc);
+  }
+
+  // Timestamps that fall to UTC+2->UTC+1 DST change:
+  // - Up to 2017-10-29 02:59:59.999999999 AM offset was UTC+02:00.
+  // - At 2017-10-29 03:00:00 AM clocks were moved backward to 2017-10-29 02:00:00 AM,
+  //   so offset became UTC+01:00.
+  const TimestampValue repeated_utc1 = StrToTs("2017-10-29 00:30:00");
+  const TimestampValue repeated_utc2 = StrToTs("2017-10-29 01:30:00");
+  const TimestampValue repeated_local = StrToTs("2017-10-29 02:30:00");
+  {
+    TimestampValue tmp = repeated_utc1;
+    TimestampValue repeated_period_start1, repeated_period_end1;
+    TimestampValue repeated_period_start2, repeated_period_end2;
+    tmp.UtcToLocal(*tz, &repeated_period_start1, &repeated_period_end1);
+    EXPECT_EQ(tmp, repeated_local);
+    tmp = repeated_utc2;
+    tmp.UtcToLocal(*tz, &repeated_period_start2, &repeated_period_end2);
+    EXPECT_EQ(tmp, repeated_local);
+    tmp.LocalToUtc(*tz);
+    EXPECT_FALSE(tmp.HasDate());
+
+    EXPECT_EQ(repeated_period_start1, repeated_period_start2);
+    EXPECT_EQ(repeated_period_end1, repeated_period_end2);
+    EXPECT_EQ(repeated_period_start1.ToString(), "2017-10-29 02:00:00");
+    EXPECT_EQ(repeated_period_end1.ToString(), "2017-10-29 02:59:59.999999999");
+
+  }
+
+  // Timestamp that falls to UTC+1->UTC+2 DST change:
+  // - Up to 2017-03-26 01:59:59.999999999 AM offset was UTC+01:00.
+  // - At 2017-03-26 02:00:00 AM Clocks were moved forward to 2017-03-26 03:00:00 AM,
+  //   so offset became UTC+02:00.
+  const TimestampValue skipped_local = StrToTs("2017-03-26 02:30:00");
+  {
+    TimestampValue tmp = skipped_local;
+    tmp.LocalToUtc(*tz);
+    EXPECT_FALSE(tmp.HasDate());
+  }
+}
 }
 
 IMPALA_TEST_MAIN();

http://git-wip-us.apache.org/repos/asf/impala/blob/60095a4c/be/src/runtime/timestamp-value.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/timestamp-value.cc b/be/src/runtime/timestamp-value.cc
index 14c57b9..74fa6ba 100644
--- a/be/src/runtime/timestamp-value.cc
+++ b/be/src/runtime/timestamp-value.cc
@@ -101,25 +101,49 @@ inline bool IsDateOutOfRange(const cctz::civil_second& cs) {
   return cs.year() < MIN_YEAR || cs.year() > MAX_YEAR;
 }
 
+TimestampValue CivilSecondsToTimestampValue(const cctz::civil_second& cs, int64_t nanos) {
+  // boost::gregorian::date() throws boost::gregorian::bad_year if year is not in the
+  // 1400..9999 range. Need to check validity before creating the date object.
+  if (UNLIKELY(IsDateOutOfRange(cs))) {
+    return TimestampValue();
+  } else {
+    return TimestampValue(
+        date(cs.year(), cs.month(), cs.day()),
+        time_duration(cs.hour(), cs.minute(), cs.second(), nanos));
+  }
+}
+
 }
 
-void TimestampValue::UtcToLocal(const Timezone& local_tz) {
+void TimestampValue::UtcToLocal(const Timezone& local_tz,
+    TimestampValue* start_of_repeated_period, TimestampValue* end_of_repeated_period) {
   DCHECK(HasDateAndTime());
   time_t unix_time;
   if (UNLIKELY(!UtcToUnixTime(&unix_time))) {
     SetToInvalidDateTime();
-  } else {
-    cctz::time_point<cctz::sys_seconds> from_tp = UnixTimeToTimePoint(unix_time);
-    cctz::civil_second to_cs = cctz::convert(from_tp, local_tz);
-    // boost::gregorian::date() throws boost::gregorian::bad_year if year is not in the
-    // 1400..9999 range. Need to check validity before creating the date object.
-    if (UNLIKELY(IsDateOutOfRange(to_cs))) {
-      SetToInvalidDateTime();
-    } else {
-      date_ = boost::gregorian::date(to_cs.year(), to_cs.month(), to_cs.day());
-      // Time-zone conversion rules don't affect fractional seconds, leave them intact.
-      time_ = boost::posix_time::time_duration(to_cs.hour(), to_cs.minute(),
-          to_cs.second(), time_.fractional_seconds());
+    return;
+  }
+
+  cctz::time_point<cctz::sys_seconds> from_tp = UnixTimeToTimePoint(unix_time);
+  cctz::civil_second to_cs = cctz::convert(from_tp, local_tz);
+
+  *this = CivilSecondsToTimestampValue(to_cs, time_.fractional_seconds());
+
+  if (start_of_repeated_period == nullptr && end_of_repeated_period == nullptr) return;
+  // Do the reverse conversion if repeated period boundaries are needed.
+  const cctz::time_zone::civil_lookup from_cl = local_tz.lookup(to_cs);
+  if (UNLIKELY(from_cl.kind == cctz::time_zone::civil_lookup::REPEATED)) {
+    if (start_of_repeated_period != nullptr) {
+      // Start of the period is simply the transition time converted to local time.
+      to_cs = cctz::convert(from_cl.trans, local_tz);
+      *start_of_repeated_period = CivilSecondsToTimestampValue(to_cs, 0);
+    }
+    if (end_of_repeated_period != nullptr) {
+      // End of the period is last nanosecond before transition time converted to
+      // local time.
+      to_cs = cctz::convert(from_cl.trans - std::chrono::seconds(1), local_tz);
+      *end_of_repeated_period =
+          CivilSecondsToTimestampValue(to_cs, NANOS_PER_SEC - 1);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/60095a4c/be/src/runtime/timestamp-value.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/timestamp-value.h b/be/src/runtime/timestamp-value.h
index 6d87d9b..a73adcd 100644
--- a/be/src/runtime/timestamp-value.h
+++ b/be/src/runtime/timestamp-value.h
@@ -229,7 +229,19 @@ class TimestampValue {
 
   /// Converts from UTC to 'local_tz' time zone in-place. The caller must ensure the
   /// TimestampValue this function is called upon has both a valid date and time.
-  void UtcToLocal(const Timezone& local_tz);
+  ///
+  /// If start/end_of_repeated_period is not nullptr and timestamp falls into an interval
+  /// where LocalToUtc() is ambiguous (e.g. Summer->Winter DST change on Northern
+  /// hemisphere), then these arguments are set to the start/end of this period in local
+  /// time. This is useful to get some ordering guarantees in the case when the order of
+  /// two timestamps is different in UTC and local time (e.g CET Autumn dst change
+  /// 00:30:00 -> 02:30:00 vs 01:15:00 -> 02:15:00) - any timestamp that is earlier than
+  /// 'this' in UTC is guaranteed to be earlier than 'end_of_repeated_period' in local
+  /// time, and any timestamp later than 'this' in UTC is guaranteed to be later than
+  /// 'start_of_repeated_period' in local time.
+  void UtcToLocal(const Timezone& local_tz,
+      TimestampValue* start_of_repeated_period = nullptr,
+      TimestampValue* end_of_repeated_period = nullptr);
 
   /// Converts from 'local_tz' to UTC time zone in-place. The caller must ensure the
   /// TimestampValue this function is called upon has both a valid date and time.

http://git-wip-us.apache.org/repos/asf/impala/blob/60095a4c/be/src/util/dict-encoding.h
----------------------------------------------------------------------
diff --git a/be/src/util/dict-encoding.h b/be/src/util/dict-encoding.h
index a76d9b5..8e0f5df 100644
--- a/be/src/util/dict-encoding.h
+++ b/be/src/util/dict-encoding.h
@@ -334,6 +334,11 @@ class DictDecoder : public DictDecoderBase {
   template<parquet::Type::type PARQUET_TYPE>
   bool Reset(uint8_t* dict_buffer, int dict_len, int fixed_len_size) WARN_UNUSED_RESULT;
 
+  /// Should only be called for Timestamp columns.
+  void SetTimestampHelper(ParquetTimestampDecoder timestamp_decoder) {
+    timestamp_decoder_ = timestamp_decoder;
+  }
+
   virtual int num_entries() const { return dict_.size(); }
 
   virtual void GetValue(int index, void* buffer) {
@@ -359,6 +364,9 @@ class DictDecoder : public DictDecoderBase {
   /// List of decoded values stored in the dict_
   std::vector<T> dict_;
 
+  /// Contains extra data needed for Timestamp decoding.
+  ParquetTimestampDecoder timestamp_decoder_;
+
   /// Decoded values, buffered to allow caller to consume one-by-one. If in the middle of
   /// a repeated run, the first element is the current dict value. If in a literal run,
   /// this contains 'num_literal_values_' values, with the next value to be returned at
@@ -368,6 +376,15 @@ class DictDecoder : public DictDecoderBase {
   /// Slow path for GetNextValue() where we need to decode new values. Should not be
   /// inlined everywhere.
   bool DecodeNextValue(T* value);
+
+  /// Specialized for Timestamp columns; a simple proxy to ParquetPlainEncoder::Decode
+  /// for other types.
+  template<parquet::Type::type PARQUET_TYPE>
+  int Decode(const uint8_t* buffer, const uint8_t* buffer_end,
+      int fixed_len_size, T* v) {
+    return ParquetPlainEncoder::Decode<T, PARQUET_TYPE>(buffer, buffer_end,
+        fixed_len_size, v);
+  }
 };
 
 template<typename T>
@@ -498,6 +515,13 @@ inline int DictEncoderBase::WriteData(uint8_t* buffer, int buffer_len) {
   return 1 + encoder.len();
 }
 
+template <>
+template<parquet::Type::type PARQUET_TYPE>
+inline int DictDecoder<TimestampValue>::Decode(const uint8_t* buffer,
+    const uint8_t* buffer_end, int fixed_len_size, TimestampValue* v) {
+  return timestamp_decoder_.Decode<PARQUET_TYPE>(buffer, buffer_end, v);
+}
+
 template<typename T>
 template<parquet::Type::type PARQUET_TYPE>
 inline bool DictDecoder<T>::Reset(uint8_t* dict_buffer, int dict_len,
@@ -507,7 +531,7 @@ inline bool DictDecoder<T>::Reset(uint8_t* dict_buffer, int dict_len,
   uint8_t* end = dict_buffer + dict_len;
   while (dict_buffer < end) {
     T value;
-    int decoded_len = ParquetPlainEncoder::Decode<T, PARQUET_TYPE>(dict_buffer, end,
+    int decoded_len = Decode<PARQUET_TYPE>(dict_buffer, end,
         fixed_len_size, &value);
     if (UNLIKELY(decoded_len < 0)) return false;
     dict_buffer += decoded_len;
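
A sketch of how the scanner side is expected to wire this up for an INT64 timestamp column;
'timestamp_decoder_', 'dict_page_buf' and 'dict_page_len' are assumed to exist in the
surrounding column reader, and only SetTimestampHelper() and the Decode() specialization
come from this patch:

  DictDecoder<TimestampValue> dict_decoder;
  dict_decoder.SetTimestampHelper(timestamp_decoder_);
  // Reset() walks the dictionary page once; with the helper set, each INT64 entry is
  // materialized as a TimestampValue via ParquetTimestampDecoder::Decode<INT64>().
  if (!dict_decoder.Reset<parquet::Type::INT64>(dict_page_buf, dict_page_len,
      /* fixed_len_size */ 0)) {
    // Corrupt dictionary page; the scanner turns this into an error status.
  }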

http://git-wip-us.apache.org/repos/asf/impala/blob/60095a4c/common/thrift/parquet.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/parquet.thrift b/common/thrift/parquet.thrift
index 3666a43..3b15cfe 100644
--- a/common/thrift/parquet.thrift
+++ b/common/thrift/parquet.thrift
@@ -28,23 +28,12 @@ namespace java org.apache.parquet.format
  * with the encodings to control the on disk storage format.
  * For example INT16 is not included as a type since a good encoding of INT32
  * would handle this.
- *
- * When a logical type is not present, the type-defined sort order of these
- * physical types are:
- * * BOOLEAN - false, true
- * * INT32 - signed comparison
- * * INT64 - signed comparison
- * * INT96 - signed comparison
- * * FLOAT - signed comparison
- * * DOUBLE - signed comparison
- * * BYTE_ARRAY - unsigned byte-wise comparison
- * * FIXED_LEN_BYTE_ARRAY - unsigned byte-wise comparison
  */
 enum Type {
   BOOLEAN = 0;
   INT32 = 1;
   INT64 = 2;
-  INT96 = 3;
+  INT96 = 3;  // deprecated, only used by legacy implementations.
   FLOAT = 4;
   DOUBLE = 5;
   BYTE_ARRAY = 6;
@@ -212,12 +201,12 @@ struct Statistics {
     * Values are encoded using PLAIN encoding, except that variable-length byte
     * arrays do not include a length prefix.
     *
-    * These fields encode min and max values determined by SIGNED comparison
+    * These fields encode min and max values determined by signed comparison
     * only. New files should use the correct order for a column's logical type
     * and store the values in the min_value and max_value fields.
     *
     * To support older readers, these may be set when the column order is
-    * SIGNED.
+    * signed.
     */
    1: optional binary max;
    2: optional binary min;
@@ -235,6 +224,116 @@ struct Statistics {
    6: optional binary min_value;
 }
 
+/** Empty structs to use as logical type annotations */
+struct StringType {}  // allowed for BINARY, must be encoded with UTF-8
+struct UUIDType {}    // allowed for FIXED[16], must be encoded as raw UUID bytes
+struct MapType {}     // see LogicalTypes.md
+struct ListType {}    // see LogicalTypes.md
+struct EnumType {}    // allowed for BINARY, must be encoded with UTF-8
+struct DateType {}    // allowed for INT32
+
+/**
+ * Logical type to annotate a column that is always null.
+ *
+ * Sometimes when discovering the schema of existing data, values are always
+ * null and the physical type can't be determined. This annotation signals
+ * the case where the physical type was guessed from all null values.
+ */
+struct NullType {}    // allowed for any physical type, only null values stored
+
+/**
+ * Decimal logical type annotation
+ *
+ * To maintain forward-compatibility in v1, implementations using this logical
+ * type must also set scale and precision on the annotated SchemaElement.
+ *
+ * Allowed for physical types: INT32, INT64, FIXED, and BINARY
+ */
+struct DecimalType {
+  1: required i32 scale
+  2: required i32 precision
+}
+
+/** Time units for logical types */
+struct MilliSeconds {}
+struct MicroSeconds {}
+union TimeUnit {
+  1: MilliSeconds MILLIS
+  2: MicroSeconds MICROS
+}
+
+/**
+ * Timestamp logical type annotation
+ *
+ * Allowed for physical types: INT64
+ */
+struct TimestampType {
+  1: required bool isAdjustedToUTC
+  2: required TimeUnit unit
+}
+
+/**
+ * Time logical type annotation
+ *
+ * Allowed for physical types: INT32 (millis), INT64 (micros)
+ */
+struct TimeType {
+  1: required bool isAdjustedToUTC
+  2: required TimeUnit unit
+}
+
+/**
+ * Integer logical type annotation
+ *
+ * bitWidth must be 8, 16, 32, or 64.
+ *
+ * Allowed for physical types: INT32, INT64
+ */
+struct IntType {
+  1: required byte bitWidth
+  2: required bool isSigned
+}
+
+/**
+ * Embedded JSON logical type annotation
+ *
+ * Allowed for physical types: BINARY
+ */
+struct JsonType {
+}
+
+/**
+ * Embedded BSON logical type annotation
+ *
+ * Allowed for physical types: BINARY
+ */
+struct BsonType {
+}
+
+/**
+ * LogicalType annotations to replace ConvertedType.
+ *
+ * To maintain compatibility, implementations using LogicalType for a
+ * SchemaElement must also set the corresponding ConvertedType from the
+ * following table.
+ */
+union LogicalType {
+  1:  StringType STRING       // use ConvertedType UTF8
+  2:  MapType MAP             // use ConvertedType MAP
+  3:  ListType LIST           // use ConvertedType LIST
+  4:  EnumType ENUM           // use ConvertedType ENUM
+  5:  DecimalType DECIMAL     // use ConvertedType DECIMAL
+  6:  DateType DATE           // use ConvertedType DATE
+  7:  TimeType TIME           // use ConvertedType TIME_MICROS or TIME_MILLIS
+  8:  TimestampType TIMESTAMP // use ConvertedType TIMESTAMP_MICROS or TIMESTAMP_MILLIS
+  // 9: reserved for INTERVAL
+  10: IntType INTEGER         // use ConvertedType INT_* or UINT_*
+  11: NullType UNKNOWN        // no compatible ConvertedType
+  12: JsonType JSON           // use ConvertedType JSON
+  13: BsonType BSON           // use ConvertedType BSON
+  14: UUIDType UUID
+}
+
 /**
  * Represents a element inside a schema definition.
  *  - if it is a group (inner node) then type is undefined and num_children is defined
@@ -282,6 +381,13 @@ struct SchemaElement {
    */
   9: optional i32 field_id;
 
+  /**
+   * The logical type of this SchemaElement
+   *
+   * LogicalType replaces ConvertedType, but ConvertedType is still required
+   * for some logical types to ensure forward-compatibility in format v1.
+   */
+  10: optional LogicalType logicalType
 }
 
 /**
@@ -314,7 +420,7 @@ enum Encoding {
    */
   PLAIN_DICTIONARY = 2;
 
-  /** Group packed run length encoding. Usable for definition/reptition levels
+  /** Group packed run length encoding. Usable for definition/repetition levels
    * encoding and Booleans (on one bit: 0 is false; 1 is true.)
    */
   RLE = 3;
@@ -346,13 +452,20 @@ enum Encoding {
 
 /**
  * Supported compression algorithms.
+ *
+ * Codecs added in 2.4 can be read by readers based on 2.4 and later.
+ * Codec support may vary between readers based on the format version and
+ * libraries available at runtime. Gzip, Snappy, and LZ4 codecs are
+ * widely available, while Zstd and Brotli require additional libraries.
  */
 enum CompressionCodec {
   UNCOMPRESSED = 0;
   SNAPPY = 1;
   GZIP = 2;
   LZO = 3;
-  BROTLI = 4;
+  BROTLI = 4; // Added in 2.4
+  LZ4 = 5;    // Added in 2.4
+  ZSTD = 6;   // Added in 2.4
 }
 
 enum PageType {
@@ -406,7 +519,7 @@ struct DictionaryPageHeader {
 }
 
 /**
- * New page format alowing reading levels without decompressing the data
+ * New page format allowing reading levels without decompressing the data
  * Repetition and definition levels are uncompressed
  * The remaining section containing the data is compressed if is_compressed is true
  **/
@@ -423,9 +536,9 @@ struct DataPageHeaderV2 {
 
   // repetition levels and definition levels are always using RLE (without size in it)
 
-  /** length of the repetition levels */
-  5: required i32 definition_levels_byte_length;
   /** length of the definition levels */
+  5: required i32 definition_levels_byte_length;
+  /** length of the repetition levels */
   6: required i32 repetition_levels_byte_length;
 
   /**  whether the values are compressed.
@@ -597,7 +710,9 @@ struct RowGroup {
 struct TypeDefinedOrder {}
 
 /**
- * Union to specify the order used for min, max, and sorting values in a column.
+ * Union to specify the order used for the min_value and max_value fields for a
+ * column. This union takes the role of an enhanced enum that allows rich
+ * elements (which will be needed for a collation-based ordering in the future).
  *
  * Possible values are:
  * * TypeDefinedOrder - the column uses the order defined by its logical or
@@ -607,6 +722,50 @@ struct TypeDefinedOrder {}
  * for this column should be ignored.
  */
 union ColumnOrder {
+
+  /**
+   * The sort orders for logical types are:
+   *   UTF8 - unsigned byte-wise comparison
+   *   INT8 - signed comparison
+   *   INT16 - signed comparison
+   *   INT32 - signed comparison
+   *   INT64 - signed comparison
+   *   UINT8 - unsigned comparison
+   *   UINT16 - unsigned comparison
+   *   UINT32 - unsigned comparison
+   *   UINT64 - unsigned comparison
+   *   DECIMAL - signed comparison of the represented value
+   *   DATE - signed comparison
+   *   TIME_MILLIS - signed comparison
+   *   TIME_MICROS - signed comparison
+   *   TIMESTAMP_MILLIS - signed comparison
+   *   TIMESTAMP_MICROS - signed comparison
+   *   INTERVAL - unsigned comparison
+   *   JSON - unsigned byte-wise comparison
+   *   BSON - unsigned byte-wise comparison
+   *   ENUM - unsigned byte-wise comparison
+   *   LIST - undefined
+   *   MAP - undefined
+   *
+   * In the absence of logical types, the sort order is determined by the physical type:
+   *   BOOLEAN - false, true
+   *   INT32 - signed comparison
+   *   INT64 - signed comparison
+   *   INT96 (only used for legacy timestamps) - undefined
+   *   FLOAT - signed comparison of the represented value (*)
+   *   DOUBLE - signed comparison of the represented value (*)
+   *   BYTE_ARRAY - unsigned byte-wise comparison
+   *   FIXED_LEN_BYTE_ARRAY - unsigned byte-wise comparison
+   *
+   * (*) Because the sorting order is not specified properly for floating
+   *     point values (relations vs. total ordering) the following
+   *     compatibility rules should be applied when reading statistics:
+   *     - If the min is a NaN, it should be ignored.
+   *     - If the max is a NaN, it should be ignored.
+   *     - If the min is +0, the row group may contain -0 values as well.
+   *     - If the max is -0, the row group may contain +0 values as well.
+   *     - When looking for NaN values, min and max should be ignored.
+   */
   1: TypeDefinedOrder TYPE_ORDER;
 }
 
@@ -704,11 +863,16 @@ struct FileMetaData {
   6: optional string created_by
 
   /**
-   * Sort order used for each column in this file.
+   * Sort order used for the min_value and max_value fields of each column in
+   * this file. Each sort order corresponds to one column, determined by its
+   * position in the list, matching the position of the column in the schema.
+   *
+   * Without column_orders, the meaning of the min_value and max_value fields is
+   * undefined. To ensure well-defined behaviour, if min_value and max_value are
+   * written to a Parquet file, column_orders must be written as well.
    *
-   * If this list is not present, then the order for each column is assumed to
-   * be Signed. In addition, min and max values for INTERVAL or DECIMAL stored
-   * as fixed or bytes should be ignored.
+   * The obsolete min and max fields are always sorted by signed comparison
+   * regardless of column_orders.
    */
   7: optional list<ColumnOrder> column_orders;
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/60095a4c/fe/src/main/java/org/apache/impala/analysis/ParquetHelper.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/ParquetHelper.java b/fe/src/main/java/org/apache/impala/analysis/ParquetHelper.java
index 8c9bff8..e3d5554 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ParquetHelper.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ParquetHelper.java
@@ -269,6 +269,13 @@ class ParquetHelper {
       return Type.STRING;
     }
 
+    if (prim.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT64 &&
+        (orig == OriginalType.TIMESTAMP_MILLIS || orig == OriginalType.TIMESTAMP_MICROS)){
+      // IMPALA-7723: nanosecond timestamps are still interpreted as BIGINT - as they have
+      // no converted type, this function will not be reached in their case.
+      return Type.TIMESTAMP;
+    }
+
     if (prim.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT32
         || prim.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT64) {
       // Map signed integer types to an supported Impala column type

http://git-wip-us.apache.org/repos/asf/impala/blob/60095a4c/testdata/data/README
----------------------------------------------------------------------
diff --git a/testdata/data/README b/testdata/data/README
index db055c7..14b37ba 100644
--- a/testdata/data/README
+++ b/testdata/data/README
@@ -233,3 +233,29 @@ valid range [0..24H). Before the fix, select * returned these values:
 
 strings_with_quotes.csv:
 Various strings with quotes in them to reproduce bugs like IMPALA-7586.
+
+int64_timestamps_plain.parquet:
+Parquet file generated with Parquet-mr that contains plain encoded int64 columns with
+Timestamp logical types. Has the following columns:
+new_logical_milli_utc, new_logical_milli_local,
+new_logical_micro_utc, new_logical_micro_local
+
+int64_timestamps_dict.parquet:
+Parquet file generated with Parquet-mr that contains dictionary encoded int64 columns
+with Timestamp logical types. Has the following columns:
+id,
+new_logical_milli_utc, new_logical_milli_local,
+new_logical_micro_utc, new_logical_micro_local
+
+int64_timestamps_at_dst_changes.parquet:
+Parquet file generated with Parquet-mr that contains plain encoded int64 columns with
+Timestamp logical types. The file contains 3 row groups, and all row groups contain
+3 distinct values, so there is a "min", a "max", and a "middle" value. The values were
+selected in such a way that the UTC->CET conversion changes the order of the values (this
+is possible during Summer->Winter DST change) and "middle" falls outside the "min".."max"
+range after conversion. This means that a naive stat filtering implementation could drop
+"middle" incorrectly.
+Example (all dates are 2017-10-29):
+UTC: 00:45:00, 01:00:00, 01:10:00 =>
+CET: 02:45:00, 02:00:00, 02:10:00
+Columns: rawvalue bigint, rowgroup int, millisutc timestamp, microsutc timestamp

http://git-wip-us.apache.org/repos/asf/impala/blob/60095a4c/testdata/data/int64_timestamps_at_dst_changes.parquet
----------------------------------------------------------------------
diff --git a/testdata/data/int64_timestamps_at_dst_changes.parquet b/testdata/data/int64_timestamps_at_dst_changes.parquet
new file mode 100644
index 0000000..6945d25
Binary files /dev/null and b/testdata/data/int64_timestamps_at_dst_changes.parquet differ

http://git-wip-us.apache.org/repos/asf/impala/blob/60095a4c/testdata/data/int64_timestamps_dict.parquet
----------------------------------------------------------------------
diff --git a/testdata/data/int64_timestamps_dict.parquet b/testdata/data/int64_timestamps_dict.parquet
new file mode 100644
index 0000000..a0fe7b3
Binary files /dev/null and b/testdata/data/int64_timestamps_dict.parquet differ

http://git-wip-us.apache.org/repos/asf/impala/blob/60095a4c/testdata/data/int64_timestamps_plain.parquet
----------------------------------------------------------------------
diff --git a/testdata/data/int64_timestamps_plain.parquet b/testdata/data/int64_timestamps_plain.parquet
new file mode 100644
index 0000000..1a00226
Binary files /dev/null and b/testdata/data/int64_timestamps_plain.parquet differ

http://git-wip-us.apache.org/repos/asf/impala/blob/60095a4c/testdata/tzdb_tiny/CET
----------------------------------------------------------------------
diff --git a/testdata/tzdb_tiny/CET b/testdata/tzdb_tiny/CET
new file mode 100644
index 0000000..4c4f8ef
Binary files /dev/null and b/testdata/tzdb_tiny/CET differ

http://git-wip-us.apache.org/repos/asf/impala/blob/60095a4c/testdata/workloads/functional-query/queries/QueryTest/parquet-int64-timestamps.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/parquet-int64-timestamps.test b/testdata/workloads/functional-query/queries/QueryTest/parquet-int64-timestamps.test
new file mode 100644
index 0000000..debe547
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/parquet-int64-timestamps.test
@@ -0,0 +1,89 @@
+====
+---- QUERY
+# Check that int64 timestamp columns can be read as TIMESTAMP.
+set timezone=CET;
+select * from int64_timestamps_plain;
+---- TYPES
+TIMESTAMP,TIMESTAMP,TIMESTAMP,TIMESTAMP
+---- RESULTS
+1986-06-26 02:00:00,1986-06-26 00:00:00,1986-06-26 02:00:00,1986-06-26 00:00:00
+1986-12-05 01:00:00,1986-12-05 00:00:00,1986-12-05 01:00:00,1986-12-05 00:00:00
+====
+---- QUERY
+# Check that int64 timestamp columns can be read as BIGINT.
+select * from int64_bigints_plain;
+---- TYPES
+BIGINT,BIGINT,BIGINT,BIGINT
+---- RESULTS
+520128000000,520128000000,520128000000000,520128000000000
+534124800000,534124800000,534124800000000,534124800000000
+====
+---- QUERY
+# Check that the test tables are correctly filled with data.
+select count(*)
+from int64_timestamps_dict t join int64_bigints_dict b on t.id = b.id;
+---- TYPES
+BIGINT
+---- RESULTS
+72
+====
+---- QUERY
+# List rows where one of the int64->timestamp conversions is incorrect.
+set timezone=CET;
+select *
+from int64_timestamps_dict t join int64_bigints_dict b on t.id = b.id
+where t.new_logical_milli_local != milliseconds_add("1970-01-01", b.new_logical_milli_local)
+  or t.new_logical_milli_utc != from_utc_timestamp(milliseconds_add("1970-01-01", b.new_logical_milli_utc), "CET")
+  or t.new_logical_micro_local != microseconds_add("1970-01-01", b.new_logical_micro_local)
+  or t.new_logical_micro_utc != from_utc_timestamp(microseconds_add("1970-01-01", b.new_logical_micro_utc), "CET");
+---- TYPES
+INT,TIMESTAMP,TIMESTAMP,TIMESTAMP,TIMESTAMP,INT,BIGINT,BIGINT,BIGINT,BIGINT
+---- RESULTS
+# Expect empty result.
+====
+---- QUERY
+# Test that the table is filled correctly.
+set timezone=UTC;
+select * from int64_timestamps_at_dst_changes;
+---- TYPES
+BIGINT,INT,TIMESTAMP,TIMESTAMP
+---- RESULTS
+42,1,1970-01-01 00:00:42,1970-01-01 00:00:42
+1509237900,1,2017-10-29 00:45:00,2017-10-29 00:45:00
+1509240600,1,2017-10-29 01:30:00,2017-10-29 01:30:00
+1509237000,2,2017-10-29 00:30:00,2017-10-29 00:30:00
+1509239700,2,2017-10-29 01:15:00,2017-10-29 01:15:00
+1512520200,2,2017-12-06 00:30:00,2017-12-06 00:30:00
+1509237900,3,2017-10-29 00:45:00,2017-10-29 00:45:00
+1509238800,3,2017-10-29 01:00:00,2017-10-29 01:00:00
+1509239400,3,2017-10-29 01:10:00,2017-10-29 01:10:00
+====
+---- QUERY
+# Test filtering on timestamps that fall on the DST change. Stat filtering may skip this
+# value if not implemented carefully.
+set timezone=CET;
+select count(*) from int64_timestamps_at_dst_changes
+where millisutc = "2017-10-29 02:45:00" and microsutc = "2017-10-29 02:45:00";
+---- TYPES
+BIGINT
+---- RESULTS
+2
+====
+---- QUERY
+set timezone=CET;
+select count(*) from int64_timestamps_at_dst_changes
+where millisutc = "2017-10-29 02:00:00";
+---- TYPES
+BIGINT
+---- RESULTS
+1
+====
+---- QUERY
+set timezone=CET;
+select count(*) from int64_timestamps_at_dst_changes
+where microsutc = "2017-10-29 02:10:00";
+---- TYPES
+BIGINT
+---- RESULTS
+1
+====

http://git-wip-us.apache.org/repos/asf/impala/blob/60095a4c/tests/query_test/test_scanners.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index f4ddad4..bcb200b 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -745,6 +745,54 @@ class TestParquet(ImpalaTestSuite):
     self.run_test_case("QueryTest/parquet-error-propagation-race", vector,
                        unique_database)
 
+  def test_int64_timestamps(self, vector, unique_database):
+    """IMPALA-5050: Test that Parquet columns with int64 physical type and
+       timestamp_millis/timestamp_micros logical type can be read both as
+       int64 and as timestamp.
+    """
+    # Tiny plain-encoded Parquet file.
+    TABLE_NAME = "int64_timestamps_plain"
+    create_table_from_parquet(self.client, unique_database, TABLE_NAME)
+
+    TABLE_NAME = "int64_bigints_plain"
+    CREATE_SQL = """CREATE TABLE {0}.{1} (
+                      new_logical_milli_utc BIGINT,
+                      new_logical_milli_local BIGINT,
+                      new_logical_micro_utc BIGINT,
+                      new_logical_micro_local BIGINT
+                     ) STORED AS PARQUET""".format(unique_database, TABLE_NAME)
+    create_table_and_copy_files(self.client, CREATE_SQL, unique_database, TABLE_NAME,
+        ["/testdata/data/int64_timestamps_plain.parquet"])
+
+    # Larger dictionary-encoded Parquet file.
+    TABLE_NAME = "int64_timestamps_dict"
+    CREATE_SQL = """CREATE TABLE {0}.{1} (
+                      id INT,
+                      new_logical_milli_utc TIMESTAMP,
+                      new_logical_milli_local TIMESTAMP,
+                      new_logical_micro_utc TIMESTAMP,
+                      new_logical_micro_local TIMESTAMP
+                     ) STORED AS PARQUET""".format(unique_database, TABLE_NAME)
+    create_table_and_copy_files(self.client, CREATE_SQL, unique_database, TABLE_NAME,
+        ["/testdata/data/{0}.parquet".format(TABLE_NAME)])
+
+    TABLE_NAME = "int64_bigints_dict"
+    CREATE_SQL = """CREATE TABLE {0}.{1} (
+                      id INT,
+                      new_logical_milli_utc BIGINT,
+                      new_logical_milli_local BIGINT,
+                      new_logical_micro_utc BIGINT,
+                      new_logical_micro_local BIGINT
+                     ) STORED AS PARQUET""".format(unique_database, TABLE_NAME)
+    create_table_and_copy_files(self.client, CREATE_SQL, unique_database, TABLE_NAME,
+        ["/testdata/data/int64_timestamps_dict.parquet"])
+
+    TABLE_NAME = "int64_timestamps_at_dst_changes"
+    create_table_from_parquet(self.client, unique_database, TABLE_NAME)
+
+    self.run_test_case(
+        'QueryTest/parquet-int64-timestamps', vector, unique_database)
+
 # We use various scan range lengths to exercise corner cases in the HDFS scanner more
 # thoroughly. In particular, it will exercise:
 # 1. default scan range
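
The docstring of test_int64_timestamps above notes that the same int64 Parquet data is read both as BIGINT and as TIMESTAMP; the helper calls achieve that by creating a TIMESTAMP-typed and a BIGINT-typed table over copies of the same file. A rough sketch of the same idea in plain Impala SQL, where the demo database, table names, and LOCATION path are placeholders rather than anything in the patch:

    -- Placeholders only: the same Parquet directory exposed under two schemas.
    CREATE EXTERNAL TABLE demo.int64_ts_as_timestamp (new_logical_milli_utc TIMESTAMP)
      STORED AS PARQUET LOCATION '/tmp/int64_timestamps';
    CREATE EXTERNAL TABLE demo.int64_ts_as_bigint (new_logical_milli_utc BIGINT)
      STORED AS PARQUET LOCATION '/tmp/int64_timestamps';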


[8/9] impala git commit: IMPALA-941: [DOCS] Update Impala identifier doc

Posted by ta...@apache.org.
IMPALA-941: [DOCS] Update Impala identifier doc

Change-Id: I481e4c9bedbbac0eb5c21a995beb71caeb88fa2a
Reviewed-on: http://gerrit.cloudera.org:8080/11930
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-by: Bharath Vissapragada <bh...@cloudera.com>
Reviewed-by: Alex Rodoni <ar...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/c75b3711
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/c75b3711
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/c75b3711

Branch: refs/heads/master
Commit: c75b3711bc1a5cbbab983cbf52bd81eb084b8559
Parents: 71f8d0e
Author: Fredy Wijaya <fw...@cloudera.com>
Authored: Wed Nov 14 12:21:53 2018 -0600
Committer: Alex Rodoni <ar...@cloudera.com>
Committed: Wed Nov 14 18:46:47 2018 +0000

----------------------------------------------------------------------
 docs/topics/impala_identifiers.xml | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/c75b3711/docs/topics/impala_identifiers.xml
----------------------------------------------------------------------
diff --git a/docs/topics/impala_identifiers.xml b/docs/topics/impala_identifiers.xml
index e140609..5f407fa 100644
--- a/docs/topics/impala_identifiers.xml
+++ b/docs/topics/impala_identifiers.xml
@@ -57,9 +57,8 @@ under the License.
 
       <li>
         <p>
-        An identifier must start with an alphabetic character. The remainder can contain any combination of
-        alphanumeric characters and underscores. Quoting the identifier with backticks has no effect on the allowed
-        characters in the name.
+        An identifier must start with an alphanumeric or underscore character. Quoting the identifier with
+        backticks has no effect on the allowed characters in the name.
         </p>
       </li>
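
Under the revised wording, an identifier may begin with a digit or an underscore, and backticks remain purely optional quoting. A couple of examples, with names invented for illustration and assuming a build that contains the IMPALA-941 changes:

    -- Illustrative names only; my_db is assumed to exist.
    CREATE TABLE my_db.2018_q4_sales (id INT);
    CREATE TABLE my_db._staging_ids (id INT);
    SELECT * FROM my_db.2018_q4_sales;    -- qualified name whose table part starts with a digit
    SELECT * FROM my_db.`_staging_ids`;   -- backticks do not change the allowed characters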