Posted to commits@impala.apache.org by mi...@apache.org on 2019/01/18 16:53:21 UTC

[impala] 04/04: IMPALA-7986, IMPALA-7987: run daemons in docker containers

This is an automated email from the ASF dual-hosted git repository.

mikeb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit ff628d2b136e9b5ca72a7179294dea06f4cdf0d8
Author: Tim Armstrong <ta...@cloudera.com>
AuthorDate: Fri Dec 14 15:42:19 2018 -0800

    IMPALA-7986, IMPALA-7987: run daemons in docker containers
    
    This refactors start-impala-cluster.py to allow multiple implementations
    of the minicluster operations like start and stop. There are now
    two classes implementing the same set of operations -
    MiniClusterOperations and DockerMiniClusterOperations. The docker
    versions start and stop the containers added in IMPALA-7948.
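
    There is no explicit base class; as a rough sketch, the shared
    surface that both classes implement looks like this (method names
    come from the patch below, while "ClusterOperations" is just a
    hypothetical name for the implied interface):

      class ClusterOperations(object):
          def kill_all_daemons(self, force=False): pass
          def kill_all_impalads(self, force=False): pass
          def kill_catalogd(self, force=False): pass
          def kill_statestored(self, force=False): pass
          def start_statestore(self): pass
          def start_catalogd(self): pass
          def start_impalads(self, cluster_size, num_coordinators,
                             use_exclusive_coordinators): pass
          def wait_for_cluster(self): pass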
    
    With some configuration (see instructions below), the containers can
    connect back to services (HDFS, HMS, Kudu, Sentry, etc.) running on the
    host. Config generation was modified so that services optionally
    communicate via the docker bridge network rather than loopback
    (the host's loopback interface is not accessible to the containers).
    
    Notes:
    * I improved the container build to regenerate the container images when
      cluster configs are regenerated (previously the images could contain
      stale configs).
    * Switched from CMD to ENTRYPOINT to allow passing arguments to "docker
      run" without clobbering default args.
    * Python 2.6 is not supported for this code path. This only affects
      CentOS 6, which has limited support for docker anyway.
    * I deferred implementing wait_for_cluster(), since the existing
      code requires surgery to abstract out assumptions about locating
      processes and web UI ports - see IMPALA-7988.
    
    How to use:
    ===========
    Create a docker network to use for internal cluster communication,
    e.g.:
      docker network create -d bridge --gateway=172.17.0.1 \
          --subnet=172.17.0.1/16 impala-cluster
    
    Add the gateway address of the docker network you created to
    impala-config-local.sh, e.g.:
    
      export INTERNAL_LISTEN_HOST=172.17.0.1
      export DEFAULT_FS=hdfs://${INTERNAL_LISTEN_HOST}:20500
    
    Regenerate configs and docker images:
    
      . bin/impala-config.sh
      ./bin/create-test-configuration.sh
      ninja -j $IMPALA_BUILD_THREADS docker_images
    
    Restart the minicluster and Impala services to pick up the config:
    
      ./testdata/bin/run-all.sh
      start-impala-cluster.py --docker_network impala-cluster
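
    The containers follow a predictable naming convention,
    impala-test-cluster-<network_name>-<daemon_name>[-<instance_num>]
    (see DockerMiniClusterOperations in the patch below), so they can
    be addressed with ordinary docker commands. A hypothetical
    standalone helper that mirrors the convention:

      def container_name(network, daemon, instance=None):
          # Mirrors DockerMiniClusterOperations.__gen_container_name__()
          # from the patch; this helper itself is illustrative only.
          if instance is None:
              host = daemon
          else:
              host = "{0}-{1}".format(daemon, instance)
          return "impala-test-cluster-{0}-{1}".format(network, host)

      print container_name("impala-cluster", "catalogd")
      # impala-test-cluster-impala-cluster-catalogd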
    
    You can connect with impala-shell and run some queries. You will
    likely run into issues, particularly if running against an existing
    data load, since "localhost" or "127.0.0.1" gets baked into HMS
    table definitions.
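
    For example, a minimal smoke test (assuming impala-shell is on the
    PATH and the default mapping of client ports to localhost, i.e.
    --docker_auto_ports was not used):

      from subprocess import check_call
      # Beeswax port 21000 is published to localhost by default.
      check_call(["impala-shell", "-i", "localhost:21000",
                  "-q", "select version()"])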
    
    Testing:
    Ran exhaustive tests (not using Docker) to make sure I didn't break
    anything.
    
    Change-Id: I5975cced33fa93df43101dd47d19b8af12e93d11
    Reviewed-on: http://gerrit.cloudera.org:8080/12095
    Reviewed-by: Tim Armstrong <ta...@cloudera.com>
    Reviewed-by: Joe McDonnell <jo...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 bin/create-test-configuration.sh                   |   3 +
 bin/impala-config.sh                               |   8 +-
 bin/start-impala-cluster.py                        | 493 +++++++++++++++------
 fe/CMakeLists.txt                                  |  17 +-
 fe/src/test/resources/authz-policy.ini.template    |   8 +-
 fe/src/test/resources/hbase-site.xml.template      |   4 +-
 fe/src/test/resources/mysql-hive-site.xml.template |   4 +-
 .../resources/postgresql-hive-site.xml.template    |   4 +-
 fe/src/test/resources/sentry-site.xml.template     |   6 +-
 .../test/resources/sentry-site_no_oo.xml.template  |   8 +-
 fe/src/test/resources/sentry-site_oo.xml.template  |   8 +-
 .../resources/sentry-site_oo_nogrant.xml.template  |   8 +-
 testdata/cluster/admin                             |   2 +
 .../common/etc/hadoop/conf/core-site.xml.tmpl      |   2 +-
 .../common/etc/hadoop/conf/hdfs-site.xml.tmpl      |  16 +-
 .../common/etc/hadoop/conf/yarn-site.xml.tmpl      |   8 +-
 tests/custom_cluster/test_breakpad.py              |  11 +-
 17 files changed, 439 insertions(+), 171 deletions(-)

diff --git a/bin/create-test-configuration.sh b/bin/create-test-configuration.sh
index 7fc3ae7..855fb3d 100755
--- a/bin/create-test-configuration.sh
+++ b/bin/create-test-configuration.sh
@@ -203,3 +203,6 @@ else
   # functional subdirectory.
   symlink_subdirs ${IMPALA_AUX_TEST_HOME}/tests ${IMPALA_HOME}/tests
 fi
+
+# Touch a file to mark when this script was last run.
+touch "${CONFIG_DIR}/.test_config_timestamp"
diff --git a/bin/impala-config.sh b/bin/impala-config.sh
index 399d953..6be8a1f 100755
--- a/bin/impala-config.sh
+++ b/bin/impala-config.sh
@@ -269,6 +269,12 @@ export azure_storage_account_name="${azure_storage_account_name-}"
 export azure_storage_container_name="${azure_storage_container_name-}"
 export HDFS_REPLICATION="${HDFS_REPLICATION-3}"
 export ISILON_NAMENODE="${ISILON_NAMENODE-}"
+# Internal and external interfaces that test cluster services will listen on. The
+# internal interface is used for ports that should not be accessed from outside the
+# host that the cluster is running on. The external interface is used for ports
+# that may need to be accessed from outside, e.g. web UIs.
+export INTERNAL_LISTEN_HOST="${INTERNAL_LISTEN_HOST-localhost}"
+export EXTERNAL_LISTEN_HOST="${EXTERNAL_LISTEN_HOST-0.0.0.0}"
 export DEFAULT_FS="${DEFAULT_FS-hdfs://localhost:20500}"
 export WAREHOUSE_LOCATION_PREFIX="${WAREHOUSE_LOCATION_PREFIX-}"
 export LOCAL_FS="file:${WAREHOUSE_LOCATION_PREFIX}"
@@ -441,7 +447,7 @@ export IMPALA_ALL_LOGS_DIRS="${IMPALA_CLUSTER_LOGS_DIR}
 CORES=$(($(getconf _NPROCESSORS_ONLN) / 2))
 export NUM_CONCURRENT_TESTS="${NUM_CONCURRENT_TESTS-${CORES}}"
 
-export KUDU_MASTER_HOSTS="${KUDU_MASTER_HOSTS:-127.0.0.1}"
+export KUDU_MASTER_HOSTS="${KUDU_MASTER_HOSTS:-${INTERNAL_LISTEN_HOST}}"
 export KUDU_MASTER_PORT="${KUDU_MASTER_PORT:-7051}"
 export KUDU_MASTER_WEBUI_PORT="${KUDU_MASTER_WEBUI_PORT:-8051}"
 
diff --git a/bin/start-impala-cluster.py b/bin/start-impala-cluster.py
index a4ff606..1282d95 100755
--- a/bin/start-impala-cluster.py
+++ b/bin/start-impala-cluster.py
@@ -20,14 +20,19 @@
 # Starts up an Impala cluster (ImpalaD + State Store) with the specified number of
 # ImpalaD instances. Each ImpalaD runs on a different port allowing this to be run
 # on a single machine.
+import getpass
+import itertools
+import json
 import logging
 import os
 import psutil
+import shlex
 import sys
 from datetime import datetime
 from getpass import getuser
 from time import sleep, time
 from optparse import OptionParser, SUPPRESS_HELP
+from subprocess import call, check_call
 from testdata.common import cgroups
 from tests.common.environ import build_flavor_timeout
 from tests.common.impala_cluster import ImpalaCluster
@@ -40,6 +45,15 @@ LOG.setLevel(level=logging.DEBUG)
 KUDU_MASTER_HOSTS = os.getenv("KUDU_MASTER_HOSTS", "127.0.0.1")
 DEFAULT_IMPALA_MAX_LOG_FILES = os.environ.get("IMPALA_MAX_LOG_FILES", 10)
 
+BASE_BEESWAX_PORT = 21000
+BASE_HS2_PORT = 21050
+BASE_BE_PORT = 22000
+BASE_KRPC_PORT = 27000
+BASE_STATE_STORE_SUBSCRIBER_PORT = 23000
+BASE_IMPALAD_WEBSERVER_PORT = 25000
+BASE_STATESTORED_WEBSERVER_PORT = 25010
+BASE_CATALOGD_WEBSERVER_PORT = 25020
+
 # Options
 parser = OptionParser()
 parser.add_option("-s", "--cluster_size", type="int", dest="cluster_size", default=3,
@@ -91,6 +105,20 @@ parser.add_option("--jvm_args", dest="jvm_args", default="",
 parser.add_option("--kudu_master_hosts", default=KUDU_MASTER_HOSTS,
                   help="The host name or address of the Kudu master. Multiple masters "
                       "can be specified using a comma separated list.")
+parser.add_option("--docker_network", dest="docker_network", default=None,
+                  help="If set, the cluster processes run inside docker containers "
+                      "(which must be already built, e.g. with 'make docker_images'. "
+                      "The containers are connected to the virtual network specified by "
+                      "the argument value. This is currently experimental and not all "
+                      "actions work. This mode only works on python 2.7+")
+parser.add_option("--docker_auto_ports", dest="docker_auto_ports",
+                  action="store_true", default=False,
+                  help="Only has an effect if --docker_network is set. If true, Docker "
+                       "will automatically allocate ports for client-facing endpoints "
+                       "(Beewax, HS2, Web UIs, etc), which avoids collisions with other "
+                       "running processes. If false, ports are mapped to the same ports "
+                       "on localhost as the non-docker impala cluster.")
+
 
 # For testing: list of comma-separated delays, in milliseconds, that delay impalad catalog
 # replica initialization. The ith delay is applied to the ith impalad.
@@ -156,16 +184,14 @@ def exec_impala_process(cmd, args, stderr_log_file_path):
   else:
     redirect_output = "1>{stderr_log_file_path}".format(
         stderr_log_file_path=stderr_log_file_path)
+  # TODO: it would be better to use Popen and pass in tokenised arguments rather than
+  # relying on shell tokenisation.
   cmd = "{cmd} {args} {redirect_output} 2>&1 &".format(
       cmd=cmd,
       args=args,
       redirect_output=redirect_output)
   os.system(cmd)
 
-def kill_cluster_processes(force=False):
-  binaries = ["catalogd", "impalad", "statestored"]
-  kill_matching_processes(binaries, force)
-
 def kill_matching_processes(binary_names, force=False):
   """Kills all processes with the given binary name, waiting for them to exit"""
   # Send all the signals before waiting so that processes can clean up in parallel.
@@ -189,30 +215,17 @@ def kill_matching_processes(binary_names, force=False):
               process_pid=process.pid,
               num_seconds=KILL_TIMEOUT_IN_SECONDS))
 
-def start_statestore():
-  LOG.info("Starting State Store logging to {log_dir}/statestored.INFO".format(
-      log_dir=options.log_dir))
-  stderr_log_file_path = os.path.join(options.log_dir, "statestore-error.log")
-  args = "{impalad_logging_args} {state_store_args}".format(
-      impalad_logging_args=build_impalad_logging_args(0, "statestored"),
-      state_store_args=" ".join(options.state_store_args))
-  exec_impala_process(STATE_STORE_PATH, args, stderr_log_file_path)
-  if not check_process_exists("statestored", 10):
-    raise RuntimeError("Unable to start statestored. Check log or file permissions"
-                       " for more details.")
-
-def start_catalogd():
-  LOG.info("Starting Catalog Service logging to {log_dir}/catalogd.INFO".format(
-      log_dir=options.log_dir))
-  stderr_log_file_path = os.path.join(options.log_dir, "catalogd-error.log")
-  args = "{impalad_logging_args} {catalogd_args} {jvm_args}".format(
-      impalad_logging_args=build_impalad_logging_args(0, "catalogd"),
-      catalogd_args=" ".join(options.catalogd_args),
-      jvm_args=build_jvm_args(options.cluster_size))
-  exec_impala_process(CATALOGD_PATH, args, stderr_log_file_path)
-  if not check_process_exists("catalogd", 10):
-    raise RuntimeError("Unable to start catalogd. Check log or file permissions"
-                       " for more details.")
+
+def choose_impalad_ports(instance_num):
+  """Compute the ports for impalad instance num 'instance_num', returning as a map
+  from the argument name to the port number."""
+  return {'beeswax_port': BASE_BEESWAX_PORT + instance_num,
+          'hs2_port': BASE_HS2_PORT + instance_num,
+          'be_port': BASE_BE_PORT + instance_num,
+          'krpc_port': BASE_KRPC_PORT + instance_num,
+          'state_store_subscriber_port': BASE_STATE_STORE_SUBSCRIBER_PORT + instance_num,
+          'webserver_port': BASE_IMPALAD_WEBSERVER_PORT + instance_num}
+
 
 def build_impalad_port_args(instance_num):
   IMPALAD_PORTS = (
@@ -222,44 +235,77 @@ def build_impalad_port_args(instance_num):
       "-krpc_port={krpc_port} "
       "-state_store_subscriber_port={state_store_subscriber_port} "
       "-webserver_port={webserver_port}")
-  BASE_BEESWAX_PORT = 21000
-  BASE_HS2_PORT = 21050
-  BASE_BE_PORT = 22000
-  BASE_KRPC_PORT = 27000
-  BASE_STATE_STORE_SUBSCRIBER_PORT = 23000
-  BASE_WEBSERVER_PORT = 25000
-  return IMPALAD_PORTS.format(
-      beeswax_port=BASE_BEESWAX_PORT + instance_num,
-      hs2_port=BASE_HS2_PORT + instance_num,
-      be_port=BASE_BE_PORT + instance_num,
-      krpc_port=BASE_KRPC_PORT + instance_num,
-      state_store_subscriber_port=BASE_STATE_STORE_SUBSCRIBER_PORT + instance_num,
-      webserver_port=BASE_WEBSERVER_PORT + instance_num)
-
-def build_impalad_logging_args(instance_num, service_name):
-  return ("-log_filename={log_filename} "
-      "-log_dir={log_dir} "
-      "-v={log_level} "
-      "-logbufsecs=5 "
-      "-max_log_files={max_log_files}").format(
-          log_filename=service_name,
-          log_dir=options.log_dir,
-          log_level=options.log_level,
-          max_log_files=options.max_log_files)
+  return IMPALAD_PORTS.format(**choose_impalad_ports(instance_num))
+
+
+def build_logging_args(service_name):
+  """Return a list of command line arguments to pass to daemon processes to configure
+  logging"""
+  result = ["-logbufsecs=5", "-v={0}".format(options.log_level),
+      "-max_log_files={0}".format(options.max_log_files)]
+  if options.docker_network is None:
+    # Impala inside a docker container should always log to the same location.
+    result += ["-log_filename={0}".format(service_name),
+               "-log_dir={0}".format(options.log_dir)]
+  return result
+
 
 def build_jvm_args(instance_num):
+  """Return a list of command line arguments to pass to start-*.sh to configure the JVM.
+  """
+  # TODO: IMPALA-7999 - Docker build doesn't use the start-*.sh scripts that process
+  # these args. Skip generating until that is fixed.
+  if options.docker_network is not None:
+    return []
   BASE_JVM_DEBUG_PORT = 30000
-  return "-jvm_debug_port={jvm_debug_port} -jvm_args={jvm_args}".format(
-      jvm_debug_port=BASE_JVM_DEBUG_PORT + instance_num,
-      jvm_args=options.jvm_args)
-
-def start_impalad_instances(cluster_size, num_coordinators, use_exclusive_coordinators):
-  """Start 'cluster_size' impalad instances. The first 'num_coordinator' instances will
-    act as coordinators. 'use_exclusive_coordinators' specifies whether the coordinators
-    will only execute coordinator fragments."""
-  if cluster_size == 0:
-    # No impalad instances should be started.
-    return
+  return ["-jvm_debug_port={0}".format(BASE_JVM_DEBUG_PORT + instance_num),
+          "-jvm_args={0}".format(options.jvm_args)]
+
+
+def impalad_service_name(i):
+  """Return the name to use for the ith impala daemon in the cluster."""
+  if i == 0:
+    # The first impalad always logs to impalad.INFO
+    return "impalad"
+  else:
+    return "impalad_node{node_num}".format(node_num=i)
+
+
+def combine_arg_list_opts(opt_args):
+  """Helper for processing arguments like impalad_args. The input is a list of strings,
+  each of which is the string passed into one instance of the argument, e.g. for
+  --impalad_args="-foo -bar" --impalad_args="-baz", the input to this function is
+  ["-foo -bar", "-baz"]. This function combines the argument lists by tokenised each
+  string into separate arguments, if needed, e.g. to produce the output
+  ["-foo", "-bar", "-baz"]"""
+  return list(itertools.chain(*[shlex.split(arg) for arg in opt_args]))
+
+
+def build_statestored_arg_list():
+  """Build a list of command line arguments to pass to the statestored."""
+  return (build_logging_args("statestored") +
+      combine_arg_list_opts(options.state_store_args))
+
+
+def build_catalogd_arg_list():
+  """Build a list of command line arguments to pass to the catalogd."""
+  return (build_logging_args("catalogd") +
+      ["-kudu_master_hosts", options.kudu_master_hosts] +
+      combine_arg_list_opts(options.catalogd_args) +
+      build_jvm_args(options.cluster_size))
+
+
+def build_impalad_arg_lists(cluster_size, num_coordinators, use_exclusive_coordinators,
+    remap_ports):
+  """Build the argument lists for impala daemons in the cluster. Returns a list of
+  argument lists, one for each impala daemon in the cluster. Each argument list is
+  simply a string that will be tokenized based on shell rules. 'num_coordinators' and
+  'use_exclusive_coordinators' allow setting up the cluster with dedicated coordinators.
+  If 'remap_ports' is true, the impalad ports are changed from their default values to
+  avoid port conflicts."""
+  # TODO: it would be better to produce a list of arguments rather than a big string
+  # blob that has to be tokenized later. However, that may require more substantial
+  # refactoring.
 
   # Set mem_limit of each impalad to the smaller of 12GB or
   # 1/cluster_size (typically 1/3) of 70% of system memory.
@@ -285,23 +331,17 @@ def start_impalad_instances(cluster_size, num_coordinators, use_exclusive_coordi
   if options.per_impalad_args != "":
     per_impalad_args = [args.strip() for args in options.per_impalad_args.split(";")]
 
-  # Start each impalad instance and optionally redirect the output to a log file.
+  # Build args for each impalad instance.
+  impalad_args = []
   for i in range(cluster_size):
-    if i == 0:
-      # The first impalad always logs to impalad.INFO
-      service_name = "impalad"
-    else:
-      service_name = "impalad_node{node_num}".format(node_num=i)
-      # Sleep between instance startup: simultaneous starts hurt the minikdc
-      # Yes, this is a hack, but it's easier than modifying the minikdc...
-      # TODO: is this really necessary?
-      sleep(1)
-
-    LOG.info("Starting Impala Daemon logging to {log_dir}/{service_name}.INFO".format(
-        log_dir=options.log_dir,
-        service_name=service_name))
+    service_name = impalad_service_name(i)
 
+    impala_port_args = ""
+    if remap_ports:
+      impala_port_args = build_impalad_port_args(i)
     # impalad args from the --impalad_args flag. Also replacing '#ID' with the instance.
+    # TODO: IMPALA-7940: mem_limit should be set on docker container and detected by
+    # the impala daemon process instead of set manually here.
     param_args = (" ".join(options.impalad_args)).replace("#ID", str(i))
     args = ("--mem_limit={mem_limit} "
         "{impala_logging_args} "
@@ -309,9 +349,9 @@ def start_impalad_instances(cluster_size, num_coordinators, use_exclusive_coordi
         "{impala_port_args} "
         "{param_args}").format(
             mem_limit=mem_limit,  # Goes first so --impalad_args will override it.
-            impala_logging_args=build_impalad_logging_args(i, service_name),
-            jvm_args=build_jvm_args(i),
-            impala_port_args=build_impalad_port_args(i),
+            impala_logging_args=" ".join(build_logging_args(service_name)),
+            jvm_args=" ".join(build_jvm_args(i)),
+            impala_port_args=impala_port_args,
             param_args=param_args)
     if options.kudu_master_hosts:
       # Must be prepended, otherwise the java options interfere.
@@ -338,13 +378,219 @@ def start_impalad_instances(cluster_size, num_coordinators, use_exclusive_coordi
     # Appended at the end so they can override previous args.
     if i < len(per_impalad_args):
       args = "{args} {per_impalad_args}".format(
-          args=args,
-          per_impalad_args=per_impalad_args[i])
+          args=args, per_impalad_args=per_impalad_args[i])
+    impalad_args.append(args)
+  return impalad_args
+
+
+class MiniClusterOperations(object):
+  """Implementations of operations for the non-containerized minicluster
+  implementation."""
+  def kill_all_daemons(self, force=False):
+    kill_matching_processes(["catalogd", "impalad", "statestored"], force)
+
+  def kill_all_impalads(self, force=False):
+    kill_matching_processes(["impalad"], force=force)
+
+  def kill_catalogd(self, force=False):
+    kill_matching_processes(["catalogd"], force=force)
+
+  def kill_statestored(self, force=False):
+    kill_matching_processes(["statestored"], force=force)
+
+  def start_statestore(self):
+    LOG.info("Starting State Store logging to {log_dir}/statestored.INFO".format(
+        log_dir=options.log_dir))
+    stderr_log_file_path = os.path.join(options.log_dir, "statestore-error.log")
+    args = " ".join(build_statestored_arg_list())
+    exec_impala_process(STATE_STORE_PATH, args, stderr_log_file_path)
+    if not check_process_exists("statestored", 10):
+      raise RuntimeError("Unable to start statestored. Check log or file permissions"
+                         " for more details.")
+
+  def start_catalogd(self):
+    LOG.info("Starting Catalog Service logging to {log_dir}/catalogd.INFO".format(
+        log_dir=options.log_dir))
+    stderr_log_file_path = os.path.join(options.log_dir, "catalogd-error.log")
+    args = " ".join(build_catalogd_arg_list())
+    exec_impala_process(CATALOGD_PATH, args, stderr_log_file_path)
+    if not check_process_exists("catalogd", 10):
+      raise RuntimeError("Unable to start catalogd. Check log or file permissions"
+                         " for more details.")
+
+  def start_impalads(self, cluster_size, num_coordinators, use_exclusive_coordinators):
+    """Start 'cluster_size' impalad instances. The first 'num_coordinator' instances will
+      act as coordinators. 'use_exclusive_coordinators' specifies whether the coordinators
+      will only execute coordinator fragments."""
+    if cluster_size == 0:
+      # No impalad instances should be started.
+      return
+
+    impalad_arg_lists = build_impalad_arg_lists(
+        cluster_size, num_coordinators, use_exclusive_coordinators, remap_ports=True)
+    assert cluster_size == len(impalad_arg_lists)
+    for i in xrange(cluster_size):
+      service_name = impalad_service_name(i)
+      LOG.info("Starting Impala Daemon logging to {log_dir}/{service_name}.INFO".format(
+          log_dir=options.log_dir, service_name=service_name))
+      stderr_log_file_path = os.path.join(
+          options.log_dir, "{service_name}-error.log".format(service_name=service_name))
+      exec_impala_process(IMPALAD_PATH, impalad_arg_lists[i], stderr_log_file_path)
+
+  def wait_for_cluster(self):
+    """Checks if the cluster is "ready"
+
+    A cluster is deemed "ready" if:
+      - All backends are registered with the statestore.
+      - Each impalad knows about all other impalads.
+      - Each coordinator impalad's catalog cache is ready.
+    This information is retrieved by querying the statestore debug webpage
+    and each individual impalad's metrics webpage.
+    """
+    impala_cluster = ImpalaCluster()
+    # impalad processes may take a while to come up.
+    wait_for_impala_process_count(impala_cluster)
+
+    # TODO: fix this for coordinator-only nodes as well.
+    expected_num_backends = options.cluster_size
+    if options.catalog_init_delays != "":
+      for delay in options.catalog_init_delays.split(","):
+        if int(delay.strip()) != 0: expected_num_backends -= 1
+
+    for impalad in impala_cluster.impalads:
+      impalad.service.wait_for_num_known_live_backends(expected_num_backends,
+          timeout=CLUSTER_WAIT_TIMEOUT_IN_SECONDS, interval=2)
+      if impalad._get_arg_value("is_coordinator", default="true") == "true" and \
+         impalad._get_arg_value("stress_catalog_init_delay_ms", default=0) == 0:
+        wait_for_catalog(impalad)
+
+
+class DockerMiniClusterOperations(object):
+  """Implementations of operations for the containerized minicluster implementation
+  with all processes attached to a user-defined docker bridge network.
+
+  We assume that only one impala cluster is running on the network - existing containers
+  created by this script (or with names that collide with those generated by this script)
+  can be destroyed if present.
+
+  We use a naming convention for the created docker containers so that we can easily
+  refer to them with docker commands:
+    impala-test-cluster-<network_name>-<daemon_name>[-<instance_num>],
+  e.g. impala-test-cluster-impala_network-catalogd or
+  impala-test-cluster-impala_network-impalad-0.
+  """
+  def __init__(self, network_name):
+    self.network_name = network_name
+    # Make sure that the network actually exists.
+    check_call(["docker", "network", "inspect", network_name])
+
+  def kill_all_daemons(self, force=False):
+    self.kill_statestored(force=force)
+    self.kill_catalogd(force=force)
+    self.kill_all_impalads(force=force)
+
+  def kill_all_impalads(self, force=False):
+    # List all running containers on the network and kill those with the impalad name
+    # prefix to make sure that no running containers are left over from previous clusters.
+    container_name_prefix = self.__gen_container_name__("impalad")
+    for container_id, info in self.__get_network_info__()["Containers"].iteritems():
+      container_name = info["Name"]
+      if container_name.startswith(container_name_prefix):
+        LOG.info("Stopping container {0}".format(container_name))
+        check_call(["docker", "stop", container_name])
+
+  def kill_catalogd(self, force=False):
+    self.__stop_container__("catalogd")
+
+  def kill_statestored(self, force=False):
+    self.__stop_container__("statestored")
+
+  def start_statestore(self):
+    self.__run_container__("statestored", build_statestored_arg_list(),
+        {BASE_STATESTORED_WEBSERVER_PORT: BASE_STATESTORED_WEBSERVER_PORT})
+
+  def start_catalogd(self):
+    self.__run_container__("catalogd", build_catalogd_arg_list(),
+          {BASE_CATALOGD_WEBSERVER_PORT: BASE_CATALOGD_WEBSERVER_PORT})
+
+  def start_impalads(self, cluster_size, num_coordinators, use_exclusive_coordinators):
+    impalad_arg_lists = build_impalad_arg_lists(
+        cluster_size, num_coordinators, use_exclusive_coordinators, remap_ports=False)
+    assert cluster_size == len(impalad_arg_lists)
+    for i in xrange(cluster_size):
+      chosen_ports = choose_impalad_ports(i)
+      port_map = {BASE_BEESWAX_PORT: chosen_ports['beeswax_port'],
+                  BASE_HS2_PORT: chosen_ports['hs2_port'],
+                  BASE_IMPALAD_WEBSERVER_PORT: chosen_ports['webserver_port']}
+      self.__run_container__("impalad_coord_exec",
+          shlex.split(impalad_arg_lists[i]), port_map, i)
+
+  def wait_for_cluster(self):
+    # TODO: IMPALA-7988: this requires ImpalaCluster to understand containerisation.
+    LOG.info("Impala minicluster started on docker network {0}. Please wait a moment "
+        "before submitting queries.""".format(self.network_name))
+
+  def __gen_container_name__(self, daemon, instance=None):
+    """Generate the name for the container, which should be unique among containers
+    managed by this script."""
+    return "impala-test-cluster-{0}-{1}".format(
+        self.network_name, self.__gen_host_name__(daemon, instance))
+
+  def __gen_host_name__(self, daemon, instance=None):
+    """Generate the host name for the daemon inside the network, e.g. catalogd or
+    impalad-1."""
+    if instance is None:
+      return daemon
+    return "{0}-{1}".format(daemon, instance)
+
+  def __run_container__(self, daemon, args, port_map, instance=None):
+    """Launch a container with the daemon - impalad, catalogd, or statestored. If there
+    are multiple impalads in the cluster, a unique instance number must be specified.
+    'args' are command-line arguments to be appended to the end of the daemon command
+    line. 'port_map' determines a mapping from container ports to ports on localhost. If
+    --docker_auto_ports was set on the command line, 'port_map' is ignored and Docker
+    will automatically choose the mapping. If there is an existing running or stopped
+    container with the same name, it will be destroyed."""
+    self.__destroy_container__(daemon, instance)
+    if options.docker_auto_ports:
+      port_args = ["-P"]
+    else:
+      port_args = ["-p{dst}:{src}".format(src=src, dst=dst)
+                   for src, dst in port_map.iteritems()]
+    # Impersonate the current user for operations against the minicluster. This is
+    # necessary because the user name inside the container is "root".
+    env_args = ["-e", "HADOOP_USER_NAME={0}".format(getpass.getuser())]
+    # The container build process tags the generated image with the daemon name.
+    image_tag = daemon
+    host_name = self.__gen_host_name__(daemon, instance)
+    container_name = self.__gen_container_name__(daemon, instance)
+    LOG.info("Running container {0}".format(container_name))
+    run_cmd = (["docker", "run", "-d"] + env_args + port_args + ["--network",
+      self.network_name, "--name", container_name, "--network-alias", host_name,
+      image_tag] + args)
+    LOG.info("Running command {0}".format(run_cmd))
+    check_call(run_cmd)
+    port_mapping = check_output(["docker", "port", container_name])
+    LOG.info("Launched container {0} with port mapping:\n{1}".format(
+        container_name, port_mapping))
+
+  def __stop_container__(self, daemon, instance=None):
+    """Stop a container that was created by __run_container__()."""
+    container_name = self.__gen_container_name__(daemon, instance)
+    check_call(["docker", "stop", container_name])
+
+  def __destroy_container__(self, daemon, instance=None):
+    """Destroy a container that was created by __run_container__()."""
+    container_name = self.__gen_container_name__(daemon, instance)
+    if call(["docker", "rm", "-f", container_name]) == 0:
+      LOG.info("Destroyed container {0}".format(container_name))
+
+  def __get_network_info__(self):
+    """Get the output of "docker network inspect" as a python data structure."""
+    output = check_output(["docker", "network", "inspect", self.network_name])
+    # Only one network should be present in the top level array.
+    return json.loads(output)[0]
 
-    stderr_log_file_path = os.path.join(
-        options.log_dir,
-        "{service_name}-error.log".format(service_name=service_name))
-    exec_impala_process(IMPALAD_PATH, args, stderr_log_file_path)
 
 def wait_for_impala_process_count(impala_cluster, retries=10):
   """Checks that the desired number of impalad/statestored processes are running.
@@ -372,34 +618,7 @@ def wait_for_impala_process_count(impala_cluster, retries=10):
     raise RuntimeError(msg)
 
 
-def wait_for_cluster(timeout_in_seconds=CLUSTER_WAIT_TIMEOUT_IN_SECONDS):
-  """Checks if the cluster is "ready"
-
-  A cluster is deemed "ready" if:
-    - All backends are registered with the statestore.
-    - Each impalad knows about all other impalads.
-    - Each coordinator impalad's catalog cache is ready.
-  This information is retrieved by querying the statestore debug webpage
-  and each individual impalad's metrics webpage.
-  """
-  impala_cluster = ImpalaCluster()
-  # impalad processes may take a while to come up.
-  wait_for_impala_process_count(impala_cluster)
-
-  # TODO: fix this for coordinator-only nodes as well.
-  expected_num_backends = options.cluster_size
-  if options.catalog_init_delays != "":
-    for delay in options.catalog_init_delays.split(","):
-      if int(delay.strip()) != 0: expected_num_backends -= 1
-
-  for impalad in impala_cluster.impalads:
-    impalad.service.wait_for_num_known_live_backends(expected_num_backends,
-        timeout=CLUSTER_WAIT_TIMEOUT_IN_SECONDS, interval=2)
-    if impalad._get_arg_value("is_coordinator", default="true") == "true" and \
-       impalad._get_arg_value("stress_catalog_init_delay_ms", default=0) == 0:
-      wait_for_catalog(impalad)
-
-def wait_for_catalog(impalad, timeout_in_seconds=CLUSTER_WAIT_TIMEOUT_IN_SECONDS):
+def wait_for_catalog(impalad):
   """Waits for a catalog copy to be received by the impalad. When its received,
      additionally waits for client ports to be opened."""
   start_time = time()
@@ -407,7 +626,7 @@ def wait_for_catalog(impalad, timeout_in_seconds=CLUSTER_WAIT_TIMEOUT_IN_SECONDS
   hs2_port_is_open = False
   num_dbs = 0
   num_tbls = 0
-  while ((time() - start_time < timeout_in_seconds) and
+  while ((time() - start_time < CLUSTER_WAIT_TIMEOUT_IN_SECONDS) and
       not (beeswax_port_is_open and hs2_port_is_open)):
     try:
       num_dbs, num_tbls = impalad.service.get_metric_values(
@@ -423,13 +642,10 @@ def wait_for_catalog(impalad, timeout_in_seconds=CLUSTER_WAIT_TIMEOUT_IN_SECONDS
 
   if not hs2_port_is_open or not beeswax_port_is_open:
     raise RuntimeError("Unable to open client ports within {num_seconds} seconds.".format(
-        num_seconds=timeout_in_seconds))
+        num_seconds=CLUSTER_WAIT_TIMEOUT_IN_SECONDS))
 
-if __name__ == "__main__":
-  if options.kill_only:
-    kill_cluster_processes(force=options.force_kill)
-    sys.exit(0)
 
+def validate_options():
   if options.build_type not in KNOWN_BUILD_TYPES:
     LOG.error("Invalid build type {0}".format(options.build_type))
     LOG.error("Valid values: {0}".format(", ".join(KNOWN_BUILD_TYPES)))
@@ -466,15 +682,30 @@ if __name__ == "__main__":
         "Cannot perform individual component restarts using an in-process cluster")
       sys.exit(1)
 
+
+if __name__ == "__main__":
+  validate_options()
+  if options.docker_network is None:
+    cluster_ops = MiniClusterOperations()
+  else:
+    if sys.version_info < (2, 7):
+      raise Exception("Docker minicluster only supported on Python 2.7+")
+    # We use some functions in the docker code that don't exist in Python 2.6.
+    from subprocess import check_output
+    cluster_ops = DockerMiniClusterOperations(options.docker_network)
+
   # Kill existing cluster processes based on the current configuration.
   if options.restart_impalad_only:
-    kill_matching_processes(["impalad"], force=options.force_kill)
+    cluster_ops.kill_all_impalads(force=options.force_kill)
   elif options.restart_catalogd_only:
-    kill_matching_processes(["catalogd"], force=options.force_kill)
+    cluster_ops.kill_catalogd(force=options.force_kill)
   elif options.restart_statestored_only:
-    kill_matching_processes(["statestored"], force=options.force_kill)
+    cluster_ops.kill_statestored(force=options.force_kill)
   else:
-    kill_cluster_processes(force=options.force_kill)
+    cluster_ops.kill_all_daemons(force=options.force_kill)
+
+  if options.kill_only:
+    sys.exit(0)
 
   if options.restart_impalad_only:
     impala_cluster = ImpalaCluster()
@@ -485,22 +716,22 @@ if __name__ == "__main__":
 
   try:
     if options.restart_catalogd_only:
-      start_catalogd()
+      cluster_ops.start_catalogd()
     elif options.restart_statestored_only:
-      start_statestore()
+      cluster_ops.start_statestore()
     elif options.restart_impalad_only:
-      start_impalad_instances(options.cluster_size, options.num_coordinators,
+      cluster_ops.start_impalads(options.cluster_size, options.num_coordinators,
                               options.use_exclusive_coordinators)
     else:
-      start_statestore()
-      start_catalogd()
-      start_impalad_instances(options.cluster_size, options.num_coordinators,
+      cluster_ops.start_statestore()
+      cluster_ops.start_catalogd()
+      cluster_ops.start_impalads(options.cluster_size, options.num_coordinators,
                               options.use_exclusive_coordinators)
     # Sleep briefly to reduce log spam: the cluster takes some time to start up.
     sleep(3)
 
     # Check for the cluster to be ready.
-    wait_for_cluster()
+    cluster_ops.wait_for_cluster()
   except Exception, e:
     LOG.exception("Error starting cluster")
     sys.exit(1)
diff --git a/fe/CMakeLists.txt b/fe/CMakeLists.txt
index 6c42bf8..c6af7db 100644
--- a/fe/CMakeLists.txt
+++ b/fe/CMakeLists.txt
@@ -20,6 +20,21 @@ add_custom_target(fe ALL DEPENDS
   COMMAND ${CMAKE_SOURCE_DIR}/bin/mvn-quiet.sh -B install -DskipTests
 )
 
-add_custom_target(test-configuration
+# Timestamp file is touched by bin/create-test-configuration.sh. This is used
+# for downstream targets like the docker build that need to pick up regenerated configs
+# when they change.
+set(TEST_CONFIGURATION_TIMESTAMP
+  ${CMAKE_SOURCE_DIR}/fe/src/test/resources/.test_config_timestamp
+)
+
+add_custom_command(
+  OUTPUT ${TEST_CONFIGURATION_TIMESTAMP}
   COMMAND ${CMAKE_SOURCE_DIR}/bin/create-test-configuration.sh
+  DEPENDS ${CMAKE_SOURCE_DIR}/bin/create-test-configuration.sh
+  COMMENT "Creating test configuration."
+  VERBATIM
+)
+
+add_custom_target(test-configuration
+  DEPENDS ${TEST_CONFIGURATION_TIMESTAMP}
 )
diff --git a/fe/src/test/resources/authz-policy.ini.template b/fe/src/test/resources/authz-policy.ini.template
index e26ee55..80ac068 100644
--- a/fe/src/test/resources/authz-policy.ini.template
+++ b/fe/src/test/resources/authz-policy.ini.template
@@ -93,10 +93,10 @@ select_column_level_functional =\
     server=server1->db=functional->table=allcomplextypes->column=int_map_col->action=select
 select_column_level_functional_avro =\
     server=server1->db=functional_avro->table=alltypessmall->column=id->action=select
-new_table_uri = server=server1->uri=hdfs://localhost:20500/test-warehouse/new_table
-tpch_data_uri = server=server1->uri=hdfs://localhost:20500/test-warehouse/tpch.lineitem
-upper_case_uri = server=server1->uri=hdfs://localhost:20500/test-warehouse/UPPER_CASE
-libtestudfs_uri = server=server1->uri=hdfs://localhost:20500/test-warehouse/libTestUdfs.so
+new_table_uri = server=server1->uri=hdfs://${INTERNAL_LISTEN_HOST}:20500/test-warehouse/new_table
+tpch_data_uri = server=server1->uri=hdfs://${INTERNAL_LISTEN_HOST}:20500/test-warehouse/tpch.lineitem
+upper_case_uri = server=server1->uri=hdfs://${INTERNAL_LISTEN_HOST}:20500/test-warehouse/UPPER_CASE
+libtestudfs_uri = server=server1->uri=hdfs://${INTERNAL_LISTEN_HOST}:20500/test-warehouse/libTestUdfs.so
 
 # This section allows for an admin specified mapping of users -> groups rather than using
 # the built-in hadoop group mapping. This section is only applicable if using the
diff --git a/fe/src/test/resources/hbase-site.xml.template b/fe/src/test/resources/hbase-site.xml.template
index 2fa4848..f282a8e 100644
--- a/fe/src/test/resources/hbase-site.xml.template
+++ b/fe/src/test/resources/hbase-site.xml.template
@@ -22,7 +22,7 @@
 <configuration>
   <property>
     <name>hbase.rootdir</name>
-    <value>hdfs://localhost:20500/hbase</value>
+    <value>hdfs://${INTERNAL_LISTEN_HOST}:20500/hbase</value>
     <description>The directory shared by RegionServers.
     </description>
   </property>
@@ -112,7 +112,7 @@
 
   <property>
     <name>hbase.zookeeper.quorum</name>
-    <value>localhost</value> <!-- The comma-separated
+    <value>${INTERNAL_LISTEN_HOST}</value> <!-- The comma-separated
     list of hostnames of the ZooKeeper Quorum hosts -->
   </property>
 
diff --git a/fe/src/test/resources/mysql-hive-site.xml.template b/fe/src/test/resources/mysql-hive-site.xml.template
index e5967ba..544e8ac 100644
--- a/fe/src/test/resources/mysql-hive-site.xml.template
+++ b/fe/src/test/resources/mysql-hive-site.xml.template
@@ -26,7 +26,7 @@
 <!-- resource).                                                                                 -->
 <property>
  <name>hive.metastore.uris</name>
- <value>thrift://localhost:9083</value>
+ <value>thrift://${INTERNAL_LISTEN_HOST}:9083</value>
 </property>
 <property>
  <name>javax.jdo.option.ConnectionURL</name>
@@ -66,7 +66,7 @@
 </property>
 <property>
   <name>hive.stats.dbconnectionstring</name>
-  <value>jdbc:mysql://localhost:3306/${METASTORE_DB}_Stats?createDatabaseIfNotExist=true&amp;user=hiveuser&amp;password=password</value>
+  <value>jdbc:mysql://${INTERNAL_LISTEN_HOST}:3306/${METASTORE_DB}_Stats?createDatabaseIfNotExist=true&amp;user=hiveuser&amp;password=password</value>
 </property>
 <property>
   <name>hive.stats.jdbcdriver</name>
diff --git a/fe/src/test/resources/postgresql-hive-site.xml.template b/fe/src/test/resources/postgresql-hive-site.xml.template
index 9e25024..51ce62c 100644
--- a/fe/src/test/resources/postgresql-hive-site.xml.template
+++ b/fe/src/test/resources/postgresql-hive-site.xml.template
@@ -26,7 +26,7 @@
 <!-- resource).                                                                                 -->
 <property>
  <name>hive.metastore.uris</name>
- <value>thrift://localhost:9083</value>
+ <value>thrift://${INTERNAL_LISTEN_HOST}:9083</value>
 </property>
 <property>
  <name>javax.jdo.option.ConnectionURL</name>
@@ -179,7 +179,7 @@
 
 <property>
   <name>hbase.zookeeper.quorum</name>
-  <value>localhost</value>
+  <value>${INTERNAL_LISTEN_HOST}</value>
 </property>
 <!-- END IMPALA-4125 -->
 
diff --git a/fe/src/test/resources/sentry-site.xml.template b/fe/src/test/resources/sentry-site.xml.template
index 597a130..e48b8ff 100644
--- a/fe/src/test/resources/sentry-site.xml.template
+++ b/fe/src/test/resources/sentry-site.xml.template
@@ -32,7 +32,7 @@
 </property>
 <property>
  <name>sentry.service.server.rpc-address</name>
- <value>localhost</value>
+ <value>${INTERNAL_LISTEN_HOST}</value>
 </property>
 <property>
  <name>sentry.service.server.rpc-port</name>
@@ -40,14 +40,14 @@
 </property>
 <property>
  <name>sentry.service.client.server.rpc-address</name>
- <value>localhost</value>
+ <value>${INTERNAL_LISTEN_HOST}</value>
 </property>
 <property>
 <!--
  TODO(IMPALA-5686): Remove this or the previous property when Sentry standardizes on one.
 -->
  <name>sentry.service.client.server.rpc-addresses</name>
- <value>localhost</value>
+ <value>${INTERNAL_LISTEN_HOST}</value>
 </property>
 <property>
  <name>sentry.service.client.server.rpc-port</name>
diff --git a/fe/src/test/resources/sentry-site_no_oo.xml.template b/fe/src/test/resources/sentry-site_no_oo.xml.template
index cfebc9a..3c29b5d 100644
--- a/fe/src/test/resources/sentry-site_no_oo.xml.template
+++ b/fe/src/test/resources/sentry-site_no_oo.xml.template
@@ -32,7 +32,7 @@
   </property>
   <property>
     <name>sentry.service.server.rpc-address</name>
-    <value>localhost</value>
+    <value>${INTERNAL_LISTEN_HOST}</value>
   </property>
   <property>
     <name>sentry.service.server.rpc-port</name>
@@ -40,14 +40,14 @@
   </property>
   <property>
     <name>sentry.service.client.server.rpc-address</name>
-    <value>localhost</value>
+    <value>${INTERNAL_LISTEN_HOST}</value>
   </property>
   <property>
   <!--
    TODO(IMPALA-5686): Remove this or the previous property when Sentry standardizes on one.
   -->
     <name>sentry.service.client.server.rpc-addresses</name>
-    <value>localhost</value>
+    <value>${INTERNAL_LISTEN_HOST}</value>
   </property>
   <property>
     <name>sentry.service.client.server.rpc-port</name>
@@ -55,7 +55,7 @@
   </property>
   <property>
     <name>sentry.store.jdbc.url</name>
-    <value>jdbc:postgresql://localhost:5432/${SENTRY_POLICY_DB}/;create=true</value>
+    <value>jdbc:postgresql://${INTERNAL_LISTEN_HOST}:5432/${SENTRY_POLICY_DB}/;create=true</value>
   </property>
   <property>
     <name>sentry.store.jdbc.user</name>
diff --git a/fe/src/test/resources/sentry-site_oo.xml.template b/fe/src/test/resources/sentry-site_oo.xml.template
index 3cd1b80..bccb1c5 100644
--- a/fe/src/test/resources/sentry-site_oo.xml.template
+++ b/fe/src/test/resources/sentry-site_oo.xml.template
@@ -32,7 +32,7 @@
   </property>
   <property>
     <name>sentry.service.server.rpc-address</name>
-    <value>localhost</value>
+    <value>${INTERNAL_LISTEN_HOST}</value>
   </property>
   <property>
     <name>sentry.service.server.rpc-port</name>
@@ -40,14 +40,14 @@
   </property>
   <property>
     <name>sentry.service.client.server.rpc-address</name>
-    <value>localhost</value>
+    <value>${INTERNAL_LISTEN_HOST}</value>
   </property>
   <property>
   <!--
    TODO(IMPALA-5686): Remove this or the previous property when Sentry standardizes on one.
   -->
     <name>sentry.service.client.server.rpc-addresses</name>
-    <value>localhost</value>
+    <value>${INTERNAL_LISTEN_HOST}</value>
   </property>
   <property>
     <name>sentry.service.client.server.rpc-port</name>
@@ -55,7 +55,7 @@
   </property>
   <property>
     <name>sentry.store.jdbc.url</name>
-    <value>jdbc:postgresql://localhost:5432/${SENTRY_POLICY_DB}/;create=true</value>
+    <value>jdbc:postgresql://${INTERNAL_LISTEN_HOST}:5432/${SENTRY_POLICY_DB}/;create=true</value>
   </property>
   <property>
     <name>sentry.store.jdbc.user</name>
diff --git a/fe/src/test/resources/sentry-site_oo_nogrant.xml.template b/fe/src/test/resources/sentry-site_oo_nogrant.xml.template
index 7489bf6..6e0076a 100644
--- a/fe/src/test/resources/sentry-site_oo_nogrant.xml.template
+++ b/fe/src/test/resources/sentry-site_oo_nogrant.xml.template
@@ -32,7 +32,7 @@
   </property>
   <property>
     <name>sentry.service.server.rpc-address</name>
-    <value>localhost</value>
+    <value>${INTERNAL_LISTEN_HOST}</value>
   </property>
   <property>
     <name>sentry.service.server.rpc-port</name>
@@ -40,14 +40,14 @@
   </property>
   <property>
     <name>sentry.service.client.server.rpc-address</name>
-    <value>localhost</value>
+    <value>${INTERNAL_LISTEN_HOST}</value>
   </property>
   <property>
   <!--
    TODO(IMPALA-5686): Remove this or the previous property when Sentry standardizes on one.
   -->
     <name>sentry.service.client.server.rpc-addresses</name>
-    <value>localhost</value>
+    <value>${INTERNAL_LISTEN_HOST}</value>
   </property>
   <property>
     <name>sentry.service.client.server.rpc-port</name>
@@ -55,7 +55,7 @@
   </property>
   <property>
     <name>sentry.store.jdbc.url</name>
-    <value>jdbc:postgresql://localhost:5432/${SENTRY_POLICY_DB}/;create=true</value>
+    <value>jdbc:postgresql://${INTERNAL_LISTEN_HOST}:5432/${SENTRY_POLICY_DB}/;create=true</value>
   </property>
   <property>
     <name>sentry.store.jdbc.user</name>
diff --git a/testdata/cluster/admin b/testdata/cluster/admin
index ca438db..ef00b4e 100755
--- a/testdata/cluster/admin
+++ b/testdata/cluster/admin
@@ -219,6 +219,8 @@ function create_cluster {
     ${MINIKDC_INIT} stop
   fi
 
+  echo "Hostname for internal communication: ${INTERNAL_LISTEN_HOST}" \
+       "and for external communication: ${EXTERNAL_LISTEN_HOST}"
   # For consistency, the first node will host all the master roles.
   for ((NODE_IDX=$NODE_COUNT; NODE_IDX >= 1; NODE_IDX--)); do
     NODE=${NODE_PREFIX}$NODE_IDX
diff --git a/testdata/cluster/node_templates/common/etc/hadoop/conf/core-site.xml.tmpl b/testdata/cluster/node_templates/common/etc/hadoop/conf/core-site.xml.tmpl
index 982b919..7eaff6f 100644
--- a/testdata/cluster/node_templates/common/etc/hadoop/conf/core-site.xml.tmpl
+++ b/testdata/cluster/node_templates/common/etc/hadoop/conf/core-site.xml.tmpl
@@ -64,7 +64,7 @@ DEFAULT</value>
   <!-- Location of the KMS key provider -->
   <property>
     <name>hadoop.security.key.provider.path</name>
-    <value>kms://http@127.0.0.1:9600/kms</value>
+    <value>kms://http@${INTERNAL_LISTEN_HOST}:9600/kms</value>
   </property>
 
   <!-- BEGIN Kerberos settings -->
diff --git a/testdata/cluster/node_templates/common/etc/hadoop/conf/hdfs-site.xml.tmpl b/testdata/cluster/node_templates/common/etc/hadoop/conf/hdfs-site.xml.tmpl
index 717ae7c..6bf8cf8 100644
--- a/testdata/cluster/node_templates/common/etc/hadoop/conf/hdfs-site.xml.tmpl
+++ b/testdata/cluster/node_templates/common/etc/hadoop/conf/hdfs-site.xml.tmpl
@@ -4,7 +4,11 @@
 <configuration>
   <property>
     <name>dfs.namenode.http-address</name>
-    <value>0.0.0.0:${HDFS_WEBUI_PORT}</value>
+    <value>${INTERNAL_LISTEN_HOST}:${HDFS_WEBUI_PORT}</value>
+  </property>
+  <property>
+    <name>dfs.namenode.http-address</name>
+    <value>${EXTERNAL_LISTEN_HOST}:${HDFS_WEBUI_PORT}</value>
   </property>
 
   <property>
@@ -31,22 +35,22 @@
 
   <property>
     <name>dfs.datanode.address</name>
-    <value>127.0.0.1:${DATANODE_PORT}</value>
+    <value>${INTERNAL_LISTEN_HOST}:${DATANODE_PORT}</value>
   </property>
 
   <property>
     <name>dfs.datanode.http.address</name>
-    <value>127.0.0.1:${DATANODE_HTTP_PORT}</value>
+    <value>${INTERNAL_LISTEN_HOST}:${DATANODE_HTTP_PORT}</value>
   </property>
 
   <property>
     <name>dfs.datanode.ipc.address</name>
-    <value>127.0.0.1:${DATANODE_IPC_PORT}</value>
+    <value>${INTERNAL_LISTEN_HOST}:${DATANODE_IPC_PORT}</value>
   </property>
 
   <property>
     <name>dfs.datanode.https.address</name>
-    <value>127.0.0.1:${DATANODE_HTTPS_PORT}</value>
+    <value>${INTERNAL_LISTEN_HOST}:${DATANODE_HTTPS_PORT}</value>
   </property>
 
   <!-- Configuration to enable disk location metadata -->
@@ -126,7 +130,7 @@
   <!-- Location of the KMS key provider -->
   <property>
     <name>dfs.encryption.key.provider.uri</name>
-    <value>kms://http@127.0.0.1:9600/kms</value>
+    <value>kms://http@${INTERNAL_LISTEN_HOST}:9600/kms</value>
   </property>
 
 
diff --git a/testdata/cluster/node_templates/common/etc/hadoop/conf/yarn-site.xml.tmpl b/testdata/cluster/node_templates/common/etc/hadoop/conf/yarn-site.xml.tmpl
index 0dd7bec..036a21c 100644
--- a/testdata/cluster/node_templates/common/etc/hadoop/conf/yarn-site.xml.tmpl
+++ b/testdata/cluster/node_templates/common/etc/hadoop/conf/yarn-site.xml.tmpl
@@ -5,12 +5,12 @@
 <configuration>
   <property>
     <name>yarn.resourcemanager.webapp.address</name>
-    <value>0.0.0.0:${YARN_WEBUI_PORT}</value>
+    <value>${EXTERNAL_LISTEN_HOST}:${YARN_WEBUI_PORT}</value>
   </property>
 
   <property>
     <name>yarn.nodemanager.address</name>
-    <value>127.0.0.1:${NODEMANAGER_PORT}</value>
+    <value>${INTERNAL_LISTEN_HOST}:${NODEMANAGER_PORT}</value>
   </property>
 
   <property>
@@ -50,12 +50,12 @@
 
   <property>
     <name>yarn.nodemanager.localizer.address</name>
-    <value>127.0.0.1:${NODEMANAGER_LOCALIZER_PORT}</value>
+    <value>${INTERNAL_LISTEN_HOST}:${NODEMANAGER_LOCALIZER_PORT}</value>
   </property>
 
   <property>
     <name>yarn.nodemanager.webapp.address</name>
-    <value>127.0.0.1:${NODEMANAGER_WEBUI_PORT}</value>
+    <value>${INTERNAL_LISTEN_HOST}:${NODEMANAGER_WEBUI_PORT}</value>
   </property>
 
   <property>
diff --git a/tests/custom_cluster/test_breakpad.py b/tests/custom_cluster/test_breakpad.py
index ed3e427..ff7aa12 100644
--- a/tests/custom_cluster/test_breakpad.py
+++ b/tests/custom_cluster/test_breakpad.py
@@ -280,8 +280,15 @@ class TestBreakpadExhaustive(TestBreakpadBase):
     # Maximum number of minidumps that the impalads should keep for this test.
     max_minidumps = 2
     self.start_cluster_with_args(minidump_path=self.tmp_dir,
-                                 max_minidumps=max_minidumps)
-    assert self.count_minidumps('impalad') == min(cluster_size, max_minidumps)
+                                 max_minidumps=max_minidumps,
+                                 logbufsecs=1)
+    # Wait for log maintenance thread to clean up minidumps asynchronously.
+    start = time.time()
+    expected_impalad_minidumps = min(cluster_size, max_minidumps)
+    while (self.count_minidumps('impalad') != expected_impalad_minidumps
+        and time.time() - start < 10):
+      time.sleep(0.1)
+    assert self.count_minidumps('impalad') == expected_impalad_minidumps
     assert self.count_minidumps('statestored') == 1
     assert self.count_minidumps('catalogd') == 1