Posted to commits@impala.apache.org by ph...@apache.org on 2018/02/02 18:51:40 UTC

[1/3] impala git commit: IMPALA-3916: Reserve SQL:2016 reserved words

Repository: impala
Updated Branches:
  refs/heads/master 4bd7cc8db -> ff86feaa6


http://git-wip-us.apache.org/repos/asf/impala/blob/f0b3d9d1/testdata/workloads/functional-query/queries/QueryTest/partition-col-types.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/partition-col-types.test b/testdata/workloads/functional-query/queries/QueryTest/partition-col-types.test
index e364ce4..cd98972 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/partition-col-types.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/partition-col-types.test
@@ -220,7 +220,7 @@ DROP TABLE all_insert_partition_col_types;
 ---- QUERY
 # Regression test for IMPALA-1026
 drop table if exists test_dec_partition;
-create table test_dec_partition(id int, dec decimal(5,4))
+create table test_dec_partition(id int, `dec` decimal(5,4))
   partitioned by(decimal_col DECIMAL(5,4));
 alter table test_dec_partition drop if exists partition(decimal_col=4.34);
 insert into test_dec_partition partition(decimal_col=4.34) values (1, 3.14);

http://git-wip-us.apache.org/repos/asf/impala/blob/f0b3d9d1/testdata/workloads/functional-query/queries/QueryTest/single-node-nlj.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/single-node-nlj.test b/testdata/workloads/functional-query/queries/QueryTest/single-node-nlj.test
index fa1ccfc..82370af 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/single-node-nlj.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/single-node-nlj.test
@@ -173,9 +173,9 @@ INT,INT
 ====
 ---- QUERY
 # Right non-equi-join with empty build.
-select straight_join at.id
-from alltypes at
-  right join functional.alltypestiny att on at.id < att.id
+select straight_join atp.id
+from alltypes atp
+  right join functional.alltypestiny att on atp.id < att.id
 where att.int_col = 999
 ---- RESULTS
 ---- TYPES
@@ -183,11 +183,11 @@ INT
 ====
 ---- QUERY
 # Full outer non-equi-join with empty build.
-select straight_join at.id
-from alltypes at
+select straight_join atp.id
+from alltypes atp
   full outer join (
-    select * from functional.alltypestiny where int_col = 999) att on at.id < att.id
-order by at.id desc
+    select * from functional.alltypestiny where int_col = 999) att on atp.id < att.id
+order by atp.id desc
 limit 5
 ---- RESULTS
 7299
@@ -200,19 +200,19 @@ INT
 ====
 ---- QUERY
 # Right semi non-equi-join with empty build.
-select straight_join at.id
+select straight_join atp.id
 from (select * from functional.alltypestiny att where int_col = 999) att
-  right semi join alltypes at on at.id < att.id
+  right semi join alltypes atp on atp.id < att.id
 ---- RESULTS
 ---- TYPES
 INT
 ====
 ---- QUERY
 # Right anti non-equi-join with empty build.
-select straight_join at.id
+select straight_join atp.id
 from (select * from functional.alltypestiny att where int_col = 999) att
-  right anti join alltypes at on at.id < att.id
-order by at.id desc
+  right anti join alltypes atp on atp.id < att.id
+order by atp.id desc
 limit 5
 ---- RESULTS
 7299

http://git-wip-us.apache.org/repos/asf/impala/blob/f0b3d9d1/testdata/workloads/functional-query/queries/QueryTest/spilling-large-rows.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/spilling-large-rows.test b/testdata/workloads/functional-query/queries/QueryTest/spilling-large-rows.test
index 9b61986..cdd6f21 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/spilling-large-rows.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/spilling-large-rows.test
@@ -55,20 +55,20 @@ int,bigint
 ====
 ---- QUERY
 # Row is too big to process in right side of hash join.
-select straight_join at.id, bs.id, at.string_col
-from functional.alltypes at
-  join bigstrs bs on repeat(at.string_col, 10000) = substring(bs.bigstr, 5000000, 10000) and at.id = bs.id
-where at.id < 100
+select straight_join atp.id, bs.id, atp.string_col
+from functional.alltypes atp
+  join bigstrs bs on repeat(atp.string_col, 10000) = substring(bs.bigstr, 5000000, 10000) and atp.id = bs.id
+where atp.id < 100
 ---- CATCH
 Row of size 9.54 MB could not be materialized in plan node with id 2. Increase the max_row_size query option (currently 512.00 KB) to process larger rows.
 ====
 ---- QUERY
 # Row is too big to process in right side of hash join.
 set max_row_size=18m;
-select straight_join at.id, bs.id, at.string_col
-from functional.alltypes at
-  join bigstrs bs on repeat(at.string_col, 10000) = substring(bs.bigstr, 5000000, 10000) and at.id = bs.id
-where at.id < 100
+select straight_join atp.id, bs.id, atp.string_col
+from functional.alltypes atp
+  join bigstrs bs on repeat(atp.string_col, 10000) = substring(bs.bigstr, 5000000, 10000) and atp.id = bs.id
+where atp.id < 100
 ---- TYPES
 int,int,string
 ---- RESULTS

http://git-wip-us.apache.org/repos/asf/impala/blob/f0b3d9d1/testdata/workloads/functional-query/queries/QueryTest/values.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/values.test b/testdata/workloads/functional-query/queries/QueryTest/values.test
index ac33e3d..8630059 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/values.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/values.test
@@ -74,7 +74,7 @@ decimal
 # IMPALA-2749: Test that multiplying a DOUBLE and a DECIMAL results in a double
 # value and do not overflow.
 drop table if exists i_2749;
-create table i_2749 (dbl1 double, dec decimal(9,4), dbl2 double);
+create table i_2749 (dbl1 double, `dec` decimal(9,4), dbl2 double);
 insert overwrite table i_2749 values
     (0.0017,90,1.0113),
     (0.0342,90,1.0113),
@@ -82,7 +82,7 @@ insert overwrite table i_2749 values
     (0.0163,90,1.0113);
 ====
 ---- QUERY
-select dbl1 * dec * dbl2, dbl1 + dec, dbl1 - dec, dbl1 / dec from i_2749;
+select dbl1 * `dec` * dbl2, dbl1 + `dec`, dbl1 - `dec`, dbl1 / `dec` from i_2749;
 ---- RESULTS
 0.1547289,90.0017000000000000,-89.9983000000000000,0.0000188888889
 3.112781400000001,90.0342000000000000,-89.9658000000000000,0.0003800000000
@@ -92,7 +92,7 @@ select dbl1 * dec * dbl2, dbl1 + dec, dbl1 - dec, dbl1 / dec from i_2749;
 DOUBLE,DECIMAL,DECIMAL,DECIMAL
 ====
 ---- QUERY
-select dbl1 * dbl2 * dec from i_2749;
+select dbl1 * dbl2 * `dec` from i_2749;
 ---- RESULTS
 0.1547289
 3.112781400000001

http://git-wip-us.apache.org/repos/asf/impala/blob/f0b3d9d1/tests/custom_cluster/test_reserved_words_version.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_reserved_words_version.py b/tests/custom_cluster/test_reserved_words_version.py
new file mode 100644
index 0000000..424d6bb
--- /dev/null
+++ b/tests/custom_cluster/test_reserved_words_version.py
@@ -0,0 +1,38 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+
+
+class TestReservedWordsVersion(CustomClusterTestSuite):
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args("--reserved_words_version=3.0.0")
+  def test_3_0(self):
+    assert "A reserved word cannot be used as an identifier: at" in \
+        str(self.execute_query_expect_failure(self.client, "select 1 as at"))
+    self.execute_query_expect_success(self.client, "select 1 as `at`")
+    self.execute_query_expect_success(self.client, "select 1 as year")
+    self.execute_query_expect_success(self.client, "select 1 as avg")
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args("--reserved_words_version=2.11.0")
+  def test_2_11(self):
+    self.execute_query_expect_success(self.client, "select 1 as at")
+    self.execute_query_expect_success(self.client, "select 1 as year")
+    self.execute_query_expect_success(self.client, "select 1 as avg")

http://git-wip-us.apache.org/repos/asf/impala/blob/f0b3d9d1/tests/custom_cluster/test_stats_extrapolation.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_stats_extrapolation.py b/tests/custom_cluster/test_stats_extrapolation.py
index 65aa9f1..42c3820 100644
--- a/tests/custom_cluster/test_stats_extrapolation.py
+++ b/tests/custom_cluster/test_stats_extrapolation.py
@@ -76,7 +76,7 @@ class TestStatsExtrapolation(CustomClusterTestSuite):
     self.__run_sampling_test(nopart_test_tbl, "", nopart_test_tbl_exp, 100, 99)
 
     # Test empty table.
-    empty_test_tbl = unique_database + ".empty"
+    empty_test_tbl = unique_database + ".empty_tbl"
     self.clone_table("functional.alltypes", empty_test_tbl, False, vector)
     self.__run_sampling_test(empty_test_tbl, "", empty_test_tbl, 10, 7)
 

http://git-wip-us.apache.org/repos/asf/impala/blob/f0b3d9d1/tests/query_test/test_sort.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_sort.py b/tests/query_test/test_sort.py
index 0eae035..59a28cd 100644
--- a/tests/query_test/test_sort.py
+++ b/tests/query_test/test_sort.py
@@ -142,10 +142,10 @@ class TestQueryFullSort(ImpalaTestSuite):
     has to be handled differently because there are no var len blocks to point into."""
 
     query = """
-    select empty, l_orderkey, l_partkey, l_suppkey,
+    select empty_str, l_orderkey, l_partkey, l_suppkey,
         l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax
-    from (select substr(l_comment, 1000, 0) empty, * from lineitem) t
-    order by empty, l_orderkey, l_partkey, l_suppkey, l_linenumber
+    from (select substr(l_comment, 1000, 0) empty_str, * from lineitem) t
+    order by empty_str, l_orderkey, l_partkey, l_suppkey, l_linenumber
     limit 100000
     """
 


[2/3] impala git commit: IMPALA-3916: Reserve SQL:2016 reserved words

Posted by ph...@apache.org.
IMPALA-3916: Reserve SQL:2016 reserved words

This patch reserves SQL:2016 reserved words, excluding:
1. Impala builtin function names.
2. Time unit words (year, month, etc.).
3. Words on an exception list agreed on in discussion.

Some test cases are modified to avoid these words. An impalad and
catalogd startup option, reserved_words_version, is added. The words are
reserved only if the option is set to "3.0.0".

Change-Id: If1b295e6a77e840cf1b794c2eb73e1b9d2b8ddd6
Reviewed-on: http://gerrit.cloudera.org:8080/9096
Reviewed-by: Alex Behm <al...@cloudera.com>
Reviewed-by: Philip Zeyliger <ph...@cloudera.com>
Tested-by: Impala Public Jenkins
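
To illustrate the new flag, here is a minimal sketch against a running
impalad, assuming the impyla client and the default HiveServer2 port
(connection details are illustrative; the queries mirror the new
test_reserved_words_version.py custom cluster test):

  from impala.dbapi import connect

  cur = connect(host='localhost', port=21050).cursor()

  # With --reserved_words_version=3.0.0 (the default), "at" is reserved and
  # is only usable as an identifier when backquoted.
  try:
      cur.execute("select 1 as at")
  except Exception as e:
      print(e)  # ...A reserved word cannot be used as an identifier: at
  cur.execute("select 1 as `at`")  # quoting makes it an ordinary identifier
  cur.execute("select 1 as year")  # time unit words are not reserved
  cur.execute("select 1 as avg")   # builtin function names are not reserved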


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/f0b3d9d1
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/f0b3d9d1
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/f0b3d9d1

Branch: refs/heads/master
Commit: f0b3d9d122f2c6eb4137bf93e3512a489ff8fab0
Parents: 4bd7cc8
Author: Tianyi Wang <tw...@cloudera.com>
Authored: Tue Jan 16 18:00:02 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Feb 2 01:13:08 2018 +0000

----------------------------------------------------------------------
 be/src/common/global-flags.cc                   |   7 +-
 be/src/common/init.cc                           |   9 +-
 be/src/util/backend-gflag-util.cc               |  11 +-
 common/thrift/BackendGflags.thrift              |   7 +
 fe/src/main/cup/sql-parser.cup                  |  56 +-
 .../org/apache/impala/analysis/ToSqlUtils.java  |   7 +-
 .../org/apache/impala/catalog/BuiltinsDb.java   |   4 +-
 .../java/org/apache/impala/catalog/Catalog.java |  10 +-
 .../impala/catalog/CatalogServiceCatalog.java   |  11 +-
 .../main/java/org/apache/impala/catalog/Db.java |  18 +-
 .../apache/impala/catalog/ImpaladCatalog.java   |  14 +-
 .../apache/impala/service/BackendConfig.java    |  12 +-
 fe/src/main/jflex/sql-scanner.flex              | 577 +++++++++++--------
 .../apache/impala/analysis/AnalyzeDDLTest.java  |  13 +-
 .../impala/analysis/AnalyzeStmtsTest.java       |   6 +-
 .../org/apache/impala/analysis/ToSqlTest.java   |  14 +-
 .../org/apache/impala/catalog/CatalogTest.java  |  12 +-
 .../apache/impala/common/FrontendTestBase.java  |   2 +-
 .../impala/planner/StatsExtrapolationTest.java  |  24 +-
 .../org/apache/impala/service/JdbcTest.java     |   8 +-
 .../queries/QueryTest/empty-build-joins.test    |  84 +--
 .../queries/QueryTest/exprs.test                |  16 +-
 .../queries/QueryTest/partition-col-types.test  |   2 +-
 .../queries/QueryTest/single-node-nlj.test      |  24 +-
 .../queries/QueryTest/spilling-large-rows.test  |  16 +-
 .../queries/QueryTest/values.test               |   6 +-
 .../test_reserved_words_version.py              |  38 ++
 .../custom_cluster/test_stats_extrapolation.py  |   2 +-
 tests/query_test/test_sort.py                   |   6 +-
 29 files changed, 572 insertions(+), 444 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/f0b3d9d1/be/src/common/global-flags.cc
----------------------------------------------------------------------
diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index d9d99c8..2d832da 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -182,10 +182,15 @@ DEFINE_int32(max_log_files, 10, "Maximum number of log files to retain per sever
     "retained.");
 
 // The read size is the preferred size of the reads issued to HDFS or the local FS.
-// There is a trade off of latency and throughout, trying to keep disks busy but
+// There is a trade off of latency and throughput, trying to keep disks busy but
 // not introduce seeks.  The literature seems to agree that with 8 MB reads, random
 // io and sequential io perform similarly.
 DEFINE_int32(read_size, 8 * 1024 * 1024, "(Advanced) The preferred I/O request size in "
     "bytes to issue to HDFS or the local filesystem. Increasing the read size will "
     "increase memory requirements. Decreasing the read size may decrease I/O "
     "throughput.");
+
+DEFINE_string(reserved_words_version, "3.0.0", "Reserved words compatibility version. "
+    "Reserved words cannot be used as identifiers in SQL. This flag determines the impala"
+    " version from which the reserved word list is taken. The value must be one of "
+    "[\"2.11.0\", \"3.0.0\"].");

http://git-wip-us.apache.org/repos/asf/impala/blob/f0b3d9d1/be/src/common/init.cc
----------------------------------------------------------------------
diff --git a/be/src/common/init.cc b/be/src/common/init.cc
index abbe416..41d4549 100644
--- a/be/src/common/init.cc
+++ b/be/src/common/init.cc
@@ -63,9 +63,10 @@ DECLARE_string(heap_profile_dir);
 DECLARE_string(hostname);
 // TODO: rename this to be more generic when we have a good CM release to do so.
 DECLARE_int32(logbufsecs);
+DECLARE_int32(max_log_files);
 DECLARE_int32(max_minidumps);
 DECLARE_string(redaction_rules_file);
-DECLARE_int32(max_log_files);
+DECLARE_string(reserved_words_version);
 
 DEFINE_int32(max_audit_event_log_files, 0, "Maximum number of audit event log files "
     "to retain. The most recent audit event log files are retained. If set to 0, "
@@ -199,6 +200,12 @@ void impala::InitCommonRuntime(int argc, char** argv, bool init_jvm,
     CLEAN_EXIT_WITH_ERROR(Substitute("read_size can not be lower than $0",
         READ_SIZE_MIN_VALUE));
   }
+  if (FLAGS_reserved_words_version != "2.11.0" && FLAGS_reserved_words_version != "3.0.0")
+  {
+    CLEAN_EXIT_WITH_ERROR(Substitute("Invalid flag reserved_words_version. The value must"
+        " be one of [\"2.11.0\", \"3.0.0\"], while the provided value is $0.",
+        FLAGS_reserved_words_version));
+  }
   impala::InitGoogleLoggingSafe(argv[0]);
   // Breakpad needs flags and logging to initialize.
   ABORT_IF_ERROR(RegisterMinidump(argv[0]));

http://git-wip-us.apache.org/repos/asf/impala/blob/f0b3d9d1/be/src/util/backend-gflag-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc
index 91095e1..f7f49ac 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -16,13 +16,12 @@
 // under the License.
 
 #include "common/global-flags.h"
-#include "util/backend-gflag-util.h"
 
 #include "gen-cpp/BackendGflags_types.h"
 #include "rpc/jni-thrift-util.h"
+#include "util/backend-gflag-util.h"
 #include "util/logging-support.h"
 
-#include <gflags/gflags.h>
 
 // Configs for the Frontend and the Catalog.
 DECLARE_bool(load_catalog_in_background);
@@ -46,6 +45,7 @@ DECLARE_string(authorization_policy_provider_class);
 DECLARE_string(authorized_proxy_user_config);
 DECLARE_string(authorized_proxy_user_config_delimiter);
 DECLARE_string(kudu_master_hosts);
+DECLARE_string(reserved_words_version);
 DECLARE_string(sentry_config);
 
 namespace impala {
@@ -78,6 +78,13 @@ Status GetThriftBackendGflags(JNIEnv* jni_env, jbyteArray* cfg_bytes) {
   cfg.__set_local_library_path(FLAGS_local_library_dir);
   cfg.__set_kudu_operation_timeout_ms(FLAGS_kudu_operation_timeout_ms);
   cfg.__set_sentry_catalog_polling_frequency_s(FLAGS_sentry_catalog_polling_frequency_s);
+  if (FLAGS_reserved_words_version == "2.11.0") {
+    cfg.__set_reserved_words_version(TReservedWordsVersion::IMPALA_2_11);
+  } else {
+    DCHECK_EQ(FLAGS_reserved_words_version, "3.0.0");
+    cfg.__set_reserved_words_version(TReservedWordsVersion::IMPALA_3_0);
+  }
+
   RETURN_IF_ERROR(SerializeThriftMsg(jni_env, &cfg, cfg_bytes));
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/f0b3d9d1/common/thrift/BackendGflags.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift
index baae9ba..36c4a7e 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -18,6 +18,11 @@
 namespace cpp impala
 namespace java org.apache.impala.thrift
 
+enum TReservedWordsVersion {
+  IMPALA_2_11
+  IMPALA_3_0
+}
+
 // Used to pass gflags from backend to frontend, JniCatalog and JniFrontend
 // Attributes without comments correspond to gflags
 struct TBackendGflags {
@@ -62,4 +67,6 @@ struct TBackendGflags {
   20: required i32 max_hdfs_partitions_parallel_load
 
   21: required i32 max_nonhdfs_partitions_parallel_load
+
+  22: required TReservedWordsVersion reserved_words_version
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/f0b3d9d1/fe/src/main/cup/sql-parser.cup
----------------------------------------------------------------------
diff --git a/fe/src/main/cup/sql-parser.cup b/fe/src/main/cup/sql-parser.cup
index dc0199b..38183f8 100644
--- a/fe/src/main/cup/sql-parser.cup
+++ b/fe/src/main/cup/sql-parser.cup
@@ -215,6 +215,9 @@ parser code {:
       SqlScanner.tokenIdMap.get(Integer.valueOf(errorToken_.sym));
     if (lastToken != null) {
       result.append(lastToken);
+    } else if (SqlScanner.isReserved((String)errorToken_.value)) {
+      result.append("A reserved word cannot be used as an identifier: ")
+          .append((String)errorToken_.value);
     } else {
       result.append("Unknown last token with id: " + errorToken_.sym);
     }
@@ -244,7 +247,7 @@ parser code {:
 :};
 
 // List of keywords. Please keep them sorted alphabetically.
-// ALL KEYWORDS ALSO NEED TO BE ADDED TO THE ident_or_keyword PRODUCTION.
+// ALL KEYWORDS ALSO NEED TO BE ADDED TO THE word PRODUCTION.
 terminal
   KW_ADD, KW_AGGREGATE, KW_ALL, KW_ALTER, KW_ANALYTIC, KW_AND, KW_ANTI, KW_API_VERSION,
   KW_ARRAY, KW_AS, KW_ASC, KW_AVRO, KW_BETWEEN, KW_BIGINT, KW_BINARY, KW_BLOCKSIZE,
@@ -270,8 +273,10 @@ terminal
   KW_SHOW, KW_SMALLINT, KW_SORT, KW_STORED, KW_STRAIGHT_JOIN, KW_STRING, KW_STRUCT,
   KW_SYMBOL, KW_TABLE, KW_TABLES, KW_TABLESAMPLE, KW_TBLPROPERTIES, KW_TERMINATED,
   KW_TEXTFILE, KW_THEN, KW_TIMESTAMP, KW_TINYINT, KW_TRUNCATE, KW_STATS, KW_TO, KW_TRUE,
-  KW_UNBOUNDED, KW_UNCACHED, KW_UNION, KW_UPDATE, KW_UPDATE_FN, KW_UPSERT, KW_USE,
-  KW_USING, KW_VALUES, KW_VARCHAR, KW_VIEW, KW_WHEN, KW_WHERE, KW_WITH;
+  KW_UNBOUNDED, KW_UNCACHED, KW_UNION, KW_UNKNOWN, KW_UPDATE, KW_UPDATE_FN, KW_UPSERT,
+  KW_USE, KW_USING, KW_VALUES, KW_VARCHAR, KW_VIEW, KW_WHEN, KW_WHERE, KW_WITH;
+
+terminal UNUSED_RESERVED_WORD;
 
 terminal COLON, SEMICOLON, COMMA, DOT, DOTDOTDOT, STAR, LPAREN, RPAREN, LBRACKET,
   RBRACKET, DIVIDE, MOD, ADD, SUBTRACT;
@@ -292,7 +297,11 @@ terminal String UNEXPECTED_CHAR;
 // IMPALA-3726 introduced the DEFAULT keyword which could break existing applications
 // that use the identifier "KEYWORD" as database, column or table names. To avoid that,
 // the ident_or_default non-terminal is introduced and should be used instead of IDENT.
-nonterminal String ident_or_keyword, ident_or_default;
+nonterminal String ident_or_default;
+// A word is an arbitrary token composed of digits and letters, with at least one
+// letter. Reserved words cannot be used as identifiers, but they are still words
+// and can be used in query options, column attributes, etc.
+nonterminal String word;
 nonterminal StatementBase stmt;
 // Single select statement.
 nonterminal SelectStmt select_stmt;
@@ -494,7 +503,6 @@ nonterminal Boolean server_ident;
 nonterminal Boolean source_ident;
 nonterminal Boolean sources_ident;
 nonterminal Boolean uri_ident;
-nonterminal Boolean unknown_ident;
 
 // For Create/Drop/Show function ddl
 nonterminal FunctionArgs function_def_args;
@@ -1633,13 +1641,13 @@ nullability_val ::=
   ;
 
 encoding_val ::=
-  KW_ENCODING ident_or_default:encoding_ident
-  {: RESULT = encoding_ident; :}
+  KW_ENCODING word:value
+  {: RESULT = value; :}
   ;
 
 compression_val ::=
-  KW_COMPRESSION ident_or_default:compression_ident
-  {: RESULT = compression_ident; :}
+  KW_COMPRESSION word:value
+  {: RESULT = value; :}
   ;
 
 default_val ::=
@@ -1740,16 +1748,6 @@ option_ident ::=
   :}
   ;
 
-unknown_ident ::=
-  IDENT:ident
-  {:
-    if (!ident.toUpperCase().equals("UNKNOWN")) {
-      parser.parseError("identifier", SqlParserSymbols.IDENT, "UNKNOWN");
-    }
-    RESULT = true;
-  :}
-  ;
-
 view_column_defs ::=
   LPAREN view_column_def_list:view_col_defs RPAREN
   {: RESULT = view_col_defs; :}
@@ -2361,14 +2359,16 @@ select_clause ::=
   ;
 
 set_stmt ::=
-  KW_SET ident_or_default:key EQUAL literal:l
+  KW_SET ident_or_default:key EQUAL numeric_literal:l
   {: RESULT = new SetStmt(key, l.getStringValue(), false); :}
+  | KW_SET ident_or_default:key EQUAL STRING_LITERAL:l
+  {: RESULT = new SetStmt(key, l, false); :}
   | KW_SET ident_or_default:key EQUAL SUBTRACT numeric_literal:l
   {:
     l.swapSign();
     RESULT = new SetStmt(key, l.getStringValue(), false); :}
-  | KW_SET ident_or_default:key EQUAL ident_or_default:ident
-  {: RESULT = new SetStmt(key, ident, false); :}
+  | KW_SET ident_or_default:key EQUAL word:value
+  {: RESULT = new SetStmt(key, value, false); :}
   | KW_SET KW_ALL
   {: RESULT = new SetStmt(null, null, true); :}
   | KW_SET
@@ -3106,9 +3106,9 @@ bool_test_expr ::=
   {: RESULT = new FunctionCallExpr("isfalse", Lists.newArrayList(e)); :}
   | expr:e KW_IS KW_NOT KW_FALSE
   {: RESULT = new FunctionCallExpr("isnotfalse", Lists.newArrayList(e)); :}
-  | expr:e KW_IS unknown_ident
+  | expr:e KW_IS KW_UNKNOWN
   {: RESULT = new IsNullPredicate(e, false); :}
-  | expr:e KW_IS KW_NOT unknown_ident
+  | expr:e KW_IS KW_NOT KW_UNKNOWN
   {: RESULT = new IsNullPredicate(e, true); :}
   ;
 
@@ -3203,7 +3203,7 @@ type ::=
 // that we can parse type strings from the Hive Metastore which
 // may have unquoted identifiers corresponding to keywords.
 struct_field_def ::=
-  ident_or_keyword:name COLON type:t opt_comment_val:comment
+  word:name COLON type:t opt_comment_val:comment
   {: RESULT = new StructField(name, t, comment); :}
   ;
 
@@ -3228,7 +3228,7 @@ ident_or_default ::=
   {: RESULT = name.toString(); :}
   ;
 
-ident_or_keyword ::=
+word ::=
   IDENT:r
   {: RESULT = r.toString(); :}
   | KW_ADD:r
@@ -3579,6 +3579,8 @@ ident_or_keyword ::=
   {: RESULT = r.toString(); :}
   | KW_UNION:r
   {: RESULT = r.toString(); :}
+  | KW_UNKNOWN:r
+  {: RESULT = r.toString(); :}
   | KW_UPDATE:r
   {: RESULT = r.toString(); :}
   | KW_UPDATE_FN:r
@@ -3601,4 +3603,6 @@ ident_or_keyword ::=
   {: RESULT = r.toString(); :}
   | KW_WITH:r
   {: RESULT = r.toString(); :}
+  | UNUSED_RESERVED_WORD:r
+  {: RESULT = r.toString(); :}
   ;
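
Note that the grammar now has a real KW_UNKNOWN terminal in place of the old
unknown_ident workaround, so IS [NOT] UNKNOWN parses as a proper boolean test
mapping to an IS [NOT] NULL predicate. A quick sketch of the resulting
behavior, again assuming the impyla client (illustrative connection details):

  from impala.dbapi import connect

  cur = connect(host='localhost', port=21050).cursor()
  cur.execute("select null is unknown")
  print(cur.fetchall())  # [(True,)] -- equivalent to "null is null"
  cur.execute("select 1 is not unknown")
  print(cur.fetchall())  # [(True,)] -- equivalent to "1 is not null"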

http://git-wip-us.apache.org/repos/asf/impala/blob/f0b3d9d1/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
index facebfd..1940cde 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
@@ -39,7 +39,6 @@ import org.apache.commons.lang.StringEscapeUtils;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.ql.parse.HiveLexer;
-
 import org.apache.impala.catalog.CatalogException;
 import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.Function;
@@ -123,15 +122,15 @@ public class ToSqlUtils {
     } catch (Exception e) {
       // Ignore exception and just quote the identifier to be safe.
     }
-    boolean isImpalaKeyword = SqlScanner.isKeyword(ident.toUpperCase());
+    boolean isImpalaReserved = SqlScanner.isReserved(ident.toUpperCase());
     // Impala's scanner recognizes the ".123" portion of "db.123_tbl" as a decimal,
     // so while the quoting is not necessary for the given identifier itself, the quotes
     // are needed if this identifier will be preceded by a ".".
     boolean startsWithNumber = false;
-    if (!hiveNeedsQuotes && !isImpalaKeyword) {
+    if (!hiveNeedsQuotes && !isImpalaReserved) {
       startsWithNumber = Character.isDigit(ident.charAt(0));
     }
-    if (hiveNeedsQuotes || isImpalaKeyword || startsWithNumber) return "`" + ident + "`";
+    if (hiveNeedsQuotes || isImpalaReserved || startsWithNumber) return "`" + ident + "`";
     return ident;
   }
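
getIdentSql() now decides quoting based on the reserved word list instead of
the full keyword list. A rough Python sketch of the decision (the RESERVED set
and the helper name are illustrative, and the real code additionally consults
Hive's lexer before quoting):

  # Illustrative subset of SqlScanner's reserved word list.
  RESERVED = {"at", "select", "where"}

  def quote_if_needed(ident):
      # Backquote when the identifier is reserved in Impala or starts with a
      # digit; a leading digit would otherwise lex as a decimal after a ".".
      if ident.lower() in RESERVED or ident[0].isdigit():
          return "`%s`" % ident
      return ident

  assert quote_if_needed("at") == "`at`"
  assert quote_if_needed("123_tbl") == "`123_tbl`"
  assert quote_if_needed("my_col") == "my_col"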
 

http://git-wip-us.apache.org/repos/asf/impala/blob/f0b3d9d1/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java b/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java
index f8a6b89..a95435e 100644
--- a/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java
+++ b/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java
@@ -56,8 +56,8 @@ public class BuiltinsDb extends Db {
   // Size in bytes of RankState used for rank() and dense_rank().
   private static final int RANK_INTERMEDIATE_SIZE = 16;
 
-  public BuiltinsDb(String name, Catalog catalog) {
-    super(name, catalog, createMetastoreDb(name));
+  public BuiltinsDb(String name) {
+    super(name, createMetastoreDb(name));
     setIsSystemDb(true);
     initBuiltins();
   }

http://git-wip-us.apache.org/repos/asf/impala/blob/f0b3d9d1/fe/src/main/java/org/apache/impala/catalog/Catalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/Catalog.java b/fe/src/main/java/org/apache/impala/catalog/Catalog.java
index 4548c2b..6136835 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Catalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Catalog.java
@@ -17,10 +17,6 @@
 
 package org.apache.impala.catalog;
 
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
@@ -36,7 +32,9 @@ import org.apache.impala.thrift.TTable;
 import org.apache.impala.thrift.TTableName;
 import org.apache.impala.util.PatternMatcher;
 
-import org.apache.log4j.Logger;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 
 /**
  * Thread safe interface for reading and updating metadata stored in the Hive MetaStore.
@@ -89,7 +87,7 @@ public abstract class Catalog {
 
   public Catalog() {
     dataSources_ = new CatalogObjectCache<DataSource>();
-    builtinsDb_ = new BuiltinsDb(BUILTINS_DB, this);
+    builtinsDb_ = new BuiltinsDb(BUILTINS_DB);
     addDb(builtinsDb_);
   }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/f0b3d9d1/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 8f75a16..7bc2a91 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -25,15 +25,14 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.fs.Path;
@@ -49,7 +48,6 @@ import org.apache.hadoop.hive.metastore.api.UnknownDBException;
 import org.apache.hadoop.hive.ql.exec.FunctionUtils;
 import org.apache.impala.authorization.SentryConfig;
 import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
-import org.apache.impala.catalog.TopicUpdateLog.Entry;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.ImpalaRuntimeException;
@@ -79,7 +77,6 @@ import org.apache.thrift.protocol.TCompactProtocol;
 import com.codahale.metrics.Timer;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -943,7 +940,7 @@ public class CatalogServiceCatalog extends Catalog {
       // Contains native functions in it's params map.
       org.apache.hadoop.hive.metastore.api.Database msDb =
           msClient.getHiveClient().getDatabase(dbName);
-      tmpDb = new Db(dbName, this, null);
+      tmpDb = new Db(dbName, null);
       // Load native UDFs into the temporary db.
       loadFunctionsFromDbParams(tmpDb, msDb);
       // Load Java UDFs from HMS into the temporary db.
@@ -1004,7 +1001,7 @@ public class CatalogServiceCatalog extends Catalog {
       }
       org.apache.hadoop.hive.metastore.api.Database msDb =
           msClient.getHiveClient().getDatabase(dbName);
-      Db newDb = new Db(dbName, this, msDb);
+      Db newDb = new Db(dbName, msDb);
       // existingDb is usually null when the Catalog loads for the first time.
       // In that case we needn't restore any transient functions.
       if (existingDb != null) {
@@ -1144,7 +1141,7 @@ public class CatalogServiceCatalog extends Catalog {
    * new Db object. Used by CREATE DATABASE statements.
    */
   public Db addDb(String dbName, org.apache.hadoop.hive.metastore.api.Database msDb) {
-    Db newDb = new Db(dbName, this, msDb);
+    Db newDb = new Db(dbName, msDb);
     versionLock_.writeLock().lock();
     try {
       newDb.setCatalogVersion(incrementAndGetCatalogVersion());

http://git-wip-us.apache.org/repos/asf/impala/blob/f0b3d9d1/fe/src/main/java/org/apache/impala/catalog/Db.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/Db.java b/fe/src/main/java/org/apache/impala/catalog/Db.java
index f1c9c8e..d59a28f 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Db.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Db.java
@@ -24,23 +24,20 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.codec.binary.Base64;
-import org.apache.thrift.protocol.TCompactProtocol;
 import org.apache.thrift.TException;
 import org.apache.thrift.TSerializer;
+import org.apache.thrift.protocol.TCompactProtocol;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
-import org.apache.impala.catalog.Function;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.ImpalaRuntimeException;
-import org.apache.impala.common.JniUtil;
 import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TDatabase;
-import org.apache.impala.thrift.TFunction;
 import org.apache.impala.thrift.TFunctionBinaryType;
 import org.apache.impala.thrift.TFunctionCategory;
 import org.apache.impala.util.PatternMatcher;
+
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -62,7 +59,6 @@ import com.google.common.collect.Maps;
  */
 public class Db extends CatalogObjectImpl {
   private static final Logger LOG = LoggerFactory.getLogger(Db.class);
-  private final Catalog parentCatalog_;
   private final TDatabase thriftDb_;
 
   public static final String FUNCTION_INDEX_PREFIX = "impala_registered_function_";
@@ -87,10 +83,8 @@ public class Db extends CatalogObjectImpl {
   // (e.g. can't drop it, can't add tables to it, etc).
   private boolean isSystemDb_ = false;
 
-  public Db(String name, Catalog catalog,
-      org.apache.hadoop.hive.metastore.api.Database msDb) {
+  public Db(String name, org.apache.hadoop.hive.metastore.api.Database msDb) {
     thriftDb_ = new TDatabase(name.toLowerCase());
-    parentCatalog_ = catalog;
     thriftDb_.setMetastore_db(msDb);
     tableCache_ = new CatalogObjectCache<Table>();
     functions_ = new HashMap<String, List<Function>>();
@@ -101,8 +95,8 @@ public class Db extends CatalogObjectImpl {
   /**
    * Creates a Db object with no tables based on the given TDatabase thrift struct.
    */
-  public static Db fromTDatabase(TDatabase db, Catalog parentCatalog) {
-    return new Db(db.getDb_name(), parentCatalog, db.getMetastore_db());
+  public static Db fromTDatabase(TDatabase db) {
+    return new Db(db.getDb_name(), db.getMetastore_db());
   }
 
   /**
@@ -416,7 +410,7 @@ public class Db extends CatalogObjectImpl {
    * This is not thread safe so a higher level lock must be taken while iterating
    * over the returned functions.
    */
-  protected HashMap<String, List<Function>> getAllFunctions() {
+  public HashMap<String, List<Function>> getAllFunctions() {
     return functions_;
   }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/f0b3d9d1/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
index 0e2e8b9..4bb6b65 100644
--- a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
@@ -17,20 +17,13 @@
 
 package org.apache.impala.catalog;
 
-import com.google.common.base.Preconditions;
-
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
-import org.apache.log4j.Logger;
-import org.apache.thrift.TException;
-
 import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
 import org.apache.impala.common.ImpalaException;
-import org.apache.impala.util.PatternMatcher;
 import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TDataSource;
@@ -42,6 +35,11 @@ import org.apache.impala.thrift.TTable;
 import org.apache.impala.thrift.TUniqueId;
 import org.apache.impala.thrift.TUpdateCatalogCacheRequest;
 import org.apache.impala.thrift.TUpdateCatalogCacheResponse;
+import org.apache.impala.util.PatternMatcher;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TException;
+
+import com.google.common.base.Preconditions;
 
 /**
  * Thread safe Catalog for an Impalad.  The Impalad catalog can be updated either via
@@ -374,7 +372,7 @@ public class ImpaladCatalog extends Catalog {
     Db existingDb = getDb(thriftDb.getDb_name());
     if (existingDb == null ||
         existingDb.getCatalogVersion() < catalogVersion) {
-      Db newDb = Db.fromTDatabase(thriftDb, this);
+      Db newDb = Db.fromTDatabase(thriftDb);
       newDb.setCatalogVersion(catalogVersion);
       addDb(newDb);
       if (existingDb != null) {

http://git-wip-us.apache.org/repos/asf/impala/blob/f0b3d9d1/fe/src/main/java/org/apache/impala/service/BackendConfig.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
index af05ad6..659e717 100644
--- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
+++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
@@ -17,14 +17,17 @@
 
 package org.apache.impala.service;
 
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic
+    .HADOOP_SECURITY_AUTH_TO_LOCAL;
 
 import org.apache.hadoop.conf.Configuration;
-import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTH_TO_LOCAL;
 import org.apache.hadoop.security.authentication.util.KerberosName;
+import org.apache.impala.analysis.SqlScanner;
 import org.apache.impala.thrift.TBackendGflags;
 
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+
 /**
  * This class is meant to provide the FE with impalad backend configuration parameters,
  * including command line arguments.
@@ -41,9 +44,11 @@ public class BackendConfig {
   public static void create(TBackendGflags cfg) {
     Preconditions.checkNotNull(cfg);
     INSTANCE = new BackendConfig(cfg);
+    SqlScanner.init(cfg.getReserved_words_version());
     initAuthToLocal();
   }
 
+  public TBackendGflags getBackendCfg() { return backendCfg_; }
   public long getReadSize() { return backendCfg_.read_size; }
   public boolean getComputeLineage() {
     return !Strings.isNullOrEmpty(backendCfg_.lineage_event_log_dir);
@@ -71,6 +76,7 @@ public class BackendConfig {
   public int maxNonHdfsPartsParallelLoad() {
     return backendCfg_.max_nonhdfs_partitions_parallel_load;
   }
+
   // Inits the auth_to_local configuration in the static KerberosName class.
   private static void initAuthToLocal() {
     // If auth_to_local is enabled, we read the configuration hadoop.security.auth_to_local

http://git-wip-us.apache.org/repos/asf/impala/blob/f0b3d9d1/fe/src/main/jflex/sql-scanner.flex
----------------------------------------------------------------------
diff --git a/fe/src/main/jflex/sql-scanner.flex b/fe/src/main/jflex/sql-scanner.flex
index ee8355d..c4ed217 100644
--- a/fe/src/main/jflex/sql-scanner.flex
+++ b/fe/src/main/jflex/sql-scanner.flex
@@ -20,15 +20,19 @@ package org.apache.impala.analysis;
 import java_cup.runtime.Symbol;
 import java.lang.Integer;
 import java.math.BigDecimal;
-import java.math.BigInteger;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.Set;
 import java.util.Iterator;
-import java.util.List;
-import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
 
 import org.apache.impala.analysis.SqlParserSymbols;
+import org.apache.impala.catalog.BuiltinsDb;
+import static org.apache.impala.catalog.Catalog.BUILTINS_DB;
+import org.apache.impala.service.BackendConfig;
+import org.apache.impala.thrift.TReservedWordsVersion;
 
 %%
 
@@ -49,253 +53,328 @@ import org.apache.impala.analysis.SqlParserSymbols;
   // uses "and" as a display name and not "&&".
   // Please keep the puts sorted alphabetically by keyword (where the order
   // does not affect the desired error reporting)
-  public static final Map<String, Integer> keywordMap =
-      new LinkedHashMap<String, Integer>();
-  static {
-    keywordMap.put("&&", new Integer(SqlParserSymbols.KW_AND));
-    keywordMap.put("add", new Integer(SqlParserSymbols.KW_ADD));
-    keywordMap.put("aggregate", new Integer(SqlParserSymbols.KW_AGGREGATE));
-    keywordMap.put("all", new Integer(SqlParserSymbols.KW_ALL));
-    keywordMap.put("alter", new Integer(SqlParserSymbols.KW_ALTER));
-    keywordMap.put("analytic", new Integer(SqlParserSymbols.KW_ANALYTIC));
-    keywordMap.put("and", new Integer(SqlParserSymbols.KW_AND));
-    keywordMap.put("anti", new Integer(SqlParserSymbols.KW_ANTI));
-    keywordMap.put("api_version", new Integer(SqlParserSymbols.KW_API_VERSION));
-    keywordMap.put("array", new Integer(SqlParserSymbols.KW_ARRAY));
-    keywordMap.put("as", new Integer(SqlParserSymbols.KW_AS));
-    keywordMap.put("asc", new Integer(SqlParserSymbols.KW_ASC));
-    keywordMap.put("avro", new Integer(SqlParserSymbols.KW_AVRO));
-    keywordMap.put("between", new Integer(SqlParserSymbols.KW_BETWEEN));
-    keywordMap.put("bigint", new Integer(SqlParserSymbols.KW_BIGINT));
-    keywordMap.put("binary", new Integer(SqlParserSymbols.KW_BINARY));
-    keywordMap.put("block_size", new Integer(SqlParserSymbols.KW_BLOCKSIZE));
-    keywordMap.put("boolean", new Integer(SqlParserSymbols.KW_BOOLEAN));
-    keywordMap.put("by", new Integer(SqlParserSymbols.KW_BY));
-    keywordMap.put("cached", new Integer(SqlParserSymbols.KW_CACHED));
-    keywordMap.put("case", new Integer(SqlParserSymbols.KW_CASE));
-    keywordMap.put("cascade", new Integer(SqlParserSymbols.KW_CASCADE));
-    keywordMap.put("cast", new Integer(SqlParserSymbols.KW_CAST));
-    keywordMap.put("change", new Integer(SqlParserSymbols.KW_CHANGE));
-    keywordMap.put("char", new Integer(SqlParserSymbols.KW_CHAR));
-    keywordMap.put("class", new Integer(SqlParserSymbols.KW_CLASS));
-    keywordMap.put("close_fn", new Integer(SqlParserSymbols.KW_CLOSE_FN));
-    keywordMap.put("column", new Integer(SqlParserSymbols.KW_COLUMN));
-    keywordMap.put("columns", new Integer(SqlParserSymbols.KW_COLUMNS));
-    keywordMap.put("comment", new Integer(SqlParserSymbols.KW_COMMENT));
-    keywordMap.put("compression", new Integer(SqlParserSymbols.KW_COMPRESSION));
-    keywordMap.put("compute", new Integer(SqlParserSymbols.KW_COMPUTE));
-    keywordMap.put("create", new Integer(SqlParserSymbols.KW_CREATE));
-    keywordMap.put("cross", new Integer(SqlParserSymbols.KW_CROSS));
-    keywordMap.put("current", new Integer(SqlParserSymbols.KW_CURRENT));
-    keywordMap.put("data", new Integer(SqlParserSymbols.KW_DATA));
-    keywordMap.put("database", new Integer(SqlParserSymbols.KW_DATABASE));
-    keywordMap.put("databases", new Integer(SqlParserSymbols.KW_DATABASES));
-    keywordMap.put("date", new Integer(SqlParserSymbols.KW_DATE));
-    keywordMap.put("datetime", new Integer(SqlParserSymbols.KW_DATETIME));
-    keywordMap.put("decimal", new Integer(SqlParserSymbols.KW_DECIMAL));
-    keywordMap.put("default", new Integer(SqlParserSymbols.KW_DEFAULT));
-    keywordMap.put("delete", new Integer(SqlParserSymbols.KW_DELETE));
-    keywordMap.put("delimited", new Integer(SqlParserSymbols.KW_DELIMITED));
-    keywordMap.put("desc", new Integer(SqlParserSymbols.KW_DESC));
-    keywordMap.put("describe", new Integer(SqlParserSymbols.KW_DESCRIBE));
-    keywordMap.put("distinct", new Integer(SqlParserSymbols.KW_DISTINCT));
-    keywordMap.put("div", new Integer(SqlParserSymbols.KW_DIV));
-    keywordMap.put("double", new Integer(SqlParserSymbols.KW_DOUBLE));
-    keywordMap.put("drop", new Integer(SqlParserSymbols.KW_DROP));
-    keywordMap.put("else", new Integer(SqlParserSymbols.KW_ELSE));
-    keywordMap.put("encoding", new Integer(SqlParserSymbols.KW_ENCODING));
-    keywordMap.put("end", new Integer(SqlParserSymbols.KW_END));
-    keywordMap.put("escaped", new Integer(SqlParserSymbols.KW_ESCAPED));
-    keywordMap.put("exists", new Integer(SqlParserSymbols.KW_EXISTS));
-    keywordMap.put("explain", new Integer(SqlParserSymbols.KW_EXPLAIN));
-    keywordMap.put("extended", new Integer(SqlParserSymbols.KW_EXTENDED));
-    keywordMap.put("external", new Integer(SqlParserSymbols.KW_EXTERNAL));
-    keywordMap.put("false", new Integer(SqlParserSymbols.KW_FALSE));
-    keywordMap.put("fields", new Integer(SqlParserSymbols.KW_FIELDS));
-    keywordMap.put("fileformat", new Integer(SqlParserSymbols.KW_FILEFORMAT));
-    keywordMap.put("files", new Integer(SqlParserSymbols.KW_FILES));
-    keywordMap.put("finalize_fn", new Integer(SqlParserSymbols.KW_FINALIZE_FN));
-    keywordMap.put("first", new Integer(SqlParserSymbols.KW_FIRST));
-    keywordMap.put("float", new Integer(SqlParserSymbols.KW_FLOAT));
-    keywordMap.put("following", new Integer(SqlParserSymbols.KW_FOLLOWING));
-    keywordMap.put("for", new Integer(SqlParserSymbols.KW_FOR));
-    keywordMap.put("format", new Integer(SqlParserSymbols.KW_FORMAT));
-    keywordMap.put("formatted", new Integer(SqlParserSymbols.KW_FORMATTED));
-    keywordMap.put("from", new Integer(SqlParserSymbols.KW_FROM));
-    keywordMap.put("full", new Integer(SqlParserSymbols.KW_FULL));
-    keywordMap.put("function", new Integer(SqlParserSymbols.KW_FUNCTION));
-    keywordMap.put("functions", new Integer(SqlParserSymbols.KW_FUNCTIONS));
-    keywordMap.put("grant", new Integer(SqlParserSymbols.KW_GRANT));
-    keywordMap.put("group", new Integer(SqlParserSymbols.KW_GROUP));
-    keywordMap.put("hash", new Integer(SqlParserSymbols.KW_HASH));
-    keywordMap.put("having", new Integer(SqlParserSymbols.KW_HAVING));
-    keywordMap.put("if", new Integer(SqlParserSymbols.KW_IF));
-    keywordMap.put("ilike", new Integer(SqlParserSymbols.KW_ILIKE));
-    keywordMap.put("ignore", new Integer(SqlParserSymbols.KW_IGNORE));
-    keywordMap.put("in", new Integer(SqlParserSymbols.KW_IN));
-    keywordMap.put("incremental", new Integer(SqlParserSymbols.KW_INCREMENTAL));
-    keywordMap.put("init_fn", new Integer(SqlParserSymbols.KW_INIT_FN));
-    keywordMap.put("inner", new Integer(SqlParserSymbols.KW_INNER));
-    keywordMap.put("inpath", new Integer(SqlParserSymbols.KW_INPATH));
-    keywordMap.put("insert", new Integer(SqlParserSymbols.KW_INSERT));
-    keywordMap.put("int", new Integer(SqlParserSymbols.KW_INT));
-    keywordMap.put("integer", new Integer(SqlParserSymbols.KW_INT));
-    keywordMap.put("intermediate", new Integer(SqlParserSymbols.KW_INTERMEDIATE));
-    keywordMap.put("interval", new Integer(SqlParserSymbols.KW_INTERVAL));
-    keywordMap.put("into", new Integer(SqlParserSymbols.KW_INTO));
-    keywordMap.put("invalidate", new Integer(SqlParserSymbols.KW_INVALIDATE));
-    keywordMap.put("iregexp", new Integer(SqlParserSymbols.KW_IREGEXP));
-    keywordMap.put("is", new Integer(SqlParserSymbols.KW_IS));
-    keywordMap.put("join", new Integer(SqlParserSymbols.KW_JOIN));
-    keywordMap.put("kudu", new Integer(SqlParserSymbols.KW_KUDU));
-    keywordMap.put("last", new Integer(SqlParserSymbols.KW_LAST));
-    keywordMap.put("left", new Integer(SqlParserSymbols.KW_LEFT));
-    keywordMap.put("like", new Integer(SqlParserSymbols.KW_LIKE));
-    keywordMap.put("limit", new Integer(SqlParserSymbols.KW_LIMIT));
-    keywordMap.put("lines", new Integer(SqlParserSymbols.KW_LINES));
-    keywordMap.put("load", new Integer(SqlParserSymbols.KW_LOAD));
-    keywordMap.put("location", new Integer(SqlParserSymbols.KW_LOCATION));
-    keywordMap.put("map", new Integer(SqlParserSymbols.KW_MAP));
-    keywordMap.put("merge_fn", new Integer(SqlParserSymbols.KW_MERGE_FN));
-    keywordMap.put("metadata", new Integer(SqlParserSymbols.KW_METADATA));
-    keywordMap.put("not", new Integer(SqlParserSymbols.KW_NOT));
-    keywordMap.put("null", new Integer(SqlParserSymbols.KW_NULL));
-    keywordMap.put("nulls", new Integer(SqlParserSymbols.KW_NULLS));
-    keywordMap.put("offset", new Integer(SqlParserSymbols.KW_OFFSET));
-    keywordMap.put("on", new Integer(SqlParserSymbols.KW_ON));
-    keywordMap.put("||", new Integer(SqlParserSymbols.KW_OR));
-    keywordMap.put("or", new Integer(SqlParserSymbols.KW_OR));
-    keywordMap.put("order", new Integer(SqlParserSymbols.KW_ORDER));
-    keywordMap.put("outer", new Integer(SqlParserSymbols.KW_OUTER));
-    keywordMap.put("over", new Integer(SqlParserSymbols.KW_OVER));
-    keywordMap.put("overwrite", new Integer(SqlParserSymbols.KW_OVERWRITE));
-    keywordMap.put("parquet", new Integer(SqlParserSymbols.KW_PARQUET));
-    keywordMap.put("parquetfile", new Integer(SqlParserSymbols.KW_PARQUETFILE));
-    keywordMap.put("partition", new Integer(SqlParserSymbols.KW_PARTITION));
-    keywordMap.put("partitioned", new Integer(SqlParserSymbols.KW_PARTITIONED));
-    keywordMap.put("partitions", new Integer(SqlParserSymbols.KW_PARTITIONS));
-    keywordMap.put("preceding", new Integer(SqlParserSymbols.KW_PRECEDING));
-    keywordMap.put("prepare_fn", new Integer(SqlParserSymbols.KW_PREPARE_FN));
-    keywordMap.put("primary", new Integer(SqlParserSymbols.KW_PRIMARY));
-    keywordMap.put("produced", new Integer(SqlParserSymbols.KW_PRODUCED));
-    keywordMap.put("purge", new Integer(SqlParserSymbols.KW_PURGE));
-    keywordMap.put("range", new Integer(SqlParserSymbols.KW_RANGE));
-    keywordMap.put("rcfile", new Integer(SqlParserSymbols.KW_RCFILE));
-    keywordMap.put("real", new Integer(SqlParserSymbols.KW_DOUBLE));
-    keywordMap.put("recover", new Integer(SqlParserSymbols.KW_RECOVER));
-    keywordMap.put("refresh", new Integer(SqlParserSymbols.KW_REFRESH));
-    keywordMap.put("regexp", new Integer(SqlParserSymbols.KW_REGEXP));
-    keywordMap.put("rename", new Integer(SqlParserSymbols.KW_RENAME));
-    keywordMap.put("repeatable", new Integer(SqlParserSymbols.KW_REPEATABLE));
-    keywordMap.put("replace", new Integer(SqlParserSymbols.KW_REPLACE));
-    keywordMap.put("replication", new Integer(SqlParserSymbols.KW_REPLICATION));
-    keywordMap.put("restrict", new Integer(SqlParserSymbols.KW_RESTRICT));
-    keywordMap.put("returns", new Integer(SqlParserSymbols.KW_RETURNS));
-    keywordMap.put("revoke", new Integer(SqlParserSymbols.KW_REVOKE));
-    keywordMap.put("right", new Integer(SqlParserSymbols.KW_RIGHT));
-    keywordMap.put("rlike", new Integer(SqlParserSymbols.KW_RLIKE));
-    keywordMap.put("role", new Integer(SqlParserSymbols.KW_ROLE));
-    keywordMap.put("roles", new Integer(SqlParserSymbols.KW_ROLES));
-    keywordMap.put("row", new Integer(SqlParserSymbols.KW_ROW));
-    keywordMap.put("rows", new Integer(SqlParserSymbols.KW_ROWS));
-    keywordMap.put("schema", new Integer(SqlParserSymbols.KW_SCHEMA));
-    keywordMap.put("schemas", new Integer(SqlParserSymbols.KW_SCHEMAS));
-    keywordMap.put("select", new Integer(SqlParserSymbols.KW_SELECT));
-    keywordMap.put("semi", new Integer(SqlParserSymbols.KW_SEMI));
-    keywordMap.put("sequencefile", new Integer(SqlParserSymbols.KW_SEQUENCEFILE));
-    keywordMap.put("serdeproperties", new Integer(SqlParserSymbols.KW_SERDEPROPERTIES));
-    keywordMap.put("serialize_fn", new Integer(SqlParserSymbols.KW_SERIALIZE_FN));
-    keywordMap.put("set", new Integer(SqlParserSymbols.KW_SET));
-    keywordMap.put("show", new Integer(SqlParserSymbols.KW_SHOW));
-    keywordMap.put("smallint", new Integer(SqlParserSymbols.KW_SMALLINT));
-    keywordMap.put("sort", new Integer(SqlParserSymbols.KW_SORT));
-    keywordMap.put("stats", new Integer(SqlParserSymbols.KW_STATS));
-    keywordMap.put("stored", new Integer(SqlParserSymbols.KW_STORED));
-    keywordMap.put("straight_join", new Integer(SqlParserSymbols.KW_STRAIGHT_JOIN));
-    keywordMap.put("string", new Integer(SqlParserSymbols.KW_STRING));
-    keywordMap.put("struct", new Integer(SqlParserSymbols.KW_STRUCT));
-    keywordMap.put("symbol", new Integer(SqlParserSymbols.KW_SYMBOL));
-    keywordMap.put("table", new Integer(SqlParserSymbols.KW_TABLE));
-    keywordMap.put("tables", new Integer(SqlParserSymbols.KW_TABLES));
-    keywordMap.put("tablesample", new Integer(SqlParserSymbols.KW_TABLESAMPLE));
-    keywordMap.put("tblproperties", new Integer(SqlParserSymbols.KW_TBLPROPERTIES));
-    keywordMap.put("terminated", new Integer(SqlParserSymbols.KW_TERMINATED));
-    keywordMap.put("textfile", new Integer(SqlParserSymbols.KW_TEXTFILE));
-    keywordMap.put("then", new Integer(SqlParserSymbols.KW_THEN));
-    keywordMap.put("timestamp", new Integer(SqlParserSymbols.KW_TIMESTAMP));
-    keywordMap.put("tinyint", new Integer(SqlParserSymbols.KW_TINYINT));
-    keywordMap.put("to", new Integer(SqlParserSymbols.KW_TO));
-    keywordMap.put("true", new Integer(SqlParserSymbols.KW_TRUE));
-    keywordMap.put("truncate", new Integer(SqlParserSymbols.KW_TRUNCATE));
-    keywordMap.put("unbounded", new Integer(SqlParserSymbols.KW_UNBOUNDED));
-    keywordMap.put("uncached", new Integer(SqlParserSymbols.KW_UNCACHED));
-    keywordMap.put("union", new Integer(SqlParserSymbols.KW_UNION));
-    keywordMap.put("update", new Integer(SqlParserSymbols.KW_UPDATE));
-    keywordMap.put("update_fn", new Integer(SqlParserSymbols.KW_UPDATE_FN));
-    keywordMap.put("upsert", new Integer(SqlParserSymbols.KW_UPSERT));
-    keywordMap.put("use", new Integer(SqlParserSymbols.KW_USE));
-    keywordMap.put("using", new Integer(SqlParserSymbols.KW_USING));
-    keywordMap.put("values", new Integer(SqlParserSymbols.KW_VALUES));
-    keywordMap.put("varchar", new Integer(SqlParserSymbols.KW_VARCHAR));
-    keywordMap.put("view", new Integer(SqlParserSymbols.KW_VIEW));
-    keywordMap.put("when", new Integer(SqlParserSymbols.KW_WHEN));
-    keywordMap.put("where", new Integer(SqlParserSymbols.KW_WHERE));
-    keywordMap.put("with", new Integer(SqlParserSymbols.KW_WITH));
-  }
-
+  static Map<String, Integer> keywordMap;
+  // Reserved words are words that cannot be used as identifiers. They are a
+  // superset of the keywords.
+  static Set<String> reservedWords;
   // map from token id to token description
-  public static final Map<Integer, String> tokenIdMap =
-      new HashMap<Integer, String>();
-  static {
-    Iterator<Map.Entry<String, Integer>> it = keywordMap.entrySet().iterator();
-    while (it.hasNext()) {
-      Map.Entry<String, Integer> pairs = (Map.Entry<String, Integer>) it.next();
-      tokenIdMap.put(pairs.getValue(), pairs.getKey().toUpperCase());
+  static HashMap<Integer, String> tokenIdMap;
+
+  public static void init(TReservedWordsVersion reservedWordsVersion) {
+    // initialize keywords
+    keywordMap = new LinkedHashMap<>();
+    keywordMap.put("&&", SqlParserSymbols.KW_AND);
+    keywordMap.put("add", SqlParserSymbols.KW_ADD);
+    keywordMap.put("aggregate", SqlParserSymbols.KW_AGGREGATE);
+    keywordMap.put("all", SqlParserSymbols.KW_ALL);
+    keywordMap.put("alter", SqlParserSymbols.KW_ALTER);
+    keywordMap.put("analytic", SqlParserSymbols.KW_ANALYTIC);
+    keywordMap.put("and", SqlParserSymbols.KW_AND);
+    keywordMap.put("anti", SqlParserSymbols.KW_ANTI);
+    keywordMap.put("api_version", SqlParserSymbols.KW_API_VERSION);
+    keywordMap.put("array", SqlParserSymbols.KW_ARRAY);
+    keywordMap.put("as", SqlParserSymbols.KW_AS);
+    keywordMap.put("asc", SqlParserSymbols.KW_ASC);
+    keywordMap.put("avro", SqlParserSymbols.KW_AVRO);
+    keywordMap.put("between", SqlParserSymbols.KW_BETWEEN);
+    keywordMap.put("bigint", SqlParserSymbols.KW_BIGINT);
+    keywordMap.put("binary", SqlParserSymbols.KW_BINARY);
+    keywordMap.put("block_size", SqlParserSymbols.KW_BLOCKSIZE);
+    keywordMap.put("boolean", SqlParserSymbols.KW_BOOLEAN);
+    keywordMap.put("by", SqlParserSymbols.KW_BY);
+    keywordMap.put("cached", SqlParserSymbols.KW_CACHED);
+    keywordMap.put("case", SqlParserSymbols.KW_CASE);
+    keywordMap.put("cascade", SqlParserSymbols.KW_CASCADE);
+    keywordMap.put("cast", SqlParserSymbols.KW_CAST);
+    keywordMap.put("change", SqlParserSymbols.KW_CHANGE);
+    keywordMap.put("char", SqlParserSymbols.KW_CHAR);
+    keywordMap.put("class", SqlParserSymbols.KW_CLASS);
+    keywordMap.put("close_fn", SqlParserSymbols.KW_CLOSE_FN);
+    keywordMap.put("column", SqlParserSymbols.KW_COLUMN);
+    keywordMap.put("columns", SqlParserSymbols.KW_COLUMNS);
+    keywordMap.put("comment", SqlParserSymbols.KW_COMMENT);
+    keywordMap.put("compression", SqlParserSymbols.KW_COMPRESSION);
+    keywordMap.put("compute", SqlParserSymbols.KW_COMPUTE);
+    keywordMap.put("create", SqlParserSymbols.KW_CREATE);
+    keywordMap.put("cross", SqlParserSymbols.KW_CROSS);
+    keywordMap.put("current", SqlParserSymbols.KW_CURRENT);
+    keywordMap.put("data", SqlParserSymbols.KW_DATA);
+    keywordMap.put("database", SqlParserSymbols.KW_DATABASE);
+    keywordMap.put("databases", SqlParserSymbols.KW_DATABASES);
+    keywordMap.put("date", SqlParserSymbols.KW_DATE);
+    keywordMap.put("datetime", SqlParserSymbols.KW_DATETIME);
+    keywordMap.put("decimal", SqlParserSymbols.KW_DECIMAL);
+    keywordMap.put("default", SqlParserSymbols.KW_DEFAULT);
+    keywordMap.put("delete", SqlParserSymbols.KW_DELETE);
+    keywordMap.put("delimited", SqlParserSymbols.KW_DELIMITED);
+    keywordMap.put("desc", SqlParserSymbols.KW_DESC);
+    keywordMap.put("describe", SqlParserSymbols.KW_DESCRIBE);
+    keywordMap.put("distinct", SqlParserSymbols.KW_DISTINCT);
+    keywordMap.put("div", SqlParserSymbols.KW_DIV);
+    keywordMap.put("double", SqlParserSymbols.KW_DOUBLE);
+    keywordMap.put("drop", SqlParserSymbols.KW_DROP);
+    keywordMap.put("else", SqlParserSymbols.KW_ELSE);
+    keywordMap.put("encoding", SqlParserSymbols.KW_ENCODING);
+    keywordMap.put("end", SqlParserSymbols.KW_END);
+    keywordMap.put("escaped", SqlParserSymbols.KW_ESCAPED);
+    keywordMap.put("exists", SqlParserSymbols.KW_EXISTS);
+    keywordMap.put("explain", SqlParserSymbols.KW_EXPLAIN);
+    keywordMap.put("extended", SqlParserSymbols.KW_EXTENDED);
+    keywordMap.put("external", SqlParserSymbols.KW_EXTERNAL);
+    keywordMap.put("false", SqlParserSymbols.KW_FALSE);
+    keywordMap.put("fields", SqlParserSymbols.KW_FIELDS);
+    keywordMap.put("fileformat", SqlParserSymbols.KW_FILEFORMAT);
+    keywordMap.put("files", SqlParserSymbols.KW_FILES);
+    keywordMap.put("finalize_fn", SqlParserSymbols.KW_FINALIZE_FN);
+    keywordMap.put("first", SqlParserSymbols.KW_FIRST);
+    keywordMap.put("float", SqlParserSymbols.KW_FLOAT);
+    keywordMap.put("following", SqlParserSymbols.KW_FOLLOWING);
+    keywordMap.put("for", SqlParserSymbols.KW_FOR);
+    keywordMap.put("format", SqlParserSymbols.KW_FORMAT);
+    keywordMap.put("formatted", SqlParserSymbols.KW_FORMATTED);
+    keywordMap.put("from", SqlParserSymbols.KW_FROM);
+    keywordMap.put("full", SqlParserSymbols.KW_FULL);
+    keywordMap.put("function", SqlParserSymbols.KW_FUNCTION);
+    keywordMap.put("functions", SqlParserSymbols.KW_FUNCTIONS);
+    keywordMap.put("grant", SqlParserSymbols.KW_GRANT);
+    keywordMap.put("group", SqlParserSymbols.KW_GROUP);
+    keywordMap.put("hash", SqlParserSymbols.KW_HASH);
+    keywordMap.put("having", SqlParserSymbols.KW_HAVING);
+    keywordMap.put("if", SqlParserSymbols.KW_IF);
+    keywordMap.put("ilike", SqlParserSymbols.KW_ILIKE);
+    keywordMap.put("ignore", SqlParserSymbols.KW_IGNORE);
+    keywordMap.put("in", SqlParserSymbols.KW_IN);
+    keywordMap.put("incremental", SqlParserSymbols.KW_INCREMENTAL);
+    keywordMap.put("init_fn", SqlParserSymbols.KW_INIT_FN);
+    keywordMap.put("inner", SqlParserSymbols.KW_INNER);
+    keywordMap.put("inpath", SqlParserSymbols.KW_INPATH);
+    keywordMap.put("insert", SqlParserSymbols.KW_INSERT);
+    keywordMap.put("int", SqlParserSymbols.KW_INT);
+    keywordMap.put("integer", SqlParserSymbols.KW_INT);
+    keywordMap.put("intermediate", SqlParserSymbols.KW_INTERMEDIATE);
+    keywordMap.put("interval", SqlParserSymbols.KW_INTERVAL);
+    keywordMap.put("into", SqlParserSymbols.KW_INTO);
+    keywordMap.put("invalidate", SqlParserSymbols.KW_INVALIDATE);
+    keywordMap.put("iregexp", SqlParserSymbols.KW_IREGEXP);
+    keywordMap.put("is", SqlParserSymbols.KW_IS);
+    keywordMap.put("join", SqlParserSymbols.KW_JOIN);
+    keywordMap.put("kudu", SqlParserSymbols.KW_KUDU);
+    keywordMap.put("last", SqlParserSymbols.KW_LAST);
+    keywordMap.put("left", SqlParserSymbols.KW_LEFT);
+    keywordMap.put("like", SqlParserSymbols.KW_LIKE);
+    keywordMap.put("limit", SqlParserSymbols.KW_LIMIT);
+    keywordMap.put("lines", SqlParserSymbols.KW_LINES);
+    keywordMap.put("load", SqlParserSymbols.KW_LOAD);
+    keywordMap.put("location", SqlParserSymbols.KW_LOCATION);
+    keywordMap.put("map", SqlParserSymbols.KW_MAP);
+    keywordMap.put("merge_fn", SqlParserSymbols.KW_MERGE_FN);
+    keywordMap.put("metadata", SqlParserSymbols.KW_METADATA);
+    keywordMap.put("not", SqlParserSymbols.KW_NOT);
+    keywordMap.put("null", SqlParserSymbols.KW_NULL);
+    keywordMap.put("nulls", SqlParserSymbols.KW_NULLS);
+    keywordMap.put("offset", SqlParserSymbols.KW_OFFSET);
+    keywordMap.put("on", SqlParserSymbols.KW_ON);
+    keywordMap.put("||", SqlParserSymbols.KW_OR);
+    keywordMap.put("or", SqlParserSymbols.KW_OR);
+    keywordMap.put("order", SqlParserSymbols.KW_ORDER);
+    keywordMap.put("outer", SqlParserSymbols.KW_OUTER);
+    keywordMap.put("over", SqlParserSymbols.KW_OVER);
+    keywordMap.put("overwrite", SqlParserSymbols.KW_OVERWRITE);
+    keywordMap.put("parquet", SqlParserSymbols.KW_PARQUET);
+    keywordMap.put("parquetfile", SqlParserSymbols.KW_PARQUETFILE);
+    keywordMap.put("partition", SqlParserSymbols.KW_PARTITION);
+    keywordMap.put("partitioned", SqlParserSymbols.KW_PARTITIONED);
+    keywordMap.put("partitions", SqlParserSymbols.KW_PARTITIONS);
+    keywordMap.put("preceding", SqlParserSymbols.KW_PRECEDING);
+    keywordMap.put("prepare_fn", SqlParserSymbols.KW_PREPARE_FN);
+    keywordMap.put("primary", SqlParserSymbols.KW_PRIMARY);
+    keywordMap.put("produced", SqlParserSymbols.KW_PRODUCED);
+    keywordMap.put("purge", SqlParserSymbols.KW_PURGE);
+    keywordMap.put("range", SqlParserSymbols.KW_RANGE);
+    keywordMap.put("rcfile", SqlParserSymbols.KW_RCFILE);
+    keywordMap.put("real", SqlParserSymbols.KW_DOUBLE);
+    keywordMap.put("recover", SqlParserSymbols.KW_RECOVER);
+    keywordMap.put("refresh", SqlParserSymbols.KW_REFRESH);
+    keywordMap.put("regexp", SqlParserSymbols.KW_REGEXP);
+    keywordMap.put("rename", SqlParserSymbols.KW_RENAME);
+    keywordMap.put("repeatable", SqlParserSymbols.KW_REPEATABLE);
+    keywordMap.put("replace", SqlParserSymbols.KW_REPLACE);
+    keywordMap.put("replication", SqlParserSymbols.KW_REPLICATION);
+    keywordMap.put("restrict", SqlParserSymbols.KW_RESTRICT);
+    keywordMap.put("returns", SqlParserSymbols.KW_RETURNS);
+    keywordMap.put("revoke", SqlParserSymbols.KW_REVOKE);
+    keywordMap.put("right", SqlParserSymbols.KW_RIGHT);
+    keywordMap.put("rlike", SqlParserSymbols.KW_RLIKE);
+    keywordMap.put("role", SqlParserSymbols.KW_ROLE);
+    keywordMap.put("roles", SqlParserSymbols.KW_ROLES);
+    keywordMap.put("row", SqlParserSymbols.KW_ROW);
+    keywordMap.put("rows", SqlParserSymbols.KW_ROWS);
+    keywordMap.put("schema", SqlParserSymbols.KW_SCHEMA);
+    keywordMap.put("schemas", SqlParserSymbols.KW_SCHEMAS);
+    keywordMap.put("select", SqlParserSymbols.KW_SELECT);
+    keywordMap.put("semi", SqlParserSymbols.KW_SEMI);
+    keywordMap.put("sequencefile", SqlParserSymbols.KW_SEQUENCEFILE);
+    keywordMap.put("serdeproperties", SqlParserSymbols.KW_SERDEPROPERTIES);
+    keywordMap.put("serialize_fn", SqlParserSymbols.KW_SERIALIZE_FN);
+    keywordMap.put("set", SqlParserSymbols.KW_SET);
+    keywordMap.put("show", SqlParserSymbols.KW_SHOW);
+    keywordMap.put("smallint", SqlParserSymbols.KW_SMALLINT);
+    keywordMap.put("sort", SqlParserSymbols.KW_SORT);
+    keywordMap.put("stats", SqlParserSymbols.KW_STATS);
+    keywordMap.put("stored", SqlParserSymbols.KW_STORED);
+    keywordMap.put("straight_join", SqlParserSymbols.KW_STRAIGHT_JOIN);
+    keywordMap.put("string", SqlParserSymbols.KW_STRING);
+    keywordMap.put("struct", SqlParserSymbols.KW_STRUCT);
+    keywordMap.put("symbol", SqlParserSymbols.KW_SYMBOL);
+    keywordMap.put("table", SqlParserSymbols.KW_TABLE);
+    keywordMap.put("tables", SqlParserSymbols.KW_TABLES);
+    keywordMap.put("tablesample", SqlParserSymbols.KW_TABLESAMPLE);
+    keywordMap.put("tblproperties", SqlParserSymbols.KW_TBLPROPERTIES);
+    keywordMap.put("terminated", SqlParserSymbols.KW_TERMINATED);
+    keywordMap.put("textfile", SqlParserSymbols.KW_TEXTFILE);
+    keywordMap.put("then", SqlParserSymbols.KW_THEN);
+    keywordMap.put("timestamp", SqlParserSymbols.KW_TIMESTAMP);
+    keywordMap.put("tinyint", SqlParserSymbols.KW_TINYINT);
+    keywordMap.put("to", SqlParserSymbols.KW_TO);
+    keywordMap.put("true", SqlParserSymbols.KW_TRUE);
+    keywordMap.put("truncate", SqlParserSymbols.KW_TRUNCATE);
+    keywordMap.put("unbounded", SqlParserSymbols.KW_UNBOUNDED);
+    keywordMap.put("uncached", SqlParserSymbols.KW_UNCACHED);
+    keywordMap.put("union", SqlParserSymbols.KW_UNION);
+    keywordMap.put("unknown", SqlParserSymbols.KW_UNKNOWN);
+    keywordMap.put("update", SqlParserSymbols.KW_UPDATE);
+    keywordMap.put("update_fn", SqlParserSymbols.KW_UPDATE_FN);
+    keywordMap.put("upsert", SqlParserSymbols.KW_UPSERT);
+    keywordMap.put("use", SqlParserSymbols.KW_USE);
+    keywordMap.put("using", SqlParserSymbols.KW_USING);
+    keywordMap.put("values", SqlParserSymbols.KW_VALUES);
+    keywordMap.put("varchar", SqlParserSymbols.KW_VARCHAR);
+    keywordMap.put("view", SqlParserSymbols.KW_VIEW);
+    keywordMap.put("when", SqlParserSymbols.KW_WHEN);
+    keywordMap.put("where", SqlParserSymbols.KW_WHERE);
+    keywordMap.put("with", SqlParserSymbols.KW_WITH);
+
+    // Initialize tokenIdMap for error reporting.
+    tokenIdMap = new HashMap<>();
+    for (Map.Entry<String, Integer> entry : keywordMap.entrySet()) {
+      tokenIdMap.put(entry.getValue(), entry.getKey().toUpperCase());
     }
-
     // add non-keyword tokens
-    tokenIdMap.put(new Integer(SqlParserSymbols.IDENT), "IDENTIFIER");
-    tokenIdMap.put(new Integer(SqlParserSymbols.COLON), ":");
-    tokenIdMap.put(new Integer(SqlParserSymbols.SEMICOLON), ";");
-    tokenIdMap.put(new Integer(SqlParserSymbols.COMMA), "COMMA");
-    tokenIdMap.put(new Integer(SqlParserSymbols.BITNOT), "~");
-    tokenIdMap.put(new Integer(SqlParserSymbols.LPAREN), "(");
-    tokenIdMap.put(new Integer(SqlParserSymbols.RPAREN), ")");
-    tokenIdMap.put(new Integer(SqlParserSymbols.LBRACKET), "[");
-    tokenIdMap.put(new Integer(SqlParserSymbols.RBRACKET), "]");
-    tokenIdMap.put(new Integer(SqlParserSymbols.DECIMAL_LITERAL), "DECIMAL LITERAL");
-    tokenIdMap.put(new Integer(SqlParserSymbols.INTEGER_LITERAL), "INTEGER LITERAL");
-    tokenIdMap.put(new Integer(SqlParserSymbols.NOT), "!");
-    tokenIdMap.put(new Integer(SqlParserSymbols.LESSTHAN), "<");
-    tokenIdMap.put(new Integer(SqlParserSymbols.GREATERTHAN), ">");
-    tokenIdMap.put(new Integer(SqlParserSymbols.UNMATCHED_STRING_LITERAL),
-        "UNMATCHED STRING LITERAL");
-    tokenIdMap.put(new Integer(SqlParserSymbols.MOD), "%");
-    tokenIdMap.put(new Integer(SqlParserSymbols.ADD), "+");
-    tokenIdMap.put(new Integer(SqlParserSymbols.DIVIDE), "/");
-    tokenIdMap.put(new Integer(SqlParserSymbols.EQUAL), "=");
-    tokenIdMap.put(new Integer(SqlParserSymbols.STAR), "*");
-    tokenIdMap.put(new Integer(SqlParserSymbols.BITOR), "|");
-    tokenIdMap.put(new Integer(SqlParserSymbols.DOT), ".");
-    tokenIdMap.put(new Integer(SqlParserSymbols.DOTDOTDOT), "...");
-    tokenIdMap.put(new Integer(SqlParserSymbols.STRING_LITERAL), "STRING LITERAL");
-    tokenIdMap.put(new Integer(SqlParserSymbols.EOF), "EOF");
-    tokenIdMap.put(new Integer(SqlParserSymbols.SUBTRACT), "-");
-    tokenIdMap.put(new Integer(SqlParserSymbols.BITAND), "&");
-    tokenIdMap.put(new Integer(SqlParserSymbols.UNEXPECTED_CHAR), "Unexpected character");
-    tokenIdMap.put(new Integer(SqlParserSymbols.BITXOR), "^");
-    tokenIdMap.put(new Integer(SqlParserSymbols.NUMERIC_OVERFLOW), "NUMERIC OVERFLOW");
-    tokenIdMap.put(new Integer(SqlParserSymbols.EMPTY_IDENT), "EMPTY IDENTIFIER");
+    tokenIdMap.put(SqlParserSymbols.IDENT, "IDENTIFIER");
+    tokenIdMap.put(SqlParserSymbols.COLON, ":");
+    tokenIdMap.put(SqlParserSymbols.SEMICOLON, ";");
+    tokenIdMap.put(SqlParserSymbols.COMMA, "COMMA");
+    tokenIdMap.put(SqlParserSymbols.BITNOT, "~");
+    tokenIdMap.put(SqlParserSymbols.LPAREN, "(");
+    tokenIdMap.put(SqlParserSymbols.RPAREN, ")");
+    tokenIdMap.put(SqlParserSymbols.LBRACKET, "[");
+    tokenIdMap.put(SqlParserSymbols.RBRACKET, "]");
+    tokenIdMap.put(SqlParserSymbols.DECIMAL_LITERAL, "DECIMAL LITERAL");
+    tokenIdMap.put(SqlParserSymbols.INTEGER_LITERAL, "INTEGER LITERAL");
+    tokenIdMap.put(SqlParserSymbols.NOT, "!");
+    tokenIdMap.put(SqlParserSymbols.LESSTHAN, "<");
+    tokenIdMap.put(SqlParserSymbols.GREATERTHAN, ">");
+    tokenIdMap.put(SqlParserSymbols.UNMATCHED_STRING_LITERAL, "UNMATCHED STRING LITERAL");
+    tokenIdMap.put(SqlParserSymbols.MOD, "%");
+    tokenIdMap.put(SqlParserSymbols.ADD, "+");
+    tokenIdMap.put(SqlParserSymbols.DIVIDE, "/");
+    tokenIdMap.put(SqlParserSymbols.EQUAL, "=");
+    tokenIdMap.put(SqlParserSymbols.STAR, "*");
+    tokenIdMap.put(SqlParserSymbols.BITOR, "|");
+    tokenIdMap.put(SqlParserSymbols.DOT, ".");
+    tokenIdMap.put(SqlParserSymbols.DOTDOTDOT, "...");
+    tokenIdMap.put(SqlParserSymbols.STRING_LITERAL, "STRING LITERAL");
+    tokenIdMap.put(SqlParserSymbols.EOF, "EOF");
+    tokenIdMap.put(SqlParserSymbols.SUBTRACT, "-");
+    tokenIdMap.put(SqlParserSymbols.BITAND, "&");
+    tokenIdMap.put(SqlParserSymbols.UNEXPECTED_CHAR, "Unexpected character");
+    tokenIdMap.put(SqlParserSymbols.BITXOR, "^");
+    tokenIdMap.put(SqlParserSymbols.NUMERIC_OVERFLOW, "NUMERIC OVERFLOW");
+    tokenIdMap.put(SqlParserSymbols.EMPTY_IDENT, "EMPTY IDENTIFIER");
+
+    // Initialize reservedWords. For Impala 2.11, reserved words = keywords.
+    if (reservedWordsVersion == TReservedWordsVersion.IMPALA_2_11) {
+      reservedWords = keywordMap.keySet();
+      return;
+    }
+    // For Impala 3.0, reserved words = keywords + sql16ReservedWords
+    //     - builtinFunctions - whitelist.
+    // Unused reserved words = reserved words - keywords. These words are reserved
+    // for forward compatibility purposes.
+    reservedWords = new HashSet<>(keywordMap.keySet());
+    // Add SQL:2016 reserved words
+    reservedWords.addAll(Arrays.asList(new String[] {
+        "abs", "acos", "allocate", "any", "are", "array_agg", "array_max_cardinality",
+        "asensitive", "asin", "asymmetric", "at", "atan", "atomic", "authorization",
+        "avg", "begin", "begin_frame", "begin_partition", "blob", "both", "call",
+        "called", "cardinality", "cascaded", "ceil", "ceiling", "char_length",
+        "character", "character_length", "check", "classifier", "clob", "close",
+        "coalesce", "collate", "collect", "commit", "condition", "connect", "constraint",
+        "contains", "convert", "copy", "corr", "corresponding", "cos", "cosh", "count",
+        "covar_pop", "covar_samp", "cube", "cume_dist", "current_catalog", "current_date",
+        "current_default_transform_group", "current_path", "current_role",
+        "current_row", "current_schema", "current_time",
+        "current_timestamp", "current_transform_group_for_type", "current_user", "cursor",
+        "cycle", "day", "deallocate", "dec", "decfloat", "declare", "define",
+        "dense_rank", "deref", "deterministic", "disconnect", "dynamic", "each",
+        "element", "empty", "end-exec", "end_frame", "end_partition", "equals", "escape",
+        "every", "except", "exec", "execute", "exp", "extract", "fetch", "filter",
+        "first_value", "floor", "foreign", "frame_row", "free", "fusion", "get", "global",
+        "grouping", "groups", "hold", "hour", "identity", "indicator", "initial", "inout",
+        "insensitive", "integer", "intersect", "intersection", "json_array",
+        "json_arrayagg", "json_exists", "json_object", "json_objectagg", "json_query",
+        "json_table", "json_table_primitive", "json_value", "lag", "language", "large",
+        "last_value", "lateral", "lead", "leading", "like_regex", "listagg", "ln",
+        "local", "localtime", "localtimestamp", "log", "log10", "lower", "match",
+        "match_number", "match_recognize", "matches", "max", "member", "merge", "method",
+        "min", "minute", "mod", "modifies", "module", "month", "multiset", "national",
+        "natural", "nchar", "nclob", "new", "no", "none", "normalize", "nth_value",
+        "ntile", "nullif", "numeric", "occurrences_regex", "octet_length", "of", "old",
+        "omit", "one", "only", "open", "out", "overlaps", "overlay", "parameter",
+        "pattern", "per", "percent", "percent_rank", "percentile_cont", "percentile_disc",
+        "period", "portion", "position", "position_regex", "power", "precedes",
+        "precision", "prepare", "procedure", "ptf", "rank", "reads", "real", "recursive",
+        "ref", "references", "referencing", "regr_avgx", "regr_avgy", "regr_count",
+        "regr_intercept", "regr_r2", "regr_slope", "regr_sxx", "regr_sxy", "regr_syy",
+        "release", "result", "return", "rollback", "rollup", "row_number", "running",
+        "savepoint", "scope", "scroll", "search", "second", "seek", "sensitive",
+        "session_user", "similar", "sin", "sinh", "skip", "some", "specific",
+        "specifictype", "sql", "sqlexception", "sqlstate", "sqlwarning", "sqrt", "start",
+        "static", "stddev_pop", "stddev_samp", "submultiset", "subset", "substring",
+        "substring_regex", "succeeds", "sum", "symmetric", "system", "system_time",
+        "system_user", "tan", "tanh", "time", "timezone_hour", "timezone_minute",
+        "trailing", "translate", "translate_regex", "translation", "treat", "trigger",
+        "trim", "trim_array", "uescape", "unique", "unknown", "unnest", "update",
+        "upper", "user", "value", "value_of", "var_pop", "var_samp", "varbinary",
+        "varying", "versioning", "whenever", "width_bucket", "window", "within",
+        "without", "year"}));
+    // Remove impala builtin function names
+    reservedWords.removeAll(new BuiltinsDb(BUILTINS_DB).getAllFunctions().keySet());
+    // Remove whitelisted words. These words might be heavily used in production, and
+    // Impala is unlikely to implement SQL features around them in the near future.
+    reservedWords.removeAll(Arrays.asList(new String[] {
+        // time units
+        "year", "month", "day", "hour", "minute", "second",
+        "begin", "call", "check", "classifier", "close", "identity", "language",
+        "localtime", "member", "module", "new", "nullif", "old", "open", "parameter",
+        "period", "result", "return", "sql", "start", "system", "time", "user", "value"
+    }));
   }
 
-  public static boolean isKeyword(Integer tokenId) {
-    String token = tokenIdMap.get(tokenId);
-    if (token == null) return false;
-    return keywordMap.containsKey(token.toLowerCase());
+  static {
+    // Default-initialize the static members for FE tests. Outside of FE tests, init()
+    // is called again in BackendConfig.create() once the backend configuration is
+    // passed to the FE, overwriting this initialization.
+    init(TReservedWordsVersion.IMPALA_3_0);
   }
 
-  public static boolean isKeyword(String ident) {
-    return keywordMap.containsKey(ident.toLowerCase());
+  static boolean isReserved(String token) {
+    return reservedWords.contains(token.toLowerCase());
+  }
+
+  static boolean isKeyword(Integer tokenId) {
+    String token = tokenIdMap.get(tokenId);
+    return token != null && keywordMap.containsKey(token.toLowerCase());
   }
 
   private Symbol newToken(int id, Object value) {
@@ -384,23 +463,19 @@ EndOfLineComment = "--" !({HintContent}|{ContainsLineTerminator}) {LineTerminato
 // The rules for IntegerLiteral and DecimalLiteral are the same, but it is useful
 // to distinguish them, e.g., so the Parser can use integer literals without analysis.
 {IntegerLiteral} {
-  BigDecimal val = null;
   try {
-    val = new BigDecimal(yytext());
+    return newToken(SqlParserSymbols.INTEGER_LITERAL, new BigDecimal(yytext()));
   } catch (NumberFormatException e) {
     return newToken(SqlParserSymbols.NUMERIC_OVERFLOW, yytext());
   }
-  return newToken(SqlParserSymbols.INTEGER_LITERAL, val);
 }
 
 {DecimalLiteral} {
-  BigDecimal val = null;
   try {
-    val = new BigDecimal(yytext());
+    return newToken(SqlParserSymbols.DECIMAL_LITERAL, new BigDecimal(yytext()));
   } catch (NumberFormatException e) {
     return newToken(SqlParserSymbols.NUMERIC_OVERFLOW, yytext());
   }
-  return newToken(SqlParserSymbols.DECIMAL_LITERAL, val);
 }
 
 {QuotedIdentifier} {
@@ -416,7 +491,9 @@ EndOfLineComment = "--" !({HintContent}|{ContainsLineTerminator}) {LineTerminato
   String text = yytext();
   Integer kw_id = keywordMap.get(text.toLowerCase());
   if (kw_id != null) {
-    return newToken(kw_id.intValue(), text);
+    return newToken(kw_id, text);
+  } else if (isReserved(text)) {
+    return newToken(SqlParserSymbols.UNUSED_RESERVED_WORD, text);
   } else {
     return newToken(SqlParserSymbols.IDENT, text);
   }
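
For readers skimming the diff, the net effect of the new identifier rule is a
three-way split: known keywords keep their dedicated tokens, words that are
reserved but have no grammar rule yet become UNUSED_RESERVED_WORD (which the
parser can reject with a clear error), and everything else stays a plain
identifier. Below is a minimal, self-contained Java sketch of that
classification; the token kinds and word lists here are invented for
illustration, not Impala's real ones.

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Illustrative only: word lists are made up, not Impala's.
public class ReservedWordDemo {
  static final Set<String> KEYWORDS =
      new HashSet<>(Arrays.asList("select", "from", "where"));
  // Reserved words: the keywords plus words reserved for forward compatibility.
  static final Set<String> RESERVED =
      new HashSet<>(Arrays.asList("select", "from", "where", "blob", "cube"));

  enum TokenKind { KEYWORD, UNUSED_RESERVED_WORD, IDENT }

  static TokenKind classify(String text) {
    String lower = text.toLowerCase();
    if (KEYWORDS.contains(lower)) return TokenKind.KEYWORD;
    if (RESERVED.contains(lower)) return TokenKind.UNUSED_RESERVED_WORD;
    return TokenKind.IDENT;
  }

  public static void main(String[] args) {
    System.out.println(classify("select")); // KEYWORD
    System.out.println(classify("cube"));   // UNUSED_RESERVED_WORD, parser rejects
    System.out.println(classify("foo"));    // IDENT
  }
}

Quoted identifiers are matched by the earlier QuotedIdentifier rule and never
reach this classification, which is why the test changes elsewhere in this
patch backquote identifiers such as `pattern`.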

http://git-wip-us.apache.org/repos/asf/impala/blob/f0b3d9d1/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
index 4124493..1c02306 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
@@ -1276,12 +1276,10 @@ public class AnalyzeDDLTest extends FrontendTestBase {
 
     // Test tablesample clause with extrapolation enabled/disabled. Replace/restore the
     // static backend config for this test to control stats extrapolation.
-    BackendConfig origInstance = BackendConfig.INSTANCE;
+    TBackendGflags gflags = BackendConfig.INSTANCE.getBackendCfg();
+    boolean origEnableStatsExtrapolation = gflags.isEnable_stats_extrapolation();
     try {
-      TBackendGflags testGflags = new TBackendGflags();
-      testGflags.setEnable_stats_extrapolation(true);
-      BackendConfig.create(testGflags);
-
+      gflags.setEnable_stats_extrapolation(true);
       // Test different COMPUTE_STATS_MIN_SAMPLE_BYTES.
       TQueryOptions queryOpts = new TQueryOptions();
 
@@ -1346,13 +1344,12 @@ public class AnalyzeDDLTest extends FrontendTestBase {
           "compute stats functional.alltypes_datasource tablesample system (3)",
           "TABLESAMPLE is only supported on HDFS tables.");
 
-      testGflags.setEnable_stats_extrapolation(false);
-      BackendConfig.create(testGflags);
+      gflags.setEnable_stats_extrapolation(false);
       AnalysisError("compute stats functional.alltypes tablesample system (10)",
           "COMPUTE STATS TABLESAMPLE requires --enable_stats_extrapolation=true. " +
           "Stats extrapolation is currently disabled.");
     } finally {
-      BackendConfig.INSTANCE = origInstance;
+      gflags.setEnable_stats_extrapolation(origEnableStatsExtrapolation);
     }
   }
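
The test now mutates the shared TBackendGflags in place and restores only the
touched flag in a finally block, rather than swapping out the whole
BackendConfig.INSTANCE. A hedged sketch of that save/mutate/restore pattern;
Config here is a stand-in type, not an Impala class.

// Illustrative sketch of the modify/restore pattern used in these tests.
public class FlagRestoreDemo {
  static class Config {
    private boolean statsExtrapolation;
    boolean isStatsExtrapolation() { return statsExtrapolation; }
    void setStatsExtrapolation(boolean v) { statsExtrapolation = v; }
  }

  static void runWithFlag(Config cfg, boolean value, Runnable test) {
    boolean orig = cfg.isStatsExtrapolation(); // remember the original setting
    try {
      cfg.setStatsExtrapolation(value);        // mutate the shared config in place
      test.run();
    } finally {
      cfg.setStatsExtrapolation(orig);         // always restore, even on failure
    }
  }

  public static void main(String[] args) {
    Config cfg = new Config();
    runWithFlag(cfg, true,
        () -> System.out.println("flag=" + cfg.isStatsExtrapolation()));
    System.out.println("restored=" + cfg.isStatsExtrapolation());
  }
}

The advantage over replacing BackendConfig.INSTANCE wholesale is that any other
flags already set on the live config remain visible to the code under test.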
 

http://git-wip-us.apache.org/repos/asf/impala/blob/f0b3d9d1/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
index d3311ba..65639b2 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
@@ -1048,11 +1048,11 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
     AnalysisError("select sum(id) over(order by id) from functional.alltypes having 1",
         "HAVING clause must not contain analytic expressions: " +
         "sum(id) OVER (ORDER BY id ASC)");
-    AnalyzesOk("with w_test as (select '1' as one, 2 as two, '3' as three) " +
-        "select one as one, substring(cast(two as string), 1, 1) as two, " +
+    AnalyzesOk("with w_test as (select '1' as `one`, 2 as two, '3' as three) " +
+        "select `one` as `one`, substring(cast(two as string), 1, 1) as two, " +
         "three as three, count(1) as cnt " +
         "from w_test " +
-        "group by one, substring(cast(two as string), 1, 1), three");
+        "group by `one`, substring(cast(two as string), 1, 1), three");
     // Constant exprs should not be interpreted as ordinals
     AnalyzesOk("select int_col, count(*) from functional.alltypes group by 1, 1 * 2");
     AnalyzesOk("select int_col, bigint_col from functional.alltypes order by 1 + 4");

http://git-wip-us.apache.org/repos/asf/impala/blob/f0b3d9d1/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java b/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
index 2b9da03..41124f5 100644
--- a/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
@@ -19,13 +19,13 @@ package org.apache.impala.analysis;
 
 import static org.junit.Assert.fail;
 
-import org.junit.Ignore;
-import org.junit.Test;
-
 import org.apache.impala.authorization.AuthorizationConfig;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.FrontendTestBase;
 import org.apache.impala.testutil.TestUtils;
+import org.junit.Ignore;
+import org.junit.Test;
+
 import com.google.common.base.Preconditions;
 
 // TODO: Expand this test, in particular, because view creation relies
@@ -584,9 +584,9 @@ public class ToSqlTest extends FrontendTestBase {
 
       // Table hint
       testToSql(String.format(
-          "select * from functional.alltypes at %sschedule_random_replica%s", prefix,
+          "select * from functional.alltypes atp %sschedule_random_replica%s", prefix,
           suffix),
-          "SELECT * FROM functional.alltypes at\n-- +schedule_random_replica\n");
+          "SELECT * FROM functional.alltypes atp\n-- +schedule_random_replica\n");
       testToSql(String.format(
           "select * from functional.alltypes %sschedule_random_replica%s", prefix,
           suffix),
@@ -597,9 +597,9 @@ public class ToSqlTest extends FrontendTestBase {
           "SELECT * FROM functional.alltypes\n-- +schedule_random_replica," +
           "schedule_disk_local\n");
       testToSql(String.format(
-          "select c1 from (select at.tinyint_col as c1 from functional.alltypes at " +
+          "select c1 from (select atp.tinyint_col as c1 from functional.alltypes atp " +
           "%sschedule_random_replica%s) s1", prefix, suffix),
-          "SELECT c1 FROM (SELECT at.tinyint_col c1 FROM functional.alltypes at\n-- +" +
+          "SELECT c1 FROM (SELECT atp.tinyint_col c1 FROM functional.alltypes atp\n-- +" +
           "schedule_random_replica\n) s1");
 
       // Select-list hint. The legacy-style hint has no prefix and suffix.

http://git-wip-us.apache.org/repos/asf/impala/blob/f0b3d9d1/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java b/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java
index e6b71cd..9486a7f 100644
--- a/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java
@@ -24,8 +24,8 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
@@ -33,15 +33,14 @@ import java.util.Set;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
-import org.junit.Test;
-
 import org.apache.impala.analysis.FunctionName;
-import org.apache.impala.analysis.HdfsUri;
 import org.apache.impala.analysis.LiteralExpr;
 import org.apache.impala.analysis.NumericLiteral;
 import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
 import org.apache.impala.testutil.CatalogServiceTestCatalog;
 import org.apache.impala.thrift.TFunctionBinaryType;
+import org.junit.Test;
+
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -623,9 +622,8 @@ public class CatalogTest {
     dbParams.put(badFnKey, badFnVal);
     Db db = catalog_.getDb(dbName);
     assertEquals(db, null);
-    db = new Db(dbName, catalog_,
-        new org.apache.hadoop.hive.metastore.api.Database(dbName,
-        "", "", dbParams));
+    db = new Db(dbName,
+        new org.apache.hadoop.hive.metastore.api.Database(dbName, "", "", dbParams));
     catalog_.addDb(db);
     db = catalog_.getDb(dbName);
     assertTrue(db != null);

http://git-wip-us.apache.org/repos/asf/impala/blob/f0b3d9d1/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java b/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java
index 49b1f6b..c087f5a 100644
--- a/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java
+++ b/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java
@@ -153,7 +153,7 @@ public class FrontendTestBase {
   protected Db addTestDb(String dbName, String comment) {
     Db db = catalog_.getDb(dbName);
     Preconditions.checkState(db == null, "Test db must not already exist.");
-    db = new Db(dbName, catalog_, new org.apache.hadoop.hive.metastore.api.Database(
+    db = new Db(dbName, new org.apache.hadoop.hive.metastore.api.Database(
         dbName, comment, "", Collections.<String, String>emptyMap()));
     catalog_.addDb(db);
     testDbs_.add(db);

http://git-wip-us.apache.org/repos/asf/impala/blob/f0b3d9d1/fe/src/test/java/org/apache/impala/planner/StatsExtrapolationTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/planner/StatsExtrapolationTest.java b/fe/src/test/java/org/apache/impala/planner/StatsExtrapolationTest.java
index 92b2f93..bee6a32 100644
--- a/fe/src/test/java/org/apache/impala/planner/StatsExtrapolationTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/StatsExtrapolationTest.java
@@ -79,13 +79,11 @@ public class StatsExtrapolationTest extends FrontendTestBase {
     addTestDb("extrap_stats", null);
     Table tbl = addTestTable("create table extrap_stats.t (i int)");
 
-    // Replace/restore the static backend config for this test.
-    BackendConfig origInstance = BackendConfig.INSTANCE;
+    // Modify/restore the backend config for this test.
+    TBackendGflags gflags = BackendConfig.INSTANCE.getBackendCfg();
+    boolean origEnableStatsExtrapolation = gflags.isEnable_stats_extrapolation();
     try {
-      // Create a fake config with extrapolation enabled.
-      TBackendGflags testGflags = new TBackendGflags();
-      testGflags.setEnable_stats_extrapolation(true);
-      BackendConfig.create(testGflags);
+      gflags.setEnable_stats_extrapolation(true);
 
       // Both stats are set to a meaningful value.
       runTest(tbl, 100L, 1000L, 0, 0);
@@ -131,7 +129,7 @@ public class StatsExtrapolationTest extends FrontendTestBase {
       runTest(tbl, 100L, 1000L, -1, -1);
       runTest(tbl, 100L, 1000L, Long.MIN_VALUE, -1);
     } finally {
-      BackendConfig.INSTANCE = origInstance;
+      gflags.setEnable_stats_extrapolation(origEnableStatsExtrapolation);
     }
   }
 
@@ -140,13 +138,11 @@ public class StatsExtrapolationTest extends FrontendTestBase {
     addTestDb("extrap_stats", null);
     Table tbl = addTestTable("create table extrap_stats.t (i int)");
 
-    // Replace/restore the static backend config for this test.
-    BackendConfig origInstance = BackendConfig.INSTANCE;
+    // Modify/restore the backend config for this test.
+    TBackendGflags gflags = BackendConfig.INSTANCE.getBackendCfg();
+    boolean origEnableStatsExtrapolation = gflags.isEnable_stats_extrapolation();
     try {
-      // Create a fake config with extrapolation disabled.
-      TBackendGflags testGflags = new TBackendGflags();
-      testGflags.setEnable_stats_extrapolation(false);
-      BackendConfig.create(testGflags);
+      gflags.setEnable_stats_extrapolation(false);
 
       // Always expect -1 even with legitimate stats.
       runTest(tbl, 100L, 1000L, 0, -1);
@@ -155,7 +151,7 @@ public class StatsExtrapolationTest extends FrontendTestBase {
       runTest(tbl, 100L, 1000L, Long.MAX_VALUE, -1);
       runTest(tbl, 100L, 1000L, -100, -1);
     } finally {
-      BackendConfig.INSTANCE = origInstance;
+      gflags.setEnable_stats_extrapolation(origEnableStatsExtrapolation);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/f0b3d9d1/fe/src/test/java/org/apache/impala/service/JdbcTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/service/JdbcTest.java b/fe/src/test/java/org/apache/impala/service/JdbcTest.java
index 1a907f8..28b032b 100644
--- a/fe/src/test/java/org/apache/impala/service/JdbcTest.java
+++ b/fe/src/test/java/org/apache/impala/service/JdbcTest.java
@@ -36,15 +36,15 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
 import org.apache.impala.analysis.CreateTableStmt;
 import org.apache.impala.analysis.SqlParser;
 import org.apache.impala.analysis.SqlScanner;
 import org.apache.impala.testutil.ImpalaJdbcClient;
 import org.apache.impala.util.Metrics;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
 
 import com.google.common.collect.Lists;
 

http://git-wip-us.apache.org/repos/asf/impala/blob/f0b3d9d1/testdata/workloads/functional-query/queries/QueryTest/empty-build-joins.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/empty-build-joins.test b/testdata/workloads/functional-query/queries/QueryTest/empty-build-joins.test
index 3aa9994..19f5ea6 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/empty-build-joins.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/empty-build-joins.test
@@ -1,9 +1,9 @@
 ====
 ---- QUERY
 # Inner equi-join - executes with hash join.
-select straight_join at.id
-from alltypes at
-  inner join functional.alltypestiny att on at.id = att.id
+select straight_join atp.id
+from alltypes atp
+  inner join functional.alltypestiny att on atp.id = att.id
 where att.int_col = 999
 ---- RESULTS
 ---- TYPES
@@ -11,9 +11,9 @@ INT
 ====
 ---- QUERY
 # Right equi-join - executes with hash join.
-select straight_join at.id
-from alltypes at
-  right join functional.alltypestiny att on at.id = att.id
+select straight_join atp.id
+from alltypes atp
+  right join functional.alltypestiny att on atp.id = att.id
 where att.int_col = 999
 ---- RESULTS
 ---- TYPES
@@ -21,11 +21,11 @@ INT
 ====
 ---- QUERY
 # Left equi-join - executes with hash join.
-select straight_join at.id
-from alltypes at
+select straight_join atp.id
+from alltypes atp
   left join (
-    select * from functional.alltypestiny where int_col = 999) att on at.id = att.id
-order by at.id desc
+    select * from functional.alltypestiny where int_col = 999) att on atp.id = att.id
+order by atp.id desc
 limit 5
 ---- RESULTS
 7299
@@ -38,11 +38,11 @@ INT
 ====
 ---- QUERY
 # Full outer equi-join - executes with hash join.
-select straight_join at.id
-from alltypes at
+select straight_join atp.id
+from alltypes atp
   full outer join (
-    select * from functional.alltypestiny where int_col = 999) att on at.id = att.id
-order by at.id desc
+    select * from functional.alltypestiny where int_col = 999) att on atp.id = att.id
+order by atp.id desc
 limit 5
 ---- RESULTS
 7299
@@ -55,8 +55,8 @@ INT
 ====
 ---- QUERY
 # Left semi equi-join - executes with hash join.
-select straight_join at.id
-from alltypes at
+select straight_join atp.id
+from alltypes atp
 where id in (
   select id from functional.alltypestiny
   where id = 999)
@@ -66,17 +66,17 @@ INT
 ====
 ---- QUERY
 # Right semi equi-join - executes with hash join.
-select straight_join at.id
+select straight_join atp.id
 from (select * from functional.alltypestiny att where int_col = 999) att
-  right semi join alltypes at on at.id = att.id
+  right semi join alltypes atp on atp.id = att.id
 ---- RESULTS
 ---- TYPES
 INT
 ====
 ---- QUERY
 # Left NAAJ equi-join - executes with hash join.
-select straight_join at.id
-from alltypes at
+select straight_join atp.id
+from alltypes atp
 where id not in (
   select id from functional.alltypestiny
   where id = 999)
@@ -93,11 +93,11 @@ INT
 ====
 ---- QUERY
 # Left anti equi-join - executes with hash join.
-select straight_join at.id
-from alltypes at
+select straight_join atp.id
+from alltypes atp
 where not exists (
   select id from functional.alltypestiny att
-    where id = 999 and att.id = at.id)
+    where id = 999 and att.id = atp.id)
 order by id desc
 limit 5
 ---- RESULTS
@@ -111,10 +111,10 @@ INT
 ====
 ---- QUERY
 # Right anti equi-join - executes with hash join.
-select straight_join at.id
+select straight_join atp.id
 from (select * from functional.alltypestiny att where int_col = 999) att
-  right anti join alltypes at on at.id = att.id
-order by at.id desc
+  right anti join alltypes atp on atp.id = att.id
+order by atp.id desc
 limit 5
 ---- RESULTS
 7299
@@ -127,9 +127,9 @@ INT
 ====
 ---- QUERY
 # Inner non-equi-join - executes with nested loop join.
-select straight_join at.id
-from alltypes at
-  inner join functional.alltypestiny att on at.id < att.id
+select straight_join atp.id
+from alltypes atp
+  inner join functional.alltypestiny att on atp.id < att.id
 where att.int_col = 999
 ---- RESULTS
 ---- TYPES
@@ -137,8 +137,8 @@ INT
 ====
 ---- QUERY
 # Cross join - executes with nested loop join.
-select straight_join at.id
-from alltypes at, functional.alltypestiny att
+select straight_join atp.id
+from alltypes atp, functional.alltypestiny att
 where att.int_col = 999
 ---- RESULTS
 ---- TYPES
@@ -146,11 +146,11 @@ INT
 ====
 ---- QUERY
 # Left non-equi-join - executes with nested loop join.
-select straight_join at.id
-from alltypes at
+select straight_join atp.id
+from alltypes atp
   left join (
-    select * from functional.alltypestiny where int_col = 999) att on at.id < att.id
-order by at.id desc
+    select * from functional.alltypestiny where int_col = 999) att on atp.id < att.id
+order by atp.id desc
 limit 5
 ---- RESULTS
 7299
@@ -163,11 +163,11 @@ INT
 ====
 ---- QUERY
 # Left semi non-equi-join - executes with nested loop join.
-select straight_join at.id
-from alltypes at
+select straight_join atp.id
+from alltypes atp
    left semi join (
-     select * from functional.alltypestiny att where int_col = 999) att on at.id < att.id
-order by at.id desc
+     select * from functional.alltypestiny att where int_col = 999) att on atp.id < att.id
+order by atp.id desc
 limit 5
 ---- RESULTS
 ---- TYPES
@@ -175,10 +175,10 @@ INT
 ====
 ---- QUERY
 # Left anti non-equi-join - executes with nested loop join.
-select straight_join at.id
-from alltypes at left anti join (
+select straight_join atp.id
+from alltypes atp left anti join (
   select * from functional.alltypestiny att
-  where id = 999) att on at.id < att.id
+  where id = 999) att on atp.id < att.id
 order by id desc
 limit 5
 ---- RESULTS

http://git-wip-us.apache.org/repos/asf/impala/blob/f0b3d9d1/testdata/workloads/functional-query/queries/QueryTest/exprs.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/exprs.test b/testdata/workloads/functional-query/queries/QueryTest/exprs.test
index 71759f3..67494a6 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/exprs.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/exprs.test
@@ -2355,8 +2355,8 @@ select dayofyear(trunc('2014-11-11', string_col)) from functional.alltypestiny l
 Invalid Truncate Unit: 0
 ====
 ---- QUERY
-select regexp_match_count(tmp.str, tmp.pattern) from (values
-('aaa' as str, 'a' as pattern),
+select regexp_match_count(tmp.str, tmp.`pattern`) from (values
+('aaa' as str, 'a' as `pattern`),
 ('aaa', 'aa'),
 ('aaaa', 'aa'),
 ('', ''),
@@ -2401,8 +2401,8 @@ select regexp_match_count(tmp.str, tmp.pattern) from (values
 int
 ====
 ---- QUERY
-select regexp_match_count(tmp.str, tmp.pattern, tmp.start_pos, tmp.params) from (values
-('aaa' as str, 'A' as pattern, 1 as start_pos, 'i' as params),
+select regexp_match_count(tmp.str, tmp.`pattern`, tmp.start_pos, tmp.params) from (values
+('aaa' as str, 'A' as `pattern`, 1 as start_pos, 'i' as params),
 ('aaa', 'A', 1, 'c'),
 ('this\nis\nnewline', '.*', 1, ''),
 ('this\nis\nnewline', '.*', 1, 'n'),
@@ -2435,16 +2435,16 @@ select regexp_match_count(tmp.str, tmp.pattern, tmp.start_pos, tmp.params) from
 int
 ====
 ---- QUERY
-select regexp_match_count(tmp.str, tmp.pattern, tmp.start_pos, tmp.params) from (values
-('a' as str, 'a' as pattern, -1 as start_pos, '' as params),
+select regexp_match_count(tmp.str, tmp.`pattern`, tmp.start_pos, tmp.params) from (values
+('a' as str, 'a' as `pattern`, -1 as start_pos, '' as params),
 ('foobar', 'foobar', 1, 'i'),
 ('iPhone\niPad\niPod', '^I.*$', 1, 'imn')) as tmp
 ---- CATCH
 Illegal starting position -1
 ====
 ---- QUERY
-select regexp_match_count(tmp.str, tmp.pattern, tmp.start_pos, tmp.params) from (values
-('a' as str, 'a' as pattern, 1 as start_pos, 'xyz' as params),
+select regexp_match_count(tmp.str, tmp.`pattern`, tmp.start_pos, tmp.params) from (values
+('a' as str, 'a' as `pattern`, 1 as start_pos, 'xyz' as params),
 ('foobar', 'foobar', 1, 'i'),
 ('iPhone\niPad\niPod', '^I.*$', 1, 'imn')) as tmp
 ---- CATCH
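
Across these test updates the recurring fix for newly reserved names is
backquoting. As a usage illustration only (not part of the patch), a hedged
JDBC sketch; the connection URL, port, and inline VALUES table are placeholder
assumptions, and a Hive JDBC driver is assumed to be on the classpath.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class BackquoteDemo {
  public static void main(String[] args) throws Exception {
    // Placeholder URL; adjust host/port for a real impalad.
    try (Connection conn = DriverManager.getConnection(
             "jdbc:hive2://localhost:21050/;auth=noSasl");
         Statement stmt = conn.createStatement();
         // `pattern` must be backquoted once it is treated as a reserved word.
         ResultSet rs = stmt.executeQuery(
             "select t.`pattern` from (values ('a' as `pattern`)) t")) {
      while (rs.next()) System.out.println(rs.getString(1));
    }
  }
}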


[3/3] impala git commit: IMPALA-6346: Potential deadlock in KrpcDataStreamMgr

Posted by ph...@apache.org.
IMPALA-6346: Potential deadlock in KrpcDataStreamMgr

In KrpcDataStreamMgr::CreateRecvr() we take the lock_ and
then call recvr->TakeOverEarlySender() for all contexts.
recvr->TakeOverEarlySender() then calls
recvr_->mgr_->EnqueueDeserializeTask(), which can block if the
deserialize pool queue is full. The next thread to become available
in that queue will also have to acquire lock_, thus leading to a
deadlock.

We fix this by moving the EarlySendersList out of the
EarlySendersMap and dropping the lock before taking any actions on
the RPC contexts in the EarlySendersList. All functions called after
dropping 'lock_' do not require the lock to protect them as they are
thread safe.
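
The shape of the fix generalizes: detach the pending work from the shared
structure while holding the lock, then act on it lock-free. An illustrative
Java sketch of that pattern follows; the real fix is C++, and all names below
are invented.

import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.locks.ReentrantLock;

public class LockScopeDemo {
  private final ReentrantLock lock = new ReentrantLock();
  private final Map<Integer, Queue<String>> earlySenders = new HashMap<>();

  void createReceiver(int recvrId) {
    Queue<String> pending;
    lock.lock();
    try {
      // Detach the per-receiver list under the lock; no shared state remains.
      pending = earlySenders.remove(recvrId);
    } finally {
      lock.unlock();
    }
    if (pending == null) return;
    // Potentially blocking work (e.g. enqueueing into a bounded pool) happens
    // without the lock held, so a full queue can drain without deadlocking.
    for (String ctx : pending) process(ctx);
  }

  void process(String ctx) { System.out.println("processing " + ctx); }

  public static void main(String[] args) {
    LockScopeDemo d = new LockScopeDemo();
    d.earlySenders.put(1, new ArrayDeque<>(Arrays.asList("rpc-1", "rpc-2")));
    d.createReceiver(1);
  }
}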

Additionally modified the BE test data-stream-test to work with KRPC
as well.

Testing: Added a new test to data-stream-test to verify that the
deadlock does not happen. Also, I verified that this test hangs
without the fix.

Change-Id: Ib7d1a8f12a4821092ca61ccc8a6f20c0404d56c7
Reviewed-on: http://gerrit.cloudera.org:8080/8950
Reviewed-by: Sailesh Mukil <sa...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/ff86feaa
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/ff86feaa
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/ff86feaa

Branch: refs/heads/master
Commit: ff86feaa67ff8bf703896e33d9a358e42739ae30
Parents: f0b3d9d
Author: Sailesh Mukil <sa...@cloudera.com>
Authored: Fri Jan 5 10:48:13 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Feb 2 02:14:46 2018 +0000

----------------------------------------------------------------------
 be/src/runtime/data-stream-test.cc     | 282 +++++++++++++++++++++++-----
 be/src/runtime/krpc-data-stream-mgr.cc |  44 +++--
 2 files changed, 267 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/ff86feaa/be/src/runtime/data-stream-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc
index 5e70497..07eefd4 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -23,13 +23,18 @@
 #include "common/status.h"
 #include "codegen/llvm-codegen.h"
 #include "exprs/slot-ref.h"
+#include "kudu/rpc/rpc_context.h"
+#include "kudu/rpc/service_if.h"
 #include "rpc/auth-provider.h"
 #include "rpc/thrift-server.h"
+#include "rpc/rpc-mgr.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
 #include "runtime/data-stream-mgr-base.h"
 #include "runtime/data-stream-mgr.h"
 #include "runtime/exec-env.h"
+#include "runtime/krpc-data-stream-mgr.h"
+#include "runtime/krpc-data-stream-sender.h"
 #include "runtime/data-stream-sender.h"
 #include "runtime/data-stream-recvr-base.h"
 #include "runtime/data-stream-recvr.h"
@@ -38,6 +43,7 @@
 #include "runtime/backend-client.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/raw-value.inline.h"
+#include "service/data-stream-service.h"
 #include "service/fe-support.h"
 #include "util/cpu-info.h"
 #include "util/disk-info.h"
@@ -47,6 +53,7 @@
 #include "util/mem-info.h"
 #include "util/test-info.h"
 #include "util/tuple-row-compare.h"
+#include "gen-cpp/data_stream_service.pb.h"
 #include "gen-cpp/ImpalaInternalService.h"
 #include "gen-cpp/ImpalaInternalService_types.h"
 #include "gen-cpp/Types_types.h"
@@ -54,6 +61,7 @@
 #include "service/fe-support.h"
 
 #include <iostream>
+#include <unistd.h>
 
 #include "common/names.h"
 
@@ -61,9 +69,17 @@ using namespace impala;
 using namespace apache::thrift;
 using namespace apache::thrift::protocol;
 
-DEFINE_int32(port, 20001, "port on which to run Impala test backend");
-DECLARE_string(principal);
+using kudu::MetricEntity;
+using kudu::rpc::ResultTracker;
+using kudu::rpc::RpcContext;
+using kudu::rpc::ServiceIf;
+
+DEFINE_int32(port, 20001, "port on which to run Impala Thrift based test backend.");
 DECLARE_int32(datastream_sender_timeout_ms);
+DECLARE_int32(datastream_service_num_deserialization_threads);
+DECLARE_int32(datastream_service_deserialization_queue_size);
+
+DECLARE_bool(use_krpc);
 
 // We reserve contiguous memory for senders in SetUp. If a test uses more
 // senders, a DCHECK will fail and you should increase this value.
@@ -78,10 +94,12 @@ static const int NUM_BATCHES = TOTAL_DATA_SIZE / BATCH_CAPACITY / PER_ROW_DATA;
 
 namespace impala {
 
-class ImpalaTestBackend : public ImpalaInternalServiceIf {
+// This class acts as a service interface for all Thrift related communication within
+// this test file.
+class ImpalaThriftTestBackend : public ImpalaInternalServiceIf {
  public:
-  ImpalaTestBackend(DataStreamMgr* stream_mgr): mgr_(stream_mgr) {}
-  virtual ~ImpalaTestBackend() {}
+  ImpalaThriftTestBackend(DataStreamMgr* stream_mgr): mgr_(stream_mgr) {}
+  virtual ~ImpalaThriftTestBackend() {}
 
   virtual void ExecQueryFInstances(TExecQueryFInstancesResult& return_val,
       const TExecQueryFInstancesParams& params) {}
@@ -109,13 +127,43 @@ class ImpalaTestBackend : public ImpalaInternalServiceIf {
   DataStreamMgr* mgr_;
 };
 
-class DataStreamTest : public testing::Test {
+// This class acts as a service interface for all KRPC related communication within
+// this test file.
+class ImpalaKRPCTestBackend : public DataStreamServiceIf {
+ public:
+  ImpalaKRPCTestBackend(RpcMgr* rpc_mgr, KrpcDataStreamMgr* stream_mgr)
+    : DataStreamServiceIf(rpc_mgr->metric_entity(), rpc_mgr->result_tracker()),
+      stream_mgr_(stream_mgr) {}
+  virtual ~ImpalaKRPCTestBackend() {}
+
+  virtual void TransmitData(const TransmitDataRequestPB* request,
+      TransmitDataResponsePB* response, RpcContext* rpc_context) {
+    stream_mgr_->AddData(request, response, rpc_context);
+  }
+
+  virtual void EndDataStream(const EndDataStreamRequestPB* request,
+      EndDataStreamResponsePB* response, RpcContext* rpc_context) {
+    stream_mgr_->CloseSender(request, response, rpc_context);
+  }
+
+ private:
+  KrpcDataStreamMgr* stream_mgr_;
+};
+
+template <class T> class DataStreamTestBase : public T {
+ protected:
+  virtual void SetUp() {}
+  virtual void TearDown() {}
+};
+
+enum KrpcSwitch {
+  USE_THRIFT,
+  USE_KRPC
+};
+
+class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwitch> > {
  protected:
   DataStreamTest() : next_val_(0) {
-    // Initialize MemTrackers and RuntimeState for use by the data stream receiver.
-    ABORT_IF_ERROR(exec_env_.InitForFeTests());
-    runtime_state_.reset(new RuntimeState(TQueryCtx(), &exec_env_));
-    mem_pool_.reset(new MemPool(&tracker_));
 
     // Stop tests that rely on mismatched sender / receiver pairs timing out from failing.
     FLAGS_datastream_sender_timeout_ms = 250;
@@ -123,6 +171,14 @@ class DataStreamTest : public testing::Test {
   ~DataStreamTest() { runtime_state_->ReleaseResources(); }
 
   virtual void SetUp() {
+    // Initialize MemTrackers and RuntimeState for use by the data stream receiver.
+    FLAGS_use_krpc = GetParam() == USE_KRPC;
+
+    exec_env_.reset(new ExecEnv());
+    ABORT_IF_ERROR(exec_env_->InitForFeTests());
+    runtime_state_.reset(new RuntimeState(TQueryCtx(), exec_env_.get()));
+    mem_pool_.reset(new MemPool(&tracker_));
+
     CreateRowDesc();
 
     is_asc_.push_back(true);
@@ -131,7 +187,8 @@ class DataStreamTest : public testing::Test {
 
     next_instance_id_.lo = 0;
     next_instance_id_.hi = 0;
-    stream_mgr_ = new DataStreamMgr(new MetricGroup(""));
+    stream_mgr_ = ExecEnv::GetInstance()->stream_mgr();
+    if (GetParam() == USE_KRPC) krpc_mgr_ = ExecEnv::GetInstance()->rpc_mgr();
 
     broadcast_sink_.dest_node_id = DEST_NODE_ID;
     broadcast_sink_.output_partition.type = TPartitionType::UNPARTITIONED;
@@ -159,7 +216,11 @@ class DataStreamTest : public testing::Test {
     // Ensure that individual sender info addresses don't change
     sender_info_.reserve(MAX_SENDERS);
     receiver_info_.reserve(MAX_RECEIVERS);
-    StartBackend();
+    if (GetParam() == USE_THRIFT) {
+      StartThriftBackend();
+    } else {
+      StartKrpcBackend();
+    }
   }
 
   const TDataSink GetSink(TPartitionType::type partition_type) {
@@ -185,8 +246,12 @@ class DataStreamTest : public testing::Test {
     less_than_->Close(runtime_state_.get());
     ScalarExpr::Close(ordering_exprs_);
     mem_pool_->FreeAll();
-    exec_env_.impalad_client_cache()->TestShutdown();
-    StopBackend();
+    if (GetParam() == USE_THRIFT) {
+      exec_env_->impalad_client_cache()->TestShutdown();
+      StopThriftBackend();
+    } else {
+      StopKrpcBackend();
+    }
   }
 
   void Reset() {
@@ -203,7 +268,7 @@ class DataStreamTest : public testing::Test {
   vector<bool> is_asc_;
   vector<bool> nulls_first_;
   TupleRowComparator* less_than_;
-  ExecEnv exec_env_;
+  boost::scoped_ptr<ExecEnv> exec_env_;
   scoped_ptr<RuntimeState> runtime_state_;
   TUniqueId next_instance_id_;
   string stmt_;
@@ -215,8 +280,12 @@ class DataStreamTest : public testing::Test {
   int next_val_;
   int64_t* tuple_mem_;
 
+  // Only used for KRPC. Not owned.
+  RpcMgr* krpc_mgr_ = nullptr;
+  TNetworkAddress krpc_address_;
+
   // receiving node
-  DataStreamMgrBase* stream_mgr_;
+  DataStreamMgrBase* stream_mgr_ = nullptr;
   ThriftServer* server_;
 
   // sending node(s)
@@ -266,6 +335,9 @@ class DataStreamTest : public testing::Test {
     dest.fragment_instance_id = next_instance_id_;
     dest.server.hostname = "127.0.0.1";
     dest.server.port = FLAGS_port;
+    if (GetParam() == USE_KRPC) {
+      dest.__set_krpc_server(krpc_address_);
+    }
     *instance_id = next_instance_id_;
     ++next_instance_id_.lo;
   }
@@ -452,24 +524,51 @@ class DataStreamTest : public testing::Test {
     }
   }
 
-  // Start backend in separate thread.
-  void StartBackend() {
+  // Start Thrift based backend in separate thread.
+  void StartThriftBackend() {
     // Dynamic cast stream_mgr_ which is of type DataStreamMgrBase to derived type
-    // DataStreamMgr, since ImpalaTestBackend() accepts only DataStreamMgr*.
-    boost::shared_ptr<ImpalaTestBackend> handler(
-        new ImpalaTestBackend(dynamic_cast<DataStreamMgr*>(stream_mgr_)));
+    // DataStreamMgr, since ImpalaThriftTestBackend() accepts only DataStreamMgr*.
+    boost::shared_ptr<ImpalaThriftTestBackend> handler(
+        new ImpalaThriftTestBackend(dynamic_cast<DataStreamMgr*>(stream_mgr_)));
     boost::shared_ptr<TProcessor> processor(new ImpalaInternalServiceProcessor(handler));
     ThriftServerBuilder builder("DataStreamTest backend", processor, FLAGS_port);
     ASSERT_OK(builder.Build(&server_));
     ASSERT_OK(server_->Start());
   }
 
-  void StopBackend() {
+  void StartKrpcBackend() {
+    IpAddr ip;
+    ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip));
+    krpc_address_ = MakeNetworkAddress(ip, FLAGS_port);
+
+    MemTracker* data_svc_tracker = obj_pool_.Add(
+        new MemTracker(-1, "Data Stream Service",
+            ExecEnv::GetInstance()->process_mem_tracker()));
+    MemTracker* stream_mgr_tracker = obj_pool_.Add(
+        new MemTracker(-1, "Data Stream Queued RPC Calls",
+            ExecEnv::GetInstance()->process_mem_tracker()));
+
+    KrpcDataStreamMgr* stream_mgr_ref = dynamic_cast<KrpcDataStreamMgr*>(stream_mgr_);
+    ASSERT_OK(stream_mgr_ref->Init(stream_mgr_tracker, data_svc_tracker));
+    ASSERT_OK(krpc_mgr_->Init());
+
+    unique_ptr<ServiceIf> handler(
+        new ImpalaKRPCTestBackend(krpc_mgr_, stream_mgr_ref));
+    ASSERT_OK(krpc_mgr_->RegisterService(CpuInfo::num_cores(), 1024, move(handler),
+        data_svc_tracker));
+    ASSERT_OK(krpc_mgr_->StartServices(krpc_address_));
+  }
+
+  void StopThriftBackend() {
     VLOG_QUERY << "stop backend\n";
     server_->StopForTesting();
     delete server_;
   }
 
+  void StopKrpcBackend() {
+    krpc_mgr_->Shutdown();
+  }
+
   void StartSender(TPartitionType::type partition_type = TPartitionType::UNPARTITIONED,
                    int channel_buffer_size = 1024) {
     VLOG_QUERY << "start sender";
@@ -479,7 +578,7 @@ class DataStreamTest : public testing::Test {
     SenderInfo& info = sender_info_.back();
     info.thread_handle =
         new thread(&DataStreamTest::Sender, this, num_senders, channel_buffer_size,
-                   partition_type);
+                   partition_type, GetParam() == USE_THRIFT);
   }
 
   void JoinSenders() {
@@ -489,35 +588,54 @@ class DataStreamTest : public testing::Test {
     }
   }
 
-  void Sender(
-      int sender_num, int channel_buffer_size, TPartitionType::type partition_type) {
-    RuntimeState state(TQueryCtx(), &exec_env_, desc_tbl_);
+  void Sender(int sender_num,
+      int channel_buffer_size, TPartitionType::type partition_type, bool is_thrift) {
+    RuntimeState state(TQueryCtx(), exec_env_.get(), desc_tbl_);
     VLOG_QUERY << "create sender " << sender_num;
     const TDataSink& sink = GetSink(partition_type);
-    DataStreamSender sender(
-        sender_num, row_desc_, sink.stream_sink, dest_, channel_buffer_size, &state);
+
+    // We create the concrete sender selected by the 'is_thrift' option, but hold it
+    // through a pointer to the base class DataSink, casting back where needed.
+    scoped_ptr<DataSink> sender;
 
     TExprNode expr_node;
     expr_node.node_type = TExprNodeType::SLOT_REF;
     TExpr output_exprs;
     output_exprs.nodes.push_back(expr_node);
-    EXPECT_OK(sender.Init(vector<TExpr>({output_exprs}), sink, &state));
 
-    EXPECT_OK(sender.Prepare(&state, &tracker_));
-    EXPECT_OK(sender.Open(&state));
+    if (is_thrift) {
+      sender.reset(new DataStreamSender(
+          sender_num, row_desc_, sink.stream_sink, dest_, channel_buffer_size, &state));
+      EXPECT_OK(static_cast<DataStreamSender*>(
+          sender.get())->Init(vector<TExpr>({output_exprs}), sink, &state));
+    } else {
+      sender.reset(new KrpcDataStreamSender(
+          sender_num, row_desc_, sink.stream_sink, dest_, channel_buffer_size, &state));
+      EXPECT_OK(static_cast<KrpcDataStreamSender*>(
+          sender.get())->Init(vector<TExpr>({output_exprs}), sink, &state));
+    }
+
+    EXPECT_OK(sender->Prepare(&state, &tracker_));
+    EXPECT_OK(sender->Open(&state));
     scoped_ptr<RowBatch> batch(CreateRowBatch());
     SenderInfo& info = sender_info_[sender_num];
     int next_val = 0;
     for (int i = 0; i < NUM_BATCHES; ++i) {
       GetNextBatch(batch.get(), &next_val);
       VLOG_QUERY << "sender " << sender_num << ": #rows=" << batch->num_rows();
-      info.status = sender.Send(&state, batch.get());
+      info.status = sender->Send(&state, batch.get());
       if (!info.status.ok()) break;
     }
     VLOG_QUERY << "closing sender" << sender_num;
-    info.status.MergeStatus(sender.FlushFinal(&state));
-    sender.Close(&state);
-    info.num_bytes_sent = sender.GetNumDataBytesSent();
+    info.status.MergeStatus(sender->FlushFinal(&state));
+    sender->Close(&state);
+    if (is_thrift) {
+      info.num_bytes_sent = static_cast<DataStreamSender*>(
+          sender.get())->GetNumDataBytesSent();
+    } else {
+      info.num_bytes_sent = static_cast<KrpcDataStreamSender*>(
+          sender.get())->GetNumDataBytesSent();
+    }
 
     batch->Reset();
     state.ReleaseResources();
@@ -542,7 +660,44 @@ class DataStreamTest : public testing::Test {
   }
 };
 
-TEST_F(DataStreamTest, UnknownSenderSmallResult) {
+// We use a separate class for tests that must be run against Thrift only.
+class DataStreamTestThriftOnly : public DataStreamTest {
+ protected:
+  virtual void SetUp() {
+    DataStreamTest::SetUp();
+  }
+
+  virtual void TearDown() {
+    DataStreamTest::TearDown();
+  }
+};
+
+// We need a separate test class for IMPALA-6346, since we need to do some pre-SetUp()
+// work. Specifically, we need to set two flags that will be picked up during the
+// SetUp() phase of the DataStreamTest class.
+class DataStreamTestForImpala6346 : public DataStreamTest {
+ protected:
+  virtual void SetUp() {
+    FLAGS_datastream_service_num_deserialization_threads = 1;
+    FLAGS_datastream_service_deserialization_queue_size = 1;
+    DataStreamTest::SetUp();
+  }
+
+  virtual void TearDown() {
+    DataStreamTest::TearDown();
+  }
+};
+
+INSTANTIATE_TEST_CASE_P(ThriftOrKrpc, DataStreamTest,
+    ::testing::Values(USE_THRIFT, USE_KRPC));
+
+INSTANTIATE_TEST_CASE_P(ThriftOnly, DataStreamTestThriftOnly,
+    ::testing::Values(USE_THRIFT));
+
+INSTANTIATE_TEST_CASE_P(KrpcOnly, DataStreamTestForImpala6346,
+    ::testing::Values(USE_KRPC));
+
+TEST_P(DataStreamTest, UnknownSenderSmallResult) {
   // starting a sender w/o a corresponding receiver results in an error. No bytes should
   // be sent.
   // case 1: entire query result fits in single buffer
@@ -554,7 +709,7 @@ TEST_F(DataStreamTest, UnknownSenderSmallResult) {
   EXPECT_EQ(sender_info_[0].num_bytes_sent, 0);
 }
 
-TEST_F(DataStreamTest, UnknownSenderLargeResult) {
+TEST_P(DataStreamTest, UnknownSenderLargeResult) {
   // case 2: query result requires multiple buffers
   TUniqueId dummy_id;
   GetNextInstanceId(&dummy_id);
@@ -564,7 +719,7 @@ TEST_F(DataStreamTest, UnknownSenderLargeResult) {
   EXPECT_EQ(sender_info_[0].num_bytes_sent, 0);
 }
 
-TEST_F(DataStreamTest, Cancel) {
+TEST_P(DataStreamTest, Cancel) {
   TUniqueId instance_id;
   StartReceiver(TPartitionType::UNPARTITIONED, 1, 1, 1024, false, &instance_id);
   stream_mgr_->Cancel(instance_id);
@@ -575,7 +730,7 @@ TEST_F(DataStreamTest, Cancel) {
   EXPECT_TRUE(receiver_info_[1].status.IsCancelled());
 }
 
-TEST_F(DataStreamTest, BasicTest) {
+TEST_P(DataStreamTest, BasicTest) {
   // TODO: also test that all client connections have been returned
   TPartitionType::type stream_types[] =
       {TPartitionType::UNPARTITIONED, TPartitionType::RANDOM,
@@ -605,8 +760,8 @@ TEST_F(DataStreamTest, BasicTest) {
 // parent is destroyed. In practice the parent is a member of the query's runtime state.
 //
 // TODO: Make lifecycle requirements more explicit.
-TEST_F(DataStreamTest, CloseRecvrWhileReferencesRemain) {
-  scoped_ptr<RuntimeState> runtime_state(new RuntimeState(TQueryCtx(), &exec_env_));
+TEST_P(DataStreamTestThriftOnly, CloseRecvrWhileReferencesRemain) {
+  scoped_ptr<RuntimeState> runtime_state(new RuntimeState(TQueryCtx(), exec_env_.get()));
   RuntimeProfile* profile = RuntimeProfile::Create(&obj_pool_, "TestReceiver");
 
   // Start just one receiver.
@@ -628,7 +783,7 @@ TEST_F(DataStreamTest, CloseRecvrWhileReferencesRemain) {
   // RPC does not cause an error (the receiver will still be called, since it is only
   // Close()'d, not deleted from the data stream manager).
   Status rpc_status;
-  ImpalaBackendConnection client(exec_env_.impalad_client_cache(),
+  ImpalaBackendConnection client(exec_env_->impalad_client_cache(),
       MakeNetworkAddress("localhost", FLAGS_port), &rpc_status);
   EXPECT_OK(rpc_status);
   TTransmitDataParams params;
@@ -647,6 +802,49 @@ TEST_F(DataStreamTest, CloseRecvrWhileReferencesRemain) {
   stream_recvr.reset();
 }
 
+// This test exercises a previously present deadlock path, now fixed, to ensure that
+// the deadlock can no longer occur. It works as follows: the test starts multiple
+// senders that all transmit to the same receiver, and makes sure that the senders'
+// payloads arrive before the receiver is set up. While the receiver is being created,
+// it notices that multiple payloads are already waiting to be processed, so it holds
+// KrpcDataStreamMgr::lock_ and calls TakeOverEarlySender(), which calls
+// EnqueueDeserializeTask(), which in turn tries to Offer() the payload to the
+// deserialization_pool_. However, the test sets the queue size to 1, so the Offer()
+// blocks. Any payload that is already being deserialized then waits on
+// KrpcDataStreamMgr::lock_ as well, but the first thread never releases the lock
+// because it is stuck in Offer(), causing a deadlock. This was fixed with
+// IMPALA-6346.
+TEST_P(DataStreamTestForImpala6346, TestNoDeadlock) {
+  TUniqueId instance_id;
+  GetNextInstanceId(&instance_id);
+
+  // Start 4 senders.
+  StartSender(TPartitionType::UNPARTITIONED, 1024 * 1024);
+  StartSender(TPartitionType::UNPARTITIONED, 1024 * 1024);
+  StartSender(TPartitionType::UNPARTITIONED, 1024 * 1024);
+  StartSender(TPartitionType::UNPARTITIONED, 1024 * 1024);
+
+  // Sleep briefly to ensure that the sent payloads arrive before the receiver
+  // is created.
+  sleep(2);
+
+  // Set up the receiver.
+  RuntimeProfile* profile = RuntimeProfile::Create(&obj_pool_, "TestReceiver");
+  receiver_info_.push_back(ReceiverInfo(TPartitionType::UNPARTITIONED, 4, 1));
+  ReceiverInfo& info = receiver_info_.back();
+  info.stream_recvr = stream_mgr_->CreateRecvr(runtime_state_.get(), row_desc_,
+      instance_id, DEST_NODE_ID, 4, 1024 * 1024, profile, false);
+  info.thread_handle = new thread(
+      &DataStreamTestForImpala6346_TestNoDeadlock_Test::ReadStream, this, &info);
+
+  JoinSenders();
+  CheckSenders();
+  JoinReceivers();
+
+  // Check that 4 payloads have been received.
+  CheckReceivers(TPartitionType::UNPARTITIONED, 4);
+}
+
 // TODO: more tests:
 // - test case for transmission error in last batch
 // - receivers getting created concurrently

http://git-wip-us.apache.org/repos/asf/impala/blob/ff86feaa/be/src/runtime/krpc-data-stream-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-mgr.cc b/be/src/runtime/krpc-data-stream-mgr.cc
index 3f777ea..fabea13 100644
--- a/be/src/runtime/krpc-data-stream-mgr.cc
+++ b/be/src/runtime/krpc-data-stream-mgr.cc
@@ -54,14 +54,18 @@ DECLARE_int32(datastream_sender_timeout_ms);
 DEFINE_int32(datastream_service_num_deserialization_threads, 16,
     "Number of threads for deserializing RPC requests deferred due to the receiver "
     "not ready or the soft limit of the receiver is reached.");
+DEFINE_int32(datastream_service_deserialization_queue_size, 10000,
+    "Number of deferred RPC requests that can be enqueued before being processed by a "
+    "deserialization thread.");
 
 using boost::mutex;
 
 namespace impala {
 
 KrpcDataStreamMgr::KrpcDataStreamMgr(MetricGroup* metrics)
   : deserialize_pool_("data-stream-mgr", "deserialize",
-      FLAGS_datastream_service_num_deserialization_threads, 10000,
+      FLAGS_datastream_service_num_deserialization_threads,
+      FLAGS_datastream_service_deserialization_queue_size,
       boost::bind(&KrpcDataStreamMgr::DeserializeThreadFn, this, _1, _2)) {
   MetricGroup* dsm_metrics = metrics->GetOrCreateChildGroup("datastream-manager");
   num_senders_waiting_ =
@@ -102,6 +105,7 @@ shared_ptr<DataStreamRecvrBase> KrpcDataStreamMgr::CreateRecvr(
       new KrpcDataStreamRecvr(this, state->instance_mem_tracker(), row_desc,
           finst_id, dest_node_id, num_senders, is_merging, buffer_size, profile));
   uint32_t hash_value = GetHashValue(finst_id, dest_node_id);
+  EarlySendersList early_senders_for_recvr;
   {
     RecvrId recvr_id = make_pair(finst_id, dest_node_id);
     lock_guard<mutex> l(lock_);
@@ -109,25 +113,31 @@ shared_ptr<DataStreamRecvrBase> KrpcDataStreamMgr::CreateRecvr(
     receiver_map_.insert(make_pair(hash_value, recvr));
 
     EarlySendersMap::iterator it = early_senders_map_.find(recvr_id);
+
     if (it != early_senders_map_.end()) {
-      EarlySendersList& early_senders = it->second;
-      // Let the receiver take over the RPC payloads of early senders and process them
-      // asynchronously.
-      for (unique_ptr<TransmitDataCtx>& ctx : early_senders.waiting_sender_ctxs) {
-        // Release memory. The receiver will track it in its instance tracker.
-        int64_t transfer_size = ctx->rpc_context->GetTransferSize();
-        recvr->TakeOverEarlySender(move(ctx));
-        mem_tracker_->Release(transfer_size);
-        num_senders_waiting_->Increment(-1);
-      }
-      for (const unique_ptr<EndDataStreamCtx>& ctx : early_senders.closed_sender_ctxs) {
-        recvr->RemoveSender(ctx->request->sender_id());
-        RespondAndReleaseRpc(Status::OK(), ctx->response, ctx->rpc_context, mem_tracker_);
-        num_senders_waiting_->Increment(-1);
-      }
+      // Move the early senders list here so that we can drop 'lock_'. We need to drop
+      // the lock before processing the early senders to avoid a deadlock.
+      // More details in IMPALA-6346.
+      early_senders_for_recvr = std::move(it->second);
       early_senders_map_.erase(it);
     }
   }
+
+  // Let the receiver take over the RPC payloads of early senders and process them
+  // asynchronously.
+  for (unique_ptr<TransmitDataCtx>& ctx : early_senders_for_recvr.waiting_sender_ctxs) {
+    // Release memory. The receiver will track it in its instance tracker.
+    int64_t transfer_size = ctx->rpc_context->GetTransferSize();
+    recvr->TakeOverEarlySender(move(ctx));
+    mem_tracker_->Release(transfer_size);
+    num_senders_waiting_->Increment(-1);
+  }
+  for (const unique_ptr<EndDataStreamCtx>& ctx :
+      early_senders_for_recvr.closed_sender_ctxs) {
+    recvr->RemoveSender(ctx->request->sender_id());
+    RespondAndReleaseRpc(Status::OK(), ctx->response, ctx->rpc_context, mem_tracker_);
+    num_senders_waiting_->Increment(-1);
+  }
   return recvr;
 }