You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by go...@apache.org on 2014/08/06 22:14:08 UTC
git commit: TEZ-1332. Swimlane diagrams from tez AM logs (gopalv)
Repository: tez
Updated Branches:
refs/heads/master 45420856e -> 948565e99
TEZ-1332. Swimlane diagrams from tez AM logs (gopalv)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/948565e9
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/948565e9
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/948565e9
Branch: refs/heads/master
Commit: 948565e99e309a1118d61d1c5c4883d4667e7760
Parents: 4542085
Author: Gopal V <go...@apache.org>
Authored: Wed Aug 6 11:47:29 2014 -0700
Committer: Gopal V <go...@apache.org>
Committed: Wed Aug 6 11:47:29 2014 -0700
----------------------------------------------------------------------
tez-tools/swimlanes/README.md | 34 +++++
tez-tools/swimlanes/amlogparser.py | 240 ++++++++++++++++++++++++++++++++
tez-tools/swimlanes/swimlane.py | 163 ++++++++++++++++++++++
3 files changed, 437 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/948565e9/tez-tools/swimlanes/README.md
----------------------------------------------------------------------
diff --git a/tez-tools/swimlanes/README.md b/tez-tools/swimlanes/README.md
new file mode 100644
index 0000000..87a90f5
--- /dev/null
+++ b/tez-tools/swimlanes/README.md
@@ -0,0 +1,34 @@
+<!--
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. See accompanying LICENSE file.
+-->
+
+Swimlanes
+=========
+
+This is a performance and post-hoc analysis tool for Apache Tez. It uses the log lines
+printed during execution to draw a diagram in which each allocated container is a
+horizontal lane and each task attempt is drawn as a section of that lane.
+
+Currently, the tool only works for successfully completed DAGs and has little support
+for representing failures or data dependencies in the diagram.
+
+The output format is SVG, which includes clickable links back to the logs of each
+task attempt, so a slow task can be investigated further once it has been located.
+
+To use the tool, first generate a trimmed log file:
+
+`yarn logs -applicationId <application_...> | grep HISTORY > am.log`
+
+Then generate the diagram with:
+
+`python swimlane.py -o am.svg am.log`
http://git-wip-us.apache.org/repos/asf/tez/blob/948565e9/tez-tools/swimlanes/amlogparser.py
----------------------------------------------------------------------
diff --git a/tez-tools/swimlanes/amlogparser.py b/tez-tools/swimlanes/amlogparser.py
new file mode 100644
index 0000000..46d0599
--- /dev/null
+++ b/tez-tools/swimlanes/amlogparser.py
@@ -0,0 +1,240 @@
+import os,sys,re,math,os.path
+from collections import defaultdict
+from itertools import groupby
+from bz2 import BZ2File
+from gzip import GzipFile as GZFile
+try:
+ from urllib.request import urlopen
+except:
+ from urllib2 import urlopen as urlopen
+
class AMRawEvent(object):
    """A single [HISTORY] line from the AM log, split into its parts.

    ts    -- timestamp text at the start of the log line
    dag   -- text of the [DAG:...] tag
    event -- text of the [Event:...] tag
    args  -- the unparsed remainder of the line after the tags
    """

    def __init__(self, ts, dag, event, args):
        self.ts, self.dag, self.event, self.args = ts, dag, event, args

    def __repr__(self):
        return "{0}->{1} ({2})".format(self.dag, self.event, self.args)
+
def first(l):
    """Return the first item of ``l``, or None when it is empty.

    Accepts any iterable: lists and strings as before, and also generators and
    the lazy iterators that ``filter``/``map`` return on Python 3. The original
    slicing form (``l[:1]``) raised TypeError on those iterators.
    """
    for item in l:
        return item
    return None
