You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/07 01:31:39 UTC

[3/3] impala git commit: IMPALA-6642 (Part 2): clean up start-impala-cluster.py

IMPALA-6642 (Part 2): clean up start-impala-cluster.py

This patch cleans up start-impala-cluster.py by replacing "print"
statements with the logging module and by building strings with
str.format() instead of the "%" operator. Every log message now
includes a timestamp, which makes it easier to debug custom cluster
test failures that occur while the cluster is starting.

Change-Id: I60169203c61ae6bc0a3ccd3dea355799b603efe5
Reviewed-on: http://gerrit.cloudera.org:8080/10780
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/30d196fd
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/30d196fd
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/30d196fd

Branch: refs/heads/master
Commit: 30d196fd50f44519a5289d94388bdcbe970923d1
Parents: 837d386
Author: Taras Bobrovytsky <ta...@apache.org>
Authored: Mon Jun 18 14:34:35 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Sat Jul 7 01:10:36 2018 +0000

----------------------------------------------------------------------
 bin/start-impala-cluster.py    | 227 +++++++++++++++++++++++-------------
 tests/common/impala_service.py |   3 +-
 2 files changed, 147 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/30d196fd/bin/start-impala-cluster.py
----------------------------------------------------------------------
diff --git a/bin/start-impala-cluster.py b/bin/start-impala-cluster.py
index 6856594..4d34d45 100755
--- a/bin/start-impala-cluster.py
+++ b/bin/start-impala-cluster.py
@@ -20,18 +20,24 @@
 # Starts up an Impala cluster (ImpalaD + State Store) with the specified number of
 # ImpalaD instances. Each ImpalaD runs on a different port allowing this to be run
 # on a single machine.
+import logging
 import os
 import psutil
 import sys
+from datetime import datetime
 from getpass import getuser
 from time import sleep, time
 from optparse import OptionParser, SUPPRESS_HELP
 from testdata.common import cgroups
 from tests.common.environ import specific_build_type_timeout
 
-KUDU_MASTER_HOSTS = os.getenv('KUDU_MASTER_HOSTS', '127.0.0.1')
-DEFAULT_IMPALA_MAX_LOG_FILES = os.environ.get('IMPALA_MAX_LOG_FILES', 10)
+logging.basicConfig(level=logging.ERROR, format="%(asctime)s %(threadName)s: %(message)s",
+    datefmt="%H:%M:%S")
+LOG = logging.getLogger(os.path.splitext(os.path.basename(__file__))[0])
+LOG.setLevel(level=logging.DEBUG)
 
+KUDU_MASTER_HOSTS = os.getenv("KUDU_MASTER_HOSTS", "127.0.0.1")
+DEFAULT_IMPALA_MAX_LOG_FILES = os.environ.get("IMPALA_MAX_LOG_FILES", 10)
 
 # Options
 parser = OptionParser()
@@ -43,7 +49,7 @@ parser.add_option("--use_exclusive_coordinators", dest="use_exclusive_coordinato
                   action="store_true", default=False, help="If true, coordinators only "
                   "coordinate queries and execute coordinator fragments. If false, "
                   "coordinators also act as executors.")
