Posted to commits@impala.apache.org by bh...@apache.org on 2018/10/08 17:11:57 UTC

[1/6] impala git commit: IMPALA-7661: Increase the sleep time in test_reconnect

Repository: impala
Updated Branches:
  refs/heads/master 0e1de31ba -> d48ffc2d4


IMPALA-7661: Increase the sleep time in test_reconnect

test_reconnect is flaky in ASAN because the time waited for the impala
shell to connect to impalad is not long enough. This patch increases the
sleep time from 2 secs to 5 secs.
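
For context, the test decides whether the shell has connected by checking
the impalad's open-session count via get_num_open_sessions (visible in the
diff below). A fixed sleep keeps the test simple; a less timing-sensitive
alternative would be to poll for the expected count. The helper below is a
hypothetical sketch, not part of this patch:

  from time import sleep, time

  def wait_for_session_count(impala_service, expected, timeout_secs=30):
    # Hypothetical helper: poll the service (using the test module's existing
    # get_num_open_sessions helper) until it reports the expected number of
    # open sessions, or give up after the timeout.
    deadline = time() + timeout_secs
    while time() < deadline:
      if get_num_open_sessions(impala_service) == expected:
        return True
      sleep(0.5)
    return False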

Change-Id: Ia009808adac0da1cfa00b9e9dd41cc276d49c6eb
Reviewed-on: http://gerrit.cloudera.org:8080/11589
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/f8b2eb58
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/f8b2eb58
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/f8b2eb58

Branch: refs/heads/master
Commit: f8b2eb585ad4c5a57763e07a88266a3a757432a2
Parents: 0e1de31
Author: Tianyi Wang <tw...@cloudera.com>
Authored: Thu Oct 4 18:07:21 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Fri Oct 5 22:05:24 2018 +0000

----------------------------------------------------------------------
 tests/shell/test_shell_interactive.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/f8b2eb58/tests/shell/test_shell_interactive.py
----------------------------------------------------------------------
diff --git a/tests/shell/test_shell_interactive.py b/tests/shell/test_shell_interactive.py
index 860803d..4d071de 100755
--- a/tests/shell/test_shell_interactive.py
+++ b/tests/shell/test_shell_interactive.py
@@ -231,14 +231,14 @@ class TestImpalaShellInteractive(object):
     num_sessions_target = get_num_open_sessions(target_impala_service)
     # Connect to localhost:21000 (default)
     p = ImpalaShell()
-    sleep(2)
+    sleep(5)
     # Make sure we're connected <hostname>:21000
     assert get_num_open_sessions(initial_impala_service) == num_sessions_initial + 1, \
         "Not connected to %s:21000" % hostname
     p.send_cmd("connect %s:21001" % hostname)
 
     # Wait for a little while
-    sleep(2)
+    sleep(5)
     # The number of sessions on the target impalad should have been incremented.
     assert get_num_open_sessions(target_impala_service) == num_sessions_target + 1, \
         "Not connected to %s:21001" % hostname


[3/6] impala git commit: IMPALA-7643: report # queries actually executing in stress test

Posted by bh...@apache.org.
IMPALA-7643: report # queries actually executing in stress test

With admission control it's interesting to separate out two categories
of queries:
1. Queries that have started up and are executing
2. Queries that have not made it that far yet, e.g. are waiting to
  establish a client connection (hitting --fe_service_threads limit),
  are in the planner, are queued in admission control or are starting
  up.

We now call 1+2 "Active" and 1 "Executing".

Example output:
Done | Active | Executing | Mem Lmt Ex | AC Reject | AC Timeout | Time Out | Cancel | Err | Incorrect | Next Qry Mem Lmt | Tot Qry Mem Lmt | Tracked Mem | RSS Mem
   0 |      0 |         0 |          0 |         0 |          0 |        0 |      0 |   0 |         0 |                0 |               0 |             |
   0 |     10 |         3 |          0 |         0 |          0 |        0 |      0 |   0 |         0 |              510 |            3922 |         158 |    4541
   0 |     20 |        10 |          0 |         0 |          0 |        0 |      0 |   0 |         0 |              390 |            8534 |         570 |    4517
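
In other words, "Active" counts queries that have been submitted to a query
runner and have not yet finished, while "Executing" counts the subset that
has made it past admission control (or was cancelled or hit an error) and
has not yet finished. A minimal sketch of that bookkeeping, using plain
integers instead of the shared multiprocessing.Value counters the script
actually uses:

  class QueryCounters(object):
    """Simplified view of the cumulative counters behind the status line."""

    def __init__(self):
      self.num_submitted = 0              # handed to a query runner
      self.num_started_or_cancelled = 0   # past admission control, or cancelled/errored
      self.num_finished = 0               # reached a terminal state

    @property
    def active(self):
      # Categories 1 + 2 above: submitted but not yet finished.
      return self.num_submitted - self.num_finished

    @property
    def executing(self):
      # Category 1 only: executing in Impala and not yet finished.
      return self.num_started_or_cancelled - self.num_finished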

Refactored QueryRunner.run_query() to reduce nesting and make it more
readable.

Testing:
Ran local stress tests with and without --test_admission_control set.

Change-Id: I5692e8e5ba3224becefc24437197bf5a5b450335
Reviewed-on: http://gerrit.cloudera.org:8080/11587
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/81c58d5d
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/81c58d5d
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/81c58d5d

Branch: refs/heads/master
Commit: 81c58d5de0d0295b5535ff15afb284bccb6b0026
Parents: d3db326
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Tue Oct 2 17:26:51 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Sat Oct 6 03:13:42 2018 +0000

----------------------------------------------------------------------
 tests/stress/concurrent_select.py | 175 ++++++++++++++++++++++-----------
 1 file changed, 117 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/81c58d5d/tests/stress/concurrent_select.py
----------------------------------------------------------------------
diff --git a/tests/stress/concurrent_select.py b/tests/stress/concurrent_select.py
index 80a2386..844c245 100755
--- a/tests/stress/concurrent_select.py
+++ b/tests/stress/concurrent_select.py
@@ -427,9 +427,9 @@ class StressRunner(object):
     self._mem_mb_needed_for_next_query = Value("i", 0)
 
     # This lock provides a way to stop new queries from running. This lock must be
-    # acquired before writing to _num_queries_started. Before query submission
-    # _num_queries_started must be incremented. Reading _num_queries_started is allowed
-    # without taking this lock.
+    # acquired before writing to _num_queries_submitted. Before query submission
+    # _num_queries_submitted must be incremented. Reading _num_queries_submitted is
+    # allowed without taking this lock.
     self._submit_query_lock = Lock()
 
     self.leak_check_interval_mins = None
@@ -439,7 +439,11 @@ class StressRunner(object):
 
     # All values below are cumulative.
     self._num_queries_dequeued = Value("i", 0)
-    self._num_queries_started = Value("i", 0)
+    # The number of queries that were submitted to a query runner.
+    self._num_queries_submitted = Value("i", 0)
+    # The number of queries that have entered the RUNNING state (i.e. got through Impala's
+    # admission control and started executing) or were cancelled or hit an error.
+    self._num_queries_started_running_or_cancelled = Value("i", 0)
     self._num_queries_finished = Value("i", 0)
     self._num_queries_exceeded_mem_limit = Value("i", 0)
     self._num_queries_ac_rejected = Value("i", 0)
@@ -458,9 +462,9 @@ class StressRunner(object):
     self.results_dir = gettempdir()
 
     self._status_headers = [
-        "Done", "Running", "Mem Lmt Ex", "AC Reject", "AC Timeout", "Time Out", "Cancel",
-        "Err", "Incorrect", "Next Qry Mem Lmt", "Tot Qry Mem Lmt", "Tracked Mem",
-        "RSS Mem"]
+        "Done", "Active", "Executing", "Mem Lmt Ex", "AC Reject", "AC Timeout",
+        "Time Out", "Cancel", "Err", "Incorrect", "Next Qry Mem Lmt",
+        "Tot Qry Mem Lmt", "Tracked Mem", "RSS Mem"]
 
     self._num_queries_to_run = None
     self._query_producer_thread = None
@@ -563,11 +567,11 @@ class StressRunner(object):
   def _start_consuming_queries(self, impala):
     def start_additional_runners_if_needed():
       try:
-        while self._num_queries_started.value < self._num_queries_to_run:
+        while self._num_queries_submitted.value < self._num_queries_to_run:
           sleep(1.0 / self.startup_queries_per_sec)
           # Remember num dequeued/started are cumulative.
           with self._submit_query_lock:
-            if self._num_queries_dequeued.value != self._num_queries_started.value:
+            if self._num_queries_dequeued.value != self._num_queries_submitted.value:
               # Assume dequeued queries are stuck waiting for cluster resources so there
               # is no point in starting an additional runner.
               continue
@@ -597,10 +601,10 @@ class StressRunner(object):
       # while no queries were running.
       ready_to_unlock = None
       try:
-        while self._num_queries_started.value < self._num_queries_to_run:
+        while self._num_queries_submitted.value < self._num_queries_to_run:
           if ready_to_unlock:
             assert query_sumbission_is_locked, "Query submission not yet locked"
-            assert not self._num_queries_running, "Queries are still running"
+            assert not self._num_queries_active, "Queries are still running"
             LOG.debug("Resuming query submission")
             self._next_leak_check_unix_time.value = int(
                 time() + 60 * self.leak_check_interval_mins)
@@ -613,7 +617,7 @@ class StressRunner(object):
               self.leak_check_interval_mins and
               time() > self._next_leak_check_unix_time.value
           ):
-            assert self._num_queries_running <= len(self._query_runners), \
+            assert self._num_queries_active <= len(self._query_runners), \
                 "Each running query should belong to a runner"
             LOG.debug("Stopping query submission")
             self._submit_query_lock.acquire()
@@ -638,7 +642,7 @@ class StressRunner(object):
             max_actual = -1
           self._set_mem_usage_values(max_reported, max_actual)
 
-          if query_sumbission_is_locked and not self._num_queries_running:
+          if query_sumbission_is_locked and not self._num_queries_active:
             if ready_to_unlock is None:
               ready_to_unlock = False
             else:
@@ -670,17 +674,33 @@ class StressRunner(object):
         self._max_mem_mb_usage.value = actual
 
   @property
-  def _num_queries_running(self):
-    num_running = self._num_queries_started.value - self._num_queries_finished.value
+  def _num_queries_active(self):
+    """The number of queries that are currently active (i.e. submitted to a query runner
+    and haven't yet completed)."""
+    num_running = self._num_queries_submitted.value - self._num_queries_finished.value
     assert num_running >= 0, "The number of running queries is negative"
     return num_running
 
+  @property
+  def _num_queries_executing(self):
+    """The number of queries that are currently executing (i.e. entered the RUNNING state
+    and haven't yet completed)."""
+    num_executing = (self._num_queries_started_running_or_cancelled.value -
+        self._num_queries_finished.value)
+    assert num_executing >= 0, "The number of executing queries is negative"
+    return num_executing
+
+  def increment_num_queries_started_running_or_cancelled(self):
+    """Called by query runner to increment _num_queries_started_running_or_cancelled."""
+    increment(self._num_queries_started_running_or_cancelled)
+
+
   def _start_single_runner(self, impalad):
     """Consumer function to take a query of the queue and run it. This is intended to
     run in a separate process so validating the result set can use a full CPU.
     """
     LOG.debug("New query runner started")
-    runner = QueryRunner()
+    runner = QueryRunner(self)
     runner.impalad = impalad
     runner.results_dir = self.results_dir
     runner.use_kerberos = self.use_kerberos
@@ -714,7 +734,7 @@ class StressRunner(object):
         solo_runtime = query.solo_runtime_secs_with_spilling
 
       LOG.debug("Waiting for other query runners to start their queries")
-      while query_idx > self._num_queries_started.value:
+      while query_idx > self._num_queries_submitted.value:
         sleep(0.1)
 
       self._mem_mb_needed_for_next_query.value = mem_limit
@@ -723,13 +743,13 @@ class StressRunner(object):
       with self._mem_broker.reserve_mem_mb(mem_limit) as reservation_id:
         LOG.debug("Received memory reservation")
         with self._submit_query_lock:
-          increment(self._num_queries_started)
+          increment(self._num_queries_submitted)
         should_cancel = self.cancel_probability > random()
         if should_cancel:
           timeout = randrange(1, max(int(solo_runtime), 2))
         else:
           timeout = solo_runtime * max(
-              10, self._num_queries_started.value - self._num_queries_finished.value)
+              10, self._num_queries_submitted.value - self._num_queries_finished.value)
         report = runner.run_query(query, timeout, mem_limit, should_cancel=should_cancel)
         LOG.debug("Got execution report for query")
         if report.timed_out and should_cancel:
@@ -811,8 +831,10 @@ class StressRunner(object):
     print(status_format % (
         # Done
         self._num_queries_finished.value,
-        # Running
-        self._num_queries_started.value - self._num_queries_finished.value,
+        # Active
+        self._num_queries_active,
+        # Executing
+        self._num_queries_executing,
         # Mem Lmt Ex
         self._num_queries_exceeded_mem_limit.value,
         # AC Rejected
@@ -1010,7 +1032,11 @@ class QueryRunner(object):
   SPILLED_PATTERNS = [re.compile("ExecOption:.*Spilled"), re.compile("SpilledRuns: [^0]")]
   BATCH_SIZE = 1024
 
-  def __init__(self):
+  def __init__(self, stress_runner=None):
+    """Creates a new instance. The caller must fill in the below fields. stress_runner
+    must be provided if this is running in the context of a stress run, so that statistics
+    can be updated."""
+    self.stress_runner = stress_runner
     self.impalad = None
     self.impalad_conn = None
     self.use_kerberos = False
@@ -1043,29 +1069,7 @@ class QueryRunner(object):
     try:
       with self.impalad_conn.cursor() as cursor:
         start_time = time()
-        if query.db_name:
-          LOG.debug("Using %s database", query.db_name)
-          cursor.execute("USE %s" % query.db_name)
-        if run_set_up and query.set_up_sql:
-          LOG.debug("Running set up query:\n%s", self.set_up_sql)
-          cursor.execute(query.set_up_sql)
-        for query_option, value in self.common_query_options.iteritems():
-          cursor.execute(
-              "SET {query_option}={value}".format(query_option=query_option, value=value))
-        for query_option, value in query.options.iteritems():
-          cursor.execute(
-              "SET {query_option}={value}".format(query_option=query_option, value=value))
-        cursor.execute("SET ABORT_ON_ERROR=1")
-        if self.test_admission_control:
-          LOG.debug(
-              "Running query without mem limit at %s with timeout secs %s:\n%s",
-              self.impalad.host_name, timeout_secs, query.sql)
-        else:
-          LOG.debug("Setting mem limit to %s MB", mem_limit_mb)
-          cursor.execute("SET MEM_LIMIT=%sM" % mem_limit_mb)
-          LOG.debug(
-              "Running query with %s MB mem limit at %s with timeout secs %s:\n%s",
-              mem_limit_mb, self.impalad.host_name, timeout_secs, query.sql)
+        self._set_db_and_options(cursor, query, run_set_up, mem_limit_mb, timeout_secs)
         error = None
         try:
           cursor.execute_async(
@@ -1074,19 +1078,10 @@ class QueryRunner(object):
           report.query_id = op_handle_to_query_id(cursor._last_operation.handle if
                                                   cursor._last_operation else None)
           LOG.debug("Query id is %s", report.query_id)
-          sleep_secs = 0.1
-          secs_since_log = 0
-          while cursor.is_executing():
-            if time() > timeout_unix_time:
-              if not should_cancel:
-                fetch_and_set_profile(cursor, report)
-              self._cancel(cursor, report)
-              return report
-            if secs_since_log > 5:
-              secs_since_log = 0
-              LOG.debug("Waiting for query to execute")
-            sleep(sleep_secs)
-            secs_since_log += sleep_secs
+          if not self._wait_until_fetchable(cursor, report, timeout_unix_time,
+                                            should_cancel):
+            return report
+
           if query.query_type == QueryType.SELECT:
             try:
               report.result_hash = self._hash_result(cursor, timeout_unix_time, query)
@@ -1118,6 +1113,70 @@ class QueryRunner(object):
       report.other_error = error
     return report
 
+  def _set_db_and_options(self, cursor, query, run_set_up, mem_limit_mb, timeout_secs):
+    """Set up a new cursor for running a query by switching to the correct database and
+    setting query options."""
+    if query.db_name:
+      LOG.debug("Using %s database", query.db_name)
+      cursor.execute("USE %s" % query.db_name)
+    if run_set_up and query.set_up_sql:
+      LOG.debug("Running set up query:\n%s", query.set_up_sql)
+      cursor.execute(query.set_up_sql)
+    for query_option, value in self.common_query_options.iteritems():
+      cursor.execute(
+          "SET {query_option}={value}".format(query_option=query_option, value=value))
+    for query_option, value in query.options.iteritems():
+      cursor.execute(
+          "SET {query_option}={value}".format(query_option=query_option, value=value))
+    cursor.execute("SET ABORT_ON_ERROR=1")
+    if self.test_admission_control:
+      LOG.debug(
+          "Running query without mem limit at %s with timeout secs %s:\n%s",
+          self.impalad.host_name, timeout_secs, query.sql)
+    else:
+      LOG.debug("Setting mem limit to %s MB", mem_limit_mb)
+      cursor.execute("SET MEM_LIMIT=%sM" % mem_limit_mb)
+      LOG.debug(
+          "Running query with %s MB mem limit at %s with timeout secs %s:\n%s",
+          mem_limit_mb, self.impalad.host_name, timeout_secs, query.sql)
+
+  def _wait_until_fetchable(self, cursor, report, timeout_unix_time, should_cancel):
+    """Wait up until timeout_unix_time until the query results can be fetched (if it's
+    a SELECT query) or until it has finished executing (if it's a different query type
+    like DML). If the timeout expires we either cancel the query or report the timeout.
+    Return True in the first case or False in the second (timeout) case."""
+    # Loop until the query gets to the right state or a timeout expires.
+    sleep_secs = 0.1
+    secs_since_log = 0
+    # True if we incremented num_queries_started_running_or_cancelled for this query.
+    started_running_or_cancelled = False
+    while True:
+      query_state = cursor.status()
+      # Check if the query got past the PENDING/INITIALIZED states, either because
+      # it's executing or hit an error.
+      if (not started_running_or_cancelled and query_state not in ('PENDING_STATE',
+                                                      'INITIALIZED_STATE')):
+        started_running_or_cancelled = True
+        if self.stress_runner:
+          self.stress_runner.increment_num_queries_started_running_or_cancelled()
+      # Return if we're ready to fetch results (in the FINISHED state) or we are in
+      # another terminal state like EXCEPTION.
+      if query_state not in ('PENDING_STATE', 'INITIALIZED_STATE', 'RUNNING_STATE'):
+        return True
+
+      if time() > timeout_unix_time:
+        if not should_cancel:
+          fetch_and_set_profile(cursor, report)
+        self._cancel(cursor, report)
+        if not started_running_or_cancelled and self.stress_runner:
+          self.stress_runner.increment_num_queries_started_running_or_cancelled()
+        return False
+      if secs_since_log > 5:
+        secs_since_log = 0
+        LOG.debug("Waiting for query to execute")
+      sleep(sleep_secs)
+      secs_since_log += sleep_secs
+
   def _cancel(self, cursor, report):
     report.timed_out = True
 


[4/6] impala git commit: IMPALA-7671: Fix broken SHOW GRANT USER ON <object>

Posted by bh...@apache.org.
IMPALA-7671: Fix broken SHOW GRANT USER ON <object>

This patch fixes the broken SHOW GRANT USER ON <object> statement, which
always shows an empty result. The cause was an incorrect comparison between
the TPrivilege used as the filter and the TPrivilege of the actual
privilege: the comparison should not consider the "grantoption" field.

Testing:
- Added new E2E tests
- Ran all FE tests
- Ran all authorization E2E tests

Change-Id: I7adc403caddd18e5a954cf6affd5d1d555b9f5f0
Reviewed-on: http://gerrit.cloudera.org:8080/11598
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/e5c502e4
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/e5c502e4
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/e5c502e4

Branch: refs/heads/master
Commit: e5c502e4e428bd1cd5b04f06d72eba8fba61e918
Parents: 81c58d5
Author: Fredy Wijaya <fw...@cloudera.com>
Authored: Fri Oct 5 12:13:44 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Sat Oct 6 07:25:49 2018 +0000

----------------------------------------------------------------------
 .../apache/impala/catalog/AuthorizationPolicy.java  |  3 +++
 .../queries/QueryTest/show_grant_user.test          | 16 ++++++++++++++++
 tests/common/impala_test_suite.py                   |  6 ++++--
 3 files changed, 23 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/e5c502e4/fe/src/main/java/org/apache/impala/catalog/AuthorizationPolicy.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/AuthorizationPolicy.java b/fe/src/main/java/org/apache/impala/catalog/AuthorizationPolicy.java
index e06818c..4819079 100644
--- a/fe/src/main/java/org/apache/impala/catalog/AuthorizationPolicy.java
+++ b/fe/src/main/java/org/apache/impala/catalog/AuthorizationPolicy.java
@@ -491,7 +491,10 @@ public class AuthorizationPolicy implements PrivilegeCache {
    * Check if the filter matches the privilege.
    */
   private boolean isPrivilegeFiltered(TPrivilege filter, TPrivilege privilege) {
+    // Set the filter with privilege level and has grant option from the given privilege
+    // since those two fields don't matter for the filter.
     filter.setPrivilege_level(privilege.getPrivilege_level());
+    filter.setHas_grant_opt(privilege.isHas_grant_opt());
     String privName = PrincipalPrivilege.buildPrivilegeName(filter);
     return !privName.equalsIgnoreCase(PrincipalPrivilege.buildPrivilegeName(privilege));
   }

http://git-wip-us.apache.org/repos/asf/impala/blob/e5c502e4/testdata/workloads/functional-query/queries/QueryTest/show_grant_user.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/show_grant_user.test b/testdata/workloads/functional-query/queries/QueryTest/show_grant_user.test
index 8dd86fe..55ba28f 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/show_grant_user.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/show_grant_user.test
@@ -1,4 +1,11 @@
 ====
+---- QUERY
+show grant user $USER on database $DATABASE
+---- RESULTS
+'USER','$USER','database','$DATABASE','','','','owner',true,regex:.+
+---- TYPES
+STRING, STRING, STRING, STRING, STRING, STRING, STRING, STRING, BOOLEAN, STRING
+====
 ---- USER
 does_not_exist
 ---- QUERY
@@ -134,6 +141,15 @@ show grant user user2_shared2
 ---- TYPES
 STRING, STRING, STRING, STRING, STRING, STRING, STRING, STRING, BOOLEAN, STRING
 ====
+---- USER
+user_1group
+---- QUERY
+show grant user user_1group on table $DATABASE.user_1group_tbl
+---- RESULTS
+'USER','user_1group','table','$DATABASE','user_1group_tbl','','','owner',true,regex:.+
+---- TYPES
+STRING, STRING, STRING, STRING, STRING, STRING, STRING, STRING, BOOLEAN, STRING
+====
 ---- QUERY
 create role sgu_test_role1_group1;
 grant role sgu_test_role1_group1 to group group_1;

http://git-wip-us.apache.org/repos/asf/impala/blob/e5c502e4/tests/common/impala_test_suite.py
----------------------------------------------------------------------
diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py
index 979bff5..0f18dea 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -373,11 +373,13 @@ class ImpalaTestSuite(BaseTestSuite):
           .replace('$GROUP_NAME', group_name)
           .replace('$IMPALA_HOME', IMPALA_HOME)
           .replace('$FILESYSTEM_PREFIX', FILESYSTEM_PREFIX)
-          .replace('$SECONDARY_FILESYSTEM', os.getenv("SECONDARY_FILESYSTEM") or str()))
+          .replace('$SECONDARY_FILESYSTEM', os.getenv("SECONDARY_FILESYSTEM") or str())
+          .replace('$USER', getuser()))
       if use_db: query = query.replace('$DATABASE', use_db)
 
       reserved_keywords = ["$DATABASE", "$FILESYSTEM_PREFIX", "$GROUP_NAME",
-                           "$IMPALA_HOME", "$NAMENODE", "$QUERY", "$SECONDARY_FILESYSTEM"]
+                           "$IMPALA_HOME", "$NAMENODE", "$QUERY", "$SECONDARY_FILESYSTEM",
+                           "$USER"]
 
       if test_file_vars:
         for key, value in test_file_vars.iteritems():


[5/6] impala git commit: IMPALA-7644: Hide Parquet page index writing with feature flag

Posted by bh...@apache.org.
IMPALA-7644: Hide Parquet page index writing with feature flag

This commit adds the command line flag enable_parquet_page_index_writing
to the Impala daemon that controls whether Impala writes the Parquet page
index. By default the flag is false, i.e. Impala doesn't write the page
index.

This flag is only temporary; we plan to remove it once Impala is able to
read the page index and has better testing around it.

Because of this change I had to move test_parquet_page_index.py to the
custom_cluster test suite, since I need to set this command line flag
in order to test the functionality. I also merged most of the test cases
because we don't want to restart the cluster too many times.
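
Moving to the custom cluster suite is what allows a daemon startup flag to
be set per test: each test decorated with with_args restarts the
minicluster with the given impalad arguments. A trimmed sketch of the
pattern used by the new test file (the full file appears in the diff
below):

  from tests.common.custom_cluster_test_suite import CustomClusterTestSuite


  class TestHdfsParquetTableIndexWriter(CustomClusterTestSuite):

    @CustomClusterTestSuite.with_args("--enable_parquet_page_index_writing_debug_only")
    def test_ctas_tables(self, vector, unique_database, tmpdir):
      # With the flag set, CTAS into a Parquet table produces files whose
      # footers carry column and offset indexes; the real test validates
      # them in detail.
      pass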

Change-Id: If9994882aa59cbaf3ae464100caa8211598287bc
Reviewed-on: http://gerrit.cloudera.org:8080/11563
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/843683ed
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/843683ed
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/843683ed

Branch: refs/heads/master
Commit: 843683ed6c2ef41c7c25e9fa4af68801dbdd1a78
Parents: e5c502e
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
Authored: Tue Oct 2 14:11:58 2018 +0200
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Mon Oct 8 13:27:49 2018 +0000

----------------------------------------------------------------------
 be/src/common/global-flags.cc                   |   6 +
 be/src/exec/hdfs-parquet-table-writer.cc        | 100 +++--
 .../queries/QueryTest/stats-extrapolation.test  |  14 +-
 tests/custom_cluster/test_parquet_page_index.py | 371 ++++++++++++++++++
 tests/query_test/test_parquet_page_index.py     | 372 -------------------
 5 files changed, 446 insertions(+), 417 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/843683ed/be/src/common/global-flags.cc
----------------------------------------------------------------------
diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index 2ea1ca5..ac76b53 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -239,6 +239,12 @@ DEFINE_double_hidden(invalidate_tables_fraction_on_memory_pressure, 0.1,
     "The fraction of tables to invalidate when CatalogdTableInvalidator considers the "
     "old GC generation to be almost full.");
 
+DEFINE_bool_hidden(enable_parquet_page_index_writing_debug_only, false, "If true, Impala "
+    "will write the Parquet page index. It is not advised to use it in a production "
+    "environment, only for testing and development. This flag is meant to be temporary. "
+    "We plan to remove this flag once Impala is able to read the page index and has "
+    "better test coverage around it.");
+
 // ++========================++
 // || Startup flag graveyard ||
 // ++========================++

http://git-wip-us.apache.org/repos/asf/impala/blob/843683ed/be/src/exec/hdfs-parquet-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-table-writer.cc b/be/src/exec/hdfs-parquet-table-writer.cc
index 8aa4f7a..13137e5 100644
--- a/be/src/exec/hdfs-parquet-table-writer.cc
+++ b/be/src/exec/hdfs-parquet-table-writer.cc
@@ -83,6 +83,8 @@ using namespace apache::thrift;
 // the columns and run that function over row batches.
 // TODO: we need to pass in the compression from the FE/metadata
 
+DECLARE_bool(enable_parquet_page_index_writing_debug_only);
+
 namespace impala {
 
 // Base class for column writers. This contains most of the logic except for
@@ -205,6 +207,58 @@ class HdfsParquetTableWriter::BaseColumnWriter {
  protected:
   friend class HdfsParquetTableWriter;
 
+  Status AddMemoryConsumptionForPageIndex(int64_t new_memory_allocation) {
+    if (UNLIKELY(!table_sink_mem_tracker_->TryConsume(new_memory_allocation))) {
+      return table_sink_mem_tracker_->MemLimitExceeded(parent_->state_,
+          "Failed to allocate memory for Parquet page index.", new_memory_allocation);
+    }
+    page_index_memory_consumption_ += new_memory_allocation;
+    return Status::OK();
+  }
+
+  Status ReserveOffsetIndex(int64_t capacity) {
+    if (!FLAGS_enable_parquet_page_index_writing_debug_only) return Status::OK();
+    RETURN_IF_ERROR(
+        AddMemoryConsumptionForPageIndex(capacity * sizeof(parquet::PageLocation)));
+    offset_index_.page_locations.reserve(capacity);
+    return Status::OK();
+  }
+
+  void AddLocationToOffsetIndex(const parquet::PageLocation& location) {
+    if (!FLAGS_enable_parquet_page_index_writing_debug_only) return;
+    offset_index_.page_locations.push_back(location);
+  }
+
+  Status AddPageStatsToColumnIndex() {
+    if (!FLAGS_enable_parquet_page_index_writing_debug_only) return Status::OK();
+    parquet::Statistics page_stats;
+    page_stats_base_->EncodeToThrift(&page_stats);
+    // If pages_stats contains min_value and max_value, then append them to min_values_
+    // and max_values_ and also mark the page as not null. In case min and max values are
+    // not set, push empty strings to maintain the consistency of the index and mark the
+    // page as null. Always push the null_count.
+    string min_val;
+    string max_val;
+    if ((page_stats.__isset.min_value) && (page_stats.__isset.max_value)) {
+      Status s_min = TruncateDown(page_stats.min_value, PAGE_INDEX_MAX_STRING_LENGTH,
+          &min_val);
+      Status s_max = TruncateUp(page_stats.max_value, PAGE_INDEX_MAX_STRING_LENGTH,
+          &max_val);
+      if (!s_min.ok() || !s_max.ok()) valid_column_index_ = false;
+      column_index_.null_pages.push_back(false);
+    } else {
+      DCHECK(!page_stats.__isset.min_value && !page_stats.__isset.max_value);
+      column_index_.null_pages.push_back(true);
+      DCHECK_EQ(page_stats.null_count, num_values_);
+    }
+    RETURN_IF_ERROR(
+        AddMemoryConsumptionForPageIndex(min_val.capacity() + max_val.capacity()));
+    column_index_.min_values.emplace_back(std::move(min_val));
+    column_index_.max_values.emplace_back(std::move(max_val));
+    column_index_.null_counts.push_back(page_stats.null_count);
+    return Status::OK();
+  }
+
   // Encodes value into the current page output buffer and updates the column statistics
   // aggregates. Returns true if the value was appended successfully to the current page.
   // Returns false if the value was not appended to the current page and the caller can
@@ -645,11 +699,10 @@ Status HdfsParquetTableWriter::BaseColumnWriter::Flush(int64_t* file_pos,
 
   *first_data_page = *file_pos;
   int64_t current_row_group_index = 0;
-  offset_index_.page_locations.resize(num_data_pages_);
+  RETURN_IF_ERROR(ReserveOffsetIndex(num_data_pages_));
 
   // Write data pages
-  for (int i = 0; i < num_data_pages_; ++i) {
-    DataPage& page = pages_[i];
+  for (const DataPage& page : pages_) {
     parquet::PageLocation location;
 
     if (page.header.data_page_header.num_values == 0) {
@@ -657,7 +710,7 @@ Status HdfsParquetTableWriter::BaseColumnWriter::Flush(int64_t* file_pos,
       location.offset = -1;
       location.compressed_page_size = 0;
       location.first_row_index = -1;
-      offset_index_.page_locations[i] = location;
+      AddLocationToOffsetIndex(location);
       continue;
     }
 
@@ -677,7 +730,7 @@ Status HdfsParquetTableWriter::BaseColumnWriter::Flush(int64_t* file_pos,
     // its name suggests. On the other hand, parquet::PageLocation::compressed_page_size
     // also includes the size of the page header.
     location.compressed_page_size = page.header.compressed_page_size + len;
-    offset_index_.page_locations[i] = location;
+    AddLocationToOffsetIndex(location);
 
     // Write the page data
     RETURN_IF_ERROR(parent_->Write(page.data, page.header.compressed_page_size));
@@ -754,37 +807,7 @@ Status HdfsParquetTableWriter::BaseColumnWriter::FinalizeCurrentPage() {
   }
 
   DCHECK(page_stats_base_ != nullptr);
-  parquet::Statistics page_stats;
-  page_stats_base_->EncodeToThrift(&page_stats);
-  {
-    // If pages_stats contains min_value and max_value, then append them to min_values_
-    // and max_values_ and also mark the page as not null. In case min and max values are
-    // not set, push empty strings to maintain the consistency of the index and mark the
-    // page as null. Always push the null_count.
-    string min_val;
-    string max_val;
-    if ((page_stats.__isset.min_value) && (page_stats.__isset.max_value)) {
-      Status s_min = TruncateDown(page_stats.min_value, PAGE_INDEX_MAX_STRING_LENGTH,
-          &min_val);
-      Status s_max = TruncateUp(page_stats.max_value, PAGE_INDEX_MAX_STRING_LENGTH,
-          &max_val);
-      if (!s_min.ok() || !s_max.ok()) valid_column_index_ = false;
-      column_index_.null_pages.push_back(false);
-    } else {
-      DCHECK(!page_stats.__isset.min_value && !page_stats.__isset.max_value);
-      column_index_.null_pages.push_back(true);
-      DCHECK_EQ(page_stats.null_count, num_values_);
-    }
-    int64_t new_memory_allocation = min_val.capacity() + max_val.capacity();
-    if (UNLIKELY(!table_sink_mem_tracker_->TryConsume(new_memory_allocation))) {
-      return table_sink_mem_tracker_->MemLimitExceeded(parent_->state_,
-          "Failed to allocate memory for Parquet page index.", new_memory_allocation);
-    }
-    page_index_memory_consumption_ += new_memory_allocation;
-    column_index_.min_values.emplace_back(std::move(min_val));
-    column_index_.max_values.emplace_back(std::move(max_val));
-    column_index_.null_counts.push_back(page_stats.null_count);
-  }
+  RETURN_IF_ERROR(AddPageStatsToColumnIndex());
 
   // Update row group statistics from page statistics.
   DCHECK(row_group_stats_base_ != nullptr);
@@ -1137,6 +1160,7 @@ Status HdfsParquetTableWriter::Finalize() {
 
   RETURN_IF_ERROR(FlushCurrentRowGroup());
   RETURN_IF_ERROR(WritePageIndex());
+  for (auto& column : columns_) column->Reset();
   RETURN_IF_ERROR(WriteFileFooter());
   stats_.__set_parquet_stats(parquet_insert_stats_);
   COUNTER_ADD(parent_->rows_inserted_counter(), row_count_);
@@ -1249,6 +1273,8 @@ Status HdfsParquetTableWriter::FlushCurrentRowGroup() {
 }
 
 Status HdfsParquetTableWriter::WritePageIndex() {
+  if (!FLAGS_enable_parquet_page_index_writing_debug_only) return Status::OK();
+
   // Currently Impala only write Parquet files with a single row group. The current
   // page index logic depends on this behavior as it only keeps one row group's
   // statistics in memory.
@@ -1284,8 +1310,6 @@ Status HdfsParquetTableWriter::WritePageIndex() {
     row_group->columns[i].__set_offset_index_length(len);
     file_pos_ += len;
   }
-  // Reset column writers.
-  for (auto& column : columns_) column->Reset();
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/843683ed/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test b/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test
index 8e95168..3b25427 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test
@@ -33,17 +33,17 @@ show table stats alltypes
 YEAR, MONTH, #ROWS, EXTRAP #ROWS, #FILES, SIZE, BYTES CACHED, CACHE REPLICATION, FORMAT, INCREMENTAL STATS, LOCATION
 ---- RESULTS
 '2009','1',-1,308,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=1'
-'2009','2',-1,289,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=2'
-'2009','3',-1,307,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=3'
+'2009','2',-1,288,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=2'
+'2009','3',-1,308,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=3'
 '2009','4',-1,302,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=4'
-'2009','5',-1,307,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=5'
+'2009','5',-1,308,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=5'
 '2009','6',-1,302,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=6'
-'2009','7',-1,307,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=7'
-'2009','8',-1,307,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=8'
+'2009','7',-1,308,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=7'
+'2009','8',-1,308,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=8'
 '2009','9',-1,302,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=9'
-'2009','10',-1,307,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=10'
+'2009','10',-1,308,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=10'
 '2009','11',-1,302,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=11'
-'2009','12',-1,307,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=12'
+'2009','12',-1,308,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=12'
 'Total','',3650,3650,12,regex:.*B,'0B','','','',''
 ---- TYPES
 STRING,STRING,BIGINT,BIGINT,BIGINT,STRING,STRING,STRING,STRING,STRING,STRING

http://git-wip-us.apache.org/repos/asf/impala/blob/843683ed/tests/custom_cluster/test_parquet_page_index.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_parquet_page_index.py b/tests/custom_cluster/test_parquet_page_index.py
new file mode 100644
index 0000000..0d2a750
--- /dev/null
+++ b/tests/custom_cluster/test_parquet_page_index.py
@@ -0,0 +1,371 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Targeted Impala insert tests
+
+import os
+
+from collections import namedtuple
+from subprocess import check_call
+from parquet.ttypes import BoundaryOrder, ColumnIndex, OffsetIndex, PageHeader, PageType
+
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.skip import SkipIfLocal
+from tests.util.filesystem_utils import get_fs_path
+from tests.util.get_parquet_metadata import (
+    decode_stats_value,
+    get_parquet_metadata,
+    read_serialized_object
+)
+
+PAGE_INDEX_MAX_STRING_LENGTH = 64
+
+
+@SkipIfLocal.parquet_file_size
+class TestHdfsParquetTableIndexWriter(CustomClusterTestSuite):
+  """Since PARQUET-922 page statistics can be written before the footer.
+  The tests in this class checks if Impala writes the page indices correctly.
+  It is temporarily a custom cluster test suite because we need to set the
+  enable_parquet_page_index_writing command-line flag for the Impala daemon
+  in order to make it write the page index.
+  TODO: IMPALA-5843 Once Impala is able to read the page index and also write it by
+  default, this test suite should be moved back to query tests.
+  """
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(CustomClusterTestSuite, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_constraint(
+        lambda v: v.get_value('table_format').file_format == 'parquet')
+
+  def _get_row_group_from_file(self, parquet_file):
+    """Returns namedtuples that contain the schema, stats, offset_index, column_index,
+    and page_headers for each column in the first row group in file 'parquet_file'. Fails
+    if the file contains multiple row groups.
+    """
+    ColumnInfo = namedtuple('ColumnInfo', ['schema', 'stats', 'offset_index',
+        'column_index', 'page_headers'])
+
+    file_meta_data = get_parquet_metadata(parquet_file)
+    assert len(file_meta_data.row_groups) == 1
+    # We only support flat schemas, the additional element is the root element.
+    schemas = file_meta_data.schema[1:]
+    row_group = file_meta_data.row_groups[0]
+    assert len(schemas) == len(row_group.columns)
+    row_group_index = []
+    with open(parquet_file) as file_handle:
+      for column, schema in zip(row_group.columns, schemas):
+        column_index_offset = column.column_index_offset
+        column_index_length = column.column_index_length
+        column_index = None
+        if column_index_offset and column_index_length:
+          column_index = read_serialized_object(ColumnIndex, file_handle,
+                                                column_index_offset, column_index_length)
+        column_meta_data = column.meta_data
+        stats = None
+        if column_meta_data:
+          stats = column_meta_data.statistics
+
+        offset_index_offset = column.offset_index_offset
+        offset_index_length = column.offset_index_length
+        offset_index = None
+        page_headers = []
+        if offset_index_offset and offset_index_length:
+          offset_index = read_serialized_object(OffsetIndex, file_handle,
+                                                offset_index_offset, offset_index_length)
+          for page_loc in offset_index.page_locations:
+            page_header = read_serialized_object(PageHeader, file_handle, page_loc.offset,
+                                                 page_loc.compressed_page_size)
+            page_headers.append(page_header)
+
+        column_info = ColumnInfo(schema, stats, offset_index, column_index, page_headers)
+        row_group_index.append(column_info)
+    return row_group_index
+
+  def _get_row_groups_from_hdfs_folder(self, hdfs_path, tmpdir):
+    """Returns a list of column infos (containing the schema, stats, offset_index,
+    column_index, and page_headers) for the first row group in all parquet files in
+    'hdfs_path'.
+    """
+    row_group_indexes = []
+    check_call(['hdfs', 'dfs', '-get', hdfs_path, tmpdir.strpath])
+    for root, subdirs, files in os.walk(tmpdir.strpath):
+      for f in files:
+        parquet_file = os.path.join(root, str(f))
+        row_group_indexes.append(self._get_row_group_from_file(parquet_file))
+    return row_group_indexes
+
+  def _validate_page_locations(self, page_locations):
+    """Validate that the page locations are in order."""
+    for previous_loc, current_loc in zip(page_locations[:-1], page_locations[1:]):
+      assert previous_loc.offset < current_loc.offset
+      assert previous_loc.first_row_index < current_loc.first_row_index
+
+  def _validate_null_stats(self, index_size, column_info):
+    """Validates the statistics stored in null_pages and null_counts."""
+    column_index = column_info.column_index
+    column_stats = column_info.stats
+    assert column_index.null_pages is not None
+    assert len(column_index.null_pages) == index_size
+    assert column_index.null_counts is not None
+    assert len(column_index.null_counts) == index_size
+
+    for page_is_null, null_count, page_header in zip(column_index.null_pages,
+        column_index.null_counts, column_info.page_headers):
+      assert page_header.type == PageType.DATA_PAGE
+      num_values = page_header.data_page_header.num_values
+      assert not page_is_null or null_count == num_values
+
+    if column_stats:
+      assert column_stats.null_count == sum(column_index.null_counts)
+
+  def _validate_min_max_values(self, index_size, column_info):
+    """Validate min/max values of the pages in a column chunk."""
+    column_index = column_info.column_index
+    min_values = column_info.column_index.min_values
+    assert len(min_values) == index_size
+    max_values = column_info.column_index.max_values
+    assert len(max_values) == index_size
+
+    if not column_info.stats:
+      return
+
+    column_min_value_str = column_info.stats.min_value
+    column_max_value_str = column_info.stats.max_value
+    if column_min_value_str is None or column_max_value_str is None:
+      # If either is None, then both need to be None.
+      assert column_min_value_str is None and column_max_value_str is None
+      # No min and max value, all pages need to be null
+      for idx, null_page in enumerate(column_index.null_pages):
+        assert null_page, "Page {} of column {} is not null, \
+            but doesn't have min and max values!".format(idx, column_index.schema.name)
+      # Everything is None, no further checks needed.
+      return
+
+    column_min_value = decode_stats_value(column_info.schema, column_min_value_str)
+    for null_page, page_min_str in zip(column_index.null_pages, min_values):
+      if not null_page:
+        page_min_value = decode_stats_value(column_info.schema, page_min_str)
+        # If type is str, page_min_value might have been truncated.
+        if isinstance(page_min_value, basestring):
+          assert page_min_value >= column_min_value[:len(page_min_value)]
+        else:
+          assert page_min_value >= column_min_value
+
+    column_max_value = decode_stats_value(column_info.schema, column_max_value_str)
+    for null_page, page_max_str in zip(column_index.null_pages, max_values):
+      if not null_page:
+        page_max_value = decode_stats_value(column_info.schema, page_max_str)
+        # If type is str, page_max_value might have been truncated and incremented.
+        if (isinstance(page_max_value, basestring) and
+            len(page_max_value) == PAGE_INDEX_MAX_STRING_LENGTH):
+          max_val_prefix = page_max_value.rstrip('\0')
+          assert max_val_prefix[:-1] <= column_max_value
+        else:
+          assert page_max_value <= column_max_value
+
+  def _validate_ordering(self, ordering, schema, null_pages, min_values, max_values):
+    """Check if the ordering of the values reflects the value of 'ordering'."""
+
+    def is_sorted(l, reverse=False):
+      if not reverse:
+        return all(a <= b for a, b in zip(l, l[1:]))
+      else:
+        return all(a >= b for a, b in zip(l, l[1:]))
+
+    # Filter out null pages and decode the actual min/max values.
+    actual_min_values = [decode_stats_value(schema, min_val)
+                         for min_val, is_null in zip(min_values, null_pages)
+                         if not is_null]
+    actual_max_values = [decode_stats_value(schema, max_val)
+                         for max_val, is_null in zip(max_values, null_pages)
+                         if not is_null]
+
+    # For ASCENDING and DESCENDING, both min and max values need to be sorted.
+    if ordering == BoundaryOrder.ASCENDING:
+      assert is_sorted(actual_min_values)
+      assert is_sorted(actual_max_values)
+    elif ordering == BoundaryOrder.DESCENDING:
+      assert is_sorted(actual_min_values, reverse=True)
+      assert is_sorted(actual_max_values, reverse=True)
+    else:
+      assert ordering == BoundaryOrder.UNORDERED
+      # For UNORDERED, min and max values cannot be both sorted.
+      assert not is_sorted(actual_min_values) or not is_sorted(actual_max_values)
+      assert (not is_sorted(actual_min_values, reverse=True) or
+              not is_sorted(actual_max_values, reverse=True))
+
+  def _validate_boundary_order(self, column_info):
+    """Validate that min/max values are really in the order specified by
+    boundary order.
+    """
+    column_index = column_info.column_index
+    self._validate_ordering(column_index.boundary_order, column_info.schema,
+        column_index.null_pages, column_index.min_values, column_index.max_values)
+
+  def _validate_parquet_page_index(self, hdfs_path, tmpdir):
+    """Validates that 'hdfs_path' contains exactly one parquet file and that the rowgroup
+    index in that file is in the valid format.
+    """
+    row_group_indexes = self._get_row_groups_from_hdfs_folder(hdfs_path, tmpdir)
+    for columns in row_group_indexes:
+      for column_info in columns:
+        try:
+          index_size = len(column_info.offset_index.page_locations)
+          assert index_size > 0
+          self._validate_page_locations(column_info.offset_index.page_locations)
+          # IMPALA-7304: Impala doesn't write column index for floating-point columns
+          # until PARQUET-1222 is resolved.
+          if column_info.schema.type in [4, 5]:
+            assert column_info.column_index is None
+            continue
+          self._validate_null_stats(index_size, column_info)
+          self._validate_min_max_values(index_size, column_info)
+          self._validate_boundary_order(column_info)
+        except AssertionError as e:
+          e.args += ("Validation failed on column {}.".format(column_info.schema.name),)
+          raise
+
+  def _ctas_table_and_verify_index(self, vector, unique_database, source_table,
+                                   tmpdir, sorting_column=None):
+    """Copies 'source_table' into a parquet table and makes sure that the index
+    in the resulting parquet file is valid.
+    """
+    table_name = "test_hdfs_parquet_table_writer"
+    qualified_table_name = "{0}.{1}".format(unique_database, table_name)
+    hdfs_path = get_fs_path('/test-warehouse/{0}.db/{1}/'.format(unique_database,
+                                                                 table_name))
+    # Setting num_nodes = 1 ensures that the query is executed on the coordinator,
+    # resulting in a single parquet file being written.
+    vector.get_value('exec_option')['num_nodes'] = 1
+    self.execute_query("drop table if exists {0}".format(qualified_table_name))
+    if sorting_column is None:
+      query = ("create table {0} stored as parquet as select * from {1}").format(
+          qualified_table_name, source_table)
+    else:
+      query = ("create table {0} sort by({1}) stored as parquet as select * from {2}"
+               ).format(qualified_table_name, sorting_column, source_table)
+    self.execute_query(query, vector.get_value('exec_option'))
+    self._validate_parquet_page_index(hdfs_path, tmpdir.join(source_table))
+
+  def _create_string_table_with_values(self, vector, unique_database, table_name,
+                                       values_sql):
+    """Creates a parquet table that has a single string column, then invokes an insert
+    statement on it with the 'values_sql' parameter. E.g. 'values_sql' is "('asdf')".
+    It returns the HDFS path for the table.
+    """
+    qualified_table_name = "{0}.{1}".format(unique_database, table_name)
+    self.execute_query("drop table if exists {0}".format(qualified_table_name))
+    vector.get_value('exec_option')['num_nodes'] = 1
+    query = ("create table {0} (str string) stored as parquet").format(
+        qualified_table_name)
+    self.execute_query(query, vector.get_value('exec_option'))
+    self.execute_query("insert into {0} values {1}".format(qualified_table_name,
+        values_sql), vector.get_value('exec_option'))
+    return get_fs_path('/test-warehouse/{0}.db/{1}/'.format(unique_database,
+        table_name))
+
+  @CustomClusterTestSuite.with_args("--enable_parquet_page_index_writing_debug_only")
+  def test_ctas_tables(self, vector, unique_database, tmpdir):
+    """Test different Parquet files created via CTAS statements."""
+
+    # Test that writing a parquet file populates the rowgroup indexes with the correct
+    # values.
+    self._ctas_table_and_verify_index(vector, unique_database, "functional.alltypes",
+        tmpdir)
+
+    # Test that writing a parquet file populates the rowgroup indexes with the correct
+    # values, using decimal types.
+    self._ctas_table_and_verify_index(vector, unique_database, "functional.decimal_tbl",
+        tmpdir)
+
+    # Test that writing a parquet file populates the rowgroup indexes with the correct
+    # values, using char types.
+    self._ctas_table_and_verify_index(vector, unique_database, "functional.chars_formats",
+        tmpdir)
+
+    # Test that we don't write min/max values in the index for null columns.
+    # Ensure null_count is set for columns with null values.
+    self._ctas_table_and_verify_index(vector, unique_database, "functional.nulltable",
+        tmpdir)
+
+    # Test that when a ColumnChunk is written across multiple pages, the index is
+    # valid.
+    self._ctas_table_and_verify_index(vector, unique_database, "tpch.customer",
+        tmpdir)
+    self._ctas_table_and_verify_index(vector, unique_database, "tpch.orders",
+        tmpdir)
+
+    # Test that when the schema has a sorting column, the index is valid.
+    self._ctas_table_and_verify_index(vector, unique_database,
+        "functional_parquet.zipcode_incomes", tmpdir, "id")
+
+    # Test table with wide row.
+    self._ctas_table_and_verify_index(vector, unique_database,
+        "functional_parquet.widerow", tmpdir)
+
+    # Test tables with wide rows and many columns.
+    self._ctas_table_and_verify_index(vector, unique_database,
+        "functional_parquet.widetable_250_cols", tmpdir)
+    self._ctas_table_and_verify_index(vector, unique_database,
+        "functional_parquet.widetable_500_cols", tmpdir)
+    self._ctas_table_and_verify_index(vector, unique_database,
+        "functional_parquet.widetable_1000_cols", tmpdir)
+
+  @CustomClusterTestSuite.with_args("--enable_parquet_page_index_writing_debug_only")
+  def test_max_string_values(self, vector, unique_database, tmpdir):
+    """Test string values that are all 0xFFs or end with 0xFFs."""
+
+    # String value is all of 0xFFs but its length is less than PAGE_INDEX_TRUNCATE_LENGTH.
+    short_tbl = "short_tbl"
+    short_hdfs_path = self._create_string_table_with_values(vector, unique_database,
+        short_tbl, "(rpad('', {0}, chr(255)))".format(PAGE_INDEX_MAX_STRING_LENGTH - 1))
+    self._validate_parquet_page_index(short_hdfs_path, tmpdir.join(short_tbl))
+
+    # String value is all of 0xFFs and its length is PAGE_INDEX_TRUNCATE_LENGTH.
+    fit_tbl = "fit_tbl"
+    fit_hdfs_path = self._create_string_table_with_values(vector, unique_database,
+        fit_tbl, "(rpad('', {0}, chr(255)))".format(PAGE_INDEX_MAX_STRING_LENGTH))
+    self._validate_parquet_page_index(fit_hdfs_path, tmpdir.join(fit_tbl))
+
+    # All bytes are 0xFFs and the string is longer than PAGE_INDEX_TRUNCATE_LENGTH, so we
+    # should not write page statistics.
+    too_long_tbl = "too_long_tbl"
+    too_long_hdfs_path = self._create_string_table_with_values(vector, unique_database,
+        too_long_tbl, "(rpad('', {0}, chr(255)))".format(
+            PAGE_INDEX_MAX_STRING_LENGTH + 1))
+    row_group_indexes = self._get_row_groups_from_hdfs_folder(too_long_hdfs_path,
+        tmpdir.join(too_long_tbl))
+    column = row_group_indexes[0][0]
+    assert column.column_index is None
+    # We always write the offset index
+    assert column.offset_index is not None
+
+    # Test a string value that starts with 'aaa' followed by 0xFFs and whose length is
+    # greater than PAGE_INDEX_TRUNCATE_LENGTH. Max value should be 'aab'.
+    aaa_tbl = "aaa_tbl"
+    aaa_hdfs_path = self._create_string_table_with_values(vector, unique_database,
+        aaa_tbl, "(rpad('aaa', {0}, chr(255)))".format(PAGE_INDEX_MAX_STRING_LENGTH + 1))
+    row_group_indexes = self._get_row_groups_from_hdfs_folder(aaa_hdfs_path,
+        tmpdir.join(aaa_tbl))
+    column = row_group_indexes[0][0]
+    assert len(column.column_index.max_values) == 1
+    max_value = column.column_index.max_values[0]
+    assert max_value == 'aab'
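
The comments above describe the page-index truncation rule only informally, so here is a
minimal Python sketch of that rule under the stated assumptions (the helper name
truncate_max_value and the byte-level handling are illustrative, not Impala's actual
writer code): keep at most PAGE_INDEX_MAX_STRING_LENGTH bytes and, for a max value,
increment the last byte that is not 0xFF; if every byte is 0xFF, no valid truncated upper
bound exists and no column index is written.

    PAGE_INDEX_MAX_STRING_LENGTH = 64

    def truncate_max_value(value):
        # Hypothetical sketch of the truncate-and-increment rule exercised above.
        # Returns None when every byte is 0xFF, i.e. no valid upper bound exists.
        if len(value) <= PAGE_INDEX_MAX_STRING_LENGTH:
            return value
        prefix = bytearray(value[:PAGE_INDEX_MAX_STRING_LENGTH])
        for i in reversed(range(len(prefix))):
            if prefix[i] != 0xFF:
                prefix[i] += 1
                return bytes(prefix[:i + 1])
        return None

    # 'aaa' followed by 62 bytes of 0xFF (65 bytes total) truncates to 'aab',
    # matching the assertion in the test above.
    assert truncate_max_value(b'aaa' + b'\xff' * 62) == b'aab'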

http://git-wip-us.apache.org/repos/asf/impala/blob/843683ed/tests/query_test/test_parquet_page_index.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_parquet_page_index.py b/tests/query_test/test_parquet_page_index.py
deleted file mode 100644
index 6235819..0000000
--- a/tests/query_test/test_parquet_page_index.py
+++ /dev/null
@@ -1,372 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# Targeted Impala insert tests
-
-import os
-
-from collections import namedtuple
-from subprocess import check_call
-from parquet.ttypes import BoundaryOrder, ColumnIndex, OffsetIndex, PageHeader, PageType
-
-from tests.common.impala_test_suite import ImpalaTestSuite
-from tests.common.skip import SkipIfLocal
-from tests.util.filesystem_utils import get_fs_path
-from tests.util.get_parquet_metadata import (
-    decode_stats_value,
-    get_parquet_metadata,
-    read_serialized_object
-)
-
-PAGE_INDEX_MAX_STRING_LENGTH = 64
-
-
-@SkipIfLocal.parquet_file_size
-class TestHdfsParquetTableIndexWriter(ImpalaTestSuite):
-  """Since PARQUET-922 page statistics can be written before the footer.
-  The tests in this class checks if Impala writes the page indices correctly.
-  """
-  @classmethod
-  def get_workload(cls):
-    return 'functional-query'
-
-  @classmethod
-  def add_test_dimensions(cls):
-    super(TestHdfsParquetTableIndexWriter, cls).add_test_dimensions()
-    cls.ImpalaTestMatrix.add_constraint(
-        lambda v: v.get_value('table_format').file_format == 'parquet')
-
-  def _get_row_group_from_file(self, parquet_file):
-    """Returns namedtuples that contain the schema, stats, offset_index, column_index,
-    and page_headers for each column in the first row group in file 'parquet_file'. Fails
-    if the file contains multiple row groups.
-    """
-    ColumnInfo = namedtuple('ColumnInfo', ['schema', 'stats', 'offset_index',
-        'column_index', 'page_headers'])
-
-    file_meta_data = get_parquet_metadata(parquet_file)
-    assert len(file_meta_data.row_groups) == 1
-    # We only support flat schemas, the additional element is the root element.
-    schemas = file_meta_data.schema[1:]
-    row_group = file_meta_data.row_groups[0]
-    assert len(schemas) == len(row_group.columns)
-    row_group_index = []
-    with open(parquet_file) as file_handle:
-      for column, schema in zip(row_group.columns, schemas):
-        column_index_offset = column.column_index_offset
-        column_index_length = column.column_index_length
-        column_index = None
-        if column_index_offset and column_index_length:
-          column_index = read_serialized_object(ColumnIndex, file_handle,
-                                                column_index_offset, column_index_length)
-        column_meta_data = column.meta_data
-        stats = None
-        if column_meta_data:
-          stats = column_meta_data.statistics
-
-        offset_index_offset = column.offset_index_offset
-        offset_index_length = column.offset_index_length
-        offset_index = None
-        page_headers = []
-        if offset_index_offset and offset_index_length:
-          offset_index = read_serialized_object(OffsetIndex, file_handle,
-                                                offset_index_offset, offset_index_length)
-          for page_loc in offset_index.page_locations:
-            page_header = read_serialized_object(PageHeader, file_handle, page_loc.offset,
-                                                 page_loc.compressed_page_size)
-            page_headers.append(page_header)
-
-        column_info = ColumnInfo(schema, stats, offset_index, column_index, page_headers)
-        row_group_index.append(column_info)
-    return row_group_index
-
-  def _get_row_groups_from_hdfs_folder(self, hdfs_path, tmpdir):
-    """Returns a list of column infos (containing the schema, stats, offset_index,
-    column_index, and page_headers) for the first row group in all parquet files in
-    'hdfs_path'.
-    """
-    row_group_indexes = []
-    check_call(['hdfs', 'dfs', '-get', hdfs_path, tmpdir.strpath])
-    for root, subdirs, files in os.walk(tmpdir.strpath):
-      for f in files:
-        parquet_file = os.path.join(root, str(f))
-        row_group_indexes.append(self._get_row_group_from_file(parquet_file))
-    return row_group_indexes
-
-  def _validate_page_locations(self, page_locations):
-    """Validate that the page locations are in order."""
-    for previous_loc, current_loc in zip(page_locations[:-1], page_locations[1:]):
-      assert previous_loc.offset < current_loc.offset
-      assert previous_loc.first_row_index < current_loc.first_row_index
-
-  def _validate_null_stats(self, index_size, column_info):
-    """Validates the statistics stored in null_pages and null_counts."""
-    column_index = column_info.column_index
-    column_stats = column_info.stats
-    assert column_index.null_pages is not None
-    assert len(column_index.null_pages) == index_size
-    assert column_index.null_counts is not None
-    assert len(column_index.null_counts) == index_size
-
-    for page_is_null, null_count, page_header in zip(column_index.null_pages,
-        column_index.null_counts, column_info.page_headers):
-      assert page_header.type == PageType.DATA_PAGE
-      num_values = page_header.data_page_header.num_values
-      assert not page_is_null or null_count == num_values
-
-    if column_stats:
-      assert column_stats.null_count == sum(column_index.null_counts)
-
-  def _validate_min_max_values(self, index_size, column_info):
-    """Validate min/max values of the pages in a column chunk."""
-    column_index = column_info.column_index
-    min_values = column_info.column_index.min_values
-    assert len(min_values) == index_size
-    max_values = column_info.column_index.max_values
-    assert len(max_values) == index_size
-
-    if not column_info.stats:
-      return
-
-    column_min_value_str = column_info.stats.min_value
-    column_max_value_str = column_info.stats.max_value
-    if column_min_value_str is None or column_max_value_str is None:
-      # If either is None, then both need to be None.
-      assert column_min_value_str is None and column_max_value_str is None
-      # No min and max value, all pages need to be null
-      for idx, null_page in enumerate(column_index.null_pages):
-        assert null_page, "Page {} of column {} is not null, \
-            but doesn't have min and max values!".format(idx, column_index.schema.name)
-      # Everything is None, no further checks needed.
-      return
-
-    column_min_value = decode_stats_value(column_info.schema, column_min_value_str)
-    for null_page, page_min_str in zip(column_index.null_pages, min_values):
-      if not null_page:
-        page_min_value = decode_stats_value(column_info.schema, page_min_str)
-        # If type is str, page_min_value might have been truncated.
-        if isinstance(page_min_value, basestring):
-          assert page_min_value >= column_min_value[:len(page_min_value)]
-        else:
-          assert page_min_value >= column_min_value
-
-    column_max_value = decode_stats_value(column_info.schema, column_max_value_str)
-    for null_page, page_max_str in zip(column_index.null_pages, max_values):
-      if not null_page:
-        page_max_value = decode_stats_value(column_info.schema, page_max_str)
-        # If type is str, page_max_value might have been truncated and incremented.
-        if (isinstance(page_max_value, basestring) and
-            len(page_max_value) == PAGE_INDEX_MAX_STRING_LENGTH):
-          max_val_prefix = page_max_value.rstrip('\0')
-          assert max_val_prefix[:-1] <= column_max_value
-        else:
-          assert page_max_value <= column_max_value
-
-  def _validate_ordering(self, ordering, schema, null_pages, min_values, max_values):
-    """Check if the ordering of the values reflects the value of 'ordering'."""
-
-    def is_sorted(l, reverse=False):
-      if not reverse:
-        return all(a <= b for a, b in zip(l, l[1:]))
-      else:
-        return all(a >= b for a, b in zip(l, l[1:]))
-
-    # Filter out null pages and decode the actual min/max values.
-    actual_min_values = [decode_stats_value(schema, min_val)
-                         for min_val, is_null in zip(min_values, null_pages)
-                         if not is_null]
-    actual_max_values = [decode_stats_value(schema, max_val)
-                         for max_val, is_null in zip(max_values, null_pages)
-                         if not is_null]
-
-    # For ASCENDING and DESCENDING, both min and max values need to be sorted.
-    if ordering == BoundaryOrder.ASCENDING:
-      assert is_sorted(actual_min_values)
-      assert is_sorted(actual_max_values)
-    elif ordering == BoundaryOrder.DESCENDING:
-      assert is_sorted(actual_min_values, reverse=True)
-      assert is_sorted(actual_max_values, reverse=True)
-    else:
-      assert ordering == BoundaryOrder.UNORDERED
-      # For UNORDERED, min and max values cannot be both sorted.
-      assert not is_sorted(actual_min_values) or not is_sorted(actual_max_values)
-      assert (not is_sorted(actual_min_values, reverse=True) or
-              not is_sorted(actual_max_values, reverse=True))
-
-  def _validate_boundary_order(self, column_info):
-    """Validate that min/max values are really in the order specified by
-    boundary order.
-    """
-    column_index = column_info.column_index
-    self._validate_ordering(column_index.boundary_order, column_info.schema,
-        column_index.null_pages, column_index.min_values, column_index.max_values)
-
-  def _validate_parquet_page_index(self, hdfs_path, tmpdir):
-    """Validates that 'hdfs_path' contains exactly one parquet file and that the rowgroup
-    index in that file is in the valid format.
-    """
-    row_group_indexes = self._get_row_groups_from_hdfs_folder(hdfs_path, tmpdir)
-    for columns in row_group_indexes:
-      for column_info in columns:
-        try:
-          index_size = len(column_info.offset_index.page_locations)
-          assert index_size > 0
-          self._validate_page_locations(column_info.offset_index.page_locations)
-          # IMPALA-7304: Impala doesn't write column index for floating-point columns
-          # until PARQUET-1222 is resolved.
-          if column_info.schema.type in [4, 5]:
-            assert column_info.column_index is None
-            continue
-          self._validate_null_stats(index_size, column_info)
-          self._validate_min_max_values(index_size, column_info)
-          self._validate_boundary_order(column_info)
-        except AssertionError as e:
-          e.args += ("Validation failed on column {}.".format(column_info.schema.name),)
-          raise
-
-  def _ctas_table_and_verify_index(self, vector, unique_database, source_table,
-                                   tmpdir, sorting_column=None):
-    """Copies 'source_table' into a parquet table and makes sure that the index
-    in the resulting parquet file is valid.
-    """
-    table_name = "test_hdfs_parquet_table_writer"
-    qualified_table_name = "{0}.{1}".format(unique_database, table_name)
-    hdfs_path = get_fs_path('/test-warehouse/{0}.db/{1}/'.format(unique_database,
-                                                                 table_name))
-    # Setting num_nodes = 1 ensures that the query is executed on the coordinator,
-    # resulting in a single parquet file being written.
-    vector.get_value('exec_option')['num_nodes'] = 1
-    self.execute_query("drop table if exists {0}".format(qualified_table_name))
-    if sorting_column is None:
-      query = ("create table {0} stored as parquet as select * from {1}").format(
-          qualified_table_name, source_table)
-    else:
-      query = ("create table {0} sort by({1}) stored as parquet as select * from {2}"
-               ).format(qualified_table_name, sorting_column, source_table)
-    self.execute_query(query, vector.get_value('exec_option'))
-    self._validate_parquet_page_index(hdfs_path, tmpdir.join(source_table))
-
-  def _create_string_table_with_values(self, vector, unique_database, table_name,
-                                       values_sql):
-    """Creates a parquet table that has a single string column, then invokes an insert
-    statement on it with the 'values_sql' parameter. E.g. 'values_sql' is "('asdf')".
-    It returns the HDFS path for the table.
-    """
-    qualified_table_name = "{0}.{1}".format(unique_database, table_name)
-    self.execute_query("drop table if exists {0}".format(qualified_table_name))
-    vector.get_value('exec_option')['num_nodes'] = 1
-    query = ("create table {0} (str string) stored as parquet").format(qualified_table_name)
-    self.execute_query(query, vector.get_value('exec_option'))
-    self.execute_query("insert into {0} values {1}".format(qualified_table_name,
-        values_sql), vector.get_value('exec_option'))
-    return get_fs_path('/test-warehouse/{0}.db/{1}/'.format(unique_database,
-        table_name))
-
-  def test_write_index_alltypes(self, vector, unique_database, tmpdir):
-    """Test that writing a parquet file populates the rowgroup indexes with the correct
-    values.
-    """
-    self._ctas_table_and_verify_index(vector, unique_database, "functional.alltypes",
-        tmpdir)
-
-  def test_write_index_decimals(self, vector, unique_database, tmpdir):
-    """Test that writing a parquet file populates the rowgroup indexes with the correct
-    values, using decimal types.
-    """
-    self._ctas_table_and_verify_index(vector, unique_database, "functional.decimal_tbl",
-        tmpdir)
-
-  def test_write_index_chars(self, vector, unique_database, tmpdir):
-    """Test that writing a parquet file populates the rowgroup indexes with the correct
-    values, using char types.
-    """
-    self._ctas_table_and_verify_index(vector, unique_database, "functional.chars_formats",
-        tmpdir)
-
-  def test_write_index_null(self, vector, unique_database, tmpdir):
-    """Test that we don't write min/max values in the index for null columns.
-    Ensure null_count is set for columns with null values.
-    """
-    self._ctas_table_and_verify_index(vector, unique_database, "functional.nulltable",
-        tmpdir)
-
-  def test_write_index_multi_page(self, vector, unique_database, tmpdir):
-    """Test that when a ColumnChunk is written across multiple pages, the index is
-    valid.
-    """
-    self._ctas_table_and_verify_index(vector, unique_database, "tpch.customer",
-        tmpdir)
-    self._ctas_table_and_verify_index(vector, unique_database, "tpch.orders",
-        tmpdir)
-
-  def test_write_index_sorting_column(self, vector, unique_database, tmpdir):
-    """Test that when the schema has a sorting column, the index is valid."""
-    self._ctas_table_and_verify_index(vector, unique_database,
-        "functional_parquet.zipcode_incomes", tmpdir, "id")
-
-  def test_write_index_wide_table(self, vector, unique_database, tmpdir):
-    """Test table with wide row."""
-    self._ctas_table_and_verify_index(vector, unique_database,
-        "functional_parquet.widerow", tmpdir)
-
-  def test_write_index_many_columns_tables(self, vector, unique_database, tmpdir):
-    """Test tables with wide rows and many columns."""
-    self._ctas_table_and_verify_index(vector, unique_database,
-        "functional_parquet.widetable_250_cols", tmpdir)
-    self._ctas_table_and_verify_index(vector, unique_database,
-        "functional_parquet.widetable_500_cols", tmpdir)
-    self._ctas_table_and_verify_index(vector, unique_database,
-        "functional_parquet.widetable_1000_cols", tmpdir)
-
-  def test_max_string_values(self, vector, unique_database, tmpdir):
-    """Test string values that are all 0xFFs or end with 0xFFs."""
-
-    # String value is all of 0xFFs but its length is less than PAGE_INDEX_TRUNCATE_LENGTH.
-    short_tbl = "short_tbl"
-    short_hdfs_path = self._create_string_table_with_values(vector, unique_database,
-        short_tbl, "(rpad('', {0}, chr(255)))".format(PAGE_INDEX_MAX_STRING_LENGTH - 1))
-    self._validate_parquet_page_index(short_hdfs_path, tmpdir.join(short_tbl))
-
-    # String value is all of 0xFFs and its length is PAGE_INDEX_TRUNCATE_LENGTH.
-    fit_tbl = "fit_tbl"
-    fit_hdfs_path = self._create_string_table_with_values(vector, unique_database,
-        fit_tbl, "(rpad('', {0}, chr(255)))".format(PAGE_INDEX_MAX_STRING_LENGTH))
-    self._validate_parquet_page_index(fit_hdfs_path, tmpdir.join(fit_tbl))
-
-    # All bytes are 0xFFs and the string is longer then PAGE_INDEX_TRUNCATE_LENGTH, so we
-    # should not write page statistics.
-    too_long_tbl = "too_long_tbl"
-    too_long_hdfs_path = self._create_string_table_with_values(vector, unique_database,
-        too_long_tbl, "(rpad('', {0}, chr(255)))".format(PAGE_INDEX_MAX_STRING_LENGTH + 1))
-    row_group_indexes = self._get_row_groups_from_hdfs_folder(too_long_hdfs_path,
-        tmpdir.join(too_long_tbl))
-    column = row_group_indexes[0][0]
-    assert column.column_index is None
-    # We always write the offset index
-    assert column.offset_index is not None
-
-    # Test string with value that starts with 'aaa' following with 0xFFs and its length is
-    # greater than PAGE_INDEX_TRUNCATE_LENGTH. Max value should be 'aab'.
-    aaa_tbl = "aaa_tbl"
-    aaa_hdfs_path = self._create_string_table_with_values(vector, unique_database,
-        aaa_tbl, "(rpad('aaa', {0}, chr(255)))".format(PAGE_INDEX_MAX_STRING_LENGTH + 1))
-    row_group_indexes = self._get_row_groups_from_hdfs_folder(aaa_hdfs_path,
-        tmpdir.join(aaa_tbl))
-    column = row_group_indexes[0][0]
-    assert len(column.column_index.max_values) == 1
-    max_value = column.column_index.max_values[0]
-    assert max_value == 'aab'


[2/6] impala git commit: IMPALA-2063 Remove newline characters in query status.

Posted by bh...@apache.org.
IMPALA-2063 Remove newline characters in query status.

Remove extraneous whitespace at the end of strings being added to
profiles. Remove any duplicated newline characters within strings
as well.  (The latter step is necessary to allow for a blanket
assertion on this in testing.)
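
As a rough illustration of the cleanup described above, here is a minimal Python sketch
(the helper name clean_info_string is hypothetical and does not mirror the actual C++
implementation in the diff below, which relies on StripTrailingWhitespace()):

    import re

    def clean_info_string(value):
        # Hypothetical sketch: drop trailing whitespace and collapse duplicated
        # newline characters, as described in the commit message above.
        value = value.rstrip()
        return re.sub(r'\n{2,}', '\n', value)

    assert clean_info_string("status \n\n\nmore detail  ") == "status \nmore detail"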

Change-Id: I2bbd7d7fe2c6d0f3799d0e6b336710bccfef0ab1
Reviewed-on: http://gerrit.cloudera.org:8080/11425
Reviewed-by: Michael Ho <kw...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/d3db3266
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/d3db3266
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/d3db3266

Branch: refs/heads/master
Commit: d3db32660537e55948f3132f778d257d078b9197
Parents: f8b2eb5
Author: Michal Ostrowski <mo...@cloudera.com>
Authored: Tue Sep 11 09:23:01 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Fri Oct 5 22:30:44 2018 +0000

----------------------------------------------------------------------
 be/src/util/runtime-profile.cc        | 19 ++++++++++++-------
 be/src/util/runtime-profile.h         |  3 ++-
 tests/query_test/test_cancellation.py | 15 +++++++++++++++
 3 files changed, 29 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/d3db3266/be/src/util/runtime-profile.cc
----------------------------------------------------------------------
diff --git a/be/src/util/runtime-profile.cc b/be/src/util/runtime-profile.cc
index 51ab2fc..dd83ee6 100644
--- a/be/src/util/runtime-profile.cc
+++ b/be/src/util/runtime-profile.cc
@@ -27,6 +27,7 @@
 #include <boost/thread/thread.hpp>
 
 #include "common/object-pool.h"
+#include "gutil/strings/strip.h"
 #include "rpc/thrift-util.h"
 #include "util/coding-util.h"
 #include "util/compress.h"
@@ -292,7 +293,7 @@ void RuntimeProfile::Update(const vector<TRuntimeProfileNode>& nodes, int* idx)
       DCHECK(it != info_strings.end());
       InfoStrings::iterator existing = info_strings_.find(key);
       if (existing == info_strings_.end()) {
-        info_strings_.insert(make_pair(key, it->second));
+        info_strings_.emplace(key, it->second);
         info_strings_display_order_.push_back(key);
       } else {
         info_strings_[key] = it->second;
@@ -534,19 +535,23 @@ void RuntimeProfile::AppendInfoString(const string& key, const string& value) {
   return AddInfoStringInternal(key, value, true);
 }
 
-void RuntimeProfile::AddInfoStringInternal(
-    const string& key, const string& value, bool append, bool redact) {
-  const string& info = redact ? RedactCopy(value): value;
+void RuntimeProfile::AddInfoStringInternal(const string& key, string value,
+    bool append, bool redact) {
+
+  if (redact) Redact(&value);
+
+  StripTrailingWhitespace(&value);
+
   lock_guard<SpinLock> l(info_strings_lock_);
   InfoStrings::iterator it = info_strings_.find(key);
   if (it == info_strings_.end()) {
-    info_strings_.insert(make_pair(key, info));
+    info_strings_.emplace(key, std::move(value));
     info_strings_display_order_.push_back(key);
   } else {
     if (append) {
-      it->second += ", " + value;
+      it->second += ", " + std::move(value);
     } else {
-      it->second = info;
+      it->second = std::move(value);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/d3db3266/be/src/util/runtime-profile.h
----------------------------------------------------------------------
diff --git a/be/src/util/runtime-profile.h b/be/src/util/runtime-profile.h
index a6b06ba..e0048d2 100644
--- a/be/src/util/runtime-profile.h
+++ b/be/src/util/runtime-profile.h
@@ -479,8 +479,9 @@ class RuntimeProfile { // NOLINT: This struct is not packed, but there are not s
   /// Implementation of AddInfoString() and AppendInfoString(). If 'append' is false,
   /// implements AddInfoString(), otherwise implements AppendInfoString().
   /// Redaction rules are applied on the info string if 'redact' is true.
+  /// Trailing whitespace is removed.
   void AddInfoStringInternal(
-      const std::string& key, const std::string& value, bool append, bool redact = false);
+      const std::string& key, std::string value, bool append, bool redact = false);
 
   /// Name of the counter maintaining the total time.
   static const std::string TOTAL_TIME_COUNTER_NAME;

http://git-wip-us.apache.org/repos/asf/impala/blob/d3db3266/tests/query_test/test_cancellation.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_cancellation.py b/tests/query_test/test_cancellation.py
index ecf3126..0958b69 100644
--- a/tests/query_test/test_cancellation.py
+++ b/tests/query_test/test_cancellation.py
@@ -152,12 +152,16 @@ class TestCancellation(ImpalaTestSuite):
 
       def fetch_results():
         threading.current_thread().fetch_results_error = None
+        threading.current_thread().query_profile = None
         try:
           new_client = self.create_impala_client()
           new_client.fetch(query, handle)
         except ImpalaBeeswaxException as e:
           threading.current_thread().fetch_results_error = e
 
+        threading.current_thread().query_profile = \
+          self.impalad_test_service.get_thrift_profile(handle.get_handle().id)
+
       thread = threading.Thread(target=fetch_results)
       thread.start()
 
@@ -179,6 +183,17 @@ class TestCancellation(ImpalaTestSuite):
       # Before accessing fetch_results_error we need to join the fetch thread
       thread.join()
 
+      # IMPALA-2063: Cancellation tests can generate profile text that is otherwise hard
+      # to reproduce, so check it here for mis-formatting.
+      profile = thread.query_profile
+      if profile:
+        for (k, v) in profile.nodes[1].info_strings.iteritems():
+          assert v == v.rstrip(), \
+            "Mis-formatted profile text: %s %s" % (k, v)
+          # "Plan" text may be strangely formatted.
+          assert k == 'Plan' or '\n\n' not in v, \
+            "Mis-formatted profile text: %s %s" % (k, v)
+
       if thread.fetch_results_error is None:
         # If the fetch rpc didn't result in CANCELLED (and auto-close the query) then
         # the close rpc should have succeeded.


[6/6] impala git commit: IMPALA-7626: Throttle catalog partial RPC requests

Posted by bh...@apache.org.
IMPALA-7626: Throttle catalog partial RPC requests

With more coordinators running in local catalog mode, the expected RPC
traffic on the Catalog server is higher compared to the non-local-catalog
mode. Each such RPC is handled in its own thread and consumes some
non-trivial CPU for serializing and deserializing the metadata.

With this change, the number of threads performing the actual work is
capped at a configurable limit at any point in time, and the remaining
requests (if any) block until the current requests are serviced or until
they exceed the configured timeout and abort. Adds the following
parameters for controlling this behavior.

--catalog_max_parallel_partial_fetch_rpc
--catalog_partial_fetch_rpc_queue_timeout_s

--catalog_partial_fetch_rpc_queue_timeout_s controls the timeout for queued
requests.

Added a metric that exposes the queue length for basic supportability.

Added a unit test to make sure the number of concurrent requests for this
RPC method does not exceed the configured value.
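
For reference, a minimal Python sketch of the throttling scheme described above, using a
counting semaphore with a queue timeout (the constants and function names are
illustrative stand-ins for the new flags and for the Java wrapper shown in the
CatalogServiceCatalog.java diff below; assumes Python 3, where acquire() takes a timeout):

    import threading

    MAX_PARALLEL_PARTIAL_FETCH_RPC = 32   # --catalog_max_parallel_partial_fetch_rpc
    QUEUE_TIMEOUT_S = 60                  # --catalog_partial_fetch_rpc_queue_timeout_s

    _permits = threading.BoundedSemaphore(MAX_PARALLEL_PARTIAL_FETCH_RPC)

    def get_partial_catalog_object(request, do_fetch):
        # Wait for a free slot; give up once the queue timeout elapses.
        if not _permits.acquire(timeout=QUEUE_TIMEOUT_S):
            raise RuntimeError("Timed out waiting in the partial fetch RPC queue")
        try:
            return do_fetch(request)
        finally:
            _permits.release()

Queued callers simply block on acquire(), so the number of threads doing the actual
serialization work never exceeds the cap, mirroring the semaphore-based
getPartialCatalogObject() wrapper added in this commit.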

Change-Id: I11f77a16cfa38ada42d8b7c859850198ea7dd142
Reviewed-on: http://gerrit.cloudera.org:8080/11561
Reviewed-by: Vuk Ercegovac <ve...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/d48ffc2d
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/d48ffc2d
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/d48ffc2d

Branch: refs/heads/master
Commit: d48ffc2d45b2a9d4b9c730bba5677d3096311a25
Parents: 843683e
Author: Bharath Vissapragada <bh...@cloudera.com>
Authored: Sun Sep 30 23:20:34 2018 -0700
Committer: Bharath Vissapragada <bh...@cloudera.com>
Committed: Mon Oct 8 17:09:40 2018 +0000

----------------------------------------------------------------------
 be/src/catalog/catalog-server.cc                | 43 +++++++--
 be/src/catalog/catalog-server.h                 | 10 +++
 be/src/catalog/catalog.cc                       |  5 ++
 be/src/catalog/catalog.h                        |  5 ++
 be/src/util/backend-gflag-util.cc               |  6 ++
 common/thrift/BackendGflags.thrift              |  4 +
 common/thrift/JniCatalog.thrift                 |  6 ++
 common/thrift/metrics.json                      | 10 +++
 .../impala/catalog/CatalogServiceCatalog.java   | 65 +++++++++++++-
 .../apache/impala/service/BackendConfig.java    |  8 ++
 .../org/apache/impala/service/JniCatalog.java   |  9 ++
 .../impala/catalog/PartialCatalogInfoTest.java  | 93 +++++++++++++++++++-
 .../testutil/CatalogServiceTestCatalog.java     |  3 +-
 13 files changed, 255 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/d48ffc2d/be/src/catalog/catalog-server.cc
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index d4ce559..c4ee301 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -62,6 +62,14 @@ DEFINE_validator(catalog_topic_mode, [](const char* name, const string& val) {
   return false;
 });
 
+DEFINE_int32_hidden(catalog_max_parallel_partial_fetch_rpc, 32, "Maximum number of "
+    "partial catalog object fetch RPCs that can run in parallel. Applicable only when "
+    "local catalog mode is configured.");
+
+DEFINE_int64_hidden(catalog_partial_fetch_rpc_queue_timeout_s, LLONG_MAX, "Maximum time "
+    "(in seconds) a partial catalog object fetch RPC spends in the queue waiting "
+    "to run. Must be set to a value greater than zero.");
+
 DECLARE_string(state_store_host);
 DECLARE_int32(state_store_subscriber_port);
 DECLARE_int32(state_store_port);
@@ -73,6 +81,9 @@ string CatalogServer::IMPALA_CATALOG_TOPIC = "catalog-update";
 const string CATALOG_SERVER_TOPIC_PROCESSING_TIMES =
     "catalog-server.topic-processing-time-s";
 
+const string CATALOG_SERVER_PARTIAL_FETCH_RPC_QUEUE_LEN =
+    "catalog.partial-fetch-rpc.queue-len";
+
 const string CATALOG_WEB_PAGE = "/catalog";
 const string CATALOG_TEMPLATE = "catalog.tmpl";
 const string CATALOG_OBJECT_WEB_PAGE = "/catalog_object";
@@ -80,6 +91,8 @@ const string CATALOG_OBJECT_TEMPLATE = "catalog_object.tmpl";
 const string TABLE_METRICS_WEB_PAGE = "/table_metrics";
 const string TABLE_METRICS_TEMPLATE = "table_metrics.tmpl";
 
+const int REFRESH_METRICS_INTERVAL_MS = 1000;
+
 // Implementation for the CatalogService thrift interface.
 class CatalogServiceThriftIf : public CatalogServiceIf {
  public:
@@ -218,6 +231,8 @@ CatalogServer::CatalogServer(MetricGroup* metrics)
     catalog_objects_max_version_(0L) {
   topic_processing_time_metric_ = StatsMetric<double>::CreateAndRegister(metrics,
       CATALOG_SERVER_TOPIC_PROCESSING_TIMES);
+  partial_fetch_rpc_queue_len_metric_ =
+      metrics->AddGauge(CATALOG_SERVER_PARTIAL_FETCH_RPC_QUEUE_LEN, 0);
 }
 
 Status CatalogServer::Start() {
@@ -230,13 +245,11 @@ Status CatalogServer::Start() {
 
   // This will trigger a full Catalog metadata load.
   catalog_.reset(new Catalog());
-  Status status = Thread::Create("catalog-server", "catalog-update-gathering-thread",
+  RETURN_IF_ERROR(Thread::Create("catalog-server", "catalog-update-gathering-thread",
       &CatalogServer::GatherCatalogUpdatesThread, this,
-      &catalog_update_gathering_thread_);
-  if (!status.ok()) {
-    status.AddDetail("CatalogService failed to start");
-    return status;
-  }
+      &catalog_update_gathering_thread_));
+  RETURN_IF_ERROR(Thread::Create("catalog-server", "catalog-metrics-refresh-thread",
+      &CatalogServer::RefreshMetrics, this, &catalog_metrics_refresh_thread_));
 
   statestore_subscriber_.reset(new StatestoreSubscriber(
      Substitute("catalog-server@$0", TNetworkAddressToString(server_address)),
@@ -249,7 +262,7 @@ Status CatalogServer::Start() {
   // prefix of any key. This saves a bit of network communication from the statestore
   // back to the catalog.
   string filter_prefix = "!";
-  status = statestore_subscriber_->AddTopic(IMPALA_CATALOG_TOPIC,
+  Status status = statestore_subscriber_->AddTopic(IMPALA_CATALOG_TOPIC,
       /* is_transient=*/ false, /* populate_min_subscriber_topic_version=*/ false,
       filter_prefix, cb);
   if (!status.ok()) {
@@ -328,7 +341,7 @@ void CatalogServer::UpdateCatalogTopicCallback(
 }
 
 [[noreturn]] void CatalogServer::GatherCatalogUpdatesThread() {
-  while (1) {
+  while (true) {
     unique_lock<mutex> unique_lock(catalog_lock_);
     // Protect against spurious wake-ups by checking the value of topic_updates_ready_.
     // It is only safe to continue on and update the shared pending_topic_updates_
@@ -366,6 +379,20 @@ void CatalogServer::UpdateCatalogTopicCallback(
   }
 }
 
+[[noreturn]] void CatalogServer::RefreshMetrics() {
+  while (true) {
+    SleepForMs(REFRESH_METRICS_INTERVAL_MS);
+    TGetCatalogServerMetricsResponse response;
+    Status status = catalog_->GetCatalogServerMetrics(&response);
+    if (!status.ok()) {
+      LOG(ERROR) << "Error refreshing catalog metrics: " << status.GetDetail();
+      continue;
+    }
+    partial_fetch_rpc_queue_len_metric_->SetValue(
+        response.catalog_partial_fetch_rpc_queue_len);
+  }
+}
+
 void CatalogServer::CatalogUrlCallback(const Webserver::ArgumentMap& args,
     Document* document) {
   GetCatalogUsage(document);

http://git-wip-us.apache.org/repos/asf/impala/blob/d48ffc2d/be/src/catalog/catalog-server.h
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog-server.h b/be/src/catalog/catalog-server.h
index 1df83a3..76ec2be 100644
--- a/be/src/catalog/catalog-server.h
+++ b/be/src/catalog/catalog-server.h
@@ -90,9 +90,15 @@ class CatalogServer {
   /// Metric that tracks the amount of time taken preparing a catalog update.
   StatsMetric<double>* topic_processing_time_metric_;
 
+  /// Tracks the partial fetch RPC call queue length on the Catalog server.
+  IntGauge* partial_fetch_rpc_queue_len_metric_;
+
   /// Thread that polls the catalog for any updates.
   std::unique_ptr<Thread> catalog_update_gathering_thread_;
 
+  /// Thread that periodically wakes up and refreshes certain Catalog metrics.
+  std::unique_ptr<Thread> catalog_metrics_refresh_thread_;
+
   /// Protects catalog_update_cv_, pending_topic_updates_,
   /// catalog_objects_to/from_version_, and last_sent_catalog_version.
   boost::mutex catalog_lock_;
@@ -143,6 +149,9 @@ class CatalogServer {
   /// Also, explicitly releases free memory back to the OS after each complete iteration.
   [[noreturn]] void GatherCatalogUpdatesThread();
 
+  /// Executed by the catalog_metrics_refresh_thread_. Refreshes certain catalog metrics.
+  [[noreturn]] void RefreshMetrics();
+
   /// Example output:
   /// "databases": [
   ///         {
@@ -200,6 +209,7 @@ class CatalogServer {
   /// <host>:25020/table_metrics?name=foo.bar
   void TableMetricsUrlCallback(const Webserver::ArgumentMap& args,
       rapidjson::Document* document);
+
 };
 
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/d48ffc2d/be/src/catalog/catalog.cc
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog.cc b/be/src/catalog/catalog.cc
index a8873b7..a2d5f4b 100644
--- a/be/src/catalog/catalog.cc
+++ b/be/src/catalog/catalog.cc
@@ -67,6 +67,7 @@ Catalog::Catalog() {
     {"getCatalogDelta", "([B)[B", &get_catalog_delta_id_},
     {"getCatalogUsage", "()[B", &get_catalog_usage_id_},
     {"getCatalogVersion", "()J", &get_catalog_version_id_},
+    {"getCatalogServerMetrics", "()[B", &get_catalog_server_metrics_},
     {"prioritizeLoad", "([B)V", &prioritize_load_id_},
     {"getPartitionStats", "([B)[B", &get_partition_stats_id_},
     {"updateTableUsage", "([B)V", &update_table_usage_id_},
@@ -158,6 +159,10 @@ Status Catalog::GetCatalogUsage(TGetCatalogUsageResponse* response) {
   return JniUtil::CallJniMethod(catalog_, get_catalog_usage_id_, response);
 }
 
+Status Catalog::GetCatalogServerMetrics(TGetCatalogServerMetricsResponse* response) {
+  return JniUtil::CallJniMethod(catalog_, get_catalog_server_metrics_, response);
+}
+
 Status Catalog::GetFunctions(const TGetFunctionsRequest& request,
     TGetFunctionsResponse *response) {
   return JniUtil::CallJniMethod(catalog_, get_functions_id_, request, response);

http://git-wip-us.apache.org/repos/asf/impala/blob/d48ffc2d/be/src/catalog/catalog.h
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog.h b/be/src/catalog/catalog.h
index 65390a4..c703bcb 100644
--- a/be/src/catalog/catalog.h
+++ b/be/src/catalog/catalog.h
@@ -100,6 +100,10 @@ class Catalog {
   Status GetTableMetrics(const std::string& db, const std::string& tbl,
       std::string* metrics);
 
+  /// Returns the Catalog server metrics in the response object. Refer to
+  /// TGetCatalogServerMetricsResponse definition for details.
+  Status GetCatalogServerMetrics(TGetCatalogServerMetricsResponse* response);
+
   /// Returns the current catalog usage that includes the most frequently accessed
   /// tables as well as the tables with the highest memory requirements.
   Status GetCatalogUsage(TGetCatalogUsageResponse* response);
@@ -139,6 +143,7 @@ class Catalog {
   jmethodID get_catalog_delta_id_;  // JniCatalog.getCatalogDelta()
   jmethodID get_catalog_version_id_;  // JniCatalog.getCatalogVersion()
   jmethodID get_catalog_usage_id_; // JniCatalog.getCatalogUsage()
+  jmethodID get_catalog_server_metrics_; // JniCatalog.getCatalogServerMetrics()
   jmethodID get_dbs_id_; // JniCatalog.getDbs()
   jmethodID get_table_names_id_; // JniCatalog.getTableNames()
   jmethodID get_table_metrics_id_; // JniCatalog.getTableMetrics()

http://git-wip-us.apache.org/repos/asf/impala/blob/d48ffc2d/be/src/util/backend-gflag-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc
index 09975a8..3760c84 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -65,6 +65,8 @@ DECLARE_double(invalidate_tables_fraction_on_memory_pressure);
 DECLARE_int32(local_catalog_max_fetch_retries);
 DECLARE_int64(kudu_scanner_thread_estimated_bytes_per_column);
 DECLARE_int64(kudu_scanner_thread_max_estimated_bytes);
+DECLARE_int32(catalog_max_parallel_partial_fetch_rpc);
+DECLARE_int64(catalog_partial_fetch_rpc_queue_timeout_s);
 
 namespace impala {
 
@@ -126,6 +128,10 @@ Status GetThriftBackendGflags(JNIEnv* jni_env, jbyteArray* cfg_bytes) {
       FLAGS_kudu_scanner_thread_estimated_bytes_per_column);
   cfg.__set_kudu_scanner_thread_max_estimated_bytes(
       FLAGS_kudu_scanner_thread_max_estimated_bytes);
+  cfg.__set_catalog_max_parallel_partial_fetch_rpc(
+      FLAGS_catalog_max_parallel_partial_fetch_rpc);
+  cfg.__set_catalog_partial_fetch_rpc_queue_timeout_s(
+      FLAGS_catalog_partial_fetch_rpc_queue_timeout_s);
   RETURN_IF_ERROR(SerializeThriftMsg(jni_env, &cfg, cfg_bytes));
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/d48ffc2d/common/thrift/BackendGflags.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift
index 5f971c0..531fc2c 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -103,4 +103,8 @@ struct TBackendGflags {
   38: required i64 kudu_scanner_thread_estimated_bytes_per_column
 
   39: required i64 kudu_scanner_thread_max_estimated_bytes
+
+  40: required i32 catalog_max_parallel_partial_fetch_rpc
+
+  41: required i64 catalog_partial_fetch_rpc_queue_timeout_s
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/d48ffc2d/common/thrift/JniCatalog.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/JniCatalog.thrift b/common/thrift/JniCatalog.thrift
index 3be5f70..b936773 100644
--- a/common/thrift/JniCatalog.thrift
+++ b/common/thrift/JniCatalog.thrift
@@ -732,3 +732,9 @@ struct TCommentOnParams {
   // Name of column to alter.
   4: optional TColumnName column_name
 }
+
+// Response to GetCatalogServerMetrics() call.
+struct TGetCatalogServerMetricsResponse {
+  // Partial fetch RPC queue length.
+  1: required i32 catalog_partial_fetch_rpc_queue_len
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/d48ffc2d/common/thrift/metrics.json
----------------------------------------------------------------------
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index 72fab16..ccc5481 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -1890,5 +1890,15 @@
     "units": "TIME_NS",
     "kind": "COUNTER",
     "key": "catalog.cache.total-load-time"
+  },
+  {
+    "description": "RPC queue length for partial object fetches.",
+    "contexts": [
+      "CATALOGSERVER"
+    ],
+    "label": "RPC queue length for partial object fetch requests.",
+    "units": "NONE",
+    "kind": "GAUGE",
+    "key": "catalog.partial-fetch-rpc.queue-len"
   }
 ]

http://git-wip-us.apache.org/repos/asf/impala/blob/d48ffc2d/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index ebc9b81..b3c714f 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -29,6 +29,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -235,6 +236,17 @@ public class CatalogServiceCatalog extends Catalog {
   };
   final TopicMode topicMode_;
 
+  private final long PARTIAL_FETCH_RPC_QUEUE_TIMEOUT_S = BackendConfig.INSTANCE
+      .getCatalogPartialFetchRpcQueueTimeoutS();
+
+  private final int MAX_PARALLEL_PARTIAL_FETCH_RPC_COUNT = BackendConfig.INSTANCE
+      .getCatalogMaxParallelPartialFetchRpc();
+
+  // Controls concurrent access to doGetPartialCatalogObject() call. Limits the number
+  // of parallel requests to --catalog_max_parallel_partial_fetch_rpc.
+  private final Semaphore partialObjectFetchAccess_ =
+      new Semaphore(MAX_PARALLEL_PARTIAL_FETCH_RPC_COUNT, /*fair =*/ true);
+
   /**
    * Initialize the CatalogServiceCatalog. If 'loadInBackground' is true, table metadata
    * will be loaded in the background. 'initialHmsCnxnTimeoutSec' specifies the time (in
@@ -270,6 +282,7 @@ public class CatalogServiceCatalog extends Catalog {
         BackendConfig.INSTANCE.getBackendCfg().catalog_topic_mode.toUpperCase());
     catalogdTableInvalidator_ = CatalogdTableInvalidator.create(this,
         BackendConfig.INSTANCE);
+    Preconditions.checkState(PARTIAL_FETCH_RPC_QUEUE_TIMEOUT_S > 0);
   }
 
   // Timeout for acquiring a table lock
@@ -387,6 +400,10 @@ public class CatalogServiceCatalog extends Catalog {
     }
   }
 
+  public int getPartialFetchRpcQueueLength() {
+    return partialObjectFetchAccess_.getQueueLength();
+  }
+
   /**
    * Adds a list of cache directive IDs for the given table name. Asynchronously
    * refreshes the table metadata once all cache directives complete.
@@ -2095,13 +2112,59 @@ public class CatalogServiceCatalog extends Catalog {
   }
 
   /**
+   * A wrapper around doGetPartialCatalogObject() that controls the number of concurrent
+   * invocations.
+   */
+  public TGetPartialCatalogObjectResponse getPartialCatalogObject(
+      TGetPartialCatalogObjectRequest req) throws CatalogException {
+    try {
+      if (!partialObjectFetchAccess_.tryAcquire(1,
+          PARTIAL_FETCH_RPC_QUEUE_TIMEOUT_S, TimeUnit.SECONDS)) {
+        // Timed out trying to acquire the semaphore permit.
+        throw new CatalogException("Timed out while fetching partial object metadata. " +
+            "Please check the metric 'catalog.partial-fetch-rpc.queue-len' for the " +
+            "current queue length and consider increasing " +
+            "'catalog_partial_fetch_rpc_queue_timeout_s' and/or " +
+            "'catalog_max_parallel_partial_fetch_rpc'");
+      }
+      // Acquired the permit at this point, should be released before we exit out of
+      // this method.
+      //
+      // Is there a chance that this thread can get interrupted at this point before it
+      // enters the try block, eventually leading to the semaphore permit not
+      // getting released? It can probably happen if the JVM is already in a bad shape.
+      // In the worst case, every permit is blocked and the subsequent requests throw
+      // the timeout exception and the user can monitor the queue metric to see that it
+      // is full, so the issue should be easy to diagnose.
+      // TODO: Figure out if such a race is possible.
+      try {
+        return doGetPartialCatalogObject(req);
+      } finally {
+        partialObjectFetchAccess_.release();
+      }
+    } catch (InterruptedException e) {
+      throw new CatalogException("Error running getPartialCatalogObject(): ", e);
+    }
+  }
+
+  /**
+   * Returns the number of currently running partial RPCs.
+   */
+  @VisibleForTesting
+  public int getConcurrentPartialRpcReqCount() {
+    // Calculated based on number of currently available semaphore permits.
+    return MAX_PARALLEL_PARTIAL_FETCH_RPC_COUNT - partialObjectFetchAccess_
+        .availablePermits();
+  }
+
+  /**
    * Return a partial view of information about a given catalog object. This services
    * the CatalogdMetaProvider running on impalads when they are configured in
    * "local-catalog" mode. If required objects are not present, for example, the database
    * from which a table is requested, the types of the missing objects will be set in the
    * response's lookup_status.
    */
-  public TGetPartialCatalogObjectResponse getPartialCatalogObject(
+  private TGetPartialCatalogObjectResponse doGetPartialCatalogObject(
       TGetPartialCatalogObjectRequest req) throws CatalogException {
     TCatalogObject objectDesc = Preconditions.checkNotNull(req.object_desc,
         "missing object_desc");

http://git-wip-us.apache.org/repos/asf/impala/blob/d48ffc2d/fe/src/main/java/org/apache/impala/service/BackendConfig.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
index 45c75d7..d8a9bc7 100644
--- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
+++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
@@ -119,6 +119,14 @@ public class BackendConfig {
     return backendCfg_.local_catalog_max_fetch_retries;
   }
 
+  public int getCatalogMaxParallelPartialFetchRpc() {
+    return backendCfg_.catalog_max_parallel_partial_fetch_rpc;
+  }
+
+  public long getCatalogPartialFetchRpcQueueTimeoutS() {
+    return backendCfg_.catalog_partial_fetch_rpc_queue_timeout_s;
+  }
+
   // Inits the auth_to_local configuration in the static KerberosName class.
   private static void initAuthToLocal() {
     // If auth_to_local is enabled, we read the configuration hadoop.security.auth_to_local

http://git-wip-us.apache.org/repos/asf/impala/blob/d48ffc2d/fe/src/main/java/org/apache/impala/service/JniCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/JniCatalog.java b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
index b393b55..2a709e8 100644
--- a/fe/src/main/java/org/apache/impala/service/JniCatalog.java
+++ b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
@@ -46,6 +46,7 @@ import org.apache.impala.thrift.TErrorCode;
 import org.apache.impala.thrift.TFunction;
 import org.apache.impala.thrift.TGetCatalogDeltaResponse;
 import org.apache.impala.thrift.TGetCatalogDeltaRequest;
+import org.apache.impala.thrift.TGetCatalogServerMetricsResponse;
 import org.apache.impala.thrift.TGetDbsParams;
 import org.apache.impala.thrift.TGetDbsResult;
 import org.apache.impala.thrift.TGetFunctionsRequest;
@@ -320,4 +321,12 @@ public class JniCatalog {
     JniUtil.deserializeThrift(protocolFactory_, thriftReq, req);
     catalog_.updateTableUsage(thriftReq);
   }
+
+  public byte[] getCatalogServerMetrics() throws ImpalaException, TException {
+    TGetCatalogServerMetricsResponse response = new TGetCatalogServerMetricsResponse();
+    response.setCatalog_partial_fetch_rpc_queue_len(
+        catalog_.getPartialFetchRpcQueueLength());
+    TSerializer serializer = new TSerializer(protocolFactory_);
+    return serializer.serialize(response);
+  }
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/d48ffc2d/fe/src/test/java/org/apache/impala/catalog/PartialCatalogInfoTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/catalog/PartialCatalogInfoTest.java b/fe/src/test/java/org/apache/impala/catalog/PartialCatalogInfoTest.java
index 6dba475..07d3309 100644
--- a/fe/src/test/java/org/apache/impala/catalog/PartialCatalogInfoTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/PartialCatalogInfoTest.java
@@ -23,9 +23,19 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
+import avro.shaded.com.google.common.collect.Lists;
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.impala.common.InternalException;
+import org.apache.impala.common.RuntimeEnv;
+import org.apache.impala.service.BackendConfig;
 import org.apache.impala.testutil.CatalogServiceTestCatalog;
 import org.apache.impala.thrift.TCatalogInfoSelector;
 import org.apache.impala.thrift.TCatalogObject;
@@ -40,6 +50,8 @@ import org.apache.impala.thrift.TTableInfoSelector;
 import org.apache.thrift.TDeserializer;
 import org.apache.thrift.TException;
 import org.apache.thrift.TSerializer;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import com.google.common.collect.ImmutableList;
@@ -48,10 +60,26 @@ public class PartialCatalogInfoTest {
   private static CatalogServiceCatalog catalog_ =
       CatalogServiceTestCatalog.create();
 
+  /**
+   * A Callable wrapper around getPartialCatalogObject() call.
+   */
+  private class CallableGetPartialCatalogObjectRequest
+      implements Callable<TGetPartialCatalogObjectResponse> {
+    private final TGetPartialCatalogObjectRequest request_;
+
+    CallableGetPartialCatalogObjectRequest(TGetPartialCatalogObjectRequest request) {
+      request_ = request;
+    }
+
+    @Override
+    public TGetPartialCatalogObjectResponse call() throws Exception {
+      return sendRequest(request_);
+    }
+  }
+
   private TGetPartialCatalogObjectResponse sendRequest(
       TGetPartialCatalogObjectRequest req)
       throws CatalogException, InternalException, TException {
-    System.err.println("req: " + req);
     TGetPartialCatalogObjectResponse resp;
     resp = catalog_.getPartialCatalogObject(req);
     // Round-trip the response through serialization, so if we accidentally forgot to
@@ -59,10 +87,26 @@ public class PartialCatalogInfoTest {
     byte[] respBytes = new TSerializer().serialize(resp);
     resp.clear();
     new TDeserializer().deserialize(resp, respBytes);
-    System.err.println("resp: " + resp);
     return resp;
   }
 
+  /**
+   * Sends the same 'request' from 'requestCount' threads in parallel and waits for
+   * them to finish.
+   */
+  private void sendParallelRequests(TGetPartialCatalogObjectRequest request, int
+      requestCount) throws Exception {
+    Preconditions.checkState(requestCount > 0);
+    final ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(requestCount);
+    final List<Future<TGetPartialCatalogObjectResponse>> tasksToWaitFor =
+        Lists.newArrayList();
+    for (int i = 0; i < requestCount; ++i) {
+      tasksToWaitFor.add(threadPoolExecutor.submit(new
+          CallableGetPartialCatalogObjectRequest(request)));
+    }
+    for (Future task: tasksToWaitFor) task.get();
+  }
+
   @Test
   public void testDbList() throws Exception {
     TGetPartialCatalogObjectRequest req = new TGetPartialCatalogObjectRequest();
@@ -179,4 +223,49 @@ public class PartialCatalogInfoTest {
           tle.getMessage());
     }
   }
+
+  @Test
+  public void testConcurrentPartialObjectRequests() throws Exception {
+    // Create a request.
+    TGetPartialCatalogObjectRequest req = new TGetPartialCatalogObjectRequest();
+    req.object_desc = new TCatalogObject();
+    req.object_desc.setType(TCatalogObjectType.TABLE);
+    req.object_desc.table = new TTable("functional", "alltypes");
+    req.table_info_selector = new TTableInfoSelector();
+    req.table_info_selector.want_hms_table = true;
+    req.table_info_selector.want_partition_names = true;
+    req.table_info_selector.want_partition_metadata = true;
+
+    // Run 64 concurrent requests and run a tight loop in the background to make sure the
+    // concurrent request count never exceeds 32 (--catalog_max_parallel_partial_fetch_rpc).
+    final AtomicBoolean requestsFinished = new AtomicBoolean(false);
+    final int maxParallelRuns = BackendConfig.INSTANCE
+        .getCatalogMaxParallelPartialFetchRpc();
+
+    // Uses a callable<Void> instead of Runnable because junit does not catch exceptions
+    // from threads other than the main thread. Callable here makes sure the exception
+    // is propagated to the main thread.
+    final Callable<Void> assertReqCount = new Callable() {
+      @Override
+      public Void call() throws Exception {
+        while (!requestsFinished.get()) {
+          int currentReqCount = catalog_.getConcurrentPartialRpcReqCount();
+          assertTrue("Invalid concurrent request count: " + currentReqCount,
+              currentReqCount <= maxParallelRuns);
+        }
+        return null;
+      }
+    };
+    Future assertThreadTask;
+    try {
+      // Assert the request count in a tight loop.
+      assertThreadTask = Executors.newSingleThreadExecutor().submit(assertReqCount);
+      sendParallelRequests(req, 64);
+    } finally {
+      requestsFinished.set(true);
+    }
+    // 5 minutes is a reasonable timeout for this test. If timed out, an exception is
+    // thrown.
+    assertThreadTask.get(5, TimeUnit.MINUTES);
+  }
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/d48ffc2d/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java b/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java
index edad74e..3b61028 100644
--- a/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java
+++ b/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java
@@ -19,10 +19,11 @@ package org.apache.impala.testutil;
 
 import org.apache.impala.authorization.SentryConfig;
 import org.apache.impala.catalog.AuthorizationPolicy;
-import org.apache.impala.catalog.CatalogException;
 import org.apache.impala.catalog.CatalogServiceCatalog;
 import org.apache.impala.common.ImpalaException;
+import org.apache.impala.service.BackendConfig;
 import org.apache.impala.service.FeSupport;
+import org.apache.impala.thrift.TBackendGflags;
 import org.apache.impala.thrift.TUniqueId;
 
 /**