+
def kv_add(d, k, v):
    """Store ``v`` under ``k`` in ``d``, collecting repeated keys into a list.

    The first value for a key is stored as-is. A second value turns the entry
    into a list of both values, and further values are appended to that list.
    Uses ``in`` rather than ``dict.has_key``, which does not exist on Python 3.
    """
    if k in d:
        oldv = d[k]
        if isinstance(oldv, list):
            oldv.append(v)
        else:
            d[k] = [oldv, v]
    else:
        d[k] = v
+
def csv_kv(args):
    """Parse a ``k1=v1, k2=v2, flag`` history argument string into a dict.

    Tokens are split on ',' and stripped. A token without '=' is stored with
    value None. A token with exactly one '=' is stored as key/value. A token
    with more than one '=' is ignored. Repeated keys are merged by kv_add.
    """
    result = {}
    for token in (piece.strip() for piece in args.split(",")):
        eq_count = token.count("=")
        if eq_count == 0:
            kv_add(result, token, None)
        elif eq_count == 1:
            key, value = token.split("=")
            kv_add(result, key, value)
        # eq_count > 1: ambiguous split, the token is dropped
    return result
+
class AppMaster(object):
    """The application master, built from its AM_STARTED history event.

    ``zero`` is the AM startTime in the log's integer time unit. The
    ``containers`` and ``dags`` attributes start as None and are attached
    later by AMLog.structure().
    """

    def __init__(self, raw):
        self.raw = raw
        kvs = csv_kv(raw.args)
        self.kvs = kvs
        self.name = kvs["appAttemptId"]
        self.zero = int(kvs["startTime"])
        self.containers = self.dags = None

    def __repr__(self):
        label, origin = self.name, self.zero
        return "[%s started at %d]" % (label, origin)
+
class Container(object):
    """A container, built from its CONTAINER_LAUNCHED history event.

    ``stop`` stays -1 until AMLog.containers() applies a CONTAINER_STOPPED
    event. ``node`` stays "" until AMLog.structure() copies it from an attempt
    that ran in this container.
    """

    def __init__(self, raw):
        self.raw = raw
        kvs = self.kvs = csv_kv(raw.args)
        self.name = kvs["containerId"]
        self.start = int(kvs["launchTime"])
        self.stop = -1
        self.node = ""

    def __repr__(self):
        return "[{0} start={1:d}]".format(self.name, self.start)
+
class DAG(object):
    """One DAG run, built from its DAG_FINISHED history event.

    structure() must be called with the parsed vertexes before ``vertexes``
    or attempts() are used.
    """

    def __init__(self, raw):
        self.raw = raw
        self.name = raw.dag
        kvs = csv_kv(raw.args)
        self.kvs = kvs
        for attr, key in (("start", "startTime"),
                          ("finish", "finishTime"),
                          ("duration", "timeTaken")):
            setattr(self, attr, int(kvs[key]))

    def structure(self, vertexes):
        """Keep only the vertexes whose ``dag`` equals this DAG's name."""
        self.vertexes = list(filter(lambda vx: vx.dag == self.name, vertexes))

    def attempts(self):
        """Yield this DAG's task attempts, walking vertex -> task -> attempt."""
        for vertex in self.vertexes:
            for task in vertex.tasks:
                for attempt in task.attempts:
                    if attempt.dag != self.name:
                        continue
                    yield attempt

    def __repr__(self):
        return "%s (%d+%d)" % (self.name, self.start, self.duration)
+
class Vertex(object):
    """One vertex of a DAG, built from its VERTEX_FINISHED history event.

    ``initZero``/``startZero`` come from the *RequestedTime keys, and
    ``init``/``start`` from initedTime/startedTime.
    """

    def __init__(self, raw):
        self.raw = raw
        self.dag = raw.dag
        kvs = self.kvs = csv_kv(raw.args)
        self.name = kvs["vertexName"]
        for attr, key in (("initZero", "initRequestedTime"),
                          ("init", "initedTime"),
                          ("startZero", "startRequestedTime"),
                          ("start", "startedTime"),
                          ("finish", "finishTime"),
                          ("duration", "timeTaken")):
            setattr(self, attr, int(kvs[key]))

    def structure(self, tasks):
        """Keep only the tasks whose ``vertex`` equals this vertex's name."""
        mine = []
        for task in tasks:
            if task.vertex == self.name:
                mine.append(task)
        self.tasks = mine

    def __repr__(self):
        return "%s (%d+%d)" % (self.name, self.start, self.duration)