-parser.add_option("--build_type", dest="build_type", default= 'latest',
+parser.add_option("--build_type", dest="build_type", default= "latest",
                   help="Build type to use - debug / release / latest")
 parser.add_option("--impalad_args", dest="impalad_args", action="append", type="string",
                   default=[],
@@ -67,10 +73,10 @@ parser.add_option("-r", "--restart_impalad_only", dest="restart_impalad_only",
 parser.add_option("--in-process", dest="inprocess", action="store_true", default=False,
                   help="Start all Impala backends and state store in a single process.")
 parser.add_option("--log_dir", dest="log_dir",
-                  default=os.environ['IMPALA_CLUSTER_LOGS_DIR'],
+                  default=os.environ["IMPALA_CLUSTER_LOGS_DIR"],
                   help="Directory to store output logs to.")
-parser.add_option('--max_log_files', default=DEFAULT_IMPALA_MAX_LOG_FILES,
-                  help='Max number of log files before rotation occurs.')
+parser.add_option("--max_log_files", default=DEFAULT_IMPALA_MAX_LOG_FILES,
+                  help="Max number of log files before rotation occurs.")
 parser.add_option("-v", "--verbose", dest="verbose", action="store_true", default=False,
                   help="Prints all output to stderr/stdout.")
 parser.add_option("--log_level", type="int", dest="log_level", default=1,
@@ -92,21 +98,19 @@ parser.add_option("--per_impalad_args", dest="per_impalad_args", type="string"
 
 options, args = parser.parse_args()
 
-IMPALA_HOME = os.environ['IMPALA_HOME']
-KNOWN_BUILD_TYPES = ['debug', 'release', 'latest']
+IMPALA_HOME = os.environ["IMPALA_HOME"]
+KNOWN_BUILD_TYPES = ["debug", "release", "latest"]
 IMPALAD_PATH = os.path.join(IMPALA_HOME,
-    'bin/start-impalad.sh -build_type=%s' % options.build_type)
+    "bin/start-impalad.sh -build_type={build_type}".format(
+        build_type=options.build_type))
 STATE_STORE_PATH = os.path.join(IMPALA_HOME,
-    'bin/start-statestored.sh -build_type=%s' % options.build_type)
+    "bin/start-statestored.sh -build_type={build_type}".format(
+        build_type=options.build_type))
 CATALOGD_PATH = os.path.join(IMPALA_HOME,
-    'bin/start-catalogd.sh -build_type=%s' % options.build_type)
+    "bin/start-catalogd.sh -build_type={build_type}".format(
+        build_type=options.build_type))
 MINI_IMPALA_CLUSTER_PATH = IMPALAD_PATH + " -in-process"
 
-IMPALA_SHELL = os.path.join(IMPALA_HOME, 'bin/impala-shell.sh')
-IMPALAD_PORTS = ("-beeswax_port=%d -hs2_port=%d  -be_port=%d -krpc_port=%d "
-                 "-state_store_subscriber_port=%d -webserver_port=%d")
-JVM_ARGS = "-jvm_debug_port=%s -jvm_args=%s"
-BE_LOGGING_ARGS = "-log_filename=%s -log_dir=%s -v=%s -logbufsecs=5 -max_log_files=%s"
 CLUSTER_WAIT_TIMEOUT_IN_SECONDS = 240
 # Kills have a timeout to prevent automated scripts from hanging indefinitely.
 # It is set to a high value to avoid failing if processes are slow to shut down.
@@ -143,14 +147,18 @@ def check_process_exists(binary, attempts=1):
 def exec_impala_process(cmd, args, stderr_log_file_path):
   redirect_output = str()
   if options.verbose:
-    args += ' -logtostderr=1'
+    args += " -logtostderr=1"
   else:
-    redirect_output = "1>%s" % stderr_log_file_path
-  cmd = '%s %s %s 2>&1 &' % (cmd, args, redirect_output)
+    redirect_output = "1>{stderr_log_file_path}".format(
+        stderr_log_file_path=stderr_log_file_path)
+  cmd = "{cmd} {args} {redirect_output} 2>&1 &".format(
+      cmd=cmd,
+      args=args,
+      redirect_output=redirect_output)
   os.system(cmd)
 
 def kill_cluster_processes(force=False):
-  binaries = ['catalogd', 'impalad', 'statestored']
+  binaries = ["catalogd", "impalad", "statestored"]
   kill_matching_processes(binaries, force)
 
 def kill_matching_processes(binary_names, force=False):
@@ -170,50 +178,75 @@ def kill_matching_processes(binary_names, force=False):
     try:
       process.wait(KILL_TIMEOUT_IN_SECONDS)
     except psutil.TimeoutExpired:
-      raise RuntimeError("Unable to kill %s (pid %d) after %d seconds." % (process.name,
-          process.pid, KILL_TIMEOUT_IN_SECONDS))
+      raise RuntimeError(("Unable to kill {process_name} (pid {process_pid}) "
+          "after {num_seconds} seconds.").format(
+              process_name=process.name,
+              process_pid=process.pid,
+              num_seconds=KILL_TIMEOUT_IN_SECONDS))
 
 def start_statestore():
-  print "Starting State Store logging to %s/statestored.INFO" % options.log_dir
+  LOG.info("Starting State Store logging to {log_dir}/statestored.INFO".format(
+      log_dir=options.log_dir))
   stderr_log_file_path = os.path.join(options.log_dir, "statestore-error.log")
-  args = "%s %s" % (build_impalad_logging_args(0, "statestored"),
-                    " ".join(options.state_store_args))
+  args = "{impalad_logging_args} {state_store_args}".format(
+      impalad_logging_args=build_impalad_logging_args(0, "statestored"),
+      state_store_args=" ".join(options.state_store_args))
   exec_impala_process(STATE_STORE_PATH, args, stderr_log_file_path)
   if not check_process_exists("statestored", 10):
     raise RuntimeError("Unable to start statestored. Check log or file permissions"
                        " for more details.")
 
 def start_catalogd():
-  print "Starting Catalog Service logging to %s/catalogd.INFO" % options.log_dir
+  LOG.info("Starting Catalog Service logging to {log_dir}/catalogd.INFO".format(
+      log_dir=options.log_dir))
   stderr_log_file_path = os.path.join(options.log_dir, "catalogd-error.log")
-  args = "%s %s %s" % (build_impalad_logging_args(0, "catalogd"),
-                       " ".join(options.catalogd_args),
-                       build_jvm_args(options.cluster_size))
+  args = "{impalad_logging_args} {catalogd_args} {jvm_args}".format(
+      impalad_logging_args=build_impalad_logging_args(0, "catalogd"),
+      catalogd_args=" ".join(options.catalogd_args),
+      jvm_args=build_jvm_args(options.cluster_size))
   exec_impala_process(CATALOGD_PATH, args, stderr_log_file_path)
   if not check_process_exists("catalogd", 10):
     raise RuntimeError("Unable to start catalogd. Check log or file permissions"
                        " for more details.")
 
 def build_impalad_port_args(instance_num):
+  IMPALAD_PORTS = (
+      "-beeswax_port={beeswax_port} "
+      "-hs2_port={hs2_port} "
+      "-be_port={be_port} "
+      "-krpc_port={krpc_port} "
+      "-state_store_subscriber_port={state_store_subscriber_port} "
+      "-webserver_port={webserver_port}")
   BASE_BEESWAX_PORT = 21000
   BASE_HS2_PORT = 21050
   BASE_BE_PORT = 22000
   BASE_KRPC_PORT = 27000
   BASE_STATE_STORE_SUBSCRIBER_PORT = 23000
   BASE_WEBSERVER_PORT = 25000
-  return IMPALAD_PORTS % (BASE_BEESWAX_PORT + instance_num, BASE_HS2_PORT + instance_num,
-                          BASE_BE_PORT + instance_num,
-                          BASE_KRPC_PORT + instance_num,
-                          BASE_STATE_STORE_SUBSCRIBER_PORT + instance_num,
-                          BASE_WEBSERVER_PORT + instance_num)
+  return IMPALAD_PORTS.format(
+      beeswax_port=BASE_BEESWAX_PORT + instance_num,
+      hs2_port=BASE_HS2_PORT + instance_num,
+      be_port=BASE_BE_PORT + instance_num,
+      krpc_port=BASE_KRPC_PORT + instance_num,
+      state_store_subscriber_port=BASE_STATE_STORE_SUBSCRIBER_PORT + instance_num,
+      webserver_port=BASE_WEBSERVER_PORT + instance_num)
 
 def build_impalad_logging_args(instance_num, service_name):
-  return BE_LOGGING_ARGS % (service_name, options.log_dir, options.log_level,
-                            options.max_log_files)
+  return ("-log_filename={log_filename} "
+      "-log_dir={log_dir} "
+      "-v={log_level} "
+      "-logbufsecs=5 "
+      "-max_log_files={max_log_files}").format(
+          log_filename=service_name,
+          log_dir=options.log_dir,
+          log_level=options.log_level,
+          max_log_files=options.max_log_files)
 
 def build_jvm_args(instance_num):
   BASE_JVM_DEBUG_PORT = 30000
-  return JVM_ARGS % (BASE_JVM_DEBUG_PORT + instance_num, options.jvm_args)
+  return "-jvm_debug_port={jvm_debug_port} -jvm_args={jvm_args}".format(
+      jvm_debug_port=BASE_JVM_DEBUG_PORT + instance_num,
+      jvm_args=options.jvm_args)
 
 def start_impalad_instances(cluster_size, num_coordinators, use_exclusive_coordinators):
   """Start 'cluster_size' impalad instances. The first 'num_coordinator' instances will
@@ -253,45 +286,62 @@ def start_impalad_instances(cluster_size, num_coordinators, use_exclusive_coordi
       # The first impalad always logs to impalad.INFO
       service_name = "impalad"
     else:
-      service_name = "impalad_node%s" % i
+      service_name = "impalad_node{node_num}".format(node_num=i)
       # Sleep between instance startup: simultaneous starts hurt the minikdc
       # Yes, this is a hack, but it's easier than modifying the minikdc...
       # TODO: is this really necessary?
       sleep(1)
 
-    print "Starting Impala Daemon logging to %s/%s.INFO" % (options.log_dir,
-        service_name)
+    LOG.info("Starting Impala Daemon logging to {log_dir}/{service_name}.INFO".format(
+        log_dir=options.log_dir,
+        service_name=service_name))
 
     # impalad args from the --impalad_args flag. Also replacing '#ID' with the instance.
     param_args = (" ".join(options.impalad_args)).replace("#ID", str(i))
-    args = "--mem_limit=%s %s %s %s %s" %\
-          (mem_limit,  # Goes first so --impalad_args will override it.
-           build_impalad_logging_args(i, service_name), build_jvm_args(i),
-           build_impalad_port_args(i), param_args)
+    args = ("--mem_limit={mem_limit} "
+        "{impala_logging_args} "
+        "{jvm_args} "
+        "{impala_port_args} "
+        "{param_args}").format(
+            mem_limit=mem_limit,  # Goes first so --impalad_args will override it.
+            impala_logging_args=build_impalad_logging_args(i, service_name),
+            jvm_args=build_jvm_args(i),
+            impala_port_args=build_impalad_port_args(i),
+            param_args=param_args)
     if options.kudu_master_hosts:
       # Must be prepended, otherwise the java options interfere.
-      args = "-kudu_master_hosts %s %s" % (options.kudu_master_hosts, args)
+      args = "-kudu_master_hosts {kudu_master_hosts} {args}".format(
+          kudu_master_hosts=options.kudu_master_hosts,
+          args=args)
 
     if "kudu_client_rpc_timeout" not in args:
-      args = "-kudu_client_rpc_timeout_ms %s %s" % (KUDU_RPC_TIMEOUT, args)
+      args = "-kudu_client_rpc_timeout_ms {kudu_rpc_timeout} {args}".format(
+          kudu_rpc_timeout=KUDU_RPC_TIMEOUT,
+          args=args)
 
     if i >= num_coordinators:
-      args = "-is_coordinator=false %s" % (args)
+      args = "-is_coordinator=false {args}".format(args=args)
     elif use_exclusive_coordinators:
       # Coordinator instance that doesn't execute non-coordinator fragments
-      args = "-is_executor=false %s" % (args)
+      args = "-is_executor=false {args}".format(args=args)
 
     if i < len(delay_list):
-      args = "-stress_catalog_init_delay_ms=%s %s" % (delay_list[i], args)
+      args = "-stress_catalog_init_delay_ms={delay} {args}".format(
+          delay=delay_list[i],
+          args=args)
 
     if options.disable_krpc:
-      args = "-use_krpc=false %s" % (args)
+      args = "-use_krpc=false {args}".format(args=args)
 
     # Appended at the end so they can override previous args.
     if i < len(per_impalad_args):
-      args = "%s %s" % (args, per_impalad_args[i])
+      args = "{args} {per_impalad_args}".format(
+          args=args,
+          per_impalad_args=per_impalad_args[i])
 
-    stderr_log_file_path = os.path.join(options.log_dir, '%s-error.log' % service_name)
+    stderr_log_file_path = os.path.join(
+        options.log_dir,
+        "{service_name}-error.log".format(service_name=service_name))
     exec_impala_process(IMPALAD_PATH, args, stderr_log_file_path)
 
 def wait_for_impala_process_count(impala_cluster, retries=10):
@@ -309,8 +359,9 @@ def wait_for_impala_process_count(impala_cluster, retries=10):
   msg = str()
   if len(impala_cluster.impalads) < options.cluster_size:
     impalads_found = len(impala_cluster.impalads)
-    msg += "Expected %d impalad(s), only %d found\n" %\
-        (options.cluster_size, impalads_found)
+    msg += "Expected {expected_num} impalad(s), only {actual_num} found\n".format(
+        expected_num=options.cluster_size,
+        actual_num=impalads_found)
   if not impala_cluster.statestored:
     msg += "statestored failed to start.\n"
   if not impala_cluster.catalogd:
@@ -341,8 +392,8 @@ def wait_for_cluster_web(timeout_in_seconds=CLUSTER_WAIT_TIMEOUT_IN_SECONDS):
   for impalad in impala_cluster.impalads:
     impalad.service.wait_for_num_known_live_backends(expected_num_backends,
         timeout=CLUSTER_WAIT_TIMEOUT_IN_SECONDS, interval=2)
-    if impalad._get_arg_value('is_coordinator', default='true') == 'true' and \
-       impalad._get_arg_value('stress_catalog_init_delay_ms', default=0) == 0:
+    if impalad._get_arg_value("is_coordinator", default="true") == "true" and \
+       impalad._get_arg_value("stress_catalog_init_delay_ms", default=0) == 0:
       wait_for_catalog(impalad)
 
 def wait_for_catalog(impalad, timeout_in_seconds=CLUSTER_WAIT_TIMEOUT_IN_SECONDS):
@@ -356,29 +407,35 @@ def wait_for_catalog(impalad, timeout_in_seconds=CLUSTER_WAIT_TIMEOUT_IN_SECONDS
   while (time() - start_time < timeout_in_seconds):
     try:
       num_dbs, num_tbls = impalad.service.get_metric_values(
-          ['catalog.num-databases', 'catalog.num-tables'])
+          ["catalog.num-databases", "catalog.num-tables"])
       client_beeswax = impalad.service.create_beeswax_client()
       client_hs2 = impalad.service.create_hs2_client()
       break
     except Exception as e:
-      print 'Client services not ready.'
-      print 'Waiting for catalog cache: (%s DBs / %s tables). Trying again ...' %\
-        (num_dbs, num_tbls)
+      LOG.exception(("Client services not ready. Waiting for catalog cache: "
+          "({num_dbs} DBs / {num_tbls} tables). Trying again ...").format(
+              num_dbs=num_dbs,
+              num_tbls=num_tbls))
     finally:
       if client_beeswax is not None: client_beeswax.close()
     sleep(0.5)
 
   if client_beeswax is None or client_hs2 is None:
-    raise RuntimeError('Unable to open client ports within %s seconds.'\
-                       % timeout_in_seconds)
+    raise RuntimeError("Unable to open client ports within {num_seconds} seconds.".format(
+        num_seconds=timeout_in_seconds))
 
 def wait_for_cluster_cmdline(timeout_in_seconds=CLUSTER_WAIT_TIMEOUT_IN_SECONDS):
   """Checks if the cluster is "ready" by executing a simple query in a loop"""
   start_time = time()
-  while os.system('%s -i localhost:21000 -q "%s"' %  (IMPALA_SHELL, 'select 1')) != 0:
+  IMPALA_SHELL = os.path.join(IMPALA_HOME, "bin/impala-shell.sh")
+  cmd = "{impala_shell} -i localhost:21000 -q '{query}'".format(
+      impala_shell=IMPALA_SHELL,
+      query="select 1")
+  while os.system(cmd) != 0:
     if time() - timeout_in_seconds > start_time:
-      raise RuntimeError('Cluster did not start within %d seconds' % timeout_in_seconds)
-    print 'Cluster not yet available. Sleeping...'
+      raise RuntimeError("Cluster did not start within {num_seconds} seconds".format(
+        num_seconds=timeout_in_seconds))
+    LOG.info("Cluster not yet available. Sleeping...")
     sleep(2)
 
 if __name__ == "__main__":
@@ -387,32 +444,35 @@ if __name__ == "__main__":
     sys.exit(0)
 
   if options.build_type not in KNOWN_BUILD_TYPES:
-    print 'Invalid build type %s' % options.build_type
-    print 'Valid values: %s' % ', '.join(KNOWN_BUILD_TYPES)
+    LOG.error("Invalid build type {0}".format(options.build_type))
+    LOG.error("Valid values: {0}".format(", ".join(KNOWN_BUILD_TYPES)))
     sys.exit(1)
 
   if options.cluster_size < 0:
-    print 'Please specify a cluster size >= 0'
+    LOG.error("Please specify a cluster size >= 0")
     sys.exit(1)
 
   if options.num_coordinators <= 0:
-    print 'Please specify a valid number of coordinators > 0'
+    LOG.error("Please specify a valid number of coordinators > 0")
     sys.exit(1)
 
-  if options.use_exclusive_coordinators and options.num_coordinators >= options.cluster_size:
-    print 'Cannot start an Impala cluster with no executors'
+  if (options.use_exclusive_coordinators and
+      options.num_coordinators >= options.cluster_size):
+    LOG.error("Cannot start an Impala cluster with no executors")
     sys.exit(1)
 
   if not os.path.isdir(options.log_dir):
-    print 'Log dir does not exist or is not a directory: %s' % options.log_dir
+    LOG.error("Log dir does not exist or is not a directory: {log_dir}".format(
+        log_dir=options.log_dir))
     sys.exit(1)
 
   # Kill existing cluster processes based on the current configuration.
   if options.restart_impalad_only:
     if options.inprocess:
-      print 'Cannot perform individual component restarts using an in-process cluster'
+      LOG.error(
+          "Cannot perform individual component restarts using an in-process cluster")
       sys.exit(1)
-    kill_matching_processes(['impalad'], force=options.force_kill)
+    kill_matching_processes(["impalad"], force=options.force_kill)
   else:
     kill_cluster_processes(force=options.force_kill)
 
@@ -420,7 +480,8 @@ if __name__ == "__main__":
     import json
     wait_for_cluster = wait_for_cluster_web
   except ImportError:
-    print "json module not found, checking for cluster startup through the command-line"
+    LOG.exception("json module not found, checking "
+        "for cluster startup through the command-line")
     wait_for_cluster = wait_for_cluster_cmdline
 
   # If ImpalaCluster cannot be imported, fall back to the command-line to check
@@ -430,10 +491,11 @@ if __name__ == "__main__":
     if options.restart_impalad_only:
       impala_cluster = ImpalaCluster()
       if not impala_cluster.statestored or not impala_cluster.catalogd:
-        print 'No running statestored or catalogd detected. Restarting entire cluster.'
+        LOG.info("No running statestored or catalogd detected. "
+            "Restarting entire cluster.")
         options.restart_impalad_only = False
   except ImportError:
-    print 'ImpalaCluster module not found.'
+    LOG.exception("ImpalaCluster module not found.")
     # TODO: Update this code path to work similar to the ImpalaCluster code path when
     # restarting only impalad processes. Specifically, we should do a full cluster
     # restart if either the statestored or catalogd processes are down, even if
@@ -452,14 +514,15 @@ if __name__ == "__main__":
     # Check for the cluster to be ready.
     wait_for_cluster()
   except Exception, e:
-    print 'Error starting cluster: %s' % e
+    LOG.exception("Error starting cluster")
     sys.exit(1)
 
   if options.use_exclusive_coordinators == True:
     executors = options.cluster_size - options.num_coordinators
   else:
     executors = options.cluster_size
-  print 'Impala Cluster Running with %d nodes (%d coordinators, %d executors).' % (
-      options.cluster_size,
-      min(options.cluster_size, options.num_coordinators),
-      executors)
+  LOG.info(("Impala Cluster Running with {num_nodes} nodes "
+      "({num_coordinators} coordinators, {num_executors} executors).").format(
+          num_nodes=options.cluster_size,
+          num_coordinators=min(options.cluster_size, options.num_coordinators),
+          num_executors=executors))

http://git-wip-us.apache.org/repos/asf/impala/blob/30d196fd/tests/common/impala_service.py
----------------------------------------------------------------------
diff --git a/tests/common/impala_service.py b/tests/common/impala_service.py
index 9772b13..0934f78 100644
--- a/tests/common/impala_service.py
+++ b/tests/common/impala_service.py
@@ -35,7 +35,8 @@ from RuntimeProfile.ttypes import TRuntimeProfileTree
 import base64
 import zlib
 
-logging.basicConfig(level=logging.ERROR, format='%(threadName)s: %(message)s')
+logging.basicConfig(level=logging.ERROR, format='%(asctime)s %(threadName)s: %(message)s',
+    datefmt='%H:%M:%S')
 LOG = logging.getLogger('impala_service')
 LOG.setLevel(level=logging.DEBUG)