You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by go...@apache.org on 2014/08/06 22:14:08 UTC

git commit: TEZ-1332. Swimlane diagrams from tez AM logs (gopalv)

Repository: tez
Updated Branches:
  refs/heads/master 45420856e -> 948565e99


TEZ-1332. Swimlane diagrams from tez AM logs (gopalv)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/948565e9
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/948565e9
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/948565e9

Branch: refs/heads/master
Commit: 948565e99e309a1118d61d1c5c4883d4667e7760
Parents: 4542085
Author: Gopal V <go...@apache.org>
Authored: Wed Aug 6 11:47:29 2014 -0700
Committer: Gopal V <go...@apache.org>
Committed: Wed Aug 6 11:47:29 2014 -0700

----------------------------------------------------------------------
 tez-tools/swimlanes/README.md      |  34 +++++
 tez-tools/swimlanes/amlogparser.py | 240 ++++++++++++++++++++++++++++++++
 tez-tools/swimlanes/swimlane.py    | 163 ++++++++++++++++++++++
 3 files changed, 437 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/948565e9/tez-tools/swimlanes/README.md
----------------------------------------------------------------------
diff --git a/tez-tools/swimlanes/README.md b/tez-tools/swimlanes/README.md
new file mode 100644
index 0000000..87a90f5
--- /dev/null
+++ b/tez-tools/swimlanes/README.md
@@ -0,0 +1,34 @@
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+Swimlanes
+=========
+
+This is a performance and post-hoc analysis tool for Apache Tez. It uses the log lines
+printed during execution to draw a diagram in which each allocated container is a
+horizontal lane and each task is drawn as a section of that lane.
+
+Currently, the tool only works for successfully completed DAGs and has little support
+for representing failures or data dependencies in the diagram.
+
+The output format is SVG, which also includes clickable links to the logs of each
+task attempt, to help with further debugging once a slow task has been located.
+
+To use the tool, first generate a trimmed log file containing only the history events:
+
+`yarn logs -applicationId <application_...> | grep HISTORY > am.log`
+
+Then generate the diagram with:
+
+`python swimlane.py -o am.svg am.log`