+
+
class Task(object):
    """One task of a vertex, built from its TASK_FINISHED history event."""

    def __init__(self, raw):
        self.raw = raw
        self.dag = raw.dag
        kvs = self.kvs = csv_kv(raw.args)
        self.vertex, self.name = kvs["vertexName"], kvs["taskId"]
        for attr, key in (("start", "startTime"),
                          ("finish", "finishTime"),
                          ("duration", "timeTaken")):
            setattr(self, attr, int(kvs[key]))

    def structure(self, attempts):
        """Keep only the attempts whose ``task`` equals this task's id."""
        self.attempts = list(filter(lambda att: att.task == self.name, attempts))

    def __repr__(self):
        return "%s (%d+%d)" % (self.name, self.start, self.duration)
+
class Attempt(object):
    """One task attempt, merged from its TASK_ATTEMPT_STARTED and
    TASK_ATTEMPT_FINISHED history events.

    ``pair`` is any iterable of the AMRawEvent objects that share one
    taskAttemptId. Keys from the FINISHED event override those from the
    STARTED event.

    Raises ValueError when no TASK_ATTEMPT_FINISHED event is present, because
    there is then no finishTime/timeTaken to report.
    """

    def __init__(self, pair):
        # Materialize once. On Python 3 the caller passes a map()/filter()
        # iterator, which can be scanned only once and cannot be sliced.
        events = list(pair)
        start = next((a for a in events if a.event == "TASK_ATTEMPT_STARTED"), None)
        finish = next((a for a in events if a.event == "TASK_ATTEMPT_FINISHED"), None)
        if finish is None:
            raise ValueError("no TASK_ATTEMPT_FINISHED event among %r" % (events,))
        self.raw = finish
        self.dag = finish.dag
        # Without a STARTED event only the FINISHED keys are available; the
        # lookups below raise KeyError for any field that only STARTED carries.
        self.kvs = csv_kv(start.args) if start is not None else {}
        self.kvs.update(csv_kv(finish.args))
        self.name = self.kvs["taskAttemptId"]
        # Derive the task id: drop the last "_"-separated component and swap
        # the "attempt" prefix for "task".
        self.task = self.name[:self.name.rfind("_")].replace("attempt", "task")
        self.vertex = self.kvs["vertexName"]
        self.start = int(self.kvs["startTime"])
        self.finish = int(self.kvs["finishTime"])
        self.duration = int(self.kvs["timeTaken"])
        self.container = self.kvs["containerId"]
        self.node = self.kvs["nodeId"]

    def __repr__(self):
        return "%s (%d+%d)" % (self.name, self.start, self.duration)
+
+
def open_file(f):
    """Open a log source for line-by-line iteration.

    Dispatch on the name of ``f``:
      * ``*.gz``  -> GzipFile (decompressed on the fly)
      * ``*.bz2`` -> BZ2File
      * ``http://`` or ``https://`` URL -> urlopen response
      * anything else -> a plain local file

    The caller owns the returned object and should close it. Previously
    ``https://`` URLs fell through to ``open()`` and failed as missing files.
    """
    if f.endswith(".gz"):
        return GZFile(f)
    elif f.endswith(".bz2"):
        return BZ2File(f)
    elif f.startswith(("http://", "https://")):
        return urlopen(f)
    return open(f)
