You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wi...@apache.org on 2014/06/05 18:49:38 UTC
[3/4] Make style consistent with build-support/python/checkstyle-check
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/467f8428/src/main/python/apache/aurora/tools/java/organize_imports.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/tools/java/organize_imports.py b/src/main/python/apache/aurora/tools/java/organize_imports.py
index f5472ec..99b8ec4 100644
--- a/src/main/python/apache/aurora/tools/java/organize_imports.py
+++ b/src/main/python/apache/aurora/tools/java/organize_imports.py
@@ -12,10 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
+
'''
-Organizes a java source file's import statements in a way that pleases Apache Aurora's checkstyle
-configuration. This expects exactly one argument: the name of the file to modify with preferred import
-ordering.
+Organizes a java source file's import statements in a way that pleases
+Apache Aurora's checkstyle configuration. This expects exactly one
+argument: the name of the file to modify with preferred import ordering.
'''
from __future__ import print_function
@@ -25,6 +26,8 @@ import sys
from collections import defaultdict
IMPORT_RE = re.compile('import(?: static)? (.*);')
+
+
def get_group(import_statement):
matcher = IMPORT_RE.match(import_statement)
assert matcher, 'Could not parse import statement: %s' % import_statement
@@ -42,6 +45,8 @@ def index_by_group(import_statements):
IMPORT_CLASS_RE = re.compile(
'import(?: static)? (?P<outer>[^A-Z]*[A-Z]\w+)(?:\.(?P<inners>[\w][^;]*))?')
+
+
def get_all_group_lines(import_groups):
if not import_groups:
return []
@@ -75,64 +80,69 @@ def get_all_group_lines(import_groups):
all_lines += get_group_lines(group)
return all_lines
-
-if len(sys.argv) != 2:
- print('usage: %s FILE' % sys.argv[0])
- sys.exit(1)
-
BEFORE_IMPORTS = 'before_imports'
IMPORTS = 'imports'
STATIC_IMPORTS = 'static_imports'
AFTER_IMPORTS = 'after_imports'
-print('Organizing imports in %s' % sys.argv[1])
-lines_before_imports = []
-import_lines = []
-static_import_lines = []
-lines_after_imports = []
-with open(sys.argv[1], 'r') as f:
- position = BEFORE_IMPORTS
- for line in f:
- line = line.rstrip()
- if position == BEFORE_IMPORTS:
- if line.startswith('import'):
- position = IMPORTS
- else:
- lines_before_imports.append(line)
- if position == IMPORTS:
- if line.startswith('import static'):
- position = STATIC_IMPORTS
- elif line.startswith('import'):
- import_lines.append(line)
- elif line.strip():
- position = AFTER_IMPORTS
- if position == STATIC_IMPORTS:
- if line.startswith('import static'):
- static_import_lines.append(line)
- elif line.strip():
- position = AFTER_IMPORTS
- if position == AFTER_IMPORTS:
- lines_after_imports.append(line)
-
-import_groups = index_by_group(import_lines)
-static_import_groups = index_by_group(static_import_lines)
-
-def ensure_line_padding(lines):
- if lines and lines[-1] != '':
- lines.append('')
- return lines
-
-file_lines = lines_before_imports
-if import_groups:
- ensure_line_padding(file_lines)
- file_lines += get_all_group_lines(import_groups)
-if static_import_groups:
- ensure_line_padding(file_lines)
- file_lines += get_all_group_lines(static_import_groups)
-if lines_after_imports:
- ensure_line_padding(file_lines)
- file_lines += lines_after_imports
-
-with open(sys.argv[1], 'w') as f:
- for line in file_lines:
- print(line, file=f)
+
+def main(argv):
+ if len(argv) != 2:
+ print('usage: %s FILE' % argv[0])
+ sys.exit(1)
+
+ print('Organizing imports in %s' % argv[1])
+ lines_before_imports = []
+ import_lines = []
+ static_import_lines = []
+ lines_after_imports = []
+ with open(argv[1], 'r') as f:
+ position = BEFORE_IMPORTS
+ for line in f:
+ line = line.rstrip()
+ if position == BEFORE_IMPORTS:
+ if line.startswith('import'):
+ position = IMPORTS
+ else:
+ lines_before_imports.append(line)
+ if position == IMPORTS:
+ if line.startswith('import static'):
+ position = STATIC_IMPORTS
+ elif line.startswith('import'):
+ import_lines.append(line)
+ elif line.strip():
+ position = AFTER_IMPORTS
+ if position == STATIC_IMPORTS:
+ if line.startswith('import static'):
+ static_import_lines.append(line)
+ elif line.strip():
+ position = AFTER_IMPORTS
+ if position == AFTER_IMPORTS:
+ lines_after_imports.append(line)
+
+ import_groups = index_by_group(import_lines)
+ static_import_groups = index_by_group(static_import_lines)
+
+ def ensure_line_padding(lines):
+ if lines and lines[-1] != '':
+ lines.append('')
+ return lines
+
+ file_lines = lines_before_imports
+ if import_groups:
+ ensure_line_padding(file_lines)
+ file_lines += get_all_group_lines(import_groups)
+ if static_import_groups:
+ ensure_line_padding(file_lines)
+ file_lines += get_all_group_lines(static_import_groups)
+ if lines_after_imports:
+ ensure_line_padding(file_lines)
+ file_lines += lines_after_imports
+
+ with open(argv[1], 'w') as f:
+ for line in file_lines:
+ print(line, file=f)
+
+
+if __name__ == '__main__':
+ main(sys.argv)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/467f8428/src/main/python/apache/aurora/tools/java/thrift_wrapper_codegen.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/tools/java/thrift_wrapper_codegen.py b/src/main/python/apache/aurora/tools/java/thrift_wrapper_codegen.py
index ea9a482..9dc92fd 100644
--- a/src/main/python/apache/aurora/tools/java/thrift_wrapper_codegen.py
+++ b/src/main/python/apache/aurora/tools/java/thrift_wrapper_codegen.py
@@ -110,7 +110,8 @@ STRUCT_ASSIGNMENT = '''this.%(field)s = !wrapped.%(isset)s()
: %(type)s.buildNoCopy(wrapped.%(fn_name)s());'''
-IMMUTABLE_COLLECTION_DECLARATION = '''private final Immutable%(collection)s<%(params)s> %(field)s;'''
+IMMUTABLE_COLLECTION_DECLARATION = (
+ '''private final Immutable%(collection)s<%(params)s> %(field)s;''')
IMMUTABLE_COLLECTION_ASSIGNMENT = '''this.%(field)s = !wrapped.%(isset)s()
? Immutable%(collection)s.<%(params)s>of()
: Immutable%(collection)s.copyOf(wrapped.%(fn_name)s());'''
@@ -273,7 +274,7 @@ class GeneratedCode(object):
'wrapped': self._wrapped_type,
'imports': '\n\n'.join(import_groups),
'accessors': '\n\n'.join(self._accessors),
- 'fields': (' ' + '\n '.join(self._fields) + '\n') if self._fields else '',
+ 'fields': (' ' + '\n '.join(self._fields) + '\n') if self._fields else '',
'assignments': ('\n ' + '\n '.join(self._assignments)) if self._assignments else '',
}, file=f)
@@ -344,7 +345,8 @@ def generate_java(struct):
# Accessor for each field.
for field in struct.fields:
- if not (isinstance(field.ttype, StructType) and (field.ttype.kind == 'enum' or struct.kind == 'union')):
+ if not (isinstance(field.ttype, StructType) and (
+ field.ttype.kind == 'enum' or struct.kind == 'union')):
code.add_accessor(IMMUTABLE_FIELD_TEMPLATE
% {'type': 'boolean',
'fn_name': field.isset_method()})
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/467f8428/src/main/python/apache/thermos/bin/thermos.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/bin/thermos.py b/src/main/python/apache/thermos/bin/thermos.py
index 196dfae..ebf5488 100644
--- a/src/main/python/apache/thermos/bin/thermos.py
+++ b/src/main/python/apache/thermos/bin/thermos.py
@@ -23,15 +23,12 @@ import pwd
import re
import sys
import time
-from collections import namedtuple
-from pystachio import Ref
from pystachio.naming import frozendict
from twitter.common import app, log
-from twitter.common.dirutil import du, tail_f
+from twitter.common.dirutil import tail_f
from twitter.common.dirutil.tail import tail as tail_closed
from twitter.common.log.options import LogOptions
-from twitter.common.quantity import Amount, Data, Time
from twitter.common.quantity.parse_simple import parse_data, parse_time
from twitter.common.recordio import RecordIO, ThriftRecordReader
@@ -101,9 +98,9 @@ def daemonize():
daemon_fork()
os.setsid()
daemon_fork()
- sys.stdin, sys.stdout, sys.stderr = (open('/dev/null', 'r'),
- open('/dev/null', 'a+'),
- open('/dev/null', 'a+', 0))
+ sys.stdin, sys.stdout, sys.stderr = (open('/dev/null', 'r'), # noqa
+ open('/dev/null', 'a+'), # noqa
+ open('/dev/null', 'a+', 0)) # noqa
def tasks_from_re(expressions, root, state=None):
@@ -274,16 +271,16 @@ def simplerun(args, options):
"""
try:
cutoff = args.index('--')
- cmdline = ' '.join(args[cutoff+1:])
+ cmdline = ' '.join(args[cutoff + 1:])
except ValueError:
cmdline = ' '.join(args)
print("Running command: '%s'" % cmdline)
thermos_task = ThermosTaskWrapper(Task(
- name = options.name,
- resources = Resources(cpu = 1.0, ram = 256 * 1024 * 1024, disk = 0),
- processes = [Process(name = options.name, cmdline = cmdline)]))
+ name=options.name,
+ resources=Resources(cpu=1.0, ram=256 * 1024 * 1024, disk=0),
+ processes=[Process(name=options.name, cmdline=cmdline)]))
_really_run(thermos_task,
options.root,
@@ -339,7 +336,7 @@ def read(args, options):
print('Recovered Task States:')
for task_status in state.statuses:
print(' %s [pid: %d] => %s' % (
- time.asctime(time.localtime(task_status.timestamp_ms/1000.0)),
+ time.asctime(time.localtime(task_status.timestamp_ms / 1000.0)),
task_status.runner_pid,
TaskState._VALUES_TO_NAMES[task_status.state]))
print('Recovered Processes:')
@@ -431,10 +428,11 @@ def gc(args, options):
gc_options['max_space'] = parse_data(options.max_space)
if options.max_tasks is not None:
gc_options['max_tasks'] = int(options.max_tasks)
- gc_options.update(include_data = not options.keep_data,
- include_metadata = not options.keep_metadata,
- include_logs = not options.keep_logs,
- verbose = True, logger = print)
+ gc_options.update(include_data=not options.keep_data,
+ include_metadata=not options.keep_metadata,
+ include_logs=not options.keep_logs,
+ verbose=True,
+ logger=print)
tgc = TaskGarbageCollector(root=options.root)
if args:
@@ -496,7 +494,7 @@ def status(args, options):
checkpoint_stat = os.stat(checkpoint_filename)
try:
checkpoint_owner = pwd.getpwuid(checkpoint_stat.st_uid).pw_name
- except:
+ except KeyError:
checkpoint_owner = 'uid:%s' % checkpoint_stat.st_uid
print(' %-20s [owner: %8s]' % (task_id, checkpoint_owner), end='')
if options.verbose == 0:
@@ -508,7 +506,7 @@ def status(args, options):
return
print(' state: %8s' % TaskState._VALUES_TO_NAMES.get(state.statuses[-1].state, 'Unknown'),
end='')
- print(' start: %25s' % time.asctime(time.localtime(state.header.launch_time_ms/1000.0)))
+ print(' start: %25s' % time.asctime(time.localtime(state.header.launch_time_ms / 1000.0)))
if options.verbose > 1:
print(' user: %s' % state.header.user, end='')
if state.header.ports:
@@ -557,7 +555,6 @@ def status(args, options):
sys.exit(1)
-
@app.command
@app.command_option("--stderr", default=False, dest='use_stderr', action='store_true',
help="Tail stderr instead of stdout")
@@ -595,7 +592,7 @@ def tail(args, options):
run=run, log_dir=log_dir).getpath('process_logdir')
logfile = os.path.join(logdir, 'stderr' if options.use_stderr else 'stdout')
- monitor = TaskMonitor(TaskPath(root = options.root), args[0])
+ monitor = TaskMonitor(TaskPath(root=options.root), args[0])
def log_is_active():
active_processes = monitor.get_active_processes()
for process_status, process_run in active_processes:
@@ -621,8 +618,8 @@ def tail(args, options):
next_check = time.time() + 5.0
-@app.command
-def help(args, options):
+@app.command(name='help')
+def help_command(args, options):
"""Get help about a specific command.
"""
if len(args) == 0:
@@ -635,8 +632,6 @@ def help(args, options):
print('unknown command: %s' % args[0], file=sys.stderr)
-
-
def generate_usage():
usage = """
thermos
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/467f8428/src/main/python/apache/thermos/bin/thermos_ckpt.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/bin/thermos_ckpt.py b/src/main/python/apache/thermos/bin/thermos_ckpt.py
index a8e42bc..8ed69e0 100644
--- a/src/main/python/apache/thermos/bin/thermos_ckpt.py
+++ b/src/main/python/apache/thermos/bin/thermos_ckpt.py
@@ -12,7 +12,8 @@
# limitations under the License.
#
-import os
+from __future__ import print_function
+
import pprint
import sys
import time
@@ -24,47 +25,59 @@ from apache.thermos.common.ckpt import CheckpointDispatcher
from gen.apache.thermos.ttypes import RunnerCkpt, RunnerState, TaskState
-app.add_option("--checkpoint", dest = "ckpt", metavar = "CKPT",
- help = "read checkpoint from CKPT")
-app.add_option("--assemble", dest = "assemble", metavar = "CKPT", default=True,
- help = "read checkpoint from CKPT")
+app.add_option(
+ "--checkpoint",
+ dest="ckpt",
+ metavar="CKPT",
+ help="read checkpoint from CKPT")
+
+app.add_option(
+ "--assemble",
+ dest="assemble",
+ metavar="CKPT",
+ default=True,
+ help="whether or not to replay the checkpoint records.")
+
def main(args):
values = app.get_options()
if len(args) > 0:
- print >> sys.stderr, "ERROR: unrecognized arguments: %s\n" % (" ".join(args))
+ print("ERROR: unrecognized arguments: %s\n" % (" ".join(args)), file=sys.stderr)
app.help()
sys.exit(1)
if not values.ckpt:
- print >> sys.stderr, "ERROR: must supply --checkpoint"
+ print("ERROR: must supply --checkpoint", file=sys.stderr)
app.help()
sys.exit(1)
fp = file(values.ckpt, "r")
rr = ThriftRecordReader(fp, RunnerCkpt)
- wrs = RunnerState(processes = {})
+ wrs = RunnerState(processes={})
dispatcher = CheckpointDispatcher()
try:
for wts in rr:
- print 'Recovering: ', wts
+ print('Recovering: %s' % wts)
if values.assemble is True:
- dispatcher.dispatch(wrs, wts)
+ dispatcher.dispatch(wrs, wts)
except RecordIO.Error as err:
- print 'Error recovering checkpoint stream: %s' % err
+ print('Error recovering checkpoint stream: %s' % err, file=sys.stderr)
return
- print '\n\n\n'
+ print('\n\n\n')
if values.assemble:
- print 'Recovered Task Header'
+ print('Recovered Task Header')
pprint.pprint(wrs.header, indent=4)
- print '\nRecovered Task States'
+ print('\nRecovered Task States')
for task_status in wrs.statuses:
- print ' %s [pid: %d] => %s' % (time.asctime(time.localtime(task_status.timestamp_ms/1000.0)),
- task_status.runner_pid, TaskState._VALUES_TO_NAMES[task_status.state])
+ print(' %s [pid: %d] => %s' % (
+ time.asctime(time.localtime(task_status.timestamp_ms / 1000.0)),
+ task_status.runner_pid,
+ TaskState._VALUES_TO_NAMES[task_status.state]))
- print '\nRecovered Processes'
+ print('\nRecovered Processes')
pprint.pprint(wrs.processes, indent=4)
+
app.main()
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/467f8428/src/main/python/apache/thermos/common/ckpt.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/common/ckpt.py b/src/main/python/apache/thermos/common/ckpt.py
index af971f5..7df179b 100644
--- a/src/main/python/apache/thermos/common/ckpt.py
+++ b/src/main/python/apache/thermos/common/ckpt.py
@@ -22,8 +22,6 @@ Task state machines.
"""
-import os
-
from twitter.common import log
from twitter.common.recordio import RecordIO, ThriftRecordReader
@@ -41,6 +39,7 @@ class UniversalStateHandler(object):
Generic interface for a handler to be called on any process/state transition, and at task
initialization
"""
+
def on_process_transition(self, state, process_update):
pass
@@ -65,6 +64,7 @@ class ProcessStateHandler(object):
v | `---> [SUCCESS]
[LOST] <------'
"""
+
def on_waiting(self, process_update):
pass
@@ -301,7 +301,7 @@ class CheckpointDispatcher(object):
if process_update is None:
return False
process = process_update.process
- if process not in state.processes: # never seen before
+ if process not in state.processes: # never seen before
return True
else:
# if this sequence number is ahead of the current high water mark, it would
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/467f8428/src/main/python/apache/thermos/common/options.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/common/options.py b/src/main/python/apache/thermos/common/options.py
index f3debc3..d7abd8e 100644
--- a/src/main/python/apache/thermos/common/options.py
+++ b/src/main/python/apache/thermos/common/options.py
@@ -18,6 +18,7 @@ from pystachio import Ref
class ParseError(Exception):
pass
+
def add_port_to(option_name):
def add_port_callback(option, opt, value, parser):
if not getattr(parser.values, option_name, None):
@@ -34,6 +35,7 @@ def add_port_to(option_name):
getattr(parser.values, option_name)[name] = port
return add_port_callback
+
def add_binding_to(option_name):
def add_binding_callback(option, opt, value, parser):
if not getattr(parser.values, option_name, None):
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/467f8428/src/main/python/apache/thermos/common/path.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/common/path.py b/src/main/python/apache/thermos/common/path.py
index 0278161..9e61705 100644
--- a/src/main/python/apache/thermos/common/path.py
+++ b/src/main/python/apache/thermos/common/path.py
@@ -52,22 +52,22 @@ class TaskPath(object):
class UnderspecifiedPath(Exception): pass
DEFAULT_CHECKPOINT_ROOT = "/var/run/thermos"
- KNOWN_KEYS = [ 'root', 'task_id', 'state', 'process', 'run', 'log_dir' ]
+ KNOWN_KEYS = ['root', 'task_id', 'state', 'process', 'run', 'log_dir']
LEGACY_KNOWN_KEYS = KNOWN_KEYS[:-1]
DIR_TEMPLATE = {
- 'task_path': ['%(root)s', 'tasks', '%(state)s', '%(task_id)s'],
+ 'task_path': ['%(root)s', 'tasks', '%(state)s', '%(task_id)s'],
'checkpoint_path': ['%(root)s', 'checkpoints', '%(task_id)s'],
- 'runner_checkpoint': ['%(root)s', 'checkpoints', '%(task_id)s', 'runner'],
- 'process_checkpoint': ['%(root)s', 'checkpoints', '%(task_id)s', 'coordinator.%(process)s'],
+ 'runner_checkpoint': ['%(root)s', 'checkpoints', '%(task_id)s', 'runner'],
+ 'process_checkpoint': ['%(root)s', 'checkpoints', '%(task_id)s', 'coordinator.%(process)s'],
'process_logbase': ['%(log_dir)s'],
- 'process_logdir': ['%(log_dir)s', '%(process)s', '%(run)s']
+ 'process_logdir': ['%(log_dir)s', '%(process)s', '%(run)s']
}
LEGACY_DIR_TEMPLATE = DIR_TEMPLATE.copy()
LEGACY_DIR_TEMPLATE.update(
- process_logbase = ['%(root)s', 'logs', '%(task_id)s'],
- process_logdir = ['%(root)s', 'logs', '%(task_id)s', '%(process)s', '%(run)s']
+ process_logbase=['%(root)s', 'logs', '%(task_id)s'],
+ process_logdir=['%(root)s', 'logs', '%(task_id)s', '%(process)s', '%(run)s']
)
def __init__(self, **kw):
@@ -85,7 +85,7 @@ class TaskPath(object):
def given(self, **kw):
""" Perform further interpolation of the templates given the kwargs """
- eval_dict = dict(self._data) # copy
+ eval_dict = dict(self._data)
eval_dict.update(kw)
tp = TaskPath(**eval_dict)
tp._filename = self._filename
@@ -107,9 +107,9 @@ class TaskPath(object):
path = os.path.join(*path)
interpolated_path = path % self._data
try:
- _ = interpolated_path % {}
+ interpolated_path % {}
except KeyError:
- raise TaskPath.UnderspecifiedPath(
+ raise self.UnderspecifiedPath(
"Tried to interpolate path with insufficient variables: %s as %s" % (
pathname, interpolated_path))
return interpolated_path
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/467f8428/src/main/python/apache/thermos/common/planner.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/common/planner.py b/src/main/python/apache/thermos/common/planner.py
index 58bd68d..da5120f 100644
--- a/src/main/python/apache/thermos/common/planner.py
+++ b/src/main/python/apache/thermos/common/planner.py
@@ -49,8 +49,8 @@ class Planner(object):
process_set -= given
return dependencies
- @staticmethod
- def satisfiable(processes, dependencies):
+ @classmethod
+ def satisfiable(cls, processes, dependencies):
"""
Given a set of processes and a dependency map, determine if this is a consistent
schedule without cycles.
@@ -61,27 +61,27 @@ class Planner(object):
scheduling = True
while scheduling:
scheduling = False
- runnables = Planner.filter_runnable(processes, dependencies)
+ runnables = cls.filter_runnable(processes, dependencies)
if runnables:
scheduling = True
processes -= runnables
- dependencies = Planner.filter_dependencies(dependencies, given=runnables)
+ dependencies = cls.filter_dependencies(dependencies, given=runnables)
return len(processes) == 0
def __init__(self, processes, dependencies):
self._processes = set(processes)
self._dependencies = dict((process, set(dependencies.get(process, [])))
for process in self._processes)
- if not Planner.satisfiable(self._processes, self._dependencies):
- raise Planner.InvalidSchedule("Cycles detected in the task schedule!")
+ if not self.satisfiable(self._processes, self._dependencies):
+ raise self.InvalidSchedule("Cycles detected in the task schedule!")
self._running = set()
self._finished = set()
self._failed = set()
@property
def runnable(self):
- return Planner.filter_runnable(self._processes - self._running - self._finished - self._failed,
- Planner.filter_dependencies(self._dependencies, given=self._finished))
+ return self.filter_runnable(self._processes - self._running - self._finished - self._failed,
+ self.filter_dependencies(self._dependencies, given=self._finished))
@property
def processes(self):
@@ -129,6 +129,7 @@ class Planner(object):
TaskAttributes = namedtuple('TaskAttributes', 'min_duration is_daemon max_failures is_ephemeral')
+
class TaskPlanner(object):
"""
A planner for the processes part of a Thermos task, taking into account ephemeral and daemon
@@ -155,12 +156,12 @@ class TaskPlanner(object):
| failed |
`--------'
"""
- InvalidSchedule = Planner.InvalidSchedule
+ InvalidSchedule = Planner.InvalidSchedule # noqa
INFINITY = sys.float_info.max
TOTAL_RUN_LIMIT = sys.maxsize
- @staticmethod
- def extract_dependencies(task, process_filter=None):
+ @classmethod
+ def extract_dependencies(cls, task, process_filter=None):
"""
Construct a set of processes and the process dependencies from a Thermos Task.
"""
@@ -175,16 +176,16 @@ class TaskPlanner(object):
process_name_set = set(process_names)
# either all process_names must be in processes or none should be
if process_name_set.issubset(processes) == process_name_set.isdisjoint(processes):
- raise TaskPlanner.InvalidSchedule('Invalid process dependencies!')
+ raise cls.InvalidSchedule('Invalid process dependencies!')
if not process_name_set.issubset(processes):
continue
for k in range(1, len(process_names)):
- pnk, pnk1 = process_names[k], process_names[k-1]
+ pnk, pnk1 = process_names[k], process_names[k - 1]
if process_map[pnk1].daemon().get():
- raise TaskPlanner.InvalidSchedule(
+ raise cls.InvalidSchedule(
'Process %s may not depend upon daemon process %s' % (pnk, pnk1))
if not process_map[pnk].ephemeral().get() and process_map[pnk1].ephemeral().get():
- raise TaskPlanner.InvalidSchedule(
+ raise cls.InvalidSchedule(
'Non-ephemeral process %s may not depend upon ephemeral process %s' % (pnk, pnk1))
dependencies[pnk].add(pnk1)
return (processes, dependencies)
@@ -195,7 +196,7 @@ class TaskPlanner(object):
'TaskPlanner must be given callable process filter.')
self._planner = Planner(*self.extract_dependencies(task, self._filter))
self._clock = clock
- self._last_terminal = {} # process => timestamp of last terminal state
+ self._last_terminal = {} # process => timestamp of last terminal state
self._failures = defaultdict(int)
self._successes = defaultdict(int)
self._attributes = {}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/467f8428/src/main/python/apache/thermos/config/bin/config_load.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/config/bin/config_load.py b/src/main/python/apache/thermos/config/bin/config_load.py
index a0b4800..d6e1f82 100644
--- a/src/main/python/apache/thermos/config/bin/config_load.py
+++ b/src/main/python/apache/thermos/config/bin/config_load.py
@@ -12,10 +12,10 @@
# limitations under the License.
#
-import copy
+from __future__ import print_function
+
import json
import pprint
-import sys
from twitter.common import app
@@ -23,41 +23,40 @@ from apache.thermos.config.loader import ThermosConfigLoader
def main(args):
- """
- Given .thermos configs, loads them and prints out information about them.
- """
+ """Given .thermos configs, loads them and prints out information about them."""
if len(args) == 0:
app.help()
for arg in args:
- print '\nparsing %s\n' % arg
+ print('\nparsing %s\n' % arg)
tc = ThermosConfigLoader.load(arg)
for task_wrapper in tc.tasks():
task = task_wrapper.task
if not task.has_name():
- print 'Found unnamed task! Skipping...'
+ print('Found unnamed task! Skipping...')
continue
- print 'Task: %s [check: %s]' % (task.name(), task.check())
+ print('Task: %s [check: %s]' % (task.name(), task.check()))
if not task.processes():
- print ' No processes.'
+ print(' No processes.')
else:
- print ' Processes:'
+ print(' Processes:')
for proc in task.processes():
- print ' %s' % proc
+ print(' %s' % proc)
ports = task_wrapper.ports()
if not ports:
- print ' No unbound ports.'
+ print(' No unbound ports.')
else:
- print ' Ports:'
+ print(' Ports:')
for port in ports:
- print ' %s' % port
+ print(' %s' % port)
- print 'raw:'
+ print('raw:')
pprint.pprint(json.loads(task_wrapper.to_json()))
+
app.set_usage("%s config1 config2 ..." % app.name())
app.main()
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/467f8428/src/main/python/apache/thermos/config/bin/config_repl.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/config/bin/config_repl.py b/src/main/python/apache/thermos/config/bin/config_repl.py
index 8776f9c..ae9ca3b 100644
--- a/src/main/python/apache/thermos/config/bin/config_repl.py
+++ b/src/main/python/apache/thermos/config/bin/config_repl.py
@@ -14,6 +14,6 @@
from code import interact
-from apache.thermos.config.schema import *
+from apache.thermos.config.schema import * # noqa
interact('Thermos Config REPL', local=locals())
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/467f8428/src/main/python/apache/thermos/config/loader.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/config/loader.py b/src/main/python/apache/thermos/config/loader.py
index 160027a..d77ab9a 100644
--- a/src/main/python/apache/thermos/config/loader.py
+++ b/src/main/python/apache/thermos/config/loader.py
@@ -12,7 +12,6 @@
# limitations under the License.
#
-import copy
import json
import os
import re
@@ -21,7 +20,6 @@ import textwrap
from pystachio import Ref
from pystachio.config import Config
from twitter.common.dirutil import safe_open
-from twitter.common.lang import Compatibility
from apache.thermos.common.planner import TaskPlanner
from apache.thermos.config.schema import Task
@@ -30,8 +28,8 @@ from apache.thermos.config.schema import Task
class PortExtractor(object):
class InvalidPorts(Exception): pass
- @staticmethod
- def extract(obj):
+ @classmethod
+ def extract(cls, obj):
port_scope = Ref.from_address('thermos.ports')
_, uninterp = obj.interpolate()
ports = []
@@ -39,7 +37,7 @@ class PortExtractor(object):
subscope = port_scope.scoped_to(ref)
if subscope is not None:
if not subscope.is_index():
- raise PortExtractor.InvalidPorts(
+ raise cls.InvalidPorts(
'Bad port specification "%s" (should be of form "thermos.ports[name]"' % ref.address())
ports.append(subscope.action().value)
return ports
@@ -59,10 +57,10 @@ class ThermosProcessWrapper(object):
except PortExtractor.InvalidPorts:
raise self.InvalidProcess('Process has invalid ports scoping!')
- @staticmethod
- def assert_valid_process_name(name):
- if not ThermosProcessWrapper.VALID_PROCESS_NAME_RE.match(name):
- raise ThermosProcessWrapper.InvalidProcess('Invalid process name: %s' % name)
+ @classmethod
+ def assert_valid_process_name(cls, name):
+ if not cls.VALID_PROCESS_NAME_RE.match(name):
+ raise cls.InvalidProcess('Invalid process name: %s' % name)
class ThermosTaskWrapper(object):
@@ -72,7 +70,7 @@ class ThermosTaskWrapper(object):
if bindings:
task = task.bind(*bindings)
if not task.check().ok() and strict:
- raise ThermosTaskWrapper.InvalidTask(task.check().message())
+ raise self.InvalidTask(task.check().message())
self._task = task
@property
@@ -98,13 +96,13 @@ class ThermosTaskWrapper(object):
with safe_open(filename, 'w') as fp:
json.dump(ti.get(), fp)
- @staticmethod
- def from_file(filename, **kw):
+ @classmethod
+ def from_file(cls, filename, **kw):
try:
with safe_open(filename) as fp:
task = Task.json_load(fp)
- return ThermosTaskWrapper(task, **kw)
- except Exception as e:
+ return cls(task, **kw)
+ except Exception:
return None
@@ -121,8 +119,8 @@ class ThermosTaskValidator(object):
@classmethod
def assert_valid_plan(cls, task):
try:
- TaskPlanner(task, process_filter=lambda proc: proc.final().get() == False)
- TaskPlanner(task, process_filter=lambda proc: proc.final().get() == True)
+ TaskPlanner(task, process_filter=lambda proc: proc.final().get() is False)
+ TaskPlanner(task, process_filter=lambda proc: proc.final().get() is True)
except TaskPlanner.InvalidSchedule as e:
raise cls.InvalidTaskError('Task has invalid plan: %s' % e)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/467f8428/src/main/python/apache/thermos/config/schema.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/config/schema.py b/src/main/python/apache/thermos/config/schema.py
index ca16c57..9d858be 100644
--- a/src/main/python/apache/thermos/config/schema.py
+++ b/src/main/python/apache/thermos/config/schema.py
@@ -12,5 +12,5 @@
# limitations under the License.
#
-from .schema_base import *
-from .schema_helpers import *
+from .schema_base import * # noqa
+from .schema_helpers import * # noqa
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/467f8428/src/main/python/apache/thermos/config/schema_base.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/config/schema_base.py b/src/main/python/apache/thermos/config/schema_base.py
index 5fc77a8..f9143cc 100644
--- a/src/main/python/apache/thermos/config/schema_base.py
+++ b/src/main/python/apache/thermos/config/schema_base.py
@@ -12,6 +12,8 @@
# limitations under the License.
#
+# checkstyle: noqa
+
from pystachio import Boolean, Default, Empty, Float, Integer, List, Map, Required, String, Struct
# Define constants for resources
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/467f8428/src/main/python/apache/thermos/config/schema_helpers.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/config/schema_helpers.py b/src/main/python/apache/thermos/config/schema_helpers.py
index 38620f7..3feef02 100644
--- a/src/main/python/apache/thermos/config/schema_helpers.py
+++ b/src/main/python/apache/thermos/config/schema_helpers.py
@@ -13,6 +13,9 @@
#
"""Helpers for composing Thermos workflows."""
+
+# checkstyle: noqa
+
import itertools
from pystachio import Empty, List
@@ -242,10 +245,12 @@ def SimpleTask(name, command):
"""A simple command-line Task with default resources"""
return Tasks.simple(name, command)
+
def SequentialTask(*args, **kw):
"""A Task whose processes are always sequential."""
return Tasks.sequential(Task(*args, **kw))
+
python_options = Options.python
java_options = Options.java
combine_tasks = Tasks.combine
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/467f8428/src/main/python/apache/thermos/core/inspector.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/core/inspector.py b/src/main/python/apache/thermos/core/inspector.py
index f85b5f9..4fe8aa3 100644
--- a/src/main/python/apache/thermos/core/inspector.py
+++ b/src/main/python/apache/thermos/core/inspector.py
@@ -40,7 +40,7 @@ class CheckpointInspector(object):
@staticmethod
def get_timestamp(process_record):
- if process_record :
+ if process_record:
for timestamp in ('fork_time', 'start_time', 'stop_time'):
stamp = getattr(process_record, timestamp, None)
if stamp:
@@ -52,7 +52,7 @@ class CheckpointInspector(object):
Reconstructs the checkpoint stream and returns a CheckpointInspection.
"""
dispatcher = CheckpointDispatcher()
- state = RunnerState(processes = {})
+ state = RunnerState(processes={})
muxer = ProcessMuxer(self._path.given(task_id=task_id))
runner_processes = []
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/467f8428/src/main/python/apache/thermos/core/muxer.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/core/muxer.py b/src/main/python/apache/thermos/core/muxer.py
index ea2665c..47e77f7 100644
--- a/src/main/python/apache/thermos/core/muxer.py
+++ b/src/main/python/apache/thermos/core/muxer.py
@@ -22,13 +22,14 @@ from gen.apache.thermos.ttypes import RunnerCkpt
class ProcessMuxer(object):
- class ProcessExists(Exception): pass
- class ProcessNotFound(Exception): pass
- class CorruptCheckpoint(Exception): pass
+ class Error(Exception): pass
+ class ProcessExists(Error): pass
+ class ProcessNotFound(Error): pass
+ class CorruptCheckpoint(Error): pass
def __init__(self, pathspec):
- self._processes = {} # process_name => fp
- self._watermarks = {} # process_name => sequence high watermark
+ self._processes = {} # process_name => fp
+ self._watermarks = {} # process_name => sequence high watermark
self._pathspec = pathspec
def __del__(self):
@@ -38,7 +39,7 @@ class ProcessMuxer(object):
def register(self, process_name, watermark=0):
log.debug('registering %s' % process_name)
if process_name in self._processes:
- raise ProcessMuxer.ProcessExists("Process %s is already registered" % process_name)
+ raise self.ProcessExists("Process %s is already registered" % process_name)
self._processes[process_name] = None
self._watermarks[process_name] = watermark
@@ -48,7 +49,7 @@ class ProcessMuxer(object):
process_ckpt = self._pathspec.given(process=process_name).getpath('process_checkpoint')
log.debug('ProcessMuxer binding %s => %s' % (process_name, process_ckpt))
try:
- self._processes[process_name] = open(process_ckpt, 'r')
+ self._processes[process_name] = open(process_ckpt, 'r') # noqa
except IOError as e:
if e.errno == errno.ENOENT:
log.debug(' => bind failed, checkpoint not available yet.')
@@ -92,7 +93,7 @@ class ProcessMuxer(object):
def unregister(self, process_name):
log.debug('unregistering %s' % process_name)
if process_name not in self._processes:
- raise ProcessMuxer.ProcessNotFound("No trace of process: %s" % process_name)
+ raise self.ProcessNotFound("No trace of process: %s" % process_name)
else:
self._watermarks.pop(process_name)
fp = self._processes.pop(process_name)
@@ -111,8 +112,8 @@ class ProcessMuxer(object):
rr = ThriftRecordReader(fp, RunnerCkpt)
old_pos = fp.tell()
try:
- expected_new_pos = os.fstat(fp.fileno()).st_size
- except OSError as e:
+ os.fstat(fp.fileno()).st_size
+ except OSError:
log.debug('ProcessMuxer could not fstat for process %s' % process)
return False
update = rr.try_read()
@@ -137,7 +138,7 @@ class ProcessMuxer(object):
for handle in filter(None, self._processes.values()):
try:
fstat = os.fstat(handle.fileno())
- except OSError as e:
+ except OSError:
log.error('Unable to fstat %s!' % handle.name)
continue
if handle.tell() > fstat.st_size:
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/467f8428/src/main/python/apache/thermos/core/process.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/core/process.py b/src/main/python/apache/thermos/core/process.py
index aef1ec6..4889e63 100644
--- a/src/main/python/apache/thermos/core/process.py
+++ b/src/main/python/apache/thermos/core/process.py
@@ -20,7 +20,6 @@ commandline in a subprocess of its own.
"""
-import getpass
import grp
import os
import pwd
@@ -41,6 +40,7 @@ from gen.apache.thermos.ttypes import ProcessState, ProcessStatus, RunnerCkpt
class Platform(Interface):
"""Abstract representation of a platform encapsulating system-level functions"""
+
@abstractmethod
def clock(self):
pass
@@ -94,7 +94,7 @@ class ProcessBase(object):
self._stderr = None
self._user = user
if self._user:
- user, current_user = self._getpwuid() # may raise self.UnknownUserError
+ user, current_user = self._getpwuid() # may raise self.UnknownUserError
if user != current_user and os.geteuid() != 0:
raise self.PermissionError('Must be root to run processes as other users!')
self._ckpt = None
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/467f8428/src/main/python/apache/thermos/core/runner.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/core/runner.py b/src/main/python/apache/thermos/core/runner.py
index b04c555..ec4cdb7 100644
--- a/src/main/python/apache/thermos/core/runner.py
+++ b/src/main/python/apache/thermos/core/runner.py
@@ -39,14 +39,12 @@ terminal state.
"""
-import errno
import os
import socket
import sys
import time
import traceback
from contextlib import contextmanager
-from functools import partial
from pystachio import Environment
from twitter.common import log
@@ -64,7 +62,6 @@ from apache.thermos.common.path import TaskPath
from apache.thermos.common.planner import TaskPlanner
from apache.thermos.config.loader import (
ThermosConfigLoader,
- ThermosProcessWrapper,
ThermosTaskValidator,
ThermosTaskWrapper
)
@@ -279,7 +276,7 @@ class TaskRunnerStage(object):
raise NotImplementedError
-class TaskRunnerStage_ACTIVE(TaskRunnerStage):
+class TaskRunnerStage_ACTIVE(TaskRunnerStage): # noqa
"""
Run the regular plan (i.e. normal, non-finalizing processes.)
"""
@@ -315,7 +312,7 @@ class TaskRunnerStage_ACTIVE(TaskRunnerStage):
return TaskState.CLEANING
-class TaskRunnerStage_CLEANING(TaskRunnerStage):
+class TaskRunnerStage_CLEANING(TaskRunnerStage): # noqa
"""
Start the cleanup of the regular plan (e.g. if it failed.) On ACTIVE -> CLEANING,
we send SIGTERMs to all still-running processes. We wait at most finalization_wait
@@ -323,6 +320,7 @@ class TaskRunnerStage_CLEANING(TaskRunnerStage):
prior to that point in time, we transition to FINALIZING, which kicks into gear
the finalization schedule (if any.)
"""
+
def run(self):
log.debug('TaskRunnerStage[CLEANING]: Finalization remaining: %s' %
self.runner._finalization_remaining())
@@ -336,7 +334,7 @@ class TaskRunnerStage_CLEANING(TaskRunnerStage):
return TaskState.FINALIZING
-class TaskRunnerStage_FINALIZING(TaskRunnerStage):
+class TaskRunnerStage_FINALIZING(TaskRunnerStage): # noqa
"""
Run the finalizing plan, specifically the plan of tasks with the 'final'
bit marked (e.g. log savers, checkpointers and the like.) Anything in this
@@ -446,7 +444,7 @@ class TaskRunner(object):
raise TypeError('planner_class must be a TaskPlanner.')
self._clock = clock
launch_time = self._clock.time()
- launch_time_ms = '%06d' % int((launch_time - int(launch_time)) * 10**6)
+ launch_time_ms = '%06d' % int((launch_time - int(launch_time)) * (10 ** 6))
if not task_id:
self._task_id = '%s-%s.%s' % (task.name(),
time.strftime('%Y%m%d-%H%M%S', time.localtime(launch_time)),
@@ -481,11 +479,11 @@ class TaskRunner(object):
ThermosTaskValidator.assert_same_task(self._pathspec, self._task)
except ThermosTaskValidator.InvalidTaskError as e:
raise self.InvalidTask('Invalid task: %s' % e)
- self._plan = None # plan currently being executed (updated by Handlers)
+ self._plan = None # plan currently being executed (updated by Handlers)
self._regular_plan = planner_class(self._task, clock=clock,
- process_filter=lambda proc: proc.final().get() == False)
+ process_filter=lambda proc: proc.final().get() is False)
self._finalizing_plan = planner_class(self._task, clock=clock,
- process_filter=lambda proc: proc.final().get() == True)
+ process_filter=lambda proc: proc.final().get() is True)
self._chroot = chroot
self._sandbox = sandbox
self._terminal_state = None
@@ -496,7 +494,7 @@ class TaskRunner(object):
self._finalization_start = None
self._preemption_deadline = None
self._watcher = ProcessMuxer(self._pathspec)
- self._state = RunnerState(processes = {})
+ self._state = RunnerState(processes={})
# create runner state
universal_handler = universal_handler or TaskRunnerUniversalHandler
@@ -542,14 +540,14 @@ class TaskRunner(object):
file lock on the checkpoint stream.
"""
if self.is_terminal():
- raise TaskRunner.StateError('Cannot take control of a task in terminal state.')
+ raise self.StateError('Cannot take control of a task in terminal state.')
if self._sandbox:
safe_mkdir(self._sandbox)
ckpt_file = self._pathspec.getpath('runner_checkpoint')
try:
self._ckpt = TaskRunnerHelper.open_checkpoint(ckpt_file, force=force, state=self._state)
except TaskRunnerHelper.PermissionError:
- raise TaskRunner.PermissionError('Unable to open checkpoint %s' % ckpt_file)
+ raise self.PermissionError('Unable to open checkpoint %s' % ckpt_file)
log.debug('Flipping recovery mode off.')
self._recovery = False
self._set_task_status(self.task_state())
@@ -589,12 +587,11 @@ class TaskRunner(object):
"""
ckpt_file = self._pathspec.getpath('runner_checkpoint')
if os.path.exists(ckpt_file):
- fp = open(ckpt_file, "r")
- ckpt_recover = ThriftRecordReader(fp, RunnerCkpt)
- for record in ckpt_recover:
- log.debug('Replaying runner checkpoint record: %s' % record)
- self._dispatcher.dispatch(self._state, record, recovery=True)
- ckpt_recover.close()
+ with open(ckpt_file, 'r') as fp:
+ ckpt_recover = ThriftRecordReader(fp, RunnerCkpt)
+ for record in ckpt_recover:
+ log.debug('Replaying runner checkpoint record: %s' % record)
+ self._dispatcher.dispatch(self._state, record, recovery=True)
def _replay_process_ckpts(self):
"""
@@ -618,7 +615,7 @@ class TaskRunner(object):
if self._state.header is None:
header = RunnerHeader(
task_id=self._task_id,
- launch_time_ms=int(self._launch_time*1000),
+ launch_time_ms=int(self._launch_time * 1000),
sandbox=self._sandbox,
log_dir=self._log_dir,
hostname=socket.gethostname(),
@@ -643,8 +640,8 @@ class TaskRunner(object):
if self._finalization_start is None:
return sys.float_info.max
else:
- waited = max(0, self._clock.time() - self._finalization_start)
- return max(0, finalization_allocation - waited)
+ waited = max(0, self._clock.time() - self._finalization_start)
+ return max(0, finalization_allocation - waited)
def _set_process_status(self, process_name, process_state, **kw):
if 'sequence_number' in kw:
@@ -725,7 +722,7 @@ class TaskRunner(object):
def forked_but_never_came_up():
return current_run.state == ProcessState.FORKED and (
- self._clock.time() - current_run.fork_time > TaskRunner.LOST_TIMEOUT.as_(Time.SECONDS))
+ self._clock.time() - current_run.fork_time > self.LOST_TIMEOUT.as_(Time.SECONDS))
def running_but_coordinator_died():
if current_run.state != ProcessState.RUNNING:
@@ -846,7 +843,6 @@ class TaskRunner(object):
self._run()
def _run(self):
- iteration_time = self.MAX_ITERATION_TIME.as_(Time.SECONDS)
while not self.is_terminal():
start = self._clock.time()
# step 1: execute stage corresponding to the state we're currently in
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/467f8428/src/main/python/apache/thermos/monitoring/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/monitoring/BUILD b/src/main/python/apache/thermos/monitoring/BUILD
index 1ae5f52..79da0d5 100644
--- a/src/main/python/apache/thermos/monitoring/BUILD
+++ b/src/main/python/apache/thermos/monitoring/BUILD
@@ -18,7 +18,7 @@ python_library(
name = 'detector',
sources = ['detector.py'],
dependencies = [
- pants('src/main/python/apache/thermos/common:path')
+ pants('src/main/python/apache/thermos/common:path'),
]
)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/467f8428/src/main/python/apache/thermos/monitoring/detector.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/monitoring/detector.py b/src/main/python/apache/thermos/monitoring/detector.py
index fa50fce..117aef5 100644
--- a/src/main/python/apache/thermos/monitoring/detector.py
+++ b/src/main/python/apache/thermos/monitoring/detector.py
@@ -48,7 +48,7 @@ class TaskDetector(object):
for path in paths:
try:
task_state, task_id = path_re.match(path).groups()
- except:
+ except Exception:
continue
if state is None or task_state == state:
yield (task_state, task_id)
@@ -69,7 +69,7 @@ class TaskDetector(object):
for path in paths:
try:
process, run = path_re.match(path).groups()
- except:
+ except Exception:
continue
yield process, int(run)
@@ -100,6 +100,6 @@ class TaskDetector(object):
for path in matching_paths:
try:
process, = path_re.match(path).groups()
- except:
+ except Exception:
continue
yield path
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/467f8428/src/main/python/apache/thermos/monitoring/disk.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/monitoring/disk.py b/src/main/python/apache/thermos/monitoring/disk.py
index 23c1a93..175ed3a 100644
--- a/src/main/python/apache/thermos/monitoring/disk.py
+++ b/src/main/python/apache/thermos/monitoring/disk.py
@@ -28,7 +28,7 @@ Currently, there are two threads available:
import os
import threading
import time
-from Queue import Empty, Queue
+from Queue import Queue
from twitter.common import log
from twitter.common.dirutil import du, safe_bsize
@@ -47,6 +47,7 @@ from watchdog.observers import Observer as WatchdogObserver
class DiskCollectorThread(ExceptionalThread):
""" Thread to calculate aggregate disk usage under a given path using a simple algorithm """
+
def __init__(self, path):
self.path = path
self.value = None
@@ -66,6 +67,7 @@ class DiskCollectorThread(ExceptionalThread):
class DiskCollector(Lockable):
""" Spawn a background thread to sample disk usage """
+
def __init__(self, root):
self._root = root
self._thread = None
@@ -169,11 +171,12 @@ class InotifyDiskCollectorThread(ExceptionalThread, FileSystemEventHandler):
@property
def value(self):
- return sum(self._files.itervalues())
+ return sum(self._files.values())
class InotifyDiskCollector(object):
""" Spawn a background thread to sample disk usage """
+
def __init__(self, root):
self._root = root
self._thread = InotifyDiskCollectorThread(self._root)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/467f8428/src/main/python/apache/thermos/monitoring/garbage.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/monitoring/garbage.py b/src/main/python/apache/thermos/monitoring/garbage.py
index 11b8c76..69bf8e4 100644
--- a/src/main/python/apache/thermos/monitoring/garbage.py
+++ b/src/main/python/apache/thermos/monitoring/garbage.py
@@ -136,9 +136,9 @@ class DefaultCollector(TaskGarbageCollectionPolicy):
verbose: boolean (whether or not to log) [default: False]
logger: callable (function to call with log messages) [default: sys.stdout.write]
"""
- self._max_age = kw.get('max_age', Amount(10**10, Time.DAYS))
- self._max_space = kw.get('max_space', Amount(10**10, Data.TB))
- self._max_tasks = kw.get('max_tasks', 10**10)
+ self._max_age = kw.get('max_age', Amount(10 ** 10, Time.DAYS))
+ self._max_space = kw.get('max_space', Amount(10 ** 10, Data.TB))
+ self._max_tasks = kw.get('max_tasks', 10 ** 10)
self._include_metadata = kw.get('include_metadata', True)
self._include_logs = kw.get('include_logs', True)
self._verbose = kw.get('verbose', False)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/467f8428/src/main/python/apache/thermos/monitoring/process.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/monitoring/process.py b/src/main/python/apache/thermos/monitoring/process.py
index 99f7a3d..9529a00 100644
--- a/src/main/python/apache/thermos/monitoring/process.py
+++ b/src/main/python/apache/thermos/monitoring/process.py
@@ -40,24 +40,25 @@ class ProcessSample(namedtuple('ProcessSample', 'rate user system rss vms nice s
else:
status = other.status
return ProcessSample(
- rate = self.rate + other.rate,
- user = self.user + other.user,
- system = self.system + other.system,
- rss = self.rss + other.rss,
- vms = self.vms + other.vms,
- nice = nice,
- status = status,
- threads = self.threads + other.threads)
+ rate=self.rate + other.rate,
+ user=self.user + other.user,
+ system=self.system + other.system,
+ rss=self.rss + other.rss,
+ vms=self.vms + other.vms,
+ nice=nice,
+ status=status,
+ threads=self.threads + other.threads,
+ )
def to_dict(self):
return dict(
- cpu = self.rate,
- ram = self.rss,
- user = self.user,
- system = self.system,
- rss = self.rss,
- vms = self.vms,
- nice = self.nice,
- status = str(self.status),
- threads = self.threads
+ cpu=self.rate,
+ ram=self.rss,
+ user=self.user,
+ system=self.system,
+ rss=self.rss,
+ vms=self.vms,
+ nice=self.nice,
+ status=str(self.status),
+ threads=self.threads,
)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/467f8428/src/main/python/apache/thermos/monitoring/process_collector_psutil.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/monitoring/process_collector_psutil.py b/src/main/python/apache/thermos/monitoring/process_collector_psutil.py
index f972c83..298459b 100644
--- a/src/main/python/apache/thermos/monitoring/process_collector_psutil.py
+++ b/src/main/python/apache/thermos/monitoring/process_collector_psutil.py
@@ -43,11 +43,12 @@ def process_to_sample(process):
class ProcessTreeCollector(object):
""" Collect resource consumption statistics for a process and its children """
+
def __init__(self, pid):
""" Given a pid """
self._pid = pid
self._process = None # psutil.Process
- self._sampled_tree = {} # pid => ProcessSample
+ self._sampled_tree = {} # pid => ProcessSample
self._sample = ProcessSample.empty()
self._stamp = None
self._rate = 0.0
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/467f8428/src/main/python/apache/thermos/monitoring/resource.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/monitoring/resource.py b/src/main/python/apache/thermos/monitoring/resource.py
index 9f8e24e..7b5bbe0 100644
--- a/src/main/python/apache/thermos/monitoring/resource.py
+++ b/src/main/python/apache/thermos/monitoring/resource.py
@@ -27,7 +27,6 @@ disk consumption and retaining a limited (FIFO) in-memory history of this data.
"""
-import platform
import threading
import time
from abc import abstractmethod
@@ -42,7 +41,6 @@ from twitter.common.lang import Interface
from twitter.common.quantity import Amount, Time
from .disk import DiskCollector
-from .monitor import TaskMonitor
from .process import ProcessSample
from .process_collector_psutil import ProcessTreeCollector
@@ -81,6 +79,7 @@ class ResourceHistory(object):
"""Simple class to contain a RingBuffer (fixed-length FIFO) history of resource samples, with the
mapping: timestamp => (number_of_procs, ProcessSample, disk_usage_in_bytes)
"""
+
def __init__(self, maxlen, initialize=True):
if not maxlen >= 1:
raise ValueError("maxlen must be greater than 0")
@@ -117,7 +116,7 @@ class TaskResourceMonitor(ResourceMonitorBase, threading.Thread):
history of previous sample results.
"""
- MAX_HISTORY = 10000 # magic number
+ MAX_HISTORY = 10000 # magic number
def __init__(self, task_monitor, sandbox,
process_collector=ProcessTreeCollector, disk_collector=DiskCollector,
@@ -128,10 +127,10 @@ class TaskResourceMonitor(ResourceMonitorBase, threading.Thread):
task_monitor: TaskMonitor object specifying the task whose resources should be monitored
sandbox: Directory for which to monitor disk utilisation
"""
- self._task_monitor = task_monitor # exposes PIDs, sandbox
+ self._task_monitor = task_monitor # exposes PIDs, sandbox
self._task_id = task_monitor._task_id
log.debug('Initialising resource collection for task %s' % self._task_id)
- self._process_collectors = dict() # ProcessStatus => ProcessTreeCollector
+ self._process_collectors = dict() # ProcessStatus => ProcessTreeCollector
# TODO(jon): sandbox is also available through task_monitor, but typically the first checkpoint
# isn't written (and hence the header is not available) by the time we initialise here
self._sandbox = sandbox
@@ -203,7 +202,7 @@ class TaskResourceMonitor(ResourceMonitorBase, threading.Thread):
log.debug('Adding process "%s" (pid %s) to resource monitoring' %
(process.process, process.pid))
self._process_collectors[process] = self._process_collector_factory(process.pid)
- for process, collector in self._process_collectors.iteritems():
+ for process, collector in self._process_collectors.items():
log.debug('Collecting sample for process "%s" (pid %s) and children' %
(process.process, process.pid))
collector.sample()
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/467f8428/src/main/python/apache/thermos/observer/bin/thermos_observer.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/observer/bin/thermos_observer.py b/src/main/python/apache/thermos/observer/bin/thermos_observer.py
index c205f14..53c331c 100644
--- a/src/main/python/apache/thermos/observer/bin/thermos_observer.py
+++ b/src/main/python/apache/thermos/observer/bin/thermos_observer.py
@@ -14,9 +14,7 @@
from __future__ import print_function
-import socket
import sys
-import time
from twitter.common import app
from twitter.common.exceptions import ExceptionalThread
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/467f8428/src/main/python/apache/thermos/observer/http/file_browser.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/observer/http/file_browser.py b/src/main/python/apache/thermos/observer/http/file_browser.py
index 6b53c86..87ef9c8 100644
--- a/src/main/python/apache/thermos/observer/http/file_browser.py
+++ b/src/main/python/apache/thermos/observer/http/file_browser.py
@@ -13,11 +13,9 @@
#
import os
-import pprint
from xml.sax.saxutils import escape
import bottle
-from mako.template import Template
from twitter.common import log
from twitter.common.http import HttpServer
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/467f8428/src/main/python/apache/thermos/observer/http/http_observer.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/observer/http/http_observer.py b/src/main/python/apache/thermos/observer/http/http_observer.py
index 2bd079c..5bfc4f2 100644
--- a/src/main/python/apache/thermos/observer/http/http_observer.py
+++ b/src/main/python/apache/thermos/observer/http/http_observer.py
@@ -19,7 +19,6 @@ system. To do this, it relies heavily on the Thermos TaskObserver.
"""
-import os
import socket
from twitter.common import log
@@ -79,15 +78,15 @@ class BottleObserver(HttpServer, StaticAssets, TaskObserverFileBrowser, TaskObse
state = self._observer.state(task_id)
return dict(
- task_id = task_id,
- task = task,
- statuses = self._observer.task_statuses(task_id),
- user = task['user'],
- ports = task['ports'],
- processes = processes,
- chroot = state.get('sandbox', ''),
- launch_time = state.get('launch_time', 0),
- hostname = state.get('hostname', 'localhost'),
+ task_id=task_id,
+ task=task,
+ statuses=self._observer.task_statuses(task_id),
+ user=task['user'],
+ ports=task['ports'],
+ processes=processes,
+ chroot=state.get('sandbox', ''),
+ launch_time=state.get('launch_time', 0),
+ hostname=state.get('hostname', 'localhost'),
)
def get_task(self, task_id):
@@ -102,9 +101,9 @@ class BottleObserver(HttpServer, StaticAssets, TaskObserverFileBrowser, TaskObse
task = self.get_task(task_id)
state = self._observer.state(task_id)
return dict(
- hostname = state.get('hostname', 'localhost'),
- task_id = task_id,
- task_struct = task['task_struct']
+ hostname=state.get('hostname', 'localhost'),
+ task_id=task_id,
+ task_struct=task['task_struct']
)
@HttpServer.route("/process/:task_id/:process_id")
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/467f8428/src/main/python/apache/thermos/observer/http/static_assets.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/observer/http/static_assets.py b/src/main/python/apache/thermos/observer/http/static_assets.py
index be881bf..83adeb3 100644
--- a/src/main/python/apache/thermos/observer/http/static_assets.py
+++ b/src/main/python/apache/thermos/observer/http/static_assets.py
@@ -25,6 +25,7 @@ class StaticAssets(object):
"""
Serve the /assets directory.
"""
+
def __init__(self):
self._assets = {}
self._detect_assets()
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/467f8428/src/main/python/apache/thermos/testing/runner.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/testing/runner.py b/src/main/python/apache/thermos/testing/runner.py
index 551311f..8b6ba73 100644
--- a/src/main/python/apache/thermos/testing/runner.py
+++ b/src/main/python/apache/thermos/testing/runner.py
@@ -24,14 +24,13 @@ import tempfile
import time
from thrift.TSerialization import deserialize as thrift_deserialize
-from twitter.common import log
from twitter.common.contextutil import environment_as, temporary_file
from apache.thermos.common.ckpt import CheckpointDispatcher
from apache.thermos.common.path import TaskPath
from apache.thermos.config.loader import ThermosTaskWrapper
-from gen.apache.thermos.ttypes import RunnerCkpt, RunnerState, TaskState
+from gen.apache.thermos.ttypes import RunnerState
class Runner(object):
@@ -98,11 +97,11 @@ with open('%(state_filename)s', 'w') as fp:
self.state_filename = tempfile.mktemp()
self.tempdir = tempfile.mkdtemp()
- self.task_id = '%s-runner-base' % int(time.time()*1000000)
+ self.task_id = '%s-runner-base' % int(time.time() * 1000000)
self.sandbox = os.path.join(self.tempdir, 'sandbox')
self.portmap = portmap
self.cleaned = False
- self.pathspec = TaskPath(root = self.tempdir, task_id = self.task_id)
+ self.pathspec = TaskPath(root=self.tempdir, task_id=self.task_id)
self.script_filename = None
self.success_rate = success_rate
self.random_seed = random_seed
@@ -150,7 +149,8 @@ with open('%(state_filename)s', 'w') as fp:
rc = self.po.returncode
if rc != 0:
if os.path.exists(self.job_filename):
- config = open(self.job_filename).read()
+ with open(self.job_filename) as fp:
+ config = fp.read()
else:
config = 'Nonexistent!'
if 'THERMOS_DEBUG' in os.environ:
@@ -167,8 +167,9 @@ with open('%(state_filename)s', 'w') as fp:
try:
self.reconstructed_state = CheckpointDispatcher.from_file(
- self.pathspec.getpath('runner_checkpoint'))
- except:
+ self.pathspec.getpath('runner_checkpoint'))
+ except Exception as e:
+ print('Failed to replay checkpoint: %s' % e, file=sys.stderr)
self.reconstructed_state = None
self.initialized = True
return rc
@@ -178,7 +179,8 @@ with open('%(state_filename)s', 'w') as fp:
if hasattr(self, 'po'):
try:
self.po.kill()
- except:
+ except Exception as e:
+ print('Failed to kill runner: %s' % e, file=sys.stderr)
pass
os.unlink(self.job_filename)
os.unlink(self.script_filename)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/467f8428/src/test/python/apache/aurora/admin/test_host_maintenance.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/admin/test_host_maintenance.py b/src/test/python/apache/aurora/admin/test_host_maintenance.py
index ed0782b..176076f 100644
--- a/src/test/python/apache/aurora/admin/test_host_maintenance.py
+++ b/src/test/python/apache/aurora/admin/test_host_maintenance.py
@@ -15,7 +15,6 @@
import unittest
import mock
-import pytest
from apache.aurora.admin.host_maintenance import HostMaintenance
from apache.aurora.client.base import add_grouping, remove_grouping
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/467f8428/src/test/python/apache/aurora/client/api/test_disambiguator.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/api/test_disambiguator.py b/src/test/python/apache/aurora/client/api/test_disambiguator.py
index fb6380d..e9523ac 100644
--- a/src/test/python/apache/aurora/client/api/test_disambiguator.py
+++ b/src/test/python/apache/aurora/client/api/test_disambiguator.py
@@ -25,7 +25,7 @@ from apache.aurora.common.cluster import Cluster
from gen.apache.aurora.api.constants import ResponseCode
from gen.apache.aurora.api.ttypes import GetJobsResult, JobConfiguration, JobKey, Response, Result
-TEST_CLUSTER = Cluster(name = 'smf1')
+TEST_CLUSTER = Cluster(name='smf1')
class LiveJobDisambiguatorTest(mox.MoxTestBase):
@@ -51,7 +51,7 @@ class LiveJobDisambiguatorTest(mox.MoxTestBase):
self._api.get_jobs(self.ROLE).AndReturn(Response(
responseCode=ResponseCode.OK,
messageDEPRECATED='Mock OK',
- result = Result(getJobsResult=GetJobsResult(
+ result=Result(getJobsResult=GetJobsResult(
configs=set(JobConfiguration(key=JobKey(role=self.ROLE, environment=env, name=self.NAME))
for env in envs)))))
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/467f8428/src/test/python/apache/aurora/client/api/test_health_check.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/api/test_health_check.py b/src/test/python/apache/aurora/client/api/test_health_check.py
index 52cfcfe..af005be 100644
--- a/src/test/python/apache/aurora/client/api/test_health_check.py
+++ b/src/test/python/apache/aurora/client/api/test_health_check.py
@@ -15,7 +15,6 @@
import unittest
import mox
-import pytest
from apache.aurora.client.api.health_check import (
HealthCheck,
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/467f8428/src/test/python/apache/aurora/client/api/test_instance_watcher.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/api/test_instance_watcher.py b/src/test/python/apache/aurora/client/api/test_instance_watcher.py
index b2d0c80..723a5b6 100644
--- a/src/test/python/apache/aurora/client/api/test_instance_watcher.py
+++ b/src/test/python/apache/aurora/client/api/test_instance_watcher.py
@@ -21,7 +21,19 @@ from apache.aurora.client.api.health_check import HealthCheck
from apache.aurora.client.api.instance_watcher import InstanceWatcher
from gen.apache.aurora.api.AuroraSchedulerManager import Client as scheduler_client
-from gen.apache.aurora.api.ttypes import *
+from gen.apache.aurora.api.ttypes import (
+ AssignedTask,
+ Identity,
+ JobKey,
+ Response,
+ ResponseCode,
+ Result,
+ ScheduledTask,
+ ScheduleStatus,
+ ScheduleStatusResult,
+ TaskConfig,
+ TaskQuery
+)
class FakeClock(object):
@@ -82,7 +94,8 @@ class InstanceWatcherTest(unittest.TestCase):
for x in range(int(num_calls)):
self._scheduler.getTasksStatus(query).AndReturn(response)
- def expect_io_error_in_get_statuses(self, instance_ids=WATCH_INSTANCES, num_calls=EXPECTED_CYCLES):
+ def expect_io_error_in_get_statuses(self, instance_ids=WATCH_INSTANCES,
+ num_calls=EXPECTED_CYCLES):
tasks = [self.create_task(instance_id) for instance_id in instance_ids]
response = Response(responseCode=ResponseCode.OK, messageDEPRECATED='test')
response.result = Result()
@@ -92,7 +105,6 @@ class InstanceWatcherTest(unittest.TestCase):
for x in range(int(num_calls)):
self._scheduler.getTasksStatus(query).AndRaise(IOError('oops'))
-
def mock_health_check(self, task, status, retry):
self._health_check.health(task).InAnyOrder().AndReturn((status, retry))
@@ -103,7 +115,8 @@ class InstanceWatcherTest(unittest.TestCase):
def assert_watch_result(self, expected_failed_instances, instances_to_watch=WATCH_INSTANCES):
instances_returned = self._watcher.watch(instances_to_watch, self._health_check)
assert set(expected_failed_instances) == instances_returned, (
- 'Expected instances (%s) : Returned instances (%s)' % (expected_failed_instances, instances_returned))
+ 'Expected instances (%s) : Returned instances (%s)' % (
+ expected_failed_instances, instances_returned))
def replay_mocks(self):
mox.Replay(self._scheduler)
@@ -142,7 +155,6 @@ class InstanceWatcherTest(unittest.TestCase):
self.assert_watch_result([0, 1, 2])
self.verify_mocks()
-
def test_all_instance_failure(self):
"""All failed instance in a batch of instances"""
self.expect_get_statuses()
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/467f8428/src/test/python/apache/aurora/client/api/test_job_monitor.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/api/test_job_monitor.py b/src/test/python/apache/aurora/client/api/test_job_monitor.py
index 665db74..3aa9607 100644
--- a/src/test/python/apache/aurora/client/api/test_job_monitor.py
+++ b/src/test/python/apache/aurora/client/api/test_job_monitor.py
@@ -15,11 +15,9 @@ import unittest
from mock import Mock
-from apache.aurora.client.api import AuroraClientAPI
from apache.aurora.client.api.job_monitor import JobMonitor
from apache.aurora.common.aurora_job_key import AuroraJobKey
-from gen.apache.aurora.api.AuroraSchedulerManager import Client
from gen.apache.aurora.api.ttypes import (
AssignedTask,
Identity,
@@ -56,6 +54,7 @@ class JobMonitorTest(unittest.TestCase):
status=status,
timestamp=10)]
)
+
def mock_get_tasks(self, tasks, response_code=None):
response_code = ResponseCode.OK if response_code is None else response_code
resp = Response(responseCode=response_code, messageDEPRECATED='test')
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/467f8428/src/test/python/apache/aurora/client/api/test_quota_check.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/api/test_quota_check.py b/src/test/python/apache/aurora/client/api/test_quota_check.py
index 6c241b7..d75bd1b 100644
--- a/src/test/python/apache/aurora/client/api/test_quota_check.py
+++ b/src/test/python/apache/aurora/client/api/test_quota_check.py
@@ -19,7 +19,6 @@ from mock import Mock
from apache.aurora.client.api.quota_check import CapacityRequest, QuotaCheck
-from gen.apache.aurora.api.AuroraSchedulerManager import Client as scheduler_client
from gen.apache.aurora.api.ttypes import (
GetQuotaResult,
JobKey,
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/467f8428/src/test/python/apache/aurora/client/api/test_restarter.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/api/test_restarter.py b/src/test/python/apache/aurora/client/api/test_restarter.py
index e31fd6d..97add6f 100644
--- a/src/test/python/apache/aurora/client/api/test_restarter.py
+++ b/src/test/python/apache/aurora/client/api/test_restarter.py
@@ -21,22 +21,28 @@ from apache.aurora.client.fake_scheduler_proxy import FakeSchedulerProxy
from apache.aurora.common.aurora_job_key import AuroraJobKey
from gen.apache.aurora.api.AuroraSchedulerManager import Client as scheduler_client
-from gen.apache.aurora.api.ttypes import *
-
-# test space
-
+from gen.apache.aurora.api.ttypes import (
+ AssignedTask,
+ Response,
+ ResponseCode,
+ Result,
+ ScheduledTask,
+ ScheduleStatus,
+ ScheduleStatusResult,
+ TaskConfig
+)
SESSION_KEY = 'test_session'
-CLUSTER='smfd'
+CLUSTER = 'smfd'
JOB = AuroraJobKey(CLUSTER, 'johndoe', 'test', 'test_job')
HEALTH_CHECK_INTERVAL_SECONDS = 5
UPDATER_CONFIG = UpdaterConfig(
- 2, # batch_size
- 23, # restart_threshold
- 45, #watch_secs
- 0, # max_per_instance_failures
- 0, # max_total_failures
- True # rollback_on_failure
+ batch_size=2,
+ restart_threshold=23,
+ watch_secs=45,
+ max_per_shard_failures=0,
+ max_total_failures=0,
+ rollback_on_failure=True,
)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/467f8428/src/test/python/apache/aurora/client/api/test_scheduler_client.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/api/test_scheduler_client.py b/src/test/python/apache/aurora/client/api/test_scheduler_client.py
index 402b426..6b23a4a 100644
--- a/src/test/python/apache/aurora/client/api/test_scheduler_client.py
+++ b/src/test/python/apache/aurora/client/api/test_scheduler_client.py
@@ -29,24 +29,39 @@ from apache.aurora.common.cluster import Cluster
import gen.apache.aurora.api.AuroraAdmin as AuroraAdmin
import gen.apache.aurora.api.AuroraSchedulerManager as AuroraSchedulerManager
from gen.apache.aurora.api.constants import CURRENT_API_VERSION, DEFAULT_ENVIRONMENT
-from gen.apache.aurora.api.ttypes import *
+from gen.apache.aurora.api.ttypes import (
+ Hosts,
+ JobConfiguration,
+ JobKey,
+ Lock,
+ LockValidation,
+ ResourceAggregate,
+ Response,
+ ResponseCode,
+ Result,
+ RewriteConfigsRequest,
+ ScheduleStatus,
+ SessionKey,
+ TaskQuery
+)
ROLE = 'foorole'
JOB_NAME = 'barjobname'
JOB_KEY = JobKey(role=ROLE, environment=DEFAULT_ENVIRONMENT, name=JOB_NAME)
-def test_testCoverage():
+def test_coverage():
"""Make sure a new thrift RPC doesn't get added without minimal test coverage."""
for name, klass in inspect.getmembers(AuroraAdmin) + inspect.getmembers(AuroraSchedulerManager):
if name.endswith('_args'):
rpc_name = name[:-len('_args')]
assert hasattr(TestSchedulerProxyAdminInjection, 'test_%s' % rpc_name), (
- 'No test defined for RPC %s' % rpc_name)
+ 'No test defined for RPC %s' % rpc_name)
class TestSchedulerProxy(scheduler_client.SchedulerProxy):
"""In testing we shouldn't use the real SSHAgentAuthenticator."""
+
def session_key(self):
return self.create_session('SOME_USER')
@@ -70,7 +85,7 @@ class TestSchedulerProxyInjection(unittest.TestCase):
self.mock_scheduler_client.get_thrift_client().AndReturn(self.mock_thrift_client)
version_resp = Response(responseCode=ResponseCode.OK)
- version_resp.result = Result(getVersionResult = CURRENT_API_VERSION)
+ version_resp.result = Result(getVersionResult=CURRENT_API_VERSION)
self.mock_thrift_client.getVersion().AndReturn(version_resp)
@@ -322,15 +337,15 @@ class TestZookeeperSchedulerClient(unittest.TestCase):
return mock_zk, [ServiceInstance.unpack(service_json)]
- class TestZookeeperSchedulerClient(scheduler_client.ZookeeperSchedulerClient):
+ class ZookeeperSchedulerClientTestImpl(scheduler_client.ZookeeperSchedulerClient):
SERVERSET_TIMEOUT = Amount(10, Time.MILLISECONDS)
- original_method = TestZookeeperSchedulerClient.get_scheduler_serverset
+ original_method = ZookeeperSchedulerClientTestImpl.get_scheduler_serverset
try:
- TestZookeeperSchedulerClient.get_scheduler_serverset = mock_get_serverset
+ ZookeeperSchedulerClientTestImpl.get_scheduler_serverset = mock_get_serverset
- zk_scheduler_client = TestZookeeperSchedulerClient(Cluster(proxy_url=None))
+ zk_scheduler_client = ZookeeperSchedulerClientTestImpl(Cluster(proxy_url=None))
self.mox.StubOutWithMock(zk_scheduler_client, '_connect_scheduler')
mock_zk.stop()
zk_scheduler_client._connect_scheduler(host, port)
@@ -339,7 +354,8 @@ class TestZookeeperSchedulerClient(unittest.TestCase):
assert zk_scheduler_client.url == 'http://%s:%d' % (host, port)
finally:
- TestZookeeperSchedulerClient.get_scheduler_serverset = original_method
+ ZookeeperSchedulerClientTestImpl.get_scheduler_serverset = original_method
+
class TestSchedulerClient(unittest.TestCase):
@mock.patch('thrift.transport.THttpClient.THttpClient', spec=THttpClient.THttpClient)
@@ -348,4 +364,5 @@ class TestSchedulerClient(unittest.TestCase):
mock_time = mock.Mock(spec=time)
scheduler_client.SchedulerClient._connect_scheduler('scheduler.example.com', 1337, mock_time)
assert MockTHttpClient.return_value.open.call_count is 2
- mock_time.sleep.assert_called_once_with(scheduler_client.SchedulerClient.RETRY_TIMEOUT.as_(Time.SECONDS))
+ mock_time.sleep.assert_called_once_with(
+ scheduler_client.SchedulerClient.RETRY_TIMEOUT.as_(Time.SECONDS))
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/467f8428/src/test/python/apache/aurora/client/api/test_sla.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/api/test_sla.py b/src/test/python/apache/aurora/client/api/test_sla.py
index 9c3bb6d..9ca8102 100644
--- a/src/test/python/apache/aurora/client/api/test_sla.py
+++ b/src/test/python/apache/aurora/client/api/test_sla.py
@@ -14,16 +14,15 @@
import time
import unittest
-
from contextlib import contextmanager
+
from mock import call, Mock, patch
-from apache.aurora.client.api.sla import DomainUpTimeSlaVector, JobUpTimeSlaVector, Sla, task_query
+from apache.aurora.client.api.sla import JobUpTimeLimit, Sla, task_query
from apache.aurora.client.base import add_grouping, DEFAULT_GROUPING, remove_grouping
from apache.aurora.common.aurora_job_key import AuroraJobKey
from apache.aurora.common.cluster import Cluster
-from gen.apache.aurora.api.AuroraSchedulerManager import Client as scheduler_client
from gen.apache.aurora.api.constants import LIVE_STATES
from gen.apache.aurora.api.ttypes import (
AssignedTask,
@@ -192,7 +191,6 @@ class SlaTest(unittest.TestCase):
self.mock_get_tasks(self.create_tasks([100, 200, 300, 400, 500]))
self.assert_count_result(100, 50)
-
def test_uptime_empty(self):
self.mock_get_tasks([])
self.assert_uptime_result(0, 50)
@@ -217,7 +215,6 @@ class SlaTest(unittest.TestCase):
self.mock_get_tasks(self.create_tasks([100, 200, 300, 400]))
self.assert_uptime_result(None, 100)
-
def test_wait_time_empty(self):
self.mock_get_tasks([])
self.assert_wait_time_result(None, 50, 200)
@@ -246,7 +243,6 @@ class SlaTest(unittest.TestCase):
self.mock_get_tasks(self.create_tasks([100, 200, 300, 400]))
self.assert_wait_time_result(150, 80, 250)
-
def test_domain_uptime_no_tasks(self):
self.mock_get_tasks([])
vector = self._sla.get_domain_uptime_vector(self._cluster, self._min_count)
@@ -289,11 +285,7 @@ class SlaTest(unittest.TestCase):
])
job_override = {
- self._job_key:
- DomainUpTimeSlaVector.JobUpTimeLimit(
- job=self._job_key,
- percentage=50,
- duration_secs=100)
+ self._job_key: JobUpTimeLimit(job=self._job_key, percentage=50, duration_secs=100)
}
self.assert_safe_domain_result('h1', 50, 400, in_limit=job_override)
@@ -337,7 +329,6 @@ class SlaTest(unittest.TestCase):
assert 0 == len(vector.get_safe_hosts(50, 150, None, 'by_rack')), 'Length must be empty.'
self.expect_task_status_call_cluster_scoped()
-
def test_probe_hosts_no_hosts(self):
self.mock_get_tasks([])
vector = self._sla.get_domain_uptime_vector(self._cluster, self._min_count)
@@ -431,7 +422,6 @@ class SlaTest(unittest.TestCase):
self.assert_probe_host_job_details(result, 'cl-r2-h03', 25.0, False, 100)
self.assert_probe_host_job_details(result, 'cl-r2-h04', 25.0, False, 100)
-
def test_get_domain_uptime_vector_with_hosts(self):
with patch('apache.aurora.client.api.sla.task_query', return_value=TaskQuery()) as (mock_query):
self.mock_get_tasks([
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/467f8428/src/test/python/apache/aurora/client/api/test_updater.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/api/test_updater.py b/src/test/python/apache/aurora/client/api/test_updater.py
index ba783da..0ee342c 100644
--- a/src/test/python/apache/aurora/client/api/test_updater.py
+++ b/src/test/python/apache/aurora/client/api/test_updater.py
@@ -38,7 +38,6 @@ from gen.apache.aurora.api.ttypes import (
JobConfiguration,
JobKey,
LimitConstraint,
- Lock,
LockKey,
LockValidation,
Metadata,
@@ -63,6 +62,7 @@ if 'UPDATER_DEBUG' in environ:
LogOptions.set_stderr_log_level('DEBUG')
log.init('test_updater')
+
class FakeConfig(object):
def __init__(self, role, name, env, update_config):
self._role = role
@@ -104,12 +104,12 @@ class FakeConfig(object):
class UpdaterTest(TestCase):
UPDATE_CONFIG = {
- 'batch_size': 3,
- 'restart_threshold': 50,
- 'watch_secs': 50,
- 'max_per_shard_failures': 0,
- 'max_total_failures': 0,
- 'rollback_on_failure': True,
+ 'batch_size': 3,
+ 'restart_threshold': 50,
+ 'watch_secs': 50,
+ 'max_per_shard_failures': 0,
+ 'max_total_failures': 0,
+ 'rollback_on_failure': True,
}
def setUp(self):
@@ -459,7 +459,6 @@ class UpdaterTest(TestCase):
self.update_and_expect_ok(instances=[2, 3, 4])
self.verify_mocks()
-
def test_patch_hole_with_instance_option(self):
"""Patching an instance ID gap created by a terminated update."""
old_configs = self.make_task_configs(8)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/467f8428/src/test/python/apache/aurora/client/cli/test_command_hooks.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/cli/test_command_hooks.py b/src/test/python/apache/aurora/client/cli/test_command_hooks.py
index 1ee11ff..65bce5f 100644
--- a/src/test/python/apache/aurora/client/cli/test_command_hooks.py
+++ b/src/test/python/apache/aurora/client/cli/test_command_hooks.py
@@ -215,8 +215,8 @@ class TestClientCreateCommand(AuroraClientCommandTest):
def test_dynamic_hook_syntax_error(self):
with patch("logging.warn") as log_patch:
GlobalCommandHookRegistry.reset()
- hook_locals = GlobalCommandHookRegistry.load_project_hooks(
- "./src/test/python/apache/aurora/client/cli/hook_test_data/bad_syntax")
+ GlobalCommandHookRegistry.load_project_hooks(
+ "./src/test/python/apache/aurora/client/cli/hook_test_data/bad_syntax")
log_patch.assert_called_with("Error compiling hooks file "
"./src/test/python/apache/aurora/client/cli/hook_test_data/bad_syntax/AuroraHooks: "
"invalid syntax (AuroraHooks, line 1)")
@@ -224,16 +224,15 @@ class TestClientCreateCommand(AuroraClientCommandTest):
def test_dynamic_hook_exec_error(self):
with patch("logging.warn") as log_patch:
GlobalCommandHookRegistry.reset()
- hook_locals = GlobalCommandHookRegistry.load_project_hooks(
- "./src/test/python/apache/aurora/client/cli/hook_test_data/exec_error")
+ GlobalCommandHookRegistry.load_project_hooks(
+ "./src/test/python/apache/aurora/client/cli/hook_test_data/exec_error")
log_patch.assert_called_with("Warning: error loading hooks file "
"./src/test/python/apache/aurora/client/cli/hook_test_data/exec_error/AuroraHooks: "
"integer division or modulo by zero")
def assert_skip_allowed(self, context, skip_opt, user, noun, verb, args):
"""Checks that a hook would be allowed to be skipped in a command invocation"""
- required_hooks = GlobalCommandHookRegistry.get_required_hooks(context, skip_opt, noun,
- verb, user)
+ GlobalCommandHookRegistry.get_required_hooks(context, skip_opt, noun, verb, user)
def assert_skip_forbidden(self, context, skip_opt, user, noun, verb, args):
"""Checks that a hook would NOT be allowed to be skipped in a command invocation"""
@@ -362,7 +361,6 @@ class TestClientCreateCommand(AuroraClientCommandTest):
}
}
- mock_query = self.create_mock_query()
mock_context.add_expected_status_query_result(
self.create_mock_status_query_result(ScheduleStatus.INIT))
mock_context.add_expected_status_query_result(
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/467f8428/src/test/python/apache/aurora/client/cli/test_config_noun.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/cli/test_config_noun.py b/src/test/python/apache/aurora/client/cli/test_config_noun.py
index 7a9d733..c55c9fe 100644
--- a/src/test/python/apache/aurora/client/cli/test_config_noun.py
+++ b/src/test/python/apache/aurora/client/cli/test_config_noun.py
@@ -12,14 +12,13 @@
# limitations under the License.
#
-import contextlib
import textwrap
+from mock import patch
from twitter.common.contextutil import temporary_file
from apache.aurora.client.cli.client import AuroraCommandLine
from apache.aurora.client.cli.util import AuroraClientCommandTest, FakeAuroraCommandContext
-from mock import patch
class TestClientCreateCommand(AuroraClientCommandTest):
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/467f8428/src/test/python/apache/aurora/client/cli/test_create.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/cli/test_create.py b/src/test/python/apache/aurora/client/cli/test_create.py
index b186b52..224adc3 100644
--- a/src/test/python/apache/aurora/client/cli/test_create.py
+++ b/src/test/python/apache/aurora/client/cli/test_create.py
@@ -20,7 +20,6 @@ from twitter.common.contextutil import temporary_file
from apache.aurora.client.cli import EXIT_COMMAND_FAILURE, EXIT_INVALID_CONFIGURATION
from apache.aurora.client.cli.client import AuroraCommandLine
from apache.aurora.client.cli.util import AuroraClientCommandTest, FakeAuroraCommandContext
-from apache.aurora.client.hooks.hooked_api import HookedAuroraClientAPI
from apache.aurora.config import AuroraConfig
from gen.apache.aurora.api.ttypes import (
@@ -164,7 +163,6 @@ class TestClientCreateCommand(AuroraClientCommandTest):
# Check that create_job was called exactly once, with an AuroraConfig parameter.
self.assert_create_job_called(api)
-
def test_create_job_failed_invalid_config(self):
"""Run a test of the "create" command against a mocked-out API, with a configuration
containing a syntax error"""
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/467f8428/src/test/python/apache/aurora/client/cli/test_cron.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/cli/test_cron.py b/src/test/python/apache/aurora/client/cli/test_cron.py
index 049405a..6066a3b 100644
--- a/src/test/python/apache/aurora/client/cli/test_cron.py
+++ b/src/test/python/apache/aurora/client/cli/test_cron.py
@@ -15,7 +15,9 @@
#
import contextlib
+
from mock import Mock, patch
+from twitter.common.contextutil import temporary_file
from apache.aurora.client.cli import EXIT_COMMAND_FAILURE, EXIT_INVALID_CONFIGURATION, EXIT_OK
from apache.aurora.client.cli.client import AuroraCommandLine
@@ -25,9 +27,6 @@ from apache.aurora.config import AuroraConfig
from gen.apache.aurora.api.ttypes import JobKey
-from twitter.common.contextutil import temporary_file
-
-
class TestCronNoun(AuroraClientCommandTest):
def test_successful_schedule(self):