You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2013/11/06 22:08:07 UTC

git commit: Added mesos-cat and mesos-tail.

Updated Branches:
  refs/heads/master bb8a627c7 -> 847f6cf43


Added mesos-cat and mesos-tail.

These are actually slightly improved version of the original shared by
Ben.H at mesos hackathon on Oct.25th 2013.

Modifications I did:
- add python version check
- add quote for query contents
- support to inspect files on completed frameworks/tasks

From: Shingo Omura <ev...@gmail.com>
Review: https://reviews.apache.org/r/14951


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/847f6cf4
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/847f6cf4
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/847f6cf4

Branch: refs/heads/master
Commit: 847f6cf43a8517908498d141aba34dc1b013fd0d
Parents: bb8a627
Author: Benjamin Hindman <be...@gmail.com>
Authored: Wed Nov 6 11:01:35 2013 -1000
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Wed Nov 6 11:04:00 2013 -1000

----------------------------------------------------------------------
 src/cli/mesos-cat  | 208 ++++++++++++++++++++++++++++++++++++++++++++++++
 src/cli/mesos-tail | 200 ++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 408 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/847f6cf4/src/cli/mesos-cat
----------------------------------------------------------------------
diff --git a/src/cli/mesos-cat b/src/cli/mesos-cat
new file mode 100755
index 0000000..6ea5d8a
--- /dev/null
+++ b/src/cli/mesos-cat
@@ -0,0 +1,208 @@
+#!/usr/bin/env python
+
+import datetime
+import json
+import os
+import resource
+import subprocess
+import signal
+import sys
+import time
+import urllib
+
+from optparse import OptionParser
+
+
+if sys.version_info < (2,6,0):
+  sys.stderr.write('Expecting Python >= 2.6\n')
+  sys.exit(1)
+
+
+# Helper that uses 'mesos-resolve' to resolve the master's IP:port.
+def resolve(master):
+  process = subprocess.Popen(
+    ['mesos-resolve', master],
+    stdin=subprocess.PIPE,
+    stdout=subprocess.PIPE,
+    stderr=subprocess.PIPE,
+    shell=False)
+
+  status = process.wait()
+  if status != 0:
+    print "Failed to resolve 'mesos-resolve %s'\n" % master
+    print process.stderr.read()
+    sys.exit(1)
+
+  return process.stdout.read()
+
+
+class Slave:
+  def __init__(self, slave):
+    self.slave = slave
+
+  def hostname(self):
+    return self.slave['hostname']
+
+  def curl(self, path, query):
+    pid = self.slave['pid']
+    url = 'http://' + pid[len('slave(1)@'):] + path
+    if query is not None and len(query) > 0:
+      url += '?' + '&'.join(
+        ['%s=%s' % (urllib.quote(str(key)), urllib.quote(str(value))) for (key, value) in query.items()])
+
+    process = subprocess.Popen(
+      ['curl', '-sSfL', url],
+      stdin=None,
+      stdout=subprocess.PIPE,
+      stderr=subprocess.PIPE,
+      shell=False)
+
+    status = process.wait()
+    if status != 0:
+      print 'Failed to execute \'curl\':\n'
+      print process.stderr.read()
+      sys.exit(1)
+
+    result = process.stdout.read()
+    process.stdout.close()
+    process.stderr.close()
+    return result
+
+  def read(self, task, file):
+    framework_id = task['framework_id']
+    executor_id = task['executor_id']
+    if executor_id == "": executor_id = task['id']
+
+    # Get 'state.json' to get the executor directory.
+    state = json.loads(self.curl('/slave(1)/state.json', None))
+
+    directory = None
+
+    for framework in state['frameworks']:
+      if framework['id'] == framework_id:
+        for executor in framework['executors']:
+          if executor['id'] == executor_id:
+            directory = executor['directory']
+            break
+        for completed_executor in framework['completed_executors']:
+          if completed_executor['id'] == executor_id:
+            directory = completed_executor['directory']
+            break
+
+    for completed_framework in state['completed_frameworks']:
+      if completed_framework['id'] == framework_id:
+        for completed_executor in completed_framework['completed_executors']:
+          if completed_executor['id'] == executor_id:
+            directory = completed_executor['directory']
+            break
+
+    if directory is None:
+      raise IOError('Task directory/file not found')
+
+    path = os.path.join(directory, file)
+
+    # Determine the current length of the file.
+    result = json.loads(self.curl(
+        '/files/read.json',
+        {'path': path,
+         'offset': -1}))
+
+    length = result['offset']
+
+    # Start streaming "pages" up to length.
+    PAGE_LENGTH = 1024
+    offset = 0
+
+    while True:
+      result = json.loads(self.curl(
+          '/files/read.json',
+          {'path': path,
+           'offset': offset,
+           'length': PAGE_LENGTH}))
+      offset += len(result['data'])
+      yield result['data']
+      if offset == length:
+        return
+
+
+def main():
+  # Parse options for this script.
+  parser = OptionParser()
+  parser.add_option('--master')
+  parser.add_option('--framework')
+  parser.add_option('--task')
+  parser.add_option('--file')
+  (options, args) = parser.parse_args(sys.argv)
+
+  if options.master is None:
+    print "Missing --master\n"
+    parser.print_help()
+    exit(-1)
+
+  if options.framework is None:
+    print "Missing --framework\n"
+    parser.print_help()
+    exit(-1)
+
+  if options.task is None:
+    print "Missing --task\n"
+    parser.print_help()
+    exit(-1)
+
+  if options.file is None:
+    print "Missing --file\n"
+    parser.print_help()
+    exit(-1)
+
+  url = 'http://' + resolve(options.master) + '/master/state.json'
+  file = urllib.urlopen(url)
+  state = json.loads(file.read())
+  file.close()
+
+  # Build a dict from slave ID to `slaves'.
+  slaves = {}
+  for slave in state['slaves']:
+    slaves[slave['id']] = Slave(slave)
+
+  target_task = None
+  target_slave = None
+
+  for framework in state['frameworks']:
+    if framework['id'] == options.framework:
+      for task in framework['tasks']:
+        if (task['id'] == options.task):
+          target_task = task
+          target_slave = slaves[task['slave_id']]
+          break
+      for completed_task in framework['completed_tasks']:
+        if (completed_task['id'] == options.task):
+          target_task = completed_task
+          target_slave = slaves[completed_task['slave_id']]
+          break
+
+  for completed_framework in state['completed_frameworks']:
+    if completed_framework['id'] == options.framework:
+      for completed_task in completed_framework['completed_tasks']:
+        if (completed_task['id'] == options.task):
+          target_task = completed_task
+          target_slave= slaves[completed_task['slave_id']]
+          break
+
+  for data in target_slave.read(target_task, options.file):
+    sys.stdout.write(data)
+    sys.stdout.flush()
+    exit(0)
+
+  sys.stderr.write('No task found!\n')
+  sys.stderr.flush()
+  exit(-1)
+
+
+if __name__ == "__main__":
+  def signal_handler(signal, frame):
+    sys.stdout.write('\n')
+    sys.exit(130)
+
+  signal.signal(signal.SIGINT, signal_handler)
+
+  main()