+
class AMLog(object):
    """Parsed view of a Tez AM log filtered down to its [HISTORY] lines.

    Constructed from a path or URL (opened via open_file). ``events`` holds one
    AMRawEvent per recognised HistoryEventHandler line. structure() links the
    events into an AppMaster with its containers and its
    DAG -> vertex -> task -> attempt tree.
    """

    def init(self):
        """Compile MAIN_RE, the pattern for one HistoryEventHandler INFO line."""
        ID = r'[^\]]*'
        TS = r'[0-9:\-, ]*'
        MAIN_RE = r'^(?P<ts>%(ts)s) INFO [(?P<thread>%(id)s)] org.apache.tez.dag.history.HistoryEventHandler: [HISTORY][DAG:(?P<dag>%(id)s)][Event:(?P<event>%(id)s)]: (?P<args>.*)'
        # Escape the log format's literal brackets *before* substituting the
        # ID/TS character classes, whose own brackets must stay unescaped.
        # Raw strings avoid the invalid "\[" string-escape warning on Python 3.
        MAIN_RE = MAIN_RE.replace('[', r'\[').replace(']', r'\]')
        MAIN_RE = MAIN_RE % {'ts': TS, 'id': ID}
        self.MAIN_RE = re.compile(MAIN_RE)

    @staticmethod
    def _text(line):
        """Return ``line`` as ``str``, decoding bytes as UTF-8 (lossy).

        gzip/bz2/urlopen yield bytes on Python 3. Plain files and Python 2
        sources already yield ``str`` and are returned unchanged.
        """
        if isinstance(line, str):
            return line
        return line.decode("utf-8", "replace")

    def __init__(self, f):
        self.init()
        fp = open_file(f)
        try:
            # Build a real list (not Python 3's lazy filter object): every
            # accessor below scans the events again.
            events = []
            for raw_line in fp:
                ev = self.parse(self._text(raw_line).strip())
                if ev:
                    events.append(ev)
        finally:
            fp.close()
        self.events = events

    def structure(self):
        """Link the parsed events together and return the AppMaster root.

        Each call builds fresh objects from ``events``. On the returned
        AppMaster, ``containers`` maps containerId -> Container and ``dags`` is
        a list of DAG objects.
        """
        am = self.appmaster()  # a new AppMaster object on every call
        containers = dict((c.name, c) for c in self.containers())
        dags = self.dags()
        vertexes = self.vertexes()
        tasks = self.tasks()
        attempts = self.attempts()
        for t in tasks:
            t.structure(attempts)
        for v in vertexes:
            v.structure(tasks)
        for d in dags:
            d.structure(vertexes)
        for a in attempts:
            c = containers.get(a.container)
            # A trimmed log may lack the CONTAINER_LAUNCHED line for an
            # attempt's container; skip it rather than raise KeyError.
            if c is not None:
                c.node = a.node
        am.containers = containers
        am.dags = dags
        return am

    def appmaster(self):
        """Return an AppMaster for the first AM_STARTED event, or None."""
        return first([AppMaster(ev) for ev in self.events if ev.event == "AM_STARTED"])

    def containers(self):
        """Return a Container per CONTAINER_LAUNCHED event.

        ``stop`` is set from the matching CONTAINER_STOPPED event when one exists.
        """
        containers = [Container(ev) for ev in self.events if ev.event == "CONTAINER_LAUNCHED"]
        containermap = dict((c.name, c) for c in containers)
        for ev in self.events:
            if ev.event == "CONTAINER_STOPPED":
                kvs = csv_kv(ev.args)
                if kvs["containerId"] in containermap:
                    containermap[kvs["containerId"]].stop = int(kvs["stoppedTime"])
        return containers

    def dags(self):
        """Return a DAG per DAG_FINISHED event."""
        return [DAG(ev) for ev in self.events if ev.event == "DAG_FINISHED"]

    def vertexes(self):
        """ yes, not vertices """
        return [Vertex(ev) for ev in self.events if ev.event == "VERTEX_FINISHED"]

    def tasks(self):
        """Return a Task per TASK_FINISHED event."""
        return [Task(ev) for ev in self.events if ev.event == "TASK_FINISHED"]

    def attempts(self):
        """Return one Attempt per taskAttemptId, built from its STARTED/FINISHED events."""
        wanted = ("TASK_ATTEMPT_STARTED", "TASK_ATTEMPT_FINISHED")
        key = lambda pair: pair[0]
        raw = [(csv_kv(ev.args)["taskAttemptId"], ev) for ev in self.events if ev.event in wanted]
        # Sort on the id alone. groupby only needs equal ids to be adjacent,
        # and sorting whole tuples would compare two AMRawEvent objects
        # whenever ids tie, which raises TypeError on Python 3.
        raw.sort(key=key)
        return [Attempt([ev for (_, ev) in group]) for (_, group) in groupby(raw, key=key)]

    def parse(self, l):
        """Turn one log line into an AMRawEvent, or return None.

        Returns None for lines without "[HISTORY]" and for "[HISTORY]" lines that
        do not fit MAIN_RE (e.g. not logged at INFO). A single odd line is
        skipped instead of aborting the whole parse with an AttributeError.
        """
        if l.find("[HISTORY]") == -1:
            return None
        m = self.MAIN_RE.match(l)
        if m is None:
            return None
        return AMRawEvent(m.group("ts"), m.group("dag"), m.group("event"), m.group("args"))
