You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by mi...@apache.org on 2018/05/01 15:45:03 UTC

[2/4] impala git commit: IMPALA-6070: Further improvements to test-with-docker.

IMPALA-6070: Further improvements to test-with-docker.

This commit tackles a few additions and improvements to
test-with-docker. In general, I'm adding workloads (e.g., exhaustive,
rat-check), tuning memory setting and parallelism, and trying to speed
things up.

Bug fixes:

* Embarassingly, I was still skipping thrift-server-test in the backend
  tests. This was a mistake in handling feedback from my last review.

* I made the timeline a little bit taller to clip less.

Adding workloads:

* I added the RAT licensing check.

* I added exhaustive runs. This led me to model the suites a little
  bit more in Python, with a class representing a suite with a
  bunch of data about the suite. It's not perfect and still
  coupled with the entrypoint.sh shell script, but it feels
  workable. As part of adding exhaustive tests, I had
  to re-work the timeout handling, since now different
  suites meaningfully have different timeouts.

Speed ups:

* To speed up test runs, I added a mechanism to split py.test suites into
  multiple shards with a py.test argument. This involved a little bit of work in
  conftest.py, and exposing $RUN_CUSTOM_CLUSTER_TESTS_ARGS in run-all-tests.sh.

  Furthermore, I moved a bit more logic about managing the
  list of suites into Python.

* Doing the full build with "-notests" and only building
  the backend tests in the relevant target that needs them. This speeds
  up "docker commit" significantly by removing about 20GB from the
  container.  I had to indicates that expr-codegen-test depends on
  expr-codegen-test-ir, which was missing.

* I sped up copying the Kudu data: previously I did
  both a move and a copy; now I'm doing a move followed by a move. One
  of the moves is cross-filesystem so is slow, but this does half the
  amount of copying.

Memory usage:

* I tweaked the memlimit_gb settings to have a higher default. I've been
  fighting empirically to have the tests run well on c4.8xlarge and
  m4.10xlarge.

The more memory a minicluster and test suite run uses, the fewer parallel
suites we can run. By observing the peak processes at the tail of a run (with a
new "memory_usage" function that uses a ps/sort/awk trick) and by observing
peak container total_rss, I found that we had several JVMs that
didn't have Xmx settings set. I added Xms/Xmx settings in a few
places:

 * The non-first Impalad does very little JVM work, so having
   an Xmx keeps it small, even in the parallel tests.
 * Datanodes do work, but they essentially were never garbage
   collecting, because JVM defaults let them use up to 1/4th
   the machine memory. (I observed this based on RSS at the
   end of the run; nothing fancier.) Adding Xms/Xmx settings
   helped.
 * Similarly, I piped the settings through to HBase.

A few daemons still run without resource limitations, but they don't
seem to be a problem.

Change-Id: I43fe124f00340afa21ad1eeb6432d6d50151ca7c
Reviewed-on: http://gerrit.cloudera.org:8080/10123
Reviewed-by: Joe McDonnell <jo...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-on: http://gerrit.cloudera.org:8080/10248
Reviewed-by: Philip Zeyliger <ph...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/d733ea68
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/d733ea68
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/d733ea68

Branch: refs/heads/2.x
Commit: d733ea68ca144798ff67054faf577dfdae0f201e
Parents: 1dc6dc3
Author: Philip Zeyliger <ph...@cloudera.com>
Authored: Fri Apr 6 10:16:39 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Tue May 1 02:46:38 2018 +0000

----------------------------------------------------------------------
 be/src/exprs/CMakeLists.txt                     |   1 +
 bin/run-all-tests.sh                            |   5 +-
 docker/entrypoint.sh                            | 173 +++++++----
 docker/monitor.py                               |  29 +-
 docker/test-with-docker.py                      | 296 ++++++++++++++-----
 docker/timeline.html.template                   |   9 +-
 testdata/bin/run-hbase.sh                       |   1 +
 .../common/etc/init.d/hdfs-common               |   7 +
 tests/conftest.py                               |  35 +++
 tests/run-tests.py                              |  19 +-
 10 files changed, 426 insertions(+), 149 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/d733ea68/be/src/exprs/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/exprs/CMakeLists.txt b/be/src/exprs/CMakeLists.txt
index cff391c..755c166 100644
--- a/be/src/exprs/CMakeLists.txt
+++ b/be/src/exprs/CMakeLists.txt
@@ -72,5 +72,6 @@ ADD_BE_TEST(expr-codegen-test)
 # expr-codegen-test includes test IR functions
 COMPILE_TO_IR(expr-codegen-test.cc)
 add_dependencies(expr-codegen-test-ir gen-deps)
+add_dependencies(expr-codegen-test expr-codegen-test-ir)
 
 ADD_UDF_TEST(aggregate-functions-test)

http://git-wip-us.apache.org/repos/asf/impala/blob/d733ea68/bin/run-all-tests.sh
----------------------------------------------------------------------
diff --git a/bin/run-all-tests.sh b/bin/run-all-tests.sh
index 4743a4f..7702134 100755
--- a/bin/run-all-tests.sh
+++ b/bin/run-all-tests.sh
@@ -57,6 +57,8 @@ fi
 : ${TEST_START_CLUSTER_ARGS:=}
 # Extra args to pass to run-tests.py
 : ${RUN_TESTS_ARGS:=}
+# Extra args to pass to run-custom-cluster-tests.sh
+: ${RUN_CUSTOM_CLUSTER_TESTS_ARGS:=}
 if [[ "${TARGET_FILESYSTEM}" == "local" ]]; then
   # TODO: Remove abort_on_config_error flag from here and create-load-data.sh once
   # checkConfiguration() accepts the local filesystem (see IMPALA-1850).
@@ -223,7 +225,8 @@ do
     # Run the custom-cluster tests after all other tests, since they will restart the
     # cluster repeatedly and lose state.
     # TODO: Consider moving in to run-tests.py.