http://git-wip-us.apache.org/repos/asf/mesos/blob/847f6cf4/src/cli/mesos-tail
----------------------------------------------------------------------
diff --git a/src/cli/mesos-tail b/src/cli/mesos-tail
new file mode 100755
index 0000000..f4f7c75
--- /dev/null
+++ b/src/cli/mesos-tail
@@ -0,0 +1,200 @@
+#!/usr/bin/env python
+
+import datetime
+import json
+import os
+import resource
+import signal
+import subprocess
+import sys
+import time
+import urllib
+
+from optparse import OptionParser
+
+
+if sys.version_info < (2,6,0):
+  sys.stderr.write('Expecting Python >= 2.6\n')
+  sys.exit(1)
+
+
+# Helper that uses 'mesos-resolve' to resolve the master's IP:port.
+def resolve(master):
+  process = subprocess.Popen(
+    ['mesos-resolve', master],
+    stdin=subprocess.PIPE,
+    stdout=subprocess.PIPE,
+    stderr=subprocess.PIPE,
+    shell=False)
+
+  status = process.wait()
+  if status != 0:
+    print "Failed to resolve 'mesos-resolve %s'\n" % master
+    print process.stderr.read()
+    sys.exit(1)
+
+  return process.stdout.read()
+
+
+class Slave:
+  def __init__(self, slave):
+    self.slave = slave
+
+  def hostname(self):
+    return self.slave['hostname']
+
+  def curl(self, path, query):
+    pid = self.slave['pid']
+    url = 'http://' + pid[len('slave(1)@'):] + path
+    if query is not None and len(query) > 0:
+      url += '?' + '&'.join(
+        ['%s=%s' % (urllib.quote(str(key)), urllib.quote(str(value))) for (key, value) in query.items()])
+
+    process = subprocess.Popen(
+      ['curl', '-sSfL', url],
+      stdin=None,
+      stdout=subprocess.PIPE,
+      stderr=subprocess.PIPE,
+      shell=False)
+
+    status = process.wait()
+    if status != 0:
+      print 'Failed to execute \'curl\':\n'
+      print process.stderr.read()
+      sys.exit(1)
+
+    result = process.stdout.read()
+    process.stdout.close()
+    process.stderr.close()
+    return result
+
+  def tail(self, task, file):
+    framework_id = task['framework_id']
+    executor_id = task['executor_id']
+    if executor_id == "": executor_id = task['id']
+
+    # Get 'state.json' to get the executor directory.
+    state = json.loads(self.curl('/slave(1)/state.json', None))
+
+    directory = None
+
+    for framework in state['frameworks']:
+       if framework['id'] == framework_id:
+         for executor in framework['executors']:
+           if executor['id'] == executor_id:
+             directory = executor['directory']
+             break
+         for completed_executor in framework['completed_executors']:
+           if completed_executor['id'] == executor_id:
+             directory = completed_executor['directory']
+             break
+
+    for completed_framework in state['completed_frameworks']:
+      if completed_framework['id'] == framework_id:
+        for completed_executor in completed_framework['completed_executors']:
+          if completed_executor['id'] == executor_id:
+            directory = completed_executor['directory']
+            break
+
+    if directory is None:
+      raise IOError('Task directory/file not found')
+
+    path = os.path.join(directory, file)
+
+    # Start streaming "pages" up to length.
+    PAGE_LENGTH = 1024
+    offset = 0
+
+    while True:
+      result = json.loads(self.curl(
+          '/files/read.json',
+          {'path': path,
+           'offset': offset,
+           'length': PAGE_LENGTH}))
+      if len(result['data']) == 0:
+        time.sleep(0.5)
+        continue
+      offset += len(result['data'])
+      yield result['data']
+
+
+def main():
+  # Parse options for this script.
+  parser = OptionParser()
+  parser.add_option('--master')
+  parser.add_option('--framework')
+  parser.add_option('--task')
+  parser.add_option('--file')
+  (options, args) = parser.parse_args(sys.argv)
+
+  if options.master is None:
+    print "Missing --master\n"
+    parser.print_help()
+    exit(-1)
+
+  if options.framework is None:
+    print "Missing --framework\n"
+    parser.print_help()
+    exit(-1)
+
+  if options.task is None:
+    print "Missing --task\n"
+    parser.print_help()
+    exit(-1)
+
+  if options.file is None:
+    print "Missing --file\n"
+    parser.print_help()
+    exit(-1)
+
+  url = 'http://' + resolve(options.master) + '/master/state.json'
+  file = urllib.urlopen(url)
+  state = json.loads(file.read())
+  file.close()
+
+  # Build a dict from slave ID to `slaves'.
+  slaves = {}
+  for slave in state['slaves']:
+    slaves[slave['id']] = Slave(slave)
+
+  target_task = None
+  target_slave = None
+
+  for framework in state['frameworks']:
+    if framework['id'] == options.framework:
+      for task in framework['tasks']:
+        if (task['id'] == options.task):
+          target_task = task
+          target_slave = slaves[task['slave_id']]
+          break
+      for completed_task in framework['completed_tasks']:
+        if (completed_task['id'] == options.task):
+          target_task = completed_task
+          target_slave = slaves[completed_task['slave_id']]
+          break
+
+  for completed_framework in state['completed_frameworks']:
+    if completed_framework['id'] == options.framework:
+      for completed_task in completed_framework['completed_tasks']:
+        if (completed_task['id'] == options.task):
+          target_task = completed_task
+          target_slave= slaves[completed_task['slave_id']]
+          break
+
+  for data in target_slave.tail(target_task, options.file):
+    sys.stdout.write(data)
+    sys.stdout.flush()
+
+  sys.stderr.write('No task found!\n')
+  sys.stderr.flush()
+  exit(-1)
+
+
+if __name__ == "__main__":
+  def signal_handler(signal, frame):
+    sys.stdout.write('\n')
+    sys.exit(130)
+
+  signal.signal(signal.SIGINT, signal_handler)
+
+  main()