+
def main(argv):
    """Print [vertex, attempt, container, start, finish] for every task attempt.

    argv -- command-line arguments; argv[0] is the AM log path or URL.
    Returns 1 after printing a usage line to stderr when no log is given,
    otherwise None.
    """
    if not argv:
        sys.stderr.write("usage: amlogparser.py <am-log-file>\n")
        return 1
    tree = AMLog(argv[0]).structure()
    # AM -> dag -> vertex -> task -> attempt
    # AM -> container
    for d in tree.dags:
        for a in d.attempts():
            # print(...) with one argument produces the same output on Python 2 and 3
            print([a.vertex, a.name, a.container, a.start, a.finish])

if __name__ == "__main__":
    sys.exit(main(sys.argv[1:]))
http://git-wip-us.apache.org/repos/asf/tez/blob/948565e9/tez-tools/swimlanes/swimlane.py
----------------------------------------------------------------------
diff --git a/tez-tools/swimlanes/swimlane.py b/tez-tools/swimlanes/swimlane.py
new file mode 100644
index 0000000..5a527ba
--- /dev/null
+++ b/tez-tools/swimlanes/swimlane.py
@@ -0,0 +1,163 @@
+import os,sys,re,math,os.path
+import StringIO
+from amlogparser import AMLog
+import random
+from getopt import getopt
+
class ColourManager(object):
    """Hands out fill colours from a fixed palette, cycling forever.

    ``i`` is the cursor. It is advanced *before* each read, so the first call
    to next() returns colours[1], and colours[0] only appears after a full
    cycle.
    """

    def __init__(self):
        # text-printable colours
        palette = [
            '#E4F5FC', '#62C2A2', '#E2F2D8', '#A9DDB4', '#E2F6E1', '#D8DAD7', '#BBBDBA', '#FEE6CE', '#FFCF9F',
            '#FDAE69', '#FDE4DD', '#EDE6F2', '#A5BDDB', '#FDE1EE', '#D8B9D8', '#D7DCEC', '#BABDDA', '#FDC5BF',
            '#FC9FB3', '#FDE1D2', '#FBBB9E', '#DBEF9F', '#AADD8E', '#81CDBB', '#C7EDE8', '#96D9C8', '#E3EBF4',
            '#BAD3E5', '#9DBDD9', '#8996C8', '#CEEAC6', '#76CCC6', '#C7E9BE', '#9ED99C', '#71C572', '#EFF1EE',
            '#949693', '#FD8D3D', '#FFF7ED', '#FED3AE', '#FEBB8F', '#FCE9CA', '#FED49B', '#FBBC85', '#FB8E58',
            '#FFEEE8', '#D0D0E8', '#76A9CE', '#FDFFFC', '#E9E2EE', '#64A8D2', '#FAF7FC', '#F6ECF2', '#F8E7F0',
            '#C994C6', '#E063B1', '#ECEDF7', '#DDD9EB', '#9B9BCA', '#FEDFDE', '#F8689F', '#FC9273', '#FC6948',
            '#F6FDB6', '#78C67B', '#EBF9B0', '#C5E9B0', '#40B7C7', '#FDF7BA', '#FFE392', '#FFC34C', '#FF982A']
        self.colours = palette
        self.i = 0

    def next(self):
        """Advance the cursor and return the colour it now points at (wrapping)."""
        self.i = self.i + 1
        position = self.i % len(self.colours)
        return self.colours[position]
