You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2013/11/08 02:21:36 UTC
[3/4] git commit: Refactored mesos-cat to use helpers from cli.py.
Refactored mesos-cat to use helpers from cli.py.
Review: https://reviews.apache.org/r/15301
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a7dcb99b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a7dcb99b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a7dcb99b
Branch: refs/heads/master
Commit: a7dcb99bbd286a21a15ef95479db0671e117878d
Parents: 3901765
Author: Benjamin Hindman <be...@gmail.com>
Authored: Wed Nov 6 20:12:38 2013 -1000
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Thu Nov 7 15:18:10 2013 -1000
----------------------------------------------------------------------
src/cli/mesos-cat | 286 +++++++++++++++++++++----------------------------
1 file changed, 123 insertions(+), 163 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/a7dcb99b/src/cli/mesos-cat
----------------------------------------------------------------------
diff --git a/src/cli/mesos-cat b/src/cli/mesos-cat
index 6ea5d8a..bb1e197 100755
--- a/src/cli/mesos-cat
+++ b/src/cli/mesos-cat
@@ -1,111 +1,79 @@
#!/usr/bin/env python
-import datetime
import json
import os
-import resource
-import subprocess
import signal
import sys
-import time
-import urllib
+import urllib2
+from contextlib import closing
from optparse import OptionParser
+from urllib2 import HTTPError, urlopen
+
+from mesos import http
+from mesos.cli import *
+from mesos.futures import *
if sys.version_info < (2,6,0):
- sys.stderr.write('Expecting Python >= 2.6\n')
- sys.exit(1)
-
-
-# Helper that uses 'mesos-resolve' to resolve the master's IP:port.
-def resolve(master):
- process = subprocess.Popen(
- ['mesos-resolve', master],
- stdin=subprocess.PIPE,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- shell=False)
-
- status = process.wait()
- if status != 0:
- print "Failed to resolve 'mesos-resolve %s'\n" % master
- print process.stderr.read()
+ sys.stderr.write('Expecting Python >= 2.6\n')
sys.exit(1)
- return process.stdout.read()
-
-
-class Slave:
- def __init__(self, slave):
- self.slave = slave
-
- def hostname(self):
- return self.slave['hostname']
-
- def curl(self, path, query):
- pid = self.slave['pid']
- url = 'http://' + pid[len('slave(1)@'):] + path
- if query is not None and len(query) > 0:
- url += '?' + '&'.join(
- ['%s=%s' % (urllib.quote(str(key)), urllib.quote(str(value))) for (key, value) in query.items()])
-
- process = subprocess.Popen(
- ['curl', '-sSfL', url],
- stdin=None,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- shell=False)
-
- status = process.wait()
- if status != 0:
- print 'Failed to execute \'curl\':\n'
- print process.stderr.read()
- sys.exit(1)
-
- result = process.stdout.read()
- process.stdout.close()
- process.stderr.close()
- return result
- def read(self, task, file):
+def read(slave, task, file):
framework_id = task['framework_id']
executor_id = task['executor_id']
- if executor_id == "": executor_id = task['id']
- # Get 'state.json' to get the executor directory.
- state = json.loads(self.curl('/slave(1)/state.json', None))
+ # An executorless task has an empty executor ID in the master but
+ # uses the same executor ID as task ID in the slave.
+ if executor_id == '': executor_id = task['id']
+
+ # Get 'state.json' to determine the executor directory.
+ try:
+ state = http.get(slave['pid'], '/slave(1)/state.json')
+ except:
+ sys.stderr.write('Failed to get state from slave\n')
+ sys.exit(1)
directory = None
for framework in state['frameworks']:
- if framework['id'] == framework_id:
- for executor in framework['executors']:
- if executor['id'] == executor_id:
- directory = executor['directory']
- break
- for completed_executor in framework['completed_executors']:
- if completed_executor['id'] == executor_id:
- directory = completed_executor['directory']
- break
-
- for completed_framework in state['completed_frameworks']:
- if completed_framework['id'] == framework_id:
- for completed_executor in completed_framework['completed_executors']:
- if completed_executor['id'] == executor_id:
- directory = completed_executor['directory']
- break
+ if framework['id'] == framework_id:
+ for executor in framework['executors']:
+ if executor['id'] == executor_id:
+ directory = executor['directory']
+ break
+ for executor in framework['completed_executors']:
+ if executor['id'] == executor_id:
+ directory = executor['directory']
+ break
+
+ for framework in state['completed_frameworks']:
+ if framework['id'] == framework_id:
+ for executor in framework['completed_executors']:
+ if executor['id'] == executor_id:
+ directory = executor['directory']
+ break
if directory is None:
- raise IOError('Task directory/file not found')
+ sys.stderr.write('File not found\n')
+ sys.exit(1)
path = os.path.join(directory, file)
# Determine the current length of the file.
- result = json.loads(self.curl(
- '/files/read.json',
- {'path': path,
- 'offset': -1}))
+ try:
+ result = http.get(
+ slave['pid'],
+ '/files/read.json',
+ {'path': path,
+ 'offset': -1})
+ except HTTPError as error:
+ if error.code == 404:
+ sys.stderr.write('No such file or directory\n')
+ else:
+ sys.stderr.write('Failed to determine length of file\n')
+ sys.exit(1)
length = result['offset']
@@ -114,91 +82,83 @@ class Slave:
offset = 0
while True:
- result = json.loads(self.curl(
- '/files/read.json',
- {'path': path,
- 'offset': offset,
- 'length': PAGE_LENGTH}))
- offset += len(result['data'])
- yield result['data']
- if offset == length:
- return
+ try:
+ result = http.get(
+ slave['pid'],
+ '/files/read.json',
+ {'path': path,
+ 'offset': offset,
+ 'length': PAGE_LENGTH})
+ offset += len(result['data'])
+ yield result['data']
+ if offset == length:
+ return
+ except:
+ sys.stderr.write('Failed to read file from slave\n')
+ sys.exit(1)
def main():
- # Parse options for this script.
- parser = OptionParser()
- parser.add_option('--master')
- parser.add_option('--framework')
- parser.add_option('--task')
- parser.add_option('--file')
- (options, args) = parser.parse_args(sys.argv)
-
- if options.master is None:
- print "Missing --master\n"
- parser.print_help()
- exit(-1)
-
- if options.framework is None:
- print "Missing --framework\n"
- parser.print_help()
- exit(-1)
-
- if options.task is None:
- print "Missing --task\n"
- parser.print_help()
- exit(-1)
-
- if options.file is None:
- print "Missing --file\n"
- parser.print_help()
- exit(-1)
-
- url = 'http://' + resolve(options.master) + '/master/state.json'
- file = urllib.urlopen(url)
- state = json.loads(file.read())
- file.close()
-
- # Build a dict from slave ID to `slaves'.
- slaves = {}
- for slave in state['slaves']:
- slaves[slave['id']] = Slave(slave)
-
- target_task = None
- target_slave = None
-
- for framework in state['frameworks']:
- if framework['id'] == options.framework:
- for task in framework['tasks']:
- if (task['id'] == options.task):
- target_task = task
- target_slave = slaves[task['slave_id']]
- break
- for completed_task in framework['completed_tasks']:
- if (completed_task['id'] == options.task):
- target_task = completed_task
- target_slave = slaves[completed_task['slave_id']]
- break
-
- for completed_framework in state['completed_frameworks']:
- if completed_framework['id'] == options.framework:
- for completed_task in completed_framework['completed_tasks']:
- if (completed_task['id'] == options.task):
- target_task = completed_task
- target_slave= slaves[completed_task['slave_id']]
- break
-
- for data in target_slave.read(target_task, options.file):
- sys.stdout.write(data)
- sys.stdout.flush()
- exit(0)
-
- sys.stderr.write('No task found!\n')
- sys.stderr.flush()
- exit(-1)
-
-
-if __name__ == "__main__":
+ # Parse options for this script.
+ parser = OptionParser()
+ parser.add_option('--master')
+ parser.add_option('--framework')
+ parser.add_option('--task')
+ parser.add_option('--file')
+ (options, args) = parser.parse_args(sys.argv)
+
+ if options.master is None:
+ usage('Missing --master', parser)
+
+ if options.framework is None:
+ usage('Missing --framework', parser)
+
+ if options.task is None:
+ usage('Missing --task', parser)
+
+ if options.file is None:
+ usage('Missing --file', parser)
+
+ # Get the master's state.
+ try:
+ state = http.get(resolve(options.master), '/master/state.json')
+ except:
+ sys.stderr.write('Failed to get the master state\n')
+ sys.exit(1)
+
+ # Build a dict from slave ID to slaves.
+ slaves = {}
+ for slave in state['slaves']:
+ slaves[slave['id']] = slave
+
+ def cat(slave, task):
+ for data in read(slave, task, options.file):
+ sys.stdout.write(data)
+
+ for framework in state['frameworks']:
+ if framework['id'] == options.framework:
+ for task in framework['tasks']:
+ if task['id'] == options.task:
+ cat(slaves[task['slave_id']], task)
+ sys.exit(0)
+
+ for task in framework['completed_tasks']:
+ if task['id'] == options.task:
+ cat(slaves[task['slave_id']], task)
+ sys.exit(0)
+
+ for framework in state['completed_frameworks']:
+ if framework['id'] == options.framework:
+ for task in framework['completed_tasks']:
+ if task['id'] == options.task:
+ cat(slaves[task['slave_id']], task)
+ sys.exit(0)
+
+ sys.stderr.write('No task found!\n')
+ sys.exit(-1)
+
+
+if __name__ == '__main__':
def signal_handler(signal, frame):
sys.stdout.write('\n')
sys.exit(130)