http://git-wip-us.apache.org/repos/asf/tez/blob/948565e9/tez-tools/swimlanes/amlogparser.py
----------------------------------------------------------------------
diff --git a/tez-tools/swimlanes/amlogparser.py b/tez-tools/swimlanes/amlogparser.py
new file mode 100644
index 0000000..46d0599
--- /dev/null
+++ b/tez-tools/swimlanes/amlogparser.py
@@ -0,0 +1,240 @@
+import os,sys,re,math,os.path
+from collections import defaultdict
+from itertools import groupby
+from bz2 import BZ2File
+from gzip import GzipFile as GZFile
+try:
+	from urllib.request import urlopen
+except:
+	from urllib2 import urlopen as urlopen
+
+class AMRawEvent(object):
+	def __init__(self, ts, dag, event, args):
+		self.ts = ts
+		self.dag = dag
+		self.event = event
+		self.args = args
+	def __repr__(self):
+		return "%s->%s (%s)" % (self.dag, self.event, self.args)
+
+def first(l):
+	return (l[:1] or [None])[0]
+
+def kv_add(d, k, v):
+	if(d.has_key(k)):
+		oldv = d[k]
+		if(type(oldv) is list):
+			oldv.append(v)
+		else:
+			oldv = [oldv, v]
+		d[k] = oldv
+	else:
+		d[k] = v
+			
+def csv_kv(args):
+	kvs = {};
+	pairs = [p.strip() for p in args.split(",")]
+	for kv in pairs:
+		if(kv.find("=") == -1):
+			kv_add(kvs, kv, None)
+		elif(kv.find("=") == kv.rfind("=")):
+			(k,v) = kv.split("=")
+			kv_add(kvs, k, v)
+	return kvs
+
+class AppMaster(object):
+	def __init__(self, raw):
+		self.raw = raw
+		self.kvs = csv_kv(raw.args)
+		self.name = self.kvs["appAttemptId"]
+		self.zero = int(self.kvs["startTime"])
+		#self.ready = int(self.kvs["initTime"])
+		#self.start = int(self.kvs["appSubmitTime"])
+		self.containers = None
+		self.dags = None
+	def __repr__(self):
+		return "[%s started at %d]" % (self.name, self.zero)
+	
+class Container(object):
+	def __init__(self, raw):
+		self.raw = raw
+		self.kvs = csv_kv(raw.args)
+		self.name = self.kvs["containerId"]
+		self.start = int(self.kvs["launchTime"])
+		self.stop = -1 
+		self.node =""
+	def __repr__(self):
+		return "[%s start=%d]" % (self.name, self.start)
+
+class DAG(object):
+	def __init__(self, raw):
+		self.raw = raw
+		self.name = raw.dag
+		self.kvs = csv_kv(raw.args)
+		self.start = (int)(self.kvs["startTime"])
+		self.finish = (int)(self.kvs["finishTime"])
+		self.duration = (int)(self.kvs["timeTaken"])
+	def structure(self, vertexes):
+		self.vertexes = [v for v in vertexes if v.dag == self.name]
+	def attempts(self):
+		for v in self.vertexes:
+			for t in v.tasks:
+				for a in t.attempts:
+					if(a.dag == self.name):
+						yield a
+	def __repr__(self):
+		return "%s (%d+%d)" % (self.name, self.start, self.duration)
+
+class Vertex(object):
+	def __init__(self, raw):
+		self.raw = raw
+		self.dag = raw.dag
+		self.kvs = csv_kv(raw.args)
+		self.name = self.kvs["vertexName"]
+		self.initZero = (int)(self.kvs["initRequestedTime"])
+		self.init = (int)(self.kvs["initedTime"])
+		self.startZero = (int)(self.kvs["startRequestedTime"])
+		self.start = (int)(self.kvs["startedTime"])
+		self.finish = (int)(self.kvs["finishTime"])
+		self.duration = (int)(self.kvs["timeTaken"])
+	def structure(self, tasks):
+		self.tasks = [t for t in tasks if t.vertex == self.name]
+	def __repr__(self):
+		return "%s (%d+%d)" % (self.name, self.start, self.duration)
+
+
+class Task(object):
+	def __init__(self, raw):
+		self.raw = raw
+		self.dag = raw.dag
+		self.kvs = csv_kv(raw.args)
+		self.vertex = self.kvs["vertexName"]
+		self.name = self.kvs["taskId"]
+		self.start = (int)(self.kvs["startTime"])
+		self.finish = (int)(self.kvs["finishTime"])
+		self.duration = (int)(self.kvs["timeTaken"])
+	def structure(self, attempts):
+		self.attempts = [a for a in attempts if a.task == self.name]
+	def __repr__(self):
+		return "%s (%d+%d)" % (self.name, self.start, self.duration)
+
+class Attempt(object):
+	def __init__(self, pair):
+		start = first(filter(lambda a: a.event == "TASK_ATTEMPT_STARTED", pair))
+		finish = first(filter(lambda a: a.event == "TASK_ATTEMPT_FINISHED", pair))
+		self.raw = finish
+		self.dag = finish.dag
+		self.kvs = csv_kv(start.args)
+		self.kvs.update(csv_kv(finish.args))
+		self.name = self.kvs["taskAttemptId"]
+		self.task = self.name[:self.name.rfind("_")].replace("attempt","task")
+		self.vertex = self.kvs["vertexName"]
+		self.start = (int)(self.kvs["startTime"])
+		self.finish = (int)(self.kvs["finishTime"])
+		self.duration = (int)(self.kvs["timeTaken"])
+		self.container = self.kvs["containerId"]
+		self.node = self.kvs["nodeId"]
+	def __repr__(self):
+		return "%s (%d+%d)" % (self.name, self.start, self.duration)
+		
+
+def open_file(f):
+	if(f.endswith(".gz")):
+		return GZFile(f)
+	elif(f.endswith(".bz2")):
+		return BZ2File(f)
+	elif(f.startswith("http://")):
+		return urlopen(f)
+	return open(f)
+
+class AMLog(object):	
+	def init(self):
+		ID=r'[^\]]*'
+		TS=r'[0-9:\-, ]*'
+		MAIN_RE=r'^(?P<ts>%(ts)s) INFO [(?P<thread>%(id)s)] org.apache.tez.dag.history.HistoryEventHandler: [HISTORY][DAG:(?P<dag>%(id)s)][Event:(?P<event>%(id)s)]: (?P<args>.*)'
+		MAIN_RE = MAIN_RE.replace('[','\[').replace(']','\]')
+		MAIN_RE = MAIN_RE % {'ts' : TS, 'id' : ID}
+		self.MAIN_RE = re.compile(MAIN_RE)
+	
+	def __init__(self, f):
+		fp = open_file(f)
+		self.init()
+		self.events = filter(lambda a:a, [self.parse(l.strip()) for l in fp])
+	
+	def structure(self):
+		am = self.appmaster() # this is a copy
+		containers = dict([(a.name, a) for a in self.containers()])
+		dags = self.dags()
+		vertexes = self.vertexes()
+		tasks = self.tasks()
+		attempts = self.attempts()
+		for t in tasks:
+			t.structure(attempts)
+		for v in vertexes:
+			v.structure(tasks)
+		for d in dags:
+			d.structure(vertexes)
+		for a in attempts:
+			c = containers[a.container]
+			c.node = a.node
+		am.containers = containers
+		am.dags = dags
+		return am
+
+	def appmaster(self):
+		return first([AppMaster(ev) for ev in self.events if ev.event == "AM_STARTED"])
+	
+	def containers(self):
+		containers = [Container(ev) for ev in self.events if ev.event == "CONTAINER_LAUNCHED"]
+		containermap = dict([(c.name, c) for c in containers])
+		for ev in self.events:
+			if ev.event == "CONTAINER_STOPPED":
+				kvs = csv_kv(ev.args)
+				if containermap.has_key(kvs["containerId"]):
+					containermap[kvs["containerId"]].stop = int(kvs["stoppedTime"])
+		return containers
+				
+	
+	def dags(self):
+		dags = [DAG(ev) for ev in self.events if ev.event == "DAG_FINISHED"]
+		return dags
+	
+	def vertexes(self):
+		""" yes, not vertices """
+		vertexes = [Vertex(ev) for ev in self.events if ev.event == "VERTEX_FINISHED"]
+		return vertexes
+	
+	def tasks(self):
+		tasks = [Task(ev) for ev in self.events if ev.event == "TASK_FINISHED"]
+		return tasks
+	
+	def attempts(self):
+		key = lambda a:a[0]
+		value = lambda a:a[1]
+		raw = [(csv_kv(ev.args)["taskAttemptId"], ev) for ev in self.events if ev.event == "TASK_ATTEMPT_STARTED" or ev.event == "TASK_ATTEMPT_FINISHED"]
+		pairs = groupby(sorted(raw), key = key)
+		attempts = [Attempt(map(value,p)) for (k,p) in pairs]
+		return attempts
+	
+	def parse(self, l):		
+		if(l.find("[HISTORY]") != -1):
+			m = self.MAIN_RE.match(l)
+			ts = m.group("ts")
+			dag = m.group("dag")
+			event = m.group("event")
+			args = m.group("args")
+			return AMRawEvent(ts, dag, event, args)
+
+def main(argv):
+	f = argv[0]
+	tree = AMLog(argv[0]).structure()
+	# AM -> dag -> vertex -> task -> attempt
+	# AM -> container
+	containers = set(tree.containers.keys())
+	timeto = lambda a: (a - tree.zero)
+	for d in tree.dags:
+		for a in d.attempts():			
+			print [a.vertex, a.name, a.container, a.start, a.finish]
+
+if __name__ == "__main__":
+	main(sys.argv[1:])

