You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by bh...@apache.org on 2018/03/29 03:40:25 UTC

[1/6] impala git commit: IMPALA-6759: align stress test memory estimation parse pattern

Repository: impala
Updated Branches:
  refs/heads/master 84e30700f -> 8091b2f46


IMPALA-6759: align stress test memory estimation parse pattern

The stress test never expected to see memory estimates on the order of
PB. Apparently it can happen with TPC DS 10000, so update the pattern.

It's not clear how to quickly write a test to catch this, because it
involves crossing language boundaries and possibly having a
massively-scaled dataset. I think leaving a comment in both places is
good enough for now.

Change-Id: I08976f261582b379696fd0e81bc060577e552309


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/25218487
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/25218487
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/25218487

Branch: refs/heads/master
Commit: 25218487533f6bf6959c32ff4ee38b77e0ab30b5
Parents: 84e3070
Author: Michael Brown <mi...@cloudera.com>
Authored: Wed Mar 28 15:14:20 2018 -0700
Committer: Michael Brown <mi...@cloudera.com>
Committed: Wed Mar 28 15:27:10 2018 -0700

----------------------------------------------------------------------
 fe/src/main/java/org/apache/impala/common/PrintUtils.java | 2 ++
 tests/stress/concurrent_select.py                         | 8 +++++---
 tests/util/parse_util.py                                  | 2 ++
 3 files changed, 9 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/25218487/fe/src/main/java/org/apache/impala/common/PrintUtils.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/common/PrintUtils.java b/fe/src/main/java/org/apache/impala/common/PrintUtils.java
