You are viewing a plain-text version of this content. The canonical link to it is not preserved in this plain-text copy.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/07 01:31:39 UTC
[3/3] impala git commit: IMPALA-6642 (Part 2): clean up start-impala-cluster.py
IMPALA-6642 (Part 2): clean up start-impala-cluster.py
This patch cleans up start-impala-cluster.py in general. It uses the
logging module instead of "print" statements, and it formats strings with
the str.format() method instead of the % operator. Each log message now
includes a timestamp. This makes it easier to debug failures in custom
cluster tests that occur while the cluster is starting.
Change-Id: I60169203c61ae6bc0a3ccd3dea355799b603efe5
Reviewed-on: http://gerrit.cloudera.org:8080/10780
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/30d196fd
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/30d196fd
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/30d196fd
Branch: refs/heads/master
Commit: 30d196fd50f44519a5289d94388bdcbe970923d1
Parents: 837d386
Author: Taras Bobrovytsky <ta...@apache.org>
Authored: Mon Jun 18 14:34:35 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Sat Jul 7 01:10:36 2018 +0000
----------------------------------------------------------------------
bin/start-impala-cluster.py | 227 +++++++++++++++++++++++-------------
tests/common/impala_service.py | 3 +-
2 files changed, 147 insertions(+), 83 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/30d196fd/bin/start-impala-cluster.py
----------------------------------------------------------------------
diff --git a/bin/start-impala-cluster.py b/bin/start-impala-cluster.py
index 6856594..4d34d45 100755
--- a/bin/start-impala-cluster.py
+++ b/bin/start-impala-cluster.py
@@ -20,18 +20,24 @@
# Starts up an Impala cluster (ImpalaD + State Store) with the specified number of
# ImpalaD instances. Each ImpalaD runs on a different port allowing this to be run
# on a single machine.
+import logging
import os
import psutil
import sys
+from datetime import datetime
from getpass import getuser
from time import sleep, time
from optparse import OptionParser, SUPPRESS_HELP
from testdata.common import cgroups
from tests.common.environ import specific_build_type_timeout
-KUDU_MASTER_HOSTS = os.getenv('KUDU_MASTER_HOSTS', '127.0.0.1')
-DEFAULT_IMPALA_MAX_LOG_FILES = os.environ.get('IMPALA_MAX_LOG_FILES', 10)
+logging.basicConfig(level=logging.ERROR, format="%(asctime)s %(threadName)s: %(message)s",
+ datefmt="%H:%M:%S")
+LOG = logging.getLogger(os.path.splitext(os.path.basename(__file__))[0])
+LOG.setLevel(level=logging.DEBUG)
+KUDU_MASTER_HOSTS = os.getenv("KUDU_MASTER_HOSTS", "127.0.0.1")
+DEFAULT_IMPALA_MAX_LOG_FILES = os.environ.get("IMPALA_MAX_LOG_FILES", 10)
# Options
parser = OptionParser()
@@ -43,7 +49,7 @@ parser.add_option("--use_exclusive_coordinators", dest="use_exclusive_coordinato
action="store_true", default=False, help="If true, coordinators only "
"coordinate queries and execute coordinator fragments. If false, "
"coordinators also act as executors.")
-parser.add_option("--build_type", dest="build_type", default= 'latest',
+parser.add_option("--build_type", dest="build_type", default= "latest",
help="Build type to use - debug / release / latest")
parser.add_option("--impalad_args", dest="impalad_args", action="append", type="string",
default=[],
@@ -67,10 +73,10 @@ parser.add_option("-r", "--restart_impalad_only", dest="restart_impalad_only",
parser.add_option("--in-process", dest="inprocess", action="store_true", default=False,
help="Start all Impala backends and state store in a single process.")
parser.add_option("--log_dir", dest="log_dir",
- default=os.environ['IMPALA_CLUSTER_LOGS_DIR'],
+ default=os.environ["IMPALA_CLUSTER_LOGS_DIR"],
help="Directory to store output logs to.")
-parser.add_option('--max_log_files', default=DEFAULT_IMPALA_MAX_LOG_FILES,
- help='Max number of log files before rotation occurs.')
+parser.add_option("--max_log_files", default=DEFAULT_IMPALA_MAX_LOG_FILES,
+ help="Max number of log files before rotation occurs.")
parser.add_option("-v", "--verbose", dest="verbose", action="store_true", default=False,
help="Prints all output to stderr/stdout.")
parser.add_option("--log_level", type="int", dest="log_level", default=1,
@@ -92,21 +98,19 @@ parser.add_option("--per_impalad_args", dest="per_impalad_args", type="string"
options, args = parser.parse_args()
-IMPALA_HOME = os.environ['IMPALA_HOME']
-KNOWN_BUILD_TYPES = ['debug', 'release', 'latest']
+IMPALA_HOME = os.environ["IMPALA_HOME"]
+KNOWN_BUILD_TYPES = ["debug", "release", "latest"]
IMPALAD_PATH = os.path.join(IMPALA_HOME,
- 'bin/start-impalad.sh -build_type=%s' % options.build_type)
+ "bin/start-impalad.sh -build_type={build_type}".format(
+ build_type=options.build_type))
STATE_STORE_PATH = os.path.join(IMPALA_HOME,
- 'bin/start-statestored.sh -build_type=%s' % options.build_type)
+ "bin/start-statestored.sh -build_type={build_type}".format(
+ build_type=options.build_type))
CATALOGD_PATH = os.path.join(IMPALA_HOME,
- 'bin/start-catalogd.sh -build_type=%s' % options.build_type)
+ "bin/start-catalogd.sh -build_type={build_type}".format(
+ build_type=options.build_type))
MINI_IMPALA_CLUSTER_PATH = IMPALAD_PATH + " -in-process"
-IMPALA_SHELL = os.path.join(IMPALA_HOME, 'bin/impala-shell.sh')
-IMPALAD_PORTS = ("-beeswax_port=%d -hs2_port=%d -be_port=%d -krpc_port=%d "
- "-state_store_subscriber_port=%d -webserver_port=%d")
-JVM_ARGS = "-jvm_debug_port=%s -jvm_args=%s"
-BE_LOGGING_ARGS = "-log_filename=%s -log_dir=%s -v=%s -logbufsecs=5 -max_log_files=%s"
CLUSTER_WAIT_TIMEOUT_IN_SECONDS = 240
# Kills have a timeout to prevent automated scripts from hanging indefinitely.
# It is set to a high value to avoid failing if processes are slow to shut down.
@@ -143,14 +147,18 @@ def check_process_exists(binary, attempts=1):
def exec_impala_process(cmd, args, stderr_log_file_path):
redirect_output = str()
if options.verbose:
- args += ' -logtostderr=1'
+ args += " -logtostderr=1"
else:
- redirect_output = "1>%s" % stderr_log_file_path
- cmd = '%s %s %s 2>&1 &' % (cmd, args, redirect_output)
+ redirect_output = "1>{stderr_log_file_path}".format(
+ stderr_log_file_path=stderr_log_file_path)
+ cmd = "{cmd} {args} {redirect_output} 2>&1 &".format(
+ cmd=cmd,
+ args=args,
+ redirect_output=redirect_output)
os.system(cmd)
def kill_cluster_processes(force=False):
- binaries = ['catalogd', 'impalad', 'statestored']
+ binaries = ["catalogd", "impalad", "statestored"]
kill_matching_processes(binaries, force)
def kill_matching_processes(binary_names, force=False):
@@ -170,50 +178,75 @@ def kill_matching_processes(binary_names, force=False):
try:
process.wait(KILL_TIMEOUT_IN_SECONDS)
except psutil.TimeoutExpired:
- raise RuntimeError("Unable to kill %s (pid %d) after %d seconds." % (process.name,
- process.pid, KILL_TIMEOUT_IN_SECONDS))
+ raise RuntimeError(("Unable to kill {process_name} (pid {process_pid}) "
+ "after {num_seconds} seconds.").format(
+ process_name=process.name,
+ process_pid=process.pid,
+ num_seconds=KILL_TIMEOUT_IN_SECONDS))
def start_statestore():
- print "Starting State Store logging to %s/statestored.INFO" % options.log_dir
+ LOG.info("Starting State Store logging to {log_dir}/statestored.INFO".format(
+ log_dir=options.log_dir))
stderr_log_file_path = os.path.join(options.log_dir, "statestore-error.log")
- args = "%s %s" % (build_impalad_logging_args(0, "statestored"),
- " ".join(options.state_store_args))
+ args = "{impalad_logging_args} {state_store_args}".format(
+ impalad_logging_args=build_impalad_logging_args(0, "statestored"),
+ state_store_args=" ".join(options.state_store_args))
exec_impala_process(STATE_STORE_PATH, args, stderr_log_file_path)
if not check_process_exists("statestored", 10):
raise RuntimeError("Unable to start statestored. Check log or file permissions"
" for more details.")
def start_catalogd():
- print "Starting Catalog Service logging to %s/catalogd.INFO" % options.log_dir
+ LOG.info("Starting Catalog Service logging to {log_dir}/catalogd.INFO".format(
+ log_dir=options.log_dir))
stderr_log_file_path = os.path.join(options.log_dir, "catalogd-error.log")
- args = "%s %s %s" % (build_impalad_logging_args(0, "catalogd"),
- " ".join(options.catalogd_args),
- build_jvm_args(options.cluster_size))
+ args = "{impalad_logging_args} {catalogd_args} {jvm_args}".format(
+ impalad_logging_args=build_impalad_logging_args(0, "catalogd"),
+ catalogd_args=" ".join(options.catalogd_args),
+ jvm_args=build_jvm_args(options.cluster_size))
exec_impala_process(CATALOGD_PATH, args, stderr_log_file_path)
if not check_process_exists("catalogd", 10):
raise RuntimeError("Unable to start catalogd. Check log or file permissions"
" for more details.")
def build_impalad_port_args(instance_num):
+ IMPALAD_PORTS = (
+ "-beeswax_port={beeswax_port} "
+ "-hs2_port={hs2_port} "
+ "-be_port={be_port} "
+ "-krpc_port={krpc_port} "
+ "-state_store_subscriber_port={state_store_subscriber_port} "
+ "-webserver_port={webserver_port}")
BASE_BEESWAX_PORT = 21000
BASE_HS2_PORT = 21050
BASE_BE_PORT = 22000
BASE_KRPC_PORT = 27000
BASE_STATE_STORE_SUBSCRIBER_PORT = 23000
BASE_WEBSERVER_PORT = 25000
- return IMPALAD_PORTS % (BASE_BEESWAX_PORT + instance_num, BASE_HS2_PORT + instance_num,
- BASE_BE_PORT + instance_num,
- BASE_KRPC_PORT + instance_num,
- BASE_STATE_STORE_SUBSCRIBER_PORT + instance_num,
- BASE_WEBSERVER_PORT + instance_num)
+ return IMPALAD_PORTS.format(
+ beeswax_port=BASE_BEESWAX_PORT + instance_num,
+ hs2_port=BASE_HS2_PORT + instance_num,
+ be_port=BASE_BE_PORT + instance_num,
+ krpc_port=BASE_KRPC_PORT + instance_num,
+ state_store_subscriber_port=BASE_STATE_STORE_SUBSCRIBER_PORT + instance_num,
+ webserver_port=BASE_WEBSERVER_PORT + instance_num)
def build_impalad_logging_args(instance_num, service_name):
- return BE_LOGGING_ARGS % (service_name, options.log_dir, options.log_level,
- options.max_log_files)
+ return ("-log_filename={log_filename} "
+ "-log_dir={log_dir} "
+ "-v={log_level} "
+ "-logbufsecs=5 "
+ "-max_log_files={max_log_files}").format(
+ log_filename=service_name,
+ log_dir=options.log_dir,
+ log_level=options.log_level,
+ max_log_files=options.max_log_files)
def build_jvm_args(instance_num):
BASE_JVM_DEBUG_PORT = 30000
- return JVM_ARGS % (BASE_JVM_DEBUG_PORT + instance_num, options.jvm_args)
+ return "-jvm_debug_port={jvm_debug_port} -jvm_args={jvm_args}".format(
+ jvm_debug_port=BASE_JVM_DEBUG_PORT + instance_num,
+ jvm_args=options.jvm_args)
def start_impalad_instances(cluster_size, num_coordinators, use_exclusive_coordinators):
"""Start 'cluster_size' impalad instances. The first 'num_coordinator' instances will
@@ -253,45 +286,62 @@ def start_impalad_instances(cluster_size, num_coordinators, use_exclusive_coordi
# The first impalad always logs to impalad.INFO
service_name = "impalad"
else:
- service_name = "impalad_node%s" % i
+ service_name = "impalad_node{node_num}".format(node_num=i)
# Sleep between instance startup: simultaneous starts hurt the minikdc
# Yes, this is a hack, but it's easier than modifying the minikdc...
# TODO: is this really necessary?
sleep(1)
- print "Starting Impala Daemon logging to %s/%s.INFO" % (options.log_dir,
- service_name)
+ LOG.info("Starting Impala Daemon logging to {log_dir}/{service_name}.INFO".format(
+ log_dir=options.log_dir,
+ service_name=service_name))
# impalad args from the --impalad_args flag. Also replacing '#ID' with the instance.
param_args = (" ".join(options.impalad_args)).replace("#ID", str(i))
- args = "--mem_limit=%s %s %s %s %s" %\
- (mem_limit, # Goes first so --impalad_args will override it.
- build_impalad_logging_args(i, service_name), build_jvm_args(i),
- build_impalad_port_args(i), param_args)
+ args = ("--mem_limit={mem_limit} "
+ "{impala_logging_args} "
+ "{jvm_args} "
+ "{impala_port_args} "
+ "{param_args}").format(
+ mem_limit=mem_limit, # Goes first so --impalad_args will override it.
+ impala_logging_args=build_impalad_logging_args(i, service_name),
+ jvm_args=build_jvm_args(i),
+ impala_port_args=build_impalad_port_args(i),
+ param_args=param_args)
if options.kudu_master_hosts:
# Must be prepended, otherwise the java options interfere.
- args = "-kudu_master_hosts %s %s" % (options.kudu_master_hosts, args)
+ args = "-kudu_master_hosts {kudu_master_hosts} {args}".format(
+ kudu_master_hosts=options.kudu_master_hosts,
+ args=args)
if "kudu_client_rpc_timeout" not in args:
- args = "-kudu_client_rpc_timeout_ms %s %s" % (KUDU_RPC_TIMEOUT, args)
+ args = "-kudu_client_rpc_timeout_ms {kudu_rpc_timeout} {args}".format(
+ kudu_rpc_timeout=KUDU_RPC_TIMEOUT,
+ args=args)
if i >= num_coordinators:
- args = "-is_coordinator=false %s" % (args)
+ args = "-is_coordinator=false {args}".format(args=args)
elif use_exclusive_coordinators:
# Coordinator instance that doesn't execute non-coordinator fragments
- args = "-is_executor=false %s" % (args)
+ args = "-is_executor=false {args}".format(args=args)
if i < len(delay_list):
- args = "-stress_catalog_init_delay_ms=%s %s" % (delay_list[i], args)
+ args = "-stress_catalog_init_delay_ms={delay} {args}".format(
+ delay=delay_list[i],
+ args=args)
if options.disable_krpc:
- args = "-use_krpc=false %s" % (args)
+ args = "-use_krpc=false {args}".format(args=args)
# Appended at the end so they can override previous args.
if i < len(per_impalad_args):
- args = "%s %s" % (args, per_impalad_args[i])
+ args = "{args} {per_impalad_args}".format(
+ args=args,
+ per_impalad_args=per_impalad_args[i])
- stderr_log_file_path = os.path.join(options.log_dir, '%s-error.log' % service_name)
+ stderr_log_file_path = os.path.join(
+ options.log_dir,
+ "{service_name}-error.log".format(service_name=service_name))
exec_impala_process(IMPALAD_PATH, args, stderr_log_file_path)
def wait_for_impala_process_count(impala_cluster, retries=10):
@@ -309,8 +359,9 @@ def wait_for_impala_process_count(impala_cluster, retries=10):
msg = str()
if len(impala_cluster.impalads) < options.cluster_size:
impalads_found = len(impala_cluster.impalads)
- msg += "Expected %d impalad(s), only %d found\n" %\
- (options.cluster_size, impalads_found)
+ msg += "Expected {expected_num} impalad(s), only {actual_num} found\n".format(
+ expected_num=options.cluster_size,
+ actual_num=impalads_found)
if not impala_cluster.statestored:
msg += "statestored failed to start.\n"
if not impala_cluster.catalogd:
@@ -341,8 +392,8 @@ def wait_for_cluster_web(timeout_in_seconds=CLUSTER_WAIT_TIMEOUT_IN_SECONDS):
for impalad in impala_cluster.impalads:
impalad.service.wait_for_num_known_live_backends(expected_num_backends,
timeout=CLUSTER_WAIT_TIMEOUT_IN_SECONDS, interval=2)
- if impalad._get_arg_value('is_coordinator', default='true') == 'true' and \
- impalad._get_arg_value('stress_catalog_init_delay_ms', default=0) == 0:
+ if impalad._get_arg_value("is_coordinator", default="true") == "true" and \
+ impalad._get_arg_value("stress_catalog_init_delay_ms", default=0) == 0:
wait_for_catalog(impalad)
def wait_for_catalog(impalad, timeout_in_seconds=CLUSTER_WAIT_TIMEOUT_IN_SECONDS):
@@ -356,29 +407,35 @@ def wait_for_catalog(impalad, timeout_in_seconds=CLUSTER_WAIT_TIMEOUT_IN_SECONDS
while (time() - start_time < timeout_in_seconds):
try:
num_dbs, num_tbls = impalad.service.get_metric_values(
- ['catalog.num-databases', 'catalog.num-tables'])
+ ["catalog.num-databases", "catalog.num-tables"])
client_beeswax = impalad.service.create_beeswax_client()
client_hs2 = impalad.service.create_hs2_client()
break
except Exception as e:
- print 'Client services not ready.'
- print 'Waiting for catalog cache: (%s DBs / %s tables). Trying again ...' %\
- (num_dbs, num_tbls)
+ LOG.exception(("Client services not ready. Waiting for catalog cache: "
+ "({num_dbs} DBs / {num_tbls} tables). Trying again ...").format(
+ num_dbs=num_dbs,
+ num_tbls=num_tbls))
finally:
if client_beeswax is not None: client_beeswax.close()
sleep(0.5)
if client_beeswax is None or client_hs2 is None:
- raise RuntimeError('Unable to open client ports within %s seconds.'\
- % timeout_in_seconds)
+ raise RuntimeError("Unable to open client ports within {num_seconds} seconds.".format(
+ num_seconds=timeout_in_seconds))
def wait_for_cluster_cmdline(timeout_in_seconds=CLUSTER_WAIT_TIMEOUT_IN_SECONDS):
"""Checks if the cluster is "ready" by executing a simple query in a loop"""
start_time = time()
- while os.system('%s -i localhost:21000 -q "%s"' % (IMPALA_SHELL, 'select 1')) != 0:
+ IMPALA_SHELL = os.path.join(IMPALA_HOME, "bin/impala-shell.sh")
+ cmd = "{impala_shell} -i localhost:21000 -q '{query}'".format(
+ impala_shell=IMPALA_SHELL,
+ query="select 1")
+ while os.system(cmd) != 0:
if time() - timeout_in_seconds > start_time:
- raise RuntimeError('Cluster did not start within %d seconds' % timeout_in_seconds)
- print 'Cluster not yet available. Sleeping...'
+ raise RuntimeError("Cluster did not start within {num_seconds} seconds".format(
+ num_seconds=timeout_in_seconds))
+ LOG.info("Cluster not yet available. Sleeping...")
sleep(2)
if __name__ == "__main__":
@@ -387,32 +444,35 @@ if __name__ == "__main__":
sys.exit(0)
if options.build_type not in KNOWN_BUILD_TYPES:
- print 'Invalid build type %s' % options.build_type
- print 'Valid values: %s' % ', '.join(KNOWN_BUILD_TYPES)
+ LOG.error("Invalid build type {0}".format(options.build_type))
+ LOG.error("Valid values: {0}".format(", ".join(KNOWN_BUILD_TYPES)))
sys.exit(1)
if options.cluster_size < 0:
- print 'Please specify a cluster size >= 0'
+ LOG.error("Please specify a cluster size >= 0")
sys.exit(1)
if options.num_coordinators <= 0:
- print 'Please specify a valid number of coordinators > 0'
+ LOG.error("Please specify a valid number of coordinators > 0")
sys.exit(1)
- if options.use_exclusive_coordinators and options.num_coordinators >= options.cluster_size:
- print 'Cannot start an Impala cluster with no executors'
+ if (options.use_exclusive_coordinators and
+ options.num_coordinators >= options.cluster_size):
+ LOG.error("Cannot start an Impala cluster with no executors")
sys.exit(1)
if not os.path.isdir(options.log_dir):
- print 'Log dir does not exist or is not a directory: %s' % options.log_dir
+ LOG.error("Log dir does not exist or is not a directory: {log_dir}".format(
+ log_dir=options.log_dir))
sys.exit(1)
# Kill existing cluster processes based on the current configuration.
if options.restart_impalad_only:
if options.inprocess:
- print 'Cannot perform individual component restarts using an in-process cluster'
+ LOG.error(
+ "Cannot perform individual component restarts using an in-process cluster")
sys.exit(1)
- kill_matching_processes(['impalad'], force=options.force_kill)
+ kill_matching_processes(["impalad"], force=options.force_kill)
else:
kill_cluster_processes(force=options.force_kill)
@@ -420,7 +480,8 @@ if __name__ == "__main__":
import json
wait_for_cluster = wait_for_cluster_web
except ImportError:
- print "json module not found, checking for cluster startup through the command-line"
+ LOG.exception("json module not found, checking "
+ "for cluster startup through the command-line")
wait_for_cluster = wait_for_cluster_cmdline
# If ImpalaCluster cannot be imported, fall back to the command-line to check
@@ -430,10 +491,11 @@ if __name__ == "__main__":
if options.restart_impalad_only:
impala_cluster = ImpalaCluster()
if not impala_cluster.statestored or not impala_cluster.catalogd:
- print 'No running statestored or catalogd detected. Restarting entire cluster.'
+ LOG.info("No running statestored or catalogd detected. "
+ "Restarting entire cluster.")
options.restart_impalad_only = False
except ImportError:
- print 'ImpalaCluster module not found.'
+ LOG.exception("ImpalaCluster module not found.")
# TODO: Update this code path to work similar to the ImpalaCluster code path when
# restarting only impalad processes. Specifically, we should do a full cluster
# restart if either the statestored or catalogd processes are down, even if
@@ -452,14 +514,15 @@ if __name__ == "__main__":
# Check for the cluster to be ready.
wait_for_cluster()
except Exception, e:
- print 'Error starting cluster: %s' % e
+ LOG.exception("Error starting cluster")
sys.exit(1)
if options.use_exclusive_coordinators == True:
executors = options.cluster_size - options.num_coordinators
else:
executors = options.cluster_size
- print 'Impala Cluster Running with %d nodes (%d coordinators, %d executors).' % (
- options.cluster_size,
- min(options.cluster_size, options.num_coordinators),
- executors)
+ LOG.info(("Impala Cluster Running with {num_nodes} nodes "
+ "({num_coordinators} coordinators, {num_executors} executors).").format(
+ num_nodes=options.cluster_size,
+ num_coordinators=min(options.cluster_size, options.num_coordinators),
+ num_executors=executors))
http://git-wip-us.apache.org/repos/asf/impala/blob/30d196fd/tests/common/impala_service.py
----------------------------------------------------------------------
diff --git a/tests/common/impala_service.py b/tests/common/impala_service.py
index 9772b13..0934f78 100644
--- a/tests/common/impala_service.py
+++ b/tests/common/impala_service.py
@@ -35,7 +35,8 @@ from RuntimeProfile.ttypes import TRuntimeProfileTree
import base64
import zlib
-logging.basicConfig(level=logging.ERROR, format='%(threadName)s: %(message)s')
+logging.basicConfig(level=logging.ERROR, format='%(asctime)s %(threadName)s: %(message)s',
+ datefmt='%H:%M:%S')
LOG = logging.getLogger('impala_service')
LOG.setLevel(level=logging.DEBUG)