http://git-wip-us.apache.org/repos/asf/tez/blob/948565e9/tez-tools/swimlanes/swimlane.py
----------------------------------------------------------------------
diff --git a/tez-tools/swimlanes/swimlane.py b/tez-tools/swimlanes/swimlane.py
new file mode 100644
index 0000000..5a527ba
--- /dev/null
+++ b/tez-tools/swimlanes/swimlane.py
@@ -0,0 +1,163 @@
+import os,sys,re,math,os.path
+import random
+import StringIO
+from getopt import getopt
+from xml.sax.saxutils import escape
+
+from amlogparser import AMLog
+
+class ColourManager(object):
+	def __init__(self):
+		# text-printable colours
+		self.colours = [
+		'#E4F5FC', '#62C2A2', '#E2F2D8', '#A9DDB4', '#E2F6E1', '#D8DAD7', '#BBBDBA', '#FEE6CE', '#FFCF9F',
+		'#FDAE69', '#FDE4DD', '#EDE6F2', '#A5BDDB', '#FDE1EE', '#D8B9D8', '#D7DCEC', '#BABDDA', '#FDC5BF',
+		'#FC9FB3', '#FDE1D2', '#FBBB9E', '#DBEF9F', '#AADD8E', '#81CDBB', '#C7EDE8', '#96D9C8', '#E3EBF4',
+		'#BAD3E5', '#9DBDD9', '#8996C8', '#CEEAC6', '#76CCC6', '#C7E9BE', '#9ED99C', '#71C572', '#EFF1EE',
+		'#949693', '#FD8D3D', '#FFF7ED', '#FED3AE', '#FEBB8F', '#FCE9CA', '#FED49B', '#FBBC85', '#FB8E58',
+		'#FFEEE8', '#D0D0E8', '#76A9CE', '#FDFFFC', '#E9E2EE', '#64A8D2', '#FAF7FC', '#F6ECF2', '#F8E7F0',
+		'#C994C6', '#E063B1', '#ECEDF7', '#DDD9EB', '#9B9BCA', '#FEDFDE', '#F8689F', '#FC9273', '#FC6948',
+		'#F6FDB6', '#78C67B', '#EBF9B0', '#C5E9B0', '#40B7C7', '#FDF7BA', '#FFE392', '#FFC34C', '#FF982A']
+		self.i = 0
+	def next(self):
+		self.i += 1
+		return self.colours[self.i % len(self.colours)]
+
+def attempts(tree):
+	for d in tree.dags:
+		for a in d.attempts():
+			yield (a.vertex, a.name, a.container, a.start, a.finish)
+
+def attrs(args):
+	s = ""
+	for k in args:
+		v = args[k]
+		k = k.replace("_","-") # css
+		if type(v) is str:
+			s += "%s='%s' " % (k,v)
+		else:
+			s += "%s=%s " % (k,str(v))
+	return s
+
+class SVGHelper(object):
+	def __init__(self, w, h, parent=None):
+		self.width = w		
+		self.height = h
+		self.parent = parent
+		if(not parent):
+			self.lines = StringIO.StringIO()
+			self.write("""<?xml version="1.0" standalone="no"?>
+		<!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN" "http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd">
+		""")
+		else:
+			self.lines = parent.lines
+		self.write("""<svg xmlns="http://www.w3.org/2000/svg" version="1.1" xmlns:xlink="http://www.w3.org/1999/xlink" height="%d" width="%d">""" % (h, w))	
+	def line(self, x1, y1, x2, y2, style="stroke: #000", **kwargs):
+		self.write("""<line x1="%d" y1="%d" x2="%d" y2="%d"  style="%s" %s />""" % (x1, y1, x2, y2, style, attrs(kwargs)))
+	def rect(self, left, top, right, bottom, style="", title=""):
+		w = (right-left)
+		h = (bottom-top)
+		self.write("""<rect x="%d" y="%d" width="%d" height="%d" style="%s"><title>%s</title></rect>""" % (left, top, w, h, style, title))
+	def text(self, x, y, text, style=""):
+		self.write("""<text x="%d" y="%d" style="%s">%s</text>""" % (x, y, style, text))
+	def link(self, x, y, text, link, style=""):
+		self.write("<a xlink:href='%s'>" % link)
+		self.text(x, y, text, style)
+		self.write("</a>")
+	def write(self, s):
+		self.lines.write(s)
+	def flush(self):
+		self.write("</svg>")
+		if(self.parent):
+			self.parent.flush()
+		return self.lines.getvalue()
+
+def usage():
+	sys.stderr.write("""
+usage: swimlane.py [-t ms-per-pixel] [-o outputfile] [-f redline-fraction] <log-file>
+
+Input files for this tool can be prepared by "yarn logs -applicationId <application_...> | grep HISTORY".
+""")
+
+def main(argv):
+	(opts, args) = getopt(argv, "o:t:f:")
+	out = sys.stdout
+	ticks = 20 # precision of 1/tick
+	fraction = 0.95
+	for k,v in opts:
+		if(k == "-o"):
+			out = open(v, "w")
+		if(k == "-t"):
+			ticks = int(v)
+		if(k == "-f"):
+			if(int(v) < 100):
+				fraction = int(v)/100.0
+	if len(args) == 0:
+		return usage()
+	log = AMLog(args[0]).structure()
+	lanes = [c.name for c in sorted(log.containers.values(), key=lambda a: a.start)]
+	marginTop = 128
+	marginRight = 100;
+	laneSize = 24
+	y = len(lanes)*laneSize
+	items = attempts(log)
+	xdomain = lambda t : (t - log.zero)/ticks 
+	x = max([xdomain(a[4]) for a in items])
+	svg = SVGHelper(x+2*marginRight, y+2*marginTop)
+	a = marginTop
+	svg.text(x/2, 32, log.name, style="font-size: 32px; text-anchor: middle")	
+	containerMap = dict(zip(list(lanes), xrange(4096)))
+	# draw a grid
+	for l in lanes:
+		a += laneSize
+		svg.text(marginRight - 4, a, l, "text-anchor:end; font-size: 16px;")
+		svg.line(marginRight, a, marginRight+x, a, "stroke: #ccc")
+	for x1 in set(range(0, x, 10*ticks)) | set([x]):
+		svg.text(marginRight+x1, marginTop-laneSize/2, "%0.2f s" % ((x1 *  ticks)/1000), "text-anchor: middle; font-size: 12px")
+		svg.line(marginRight+x1, marginTop-laneSize/2, marginRight+x1, marginTop+y, "stroke: #ddd")
+	svg.line(marginRight, marginTop, marginRight+x, marginTop)
+	svg.line(marginRight, y+marginTop, marginRight+x, y+marginTop)
+	svg.line(marginRight, marginTop, marginRight, y+marginTop)
+	svg.line(marginRight+x, marginTop, marginRight+x, y+marginTop)
+	
+	colourman = ColourManager()
+	for c in log.containers.values():
+		y1 = marginTop+(containerMap[c.name]*laneSize)
+		x1 = marginRight+xdomain(c.start)
+		svg.line(x1, y1, x1, y1 + laneSize, style="stroke: green")
+		if c.stop > c.start:
+			x2 = marginRight+xdomain(c.stop)
+			svg.line(x2, y1, x2, y1 + laneSize, style="stroke: red")
+			svg.rect(x1, y1, x2, y1 + laneSize, style="fill: #ccc; opacity: 0.3")
+		elif c.stop == -1:
+			x2 = marginRight+x 
+			svg.rect(x1, y1, x2, y1 + laneSize, style="fill: #ccc; opacity: 0.3")
+	for dag in log.dags:
+		x1 = marginRight+xdomain(dag.start)
+		svg.line(x1, marginTop-24, x1, marginTop+y, "stroke: black;", stroke_dasharray="8,4")
+		x2 = marginRight+xdomain(dag.finish)
+		svg.line(x2, marginTop-24, x2, marginTop+y, "stroke: black;", stroke_dasharray="8,4")
+		svg.line(x1, marginTop-24, x2, marginTop-24, "stroke: black")
+		svg.text((x1+x2)/2, marginTop-32, "%s (%0.1f)" % (dag.name, (dag.finish-dag.start)/1000.0) , "text-anchor: middle; font-size: 12px;")		
+		vertexes = set([v.name for v in dag.vertexes])
+		colourmap = dict([(v,colourman.next()) for v in list(vertexes)])
+		for c in dag.attempts():
+			colour = colourmap[c.vertex]
+			y1 = marginTop+(containerMap[c.container]*laneSize)+1
+			x1 = marginRight+xdomain(c.start)
+			x2 = marginRight+xdomain(c.finish)
+			y2 = y1 + laneSize - 2
+			locality = (c.kvs.has_key("DATA_LOCAL_TASKS") * 1) + (c.kvs.has_key("RACK_LOCAL_TASKS")*2)
+			svg.rect(x1, y1, x2, y2, title=c.name, style="fill: %s; stroke: #ccc;" % (colour))
+			if locality > 1: # rack-local (no-locality isn't counted)
+				svg.rect(x1, y2-4, x2, y2, style="fill: #f00; fill-opacity: 0.5;")
+			svg.link((x1+x2)/2, y2-12, c.vertex, link=c.kvs["completedLogs"], style="text-anchor: middle; font-size: 9px;")
+		finishes = sorted([c.finish for c in dag.attempts()])
+		if(len(finishes) > 10):
+			percentX = finishes[int(len(finishes)*fraction)]
+			svg.line(marginRight+xdomain(percentX), marginTop, marginRight+xdomain(percentX), y+marginTop, style="stroke: red")
+			svg.text(marginRight+xdomain(percentX), y+marginTop+12, "%d%% (%0.1fs)" % (int(fraction*100), (percentX - dag.start)/1000.0), style="font-size:12px; text-anchor: middle")
+	prefix = lambda a: (a.find(".") == -1 and a) or (a[:a.find(".")])
+	out.write(svg.flush())
+	out.close()
+
+if __name__ == "__main__":
+	sys.exit(main(sys.argv[1:]))