Posted to commits@impala.apache.org by jo...@apache.org on 2018/10/13 03:27:46 UTC

[1/5] impala git commit: IMPALA-7554: Update custom cluster tests to have new logs for sentry

Repository: impala
Updated Branches:
  refs/heads/master 004719126 -> af76186e0


IMPALA-7554: Update custom cluster tests to have new logs for sentry

This patch adds the ability to create a new log for each spawn of the
Sentry service. This enables better troubleshooting for the custom
cluster tests that restart the Sentry service.
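
For illustration, a custom cluster test can now route its Sentry service
logs to a per-test directory through the new sentry_log_dir decorator
argument. A minimal sketch (class, test, and path names here are
hypothetical; the decorator shape mirrors the tests updated below):

  import os
  import pytest

  from tests.common.custom_cluster_test_suite import CustomClusterTestSuite

  # Illustrative constants; the real tests in this patch define their own paths.
  SENTRY_CONFIG_FILE = os.getenv('IMPALA_HOME') + \
      '/fe/src/test/resources/sentry-site.xml'
  SENTRY_BASE_LOG_DIR = os.getenv('LOG_DIR', '/tmp') + '/sentry_logs'


  class TestWithPerTestSentryLogs(CustomClusterTestSuite):
    @pytest.mark.execute_serially
    @CustomClusterTestSuite.with_args(
        impalad_args='--server_name=server1 --sentry_config=' + SENTRY_CONFIG_FILE,
        catalogd_args='--sentry_config=' + SENTRY_CONFIG_FILE,
        sentry_config=SENTRY_CONFIG_FILE,
        # New in this patch: the Sentry service started for this test writes its
        # logs here; the value reaches run-sentry-service.sh as SENTRY_LOG_DIR.
        sentry_log_dir='{0}/test_with_per_test_sentry_logs'.format(SENTRY_BASE_LOG_DIR))
    def test_with_per_test_sentry_logs(self, vector):
      pass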

Testing:
- Ran all custom cluster tests.

Change-Id: I6e538af7fd6e6ea21dc3f4442bdebf3b31558516
Reviewed-on: http://gerrit.cloudera.org:8080/11624
Reviewed-by: Fredy Wijaya <fw...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/21f521a7
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/21f521a7
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/21f521a7

Branch: refs/heads/master
Commit: 21f521a7c280031e33cde7c61a979683c5abed00
Parents: 0047191
Author: Adam Holley <gi...@holleyism.com>
Authored: Thu Oct 4 22:55:54 2018 -0500
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Fri Oct 12 01:00:56 2018 +0000

----------------------------------------------------------------------
 testdata/bin/run-sentry-service.sh           |  8 +-
 tests/authorization/test_authorization.py    |  6 +-
 tests/authorization/test_owner_privileges.py | 99 +++++++++++++----------
 tests/common/custom_cluster_test_suite.py    | 29 ++++---
 tests/custom_cluster/test_redaction.py       |  2 +-
 5 files changed, 87 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/21f521a7/testdata/bin/run-sentry-service.sh
----------------------------------------------------------------------
diff --git a/testdata/bin/run-sentry-service.sh b/testdata/bin/run-sentry-service.sh
index a508ffd..7c6167c 100755
--- a/testdata/bin/run-sentry-service.sh
+++ b/testdata/bin/run-sentry-service.sh
@@ -24,13 +24,19 @@ setup_report_build_error
 . ${IMPALA_HOME}/bin/set-classpath.sh
 
 SENTRY_SERVICE_CONFIG=${SENTRY_SERVICE_CONFIG:-}
+SENTRY_LOG_DIR=${SENTRY_LOG_DIR:-}
 
 if [ -z ${SENTRY_SERVICE_CONFIG} ]
 then
   SENTRY_SERVICE_CONFIG=${SENTRY_CONF_DIR}/sentry-site.xml
 fi
 
-LOGDIR="${IMPALA_CLUSTER_LOGS_DIR}"/sentry
+if [ -z ${SENTRY_LOG_DIR} ]
+then
+  LOGDIR="${IMPALA_CLUSTER_LOGS_DIR}"/sentry
+else
+  LOGDIR=${SENTRY_LOG_DIR}
+fi
 
 mkdir -p "${LOGDIR}" || true
 

http://git-wip-us.apache.org/repos/asf/impala/blob/21f521a7/tests/authorization/test_authorization.py
----------------------------------------------------------------------
diff --git a/tests/authorization/test_authorization.py b/tests/authorization/test_authorization.py
index 75c0997..a508440 100644
--- a/tests/authorization/test_authorization.py
+++ b/tests/authorization/test_authorization.py
@@ -367,7 +367,8 @@ class TestAuthorization(CustomClusterTestSuite):
   @CustomClusterTestSuite.with_args(
       impalad_args="--server_name=server1 --sentry_config=" + SENTRY_CONFIG_FILE,
       catalogd_args="--sentry_config=" + SENTRY_CONFIG_FILE,
-      log_dir=tempfile.mkdtemp(prefix="test_deprecated_none_", dir=os.getenv("LOG_DIR")))
+      impala_log_dir=tempfile.mkdtemp(prefix="test_deprecated_none_",
+      dir=os.getenv("LOG_DIR")))
   def test_deprecated_flag_doesnt_show(self):
     assert_no_files_in_dir_contain(self.impala_log_dir, "authorization_policy_file " +
         "flag is deprecated. Object Ownership feature is not supported")
@@ -377,7 +378,8 @@ class TestAuthorization(CustomClusterTestSuite):
       --authorization_policy_file=%s\
       --authorization_policy_provider_class=%s" % (AUTH_POLICY_FILE,
        "org.apache.sentry.provider.file.LocalGroupResourceAuthorizationProvider"),
-      log_dir=tempfile.mkdtemp(prefix="test_deprecated_", dir=os.getenv("LOG_DIR")))
+      impala_log_dir=tempfile.mkdtemp(prefix="test_deprecated_",
+      dir=os.getenv("LOG_DIR")))
   def test_deprecated_flags(self):
     assert_file_in_dir_contains(self.impala_log_dir, "authorization_policy_file flag" +
         " is deprecated. Object Ownership feature is not supported")

http://git-wip-us.apache.org/repos/asf/impala/blob/21f521a7/tests/authorization/test_owner_privileges.py
----------------------------------------------------------------------
diff --git a/tests/authorization/test_owner_privileges.py b/tests/authorization/test_owner_privileges.py
index a44e3e8..4cc2193 100644
--- a/tests/authorization/test_owner_privileges.py
+++ b/tests/authorization/test_owner_privileges.py
@@ -127,14 +127,16 @@ class TestOwnerPrivileges(SentryCacheTestSuite):
   @pytest.mark.execute_serially
   @SentryCacheTestSuite.with_args(
       impalad_args="--server_name=server1 --sentry_config={0} "
-      "--authorization_policy_provider_class="
-      "org.apache.impala.service.CustomClusterResourceAuthorizationProvider "
-      .format(SENTRY_CONFIG_FILE_OO),
+                   "--authorization_policy_provider_class="
+                   "org.apache.impala.service.CustomClusterResourceAuthorizationProvider "
+                   .format(SENTRY_CONFIG_FILE_OO),
       catalogd_args="--sentry_config={0} --authorization_policy_provider_class="
-      "org.apache.impala.service.CustomClusterResourceAuthorizationProvider "
-      "--sentry_catalog_polling_frequency_s={1}".format(SENTRY_CONFIG_FILE_OO,
-      str(SENTRY_LONG_POLLING_FREQUENCY_S)),
-      sentry_config=SENTRY_CONFIG_FILE_OO)
+                    "org.apache.impala.service.CustomClusterResourceAuthorizationProvider"
+                    " --sentry_catalog_polling_frequency_s={1}"
+                    .format(SENTRY_CONFIG_FILE_OO, str(SENTRY_LONG_POLLING_FREQUENCY_S)),
+      sentry_config=SENTRY_CONFIG_FILE_OO,
+      sentry_log_dir="{0}/test_owner_privileges_with_grant_log_poll"
+                     .format(SENTRY_BASE_LOG_DIR))
   def test_owner_privileges_with_grant_long_poll(self, vector, unique_database):
     self.__execute_owner_privilege_tests(TestObject(TestObject.DATABASE, "owner_priv_db",
         grant=True))
@@ -146,14 +148,16 @@ class TestOwnerPrivileges(SentryCacheTestSuite):
   @pytest.mark.execute_serially
   @SentryCacheTestSuite.with_args(
       impalad_args="--server_name=server1 --sentry_config={0} "
-      "--authorization_policy_provider_class="
-      "org.apache.impala.service.CustomClusterResourceAuthorizationProvider"
-      .format(SENTRY_CONFIG_FILE_OO),
+                   "--authorization_policy_provider_class="
+                   "org.apache.impala.service.CustomClusterResourceAuthorizationProvider"
+                   .format(SENTRY_CONFIG_FILE_OO),
       catalogd_args="--sentry_config={0} --sentry_catalog_polling_frequency_s={1} "
-      "--authorization_policy_provider_class="
-      "org.apache.impala.service.CustomClusterResourceAuthorizationProvider"
-      .format(SENTRY_CONFIG_FILE_OO, str(SENTRY_POLLING_FREQUENCY_S)),
-      sentry_config=SENTRY_CONFIG_FILE_OO)
+                    "--authorization_policy_provider_class="
+                    "org.apache.impala.service.CustomClusterResourceAuthorizationProvider"
+                    .format(SENTRY_CONFIG_FILE_OO, str(SENTRY_POLLING_FREQUENCY_S)),
+      sentry_config=SENTRY_CONFIG_FILE_OO,
+      sentry_log_dir="{0}/test_owner_privileges_with_grant"
+                     .format(SENTRY_BASE_LOG_DIR))
   def test_owner_privileges_with_grant(self, vector, unique_database):
     self.__execute_owner_privilege_tests(TestObject(TestObject.DATABASE, "owner_priv_db",
         grant=True), sentry_refresh_timeout_s=SENTRY_REFRESH_TIMEOUT_S)
@@ -238,14 +242,17 @@ class TestOwnerPrivileges(SentryCacheTestSuite):
   @pytest.mark.execute_serially
   @SentryCacheTestSuite.with_args(
       impalad_args="--server_name=server1 --sentry_config={0} "
-      "--authorization_policy_provider_class="
-      "org.apache.impala.service.CustomClusterResourceAuthorizationProvider "
-      .format(SENTRY_CONFIG_FILE_NO_OO),
+                   "--authorization_policy_provider_class="
+                   "org.apache.impala.service.CustomClusterResourceAuthorizationProvider "
+                   .format(SENTRY_CONFIG_FILE_NO_OO),
       catalogd_args="--sentry_config={0} --authorization_policy_provider_class="
-      "org.apache.impala.service.CustomClusterResourceAuthorizationProvider "
-      "--sentry_catalog_polling_frequency_s={1}".format(SENTRY_CONFIG_FILE_NO_OO,
-      str(SENTRY_LONG_POLLING_FREQUENCY_S)),
-      sentry_config=SENTRY_CONFIG_FILE_NO_OO)
+                    "org.apache.impala.service.CustomClusterResourceAuthorizationProvider"
+                    " --sentry_catalog_polling_frequency_s={1}"
+                    .format(SENTRY_CONFIG_FILE_NO_OO,
+                    str(SENTRY_LONG_POLLING_FREQUENCY_S)),
+      sentry_config=SENTRY_CONFIG_FILE_NO_OO,
+      sentry_log_dir="{0}/test_owner_privileges_disabled_log_poll"
+                     .format(SENTRY_BASE_LOG_DIR))
   def test_owner_privileges_disabled_long_poll(self, vector, unique_database):
     self.__execute_owner_privilege_tests_no_oo(TestObject(TestObject.DATABASE,
         "owner_priv_db"))
@@ -257,14 +264,16 @@ class TestOwnerPrivileges(SentryCacheTestSuite):
   @pytest.mark.execute_serially
   @SentryCacheTestSuite.with_args(
       impalad_args="--server_name=server1 --sentry_config={0} "
-      "--authorization_policy_provider_class="
-      "org.apache.impala.service.CustomClusterResourceAuthorizationProvider"
-      .format(SENTRY_CONFIG_FILE_NO_OO),
+                   "--authorization_policy_provider_class="
+                   "org.apache.impala.service.CustomClusterResourceAuthorizationProvider"
+                   .format(SENTRY_CONFIG_FILE_NO_OO),
       catalogd_args="--sentry_config={0} --sentry_catalog_polling_frequency_s={1} "
-      "--authorization_policy_provider_class="
-      "org.apache.impala.service.CustomClusterResourceAuthorizationProvider"
-      .format(SENTRY_CONFIG_FILE_NO_OO, str(SENTRY_POLLING_FREQUENCY_S)),
-      sentry_config=SENTRY_CONFIG_FILE_NO_OO)
+                    "--authorization_policy_provider_class="
+                    "org.apache.impala.service.CustomClusterResourceAuthorizationProvider"
+                    .format(SENTRY_CONFIG_FILE_NO_OO, str(SENTRY_POLLING_FREQUENCY_S)),
+      sentry_config=SENTRY_CONFIG_FILE_NO_OO,
+      sentry_log_dir="{0}/test_owner_privileges_disabled"
+                     .format(SENTRY_BASE_LOG_DIR))
   def test_owner_privileges_disabled(self, vector, unique_database):
     self.__execute_owner_privilege_tests_no_oo(TestObject(TestObject.DATABASE,
         "owner_priv_db"), sentry_refresh_timeout_s=SENTRY_REFRESH_TIMEOUT_S)
@@ -311,14 +320,17 @@ class TestOwnerPrivileges(SentryCacheTestSuite):
   @pytest.mark.execute_serially
   @SentryCacheTestSuite.with_args(
       impalad_args="--server_name=server1 --sentry_config={0} "
-      "--authorization_policy_provider_class="
-      "org.apache.impala.service.CustomClusterResourceAuthorizationProvider "
-      .format(SENTRY_CONFIG_FILE_OO_NOGRANT),
+                   "--authorization_policy_provider_class="
+                   "org.apache.impala.service.CustomClusterResourceAuthorizationProvider"
+                   .format(SENTRY_CONFIG_FILE_OO_NOGRANT),
       catalogd_args="--sentry_config={0} --authorization_policy_provider_class="
-      "org.apache.impala.service.CustomClusterResourceAuthorizationProvider "
-      "--sentry_catalog_polling_frequency_s={1}".format(SENTRY_CONFIG_FILE_OO_NOGRANT,
-      str(SENTRY_LONG_POLLING_FREQUENCY_S)),
-      sentry_config=SENTRY_CONFIG_FILE_OO_NOGRANT)
+                    "org.apache.impala.service.CustomClusterResourceAuthorizationProvider"
+                    " --sentry_catalog_polling_frequency_s={1}"
+                    .format(SENTRY_CONFIG_FILE_OO_NOGRANT,
+                    str(SENTRY_LONG_POLLING_FREQUENCY_S)),
+      sentry_config=SENTRY_CONFIG_FILE_OO_NOGRANT,
+      sentry_log_dir="{0}/test_owner_privileges_without_grant_log_poll"
+                     .format(SENTRY_BASE_LOG_DIR))
   def test_owner_privileges_without_grant_long_poll(self, vector, unique_database):
     self.__execute_owner_privilege_tests_oo_nogrant(TestObject(TestObject.DATABASE,
         "owner_priv_db"))
@@ -330,14 +342,17 @@ class TestOwnerPrivileges(SentryCacheTestSuite):
   @pytest.mark.execute_serially
   @SentryCacheTestSuite.with_args(
       impalad_args="--server_name=server1 --sentry_config={0} "
-      "--authorization_policy_provider_class="
-      "org.apache.impala.service.CustomClusterResourceAuthorizationProvider"
-      .format(SENTRY_CONFIG_FILE_OO_NOGRANT),
+                   "--authorization_policy_provider_class="
+                   "org.apache.impala.service.CustomClusterResourceAuthorizationProvider"
+                   .format(SENTRY_CONFIG_FILE_OO_NOGRANT),
       catalogd_args="--sentry_config={0} --sentry_catalog_polling_frequency_s={1} "
-      "--authorization_policy_provider_class="
-      "org.apache.impala.service.CustomClusterResourceAuthorizationProvider"
-      .format(SENTRY_CONFIG_FILE_OO_NOGRANT, str(SENTRY_POLLING_FREQUENCY_S)),
-      sentry_config=SENTRY_CONFIG_FILE_OO_NOGRANT)
+                    "--authorization_policy_provider_class="
+                    "org.apache.impala.service.CustomClusterResourceAuthorizationProvider"
+                    .format(SENTRY_CONFIG_FILE_OO_NOGRANT,
+                    str(SENTRY_POLLING_FREQUENCY_S)),
+      sentry_config=SENTRY_CONFIG_FILE_OO_NOGRANT,
+      sentry_log_dir="{0}/test_owner_privileges_without_grant"
+                     .format(SENTRY_BASE_LOG_DIR))
   def test_owner_privileges_without_grant(self, vector, unique_database):
     self.__execute_owner_privilege_tests_oo_nogrant(TestObject(TestObject.DATABASE,
         "owner_priv_db"), sentry_refresh_timeout_s=SENTRY_REFRESH_TIMEOUT_S)

http://git-wip-us.apache.org/repos/asf/impala/blob/21f521a7/tests/common/custom_cluster_test_suite.py
----------------------------------------------------------------------
diff --git a/tests/common/custom_cluster_test_suite.py b/tests/common/custom_cluster_test_suite.py
index 8fc24c2..4274abf 100644
--- a/tests/common/custom_cluster_test_suite.py
+++ b/tests/common/custom_cluster_test_suite.py
@@ -42,10 +42,11 @@ CATALOGD_ARGS = 'catalogd_args'
 # Additional args passed to the start-impala-cluster script.
 START_ARGS = 'start_args'
 SENTRY_CONFIG = 'sentry_config'
+SENTRY_LOG_DIR = 'sentry_log_dir'
 # Default query options passed to the impala daemon command line. Handled separately from
 # other impala daemon arguments to allow merging multiple defaults into a single list.
 DEFAULT_QUERY_OPTIONS = 'default_query_options'
-LOG_DIR = 'log_dir'
+IMPALA_LOG_DIR = 'impala_log_dir'
 
 # Run with fast topic updates by default to reduce time to first query running.
 DEFAULT_STATESTORE_ARGS = '--statestore_update_frequency_ms=50 \
@@ -96,7 +97,8 @@ class CustomClusterTestSuite(ImpalaTestSuite):
 
   @staticmethod
   def with_args(impalad_args=None, statestored_args=None, catalogd_args=None,
-      start_args=None, sentry_config=None, default_query_options=None, log_dir=None):
+      start_args=None, sentry_config=None, default_query_options=None,
+      impala_log_dir=None, sentry_log_dir=None):
     """Records arguments to be passed to a cluster by adding them to the decorated
     method's func_dict"""
     def decorate(func):
@@ -113,10 +115,12 @@ class CustomClusterTestSuite(ImpalaTestSuite):
         func.func_dict[START_ARGS] = start_args
       if sentry_config is not None:
         func.func_dict[SENTRY_CONFIG] = sentry_config
+      if sentry_log_dir is not None:
+        func.func_dict[SENTRY_LOG_DIR] = sentry_log_dir
       if default_query_options is not None:
         func.func_dict[DEFAULT_QUERY_OPTIONS] = default_query_options
-      if log_dir is not None:
-        func.func_dict[LOG_DIR] = log_dir
+      if impala_log_dir is not None:
+        func.func_dict[IMPALA_LOG_DIR] = impala_log_dir
       return func
     return decorate
 
@@ -129,12 +133,13 @@ class CustomClusterTestSuite(ImpalaTestSuite):
       cluster_args.append(method.func_dict[START_ARGS])
 
     if SENTRY_CONFIG in method.func_dict:
-      self._start_sentry_service(method.func_dict[SENTRY_CONFIG])
+      self._start_sentry_service(method.func_dict[SENTRY_CONFIG],
+          method.func_dict.get(SENTRY_LOG_DIR))
     # Start a clean new cluster before each test
-    if LOG_DIR in method.func_dict:
+    if IMPALA_LOG_DIR in method.func_dict:
       self._start_impala_cluster(cluster_args,
           default_query_options=method.func_dict.get(DEFAULT_QUERY_OPTIONS),
-          log_dir=method.func_dict[LOG_DIR])
+          impala_log_dir=method.func_dict[IMPALA_LOG_DIR])
     else:
       self._start_impala_cluster(cluster_args,
           default_query_options=method.func_dict.get(DEFAULT_QUERY_OPTIONS))
@@ -152,8 +157,10 @@ class CustomClusterTestSuite(ImpalaTestSuite):
     sleep(2)
 
   @classmethod
-  def _start_sentry_service(cls, sentry_service_config):
+  def _start_sentry_service(cls, sentry_service_config, sentry_log_dir=None):
     sentry_env = dict(os.environ)
+    if sentry_log_dir is not None:
+        sentry_env['SENTRY_LOG_DIR'] = sentry_log_dir
     sentry_env['SENTRY_SERVICE_CONFIG'] = sentry_service_config
     call = subprocess.Popen(
         ['/bin/bash', '-c', os.path.join(IMPALA_HOME,
@@ -164,18 +171,18 @@ class CustomClusterTestSuite(ImpalaTestSuite):
       raise RuntimeError("unable to start sentry")
 
   @classmethod
-  def _start_impala_cluster(cls, options, log_dir=os.getenv('LOG_DIR', "/tmp/"),
+  def _start_impala_cluster(cls, options, impala_log_dir=os.getenv('LOG_DIR', "/tmp/"),
       cluster_size=CLUSTER_SIZE, num_coordinators=NUM_COORDINATORS,
       use_exclusive_coordinators=False, log_level=1, expected_num_executors=CLUSTER_SIZE,
       default_query_options=None):
-    cls.impala_log_dir = log_dir
+    cls.impala_log_dir = impala_log_dir
     # We ignore TEST_START_CLUSTER_ARGS here. Custom cluster tests specifically test that
     # certain custom startup arguments work and we want to keep them independent of dev
     # environments.
     cmd = [os.path.join(IMPALA_HOME, 'bin/start-impala-cluster.py'),
            '--cluster_size=%d' % cluster_size,
            '--num_coordinators=%d' % num_coordinators,
-           '--log_dir=%s' % log_dir,
+           '--log_dir=%s' % impala_log_dir,
            '--log_level=%s' % log_level]
 
     if use_exclusive_coordinators:

http://git-wip-us.apache.org/repos/asf/impala/blob/21f521a7/tests/custom_cluster/test_redaction.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_redaction.py b/tests/custom_cluster/test_redaction.py
index 9cdb71d..7789e06 100644
--- a/tests/custom_cluster/test_redaction.py
+++ b/tests/custom_cluster/test_redaction.py
@@ -97,7 +97,7 @@ class TestRedaction(CustomClusterTestSuite, unittest.TestCase):
                             -redaction_rules_file=%s
                             -vmodule=%s'"""
             % (self.audit_dir, self.profile_dir, self.rules_file, vmodule)],
-        log_dir=self.log_dir,
+        impala_log_dir=self.log_dir,
         log_level=log_level)
     self.client = self.create_impala_client()
 


[3/5] impala git commit: IMPALA-7701: grant_option in SHOW GRANT always returns NULL from HS2 clients

Posted by jo...@apache.org.
IMPALA-7701: grant_option in SHOW GRANT always returns NULL from HS2 clients

Prior to this patch, SHOW GRANT ROLE/USER always showed NULL in the
grant_option column because the column header was declared with the
BOOLEAN type while the column value was populated as a STRING. This
mismatch caused HS2 clients to interpret the column value as not set
(NULL). The patch fixes the issue by populating the grant_option
column value with a BOOLEAN. It also renames test_show_grant_user.py
to test_show_grant.py so that the file holds all tests related to
SHOW GRANT statements.
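
As a rough client-side illustration (a sketch only; host, port, role, and
database names are placeholders, and the access pattern follows the new
test_show_grant_in_hs2 test added below):

  from impala.dbapi import connect  # impyla HS2 client, as used by the new test

  with connect(host='localhost', port=21050) as conn:
    cursor = conn.cursor()
    cursor.execute('show grant role some_role on database some_db')
    row = cursor.fetchall()[0]
    # For SHOW GRANT ROLE on a database the row has 8 columns; index 6 is
    # grant_option. Before this patch it arrived over HS2 as NULL/None; with
    # the BOOLEAN column value it is now a real True/False.
    assert row[6] in (True, False)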

Testing:
- Ran all FE tests
- Added new E2E test running SHOW GRANT statements from HS2 client
- Ran all E2E authorization tests

Change-Id: I1e175544172b63d36dceedc61e1f47e0f910d7cf
Reviewed-on: http://gerrit.cloudera.org:8080/11663
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/a80ec4a6
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/a80ec4a6
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/a80ec4a6

Branch: refs/heads/master
Commit: a80ec4a6d987464352e8f7da89110151569a5a64
Parents: e65ac1a
Author: Fredy Wijaya <fw...@cloudera.com>
Authored: Thu Oct 11 14:53:39 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Fri Oct 12 20:22:48 2018 +0000

----------------------------------------------------------------------
 .../impala/catalog/AuthorizationPolicy.java     |   2 +-
 .../apache/impala/util/TResultRowBuilder.java   |   7 +
 tests/authorization/test_show_grant.py          | 148 +++++++++++++++++++
 tests/authorization/test_show_grant_user.py     |  94 ------------
 4 files changed, 156 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/a80ec4a6/fe/src/main/java/org/apache/impala/catalog/AuthorizationPolicy.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/AuthorizationPolicy.java b/fe/src/main/java/org/apache/impala/catalog/AuthorizationPolicy.java
index f810d15..decca8e 100644
--- a/fe/src/main/java/org/apache/impala/catalog/AuthorizationPolicy.java
+++ b/fe/src/main/java/org/apache/impala/catalog/AuthorizationPolicy.java
@@ -526,7 +526,7 @@ public class AuthorizationPolicy implements PrivilegeCache {
     // URIs are case sensitive
     rowBuilder.add(Strings.nullToEmpty(privilege.getUri()));
     rowBuilder.add(privilege.getPrivilege_level().toString().toLowerCase());
-    rowBuilder.add(Boolean.toString(privilege.isHas_grant_opt()));
+    rowBuilder.add(privilege.isHas_grant_opt());
     if (privilege.getCreate_time_ms() == -1) {
       rowBuilder.add(null);
     } else {

http://git-wip-us.apache.org/repos/asf/impala/blob/a80ec4a6/fe/src/main/java/org/apache/impala/util/TResultRowBuilder.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/util/TResultRowBuilder.java b/fe/src/main/java/org/apache/impala/util/TResultRowBuilder.java
index 1481823..836e238 100644
--- a/fe/src/main/java/org/apache/impala/util/TResultRowBuilder.java
+++ b/fe/src/main/java/org/apache/impala/util/TResultRowBuilder.java
@@ -48,6 +48,13 @@ public class TResultRowBuilder {
     return this;
   }
 
+  public TResultRowBuilder add(boolean val) {
+    TColumnValue colVal = new TColumnValue();
+    colVal.setBool_val(val);
+    row_.addToColVals(colVal);
+    return this;
+  }
+
   public TResultRowBuilder addBytes(long val) {
     TColumnValue colVal = new TColumnValue();
     colVal.setString_val(PrintUtils.printBytes(val));

http://git-wip-us.apache.org/repos/asf/impala/blob/a80ec4a6/tests/authorization/test_show_grant.py
----------------------------------------------------------------------
diff --git a/tests/authorization/test_show_grant.py b/tests/authorization/test_show_grant.py
new file mode 100644
index 0000000..cf2e543
--- /dev/null
+++ b/tests/authorization/test_show_grant.py
@@ -0,0 +1,148 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# Client tests for SQL statement authorization
+# These tests verify the functionality of SHOW GRANT ROLE/USER. We
+# create several users and groups to verify clear separation.
+
+import grp
+import pytest
+from getpass import getuser
+from os import getenv
+
+from impala.dbapi import connect as impala_connect
+from tests.common.test_dimensions import create_uncompressed_text_dimension
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+
+SENTRY_CONFIG_FILE = getenv('IMPALA_HOME') + \
+    '/fe/src/test/resources/sentry-site_oo.xml'
+
+
+class TestShowGrant(CustomClusterTestSuite):
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestShowGrant, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_dimension(
+        create_uncompressed_text_dimension(cls.get_workload()))
+
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  def setup_method(self, method):
+    super(TestShowGrant, self).setup_method(method)
+    self.__test_cleanup()
+
+  def teardown_method(self, method):
+    self.__test_cleanup()
+    self.client.execute('drop role sgu_test_admin')
+    super(TestShowGrant, self).teardown_method(method)
+
+  def __test_cleanup(self):
+    # Clean up any old roles created by this test
+    for role_name in self.client.execute('show roles').data:
+      if 'sgu_test' in role_name:
+        self.client.execute('drop role %s' % role_name)
+
+    # Cleanup any other roles that were granted to this user.
+    # TODO: Update Sentry Service config and authorization tests to use LocalGroupMapping
+    # for resolving users -> groups. This way we can specify custom test users that don't
+    # actually exist in the system.
+    group_name = grp.getgrnam(getuser()).gr_name
+    for role_name in self.client.execute('show role grant group `%s`' % group_name).data:
+      self.client.execute('drop role %s' % role_name)
+
+    # Create a temporary admin user so we can actually view/clean up the test db.
+    self.client.execute('create role sgu_test_admin')
+    self.client.execute('grant all on server to sgu_test_admin')
+    self.client.execute('grant role sgu_test_admin to group `%s`'
+        % group_name)
+
+  @classmethod
+  def restart_first_impalad(cls):
+    impalad = cls.cluster.impalads[0]
+    impalad.restart()
+    cls.client = impalad.service.create_beeswax_client()
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    impalad_args='--server_name=server1 --authorization_policy_provider_class='
+                 'org.apache.impala.service.CustomClusterResourceAuthorizationProvider '
+                 '--sentry_config={0}'.format(SENTRY_CONFIG_FILE),
+    catalogd_args='--sentry_config={0} --authorization_policy_provider_class='
+                  'org.apache.impala.service.CustomClusterResourceAuthorizationProvider'
+                  .format(SENTRY_CONFIG_FILE),
+    sentry_config=SENTRY_CONFIG_FILE)
+  def test_show_grant_user(self, vector, unique_database):
+    group_name = grp.getgrnam(getuser()).gr_name
+    self.client.execute('create role sgu_test_primary')
+    self.client.execute('grant all on server to sgu_test_primary')
+    self.client.execute('grant role sgu_test_primary to group `%s`' % group_name)
+    self.run_test_case('QueryTest/show_grant_user', vector, use_db=unique_database)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    impalad_args='--server_name=server1 --authorization_policy_provider_class='
+                 'org.apache.impala.service.CustomClusterResourceAuthorizationProvider '
+                 '--sentry_config={0}'.format(SENTRY_CONFIG_FILE),
+    catalogd_args='--sentry_config={0} --authorization_policy_provider_class='
+                  'org.apache.impala.service.CustomClusterResourceAuthorizationProvider'
+                  .format(SENTRY_CONFIG_FILE),
+    sentry_config=SENTRY_CONFIG_FILE)
+  def test_show_grant_in_hs2(self, vector, unique_database):
+    """IMPALA-7701: Test that all types in show grant commands are correct. Incorrect
+    types can result in null/None values."""
+    role = 'sgu_test_primary'
+    self.client.execute('create role %s' % role)
+    self.client.execute('grant all on database %s to role %s' % (unique_database, role))
+    default_impalad = pytest.config.option.impalad.split(',')[0]
+    impalad_host = default_impalad.split(':')[0]
+    impalad_hs2_port = pytest.config.option.impalad_hs2_port
+    with impala_connect(host=impalad_host, port=impalad_hs2_port) as conn:
+      cursor = conn.cursor()
+
+      cursor.execute('show grant user %s on database %s' % (getuser(), unique_database))
+      rows = cursor.fetchall()
+      assert len(rows) == 1
+      cols = rows[0]
+      assert len(cols) == 10
+      assert cols[0] == 'USER'  # principal_type
+      assert cols[1] == getuser()  # principal_name
+      assert cols[2] == 'database'  # scope
+      assert cols[3] == unique_database  # database
+      assert cols[4] == ''  # table
+      assert cols[5] == ''  # column
+      assert cols[6] == ''  # uri
+      assert cols[7] == 'owner'  # privilege
+      assert cols[8]  # grant_option
+      # We don't assert create_time since the value can be None or str depending on the
+      # Sentry refresh.
+
+      cursor.execute('show grant role %s on database %s' % (role, unique_database))
+      rows = cursor.fetchall()
+      assert len(rows) == 1
+      cols = rows[0]
+      assert len(cols) == 8
+      assert cols[0] == 'database'  # scope
+      assert cols[1] == unique_database  # database
+      assert cols[2] == ''  # table
+      assert cols[3] == ''  # column
+      assert cols[4] == ''  # uri
+      assert cols[5] == 'all'  # privilege
+      assert not cols[6]  # grant_option
+      # We don't assert create_time since the value can be None or str depending on the
+      # Sentry refresh.

http://git-wip-us.apache.org/repos/asf/impala/blob/a80ec4a6/tests/authorization/test_show_grant_user.py
----------------------------------------------------------------------
diff --git a/tests/authorization/test_show_grant_user.py b/tests/authorization/test_show_grant_user.py
deleted file mode 100644
index b43b56e..0000000
--- a/tests/authorization/test_show_grant_user.py
+++ /dev/null
@@ -1,94 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-# Client tests for SQL statement authorization
-# These tests verify the functionality of SHOW GRANT USER. We
-# create several users and groups to verify clear separation.
-
-import grp
-import pytest
-from getpass import getuser
-from os import getenv
-
-from tests.common.test_dimensions import create_uncompressed_text_dimension
-from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
-
-SENTRY_CONFIG_FILE = getenv('IMPALA_HOME') + \
-    '/fe/src/test/resources/sentry-site_oo.xml'
-
-
-class TestShowGrantUser(CustomClusterTestSuite):
-  @classmethod
-  def add_test_dimensions(cls):
-    super(TestShowGrantUser, cls).add_test_dimensions()
-    cls.ImpalaTestMatrix.add_dimension(
-        create_uncompressed_text_dimension(cls.get_workload()))
-
-  @classmethod
-  def get_workload(cls):
-    return 'functional-query'
-
-  def setup_method(self, method):
-    super(TestShowGrantUser, self).setup_method(method)
-    self.__test_cleanup()
-
-  def teardown_method(self, method):
-    self.__test_cleanup()
-    self.client.execute('drop role sgu_test_admin')
-    super(TestShowGrantUser, self).teardown_method(method)
-
-  def __test_cleanup(self):
-    # Clean up any old roles created by this test
-    for role_name in self.client.execute('show roles').data:
-      if 'sgu_test' in role_name:
-        self.client.execute('drop role %s' % role_name)
-
-    # Cleanup any other roles that were granted to this user.
-    # TODO: Update Sentry Service config and authorization tests to use LocalGroupMapping
-    # for resolving users -> groups. This way we can specify custom test users that don't
-    # actually exist in the system.
-    group_name = grp.getgrnam(getuser()).gr_name
-    for role_name in self.client.execute('show role grant group `%s`' % group_name).data:
-      self.client.execute('drop role %s' % role_name)
-
-    # Create a temporary admin user so we can actually view/clean up the test db.
-    self.client.execute('create role sgu_test_admin')
-    self.client.execute('grant all on server to sgu_test_admin')
-    self.client.execute('grant role sgu_test_admin to group `%s`'
-        % group_name)
-
-  @classmethod
-  def restart_first_impalad(cls):
-    impalad = cls.cluster.impalads[0]
-    impalad.restart()
-    cls.client = impalad.service.create_beeswax_client()
-
-  @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args(
-      impalad_args='--server_name=server1 --authorization_policy_provider_class='
-      'org.apache.impala.service.CustomClusterResourceAuthorizationProvider '
-      '--sentry_config={0}'.format(SENTRY_CONFIG_FILE),
-      catalogd_args='--sentry_config={0} --authorization_policy_provider_class='
-      'org.apache.impala.service.CustomClusterResourceAuthorizationProvider'
-      .format(SENTRY_CONFIG_FILE),
-      sentry_config=SENTRY_CONFIG_FILE)
-  def test_show_grant_user(self, vector, unique_database):
-    group_name = grp.getgrnam(getuser()).gr_name
-    self.client.execute('create role sgu_test_primary')
-    self.client.execute('grant all on server to sgu_test_primary')
-    self.client.execute('grant role sgu_test_primary to group `%s`' % group_name)
-    self.run_test_case('QueryTest/show_grant_user', vector, use_db=unique_database)


[2/5] impala git commit: IMPALA-7690: Make test_pool_config_change_while_queued compatible with python 2.6

Posted by jo...@apache.org.
IMPALA-7690: Make test_pool_config_change_while_queued compatible with
python 2.6

The ElementTree XML API used in test_pool_config_change_while_queued
called iter(), which was only added in Python 2.7. Switching it to
getiterator() makes the test compatible with Python 2.6.
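
A minimal standalone sketch of the compatibility difference (the XML snippet
and pool name are illustrative, not taken from the patch):

  import xml.etree.ElementTree as ET

  xml_root = ET.fromstring(
      '<configuration><property>'
      '<name>impala.admission-control.max-query-mem-limit-bytes.root.poolA</name>'
      '<value>10</value></property></configuration>')

  # Element.iter() only exists from Python 2.7 on; getiterator() is the older
  # equivalent call that also works on Python 2.6.
  for prop in xml_root.getiterator('property'):
    print('%s = %s' % (prop.find('name').text, prop.find('value').text))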

Change-Id: Id2593609e5be288054d1361f0fe57580e17ea042
Reviewed-on: http://gerrit.cloudera.org:8080/11660
Reviewed-by: Pooja Nilangekar <po...@cloudera.com>
Reviewed-by: Michael Brown <mi...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/e65ac1a4
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/e65ac1a4
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/e65ac1a4

Branch: refs/heads/master
Commit: e65ac1a4341acfce7c1afc08c0c0566ee0ca50ab
Parents: 21f521a
Author: Bikramjeet Vig <bi...@cloudera.com>
Authored: Thu Oct 11 12:29:00 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Fri Oct 12 01:28:07 2018 +0000

----------------------------------------------------------------------
 tests/common/resource_pool_config.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/e65ac1a4/tests/common/resource_pool_config.py
----------------------------------------------------------------------
diff --git a/tests/common/resource_pool_config.py b/tests/common/resource_pool_config.py
index adab034..b160163 100644
--- a/tests/common/resource_pool_config.py
+++ b/tests/common/resource_pool_config.py
@@ -83,7 +83,7 @@ class ResourcePoolConfig(object):
 
   def __find_xml_node(self, xml_root, pool_name, pool_attribute):
     """Returns the xml node corresponding to the 'pool_attribute' for the 'pool_name'"""
-    for property in xml_root.iter('property'):
+    for property in xml_root.getiterator('property'):
       try:
         name = property.find('name').text
         # eg. of name = impala.admission-control.max-query-mem-limit-bytes.root.pool_name


[5/5] impala git commit: IMPALA-7704: Revert "IMPALA-7644: Hide Parquet page index writing with feature flag"

Posted by jo...@apache.org.
IMPALA-7704: Revert "IMPALA-7644: Hide Parquet page index writing with feature flag"

The fix for IMPALA-7644 introduced ASAN issues detailed in
IMPALA-7704. Reverting for now.

This reverts commit 843683ed6c2ef41c7c25e9fa4af68801dbdd1a78.

Change-Id: Icf0a64d6ec747275e3ecd6e801e054f81095591a
Reviewed-on: http://gerrit.cloudera.org:8080/11671
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-by: Michael Ho <kw...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/af76186e
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/af76186e
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/af76186e

Branch: refs/heads/master
Commit: af76186e013607cb64baf151c039e4f6aaab4350
Parents: 97f0282
Author: Joe McDonnell <jo...@cloudera.com>
Authored: Fri Oct 12 11:15:07 2018 -0700
Committer: Joe McDonnell <jo...@cloudera.com>
Committed: Sat Oct 13 03:26:03 2018 +0000

----------------------------------------------------------------------
 be/src/common/global-flags.cc                   |   6 -
 be/src/exec/hdfs-parquet-table-writer.cc        | 100 ++---
 .../queries/QueryTest/stats-extrapolation.test  |  14 +-
 tests/custom_cluster/test_parquet_page_index.py | 371 ------------------
 tests/query_test/test_parquet_page_index.py     | 372 +++++++++++++++++++
 5 files changed, 417 insertions(+), 446 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/af76186e/be/src/common/global-flags.cc
----------------------------------------------------------------------
diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index ac76b53..2ea1ca5 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -239,12 +239,6 @@ DEFINE_double_hidden(invalidate_tables_fraction_on_memory_pressure, 0.1,
     "The fraction of tables to invalidate when CatalogdTableInvalidator considers the "
     "old GC generation to be almost full.");
 
-DEFINE_bool_hidden(enable_parquet_page_index_writing_debug_only, false, "If true, Impala "
-    "will write the Parquet page index. It is not advised to use it in a production "
-    "environment, only for testing and development. This flag is meant to be temporary. "
-    "We plan to remove this flag once Impala is able to read the page index and has "
-    "better test coverage around it.");
-
 // ++========================++
 // || Startup flag graveyard ||
 // ++========================++

http://git-wip-us.apache.org/repos/asf/impala/blob/af76186e/be/src/exec/hdfs-parquet-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-table-writer.cc b/be/src/exec/hdfs-parquet-table-writer.cc
index 13137e5..8aa4f7a 100644
--- a/be/src/exec/hdfs-parquet-table-writer.cc
+++ b/be/src/exec/hdfs-parquet-table-writer.cc
@@ -83,8 +83,6 @@ using namespace apache::thrift;
 // the columns and run that function over row batches.
 // TODO: we need to pass in the compression from the FE/metadata
 
-DECLARE_bool(enable_parquet_page_index_writing_debug_only);
-
 namespace impala {
 
 // Base class for column writers. This contains most of the logic except for
@@ -207,58 +205,6 @@ class HdfsParquetTableWriter::BaseColumnWriter {
  protected:
   friend class HdfsParquetTableWriter;
 
-  Status AddMemoryConsumptionForPageIndex(int64_t new_memory_allocation) {
-    if (UNLIKELY(!table_sink_mem_tracker_->TryConsume(new_memory_allocation))) {
-      return table_sink_mem_tracker_->MemLimitExceeded(parent_->state_,
-          "Failed to allocate memory for Parquet page index.", new_memory_allocation);
-    }
-    page_index_memory_consumption_ += new_memory_allocation;
-    return Status::OK();
-  }
-
-  Status ReserveOffsetIndex(int64_t capacity) {
-    if (!FLAGS_enable_parquet_page_index_writing_debug_only) return Status::OK();
-    RETURN_IF_ERROR(
-        AddMemoryConsumptionForPageIndex(capacity * sizeof(parquet::PageLocation)));
-    offset_index_.page_locations.reserve(capacity);
-    return Status::OK();
-  }
-
-  void AddLocationToOffsetIndex(const parquet::PageLocation& location) {
-    if (!FLAGS_enable_parquet_page_index_writing_debug_only) return;
-    offset_index_.page_locations.push_back(location);
-  }
-
-  Status AddPageStatsToColumnIndex() {
-    if (!FLAGS_enable_parquet_page_index_writing_debug_only) return Status::OK();
-    parquet::Statistics page_stats;
-    page_stats_base_->EncodeToThrift(&page_stats);
-    // If pages_stats contains min_value and max_value, then append them to min_values_
-    // and max_values_ and also mark the page as not null. In case min and max values are
-    // not set, push empty strings to maintain the consistency of the index and mark the
-    // page as null. Always push the null_count.
-    string min_val;
-    string max_val;
-    if ((page_stats.__isset.min_value) && (page_stats.__isset.max_value)) {
-      Status s_min = TruncateDown(page_stats.min_value, PAGE_INDEX_MAX_STRING_LENGTH,
-          &min_val);
-      Status s_max = TruncateUp(page_stats.max_value, PAGE_INDEX_MAX_STRING_LENGTH,
-          &max_val);
-      if (!s_min.ok() || !s_max.ok()) valid_column_index_ = false;
-      column_index_.null_pages.push_back(false);
-    } else {
-      DCHECK(!page_stats.__isset.min_value && !page_stats.__isset.max_value);
-      column_index_.null_pages.push_back(true);
-      DCHECK_EQ(page_stats.null_count, num_values_);
-    }
-    RETURN_IF_ERROR(
-        AddMemoryConsumptionForPageIndex(min_val.capacity() + max_val.capacity()));
-    column_index_.min_values.emplace_back(std::move(min_val));
-    column_index_.max_values.emplace_back(std::move(max_val));
-    column_index_.null_counts.push_back(page_stats.null_count);
-    return Status::OK();
-  }
-
   // Encodes value into the current page output buffer and updates the column statistics
   // aggregates. Returns true if the value was appended successfully to the current page.
   // Returns false if the value was not appended to the current page and the caller can
@@ -699,10 +645,11 @@ Status HdfsParquetTableWriter::BaseColumnWriter::Flush(int64_t* file_pos,
 
   *first_data_page = *file_pos;
   int64_t current_row_group_index = 0;
-  RETURN_IF_ERROR(ReserveOffsetIndex(num_data_pages_));
+  offset_index_.page_locations.resize(num_data_pages_);
 
   // Write data pages
-  for (const DataPage& page : pages_) {
+  for (int i = 0; i < num_data_pages_; ++i) {
+    DataPage& page = pages_[i];
     parquet::PageLocation location;
 
     if (page.header.data_page_header.num_values == 0) {
@@ -710,7 +657,7 @@ Status HdfsParquetTableWriter::BaseColumnWriter::Flush(int64_t* file_pos,
       location.offset = -1;
       location.compressed_page_size = 0;
       location.first_row_index = -1;
-      AddLocationToOffsetIndex(location);
+      offset_index_.page_locations[i] = location;
       continue;
     }
 
@@ -730,7 +677,7 @@ Status HdfsParquetTableWriter::BaseColumnWriter::Flush(int64_t* file_pos,
     // its name suggests. On the other hand, parquet::PageLocation::compressed_page_size
     // also includes the size of the page header.
     location.compressed_page_size = page.header.compressed_page_size + len;
-    AddLocationToOffsetIndex(location);
+    offset_index_.page_locations[i] = location;
 
     // Write the page data
     RETURN_IF_ERROR(parent_->Write(page.data, page.header.compressed_page_size));
@@ -807,7 +754,37 @@ Status HdfsParquetTableWriter::BaseColumnWriter::FinalizeCurrentPage() {
   }
 
   DCHECK(page_stats_base_ != nullptr);
-  RETURN_IF_ERROR(AddPageStatsToColumnIndex());
+  parquet::Statistics page_stats;
+  page_stats_base_->EncodeToThrift(&page_stats);
+  {
+    // If pages_stats contains min_value and max_value, then append them to min_values_
+    // and max_values_ and also mark the page as not null. In case min and max values are
+    // not set, push empty strings to maintain the consistency of the index and mark the
+    // page as null. Always push the null_count.
+    string min_val;
+    string max_val;
+    if ((page_stats.__isset.min_value) && (page_stats.__isset.max_value)) {
+      Status s_min = TruncateDown(page_stats.min_value, PAGE_INDEX_MAX_STRING_LENGTH,
+          &min_val);
+      Status s_max = TruncateUp(page_stats.max_value, PAGE_INDEX_MAX_STRING_LENGTH,
+          &max_val);
+      if (!s_min.ok() || !s_max.ok()) valid_column_index_ = false;
+      column_index_.null_pages.push_back(false);
+    } else {
+      DCHECK(!page_stats.__isset.min_value && !page_stats.__isset.max_value);
+      column_index_.null_pages.push_back(true);
+      DCHECK_EQ(page_stats.null_count, num_values_);
+    }
+    int64_t new_memory_allocation = min_val.capacity() + max_val.capacity();
+    if (UNLIKELY(!table_sink_mem_tracker_->TryConsume(new_memory_allocation))) {
+      return table_sink_mem_tracker_->MemLimitExceeded(parent_->state_,
+          "Failed to allocate memory for Parquet page index.", new_memory_allocation);
+    }
+    page_index_memory_consumption_ += new_memory_allocation;
+    column_index_.min_values.emplace_back(std::move(min_val));
+    column_index_.max_values.emplace_back(std::move(max_val));
+    column_index_.null_counts.push_back(page_stats.null_count);
+  }
 
   // Update row group statistics from page statistics.
   DCHECK(row_group_stats_base_ != nullptr);
@@ -1160,7 +1137,6 @@ Status HdfsParquetTableWriter::Finalize() {
 
   RETURN_IF_ERROR(FlushCurrentRowGroup());
   RETURN_IF_ERROR(WritePageIndex());
-  for (auto& column : columns_) column->Reset();
   RETURN_IF_ERROR(WriteFileFooter());
   stats_.__set_parquet_stats(parquet_insert_stats_);
   COUNTER_ADD(parent_->rows_inserted_counter(), row_count_);
@@ -1273,8 +1249,6 @@ Status HdfsParquetTableWriter::FlushCurrentRowGroup() {
 }
 
 Status HdfsParquetTableWriter::WritePageIndex() {
-  if (!FLAGS_enable_parquet_page_index_writing_debug_only) return Status::OK();
-
   // Currently Impala only write Parquet files with a single row group. The current
   // page index logic depends on this behavior as it only keeps one row group's
   // statistics in memory.
@@ -1310,6 +1284,8 @@ Status HdfsParquetTableWriter::WritePageIndex() {
     row_group->columns[i].__set_offset_index_length(len);
     file_pos_ += len;
   }
+  // Reset column writers.
+  for (auto& column : columns_) column->Reset();
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/af76186e/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test b/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test
index 3b25427..8e95168 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test
@@ -33,17 +33,17 @@ show table stats alltypes
 YEAR, MONTH, #ROWS, EXTRAP #ROWS, #FILES, SIZE, BYTES CACHED, CACHE REPLICATION, FORMAT, INCREMENTAL STATS, LOCATION
 ---- RESULTS
 '2009','1',-1,308,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=1'
-'2009','2',-1,288,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=2'
-'2009','3',-1,308,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=3'
+'2009','2',-1,289,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=2'
+'2009','3',-1,307,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=3'
 '2009','4',-1,302,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=4'
-'2009','5',-1,308,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=5'
+'2009','5',-1,307,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=5'
 '2009','6',-1,302,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=6'
-'2009','7',-1,308,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=7'
-'2009','8',-1,308,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=8'
+'2009','7',-1,307,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=7'
+'2009','8',-1,307,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=8'
 '2009','9',-1,302,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=9'
-'2009','10',-1,308,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=10'
+'2009','10',-1,307,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=10'
 '2009','11',-1,302,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=11'
-'2009','12',-1,308,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=12'
+'2009','12',-1,307,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=12'
 'Total','',3650,3650,12,regex:.*B,'0B','','','',''
 ---- TYPES
 STRING,STRING,BIGINT,BIGINT,BIGINT,STRING,STRING,STRING,STRING,STRING,STRING

http://git-wip-us.apache.org/repos/asf/impala/blob/af76186e/tests/custom_cluster/test_parquet_page_index.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_parquet_page_index.py b/tests/custom_cluster/test_parquet_page_index.py
deleted file mode 100644
index 0d2a750..0000000
--- a/tests/custom_cluster/test_parquet_page_index.py
+++ /dev/null
@@ -1,371 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# Targeted Impala insert tests
-
-import os
-
-from collections import namedtuple
-from subprocess import check_call
-from parquet.ttypes import BoundaryOrder, ColumnIndex, OffsetIndex, PageHeader, PageType
-
-from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
-from tests.common.skip import SkipIfLocal
-from tests.util.filesystem_utils import get_fs_path
-from tests.util.get_parquet_metadata import (
-    decode_stats_value,
-    get_parquet_metadata,
-    read_serialized_object
-)
-
-PAGE_INDEX_MAX_STRING_LENGTH = 64
-
-
-@SkipIfLocal.parquet_file_size
-class TestHdfsParquetTableIndexWriter(CustomClusterTestSuite):
-  """Since PARQUET-922 page statistics can be written before the footer.
-  The tests in this class checks if Impala writes the page indices correctly.
-  It is temporarily a custom cluster test suite because we need to set the
-  enable_parquet_page_index_writing command-line flag for the Impala daemon
-  in order to make it write the page index.
-  TODO: IMPALA-5843 Once Impala is able to read the page index and also write it by
-  default, this test suite should be moved back to query tests.
-  """
-  @classmethod
-  def get_workload(cls):
-    return 'functional-query'
-
-  @classmethod
-  def add_test_dimensions(cls):
-    super(CustomClusterTestSuite, cls).add_test_dimensions()
-    cls.ImpalaTestMatrix.add_constraint(
-        lambda v: v.get_value('table_format').file_format == 'parquet')
-
-  def _get_row_group_from_file(self, parquet_file):
-    """Returns namedtuples that contain the schema, stats, offset_index, column_index,
-    and page_headers for each column in the first row group in file 'parquet_file'. Fails
-    if the file contains multiple row groups.
-    """
-    ColumnInfo = namedtuple('ColumnInfo', ['schema', 'stats', 'offset_index',
-        'column_index', 'page_headers'])
-
-    file_meta_data = get_parquet_metadata(parquet_file)
-    assert len(file_meta_data.row_groups) == 1
-    # We only support flat schemas, the additional element is the root element.
-    schemas = file_meta_data.schema[1:]
-    row_group = file_meta_data.row_groups[0]
-    assert len(schemas) == len(row_group.columns)
-    row_group_index = []
-    with open(parquet_file) as file_handle:
-      for column, schema in zip(row_group.columns, schemas):
-        column_index_offset = column.column_index_offset
-        column_index_length = column.column_index_length
-        column_index = None
-        if column_index_offset and column_index_length:
-          column_index = read_serialized_object(ColumnIndex, file_handle,
-                                                column_index_offset, column_index_length)
-        column_meta_data = column.meta_data
-        stats = None
-        if column_meta_data:
-          stats = column_meta_data.statistics
-
-        offset_index_offset = column.offset_index_offset
-        offset_index_length = column.offset_index_length
-        offset_index = None
-        page_headers = []
-        if offset_index_offset and offset_index_length:
-          offset_index = read_serialized_object(OffsetIndex, file_handle,
-                                                offset_index_offset, offset_index_length)
-          for page_loc in offset_index.page_locations:
-            page_header = read_serialized_object(PageHeader, file_handle, page_loc.offset,
-                                                 page_loc.compressed_page_size)
-            page_headers.append(page_header)
-
-        column_info = ColumnInfo(schema, stats, offset_index, column_index, page_headers)
-        row_group_index.append(column_info)
-    return row_group_index
-
-  def _get_row_groups_from_hdfs_folder(self, hdfs_path, tmpdir):
-    """Returns a list of column infos (containing the schema, stats, offset_index,
-    column_index, and page_headers) for the first row group in all parquet files in
-    'hdfs_path'.
-    """
-    row_group_indexes = []
-    check_call(['hdfs', 'dfs', '-get', hdfs_path, tmpdir.strpath])
-    for root, subdirs, files in os.walk(tmpdir.strpath):
-      for f in files:
-        parquet_file = os.path.join(root, str(f))
-        row_group_indexes.append(self._get_row_group_from_file(parquet_file))
-    return row_group_indexes
-
-  def _validate_page_locations(self, page_locations):
-    """Validate that the page locations are in order."""
-    for previous_loc, current_loc in zip(page_locations[:-1], page_locations[1:]):
-      assert previous_loc.offset < current_loc.offset
-      assert previous_loc.first_row_index < current_loc.first_row_index
-
-  def _validate_null_stats(self, index_size, column_info):
-    """Validates the statistics stored in null_pages and null_counts."""
-    column_index = column_info.column_index
-    column_stats = column_info.stats
-    assert column_index.null_pages is not None
-    assert len(column_index.null_pages) == index_size
-    assert column_index.null_counts is not None
-    assert len(column_index.null_counts) == index_size
-
-    for page_is_null, null_count, page_header in zip(column_index.null_pages,
-        column_index.null_counts, column_info.page_headers):
-      assert page_header.type == PageType.DATA_PAGE
-      num_values = page_header.data_page_header.num_values
-      assert not page_is_null or null_count == num_values
-
-    if column_stats:
-      assert column_stats.null_count == sum(column_index.null_counts)
-
-  def _validate_min_max_values(self, index_size, column_info):
-    """Validate min/max values of the pages in a column chunk."""
-    column_index = column_info.column_index
-    min_values = column_info.column_index.min_values
-    assert len(min_values) == index_size
-    max_values = column_info.column_index.max_values
-    assert len(max_values) == index_size
-
-    if not column_info.stats:
-      return
-
-    column_min_value_str = column_info.stats.min_value
-    column_max_value_str = column_info.stats.max_value
-    if column_min_value_str is None or column_max_value_str is None:
-      # If either is None, then both need to be None.
-      assert column_min_value_str is None and column_max_value_str is None
-      # No min and max value, all pages need to be null
-      for idx, null_page in enumerate(column_index.null_pages):
-        assert null_page, "Page {} of column {} is not null, \
-            but doesn't have min and max values!".format(idx, column_index.schema.name)
-      # Everything is None, no further checks needed.
-      return
-
-    column_min_value = decode_stats_value(column_info.schema, column_min_value_str)
-    for null_page, page_min_str in zip(column_index.null_pages, min_values):
-      if not null_page:
-        page_min_value = decode_stats_value(column_info.schema, page_min_str)
-        # If type is str, page_min_value might have been truncated.
-        if isinstance(page_min_value, basestring):
-          assert page_min_value >= column_min_value[:len(page_min_value)]
-        else:
-          assert page_min_value >= column_min_value
-
-    column_max_value = decode_stats_value(column_info.schema, column_max_value_str)
-    for null_page, page_max_str in zip(column_index.null_pages, max_values):
-      if not null_page:
-        page_max_value = decode_stats_value(column_info.schema, page_max_str)
-        # If type is str, page_max_value might have been truncated and incremented.
-        if (isinstance(page_max_value, basestring) and
-            len(page_max_value) == PAGE_INDEX_MAX_STRING_LENGTH):
-          max_val_prefix = page_max_value.rstrip('\0')
-          assert max_val_prefix[:-1] <= column_max_value
-        else:
-          assert page_max_value <= column_max_value
-
-  def _validate_ordering(self, ordering, schema, null_pages, min_values, max_values):
-    """Check if the ordering of the values reflects the value of 'ordering'."""
-
-    def is_sorted(l, reverse=False):
-      if not reverse:
-        return all(a <= b for a, b in zip(l, l[1:]))
-      else:
-        return all(a >= b for a, b in zip(l, l[1:]))
-
-    # Filter out null pages and decode the actual min/max values.
-    actual_min_values = [decode_stats_value(schema, min_val)
-                         for min_val, is_null in zip(min_values, null_pages)
-                         if not is_null]
-    actual_max_values = [decode_stats_value(schema, max_val)
-                         for max_val, is_null in zip(max_values, null_pages)
-                         if not is_null]
-
-    # For ASCENDING and DESCENDING, both min and max values need to be sorted.
-    if ordering == BoundaryOrder.ASCENDING:
-      assert is_sorted(actual_min_values)
-      assert is_sorted(actual_max_values)
-    elif ordering == BoundaryOrder.DESCENDING:
-      assert is_sorted(actual_min_values, reverse=True)
-      assert is_sorted(actual_max_values, reverse=True)
-    else:
-      assert ordering == BoundaryOrder.UNORDERED
-      # For UNORDERED, min and max values cannot be both sorted.
-      assert not is_sorted(actual_min_values) or not is_sorted(actual_max_values)
-      assert (not is_sorted(actual_min_values, reverse=True) or
-              not is_sorted(actual_max_values, reverse=True))
-
-  def _validate_boundary_order(self, column_info):
-    """Validate that min/max values are really in the order specified by
-    boundary order.
-    """
-    column_index = column_info.column_index
-    self._validate_ordering(column_index.boundary_order, column_info.schema,
-        column_index.null_pages, column_index.min_values, column_index.max_values)
-
-  def _validate_parquet_page_index(self, hdfs_path, tmpdir):
-    """Validates that 'hdfs_path' contains exactly one parquet file and that the rowgroup
-    index in that file is in the valid format.
-    """
-    row_group_indexes = self._get_row_groups_from_hdfs_folder(hdfs_path, tmpdir)
-    for columns in row_group_indexes:
-      for column_info in columns:
-        try:
-          index_size = len(column_info.offset_index.page_locations)
-          assert index_size > 0
-          self._validate_page_locations(column_info.offset_index.page_locations)
-          # IMPALA-7304: Impala doesn't write column index for floating-point columns
-          # until PARQUET-1222 is resolved.
-          if column_info.schema.type in [4, 5]:
-            assert column_info.column_index is None
-            continue
-          self._validate_null_stats(index_size, column_info)
-          self._validate_min_max_values(index_size, column_info)
-          self._validate_boundary_order(column_info)
-        except AssertionError as e:
-          e.args += ("Validation failed on column {}.".format(column_info.schema.name),)
-          raise
-
-  def _ctas_table_and_verify_index(self, vector, unique_database, source_table,
-                                   tmpdir, sorting_column=None):
-    """Copies 'source_table' into a parquet table and makes sure that the index
-    in the resulting parquet file is valid.
-    """
-    table_name = "test_hdfs_parquet_table_writer"
-    qualified_table_name = "{0}.{1}".format(unique_database, table_name)
-    hdfs_path = get_fs_path('/test-warehouse/{0}.db/{1}/'.format(unique_database,
-                                                                 table_name))
-    # Setting num_nodes = 1 ensures that the query is executed on the coordinator,
-    # resulting in a single parquet file being written.
-    vector.get_value('exec_option')['num_nodes'] = 1
-    self.execute_query("drop table if exists {0}".format(qualified_table_name))
-    if sorting_column is None:
-      query = ("create table {0} stored as parquet as select * from {1}").format(
-          qualified_table_name, source_table)
-    else:
-      query = ("create table {0} sort by({1}) stored as parquet as select * from {2}"
-               ).format(qualified_table_name, sorting_column, source_table)
-    self.execute_query(query, vector.get_value('exec_option'))
-    self._validate_parquet_page_index(hdfs_path, tmpdir.join(source_table))
-
-  def _create_string_table_with_values(self, vector, unique_database, table_name,
-                                       values_sql):
-    """Creates a parquet table that has a single string column, then invokes an insert
-    statement on it with the 'values_sql' parameter. E.g. 'values_sql' is "('asdf')".
-    It returns the HDFS path for the table.
-    """
-    qualified_table_name = "{0}.{1}".format(unique_database, table_name)
-    self.execute_query("drop table if exists {0}".format(qualified_table_name))
-    vector.get_value('exec_option')['num_nodes'] = 1
-    query = ("create table {0} (str string) stored as parquet").format(
-        qualified_table_name)
-    self.execute_query(query, vector.get_value('exec_option'))
-    self.execute_query("insert into {0} values {1}".format(qualified_table_name,
-        values_sql), vector.get_value('exec_option'))
-    return get_fs_path('/test-warehouse/{0}.db/{1}/'.format(unique_database,
-        table_name))
-
-  @CustomClusterTestSuite.with_args("--enable_parquet_page_index_writing_debug_only")
-  def test_ctas_tables(self, vector, unique_database, tmpdir):
-    """Test different Parquet files created via CTAS statements."""
-
-    # Test that writing a parquet file populates the rowgroup indexes with the correct
-    # values.
-    self._ctas_table_and_verify_index(vector, unique_database, "functional.alltypes",
-        tmpdir)
-
-    # Test that writing a parquet file populates the rowgroup indexes with the correct
-    # values, using decimal types.
-    self._ctas_table_and_verify_index(vector, unique_database, "functional.decimal_tbl",
-        tmpdir)
-
-    # Test that writing a parquet file populates the rowgroup indexes with the correct
-    # values, using char types.
-    self._ctas_table_and_verify_index(vector, unique_database, "functional.chars_formats",
-        tmpdir)
-
-    # Test that we don't write min/max values in the index for null columns.
-    # Ensure null_count is set for columns with null values.
-    self._ctas_table_and_verify_index(vector, unique_database, "functional.nulltable",
-        tmpdir)
-
-    # Test that when a ColumnChunk is written across multiple pages, the index is
-    # valid.
-    self._ctas_table_and_verify_index(vector, unique_database, "tpch.customer",
-        tmpdir)
-    self._ctas_table_and_verify_index(vector, unique_database, "tpch.orders",
-        tmpdir)
-
-    # Test that when the schema has a sorting column, the index is valid.
-    self._ctas_table_and_verify_index(vector, unique_database,
-        "functional_parquet.zipcode_incomes", tmpdir, "id")
-
-    # Test table with wide row.
-    self._ctas_table_and_verify_index(vector, unique_database,
-        "functional_parquet.widerow", tmpdir)
-
-    # Test tables with wide rows and many columns.
-    self._ctas_table_and_verify_index(vector, unique_database,
-        "functional_parquet.widetable_250_cols", tmpdir)
-    self._ctas_table_and_verify_index(vector, unique_database,
-        "functional_parquet.widetable_500_cols", tmpdir)
-    self._ctas_table_and_verify_index(vector, unique_database,
-        "functional_parquet.widetable_1000_cols", tmpdir)
-
-  @CustomClusterTestSuite.with_args("--enable_parquet_page_index_writing_debug_only")
-  def test_max_string_values(self, vector, unique_database, tmpdir):
-    """Test string values that are all 0xFFs or end with 0xFFs."""
-
-    # String value is all of 0xFFs but its length is less than PAGE_INDEX_TRUNCATE_LENGTH.
-    short_tbl = "short_tbl"
-    short_hdfs_path = self._create_string_table_with_values(vector, unique_database,
-        short_tbl, "(rpad('', {0}, chr(255)))".format(PAGE_INDEX_MAX_STRING_LENGTH - 1))
-    self._validate_parquet_page_index(short_hdfs_path, tmpdir.join(short_tbl))
-
-    # String value is all of 0xFFs and its length is PAGE_INDEX_TRUNCATE_LENGTH.
-    fit_tbl = "fit_tbl"
-    fit_hdfs_path = self._create_string_table_with_values(vector, unique_database,
-        fit_tbl, "(rpad('', {0}, chr(255)))".format(PAGE_INDEX_MAX_STRING_LENGTH))
-    self._validate_parquet_page_index(fit_hdfs_path, tmpdir.join(fit_tbl))
-
-    # All bytes are 0xFFs and the string is longer then PAGE_INDEX_TRUNCATE_LENGTH, so we
-    # should not write page statistics.
-    too_long_tbl = "too_long_tbl"
-    too_long_hdfs_path = self._create_string_table_with_values(vector, unique_database,
-        too_long_tbl, "(rpad('', {0}, chr(255)))".format(
-            PAGE_INDEX_MAX_STRING_LENGTH + 1))
-    row_group_indexes = self._get_row_groups_from_hdfs_folder(too_long_hdfs_path,
-        tmpdir.join(too_long_tbl))
-    column = row_group_indexes[0][0]
-    assert column.column_index is None
-    # We always write the offset index
-    assert column.offset_index is not None
-
-    # Test string with value that starts with 'aaa' following with 0xFFs and its length is
-    # greater than PAGE_INDEX_TRUNCATE_LENGTH. Max value should be 'aab'.
-    aaa_tbl = "aaa_tbl"
-    aaa_hdfs_path = self._create_string_table_with_values(vector, unique_database,
-        aaa_tbl, "(rpad('aaa', {0}, chr(255)))".format(PAGE_INDEX_MAX_STRING_LENGTH + 1))
-    row_group_indexes = self._get_row_groups_from_hdfs_folder(aaa_hdfs_path,
-        tmpdir.join(aaa_tbl))
-    column = row_group_indexes[0][0]
-    assert len(column.column_index.max_values) == 1
-    max_value = column.column_index.max_values[0]
-    assert max_value == 'aab'
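
Reading the page index back in these tests goes entirely through the Thrift helpers in
tests/util/get_parquet_metadata. As a rough standalone sketch of the same access pattern
(the function name dump_first_column_index and the file name example.parquet are made up
for illustration; only the imported helpers and fields come from the code above), the
column index of a single column could be inspected like this:

from parquet.ttypes import ColumnIndex
from tests.util.get_parquet_metadata import get_parquet_metadata, read_serialized_object

def dump_first_column_index(parquet_file):
  """Prints the per-page null flags and min/max values of the first column of the
  first row group, if a column index was written for that column."""
  file_meta_data = get_parquet_metadata(parquet_file)
  column = file_meta_data.row_groups[0].columns[0]
  if not (column.column_index_offset and column.column_index_length):
    print("No column index written for this column.")
    return
  with open(parquet_file) as file_handle:
    column_index = read_serialized_object(ColumnIndex, file_handle,
                                          column.column_index_offset,
                                          column.column_index_length)
  for null_page, min_val, max_val in zip(column_index.null_pages,
                                         column_index.min_values,
                                         column_index.max_values):
    print("%s %r %r" % (null_page, min_val, max_val))

# dump_first_column_index('example.parquet')  # example.parquet is a placeholder path

The test suites above and below wrap this pattern in _get_row_group_from_file and layer
the per-column validations on top of it.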

http://git-wip-us.apache.org/repos/asf/impala/blob/af76186e/tests/query_test/test_parquet_page_index.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_parquet_page_index.py b/tests/query_test/test_parquet_page_index.py
new file mode 100644
index 0000000..6235819
--- /dev/null
+++ b/tests/query_test/test_parquet_page_index.py
@@ -0,0 +1,372 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Targeted Impala insert tests
+
+import os
+
+from collections import namedtuple
+from subprocess import check_call
+from parquet.ttypes import BoundaryOrder, ColumnIndex, OffsetIndex, PageHeader, PageType
+
+from tests.common.impala_test_suite import ImpalaTestSuite
+from tests.common.skip import SkipIfLocal
+from tests.util.filesystem_utils import get_fs_path
+from tests.util.get_parquet_metadata import (
+    decode_stats_value,
+    get_parquet_metadata,
+    read_serialized_object
+)
+
+PAGE_INDEX_MAX_STRING_LENGTH = 64
+
+
+@SkipIfLocal.parquet_file_size
+class TestHdfsParquetTableIndexWriter(ImpalaTestSuite):
+  """Since PARQUET-922 page statistics can be written before the footer.
+  The tests in this class checks if Impala writes the page indices correctly.
+  """
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestHdfsParquetTableIndexWriter, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_constraint(
+        lambda v: v.get_value('table_format').file_format == 'parquet')
+
+  def _get_row_group_from_file(self, parquet_file):
+    """Returns namedtuples that contain the schema, stats, offset_index, column_index,
+    and page_headers for each column in the first row group in file 'parquet_file'. Fails
+    if the file contains multiple row groups.
+    """
+    ColumnInfo = namedtuple('ColumnInfo', ['schema', 'stats', 'offset_index',
+        'column_index', 'page_headers'])
+
+    file_meta_data = get_parquet_metadata(parquet_file)
+    assert len(file_meta_data.row_groups) == 1
+    # We only support flat schemas, the additional element is the root element.
+    schemas = file_meta_data.schema[1:]
+    row_group = file_meta_data.row_groups[0]
+    assert len(schemas) == len(row_group.columns)
+    row_group_index = []
+    with open(parquet_file) as file_handle:
+      for column, schema in zip(row_group.columns, schemas):
+        column_index_offset = column.column_index_offset
+        column_index_length = column.column_index_length
+        column_index = None
+        if column_index_offset and column_index_length:
+          column_index = read_serialized_object(ColumnIndex, file_handle,
+                                                column_index_offset, column_index_length)
+        column_meta_data = column.meta_data
+        stats = None
+        if column_meta_data:
+          stats = column_meta_data.statistics
+
+        offset_index_offset = column.offset_index_offset
+        offset_index_length = column.offset_index_length
+        offset_index = None
+        page_headers = []
+        if offset_index_offset and offset_index_length:
+          offset_index = read_serialized_object(OffsetIndex, file_handle,
+                                                offset_index_offset, offset_index_length)
+          for page_loc in offset_index.page_locations:
+            page_header = read_serialized_object(PageHeader, file_handle, page_loc.offset,
+                                                 page_loc.compressed_page_size)
+            page_headers.append(page_header)
+
+        column_info = ColumnInfo(schema, stats, offset_index, column_index, page_headers)
+        row_group_index.append(column_info)
+    return row_group_index
+
+  def _get_row_groups_from_hdfs_folder(self, hdfs_path, tmpdir):
+    """Returns a list of column infos (containing the schema, stats, offset_index,
+    column_index, and page_headers) for the first row group in all parquet files in
+    'hdfs_path'.
+    """
+    row_group_indexes = []
+    check_call(['hdfs', 'dfs', '-get', hdfs_path, tmpdir.strpath])
+    for root, subdirs, files in os.walk(tmpdir.strpath):
+      for f in files:
+        parquet_file = os.path.join(root, str(f))
+        row_group_indexes.append(self._get_row_group_from_file(parquet_file))
+    return row_group_indexes
+
+  def _validate_page_locations(self, page_locations):
+    """Validate that the page locations are in order."""
+    for previous_loc, current_loc in zip(page_locations[:-1], page_locations[1:]):
+      assert previous_loc.offset < current_loc.offset
+      assert previous_loc.first_row_index < current_loc.first_row_index
+
+  def _validate_null_stats(self, index_size, column_info):
+    """Validates the statistics stored in null_pages and null_counts."""
+    column_index = column_info.column_index
+    column_stats = column_info.stats
+    assert column_index.null_pages is not None
+    assert len(column_index.null_pages) == index_size
+    assert column_index.null_counts is not None
+    assert len(column_index.null_counts) == index_size
+
+    for page_is_null, null_count, page_header in zip(column_index.null_pages,
+        column_index.null_counts, column_info.page_headers):
+      assert page_header.type == PageType.DATA_PAGE
+      num_values = page_header.data_page_header.num_values
+      assert not page_is_null or null_count == num_values
+
+    if column_stats:
+      assert column_stats.null_count == sum(column_index.null_counts)
+
+  def _validate_min_max_values(self, index_size, column_info):
+    """Validate min/max values of the pages in a column chunk."""
+    column_index = column_info.column_index
+    min_values = column_info.column_index.min_values
+    assert len(min_values) == index_size
+    max_values = column_info.column_index.max_values
+    assert len(max_values) == index_size
+
+    if not column_info.stats:
+      return
+
+    column_min_value_str = column_info.stats.min_value
+    column_max_value_str = column_info.stats.max_value
+    if column_min_value_str is None or column_max_value_str is None:
+      # If either is None, then both need to be None.
+      assert column_min_value_str is None and column_max_value_str is None
+      # No min and max value, all pages need to be null
+      for idx, null_page in enumerate(column_index.null_pages):
+        assert null_page, "Page {} of column {} is not null, \
+            but doesn't have min and max values!".format(idx, column_index.schema.name)
+      # Everything is None, no further checks needed.
+      return
+
+    column_min_value = decode_stats_value(column_info.schema, column_min_value_str)
+    for null_page, page_min_str in zip(column_index.null_pages, min_values):
+      if not null_page:
+        page_min_value = decode_stats_value(column_info.schema, page_min_str)
+        # If type is str, page_min_value might have been truncated.
+        if isinstance(page_min_value, basestring):
+          assert page_min_value >= column_min_value[:len(page_min_value)]
+        else:
+          assert page_min_value >= column_min_value
+
+    column_max_value = decode_stats_value(column_info.schema, column_max_value_str)
+    for null_page, page_max_str in zip(column_index.null_pages, max_values):
+      if not null_page:
+        page_max_value = decode_stats_value(column_info.schema, page_max_str)
+        # If type is str, page_max_value might have been truncated and incremented.
+        if (isinstance(page_max_value, basestring) and
+            len(page_max_value) == PAGE_INDEX_MAX_STRING_LENGTH):
+          max_val_prefix = page_max_value.rstrip('\0')
+          assert max_val_prefix[:-1] <= column_max_value
+        else:
+          assert page_max_value <= column_max_value
+
+  def _validate_ordering(self, ordering, schema, null_pages, min_values, max_values):
+    """Check if the ordering of the values reflects the value of 'ordering'."""
+
+    def is_sorted(l, reverse=False):
+      if not reverse:
+        return all(a <= b for a, b in zip(l, l[1:]))
+      else:
+        return all(a >= b for a, b in zip(l, l[1:]))
+
+    # Filter out null pages and decode the actual min/max values.
+    actual_min_values = [decode_stats_value(schema, min_val)
+                         for min_val, is_null in zip(min_values, null_pages)
+                         if not is_null]
+    actual_max_values = [decode_stats_value(schema, max_val)
+                         for max_val, is_null in zip(max_values, null_pages)
+                         if not is_null]
+
+    # For ASCENDING and DESCENDING, both min and max values need to be sorted.
+    if ordering == BoundaryOrder.ASCENDING:
+      assert is_sorted(actual_min_values)
+      assert is_sorted(actual_max_values)
+    elif ordering == BoundaryOrder.DESCENDING:
+      assert is_sorted(actual_min_values, reverse=True)
+      assert is_sorted(actual_max_values, reverse=True)
+    else:
+      assert ordering == BoundaryOrder.UNORDERED
+      # For UNORDERED, min and max values cannot be both sorted.
+      assert not is_sorted(actual_min_values) or not is_sorted(actual_max_values)
+      assert (not is_sorted(actual_min_values, reverse=True) or
+              not is_sorted(actual_max_values, reverse=True))
+
+  def _validate_boundary_order(self, column_info):
+    """Validate that min/max values are really in the order specified by
+    boundary order.
+    """
+    column_index = column_info.column_index
+    self._validate_ordering(column_index.boundary_order, column_info.schema,
+        column_index.null_pages, column_index.min_values, column_index.max_values)
+
+  def _validate_parquet_page_index(self, hdfs_path, tmpdir):
+    """Validates that 'hdfs_path' contains exactly one parquet file and that the rowgroup
+    index in that file is in the valid format.
+    """
+    row_group_indexes = self._get_row_groups_from_hdfs_folder(hdfs_path, tmpdir)
+    for columns in row_group_indexes:
+      for column_info in columns:
+        try:
+          index_size = len(column_info.offset_index.page_locations)
+          assert index_size > 0
+          self._validate_page_locations(column_info.offset_index.page_locations)
+          # IMPALA-7304: Impala doesn't write column index for floating-point columns
+          # until PARQUET-1222 is resolved.
+          if column_info.schema.type in [4, 5]:
+            assert column_info.column_index is None
+            continue
+          self._validate_null_stats(index_size, column_info)
+          self._validate_min_max_values(index_size, column_info)
+          self._validate_boundary_order(column_info)
+        except AssertionError as e:
+          e.args += ("Validation failed on column {}.".format(column_info.schema.name),)
+          raise
+
+  def _ctas_table_and_verify_index(self, vector, unique_database, source_table,
+                                   tmpdir, sorting_column=None):
+    """Copies 'source_table' into a parquet table and makes sure that the index
+    in the resulting parquet file is valid.
+    """
+    table_name = "test_hdfs_parquet_table_writer"
+    qualified_table_name = "{0}.{1}".format(unique_database, table_name)
+    hdfs_path = get_fs_path('/test-warehouse/{0}.db/{1}/'.format(unique_database,
+                                                                 table_name))
+    # Setting num_nodes = 1 ensures that the query is executed on the coordinator,
+    # resulting in a single parquet file being written.
+    vector.get_value('exec_option')['num_nodes'] = 1
+    self.execute_query("drop table if exists {0}".format(qualified_table_name))
+    if sorting_column is None:
+      query = ("create table {0} stored as parquet as select * from {1}").format(
+          qualified_table_name, source_table)
+    else:
+      query = ("create table {0} sort by({1}) stored as parquet as select * from {2}"
+               ).format(qualified_table_name, sorting_column, source_table)
+    self.execute_query(query, vector.get_value('exec_option'))
+    self._validate_parquet_page_index(hdfs_path, tmpdir.join(source_table))
+
+  def _create_string_table_with_values(self, vector, unique_database, table_name,
+                                       values_sql):
+    """Creates a parquet table that has a single string column, then invokes an insert
+    statement on it with the 'values_sql' parameter. E.g. 'values_sql' is "('asdf')".
+    It returns the HDFS path for the table.
+    """
+    qualified_table_name = "{0}.{1}".format(unique_database, table_name)
+    self.execute_query("drop table if exists {0}".format(qualified_table_name))
+    vector.get_value('exec_option')['num_nodes'] = 1
+    query = ("create table {0} (str string) stored as parquet").format(qualified_table_name)
+    self.execute_query(query, vector.get_value('exec_option'))
+    self.execute_query("insert into {0} values {1}".format(qualified_table_name,
+        values_sql), vector.get_value('exec_option'))
+    return get_fs_path('/test-warehouse/{0}.db/{1}/'.format(unique_database,
+        table_name))
+
+  def test_write_index_alltypes(self, vector, unique_database, tmpdir):
+    """Test that writing a parquet file populates the rowgroup indexes with the correct
+    values.
+    """
+    self._ctas_table_and_verify_index(vector, unique_database, "functional.alltypes",
+        tmpdir)
+
+  def test_write_index_decimals(self, vector, unique_database, tmpdir):
+    """Test that writing a parquet file populates the rowgroup indexes with the correct
+    values, using decimal types.
+    """
+    self._ctas_table_and_verify_index(vector, unique_database, "functional.decimal_tbl",
+        tmpdir)
+
+  def test_write_index_chars(self, vector, unique_database, tmpdir):
+    """Test that writing a parquet file populates the rowgroup indexes with the correct
+    values, using char types.
+    """
+    self._ctas_table_and_verify_index(vector, unique_database, "functional.chars_formats",
+        tmpdir)
+
+  def test_write_index_null(self, vector, unique_database, tmpdir):
+    """Test that we don't write min/max values in the index for null columns.
+    Ensure null_count is set for columns with null values.
+    """
+    self._ctas_table_and_verify_index(vector, unique_database, "functional.nulltable",
+        tmpdir)
+
+  def test_write_index_multi_page(self, vector, unique_database, tmpdir):
+    """Test that when a ColumnChunk is written across multiple pages, the index is
+    valid.
+    """
+    self._ctas_table_and_verify_index(vector, unique_database, "tpch.customer",
+        tmpdir)
+    self._ctas_table_and_verify_index(vector, unique_database, "tpch.orders",
+        tmpdir)
+
+  def test_write_index_sorting_column(self, vector, unique_database, tmpdir):
+    """Test that when the schema has a sorting column, the index is valid."""
+    self._ctas_table_and_verify_index(vector, unique_database,
+        "functional_parquet.zipcode_incomes", tmpdir, "id")
+
+  def test_write_index_wide_table(self, vector, unique_database, tmpdir):
+    """Test table with wide row."""
+    self._ctas_table_and_verify_index(vector, unique_database,
+        "functional_parquet.widerow", tmpdir)
+
+  def test_write_index_many_columns_tables(self, vector, unique_database, tmpdir):
+    """Test tables with wide rows and many columns."""
+    self._ctas_table_and_verify_index(vector, unique_database,
+        "functional_parquet.widetable_250_cols", tmpdir)
+    self._ctas_table_and_verify_index(vector, unique_database,
+        "functional_parquet.widetable_500_cols", tmpdir)
+    self._ctas_table_and_verify_index(vector, unique_database,
+        "functional_parquet.widetable_1000_cols", tmpdir)
+
+  def test_max_string_values(self, vector, unique_database, tmpdir):
+    """Test string values that are all 0xFFs or end with 0xFFs."""
+
+    # String value is all of 0xFFs but its length is less than PAGE_INDEX_TRUNCATE_LENGTH.
+    short_tbl = "short_tbl"
+    short_hdfs_path = self._create_string_table_with_values(vector, unique_database,
+        short_tbl, "(rpad('', {0}, chr(255)))".format(PAGE_INDEX_MAX_STRING_LENGTH - 1))
+    self._validate_parquet_page_index(short_hdfs_path, tmpdir.join(short_tbl))
+
+    # String value is all of 0xFFs and its length is PAGE_INDEX_TRUNCATE_LENGTH.
+    fit_tbl = "fit_tbl"
+    fit_hdfs_path = self._create_string_table_with_values(vector, unique_database,
+        fit_tbl, "(rpad('', {0}, chr(255)))".format(PAGE_INDEX_MAX_STRING_LENGTH))
+    self._validate_parquet_page_index(fit_hdfs_path, tmpdir.join(fit_tbl))
+
+    # All bytes are 0xFFs and the string is longer than PAGE_INDEX_TRUNCATE_LENGTH, so we
+    # should not write page statistics.
+    too_long_tbl = "too_long_tbl"
+    too_long_hdfs_path = self._create_string_table_with_values(vector, unique_database,
+        too_long_tbl, "(rpad('', {0}, chr(255)))".format(PAGE_INDEX_MAX_STRING_LENGTH + 1))
+    row_group_indexes = self._get_row_groups_from_hdfs_folder(too_long_hdfs_path,
+        tmpdir.join(too_long_tbl))
+    column = row_group_indexes[0][0]
+    assert column.column_index is None
+    # We always write the offset index
+    assert column.offset_index is not None
+
+    # Test a string value that starts with 'aaa' followed by 0xFFs, with length greater
+    # than PAGE_INDEX_TRUNCATE_LENGTH. The max value should be 'aab'.
+    aaa_tbl = "aaa_tbl"
+    aaa_hdfs_path = self._create_string_table_with_values(vector, unique_database,
+        aaa_tbl, "(rpad('aaa', {0}, chr(255)))".format(PAGE_INDEX_MAX_STRING_LENGTH + 1))
+    row_group_indexes = self._get_row_groups_from_hdfs_folder(aaa_hdfs_path,
+        tmpdir.join(aaa_tbl))
+    column = row_group_indexes[0][0]
+    assert len(column.column_index.max_values) == 1
+    max_value = column.column_index.max_values[0]
+    assert max_value == 'aab'
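
To make the 0xFF cases in test_max_string_values easier to follow, here is a minimal,
self-contained sketch of the truncation rule that the assertions above encode. It only
illustrates the observable behaviour (the real logic lives in Impala's C++ Parquet
writer and may differ in detail), and the helper name truncate_page_max is invented for
this sketch:

PAGE_INDEX_MAX_STRING_LENGTH = 64

def truncate_page_max(value):
  """Illustrative truncation of a page's max string value. A min value can simply be
  cut to a prefix, but a truncated max must stay >= the original value, so the last
  kept byte that is not 0xFF is incremented. When every byte is 0xFF there is no
  valid truncated upper bound, and no column index is written for the column."""
  if len(value) <= PAGE_INDEX_MAX_STRING_LENGTH:
    return value
  prefix = bytearray(value[:PAGE_INDEX_MAX_STRING_LENGTH])
  for i in reversed(range(len(prefix))):
    if prefix[i] != 0xFF:
      prefix[i] += 1
      return bytes(prefix[:i + 1])
  return None  # the too_long_tbl case above

assert truncate_page_max(b'\xff' * 64) == b'\xff' * 64     # fit_tbl: fits unmodified
assert truncate_page_max(b'\xff' * 65) is None             # too_long_tbl: no column index
assert truncate_page_max(b'aaa' + b'\xff' * 62) == b'aab'  # aaa_tbl: max becomes 'aab'

The three assertions mirror the fit_tbl, too_long_tbl and aaa_tbl cases exercised above.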


[4/5] impala git commit: IMPALA-7622: adds profile metrics for incremental stats

Posted by jo...@apache.org.
IMPALA-7622: adds profile metrics for incremental stats

Reapplies the change after fixing where the frontend profile is placed in the
runtime profile.

When incremental statistics are computed by fetching the stats directly
from catalogd, a potentially expensive RPC is made from the impalad
coordinator to catalogd. This change adds metrics to the frontend
section of the profile to track how long the request takes, the number
of compressed bytes received, and the number of partitions received.

The profile for a 'compute incremental ...' command on a table with
no statistics looks like this:

Frontend:
     - StatsFetch.CompressedBytes: 0
     - StatsFetch.TotalPartitions: 24
     - StatsFetch.NumPartitionsWithStats: 0
     - StatsFetch.Time: 26ms

And the profile looks as follows when the table has stats, so the stats
are fetched:

Frontend:
     - StatsFetch.CompressedBytes: 24622
     - StatsFetch.TotalPartitions: 23
     - StatsFetch.NumPartitionsWithStats: 23
     - StatsFetch.Time: 14ms

Testing:
- manual inspection
- e2e test to check the profile

Change-Id: I94559a749500d44aa6aad564134d55c39e1d5273
Reviewed-on: http://gerrit.cloudera.org:8080/11670
Reviewed-by: Tianyi Wang <tw...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/97f02829
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/97f02829
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/97f02829

Branch: refs/heads/master
Commit: 97f028299c9d9d7493bdbeaacbf0a288678f9371
Parents: a80ec4a
Author: Vuk Ercegovac <ve...@cloudera.com>
Authored: Wed Sep 26 16:14:43 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Fri Oct 12 23:44:42 2018 +0000

----------------------------------------------------------------------
 .../impala/analysis/ComputeStatsStmt.java       | 43 ++++++++++++++++-
 tests/common/custom_cluster_test_suite.py       |  2 +-
 tests/custom_cluster/test_pull_stats.py         | 51 ++++++++++++++++++++
 3 files changed, 93 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
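
Since the new counters are plain text entries in the Frontend section of the runtime
profile (as in the two example profiles above), they can be pulled out with simple
string matching. A small illustrative helper, assuming only that the profile is
available as a string; the name stats_fetch_counters is made up here, and the e2e test
added below asserts on the same counters directly:

import re

STATS_FETCH_COUNTERS = ("CompressedBytes", "TotalPartitions",
                        "NumPartitionsWithStats", "Time")

def stats_fetch_counters(profile_text):
  """Returns the StatsFetch counters found in a runtime profile string as a dict of
  raw string values, e.g. {'CompressedBytes': '24622', 'Time': '14ms', ...}."""
  counters = {}
  for name in STATS_FETCH_COUNTERS:
    match = re.search(r"StatsFetch\.%s: (\S+)" % name, profile_text)
    if match:
      counters[name] = match.group(1)
  return counters

# For the second example profile above this would give, among others:
#   stats_fetch_counters(profile_text)["TotalPartitions"] == "23"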


http://git-wip-us.apache.org/repos/asf/impala/blob/97f02829/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java b/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
index 36f88f2..24f387c 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
@@ -46,15 +46,18 @@ import org.apache.impala.common.PrintUtils;
 import org.apache.impala.common.RuntimeEnv;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.service.CatalogOpExecutor;
+import org.apache.impala.service.FrontendProfile;
 import org.apache.impala.thrift.TComputeStatsParams;
 import org.apache.impala.thrift.TErrorCode;
 import org.apache.impala.thrift.TGetPartitionStatsResponse;
 import org.apache.impala.thrift.TPartitionStats;
 import org.apache.impala.thrift.TTableName;
+import org.apache.impala.thrift.TUnit;
 import org.apache.log4j.Logger;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -114,6 +117,21 @@ public class ComputeStatsStmt extends StatementBase {
   private static String AVRO_SCHEMA_MSG_SUFFIX = "Please re-create the table with " +
           "column definitions, e.g., using the result of 'SHOW CREATE TABLE'";
 
+  // Metrics collected when fetching incremental statistics from Catalogd. All metrics
+  // are per query.
+  private static final String STATS_FETCH_PREFIX = "StatsFetch";
+  // Time (ms) needed to fetch all partitions stats from catalogd.
+  private static final String STATS_FETCH_TIME = STATS_FETCH_PREFIX + ".Time";
+  // Number of compressed bytes received for all partitions.
+  private static final String STATS_FETCH_COMPRESSED_BYTES =
+      STATS_FETCH_PREFIX + ".CompressedBytes";
+  // Number of partitions sent from Catalogd.
+  private static final String STATS_FETCH_TOTAL_PARTITIONS =
+      STATS_FETCH_PREFIX + ".TotalPartitions";
+  // Number of partitions sent from Catalogd that include statistics.
+  private static final String STATS_FETCH_NUM_PARTITIONS_WITH_STATS =
+      STATS_FETCH_PREFIX + ".NumPartitionsWithStats";
+
   protected final TableName tableName_;
   protected final TableSampleClause sampleParams_;
 
@@ -627,8 +645,6 @@ public class ComputeStatsStmt extends StatementBase {
    * - incremental statistics are present
    * - the partition is whitelisted in 'partitions'
    * - the partition is present in the local impalad catalog
-   * TODO(vercegovac): Add metrics to track time spent for these rpc's when fetching
-   *                   from catalog. Look into adding to timeline.
    * TODO(vercegovac): Look into parallelizing the fetch while child-queries are
    *                   running. Easiest would be to move this fetch to the backend.
    */
@@ -638,6 +654,10 @@ public class ComputeStatsStmt extends StatementBase {
     Preconditions.checkState(BackendConfig.INSTANCE.pullIncrementalStatistics()
         && !RuntimeEnv.INSTANCE.isTestEnv());
     if (partitions.isEmpty()) return Collections.emptyMap();
+    Stopwatch sw = new Stopwatch().start();
+    int numCompressedBytes = 0;
+    int totalPartitions = 0;
+    int numPartitionsWithStats = 0;
     try {
       TGetPartitionStatsResponse response =
           analyzer.getCatalog().getPartitionStats(table.getTableName());
@@ -657,16 +677,19 @@ public class ComputeStatsStmt extends StatementBase {
       // local catalogs are returned.
       Map<Long, TPartitionStats> partitionStats =
           Maps.newHashMapWithExpectedSize(partitions.size());
+      totalPartitions = partitions.size();
       for (FeFsPartition part: partitions) {
         ByteBuffer compressedStats = response.partition_stats.get(
             FeCatalogUtils.getPartitionName(part));
         if (compressedStats != null) {
           byte[] compressedStatsBytes = new byte[compressedStats.remaining()];
+          numCompressedBytes += compressedStatsBytes.length;
           compressedStats.get(compressedStatsBytes);
           TPartitionStats remoteStats =
               PartitionStatsUtil.partStatsFromCompressedBytes(
                   compressedStatsBytes, part);
           if (remoteStats != null && remoteStats.isSetIntermediate_col_stats()) {
+            ++numPartitionsWithStats;
             partitionStats.put(part.getId(), remoteStats);
           }
         }
@@ -675,10 +698,26 @@ public class ComputeStatsStmt extends StatementBase {
     } catch (Exception e) {
       Throwables.propagateIfInstanceOf(e, AnalysisException.class);
       throw new AnalysisException("Error fetching partition statistics", e);
+    } finally {
+      recordFetchMetrics(numCompressedBytes, totalPartitions, numPartitionsWithStats, sw);
     }
   }
 
   /**
+   * Adds metrics to the frontend profile when fetching incremental stats from catalogd.
+   */
+  private static void recordFetchMetrics(int numCompressedBytes,
+      int totalPartitions, int numPartitionsWithStats, Stopwatch stopwatch) {
+    FrontendProfile profile = FrontendProfile.getCurrentOrNull();
+    if (profile == null) return;
+    profile.addToCounter(STATS_FETCH_COMPRESSED_BYTES, TUnit.BYTES, numCompressedBytes);
+    profile.addToCounter(STATS_FETCH_TOTAL_PARTITIONS, TUnit.NONE, totalPartitions);
+    profile.addToCounter(STATS_FETCH_NUM_PARTITIONS_WITH_STATS, TUnit.NONE,
+        numPartitionsWithStats);
+    profile.addToCounter(STATS_FETCH_TIME, TUnit.TIME_MS, stopwatch.elapsedMillis());
+  }
+
+  /**
    * Analyzes the TABLESAMPLE clause and computes the files sample to set
    * 'effectiveSamplePerc_'.
    * Returns the TABLESAMPLE SQL to be used for all child queries or an empty string if

http://git-wip-us.apache.org/repos/asf/impala/blob/97f02829/tests/common/custom_cluster_test_suite.py
----------------------------------------------------------------------
diff --git a/tests/common/custom_cluster_test_suite.py b/tests/common/custom_cluster_test_suite.py
index 4274abf..0140303 100644
--- a/tests/common/custom_cluster_test_suite.py
+++ b/tests/common/custom_cluster_test_suite.py
@@ -194,7 +194,7 @@ class CustomClusterTestSuite(ImpalaTestSuite):
 
     if pytest.config.option.pull_incremental_statistics:
       cmd.append("--impalad_args=%s --catalogd_args=%s" %
-                 ("--pull_incremental_statistcs", "--pull_incremental_statistics"))
+                 ("--pull_incremental_statistics", "--pull_incremental_statistics"))
 
     default_query_option_kvs = []
     # Put any defaults first, then any arguments after that so they can override defaults.

http://git-wip-us.apache.org/repos/asf/impala/blob/97f02829/tests/custom_cluster/test_pull_stats.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_pull_stats.py b/tests/custom_cluster/test_pull_stats.py
index e470ead..b852f3d 100644
--- a/tests/custom_cluster/test_pull_stats.py
+++ b/tests/custom_cluster/test_pull_stats.py
@@ -31,3 +31,54 @@ class TestPullStatistics(CustomClusterTestSuite):
                                     catalogd_args="--pull_incremental_statistics=true")
   def test_pull_stats(self, vector, unique_database):
     self.run_test_case('QueryTest/compute-stats-incremental', vector, unique_database)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(impalad_args="--pull_incremental_statistics=true",
+                                    catalogd_args="--pull_incremental_statistics=true")
+  def test_pull_stats_profile(self, vector, unique_database):
+    """Checks that the frontend profile includes metrics when computing
+       incremental statistics.
+    """
+    try:
+      client = self.cluster.impalads[0].service.create_beeswax_client()
+      create = "create table test like functional.alltypes"
+      load = "insert into test partition(year, month) select * from functional.alltypes"
+      insert = """insert into test partition(year=2009, month=1) values
+                  (29349999, true, 4, 4, 4, 40,4.400000095367432,40.4,
+                  "10/21/09","4","2009-10-21 03:24:09.600000000")"""
+      stats_all = "compute incremental stats test"
+      stats_part = "compute incremental stats test partition (year=2009,month=1)"
+
+      # Checks that profile does not have metrics for incremental stats when
+      # the operation is not 'compute incremental stats'.
+      self.execute_query_expect_success(client, "use %s" % unique_database)
+      profile = self.execute_query_expect_success(client, create).runtime_profile
+      assert profile.count("StatsFetch") == 0
+      # Checks that incremental stats metrics are present when 'compute incremental
+      # stats' is run. Since the table has no stats, expect that no bytes are fetched.
+      self.execute_query_expect_success(client, load)
+      profile = self.execute_query_expect_success(client, stats_all).runtime_profile
+      assert profile.count("StatsFetch") > 1
+      assert profile.count("StatsFetch.CompressedBytes: 0") == 1
+      # Checks that bytes fetched is non-zero since incremental stats are present now
+      # and should have been fetched.
+      self.execute_query_expect_success(client, insert)
+      profile = self.execute_query_expect_success(client, stats_part).runtime_profile
+      assert profile.count("StatsFetch") > 1
+      assert profile.count("StatsFetch.CompressedBytes") == 1
+      assert profile.count("StatsFetch.CompressedBytes: 0") == 0
+      # Adds a partition, computes stats, and checks that the metrics in the profile
+      # reflect the operation.
+      alter = "alter table test add partition(year=2011, month=1)"
+      insert_new_partition = """
+          insert into test partition(year=2011, month=1) values
+          (29349999, true, 4, 4, 4, 40,4.400000095367432,40.4,
+          "10/21/09","4","2009-10-21 03:24:09.600000000")
+          """
+      self.execute_query_expect_success(client, alter)
+      self.execute_query_expect_success(client, insert_new_partition)
+      profile = self.execute_query_expect_success(client, stats_all).runtime_profile
+      assert profile.count("StatsFetch.TotalPartitions: 25") == 1
+      assert profile.count("StatsFetch.NumPartitionsWithStats: 24") == 1
+    finally:
+      client.close()
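
A note on the last two assertions: the test table is cloned from functional.alltypes,
which in the standard test data has 24 partitions covering years 2009 and 2010. After
the extra year=2011, month=1 partition is added, the stats fetch sees 25 partitions,
but only the original 24 have incremental stats at that point, since the new partition
has only been inserted into, not analyzed. A back-of-the-envelope check of those
numbers, assuming that partition layout:

# Illustrative arithmetic behind the final two assertions.
original_partitions = 2 * 12                  # years 2009-2010, 12 months each
total_after_alter = original_partitions + 1   # plus the new year=2011, month=1 partition
assert total_after_alter == 25                # StatsFetch.TotalPartitions: 25
assert original_partitions == 24              # StatsFetch.NumPartitionsWithStats: 24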