+
def attempts(tree):
    """Yield (vertex, attempt name, container, start, finish) for every task
    attempt of every DAG in ``tree``, DAG by DAG."""
    for dag in tree.dags:
        for att in dag.attempts():
            row = (att.vertex, att.name, att.container, att.start, att.finish)
            yield row
+
def attrs(args):
    """Render keyword arguments as extra SVG attributes.

    Underscores in names become hyphens (``stroke_dasharray`` ->
    ``stroke-dasharray``). Values whose type is exactly ``str`` are
    single-quoted; other values are written unquoted via ``str()``. Each
    attribute is followed by one space.
    """
    parts = []
    for name in args:
        value = args[name]
        css_name = name.replace("_", "-")  # css
        if type(value) is str:
            parts.append("%s='%s' " % (css_name, value))
        else:
            parts.append("%s=%s " % (css_name, str(value)))
    return "".join(parts)
+
class SVGHelper(object):
    """Accumulates SVG markup in memory; flush() closes the document and returns it.

    A top-level helper (no ``parent``) writes the XML prolog and owns the
    buffer. A helper created with ``parent`` writes a nested <svg> element into
    the parent's buffer, and flushing it also flushes the parent.

    Text content, <title> text and link targets are XML-escaped, so names or
    URLs containing &, < or quotes still produce well-formed SVG.
    """

    def __init__(self, w, h, parent=None):
        self.width = w
        self.height = h
        self.parent = parent
        if not parent:
            self.lines = self._new_buffer()
            self.write('<?xml version="1.0" standalone="no"?>\n'
                       '<!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN" '
                       '"http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd">\n')
        else:
            self.lines = parent.lines
        self.write("""<svg xmlns="http://www.w3.org/2000/svg" version="1.1" xmlns:xlink="http://www.w3.org/1999/xlink" height="%d" width="%d">""" % (h, w))

    @staticmethod
    def _new_buffer():
        """Return an empty in-memory text buffer (StringIO on Python 2, io.StringIO on 3)."""
        try:
            from StringIO import StringIO
        except ImportError:
            from io import StringIO
        return StringIO()

    @staticmethod
    def _escape(s, attr=False):
        """XML-escape &, < and >; with ``attr`` also escape both quote characters."""
        from xml.sax.saxutils import escape
        entities = {"'": "&apos;", '"': "&quot;"} if attr else {}
        return escape("%s" % (s,), entities)

    def line(self, x1, y1, x2, y2, style="stroke: #000", **kwargs):
        self.write("""<line x1="%d" y1="%d" x2="%d" y2="%d" style="%s" %s />""" % (x1, y1, x2, y2, style, attrs(kwargs)))

    def rect(self, left, top, right, bottom, style="", title=""):
        w = (right - left)
        h = (bottom - top)
        self.write("""<rect x="%d" y="%d" width="%d" height="%d" style="%s"><title>%s</title></rect>""" % (left, top, w, h, style, self._escape(title)))

    def text(self, x, y, text, style=""):
        self.write("""<text x="%d" y="%d" style="%s">%s</text>""" % (x, y, style, self._escape(text)))

    def link(self, x, y, text, link, style=""):
        # The href sits inside single quotes, so quotes must be escaped too.
        self.write("<a xlink:href='%s'>" % self._escape(link, attr=True))
        self.text(x, y, text, style)
        self.write("</a>")

    def write(self, s):
        self.lines.write(s)

    def flush(self):
        """Close this <svg> element (and any parents) and return the whole document."""
        self.write("</svg>")
        if self.parent:
            self.parent.flush()
        return self.lines.getvalue()
