You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2013/11/06 22:08:07 UTC
git commit: Added mesos-cat and mesos-tail.
Updated Branches:
refs/heads/master bb8a627c7 -> 847f6cf43
Added mesos-cat and mesos-tail.
These are actually slightly improved version of the original shared by
Ben.H at mesos hackathon on Oct.25th 2013.
Modifications I did:
- add python version check
- add quote for query contents
- support to inspect files on completed frameworks/tasks
From: Shingo Omura <ev...@gmail.com>
Review: https://reviews.apache.org/r/14951
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/847f6cf4
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/847f6cf4
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/847f6cf4
Branch: refs/heads/master
Commit: 847f6cf43a8517908498d141aba34dc1b013fd0d
Parents: bb8a627
Author: Benjamin Hindman <be...@gmail.com>
Authored: Wed Nov 6 11:01:35 2013 -1000
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Wed Nov 6 11:04:00 2013 -1000
----------------------------------------------------------------------
src/cli/mesos-cat | 208 ++++++++++++++++++++++++++++++++++++++++++++++++
src/cli/mesos-tail | 200 ++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 408 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/847f6cf4/src/cli/mesos-cat
----------------------------------------------------------------------
diff --git a/src/cli/mesos-cat b/src/cli/mesos-cat
new file mode 100755
index 0000000..6ea5d8a
--- /dev/null
+++ b/src/cli/mesos-cat
@@ -0,0 +1,208 @@
+#!/usr/bin/env python
+
+import datetime
+import json
+import os
+import resource
+import subprocess
+import signal
+import sys
+import time
+import urllib
+
+from optparse import OptionParser
+
+
+if sys.version_info < (2,6,0):
+ sys.stderr.write('Expecting Python >= 2.6\n')
+ sys.exit(1)
+
+
+# Helper that uses 'mesos-resolve' to resolve the master's IP:port.
+def resolve(master):
+ process = subprocess.Popen(
+ ['mesos-resolve', master],
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ shell=False)
+
+ status = process.wait()
+ if status != 0:
+ print "Failed to resolve 'mesos-resolve %s'\n" % master
+ print process.stderr.read()
+ sys.exit(1)
+
+ return process.stdout.read()
+
+
+class Slave:
+ def __init__(self, slave):
+ self.slave = slave
+
+ def hostname(self):
+ return self.slave['hostname']
+
+ def curl(self, path, query):
+ pid = self.slave['pid']
+ url = 'http://' + pid[len('slave(1)@'):] + path
+ if query is not None and len(query) > 0:
+ url += '?' + '&'.join(
+ ['%s=%s' % (urllib.quote(str(key)), urllib.quote(str(value))) for (key, value) in query.items()])
+
+ process = subprocess.Popen(
+ ['curl', '-sSfL', url],
+ stdin=None,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ shell=False)
+
+ status = process.wait()
+ if status != 0:
+ print 'Failed to execute \'curl\':\n'
+ print process.stderr.read()
+ sys.exit(1)
+
+ result = process.stdout.read()
+ process.stdout.close()
+ process.stderr.close()
+ return result
+
+ def read(self, task, file):
+ framework_id = task['framework_id']
+ executor_id = task['executor_id']
+ if executor_id == "": executor_id = task['id']
+
+ # Get 'state.json' to get the executor directory.
+ state = json.loads(self.curl('/slave(1)/state.json', None))
+
+ directory = None
+
+ for framework in state['frameworks']:
+ if framework['id'] == framework_id:
+ for executor in framework['executors']:
+ if executor['id'] == executor_id:
+ directory = executor['directory']
+ break
+ for completed_executor in framework['completed_executors']:
+ if completed_executor['id'] == executor_id:
+ directory = completed_executor['directory']
+ break
+
+ for completed_framework in state['completed_frameworks']:
+ if completed_framework['id'] == framework_id:
+ for completed_executor in completed_framework['completed_executors']:
+ if completed_executor['id'] == executor_id:
+ directory = completed_executor['directory']
+ break
+
+ if directory is None:
+ raise IOError('Task directory/file not found')
+
+ path = os.path.join(directory, file)
+
+ # Determine the current length of the file.
+ result = json.loads(self.curl(
+ '/files/read.json',
+ {'path': path,
+ 'offset': -1}))
+
+ length = result['offset']
+
+ # Start streaming "pages" up to length.
+ PAGE_LENGTH = 1024
+ offset = 0
+
+ while True:
+ result = json.loads(self.curl(
+ '/files/read.json',
+ {'path': path,
+ 'offset': offset,
+ 'length': PAGE_LENGTH}))
+ offset += len(result['data'])
+ yield result['data']
+ if offset == length:
+ return
+
+
+def main():
+ # Parse options for this script.
+ parser = OptionParser()
+ parser.add_option('--master')
+ parser.add_option('--framework')
+ parser.add_option('--task')
+ parser.add_option('--file')
+ (options, args) = parser.parse_args(sys.argv)
+
+ if options.master is None:
+ print "Missing --master\n"
+ parser.print_help()
+ exit(-1)
+
+ if options.framework is None:
+ print "Missing --framework\n"
+ parser.print_help()
+ exit(-1)
+
+ if options.task is None:
+ print "Missing --task\n"
+ parser.print_help()
+ exit(-1)
+
+ if options.file is None:
+ print "Missing --file\n"
+ parser.print_help()
+ exit(-1)
+
+ url = 'http://' + resolve(options.master) + '/master/state.json'
+ file = urllib.urlopen(url)
+ state = json.loads(file.read())
+ file.close()
+
+ # Build a dict from slave ID to `slaves'.
+ slaves = {}
+ for slave in state['slaves']:
+ slaves[slave['id']] = Slave(slave)
+
+ target_task = None
+ target_slave = None
+
+ for framework in state['frameworks']:
+ if framework['id'] == options.framework:
+ for task in framework['tasks']:
+ if (task['id'] == options.task):
+ target_task = task
+ target_slave = slaves[task['slave_id']]
+ break
+ for completed_task in framework['completed_tasks']:
+ if (completed_task['id'] == options.task):
+ target_task = completed_task
+ target_slave = slaves[completed_task['slave_id']]
+ break
+
+ for completed_framework in state['completed_frameworks']:
+ if completed_framework['id'] == options.framework:
+ for completed_task in completed_framework['completed_tasks']:
+ if (completed_task['id'] == options.task):
+ target_task = completed_task
+ target_slave= slaves[completed_task['slave_id']]
+ break
+
+ for data in target_slave.read(target_task, options.file):
+ sys.stdout.write(data)
+ sys.stdout.flush()
+ exit(0)
+
+ sys.stderr.write('No task found!\n')
+ sys.stderr.flush()
+ exit(-1)
+
+
+if __name__ == "__main__":
+ def signal_handler(signal, frame):
+ sys.stdout.write('\n')
+ sys.exit(130)
+
+ signal.signal(signal.SIGINT, signal_handler)
+
+ main()
http://git-wip-us.apache.org/repos/asf/mesos/blob/847f6cf4/src/cli/mesos-tail
----------------------------------------------------------------------
diff --git a/src/cli/mesos-tail b/src/cli/mesos-tail
new file mode 100755
index 0000000..f4f7c75
--- /dev/null
+++ b/src/cli/mesos-tail
@@ -0,0 +1,200 @@
+#!/usr/bin/env python
+
+import datetime
+import json
+import os
+import resource
+import signal
+import subprocess
+import sys
+import time
+import urllib
+
+from optparse import OptionParser
+
+
+if sys.version_info < (2,6,0):
+ sys.stderr.write('Expecting Python >= 2.6\n')
+ sys.exit(1)
+
+
+# Helper that uses 'mesos-resolve' to resolve the master's IP:port.
+def resolve(master):
+ process = subprocess.Popen(
+ ['mesos-resolve', master],
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ shell=False)
+
+ status = process.wait()
+ if status != 0:
+ print "Failed to resolve 'mesos-resolve %s'\n" % master
+ print process.stderr.read()
+ sys.exit(1)
+
+ return process.stdout.read()
+
+
+class Slave:
+ def __init__(self, slave):
+ self.slave = slave
+
+ def hostname(self):
+ return self.slave['hostname']
+
+ def curl(self, path, query):
+ pid = self.slave['pid']
+ url = 'http://' + pid[len('slave(1)@'):] + path
+ if query is not None and len(query) > 0:
+ url += '?' + '&'.join(
+ ['%s=%s' % (urllib.quote(str(key)), urllib.quote(str(value))) for (key, value) in query.items()])
+
+ process = subprocess.Popen(
+ ['curl', '-sSfL', url],
+ stdin=None,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ shell=False)
+
+ status = process.wait()
+ if status != 0:
+ print 'Failed to execute \'curl\':\n'
+ print process.stderr.read()
+ sys.exit(1)
+
+ result = process.stdout.read()
+ process.stdout.close()
+ process.stderr.close()
+ return result
+
+ def tail(self, task, file):
+ framework_id = task['framework_id']
+ executor_id = task['executor_id']
+ if executor_id == "": executor_id = task['id']
+
+ # Get 'state.json' to get the executor directory.
+ state = json.loads(self.curl('/slave(1)/state.json', None))
+
+ directory = None
+
+ for framework in state['frameworks']:
+ if framework['id'] == framework_id:
+ for executor in framework['executors']:
+ if executor['id'] == executor_id:
+ directory = executor['directory']
+ break
+ for completed_executor in framework['completed_executors']:
+ if completed_executor['id'] == executor_id:
+ directory = completed_executor['directory']
+ break
+
+ for completed_framework in state['completed_frameworks']:
+ if completed_framework['id'] == framework_id:
+ for completed_executor in completed_framework['completed_executors']:
+ if completed_executor['id'] == executor_id:
+ directory = completed_executor['directory']
+ break
+
+ if directory is None:
+ raise IOError('Task directory/file not found')
+
+ path = os.path.join(directory, file)
+
+ # Start streaming "pages" up to length.
+ PAGE_LENGTH = 1024
+ offset = 0
+
+ while True:
+ result = json.loads(self.curl(
+ '/files/read.json',
+ {'path': path,
+ 'offset': offset,
+ 'length': PAGE_LENGTH}))
+ if len(result['data']) == 0:
+ time.sleep(0.5)
+ continue
+ offset += len(result['data'])
+ yield result['data']
+
+
+def main():
+ # Parse options for this script.
+ parser = OptionParser()
+ parser.add_option('--master')
+ parser.add_option('--framework')
+ parser.add_option('--task')
+ parser.add_option('--file')
+ (options, args) = parser.parse_args(sys.argv)
+
+ if options.master is None:
+ print "Missing --master\n"
+ parser.print_help()
+ exit(-1)
+
+ if options.framework is None:
+ print "Missing --framework\n"
+ parser.print_help()
+ exit(-1)
+
+ if options.task is None:
+ print "Missing --task\n"
+ parser.print_help()
+ exit(-1)
+
+ if options.file is None:
+ print "Missing --file\n"
+ parser.print_help()
+ exit(-1)
+
+ url = 'http://' + resolve(options.master) + '/master/state.json'
+ file = urllib.urlopen(url)
+ state = json.loads(file.read())
+ file.close()
+
+ # Build a dict from slave ID to `slaves'.
+ slaves = {}
+ for slave in state['slaves']:
+ slaves[slave['id']] = Slave(slave)
+
+ target_task = None
+ target_slave = None
+
+ for framework in state['frameworks']:
+ if framework['id'] == options.framework:
+ for task in framework['tasks']:
+ if (task['id'] == options.task):
+ target_task = task
+ target_slave = slaves[task['slave_id']]
+ break
+ for completed_task in framework['completed_tasks']:
+ if (completed_task['id'] == options.task):
+ target_task = completed_task
+ target_slave = slaves[completed_task['slave_id']]
+ break
+
+ for completed_framework in state['completed_frameworks']:
+ if completed_framework['id'] == options.framework:
+ for completed_task in completed_framework['completed_tasks']:
+ if (completed_task['id'] == options.task):
+ target_task = completed_task
+ target_slave= slaves[completed_task['slave_id']]
+ break
+
+ for data in target_slave.tail(target_task, options.file):
+ sys.stdout.write(data)
+ sys.stdout.flush()
+
+ sys.stderr.write('No task found!\n')
+ sys.stderr.flush()
+ exit(-1)
+
+
+if __name__ == "__main__":
+ def signal_handler(signal, frame):
+ sys.stdout.write('\n')
+ sys.exit(130)
+
+ signal.signal(signal.SIGINT, signal_handler)
+
+ main()