You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2013/11/08 02:21:36 UTC

[3/4] git commit: Refactored mesos-cat to use helpers from cli.py.

Refactored mesos-cat to use helpers from cli.py.

Review: https://reviews.apache.org/r/15301


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a7dcb99b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a7dcb99b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a7dcb99b

Branch: refs/heads/master
Commit: a7dcb99bbd286a21a15ef95479db0671e117878d
Parents: 3901765
Author: Benjamin Hindman <be...@gmail.com>
Authored: Wed Nov 6 20:12:38 2013 -1000
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Thu Nov 7 15:18:10 2013 -1000

----------------------------------------------------------------------
 src/cli/mesos-cat | 286 +++++++++++++++++++++----------------------------
 1 file changed, 123 insertions(+), 163 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/a7dcb99b/src/cli/mesos-cat
----------------------------------------------------------------------
diff --git a/src/cli/mesos-cat b/src/cli/mesos-cat
index 6ea5d8a..bb1e197 100755
--- a/src/cli/mesos-cat
+++ b/src/cli/mesos-cat
@@ -1,111 +1,79 @@
 #!/usr/bin/env python
 
-import datetime
 import json
 import os
-import resource
-import subprocess
 import signal
 import sys
-import time
-import urllib
+import urllib2
 
+from contextlib import closing
 from optparse import OptionParser
+from urllib2 import HTTPError, urlopen
+
+from mesos import http
+from mesos.cli import *
+from mesos.futures import *
 
 
 if sys.version_info < (2,6,0):
-  sys.stderr.write('Expecting Python >= 2.6\n')
-  sys.exit(1)
-
-
-# Helper that uses 'mesos-resolve' to resolve the master's IP:port.
-def resolve(master):
-  process = subprocess.Popen(
-    ['mesos-resolve', master],
-    stdin=subprocess.PIPE,
-    stdout=subprocess.PIPE,
-    stderr=subprocess.PIPE,
-    shell=False)
-
-  status = process.wait()
-  if status != 0:
-    print "Failed to resolve 'mesos-resolve %s'\n" % master
-    print process.stderr.read()
+    sys.stderr.write('Expecting Python >= 2.6\n')
     sys.exit(1)
 
-  return process.stdout.read()
-
-
-class Slave:
-  def __init__(self, slave):
-    self.slave = slave
-
-  def hostname(self):
-    return self.slave['hostname']
-
-  def curl(self, path, query):
-    pid = self.slave['pid']
-    url = 'http://' + pid[len('slave(1)@'):] + path
-    if query is not None and len(query) > 0:
-      url += '?' + '&'.join(
-        ['%s=%s' % (urllib.quote(str(key)), urllib.quote(str(value))) for (key, value) in query.items()])
-
-    process = subprocess.Popen(
-      ['curl', '-sSfL', url],
-      stdin=None,
-      stdout=subprocess.PIPE,
-      stderr=subprocess.PIPE,
-      shell=False)
-
-    status = process.wait()
-    if status != 0:
-      print 'Failed to execute \'curl\':\n'
-      print process.stderr.read()
-      sys.exit(1)
-
-    result = process.stdout.read()
-    process.stdout.close()
-    process.stderr.close()
-    return result
 
-  def read(self, task, file):
+def read(slave, task, file):
     framework_id = task['framework_id']
     executor_id = task['executor_id']
-    if executor_id == "": executor_id = task['id']
 
-    # Get 'state.json' to get the executor directory.
-    state = json.loads(self.curl('/slave(1)/state.json', None))
+    # An executorless task has an empty executor ID in the master but
+    # uses the same executor ID as task ID in the slave.
+    if executor_id == '': executor_id = task['id']
+
+    # Get 'state.json' to determine the executor directory.
+    try:
+        state = http.get(slave['pid'], '/slave(1)/state.json')
+    except:
+        sys.stderr.write('Failed to get state from slave\n')
+        sys.exit(1)
 
     directory = None
 
     for framework in state['frameworks']:
-      if framework['id'] == framework_id:
-        for executor in framework['executors']:
-          if executor['id'] == executor_id:
-            directory = executor['directory']
-            break
-        for completed_executor in framework['completed_executors']:
-          if completed_executor['id'] == executor_id:
-            directory = completed_executor['directory']
-            break
-
-    for completed_framework in state['completed_frameworks']:
-      if completed_framework['id'] == framework_id:
-        for completed_executor in completed_framework['completed_executors']:
-          if completed_executor['id'] == executor_id:
-            directory = completed_executor['directory']
-            break
+        if framework['id'] == framework_id:
+            for executor in framework['executors']:
+                if executor['id'] == executor_id:
+                    directory = executor['directory']
+                    break
+            for executor in framework['completed_executors']:
+                if executor['id'] == executor_id:
+                    directory = executor['directory']
+                    break
+
+    for framework in state['completed_frameworks']:
+        if framework['id'] == framework_id:
+            for executor in framework['completed_executors']:
+                if executor['id'] == executor_id:
+                    directory = executor['directory']
+                    break
 
     if directory is None:
-      raise IOError('Task directory/file not found')
+        sys.stderr.write('File not found\n')
+        sys.exit(1)
 
     path = os.path.join(directory, file)
 
     # Determine the current length of the file.