-    if ! "${IMPALA_HOME}/tests/run-custom-cluster-tests.sh" ${COMMON_PYTEST_ARGS}; then
+    if ! "${IMPALA_HOME}/tests/run-custom-cluster-tests.sh" ${COMMON_PYTEST_ARGS} \
+        ${RUN_CUSTOM_CLUSTER_TESTS_ARGS}; then
       TEST_RET_CODE=1
     fi
     export IMPALA_MAX_LOG_FILES="${IMPALA_MAX_LOG_FILES_SAVE}"

http://git-wip-us.apache.org/repos/asf/impala/blob/d733ea68/docker/entrypoint.sh
----------------------------------------------------------------------
diff --git a/docker/entrypoint.sh b/docker/entrypoint.sh
index d371d25..00b2ee6 100755
--- a/docker/entrypoint.sh
+++ b/docker/entrypoint.sh
@@ -90,12 +90,6 @@ function impala_environment() {
 function boot_container() {
   pushd /home/impdev/Impala
 
-  # Required for metastore
-  sudo service postgresql start
-
-  # Required for starting HBase
-  sudo service ssh start
-
   # Make log directories. This is typically done in buildall.sh.
   mkdir -p logs/be_tests logs/fe_tests/coverage logs/ee_tests logs/custom_cluster_tests
 
@@ -112,17 +106,37 @@ function boot_container() {
   echo Hosts file:
   cat /etc/hosts
 
-  # Make a copy of Kudu's WALs to avoid isue with Docker filesystems (aufs and
+  popd
+}
+
+function start_minicluster {
+  # The subshell here avoids the verbose output from set -x.
+  (echo ">>> Starting PostgreSQL and SSH") 2> /dev/null
+  pushd /home/impdev/Impala
+
+  # Required for metastore
+  sudo service postgresql start
+
+  # Required for starting HBase
+  sudo service ssh start
+
+  (echo ">>> Copying Kudu Data") 2> /dev/null
+  # Move around Kudu's WALs to avoid issue with Docker filesystems (aufs and
   # overlayfs) that don't support os.rename(2) on directories, which Kudu
   # requires. We make a fresh copy of the data, in which case rename(2) works
   # presumably because there's only one layer involved. See
   # https://issues.apache.org/jira/browse/KUDU-1419.
-  cd /home/impdev/Impala/testdata
+  set -x
+  pushd /home/impdev/Impala/testdata
   for x in cluster/cdh*/node-*/var/lib/kudu/*/wal; do
+    echo $x
+    # This mv takes time, as it's actually copying into the latest layer.
     mv $x $x-orig
-    cp -r $x-orig $x
-    rm -r $x-orig
+    mkdir $x
+    mv $x-orig/* $x
+    rmdir $x-orig
   done
+  popd
 
   # Wait for postgresql to really start; if it doesn't, Hive Metastore will fail to start.
   for i in {1..120}; do
@@ -135,6 +149,9 @@ function boot_container() {
   done
   sudo -u postgres psql -c "select 1"
 
+  (echo ">>> Starting mini cluster") 2> /dev/null
+  testdata/bin/run-all.sh
+
   popd
 }
 
@@ -164,8 +181,13 @@ function build_impdev() {
 
   # Builds Impala and loads test data.
   # Note that IMPALA-6494 prevents us from using shared library builds,
-  # which are smaller and thereby speed things up.
-  ./buildall.sh -noclean -format -testdata -skiptests
+  # which are smaller and thereby speed things up. We use "-notests"
+  # to avoid building backend tests, which are sizable, and
+  # can be built when executing those tests.
+  ./buildall.sh -noclean -format -testdata -notests
+
+  # Dump current memory usage to logs, before shutting things down.
+  memory_usage
 
   # Shut down things cleanly.
   testdata/bin/kill-all.sh
@@ -176,9 +198,34 @@ function build_impdev() {
   # Clean up things we don't need to reduce image size
   find be -name '*.o' -execdir rm '{}' + # ~1.6GB
 
+  # Clean up dangling symlinks. These (typically "cluster/cdh*-node-*")
+  # may point to something inside a container that no longer exists
+  # and can confuse Jenkins.
+  find /logs -xtype l -execdir rm '{}' ';'
+
   popd
 }
 
+# Prints top 20 RSS consumers (and other, total), in megabytes Common culprits
+# are Java processes without Xmx set. Since most things don't reclaim memory,
+# this is a decent proxy for peak memory usage by long-lived processes.
+function memory_usage() {
+  (
+  echo "Top 20 memory consumers (RSS in MBs)"
+  sudo ps -axho rss,args | \
+    sed -e 's/^ *//' | \
+    sed -e 's, ,\t,' | \
+    sort -nr | \
+    awk -F'\t' '
+    FNR < 20 { print $1/1024.0, $2; total += $1/1024.0 }
+    FNR >= 20 { other+= $1/1024.0; total += $1/1024.0 }
+    END {
+      if (other) { print other, "-- other --" };
+      print total, "-- total --"
+    }'
+  ) >& /logs/memory_usage.txt
+}
+
 # Runs a suite passed in as the first argument. Tightly
 # coupled with Impala's run-all-tests and the suite names.
 # from test-with-docker.py.
@@ -189,6 +236,8 @@ function test_suite() {
 
   # These test suites are for testing.
   if [[ $1 == NOOP ]]; then
+    # Sleep busily for 10 seconds.
+    bash -c 'while [[ $SECONDS -lt 10 ]]; do :; done'
     return 0
   fi
   if [[ $1 == NOOP_FAIL ]]; then
@@ -208,31 +257,9 @@ function test_suite() {
   boot_container
   impala_environment
 
-  # By default, the JVM will use 1/4 of your OS memory for its heap size. For a
-  # long-running test, this will delay GC inside of impalad's leading to
-  # unnecessarily large process RSS footprints. We cap the heap size at
-  # a more reasonable size.  Note that "test_insert_large_string" fails
-  # at 2g and 3g, so the suite that includes it (EE_TEST_PARALLEL) gets
-  # additional memory.
-  #
-  # Similarly, bin/start-impala-cluster typically configures the memlimit
-  # to be 80% of the machine memory, divided by the number of daemons.
-  # If multiple containers are to be run simultaneously, this is scaled
-  # down in test-with-docker.py (and further configurable with --impalad-mem-limit-bytes)
-  # and passed in via $IMPALAD_MEM_LIMIT_BYTES to the container. There is a
-  # relationship between the number of parallel tests that can be run by py.test and this
-  # limit.
-  JVM_HEAP_GB=2
-  if [[ $1 = EE_TEST_PARALLEL ]]; then
-    JVM_HEAP_GB=4
-  fi
-  export TEST_START_CLUSTER_ARGS="--jvm_args=-Xmx${JVM_HEAP_GB}g \
-    --impalad_args=--mem_limit=$IMPALAD_MEM_LIMIT_BYTES"
-
   # BE tests don't require the minicluster, so we can run them directly.
   if [[ $1 = BE_TEST ]]; then
-    # IMPALA-6494: thrift-server-test fails in Ubuntu16.04 for the moment; skip it.
-    export SKIP_BE_TEST_PATTERN='thrift-server-test*'
+    make -j$(nproc) --load-average=$(nproc) be-test be-benchmarks
     if ! bin/run-backend-tests.sh; then
       echo "Tests $1 failed!"
       return 1
@@ -242,37 +269,73 @@ function test_suite() {
     fi
   fi
 
+  if [[ $1 == RAT_CHECK ]]; then
+    # Runs Apache RAT (a license checker)
+    git archive --prefix=rat/ -o rat-impala.zip HEAD
+    wget --quiet https://archive.apache.org/dist/creadur/apache-rat-0.12/apache-rat-0.12-bin.tar.gz
+    tar xzf apache-rat-0.12-bin.tar.gz
+    java -jar apache-rat-0.12/apache-rat-0.12.jar -x rat-impala.zip > logs/rat.xml
+    bin/check-rat-report.py bin/rat_exclude_files.txt logs/rat.xml
+    return $?
+  fi
+
   # Start the minicluster
-  testdata/bin/run-all.sh
+  start_minicluster
 
-  export MAX_PYTEST_FAILURES=0
-  # Choose which suite to run; this is how run-all.sh chooses between them.
-  export FE_TEST=false
-  export BE_TEST=false
-  export EE_TEST=false
-  export JDBC_TEST=false
-  export CLUSTER_TEST=false
-
-  eval "export ${1}=true"
-
-  if [[ ${1} = "EE_TEST_SERIAL" ]]; then
-    # We bucket the stress tests with the parallel tests.
-    export RUN_TESTS_ARGS="--skip-parallel --skip-stress"
-    export EE_TEST=true
-  elif [[ ${1} = "EE_TEST_PARALLEL" ]]; then
-    export RUN_TESTS_ARGS="--skip-serial"
-    export EE_TEST=true
+  # By default, the JVM will use 1/4 of your OS memory for its heap size. For a
+  # long-running test, this will delay GC inside of impalad's leading to
+  # unnecessarily large process RSS footprints. To combat this, we
+  # set a small initial heap size, and then cap it at a more reasonable
+  # size. The small initial heap sizes help for daemons that do little
+  # in the way of JVM work (e.g., the 2nd and 3rd impalad's).
+  # Note that "test_insert_large_string" fails at 2g and 3g, so the suite that
+  # includes it (EE_TEST_PARALLEL) gets additional memory.
+
+  # Note that we avoid using TEST_START_CLUSTER_ARGS="--jvm-args=..."
+  # because it gets flattened along the way if we need to provide
+  # more than one Java argument. We use JAVA_TOOL_OPTIONS instead.
+  JVM_HEAP_MAX_GB=2
+  if [[ $1 = EE_TEST_PARALLEL ]]; then
+    JVM_HEAP_MAX_GB=4
+  elif [[ $1 = EE_TEST_PARALLEL_EXHAUSTIVE ]]; then
+    JVM_HEAP_MAX_GB=8
   fi
+  JAVA_TOOL_OPTIONS="-Xms512M -Xmx${JVM_HEAP_MAX_GB}G"
+
+  # Similarly, bin/start-impala-cluster typically configures the memlimit
+  # to be 80% of the machine memory, divided by the number of daemons.
+  # If multiple containers are to be run simultaneously, this is scaled
+  # down in test-with-docker.py (and further configurable with --impalad-mem-limit-bytes)
+  # and passed in via $IMPALAD_MEM_LIMIT_BYTES to the container. There is a
+  # relationship between the number of parallel tests that can be run by py.test and this
+  # limit.
+  export TEST_START_CLUSTER_ARGS="--impalad_args=--mem_limit=$IMPALAD_MEM_LIMIT_BYTES"
+
+  export MAX_PYTEST_FAILURES=0
+
+  # Asserting that these should are all set (to either true or false as strings).
+  # This is how run-all.sh chooses between them.
+  [[ $FE_TEST && $BE_TEST && $EE_TEST && $JDBC_TEST && $CLUSTER_TEST ]]
 
   ret=0
 
+  if [[ ${EE_TEST} = true ]]; then
+    # test_insert_parquet.py depends on this binary
+    make -j$(nproc) --load-average=$(nproc) parquet-reader
+  fi
+
   # Run tests.
-  if ! time -p bin/run-all-tests.sh; then
+  (echo ">>> $1: Starting run-all-test") 2> /dev/null
+  if ! time -p bash -x bin/run-all-tests.sh; then
     ret=1
     echo "Tests $1 failed!"
   else
     echo "Tests $1 succeeded!"
   fi
+
+  # Save memory usage after tests have run but before shutting down the cluster.
+  memory_usage || true
+
   # Oddly, I've observed bash fail to exit (and wind down the container),
   # leading to test-with-docker.py hitting a timeout. Killing the minicluster
   # daemons fixes this.
@@ -312,6 +375,8 @@ function main() {
   shift
 
   echo ">>> ${CMD} $@ (begin)"
+  # Dump environment, for debugging
+  env | grep -vE "AWS_(SECRET_)?ACCESS_KEY"
   set -x
   if "${CMD}" "$@"; then
     ret=0

http://git-wip-us.apache.org/repos/asf/impala/blob/d733ea68/docker/monitor.py
----------------------------------------------------------------------
diff --git a/docker/monitor.py b/docker/monitor.py
index 64cde0c..58e2a83 100644
--- a/docker/monitor.py
+++ b/docker/monitor.py
@@ -232,28 +232,37 @@ class Timeline(object):
         if self.interesting_re.search(line)]
     return [(container.name,) + split_timestamp(line) for line in interesting_lines]
 
-  @staticmethod
-  def parse_metrics(f):
+  def parse_metrics(self, f):
     """Parses timestamped metric lines.
 
     Given metrics lines like:
 
     2017-10-25 10:08:30.961510 87d5562a5fe0ea075ebb2efb0300d10d23bfa474645bb464d222976ed872df2a cpu user 33 system 15
 
-    Returns an iterable of (ts, container, user_cpu, system_cpu)
+    Returns an iterable of (ts, container, user_cpu, system_cpu). It also updates
+    container.peak_total_rss and container.total_user_cpu and container.total_system_cpu.
     """
     prev_by_container = {}
+    peak_rss_by_container = {}
     for line in f:
       ts, rest = split_timestamp(line.rstrip())
+      total_rss = None
       try:
         container, metric_type, rest2 = rest.split(" ", 2)
-        if metric_type != "cpu":
-          continue
-        _, user_cpu_s, _, system_cpu_s = rest2.split(" ", 3)
+        if metric_type == "cpu":
+          _, user_cpu_s, _, system_cpu_s = rest2.split(" ", 3)
+        elif metric_type == "memory":
+          memory_metrics = rest2.split(" ")
+          total_rss = int(memory_metrics[memory_metrics.index("total_rss") + 1 ])
       except:
         logging.warning("Skipping metric line: %s", line)
         continue
 
+      if total_rss is not None:
+        peak_rss_by_container[container] = max(peak_rss_by_container.get(container, 0),
+            total_rss)
+        continue
+
       prev_ts, prev_user, prev_system = prev_by_container.get(
           container, (None, None, None))
       user_cpu = int(user_cpu_s)
@@ -267,6 +276,14 @@ class Timeline(object):
               (system_cpu - prev_system)/dt/USER_HZ
       prev_by_container[container] = ts, user_cpu, system_cpu
 
+    # Now update container totals
+    for c in self.containers:
+      if c.id in prev_by_container:
+        _, u, s = prev_by_container[c.id]
+        c.total_user_cpu, c.total_system_cpu = u / USER_HZ, s / USER_HZ
+      if c.id in peak_rss_by_container:
+        c.peak_total_rss = peak_rss_by_container[c.id]
+
   def create(self, output):
     # Read logfiles
     timelines = []

http://git-wip-us.apache.org/repos/asf/impala/blob/d733ea68/docker/test-with-docker.py
----------------------------------------------------------------------
diff --git a/docker/test-with-docker.py b/docker/test-with-docker.py
index 59640b4..c3f4427 100755
--- a/docker/test-with-docker.py
+++ b/docker/test-with-docker.py
@@ -95,14 +95,10 @@ as part of the build, in logs/docker/*/timeline.html.
 # Suggested speed improvement TODOs:
 #   - Speed up testdata generation
 #   - Skip generating test data for variants not being run
-#   - Make container image smaller; perhaps make BE test binaries
-#     smaller
-#   - Split up cluster tests into two groups
+#   - Make container image smaller
 #   - Analyze .xml junit files to find slow tests; eradicate
 #     or move to different suite.
-#   - Avoid building BE tests, and build them during execution,
-#     saving on container space as well as baseline build
-#     time.
+#   - Run BE tests earlier (during data load)
 
 # We do not use Impala's python environment here, nor do we depend on
 # non-standard python libraries to avoid needing extra build steps before
@@ -125,11 +121,11 @@ if __name__ == '__main__' and __package__ is None:
 
 base = os.path.dirname(os.path.abspath(__file__))
 
+LOG_FORMAT="%(asctime)s %(threadName)s: %(message)s"
 
-def main():
 
-  logging.basicConfig(level=logging.INFO,
-                      format='%(asctime)s %(threadName)s: %(message)s')
+def main():
+  logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
 
   default_parallel_test_concurrency, default_suite_concurrency, default_memlimit_gb = \
       _compute_defaults()
@@ -160,20 +156,25 @@ def main():
   parser.add_argument(
       '--build-image', metavar='IMAGE',
       help='Skip building, and run tests on pre-existing image.')
-  parser.add_argument(
+
+  suite_group = parser.add_mutually_exclusive_group()
+  suite_group.add_argument(
       '--suite', metavar='VARIANT', action='append',
-      help="Run specific test suites; can be specified multiple times. \
-          If not specified, all tests are run. Choices: " + ",".join(ALL_SUITES))
+      help="""
+        Run specific test suites; can be specified multiple times.
+        Test-with-docker may shard some suites to improve parallelism.
+        If not specified, default tests are run.
+        Default: %s, All Choices: %s
+        """ % (",".join([ s.name for s in DEFAULT_SUITES]),
+          ",".join([ s.name for s in ALL_SUITES ])))
+  suite_group.add_argument('--all-suites', action='store_true', default=False,
+      help="If set, run all available suites.")
   parser.add_argument(
       '--name', metavar='NAME',
       help="Use a specific name for the test run. The name is used " +
       "as a prefix for the container and image names, and " +
       "as part of the log directory naming. Defaults to include a timestamp.",
       default=datetime.datetime.now().strftime("i-%Y%m%d-%H%M%S"))
-  parser.add_argument('--timeout', metavar='MINUTES',
-                      help="Timeout for test suites, in minutes.",
-                      type=int,
-                      default=60*2)
   parser.add_argument('--ccache-dir', metavar='DIR',
                       help="CCache directory to use",
                       default=os.path.expanduser("~/.ccache"))
@@ -181,22 +182,28 @@ def main():
   args = parser.parse_args()
 
   if not args.suite:
-    args.suite = ALL_SUITES
+    if args.all_suites:
+      # Ignore "NOOP" tasks, as they are just for testing.
+      args.suite = [ s.name for s in ALL_SUITES if not s.name.startswith("NOOP") ]
+    else:
+      args.suite = [ s.name for s in DEFAULT_SUITES ]
   t = TestWithDocker(
-      build_image=args.build_image, suites=args.suite,
-      name=args.name, timeout=args.timeout, cleanup_containers=args.cleanup_containers,
+      build_image=args.build_image, suite_names=args.suite,
+      name=args.name, cleanup_containers=args.cleanup_containers,
       cleanup_image=args.cleanup_image, ccache_dir=args.ccache_dir, test_mode=args.test,
       parallel_test_concurrency=args.parallel_test_concurrency,
       suite_concurrency=args.suite_concurrency,
       impalad_mem_limit_bytes=args.impalad_mem_limit_bytes)
 
-  logging.getLogger('').addHandler(
-      logging.FileHandler(os.path.join(_make_dir_if_not_exist(t.log_dir), "log.txt")))
+  fh = logging.FileHandler(os.path.join(_make_dir_if_not_exist(t.log_dir), "log.txt"))
+  fh.setFormatter(logging.Formatter(LOG_FORMAT))
+  logging.getLogger('').addHandler(fh)
 
   logging.info("Arguments: %s", args)
 
   ret = t.run()
   t.create_timeline()
+  t.log_summary()
 
   if not ret:
     sys.exit(1)
@@ -229,10 +236,11 @@ def _compute_defaults():
   logging.info("CPUs: %s Memory (GB): %s", cpus, total_memory_gb)
 
   parallel_test_concurrency = min(cpus, 8)
-  memlimit_gb = 7
+  memlimit_gb = 8
 
   if total_memory_gb >= 95:
     suite_concurrency = 4
+    memlimit_gb = 11
     parallel_test_concurrency = min(cpus, 12)
   elif total_memory_gb >= 65:
     suite_concurrency = 3
@@ -244,19 +252,100 @@ def _compute_defaults():
 
   return parallel_test_concurrency, suite_concurrency, memlimit_gb * 1024 * 1024 * 1024
 
+class Suite(object):
+  """Encapsulates a test suite.
+
+  A test suite is a named thing that the user can select to run,
+  and it runs in its own container, in parallel with other suites.
+  The actual running happens from entrypoint.sh and is controlled
+  mostly by environment variables. When complexity is easier
+  to handle in Python (with its richer data types), we prefer
+  it here.
+  """
+  def __init__(self, name, **envs):
+    """Create suite with given name and environment."""
+    self.name = name
+    self.envs = dict(
+        FE_TEST="false",
+        BE_TEST="false",
+        EE_TEST="false",
+        JDBC_TEST="false",
+        CLUSTER_TEST="false")
+    # If set, this suite is sharded past a certain suite concurrency threshold.
+    self.shard_at_concurrency = None
+    # Variable to which to append --shard_tests
+    self.sharding_variable = None
+    self.envs[name] = "true"
+    self.envs.update(envs)
+    self.timeout_minutes = 120
+
+  def copy(self, name, **envs):
+    """Duplicates current suite allowing for environment updates."""
+    v = dict()
+    v.update(self.envs)
+    v.update(envs)
+    ret = Suite(name, **v)
+    ret.shard_at_concurrency = self.shard_at_concurrency
+    ret.sharding_variable = self.sharding_variable
+    ret.timeout_minutes = self.timeout_minutes
+    return ret
+
+  def exhaustive(self):
+    """Returns an "exhaustive" copy of the suite."""
+    r = self.copy(self.name + "_EXHAUSTIVE", EXPLORATION_STRATEGY="exhaustive")
+    r.timeout_minutes = 240
+    return r
+
+  def sharded(self, shards):
+    """Returns a list of sharded copies of the list.
+
+    key is the name of the variable which needs to be appended with "--shard-tests=..."
+    """
+    # RUN_TESTS_ARGS
+    ret = []
+    for i in range(1, shards + 1):
+      s = self.copy("%s_%d_of_%d" % (self.name, i, shards))
+      s.envs[self.sharding_variable] = self.envs.get(self.sharding_variable, "") \
+          + " --shard_tests=%s/%s" % (i, shards)
+      ret.append(s)
+    return ret
 
-# The names of all the test tracks supported.  NOOP isn't included here, but is
-# handy for testing.  These are organized slowest-to-fastest, so that, when
-# parallelism of suites is limited, the total time is not impacted.
-ALL_SUITES = [
-    "EE_TEST_SERIAL",
-    "EE_TEST_PARALLEL",
-    "CLUSTER_TEST",
-    "BE_TEST",
-    "FE_TEST",
-    "JDBC_TEST",
+# Definitions of all known suites:
+ee_test_serial = Suite("EE_TEST_SERIAL", EE_TEST="true",
+    RUN_TESTS_ARGS="--skip-parallel --skip-stress")
+ee_test_serial.shard_at_concurrency = 4
+ee_test_serial.sharding_variable = "RUN_TESTS_ARGS"
+ee_test_serial_exhaustive = ee_test_serial.exhaustive()
+ee_test_parallel = Suite("EE_TEST_PARALLEL", EE_TEST="true",
+    RUN_TESTS_ARGS="--skip-serial")
+ee_test_parallel_exhaustive = ee_test_parallel.exhaustive()
+cluster_test = Suite("CLUSTER_TEST")
+cluster_test.shard_at_concurrency = 4
+cluster_test.sharding_variable = "RUN_CUSTOM_CLUSTER_TESTS_ARGS"
+cluster_test_exhaustive = cluster_test.exhaustive()
+
+# Default supported suites. These are organized slowest-to-fastest, so that,
+# when parallelism is limited, the total time is least impacted.
+DEFAULT_SUITES = [
+    ee_test_serial,
+    ee_test_parallel,
+    cluster_test,
+    Suite("BE_TEST"),
+    Suite("FE_TEST"),
+    Suite("JDBC_TEST")
 ]
 
+OTHER_SUITES = [
+    ee_test_parallel_exhaustive,
+    ee_test_serial_exhaustive,
+    cluster_test_exhaustive,
+    Suite("RAT_CHECK"),
+    # These are used for testing this script
+    Suite("NOOP"),
+    Suite("NOOP_FAIL"),
+    Suite("NOOP_SLEEP_FOREVER")
+]
+ALL_SUITES = DEFAULT_SUITES + OTHER_SUITES
 
 def _call(args, check=True):
   """Wrapper for calling a subprocess.
@@ -297,6 +386,12 @@ class Container(object):
     self.running = running
     self.start = None
     self.end = None
+    self.removed = False
+
+    # Updated by Timeline class
+    self.total_user_cpu = -1
+    self.total_system_cpu = -1
+    self.peak_total_rss = -1
 
   def runtime_seconds(self):
     if self.start and self.end:
@@ -311,15 +406,13 @@ class Container(object):
 class TestWithDocker(object):
   """Tests Impala using Docker containers for parallelism."""
 
-  def __init__(self, build_image, suites, name, timeout, cleanup_containers,
+  def __init__(self, build_image, suite_names, name, cleanup_containers,
                cleanup_image, ccache_dir, test_mode,
                suite_concurrency, parallel_test_concurrency,
                impalad_mem_limit_bytes):
     self.build_image = build_image
-    self.suites = [TestSuiteRunner(self, suite) for suite in suites]
     self.name = name
     self.containers = []
-    self.timeout_minutes = timeout
     self.git_root = _check_output(["git", "rev-parse", "--show-toplevel"]).strip()
     self.cleanup_containers = cleanup_containers
     self.cleanup_image = cleanup_image
@@ -336,6 +429,26 @@ class TestWithDocker(object):
     self.parallel_test_concurrency = parallel_test_concurrency
     self.impalad_mem_limit_bytes = impalad_mem_limit_bytes
 
+    # Map suites back into objects; we ignore case for this mapping.
+    suites = []
+    suites_by_name = {}
+    for suite in ALL_SUITES:
+      suites_by_name[suite.name.lower()] = suite
+    for suite_name in suite_names:
+      suites.append(suites_by_name[suite_name.lower()])
+
+    # If we have enough concurrency, shard some suites into two halves.
+    suites2 = []
+    for suite in suites:
+      if suite.shard_at_concurrency is not None and \
+          suite_concurrency >= suite.shard_at_concurrency:
+        suites2.extend(suite.sharded(2))
+      else:
+        suites2.append(suite)
+    suites = suites2
+
+    self.suite_runners = [TestSuiteRunner(self, suite) for suite in suites]
+
   def _create_container(self, image, name, logdir, logname, entrypoint, extras=None):
     """Returns a new container.
 
@@ -374,8 +487,10 @@ class TestWithDocker(object):
         + extras
         + [image]
         + entrypoint).strip()
-    return Container(name=name, id_=container_id,
+    ctr = Container(name=name, id_=container_id,
                      logfile=os.path.join(self.log_dir, logdir, logname))
+    logging.info("Created container %s", ctr)
+    return ctr
 
   def _run_container(self, container):
     """Runs container, and returns True if the container had a successful exit value.
@@ -413,15 +528,17 @@ class TestWithDocker(object):
   @staticmethod
   def _stop_container(container):
     """Stops container. Ignores errors (e.g., if it's already exited)."""
-    _call(["docker", "stop", container.id], check=False)
     if container.running:
+      _call(["docker", "stop", container.id], check=False)
       container.end = time.time()
       container.running = False
 
   @staticmethod
   def _rm_container(container):
     """Removes container."""
-    _call(["docker", "rm", container.id], check=False)
+    if not container.removed:
+      _call(["docker", "rm", container.id], check=False)
+      container.removed = True
 
   def _create_build_image(self):
     """Creates the "build image", with Impala compiled and data loaded."""
@@ -451,36 +568,36 @@ class TestWithDocker(object):
         self._rm_container(container)
 
   def _run_tests(self):
-    start_time = time.time()
-    timeout_seconds = self.timeout_minutes * 60
-    deadline = start_time + timeout_seconds
     pool = multiprocessing.pool.ThreadPool(processes=self.suite_concurrency)
     outstanding_suites = []
-    for suite in self.suites:
+    for suite in self.suite_runners:
       suite.task = pool.apply_async(suite.run)
       outstanding_suites.append(suite)
 
     ret = True
-    while time.time() < deadline and len(outstanding_suites) > 0:
-      for suite in list(outstanding_suites):
-        task = suite.task
-        if task.ready():
-          this_task_ret = task.get()
-          outstanding_suites.remove(suite)
-          if this_task_ret:
-            logging.info("Suite %s succeeded.", suite.name)
-          else:
-            logging.info("Suite %s failed.", suite.name)
-            ret = False
-      time.sleep(10)
-    if len(outstanding_suites) > 0:
-      for container in self.containers:
-        self._stop_container(container)
-      for suite in outstanding_suites:
-        suite.task.get()
-      raise Exception("Tasks not finished within timeout (%s minutes): %s" %
-                      (self.timeout_minutes, ",".join([
-                          suite.name for suite in outstanding_suites])))
+    try:
+      while len(outstanding_suites) > 0:
+        for suite in list(outstanding_suites):
+          if suite.timed_out():
+            msg = "Task %s not finished within timeout %s" % (suite.name,
+                suite.suite.timeout_minutes,)
+            logging.error(msg)
+            raise Exception(msg)
+          task = suite.task
+          if task.ready():
+            this_task_ret = task.get()
+            outstanding_suites.remove(suite)
+            if this_task_ret:
+              logging.info("Suite %s succeeded.", suite.name)
+            else:
+              logging.info("Suite %s failed.", suite.name)
+              ret = False
+        time.sleep(5)
+    except KeyboardInterrupt:
+      logging.info("\n\nDetected KeyboardInterrupt; shutting down!\n\n")
+      raise
+    finally:
+      pool.terminate()
     return ret
 
   def run(self):
@@ -495,17 +612,13 @@ class TestWithDocker(object):
       else:
         self.image = self.build_image
       ret = self._run_tests()
-      logging.info("Containers:")
-      for c in self.containers:
-        def to_success_string(exitcode):
-          if exitcode == 0:
-            return "SUCCESS"
-          return "FAILURE"
-        logging.info("%s %s %s %s", to_success_string(c.exitcode), c.name, c.logfile,
-                     c.runtime_seconds())
       return ret
     finally:
       self.monitor.stop()
+      if self.cleanup_containers:
+        for c in self.containers:
+          self._stop_container(c)
+          self._rm_container(c)
       if self.cleanup_image and self.image:
         _call(["docker", "rmi", self.image], check=False)
       logging.info("Memory usage: %s GB min, %s GB max",
@@ -526,6 +639,23 @@ class TestWithDocker(object):
         interesting_re=self._INTERESTING_RE)
     timeline.create(os.path.join(self.log_dir, "timeline.html"))
 
+  def log_summary(self):
+    logging.info("Containers:")
+    def to_success_string(exitcode):
+      if exitcode == 0:
+        return "SUCCESS"
+      return "FAILURE"
+
+    for c in self.containers:
+      logging.info("%s %s %s %0.1fm wall, %0.1fm user, %0.1fm system, " +
+            "%0.1fx parallelism, %0.1f GB peak RSS",
+          to_success_string(c.exitcode), c.name, c.logfile,
+          c.runtime_seconds() / 60.0,
+          c.total_user_cpu / 60.0,
+          c.total_system_cpu / 60.0,
+          (c.total_user_cpu + c.total_system_cpu) / max(c.runtime_seconds(), 0.0001),
+          c.peak_total_rss / 1024.0 / 1024.0 / 1024.0)
+
 
 class TestSuiteRunner(object):
   """Runs a single test suite."""
@@ -534,12 +664,24 @@ class TestSuiteRunner(object):
     self.test_with_docker = test_with_docker
     self.suite = suite
     self.task = None
-    self.name = self.suite.lower()
+    self.name = suite.name.lower()
+    # Set at the beginning of run and facilitates enforcing timeouts
+    # for individual suites.
+    self.deadline = None
+
+  def timed_out(self):
+    return self.deadline is not None and time.time() > self.deadline
 
   def run(self):
     """Runs given test. Returns true on success, based on exit code."""
+    self.deadline = time.time() + self.suite.timeout_minutes * 60
     test_with_docker = self.test_with_docker
     suite = self.suite
+    envs = ["-e", "NUM_CONCURRENT_TESTS=" + str(test_with_docker.parallel_test_concurrency)]
+    for k, v in sorted(suite.envs.iteritems()):
+      envs.append("-e")
+      envs.append("%s=%s" % (k, v))
+
     self.start = time.time()
 
     # io-file-mgr-test expects a real-ish file system at /tmp;
@@ -554,13 +696,11 @@ class TestSuiteRunner(object):
         name=container_name,
         extras=[
             "-v", tmpdir + ":/tmp",
-            "-u", str(os.getuid()),
-            "-e", "NUM_CONCURRENT_TESTS=" +
-            str(test_with_docker.parallel_test_concurrency),
-        ],
+            "-u", str(os.getuid())
+        ] + envs,
         logdir=self.name,
-        logname="log-test-" + self.suite + ".txt",
-        entrypoint=["/mnt/base/entrypoint.sh", "test_suite", suite])
+        logname="log-test-" + self.suite.name + ".txt",
+        entrypoint=["/mnt/base/entrypoint.sh", "test_suite", suite.name])
 
     test_with_docker.containers.append(container)
     test_with_docker.monitor.add(container)
@@ -569,7 +709,7 @@ class TestSuiteRunner(object):
     except:
       return False
     finally:
-      logging.info("Cleaning up containers for %s" % (suite,))
+      logging.info("Cleaning up containers for %s" % (suite.name,))
       test_with_docker._stop_container(container)
       if test_with_docker.cleanup_containers:
         test_with_docker._rm_container(container)

http://git-wip-us.apache.org/repos/asf/impala/blob/d733ea68/docker/timeline.html.template
----------------------------------------------------------------------
diff --git a/docker/timeline.html.template b/docker/timeline.html.template
index c8de821..d2960d7 100644
--- a/docker/timeline.html.template
+++ b/docker/timeline.html.template
@@ -96,9 +96,7 @@ function ts_to_date(secs) {
 }
 
 function drawChart() {
-  var container = document.getElementById('container');
-  var timelineContainer = document.createElement("div");
-  container.appendChild(timelineContainer);
+  var timelineContainer = document.getElementById('timelineContainer');
   var chart = new google.visualization.Timeline(timelineContainer);
   var dataTable = new google.visualization.DataTable();
   dataTable.addColumn({ type: 'string', id: 'Position' });
@@ -115,7 +113,7 @@ function drawChart() {
 
   for (const k of Object.keys(data.metrics)) {
     var lineChart = document.createElement("div");
-    container.appendChild(lineChart);
+    lineChartContainer.appendChild(lineChart);
 
     var dataTable = new google.visualization.DataTable();
     dataTable.addColumn({ type: 'timeofday', id: 'Time' });
@@ -139,4 +137,5 @@ function drawChart() {
   }
 }
 </script>
-<div id="container" style="height: 200px;"></div>
+<div id="timelineContainer" style="height: 400px;"></div>
+<div id="lineChartContainer" style="height: 200px;"></div>

http://git-wip-us.apache.org/repos/asf/impala/blob/d733ea68/testdata/bin/run-hbase.sh
----------------------------------------------------------------------
diff --git a/testdata/bin/run-hbase.sh b/testdata/bin/run-hbase.sh
index f264b65..241ccd2 100755
--- a/testdata/bin/run-hbase.sh
+++ b/testdata/bin/run-hbase.sh
@@ -36,6 +36,7 @@ cat > ${HBASE_CONF_DIR}/hbase-env.sh <<EOF
 export JAVA_HOME=${JAVA_HOME}
 export HBASE_LOG_DIR=${HBASE_LOGDIR}
 export HBASE_PID_DIR=${HBASE_LOGDIR}
+export HBASE_HEAPSIZE=1g
 EOF
 
 # Put zookeeper things in the logs/cluster/zoo directory.

http://git-wip-us.apache.org/repos/asf/impala/blob/d733ea68/testdata/cluster/node_templates/common/etc/init.d/hdfs-common
----------------------------------------------------------------------
diff --git a/testdata/cluster/node_templates/common/etc/init.d/hdfs-common b/testdata/cluster/node_templates/common/etc/init.d/hdfs-common
index 9a7ddd3..d53ecce 100644
--- a/testdata/cluster/node_templates/common/etc/init.d/hdfs-common
+++ b/testdata/cluster/node_templates/common/etc/init.d/hdfs-common
@@ -18,3 +18,10 @@
 export HADOOP_LOG_DIR="$LOG_DIR/hadoop-hdfs"
 export HADOOP_ROOT_LOGGER="${HADOOP_ROOT_LOGGER:-INFO,RFA}"
 export HADOOP_LOGFILE=$(basename $0).log
+
+# Force minicluster processes to have a maximum heap.
+# If unset, on large machines, the JVM default
+# is 1/4th of the RAM (or so), and processes like DataNode
+# end up never garbage collecting.
+export HADOOP_HEAPSIZE_MIN=512m
+export HADOOP_HEAPSIZE_MAX=2g

http://git-wip-us.apache.org/repos/asf/impala/blob/d733ea68/tests/conftest.py
----------------------------------------------------------------------
diff --git a/tests/conftest.py b/tests/conftest.py
index 144fc5c..1e6adda 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -120,6 +120,10 @@ def pytest_addoption(parser):
                    default=False, help="Run all tests with KRPC disabled. This assumes "
                    "that the test cluster has been started with --disable_krpc.")
 
+  parser.addoption("--shard_tests", default=None,
+                   help="If set to N/M (e.g., 3/5), will split the tests into "
+                   "M partitions and run the Nth partition. 1-indexed.")
+
 
 def pytest_assertrepr_compare(op, left, right):
   """
@@ -501,3 +505,34 @@ def validate_pytest_config():
     if any(pytest.config.option.impalad.startswith(loc) for loc in local_prefixes):
       logging.error("--testing_remote_cluster can not be used with a local impalad")
       pytest.exit("Invalid pytest config option: --testing_remote_cluster")
+
+
+@pytest.hookimpl(trylast=True)
+def pytest_collection_modifyitems(items, config, session):
+  """Hook to handle --shard_tests command line option.
+
+  If set, this "deselects" a subset of tests, by hashing
+  their id into buckets.
+  """
+  if not config.option.shard_tests:
+    return
+
+  num_items = len(items)
+  this_shard, num_shards = map(int, config.option.shard_tests.split("/"))
+  assert 0 <= this_shard <= num_shards
+  if this_shard == num_shards:
+    this_shard = 0
+
+  items_selected, items_deselected = [], []
+  for i in items:
+    if hash(i.nodeid) % num_shards == this_shard:
+      items_selected.append(i)
+    else:
+      items_deselected.append(i)
+  config.hook.pytest_deselected(items=items_deselected)
+
+  # We must modify the items list in place for it to take effect.
+  items[:] = items_selected
+
+  logging.info("pytest shard selection enabled %s. Of %d items, selected %d items by hash.",
+      config.option.shard_tests, num_items, len(items))

http://git-wip-us.apache.org/repos/asf/impala/blob/d733ea68/tests/run-tests.py
----------------------------------------------------------------------
diff --git a/tests/run-tests.py b/tests/run-tests.py
index 95e0d11..b1a9fdd 100755
--- a/tests/run-tests.py
+++ b/tests/run-tests.py
@@ -180,6 +180,10 @@ def build_test_args(base_name, valid_dirs=VALID_TEST_DIRS):
     #
     explicit_tests = pytest.config.getoption(FILE_OR_DIR)
     config_options = [arg for arg in commandline_args if arg not in explicit_tests]
+    # We also want to strip out any --shard_tests option and its corresponding value.
+    while "--shard_tests" in config_options:
+      i = config_options.index("--shard_tests")
+      del config_options[i:i+2]
     test_args = ignored_dirs + logging_args + config_options
 
   return test_args
@@ -237,6 +241,11 @@ if __name__ == "__main__":
     test_executor.run_tests(sys.argv[1:])
     sys.exit(0)
 
+  def run(args):
+    """Helper to print out arguments of test_executor before invoking."""
+    print "Running TestExecutor with args: %s" % (args,)
+    test_executor.run_tests(args)
+
   os.chdir(TEST_DIR)
 
   # Create the test result directory if it doesn't already exist.
@@ -248,25 +257,25 @@ if __name__ == "__main__":
   # pytest warnings/messages and displays collected tests
 
   if '--collect-only' in sys.argv:
-    test_executor.run_tests(sys.argv[1:])
+    run(sys.argv[1:])
   else:
     print_metrics('connections')
     # First run query tests that need to be executed serially
     if not skip_serial:
       base_args = ['-m', 'execute_serially']
-      test_executor.run_tests(base_args + build_test_args('serial'))
+      run(base_args + build_test_args('serial'))
       print_metrics('connections')
 
     # Run the stress tests tests
     if not skip_stress:
       base_args = ['-m', 'stress', '-n', NUM_STRESS_CLIENTS]
-      test_executor.run_tests(base_args + build_test_args('stress'))
+      run(base_args + build_test_args('stress'))
       print_metrics('connections')
 
     # Run the remaining query tests in parallel
     if not skip_parallel:
       base_args = ['-m', 'not execute_serially and not stress', '-n', NUM_CONCURRENT_TESTS]
-      test_executor.run_tests(base_args + build_test_args('parallel'))
+      run(base_args + build_test_args('parallel'))
 
     # The total number of tests executed at this point is expected to be >0
     # If it is < 0 then the script needs to exit with a non-zero
@@ -277,7 +286,7 @@ if __name__ == "__main__":
     # Finally, validate impalad/statestored metrics.
     args = build_test_args(base_name='verify-metrics', valid_dirs=['verifiers'])
     args.append('verifiers/test_verify_metrics.py')
-    test_executor.run_tests(args)
+    run(args)
 
   if test_executor.tests_failed:
     sys.exit(1)