index 77d77dd..9f75134 100644
--- a/fe/src/main/java/org/apache/impala/common/PrintUtils.java
+++ b/fe/src/main/java/org/apache/impala/common/PrintUtils.java
@@ -39,6 +39,8 @@ public class PrintUtils {
   public static String printBytes(long bytes) {
     double result = bytes;
     // Avoid String.format() due to IMPALA-1572 which happens on JDK7 but not JDK6.
+    // IMPALA-6759: Please update tests/stress/concurrent_select.py MEM_ESTIMATE_PATTERN
+    // if you add additional unit prefixes.
     if (bytes >= PETABYTE) return new DecimalFormat(".00PB").format(result / PETABYTE);
     if (bytes >= TERABYTE) return new DecimalFormat(".00TB").format(result / TERABYTE);
     if (bytes >= GIGABYTE) return new DecimalFormat(".00GB").format(result / GIGABYTE);

http://git-wip-us.apache.org/repos/asf/impala/blob/25218487/tests/stress/concurrent_select.py
----------------------------------------------------------------------
diff --git a/tests/stress/concurrent_select.py b/tests/stress/concurrent_select.py
index fa8541c..a4bffd9 100755
--- a/tests/stress/concurrent_select.py
+++ b/tests/stress/concurrent_select.py
@@ -98,8 +98,10 @@ MEM_LIMIT_EQ_THRESHOLD_PC = 0.975
 MEM_LIMIT_EQ_THRESHOLD_MB = 50
 
 # Regex to extract the estimated memory from an explain plan.
+# The unit prefixes can be found in
+# fe/src/main/java/org/apache/impala/common/PrintUtils.java
 MEM_ESTIMATE_PATTERN = re.compile(
-    r"Per-Host Resource Estimates: Memory=(\d+.?\d*)(T|G|M|K)?B")
+    r"Per-Host Resource Estimates: Memory=(\d+.?\d*)(P|T|G|M|K)?B")
 
 PROFILES_DIR = "profiles"
 RESULT_HASHES_DIR = "result_hashes"
@@ -1359,8 +1361,8 @@ def match_memory_estimate(explain_lines):
     explain_lines: list of str
 
   Returns:
-    2-tuple str of memory limit in decimal string and units (one of 'T', 'G', 'M', 'K',
-    '' bytes)
+    2-tuple str of memory limit in decimal string and units (one of 'P', 'T', 'G', 'M',
+    'K', '' bytes)
 
   Raises:
     Exception if no match found

http://git-wip-us.apache.org/repos/asf/impala/blob/25218487/tests/util/parse_util.py
----------------------------------------------------------------------
diff --git a/tests/util/parse_util.py b/tests/util/parse_util.py
index ad40b68..202d3d3 100644
--- a/tests/util/parse_util.py
+++ b/tests/util/parse_util.py
@@ -65,6 +65,8 @@ def parse_mem_to_mb(mem, units):
     mem *= 10 ** 3
   elif units == "T":
     mem *= 10 ** 6
+  elif units == "P":
+    mem *= 10 ** 9
   else:
     raise Exception('Unexpected memory unit "%s"' % units)
   return int(mem)


[5/6] impala git commit: IMPALA-6759: align stress test memory estimation parse pattern

Posted by bh...@apache.org.
IMPALA-6759: align stress test memory estimation parse pattern

The stress test never expected to see memory estimates on the order of
PB. Apparently it can happen with TPC DS 10000, so update the pattern.

It's not clear how to quickly write a test to catch this, because it
involves crossing language boundaries and possibly having a
massively-scaled dataset. I think leaving a comment in both places is
good enough for now.

Change-Id: I317c271888584ed2a817ee52ad70267eae64d341
Reviewed-on: http://gerrit.cloudera.org:8080/9846
Reviewed-by: Lars Volker <lv...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/4028e9c5
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/4028e9c5
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/4028e9c5

Branch: refs/heads/master
Commit: 4028e9c5ec66fe006d26f8ca5f13daaf474dffbb
Parents: 77efb28
Author: Michael Brown <mi...@cloudera.com>
Authored: Wed Mar 28 15:14:20 2018 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Mar 29 03:27:25 2018 +0000

----------------------------------------------------------------------
 fe/src/main/java/org/apache/impala/common/PrintUtils.java |  2 ++
 tests/stress/concurrent_select.py                         |  8 +++++---
 tests/util/parse_util.py                                  | 10 ++++++----
 3 files changed, 13 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/4028e9c5/fe/src/main/java/org/apache/impala/common/PrintUtils.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/common/PrintUtils.java b/fe/src/main/java/org/apache/impala/common/PrintUtils.java
index 77d77dd..9f75134 100644
--- a/fe/src/main/java/org/apache/impala/common/PrintUtils.java
+++ b/fe/src/main/java/org/apache/impala/common/PrintUtils.java
@@ -39,6 +39,8 @@ public class PrintUtils {
   public static String printBytes(long bytes) {
     double result = bytes;
     // Avoid String.format() due to IMPALA-1572 which happens on JDK7 but not JDK6.
+    // IMPALA-6759: Please update tests/stress/concurrent_select.py MEM_ESTIMATE_PATTERN
+    // if you add additional unit prefixes.
     if (bytes >= PETABYTE) return new DecimalFormat(".00PB").format(result / PETABYTE);
     if (bytes >= TERABYTE) return new DecimalFormat(".00TB").format(result / TERABYTE);
     if (bytes >= GIGABYTE) return new DecimalFormat(".00GB").format(result / GIGABYTE);

http://git-wip-us.apache.org/repos/asf/impala/blob/4028e9c5/tests/stress/concurrent_select.py
----------------------------------------------------------------------
diff --git a/tests/stress/concurrent_select.py b/tests/stress/concurrent_select.py
index fa8541c..a4bffd9 100755
--- a/tests/stress/concurrent_select.py
+++ b/tests/stress/concurrent_select.py
@@ -98,8 +98,10 @@ MEM_LIMIT_EQ_THRESHOLD_PC = 0.975
 MEM_LIMIT_EQ_THRESHOLD_MB = 50
 
 # Regex to extract the estimated memory from an explain plan.
+# The unit prefixes can be found in
+# fe/src/main/java/org/apache/impala/common/PrintUtils.java
 MEM_ESTIMATE_PATTERN = re.compile(
-    r"Per-Host Resource Estimates: Memory=(\d+.?\d*)(T|G|M|K)?B")
+    r"Per-Host Resource Estimates: Memory=(\d+.?\d*)(P|T|G|M|K)?B")
 
 PROFILES_DIR = "profiles"
 RESULT_HASHES_DIR = "result_hashes"
@@ -1359,8 +1361,8 @@ def match_memory_estimate(explain_lines):
     explain_lines: list of str
 
   Returns:
-    2-tuple str of memory limit in decimal string and units (one of 'T', 'G', 'M', 'K',
-    '' bytes)
+    2-tuple str of memory limit in decimal string and units (one of 'P', 'T', 'G', 'M',
+    'K', '' bytes)
 
   Raises:
     Exception if no match found

http://git-wip-us.apache.org/repos/asf/impala/blob/4028e9c5/tests/util/parse_util.py
----------------------------------------------------------------------
diff --git a/tests/util/parse_util.py b/tests/util/parse_util.py
index ad40b68..6869489 100644
--- a/tests/util/parse_util.py
+++ b/tests/util/parse_util.py
@@ -56,15 +56,17 @@ def parse_mem_to_mb(mem, units):
   if units.endswith("B"):
     units = units[:-1]
   if not units:
-    mem /= 10 ** 6
+    mem /= 2 ** 20
   elif units == "K":
-    mem /= 10 ** 3
+    mem /= 2 ** 10
   elif units == "M":
     pass
   elif units == "G":
-    mem *= 10 ** 3
+    mem *= 2 ** 10
   elif units == "T":
-    mem *= 10 ** 6
+    mem *= 2 ** 20
+  elif units == "P":
+    mem *= 2 ** 30
   else:
     raise Exception('Unexpected memory unit "%s"' % units)
   return int(mem)


[6/6] impala git commit: IMPALA-5721, IMPALA-6717, IMPALA-6738: improve stress test binary search

Posted by bh...@apache.org.
IMPALA-5721,IMPALA-6717,IMPALA-6738: improve stress test binary search

IMPALA-5721:
- Save profiles of queries at the end of both the spilling and
  non-spilling binary search. These were not being saved before. Note
  these profiles won't have ExecSummary until IMPALA-6640 is addressed.

- Save the profile of any query that produces incorrect results during
  binary search. These were not being saved before, either.

- Use descriptive names, like
  tpch_100_parquet_q12_profile_without_spilling.txt, for profiles
  mentioned above. We do this by introducing the concept of a
  "logical_query_id" whose values look like "tpch_100_parquet_q12".

- Use the logical_query_id in critical error paths and include the
  logical_query_id in result hash files.

IMPALA-6717:
- Plumb --common-query-options through to the binary search.

IMPALA-6738:
- Begin a refactoring to reduce the number of parameters used when doing
  the binary search.

- Introduce a notion of "converted args" via class that does the
  conversion (if needed) via property getters.

- Adjust populate_all_queries() to use converted_args

Change-Id: I33d036ec93df3016cd4703205078dbdba0168acb
Reviewed-on: http://gerrit.cloudera.org:8080/9770
Reviewed-by: David Knupp <dk...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/8091b2f4
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/8091b2f4
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/8091b2f4

Branch: refs/heads/master
Commit: 8091b2f469a3678561e47e918ffe8db0eb5d48db
Parents: 4028e9c
Author: Michael Brown <mi...@cloudera.com>
Authored: Wed Feb 28 11:02:11 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Mar 29 03:38:35 2018 +0000

----------------------------------------------------------------------
 tests/stress/concurrent_select.py | 289 ++++++++++++++++++++++++---------
 1 file changed, 213 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/8091b2f4/tests/stress/concurrent_select.py
----------------------------------------------------------------------
diff --git a/tests/stress/concurrent_select.py b/tests/stress/concurrent_select.py
index a4bffd9..44d7b34 100755
--- a/tests/stress/concurrent_select.py
+++ b/tests/stress/concurrent_select.py
@@ -62,17 +62,19 @@ import sys
 import threading
 import traceback
 from Queue import Empty   # Must be before Queue below
+from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser, Namespace
 from collections import defaultdict
 from contextlib import contextmanager
 from datetime import datetime
 from multiprocessing import Lock, Process, Queue, Value
-from random import choice, random, randrange
+from random import choice, random, randrange, shuffle
 from sys import exit, maxint
 from tempfile import gettempdir
 from textwrap import dedent
 from threading import current_thread, Thread
 from time import sleep, time
 
+import tests.comparison.cli_options as cli_options
 import tests.util.test_file_parser as test_file_parser
 from tests.comparison.cluster import Timeout
 from tests.comparison.db_types import Int, TinyInt, SmallInt, BigInt
@@ -110,6 +112,83 @@ RESULT_HASHES_DIR = "result_hashes"
 RUNTIME_INFO_FILE_VERSION = 3
 
 
+class StressArgConverter(object):
+  def __init__(self, args):
+    """
+    Convert arguments as returned from argparse parse_args() into internal forms.
+
+    The purpose of this object is to do any conversions needed from the type given by
+    parse_args() into internal forms. For example, if a commandline option takes in a
+    complicated string that needs to be converted into a list or dictionary, this is the
+    place to do it. Access works the same as on the object returned by parse_args(),
+    i.e., object.option_attribute.
+
+    In most cases, simple arguments needn't be converted, because argparse handles the
+    type conversion already, and in most cases, type conversion (e.g., "8" <str> to 8
+    <int>) is all that's needed. If a property getter below doesn't exist, it means the
+    argument value is just passed along unconverted.
+
+    Params:
+      args: argparse.Namespace object (from argparse.ArgumentParser().parse_args())
+    """
+    assert isinstance(args, Namespace), "expected Namespace, got " + str(type(args))
+    self._args = args
+    self._common_query_options = None
+
+  def __getattr__(self, attr):
+    # This "proxies through" all the attributes from the Namespace object that are not
+    # defined in this object via property getters below.
+    return getattr(self._args, attr)
+
+  @property
+  def common_query_options(self):
+    # Memoize this, as the integrity checking of --common-query-options need only
+    # happen once.
+    if self._common_query_options is not None:
+      return self._common_query_options
+    # The stress test sets these, so callers cannot override them.
+    IGNORE_QUERY_OPTIONS = frozenset([
+        'ABORT_ON_ERROR',
+        'MEM_LIMIT',
+    ])
+    common_query_options = {}
+    if self._args.common_query_options is not None:
+      for query_option_and_value in self._args.common_query_options:
+        try:
+          query_option, value = query_option_and_value.split('=')
+        except ValueError:
+          LOG.error(
+              "Could not parse --common-query-options: '{common_query_options}'".format(
+                  common_query_options=self._args.common_query_options))
+          exit(1)
+        query_option = query_option.upper()
+        if query_option in common_query_options:
+          LOG.error(
+              "Query option '{query_option}' already defined in --common-query-options: "
+              "'{common_query_options}'".format(
+                  query_option=query_option,
+                  common_query_options=self._args.common_query_options))
+          exit(1)
+        elif query_option in IGNORE_QUERY_OPTIONS:
+          LOG.warn(
+              "Ignoring '{query_option}' in common query options: '{opt}': "
+              "The stress test algorithm needs control of this option.".format(
+                  query_option=query_option, opt=self._args.common_query_options))
+        else:
+          common_query_options[query_option] = value
+          LOG.debug("Common query option '{query_option}' set to '{value}'".format(
+              query_option=query_option, value=value))
+    self._common_query_options = common_query_options
+    return self._common_query_options
+
+  @property
+  def runtime_info_path(self):
+    runtime_info_path = self._args.runtime_info_path
+    if "{cm_host}" in runtime_info_path:
+      runtime_info_path = runtime_info_path.format(cm_host=self._args.cm_host)
+    return runtime_info_path
+
+
 def create_and_start_daemon_thread(fn, name):
   thread = Thread(target=fn, name=name)
   thread.error = None
@@ -169,7 +248,9 @@ def print_crash_info_if_exists(impala, start_time):
 class QueryReport(object):
   """Holds information about a single query run."""
 
-  def __init__(self):
+  def __init__(self, query):
+    self.query = query
+
     self.result_hash = None
     self.runtime_secs = None
     self.mem_was_spilled = False
@@ -180,6 +261,32 @@ class QueryReport(object):
     self.profile = None
     self.query_id = None
 
+  def write_query_profile(self, directory, prefix=None):
+    """
+    Write out the query profile bound to this object to a given directory.
+
+    The file name is generated and will contain the query ID. Use the optional prefix
+    parameter to set a prefix on the filename.
+
+    Example file name:
+      tpcds_300_decimal_parquet_q21_00000001_a38c8331_profile.txt
+
+    Parameters:
+      directory (str): Directory to write profile.
+      prefix (str): Prefix for filename.
+    """
+    if not (self.profile and self.query_id):
+      return
+    if prefix is not None:
+      file_name = prefix + '_'
+    else:
+      file_name = ''
+    file_name += self.query.logical_query_id + '_'
+    file_name += self.query_id.replace(":", "_") + "_profile.txt"
+    profile_log_path = os.path.join(directory, file_name)
+    with open(profile_log_path, "w") as profile_log:
+      profile_log.write(self.profile)
+
 
 class MemBroker(object):
   """Provides memory usage coordination for clients running in different processes.
@@ -632,14 +739,18 @@ class StressRunner(object):
             continue
           increment(self._num_successive_errors)
           increment(self._num_other_errors)
-          self._write_query_profile(report)
-          raise Exception("Query {0} failed: {1}".format(report.query_id, error_msg))
+          self._write_query_profile(report, PROFILES_DIR, prefix='error')
+          raise Exception("Query {query} ID {id} failed: {mesg}".format(
+              query=query.logical_query_id,
+              id=report.query_id,
+              mesg=error_msg))
         if (
             report.mem_limit_exceeded and
             not self._mem_broker.was_overcommitted(reservation_id)
         ):
           increment(self._num_successive_errors)
-          self._write_query_profile(report)
+          self._write_query_profile(
+              report, PROFILES_DIR, prefix='unexpected_mem_exceeded')
           raise Exception("Unexpected mem limit exceeded; mem was not overcommitted. "
                           "Query ID: {0}".format(report.query_id))
         if (
@@ -649,18 +760,19 @@ class StressRunner(object):
         ):
           increment(self._num_successive_errors)
           increment(self._num_result_mismatches)
-          self._write_query_profile(report)
+          self._write_query_profile(report, PROFILES_DIR, prefix='incorrect_results')
           raise Exception(dedent("""\
                                  Result hash mismatch; expected {expected}, got {actual}
                                  Query ID: {id}
                                  Query: {query}""".format(expected=query.result_hash,
                                                           actual=report.result_hash,
                                                           id=report.query_id,
-                                                          query=query.sql)))
+                                                          query=query.logical_query_id)))
         if report.timed_out and not should_cancel:
-          self._write_query_profile(report)
+          self._write_query_profile(report, PROFILES_DIR, prefix='timed_out')
           raise Exception(
-              "Query unexpectedly timed out. Query ID: {0}".format(report.query_id))
+              "Query {query} unexpectedly timed out. Query ID: {id}".format(
+                  query=query.logical_query_id, id=report.query_id))
         self._num_successive_errors.value = 0
 
   def _print_status_header(self):
@@ -706,13 +818,10 @@ class StressRunner(object):
     if report.timed_out:
       increment(self._num_queries_timedout)
 
-  def _write_query_profile(self, report):
-    if not (report.profile and report.query_id):
-      return
-    file_name = report.query_id.replace(":", "_") + "_profile.txt"
-    profile_log_path = os.path.join(self.results_dir, PROFILES_DIR, file_name)
-    with open(profile_log_path, "w") as profile_log:
-      profile_log.write(report.profile)
+  def _write_query_profile(self, report, subdir, prefix=None):
+    report.write_query_profile(
+        os.path.join(self.results_dir, subdir),
+        prefix)
 
   def _check_successive_errors(self):
     if (self._num_successive_errors.value >= self.num_successive_errors_needed_to_abort):
@@ -807,6 +916,8 @@ class Query(object):
     self.result_hash = None
     self.required_mem_mb_with_spilling = None
     self.required_mem_mb_without_spilling = None
+    self.solo_runtime_profile_with_spilling = None
+    self.solo_runtime_profile_without_spilling = None
     self.solo_runtime_secs_with_spilling = None
     self.solo_runtime_secs_without_spilling = None
     # Query options to set before running the query.
@@ -818,6 +929,8 @@ class Query(object):
     # UPSERT, DELETE.
     self.query_type = QueryType.SELECT
 
+    self._logical_query_id = None
+
   def __repr__(self):
     return dedent("""
         <Query
@@ -832,6 +945,29 @@ class Query(object):
         Population order: %(population_order)r>
         """.strip() % self.__dict__)
 
+  @property
+  def logical_query_id(self):
+    """
+    Return a meaningful unique str identifier for the query.
+
+    Example: "tpcds_300_decimal_parquet_q21"
+    """
+    if self._logical_query_id is None:
+      self._logical_query_id = '{0}_{1}'.format(self.db_name, self.name)
+    return self._logical_query_id
+
+  def write_runtime_info_profiles(self, directory):
+    """Write profiles for spilling and non-spilling into directory (str)."""
+    profiles_to_write = [
+        (self.logical_query_id + "_profile_with_spilling.txt",
+         self.solo_runtime_profile_with_spilling),
+        (self.logical_query_id + "_profile_without_spilling.txt",
+         self.solo_runtime_profile_without_spilling),
+    ]
+    for filename, profile in profiles_to_write:
+      with open(os.path.join(directory, filename), "w") as fh:
+        fh.write(profile)
+
 
 class QueryRunner(object):
   """Encapsulates functionality to run a query and provide a runtime report."""
@@ -856,7 +992,7 @@ class QueryRunner(object):
       self.impalad_conn = None
 
   def run_query(self, query, timeout_secs, mem_limit_mb, run_set_up=False,
-                should_cancel=False):
+                should_cancel=False, retain_profile=False):
     """Run a query and return an execution report. If 'run_set_up' is True, set up sql
     will be executed before the main query. This should be the case during the binary
     search phase of the stress test.
@@ -868,7 +1004,7 @@ class QueryRunner(object):
       raise Exception("connect() must first be called")
 
     timeout_unix_time = time() + timeout_secs
-    report = QueryReport()
+    report = QueryReport(query)
     try:
       with self.impalad_conn.cursor() as cursor:
         start_time = time()
@@ -914,7 +1050,8 @@ class QueryRunner(object):
           if query.query_type == QueryType.SELECT:
             try:
               report.result_hash = self._hash_result(cursor, timeout_unix_time, query)
-              if query.result_hash and report.result_hash != query.result_hash:
+              if retain_profile or \
+                 query.result_hash and report.result_hash != query.result_hash:
                 fetch_and_set_profile(cursor, report)
             except QueryTimeout:
               self._cancel(cursor, report)
@@ -1004,7 +1141,7 @@ class QueryRunner(object):
     def hash_result_impl():
       result_log = None
       try:
-        file_name = query_id.replace(":", "_")
+        file_name = '_'.join([query.logical_query_id, query_id.replace(":", "_")])
         if query.result_hash is None:
           file_name += "_initial"
         file_name += "_results.txt"
@@ -1159,7 +1296,8 @@ def populate_runtime_info_for_random_queries(
 
 def populate_runtime_info(
     query, impala, use_kerberos, results_dir,
-    timeout_secs=maxint, samples=1, max_conflicting_samples=0
+    timeout_secs=maxint, samples=1, max_conflicting_samples=0,
+    common_query_options=None
 ):
   """Runs the given query by itself repeatedly until the minimum memory is determined
   with and without spilling. Potentially all fields in the Query class (except
@@ -1177,6 +1315,7 @@ def populate_runtime_info(
   LOG.info("Collecting runtime info for query %s: \n%s", query.name, query.sql)
   runner = QueryRunner()
   runner.check_if_mem_was_spilled = True
+  runner.common_query_options = common_query_options
   runner.impalad = impala.impalads[0]
   runner.results_dir = results_dir
   runner.use_kerberos = use_kerberos
@@ -1191,6 +1330,8 @@ def populate_runtime_info(
   old_required_mem_mb_without_spilling = query.required_mem_mb_without_spilling
   old_required_mem_mb_with_spilling = query.required_mem_mb_with_spilling
 
+  profile_error_prefix = query.logical_query_id + "_binsearch_error"
+
   # TODO: This method is complicated enough now that breaking it out into a class may be
   # helpful to understand the structure.
 
@@ -1203,30 +1344,45 @@ def populate_runtime_info(
       ):
         query.required_mem_mb_with_spilling = required_mem
         query.solo_runtime_secs_with_spilling = report.runtime_secs
+        query.solo_runtime_profile_with_spilling = report.profile
     elif (
         query.required_mem_mb_without_spilling is None or
         required_mem < query.required_mem_mb_without_spilling
     ):
       query.required_mem_mb_without_spilling = required_mem
       query.solo_runtime_secs_without_spilling = report.runtime_secs
+      query.solo_runtime_profile_without_spilling = report.profile
 
   def get_report(desired_outcome=None):
     reports_by_outcome = defaultdict(list)
     leading_outcome = None
     for remaining_samples in xrange(samples - 1, -1, -1):
-      report = runner.run_query(query, timeout_secs, mem_limit, run_set_up=True)
+      report = runner.run_query(query, timeout_secs, mem_limit,
+                                run_set_up=True, retain_profile=True)
       if report.timed_out:
-        raise QueryTimeout()
+        report.write_query_profile(
+            os.path.join(results_dir, PROFILES_DIR),
+            profile_error_prefix)
+        raise QueryTimeout(
+            "query {0} timed out during binary search".format(query.logical_query_id))
       if report.non_mem_limit_error:
-        raise report.non_mem_limit_error
+        report.write_query_profile(
+            os.path.join(results_dir, PROFILES_DIR),
+            profile_error_prefix)
+        raise Exception(
+            "query {0} errored during binary search: {1}".format(
+                query.logical_query_id, str(report.non_mem_limit_error)))
       LOG.debug("Spilled: %s" % report.mem_was_spilled)
       if not report.mem_limit_exceeded:
         if query.result_hash is None:
           query.result_hash = report.result_hash
         elif query.result_hash != report.result_hash:
+          report.write_query_profile(
+              os.path.join(results_dir, PROFILES_DIR),
+              profile_error_prefix)
           raise Exception(
-              "Result hash mismatch; expected %s, got %s" %
-              (query.result_hash, report.result_hash))
+              "Result hash mismatch for query %s; expected %s, got %s" %
+              (query.logical_query_id, query.result_hash, report.result_hash))
 
       if report.mem_limit_exceeded:
         outcome = "EXCEEDED"
@@ -1332,8 +1488,16 @@ def populate_runtime_info(
       upper_bound = mem_limit
     if should_break:
       if not query.required_mem_mb_with_spilling:
+        if upper_bound - mem_limit < MEM_LIMIT_EQ_THRESHOLD_MB:
+          # IMPALA-6604: A fair amount of queries go down this path.
+          LOG.info(
+              "Unable to find a memory limit with spilling within the threshold of {0} "
+              "MB. Using the same memory limit for both.".format(
+                  MEM_LIMIT_EQ_THRESHOLD_MB))
         query.required_mem_mb_with_spilling = query.required_mem_mb_without_spilling
         query.solo_runtime_secs_with_spilling = query.solo_runtime_secs_without_spilling
+        query.solo_runtime_profile_with_spilling = \
+            query.solo_runtime_profile_without_spilling
       break
   LOG.info("Minimum memory is %s MB" % query.required_mem_mb_with_spilling)
   if (
@@ -1349,6 +1513,7 @@ def populate_runtime_info(
         " the absolute minimum memory.")
     query.required_mem_mb_with_spilling = query.required_mem_mb_without_spilling
     query.solo_runtime_secs_with_spilling = query.solo_runtime_secs_without_spilling
+    query.solo_runtime_profile_with_spilling = query.solo_runtime_profile_without_spilling
   LOG.debug("Query after populating runtime info: %s", query)
 
 
@@ -1420,9 +1585,12 @@ def save_runtime_info(path, query, impala):
     class JsonEncoder(json.JSONEncoder):
       def default(self, obj):
         data = dict(obj.__dict__)
-        # Queries are stored by sql, so remove the duplicate data.
-        if "sql" in data:
-          del data["sql"]
+        # Queries are stored by sql, so remove the duplicate data. Also don't store
+        # profiles as JSON values, but instead separately.
+        for k in ("sql", "solo_runtime_profile_with_spilling",
+                  "solo_runtime_profile_without_spilling"):
+          if k in data:
+            del data[k]
         return data
     json.dump(
         store, file, cls=JsonEncoder, sort_keys=True, indent=2, separators=(',', ': '))
@@ -1690,9 +1858,12 @@ def reset_databases(cursor):
                   " exist in '{1}' database.".format(table_name, cursor.db_name))
 
 
-def populate_all_queries(queries, impala, args, runtime_info_path,
-                         queries_with_runtime_info_by_db_sql_and_options):
+def populate_all_queries(
+    queries, impala, converted_args, queries_with_runtime_info_by_db_sql_and_options
+):
   """Populate runtime info for all queries, ordered by the population_order property."""
+  common_query_options = converted_args.common_query_options
+  runtime_info_path = converted_args.runtime_info_path
   result = []
   queries_by_order = {}
   for query in queries:
@@ -1712,9 +1883,13 @@ def populate_all_queries(queries, impala, args, runtime_info_path,
             query.db_name][query.sql][str(sorted(query.options.items()))])
       else:
         populate_runtime_info(
-            query, impala, args.use_kerberos, args.results_dir,
-            samples=args.samples, max_conflicting_samples=args.max_conflicting_samples)
+            query, impala, converted_args.use_kerberos, converted_args.results_dir,
+            samples=converted_args.samples,
+            max_conflicting_samples=converted_args.max_conflicting_samples,
+            common_query_options=common_query_options)
         save_runtime_info(runtime_info_path, query, impala)
+        query.write_runtime_info_profiles(
+            os.path.join(converted_args.results_dir, PROFILES_DIR))
         result.append(query)
   return result
 
@@ -1745,10 +1920,6 @@ def print_version(cluster):
 
 
 def main():
-  from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser
-  from random import shuffle
-  import tests.comparison.cli_options as cli_options
-
   parser = ArgumentParser(
       epilog=dedent("""
       Before running this script a CM cluster must be setup and any needed data
@@ -1891,6 +2062,9 @@ def main():
       "or set valid values. Example: --common-query-options "
       "DISABLE_CODEGEN=true RUNTIME_FILTER_MODE=1")
   args = parser.parse_args()
+  converted_args = StressArgConverter(args)
+  common_query_options = converted_args.common_query_options
+  runtime_info_path = converted_args.runtime_info_path
 
   cli_options.configure_logging(
       args.log_level, debug_log_file=args.debug_log_file, log_thread_name=True,
@@ -1906,40 +2080,6 @@ def main():
         "At least one of --tpcds-db, --tpch-db, --tpch-kudu-db,"
         "--tpcds-kudu-db, --tpch-nested-db, --random-db, --query-file-path is required")
 
-  # The stress test sets these, so callers cannot override them.
-  IGNORE_QUERY_OPTIONS = frozenset([
-      'ABORT_ON_ERROR',
-      'MEM_LIMIT',
-  ])
-
-  common_query_options = {}
-  if args.common_query_options is not None:
-    for query_option_and_value in args.common_query_options:
-      try:
-        query_option, value = query_option_and_value.split('=')
-      except ValueError:
-        LOG.error(
-            "Could not parse --common-query-options: '{common_query_options}'".format(
-                common_query_options=args.common_query_options))
-        exit(1)
-      query_option = query_option.upper()
-      if query_option in common_query_options:
-        LOG.error(
-            "Query option '{query_option}' already defined in --common-query-options: "
-            "'{common_query_options}'".format(
-                query_option=query_option,
-                common_query_options=args.common_query_options))
-        exit(1)
-      elif query_option in IGNORE_QUERY_OPTIONS:
-        LOG.warn(
-            "Ignoring '{query_option}' in common query options: '{opt}': "
-            "The stress test algorithm needs control of this option.".format(
-                query_option=query_option, opt=args.common_query_options))
-      else:
-        common_query_options[query_option] = value
-        LOG.debug("Common query option '{query_option}' set to '{value}'".format(
-            query_option=query_option, value=value))
-
   os.mkdir(os.path.join(args.results_dir, RESULT_HASHES_DIR))
   os.mkdir(os.path.join(args.results_dir, PROFILES_DIR))
 
@@ -1956,9 +2096,6 @@ def main():
     raise Exception("Queries are currently running on the cluster")
   impala.min_impalad_mem_mb = min(impala.find_impalad_mem_mb_limit())
 
-  runtime_info_path = args.runtime_info_path
-  if "{cm_host}" in runtime_info_path:
-    runtime_info_path = runtime_info_path.format(cm_host=args.cm_host)
   queries_with_runtime_info_by_db_sql_and_options = load_runtime_info(
       runtime_info_path, impala)
 
@@ -2026,8 +2163,8 @@ def main():
       with impala.cursor(db_name=database) as cursor:
         reset_databases(cursor)
 
-  queries = populate_all_queries(queries, impala, args, runtime_info_path,
-                                 queries_with_runtime_info_by_db_sql_and_options)
+  queries = populate_all_queries(
+      queries, impala, converted_args, queries_with_runtime_info_by_db_sql_and_options)
 
   # A particular random query may either fail (due to a generator or Impala bug) or
   # take a really long time to complete. So the queries needs to be validated. Since the


[2/6] impala git commit: Revert "IMPALA-6759: align stress test memory estimation parse pattern"

Posted by bh...@apache.org.
Revert "IMPALA-6759: align stress test memory estimation parse pattern"

This reverts commit 25218487533f6bf6959c32ff4ee38b77e0ab30b5.


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/2c0926e2
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/2c0926e2
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/2c0926e2

Branch: refs/heads/master
Commit: 2c0926e2de22ecafafc460f2b31ca2423b8f7e98
Parents: 2521848
Author: Michael Brown <mi...@cloudera.com>
Authored: Wed Mar 28 15:28:48 2018 -0700
Committer: Michael Brown <mi...@cloudera.com>
Committed: Wed Mar 28 15:28:48 2018 -0700

----------------------------------------------------------------------
 fe/src/main/java/org/apache/impala/common/PrintUtils.java | 2 --
 tests/stress/concurrent_select.py                         | 8 +++-----
 tests/util/parse_util.py                                  | 2 --
 3 files changed, 3 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/2c0926e2/fe/src/main/java/org/apache/impala/common/PrintUtils.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/common/PrintUtils.java b/fe/src/main/java/org/apache/impala/common/PrintUtils.java
index 9f75134..77d77dd 100644
--- a/fe/src/main/java/org/apache/impala/common/PrintUtils.java
+++ b/fe/src/main/java/org/apache/impala/common/PrintUtils.java
@@ -39,8 +39,6 @@ public class PrintUtils {
   public static String printBytes(long bytes) {
     double result = bytes;
     // Avoid String.format() due to IMPALA-1572 which happens on JDK7 but not JDK6.
-    // IMPALA-6759: Please update tests/stress/concurrent_select.py MEM_ESTIMATE_PATTERN
-    // if you add additional unit prefixes.
     if (bytes >= PETABYTE) return new DecimalFormat(".00PB").format(result / PETABYTE);
     if (bytes >= TERABYTE) return new DecimalFormat(".00TB").format(result / TERABYTE);
     if (bytes >= GIGABYTE) return new DecimalFormat(".00GB").format(result / GIGABYTE);

http://git-wip-us.apache.org/repos/asf/impala/blob/2c0926e2/tests/stress/concurrent_select.py
----------------------------------------------------------------------
diff --git a/tests/stress/concurrent_select.py b/tests/stress/concurrent_select.py
index a4bffd9..fa8541c 100755
--- a/tests/stress/concurrent_select.py
+++ b/tests/stress/concurrent_select.py
@@ -98,10 +98,8 @@ MEM_LIMIT_EQ_THRESHOLD_PC = 0.975
 MEM_LIMIT_EQ_THRESHOLD_MB = 50
 
 # Regex to extract the estimated memory from an explain plan.
-# The unit prefixes can be found in
-# fe/src/main/java/org/apache/impala/common/PrintUtils.java
 MEM_ESTIMATE_PATTERN = re.compile(
-    r"Per-Host Resource Estimates: Memory=(\d+.?\d*)(P|T|G|M|K)?B")
+    r"Per-Host Resource Estimates: Memory=(\d+.?\d*)(T|G|M|K)?B")
 
 PROFILES_DIR = "profiles"
 RESULT_HASHES_DIR = "result_hashes"
@@ -1361,8 +1359,8 @@ def match_memory_estimate(explain_lines):
     explain_lines: list of str
 
   Returns:
-    2-tuple str of memory limit in decimal string and units (one of 'P', 'T', 'G', 'M',
-    'K', '' bytes)
+    2-tuple str of memory limit in decimal string and units (one of 'T', 'G', 'M', 'K',
+    '' bytes)
 
   Raises:
     Exception if no match found

http://git-wip-us.apache.org/repos/asf/impala/blob/2c0926e2/tests/util/parse_util.py
----------------------------------------------------------------------
diff --git a/tests/util/parse_util.py b/tests/util/parse_util.py
index 202d3d3..ad40b68 100644
--- a/tests/util/parse_util.py
+++ b/tests/util/parse_util.py
@@ -65,8 +65,6 @@ def parse_mem_to_mb(mem, units):
     mem *= 10 ** 3
   elif units == "T":
     mem *= 10 ** 6
-  elif units == "P":
-    mem *= 10 ** 9
   else:
     raise Exception('Unexpected memory unit "%s"' % units)
   return int(mem)


[4/6] impala git commit: KUDU-2385: Fix typo in KinitContext::DoRenewal()

Posted by bh...@apache.org.
KUDU-2385: Fix typo in KinitContext::DoRenewal()

On platforms without krb5_get_init_creds_opt_set_out_ccache(),
krb5_cc_store_cred() is called to insert the newly acquired
credential into the ccache. However, there was a typo in the code
which resulted in inserting the old credential into ccache.
This change fixes the typo to make sure the new credential is
inserted into ccache.

Testing done: confirmed on SLES11 that the new credential
is being inserted by checking the 'auth time' of the ticket
in ccache. Impala uses a slightly different #ifdef which
explicitly checks if krb5_get_init_creds_opt_set_out_ccache()
is defined on the platform so this code path is actually
used when running Impala on SLES11.

Change-Id: I3a22b8d41d15eb1982a3fd5b96575e28edaad31c
Reviewed-on: http://gerrit.cloudera.org:8080/9840
Reviewed-by: Todd Lipcon <to...@apache.org>
Tested-by: Todd Lipcon <to...@apache.org>
Reviewed-on: http://gerrit.cloudera.org:8080/9842
Reviewed-by: Michael Ho <kw...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/77efb282
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/77efb282
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/77efb282

Branch: refs/heads/master
Commit: 77efb2820e62f8066c2ee00593bd5deeb9f7d9a6
Parents: 2883c99
Author: Michael Ho <kw...@cloudera.com>
Authored: Wed Mar 28 10:53:24 2018 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Mar 29 01:40:06 2018 +0000

----------------------------------------------------------------------
 be/src/kudu/security/init.cc | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/77efb282/be/src/kudu/security/init.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/security/init.cc b/be/src/kudu/security/init.cc
index 7674c7e..340ba75 100644
--- a/be/src/kudu/security/init.cc
+++ b/be/src/kudu/security/init.cc
@@ -299,7 +299,7 @@ Status KinitContext::DoRenewal() {
       KRB5_RETURN_NOT_OK_PREPEND(krb5_cc_initialize(g_krb5_ctx, ccache_, principal_),
                                  "Reacquire error: could not init ccache");
 
-      KRB5_RETURN_NOT_OK_PREPEND(krb5_cc_store_cred(g_krb5_ctx, ccache_, &creds),
+      KRB5_RETURN_NOT_OK_PREPEND(krb5_cc_store_cred(g_krb5_ctx, ccache_, &new_creds),
                                  "Reacquire error: could not store creds in cache");
 #endif
     }


[3/6] impala git commit: IMPALA-6747: Automate diagnostics collection.

Posted by bh...@apache.org.
IMPALA-6747: Automate diagnostics collection.

This commit adds the necessary tooling to automate diagnostics
collection for Impala daemons. Following diagnostics are supported.

1. Native core dump (+ shared libs)
2. GDB/Java thread dump (pstack + jstack)
3. Java heap dump (jmap)
4. Minidumps (using breakpad) *
5. Profiles

Given the required inputs, the script outputs a zip compressed
impala diagnostic bundle with all the diagnostics collected.

The script can be run manually with the following command.

python collect_diagnostics.py --help

* minidumps collected here correspond to the state of the Impala
process at the time this script is triggered. This is different
from collect_minidumps.py which archives the entire minidump
directory.

Change-Id: Ib29caec7c3be5b6a31e60461294979c318300f64
Reviewed-on: http://gerrit.cloudera.org:8080/9815
Reviewed-by: Lars Volker <lv...@cloudera.com>
Reviewed-by: Philip Zeyliger <ph...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/2883c995
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/2883c995
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/2883c995

Branch: refs/heads/master
Commit: 2883c9950026db74240a69ab07e867810b8547b0
Parents: 2c0926e
Author: Bharath Vissapragada <bh...@cloudera.com>
Authored: Mon Dec 4 13:38:09 2017 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Mar 29 00:12:18 2018 +0000

----------------------------------------------------------------------
 bin/diagnostics/__init__.py            |   0
 bin/diagnostics/collect_diagnostics.py | 518 ++++++++++++++++++++++++++++
 bin/diagnostics/collect_shared_libs.sh |  52 +++
 bin/rat_exclude_files.txt              |   1 +
 tests/unittests/test_command.py        |  49 +++
 5 files changed, 620 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/2883c995/bin/diagnostics/__init__.py
----------------------------------------------------------------------
diff --git a/bin/diagnostics/__init__.py b/bin/diagnostics/__init__.py
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/impala/blob/2883c995/bin/diagnostics/collect_diagnostics.py
----------------------------------------------------------------------
diff --git a/bin/diagnostics/collect_diagnostics.py b/bin/diagnostics/collect_diagnostics.py
new file mode 100644
index 0000000..6abc30a
--- /dev/null
+++ b/bin/diagnostics/collect_diagnostics.py
@@ -0,0 +1,518 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import argparse
+import datetime
+import errno
+import getpass
+import glob
+import logging
+import math
+import os
+import shutil
+import subprocess
+import sys
+import time
+import tempfile
+import traceback
+
+from collections import namedtuple
+from struct import Struct
+from threading import Timer
+
+# This script automates the collection of diagnostics from a host running an
+# Impala service daemon (catalogd/statestored/impalad). The following diagnostics
+# are supported.
+#
+# 1. Native core dump (+ shared libs)
+# 2. GDB/Java thread dump (pstack + jstack)
+# 3. Java heap dump (jmap)
+# 4. Minidumps (using breakpad)
+# 5. Profiles
+#
+# Dependencies:
+# 1. gdb package should be installed to collect native thread stacks/coredump. The binary
+#    location is picked up from the system path. In case of pstacks, the script falls back
+#    to the breakpad minidumps if the 'pstack' binary is not in system path.
+# 2. jstack/jmap from a JRE/JDK. JAVA_HOME is read from the target process environment;
+#    if it is not set there, provide it with --java_home PATH_TO_JAVA_HOME.
+# 3. Minidumps are collected by sending a SIGUSR1 signal to the Impala process. Impala
+#    versions without full breakpad support (<= release 2.6) will reliably crash if
+#    we attempt to do that since those versions do not have the corresponding signal
+#    handler. Hence it is suggested to run this script only on releases 2.7 and later.
+#
+# Usage: python collect_diagnostics.py --help
+#
+# Few example usages:
+#
+# Collect 3 jstacks, pstacks from an impalad process 3s apart.
+#  python collect_diagnostics.py --pid $(pidof impalad) --stacks 3 3
+#
+# Collect core dump and a Java heapdump from the catalogd process
+#  python collect_diagnostics.py --pid $(pidof catalogd) --jmap --gcore
+#
+# Collect 5 breakpad minidumps from a statestored process 5s apart.
+#  python collect_diagnostics.py --pid $(pidof statestored) --minidumps 5 5
+#      --minidumps_dir /var/log/statestored/minidumps
+#
+#
+class Command(object):
+  """Wrapper around subprocess.Popen() that is canceled after a configurable timeout."""
+  def __init__(self, cmd, timeout=30):
+    self.cmd = cmd
+    self.timeout = timeout
+    self.child_killed_by_timeout = False
+
+  def run(self, cmd_stdin=None, cmd_stdout=subprocess.PIPE):
+    """Runs the command 'cmd' by setting the appropriate stdin/out. The command is killed
+    if it hits a timeout (controlled by self.timeout)."""
+    cmd_string = " ".join(self.cmd)
+    logging.info("Starting command %s with a timeout of %s"
+        % (cmd_string, str(self.timeout)))
+    self.child = subprocess.Popen(self.cmd, stdin=cmd_stdin, stdout=cmd_stdout)
+    timer = Timer(self.timeout, self.kill_child)
+    try:
+      timer.start()
+      # self.stdout is set to None if cmd_stdout is anything other than PIPE. The actual
+      # stdout is written to the file corresponding to cmd_stdout.
+      self.stdout = self.child.communicate()[0]
+      if self.child.returncode == 0:
+        logging.info("Command finished successfully: " + cmd_string)
+      else:
+        cmd_status = "timed out" if self.child_killed_by_timeout else "failed"
+        logging.error("Command %s: %s" % (cmd_status, cmd_string))
+      return self.child.returncode
+    finally:
+      timer.cancel()
+    return -1
+
+  def kill_child(self):
+    """Kills the running command (self.child)."""
+    self.child_killed_by_timeout = True
+    self.child.kill()
+
+class ImpalaDiagnosticsHandler(object):
+  IMPALA_PROCESSES = ["impalad", "catalogd", "statestored"]
+  OUTPUT_DIRS_TO_CREATE = ["stacks", "gcores", "jmaps", "profiles",
+      "shared_libs", "minidumps"]
+  MINIDUMP_HEADER = namedtuple("MDRawHeader", "signature version stream_count \
+      stream_directory_rva checksum time_date_stamp flags")
+
+  def __init__(self, args):
+    """Initializes the state by setting the paths of required executables."""
+    self.args = args
+    if args.pid <= 0:
+      return
+
+    self.script_dir = os.path.dirname(os.path.realpath(sys.argv[0]))
+    # Name of the Impala process for which diagnostics should be collected.
+    self.target_process_name = self.get_target_process_name()
+
+    self.java_home = self.get_java_home_from_env()
+    if not self.java_home and args.java_home:
+      self.java_home = os.path.abspath(args.java_home)
+    self.jstack_cmd = os.path.join(self.java_home, "bin/jstack")
+    self.java_cmd = os.path.join(self.java_home, "bin/java")
+    self.jmap_cmd = os.path.join(self.java_home, "bin/jmap")
+
+    self.gdb_cmd = self.get_command_from_path("gdb")
+    self.gcore_cmd = self.get_command_from_path("gcore")
+    self.pstack_cmd = self.get_command_from_path("pstack")
+
+  def create_output_dir_structure(self):
+    """Creates the skeleton directory structure for the diagnostics output collection."""
+    self.collection_root_dir = tempfile.mkdtemp(prefix="impala-diagnostics-%s" %
+        datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S-"),
+        dir=os.path.abspath(self.args.output_dir))
+    for dirname in self.OUTPUT_DIRS_TO_CREATE:
+      os.mkdir(os.path.join(self.collection_root_dir, dirname))
+
+  def get_command_from_path(self, cmd):
+    """Returns the path to a given command executable, if one exists in the
+    system PATH."""
+    for path in os.environ["PATH"].split(os.pathsep):
+      cmd_path = os.path.join(path, cmd)
+      if os.access(cmd_path, os.X_OK):
+        return cmd_path
+    return ""
+
+  def get_target_process_name(self):
+    """Returns the process name of the target process for which diagnostics
+    should be collected."""
+    try:
+      return open("/proc/%s/comm" % self.args.pid).read().strip()
+    except Exception:
+      logging.exception("Failed to get target process name.")
+      return ""
+
+  def get_num_child_proc(self, name):
+    """Returns number of processes with the given name and target Impala pid
+    as parent."""
+    cmd = Command(["pgrep", "-c", "-P", str(self.args.pid), name])
+    cmd.run()
+    return int(cmd.stdout.strip())
+
+  def get_java_home_from_env(self):
+    """Returns JAVA_HOME set in the env of the target process."""
+    try:
+      envs = open("/proc/%s/environ" % self.args.pid).read().split("\0")
+      for s in envs:
+        k, v = s.split("=", 1)
+        if k == "JAVA_HOME":
+          return v
+    except Exception:
+      logging.exception("Failed to determine JAVA_HOME from proc env.")
+      return ""
+
+  def get_free_disk_space_gbs(self, path):
+    """Returns free disk space (in GBs) of the partition hosting the given path."""
+    s = os.statvfs(path)
+    return (s.f_bsize * s.f_bavail)/(1024.0 * 1024.0 * 1024.0)
+
+  def get_minidump_create_timestamp(self, minidump_path):
+    """Returns the unix timestamp of the minidump create time. It is extracted from
+    the minidump header."""
+    # Read the minidump's header to extract the create time stamp. More information about
+    # the minidump header format can be found here: https://goo.gl/uxKZVe
+    #
+    # typedef struct {
+    #   uint32_t  signature;
+    #   uint32_t  version;
+    #   uint32_t  stream_count;
+    #   MDRVA     stream_directory_rva;  /* A |stream_count|-sized array of
+    #                                     * MDRawDirectory structures. */
+    #   uint32_t  checksum;              /* Can be 0.  In fact, that's all that's
+    #                                     * been found in minidump files. */
+    #   uint32_t  time_date_stamp;       /* time_t */
+    #   uint64_t  flags;
+    # } MDRawHeader;  /* MINIDUMP_HEADER */
+    s = Struct("IIIiIIQ")
+    data = open(minidump_path, "rb").read(s.size)
+    header = self.MINIDUMP_HEADER(*s.unpack_from(data))
+    return header.time_date_stamp
+
+  def wait_for_minidump(self):
+    """Minidump collection is async after sending the SIGUSR1 signal. So this method
+    waits till it is written to the disk. Since minidump forks off a new process from
+    the parent Impala process we need to wait till the forked process exits.
+    Returns after 30s to prevent infinite waiting. Should be called after sending the
+    SIGUSR1 signal to the Impala process."""
+    MAX_WAIT_TIME_S = 30
+    start_time = time.time()
+    while time.time() < start_time + MAX_WAIT_TIME_S:
+      # Sleep for a bit to ensure that the process fork to write minidump has started.
+      # Otherwise the subsequent check on the process count could pass even when the
+      # fork didn't succeed. This sleep reduces the likelihood of such race.
+      time.sleep(1)
+      if self.get_num_child_proc(self.target_process_name) == 1:
+        break
+    return
+
+  def validate_args(self):
+    """Returns True if self.args are valid, false otherwise"""
+    if self.args.pid <= 0:
+      logging.critical("Invalid PID provided.")
+      return False
+
+    if self.target_process_name not in self.IMPALA_PROCESSES:
+      logging.critical("No valid Impala process with the given PID %s" % str(self.args.pid))
+      return False
+
+    if not self.java_home:
+      logging.critical("JAVA_HOME could not be inferred from process env.\
+          Please specify --java_home.")
+      return False
+
+    if self.args.jmap and not os.path.exists(self.jmap_cmd):
+      logging.critical("jmap binary not found, required to collect a Java heap dump.")
+      return False
+
+    if self.args.gcore and not os.path.exists(self.gcore_cmd):
+      logging.critical("gcore binary not found, required to collect a core dump.")
+      return False
+
+    if self.args.profiles_dir and not os.path.isdir(self.args.profiles_dir):
+      logging.critical("No valid profiles directory at path: %s" % self.args.profiles_dir)
+      return False
+
+    return True
+
+  def collect_thread_stacks(self):
+    """Collects jstack/jstack-m/pstack for the given pid in that order. pstack collection
+    falls back to minidumps if pstack binary is missing from the system path. Minidumps
+    are collected by sending a SIGUSR1 to the Impala process and then archiving the
+    contents of the minidump directory. The number of times stacks are collected and the
+    sleep time between the collections are controlled by --stacks argument."""
+    stacks_count, stacks_interval_secs = self.args.stacks
+    if stacks_count <= 0 or stacks_interval_secs < 0:
+      return
+
+    # Skip jstack collection if the jstack binary does not exist.
+    skip_jstacks = not os.path.exists(self.jstack_cmd)
+    if skip_jstacks:
+      logging.info("Skipping jstack collection since jstack binary couldn't be located.")
+
+    # Fallback to breakpad minidump collection if pstack binaries are missing.
+    fallback_to_minidump = False
+    if not self.pstack_cmd:
+      # Fall back to collecting a minidump if pstack is not installed.
+      if not os.path.exists(self.args.minidumps_dir):
+        logging.info("Skipping pstacks since pstack binary couldn't be located. Provide "
+            + "--minidumps_dir for collecting minidumps instead.")
+        # At this point, we can't proceed since we have nothing to collect.
+        if skip_jstacks:
+          return
+      else:
+        fallback_to_minidump = True;
+        logging.info("Collecting breakpad minidumps since pstack/gdb binaries are " +
+            "missing.")
+
+    stacks_dir = os.path.join(self.collection_root_dir, "stacks")
+    # Populate the commands to run in 'cmds_to_run' depending on what kinds of thread
+    # stacks to collect. Each entry is a tuple of form
+    # (Command, stdout_prefix, is_minidump). 'is_minidump' tells whether the command
+    # is trying to trigger a minidump collection.
+    cmds_to_run = []
+    if not skip_jstacks:
+      cmd_args = [self.jstack_cmd, str(self.args.pid)]
+      cmds_to_run.append((Command(cmd_args, self.args.timeout), "jstack", False))
+      # Collect mixed-mode jstack, contains native stack frames.
+      cmd_args_mixed_mode = [self.jstack_cmd, "-m", str(self.args.pid)]
+      cmds_to_run.append(
+          (Command(cmd_args_mixed_mode, self.args.timeout), "jstack-m", False))
+
+    if fallback_to_minidump:
+      cmd_args = ["kill", "-SIGUSR1", str(self.args.pid)]
+      cmds_to_run.append((Command(cmd_args, self.args.timeout), None, True))
+    elif self.pstack_cmd:
+      cmd_args = [self.pstack_cmd, str(self.args.pid)]
+      cmds_to_run.append((Command(cmd_args, self.args.timeout), "pstack", False))
+
+    collection_start_ts = time.time()
+    for i in xrange(stacks_count):
+      for cmd, file_prefix, is_minidump in cmds_to_run:
+        if file_prefix:
+          stdout_file = os.path.join(stacks_dir, file_prefix + "-" + str(i) + ".txt")
+          with open(stdout_file, "w") as output:
+            cmd.run(cmd_stdout=output)
+        else:
+          cmd.run()
+          # In case of minidump collection, wait for it to be written.
+          if is_minidump:
+            self.wait_for_minidump()
+      time.sleep(stacks_interval_secs)
+
+    # Copy minidumps if required.
+    if fallback_to_minidump:
+      minidump_out_dir =  os.path.join(self.collection_root_dir, "minidumps")
+      self.copy_minidumps(minidump_out_dir, collection_start_ts);
+
+  def collect_minidumps(self):
+    """Collects minidumps on the Impala process based on argument --minidumps. The
+    minidumps are collected by sending a SIGUSR1 signal to the Impala process and then
+    the resulting minidumps are copied to the target directory."""
+    minidump_count, minidump_interval_secs = self.args.minidumps
+    if minidump_count <= 0 or minidump_interval_secs < 0:
+      return
+    # Impala process writes a minidump when it encounters a SIGUSR1.
+    cmd_args = ["kill", "-SIGUSR1", str(self.args.pid)]
+    cmd = Command(cmd_args, self.args.timeout)
+    collection_start_ts = time.time()
+    for i in xrange(minidump_count):
+      cmd.run()
+      self.wait_for_minidump()
+      time.sleep(minidump_interval_secs)
+    out_dir = os.path.join(self.collection_root_dir, "minidumps")
+    self.copy_minidumps(out_dir, collection_start_ts);
+
+  def copy_minidumps(self, target, start_ts):
+    """Copies minidumps with create time >= start_ts to 'target' directory."""
+    logging.info("Copying minidumps from %s to %s with ctime >= %s"
+        % (self.args.minidumps_dir, target, start_ts))
+    for filename in glob.glob(os.path.join(self.args.minidumps_dir, "*.dmp")):
+      try:
+        minidump_ctime = self.get_minidump_create_timestamp(filename)
+        if minidump_ctime >= math.floor(start_ts):
+          shutil.copy2(filename, target)
+        else:
+          logging.info("Ignored mindump: %s ctime: %s" % (filename, minidump_ctime))
+      except Exception:
+        logging.exception("Error processing minidump at path: %s. Skipping it." % filename)
+
+  def collect_java_heapdump(self):
+    """Generates the Java heap dump of the Impala process using the 'jmap' command."""
+    if not self.args.jmap:
+      return
+    jmap_dir = os.path.join(self.collection_root_dir, "jmaps")
+    out_file = os.path.join(jmap_dir, self.target_process_name + "_heap.bin")
+    # jmap command requires it to be run as the process owner.
+    # Command: jmap -dump:format=b,file=<outfile> <pid>
+    cmd_args = [self.jmap_cmd, "-dump:format=b,file=" + out_file, str(self.args.pid)]
+    Command(cmd_args, self.args.timeout).run()
+
+  def collect_native_coredump(self):
+    """Generates the core dump of the Impala process using the 'gcore' command"""
+    if not self.args.gcore:
+      return
+    # Command: gcore -o <outfile> <pid>
+    gcore_dir = os.path.join(self.collection_root_dir, "gcores")
+    out_file_name = self.target_process_name + "-" +\
+        datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S") + ".core"
+    out_file = os.path.join(gcore_dir, out_file_name)
+    cmd_args = [self.gcore_cmd, "-o", out_file, str(self.args.pid)]
+    Command(cmd_args, self.args.timeout).run()
+
+  def collect_query_profiles(self):
+    """Collects Impala query profiles from --profiles_dir. Enforces an uncompressed limit
+    of --profiles_max_size_limit bytes on the copied profile logs."""
+    if not self.args.profiles_dir:
+      return
+    out_dir = os.path.join(self.collection_root_dir, "profiles")
+    # Hardcoded in Impala
+    PROFILE_LOG_FILE_PATTERN = "impala_profile_log_1.1-*";
+    logging.info("Collecting profile data, limiting size to %f GB" %
+        (self.args.profiles_max_size_limit/(1024 * 1024 * 1024)))
+
+    profiles_path = os.path.join(self.args.profiles_dir, PROFILE_LOG_FILE_PATTERN)
+    # Sort the profiles by creation time and copy the most recent ones in that order.
+    sorted_profiles =\
+        sorted(glob.iglob(profiles_path), key=os.path.getctime, reverse=True)
+    profile_size_included_so_far = 0
+    for profile_path in sorted_profiles:
+      try:
+        file_size = os.path.getsize(profile_path)
+        if profile_size_included_so_far + file_size > self.args.profiles_max_size_limit:
+          # Copying the whole file violates profiles_max_size_limit. Copy a part of it.
+          # Profile logs are newline delimited with a single profile per line.
+          num_bytes_to_copy =\
+              self.args.profiles_max_size_limit - profile_size_included_so_far
+          file_name = os.path.basename(profile_path)
+          copied_bytes = 0
+          with open(profile_path, "rb") as in_file,\
+              open(os.path.join(out_dir, file_name), "wb") as out_file:
+            for line in in_file.readlines():
+              if copied_bytes + len(line) > num_bytes_to_copy:
+                break
+              out_file.write(line)
+              copied_bytes += len(line)
+          return
+        profile_size_included_so_far += file_size
+        shutil.copy2(profile_path, out_dir)
+      except:
+        logging.exception("Encountered an error while collecting profile %s. Skipping it."
+            % profile_path)
+
+  def collect_shared_libs(self):
+    """Copies the shared libraries loaded by the target Impala process into the
+    "shared_libs" directory under the collection root, via collect_shared_libs.sh."""
+    # Skip unless --gcore or --minidumps_dir was given.
+    if not self.args.gcore and not self.args.minidumps_dir:
+      return
+    helper_script = os.path.join(self.script_dir, "collect_shared_libs.sh")
+    target_dir = os.path.join(self.collection_root_dir, "shared_libs")
+    script_args = [helper_script, self.gdb_cmd, str(self.args.pid), target_dir]
+    Command(script_args, self.args.timeout).run()
+
+  def cleanup(self):
+    """Deletes the collection root directory and its contents; errors are ignored."""
+    shutil.rmtree(path=self.collection_root_dir, ignore_errors=True)
+
+  def get_diagnostics(self):
+    """Runs each collect_*() step and archives the collection directory. Returns True
+    only if argument validation passed and no step raised an exception."""
+    if not self.validate_args():
+      return False
+    logging.info("Using JAVA_HOME: %s" % self.java_home)
+    self.create_output_dir_structure()
+    free_space_gbs = self.get_free_disk_space_gbs(self.collection_root_dir)
+    logging.info("Free disk space: %.2fGB" % free_space_gbs)
+    os.chdir(self.args.output_dir)
+    had_failure = False
+    for step in [self.collect_shared_libs, self.collect_query_profiles,
+        self.collect_native_coredump, self.collect_java_heapdump, self.collect_minidumps,
+        self.collect_thread_stacks]:
+      try:
+        step()
+      except IOError as e:
+        if e.errno != errno.ENOSPC:
+          # IOErrors other than ENOSPC are logged and the remaining steps still run.
+          had_failure = True
+          logging.exception("Encountered an IOError calling: %s" % step.__name__)
+          continue
+        # Out of disk space: discard everything collected so far and abort.
+        logging.exception("Disk space low, aborting.")
+        self.cleanup()
+        return False
+      except Exception:
+        had_failure = True
+        logging.exception("Encountered an exception calling: %s" % step.__name__)
+    if had_failure:
+      logging.error("Encountered an exception collecting diagnostics. Final output "
+          "could be partial.\n")
+    # Archive whatever was collected, even if partial, then drop the working directory.
+    archive_path = self.collection_root_dir + ".tar.gz"
+    logging.info("Archiving diagnostics to path: %s" % archive_path)
+    shutil.make_archive(self.collection_root_dir, "gztar", self.collection_root_dir)
+    self.cleanup()
+    logging.info("Diagnostics collected at path: %s" % archive_path)
+    return not had_failure
+
+def get_args_parser():
+  """Builds the argparse parser that defines every flag this script accepts."""
+  parser = argparse.ArgumentParser(description="Impala diagnostics collection")
+  # action="store" and dest=<flag name> are argparse defaults, so they are omitted.
+  parser.add_argument("--pid", type=int, default=0,
+      help="PID of the Impala process for which diagnostics should be collected.")
+  parser.add_argument("--java_home", default="",
+      help="If not set, it is set to the JAVA_HOME from the pid's environment.")
+  parser.add_argument("--timeout", type=int, default=300,
+      help="Timeout (in seconds) for each of the diagnostics commands")
+  parser.add_argument("--stacks", nargs=2, type=int, default=[0, 0],
+      metavar=("COUNT", "INTERVAL (in seconds)"),
+      help="Collect jstack, mixed-mode jstack and pstacks of the Impala process."
+      "      Breakpad minidumps are collected in case of missing pstack binaries.")
+  parser.add_argument("--jmap", action="store_true",
+      help="Collect heap dump of the Java process")
+  parser.add_argument("--gcore", action="store_true",
+      help="Collect the native core dump using gdb. Requires gdb to be installed.")
+  parser.add_argument("--minidumps", nargs=2, type=int, default=[0, 0],
+      metavar=("COUNT", "INTERVAL (in seconds)"),
+      help="Collect breakpad minidumps for the Impala process. Requires --minidumps_dir"
+      "      be set.")
+  parser.add_argument("--minidumps_dir", default="",
+      help="Path of the directory to which Impala process' minidumps are written")
+  parser.add_argument("--profiles_dir", default="",
+      help="Path of the profiles directory to be included in the diagnostics output.")
+  parser.add_argument("--profiles_max_size_limit", type=float, default=3*1024*1024*1024,
+      help="Uncompressed limit (in Bytes) on profile logs collected from"
+      "      --profiles_dir. Defaults to 3GB.")
+  parser.add_argument("--output_dir", default=tempfile.gettempdir(),
+      help="Output directory that contains the final diagnostics data. "
+      "Defaults to %s" % tempfile.gettempdir())
+  return parser
+
+if __name__ == "__main__":
+  parser = get_args_parser()
+  if len(sys.argv) == 1:
+    parser.print_usage()
+    sys.exit(1)  # No flags were given: usage was printed, report failure.
+  logging.basicConfig(format="%(asctime)s %(levelname)-8s %(message)s",
+      datefmt="%Y-%m-%d %H:%M:%S", level=logging.DEBUG, stream=sys.stdout)
+  diagnostics_handler = ImpalaDiagnosticsHandler(parser.parse_args())
+  logging.info("Running as user: %s" % getpass.getuser())
+  logging.info("Input args: %s" % " ".join(sys.argv))
+  sys.exit(0 if diagnostics_handler.get_diagnostics() else 1)

http://git-wip-us.apache.org/repos/asf/impala/blob/2883c995/bin/diagnostics/collect_shared_libs.sh
----------------------------------------------------------------------
diff --git a/bin/diagnostics/collect_shared_libs.sh b/bin/diagnostics/collect_shared_libs.sh
new file mode 100755
index 0000000..d5de349
--- /dev/null
+++ b/bin/diagnostics/collect_shared_libs.sh
@@ -0,0 +1,52 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# $1 - gdb binary path
+# $2 - pid of the Impala process
+# $3 - Output directory to copy the sharedlibs to.
+
+set -euxo pipefail
+
+if [ "$#" -ne 3 ]; then
+  echo "Incorrect usage. Expected: $0 <gdb executable path> <target PID> <output dir>"
+  exit 1
+fi
+
+if [ ! -d "$3" ]; then
+  echo "Directory $3 does not exist. This script expects the output directory to exist."
+  exit 1
+fi
+
+# Generate the list of shared lib paths to copy. The trap removes it on any exit.
+shared_libs_to_copy=$(mktemp)
+trap 'rm -f "$shared_libs_to_copy"' EXIT
+"$1" --pid "$2" --batch -ex 'info shared' 2> /dev/null | sed '1,/Shared Object Library/d' |
+    sed 's/\(.*\s\)\(\/.*\)/\2/' | grep \/ > "$shared_libs_to_copy"
+
+echo "Generated shared library listing for the process."
+
+# Copy the files to the target directory keeping the directory structure intact.
+# We use rsync instead of 'cp --parents' since the latter has permission issues
+# copying from system level directories. https://goo.gl/6yYNhw
+rsync -LR --files-from="$shared_libs_to_copy" / "$3"
+
+echo "Copied the shared libraries to the target directory: $3"
+
+# Make sure the impala user has write permissions on all the copied sharedlib paths.
+chmod -R 755 "$3"

http://git-wip-us.apache.org/repos/asf/impala/blob/2883c995/bin/rat_exclude_files.txt
----------------------------------------------------------------------
diff --git a/bin/rat_exclude_files.txt b/bin/rat_exclude_files.txt
index 1819938..8c7977d 100644
--- a/bin/rat_exclude_files.txt
+++ b/bin/rat_exclude_files.txt
@@ -17,6 +17,7 @@ shell/__init__.py
 ssh_keys/id_rsa_impala
 testdata/__init__.py
 tests/__init__.py
+bin/diagnostics/__init__.py
 www/index.html
 
 # See $IMPALA_HOME/LICENSE.txt

http://git-wip-us.apache.org/repos/asf/impala/blob/2883c995/tests/unittests/test_command.py
----------------------------------------------------------------------
diff --git a/tests/unittests/test_command.py b/tests/unittests/test_command.py
new file mode 100644
index 0000000..a2a9e4c
--- /dev/null
+++ b/tests/unittests/test_command.py
@@ -0,0 +1,49 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# Unit tests for collect_diagnostics.Command
+
+import os
+import pytest
+import sys
+
+# Prepend bin/diagnostics (relative to this file) to sys.path for the import below.
+sys.path.insert(0,
+    os.path.abspath(os.path.join(os.path.dirname(__file__), '../../bin/diagnostics')))
+from collect_diagnostics import Command
+
+class TestCommand(object):
+  """Exercises Command's exit code, captured stdout and timeout handling."""
+
+  def test_simple_commands(self):
+    # A command that exits cleanly reports 0 and exposes what it printed.
+    echo_cmd = Command(["echo", "foo"], 1000)
+    assert echo_cmd.run() == 0, "Command expected to succeed, but failed"
+    assert echo_cmd.stdout.strip("\n") == "foo"
+
+    # A failing command surfaces its non-zero exit code through run().
+    false_cmd = Command(["false"], 1000)
+    assert false_cmd.run() == 1
+
+  def test_command_timer(self):
+    # 'sleep 1000' cannot finish within the 1 second timer, so the command
+    # should time out: run() is non-zero and the timeout flag is set.
+    sleep_cmd = Command(["sleep", "1000"], 1)
+    assert sleep_cmd.run() != 0, "Command expected to timeout but succeeded."
+    assert sleep_cmd.child_killed_by_timeout, "Command didn't timeout as expected."
+
+