+
def usage():
    """Write swimlane.py's command-line help to stderr; returns None."""
    message = """
usage: swimlane.py [-t ms-per-pixel] [-o outputfile] [-f redline-fraction] <log-file>

Input files for this tool can be prepared by "yarn logs -applicationId <application_...> | grep HISTORY".
"""
    stream = sys.stderr
    stream.write(message)
+
def main(argv):
    """Render a swimlane SVG for one Tez AM log and write it out.

    argv -- command-line arguments (see usage()):
      -o FILE  write the SVG to FILE instead of stdout
      -t N     milliseconds per horizontal pixel (default 20)
      -f P     where to draw the red "P% of attempts finished" line, given as a
               percentage below 100 (e.g. 95) or a fraction below 1 (e.g. 0.95);
               other values are ignored and the default 0.95 is kept
    Returns None on success, or 1 when no log is given or the log contains no
    task attempts.
    """
    (opts, args) = getopt(argv, "o:t:f:")
    outpath = None
    ticks = 20  # milliseconds represented by one horizontal pixel
    fraction = 0.95
    for k, v in opts:
        if k == "-o":
            outpath = v
        if k == "-t":
            ticks = int(v)
        if k == "-f":
            pct = float(v)
            if 0 < pct < 1:
                fraction = pct
            elif pct < 100:
                fraction = pct / 100.0
    if len(args) == 0:
        usage()
        return 1
    log = AMLog(args[0]).structure()
    lanes = [c.name for c in sorted(log.containers.values(), key=lambda a: a.start)]
    marginTop = 128
    marginRight = 100
    laneSize = 24
    y = len(lanes) * laneSize
    # Integer pixel offset from AM start. Floor division gives the Python 2
    # result on Python 3 too, where range() below would reject floats.
    xdomain = lambda t: (t - log.zero) // ticks
    ends = [xdomain(a[4]) for a in attempts(log)]
    if not ends:
        sys.stderr.write("no task attempts found in %s\n" % args[0])
        return 1
    x = max(ends)
    svg = SVGHelper(x + 2 * marginRight, y + 2 * marginTop)
    # Centre the title over the plot area, which begins at marginRight.
    svg.text(marginRight + x // 2, 32, log.name, style="font-size: 32px; text-anchor: middle")
    containerMap = dict((name, i) for (i, name) in enumerate(lanes))

    # draw a grid: one labelled row per container lane
    a = marginTop
    for l in lanes:
        a += laneSize
        svg.text(marginRight - 4, a, l, "text-anchor:end; font-size: 16px;")
        svg.line(marginRight, a, marginRight + x, a, "stroke: #ccc")
    # vertical time lines every 10*ticks pixels, plus one at the right edge
    for x1 in sorted(set(range(0, x, 10 * ticks)) | set([x])):
        # float division: integer "/ 1000" truncated labels to whole seconds
        svg.text(marginRight + x1, marginTop - laneSize // 2, "%0.2f s" % ((x1 * ticks) / 1000.0), "text-anchor: middle; font-size: 12px")
        svg.line(marginRight + x1, marginTop - laneSize // 2, marginRight + x1, marginTop + y, "stroke: #ddd")
    svg.line(marginRight, marginTop, marginRight + x, marginTop)
    svg.line(marginRight, y + marginTop, marginRight + x, y + marginTop)
    svg.line(marginRight, marginTop, marginRight, y + marginTop)
    svg.line(marginRight + x, marginTop, marginRight + x, y + marginTop)

    # container lifetimes: green launch line, red stop line, grey shading between
    colourman = ColourManager()
    for c in log.containers.values():
        y1 = marginTop + (containerMap[c.name] * laneSize)
        x1 = marginRight + xdomain(c.start)
        svg.line(x1, y1, x1, y1 + laneSize, style="stroke: green")
        if c.stop > c.start:
            x2 = marginRight + xdomain(c.stop)
            svg.line(x2, y1, x2, y1 + laneSize, style="stroke: red")
            svg.rect(x1, y1, x2, y1 + laneSize, style="fill: #ccc; opacity: 0.3")
        elif c.stop == -1:
            # no CONTAINER_STOPPED event: shade to the right edge of the plot
            x2 = marginRight + x
            svg.rect(x1, y1, x2, y1 + laneSize, style="fill: #ccc; opacity: 0.3")

    for dag in log.dags:
        x1 = marginRight + xdomain(dag.start)
        svg.line(x1, marginTop - 24, x1, marginTop + y, "stroke: black;", stroke_dasharray="8,4")
        x2 = marginRight + xdomain(dag.finish)
        svg.line(x2, marginTop - 24, x2, marginTop + y, "stroke: black;", stroke_dasharray="8,4")
        svg.line(x1, marginTop - 24, x2, marginTop - 24, "stroke: black")
        svg.text((x1 + x2) / 2, marginTop - 32, "%s (%0.1f)" % (dag.name, (dag.finish - dag.start) / 1000.0), "text-anchor: middle; font-size: 12px;")
        # sorted() so colours are assigned in the same order on every run
        vertexes = set(v.name for v in dag.vertexes)
        colourmap = dict((v, colourman.next()) for v in sorted(vertexes))
        for c in dag.attempts():
            if c.container not in containerMap:
                continue  # the container's launch event is missing from the log
            colour = colourmap[c.vertex]
            y1 = marginTop + (containerMap[c.container] * laneSize) + 1
            x1 = marginRight + xdomain(c.start)
            x2 = marginRight + xdomain(c.finish)
            y2 = y1 + laneSize - 2
            locality = (("DATA_LOCAL_TASKS" in c.kvs) * 1) + (("RACK_LOCAL_TASKS" in c.kvs) * 2)
            svg.rect(x1, y1, x2, y2, title=c.name, style="fill: %s; stroke: #ccc;" % (colour))
            if locality > 1:  # rack-local (no-locality isn't counted)
                svg.rect(x1, y2 - 4, x2, y2, style="fill: #f00; fill-opacity: 0.5;")
            svg.link((x1 + x2) / 2, y2 - 12, c.vertex, link=c.kvs.get("completedLogs", ""), style="text-anchor: middle; font-size: 9px;")
        # red line where the requested fraction of this DAG's attempts had finished
        finishes = sorted([c.finish for c in dag.attempts()])
        if len(finishes) > 10:
            percentX = finishes[int(len(finishes) * fraction)]
            svg.line(marginRight + xdomain(percentX), marginTop, marginRight + xdomain(percentX), y + marginTop, style="stroke: red")
            svg.text(marginRight + xdomain(percentX), y + marginTop + 12, "%d%% (%0.1fs)" % (int(fraction * 100), (percentX - dag.start) / 1000.0), style="font-size:12px; text-anchor: middle")

    # open the output only now, so bad usage does not leave an empty file behind
    out = open(outpath, "w") if outpath is not None else sys.stdout
    try:
        out.write(svg.flush())
    finally:
        if out is not sys.stdout:
            out.close()

if __name__ == "__main__":
    sys.exit(main(sys.argv[1:]))