-    result = json.loads(self.curl(
-        '/files/read.json',
-        {'path': path,
-         'offset': -1}))
+    try:
+        result = http.get(
+            slave['pid'],
+            '/files/read.json',
+            {'path': path,
+             'offset': -1})
+    except HTTPError as error:
+        if error.code == 404:
+            sys.stderr.write('No such file or directory\n')
+        else:
+            sys.stderr.write('Failed to determine length of file\n')
+        sys.exit(1)
 
     length = result['offset']
 
@@ -114,91 +82,83 @@ class Slave:
     offset = 0
 
     while True:
-      result = json.loads(self.curl(
-          '/files/read.json',
-          {'path': path,
-           'offset': offset,
-           'length': PAGE_LENGTH}))
-      offset += len(result['data'])
-      yield result['data']
-      if offset == length:
-        return
+        try:
+            result = http.get(
+                slave['pid'],
+                '/files/read.json',
+                {'path': path,
+                 'offset': offset,
+                 'length': PAGE_LENGTH})
+            offset += len(result['data'])
+            yield result['data']
+            if offset == length:
+                return
+        except:
+            sys.stderr.write('Failed to read file from slave\n')
+            sys.exit(1)
 
 
 def main():
-  # Parse options for this script.
-  parser = OptionParser()
-  parser.add_option('--master')
-  parser.add_option('--framework')
-  parser.add_option('--task')
-  parser.add_option('--file')
-  (options, args) = parser.parse_args(sys.argv)
-
-  if options.master is None:
-    print "Missing --master\n"
-    parser.print_help()
-    exit(-1)
-
-  if options.framework is None:
-    print "Missing --framework\n"
-    parser.print_help()
-    exit(-1)
-
-  if options.task is None:
-    print "Missing --task\n"
-    parser.print_help()
-    exit(-1)
-
-  if options.file is None:
-    print "Missing --file\n"
-    parser.print_help()
-    exit(-1)
-
-  url = 'http://' + resolve(options.master) + '/master/state.json'
-  file = urllib.urlopen(url)
-  state = json.loads(file.read())
-  file.close()
-
-  # Build a dict from slave ID to `slaves'.
-  slaves = {}
-  for slave in state['slaves']:
-    slaves[slave['id']] = Slave(slave)
-
-  target_task = None
-  target_slave = None
-
-  for framework in state['frameworks']:
-    if framework['id'] == options.framework:
-      for task in framework['tasks']:
-        if (task['id'] == options.task):
-          target_task = task
-          target_slave = slaves[task['slave_id']]
-          break
-      for completed_task in framework['completed_tasks']:
-        if (completed_task['id'] == options.task):
-          target_task = completed_task
-          target_slave = slaves[completed_task['slave_id']]
-          break
-
-  for completed_framework in state['completed_frameworks']:
-    if completed_framework['id'] == options.framework:
-      for completed_task in completed_framework['completed_tasks']:
-        if (completed_task['id'] == options.task):
-          target_task = completed_task
-          target_slave= slaves[completed_task['slave_id']]
-          break
-
-  for data in target_slave.read(target_task, options.file):
-    sys.stdout.write(data)
-    sys.stdout.flush()
-    exit(0)
-
-  sys.stderr.write('No task found!\n')
-  sys.stderr.flush()
-  exit(-1)
-
-
-if __name__ == "__main__":
+    # Parse options for this script.
+    parser = OptionParser()
+    parser.add_option('--master')
+    parser.add_option('--framework')
+    parser.add_option('--task')
+    parser.add_option('--file')
+    (options, args) = parser.parse_args(sys.argv)
+
+    if options.master is None:
+        usage('Missing --master', parser)
+
+    if options.framework is None:
+        usage('Missing --framework', parser)
+
+    if options.task is None:
+        usage('Missing --task', parser)
+
+    if options.file is None:
+        usage('Missing --file', parser)
+
+    # Get the master's state.
+    try:
+        state = http.get(resolve(options.master), '/master/state.json')
+    except:
+        sys.stderr.write('Failed to get the master state\n')
+        sys.exit(1)
+
+    # Build a dict from slave ID to slaves.
+    slaves = {}
+    for slave in state['slaves']:
+        slaves[slave['id']] = slave
+
+    def cat(slave, task):
+        for data in read(slave, task, options.file):
+            sys.stdout.write(data)
+
+    for framework in state['frameworks']:
+        if framework['id'] == options.framework:
+            for task in framework['tasks']:
+                if task['id'] == options.task:
+                    cat(slaves[task['slave_id']], task)
+                    sys.exit(0)
+
+            for task in framework['completed_tasks']:
+                if task['id'] == options.task:
+                    cat(slaves[task['slave_id']], task)
+                    sys.exit(0)
+
+    for framework in state['completed_frameworks']:
+        if framework['id'] == options.framework:
+            for task in framework['completed_tasks']:
+                if task['id'] == options.task:
+                    cat(slaves[task['slave_id']], task)
+                    sys.exit(0)
+
+    sys.stderr.write('No task found!\n')
+    sys.exit(-1)
+
+
+if __name__ == '__main__':
   def signal_handler(signal, frame):
     sys.stdout.write('\n')
     sys.exit(130)