You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2013/11/06 21:44:34 UTC

git commit: Refactored implementation and organization of Python CLI tools.

Updated Branches:
  refs/heads/master ffe53a16e -> bb8a627c7


Refactored implementation and organization of Python CLI tools.

The initial Python CLI 'mesos-ps' tool used curl in order to make
concurrent HTTP requests. This was a giant hack (one which even
required getting the maximum open file descriptor limit to not
fork/exec too many curl subprocesses). Instead of using curl we now
use futures and threads. The choice to use futures was (a) because it
naturally abstracts what we're trying to do and (b) it's "pro style"
for Python >= 3.2 (which ships concurrent.futures). For now, we've
implemented the futures interface
minimally to accomplish what we need (see
src/cli/python/mesos/futures.py). In addition, we updated the way
mesos-ps was implemented to leverage the futures API.

To enable sharing of CLI utilities written in Python we've also added
a mechanism for installing Python modules. This will likely change in
the future to use PIP but for now we manually update PYTHONPATH (see
bin/mesos.sh.in and src/cli/mesos.cpp).

Finally, all of the CLI specific Python code has been updated to use
the standard four space indent rather than two.

Review: https://reviews.apache.org/r/15260


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/bb8a627c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/bb8a627c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/bb8a627c

Branch: refs/heads/master
Commit: bb8a627c767d09c53811e507aadf18e24429b21e
Parents: ffe53a1
Author: Benjamin Hindman <be...@gmail.com>
Authored: Tue Nov 5 15:58:53 2013 -1000
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Wed Nov 6 10:41:50 2013 -1000

----------------------------------------------------------------------
 bin/mesos.sh.in                  |   7 +
 src/Makefile.am                  |   9 +
 src/cli/mesos-ps                 | 361 ++++++++++++++++------------------
 src/cli/mesos.cpp                |   6 +
 src/cli/python/mesos/__init__.py |   0
 src/cli/python/mesos/cli.py      |  24 +++
 src/cli/python/mesos/futures.py  | 169 ++++++++++++++++
 7 files changed, 383 insertions(+), 193 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/bb8a627c/bin/mesos.sh.in
----------------------------------------------------------------------
diff --git a/bin/mesos.sh.in b/bin/mesos.sh.in
index 3d03ee4..5cbeac4 100644
--- a/bin/mesos.sh.in
+++ b/bin/mesos.sh.in
@@ -27,4 +27,13 @@ PATH=@abs_top_builddir@/src:${PATH}
 
 export PATH
 
+# Add 'src/cli/python' to PYTHONPATH. The existing value (and its ':'
+# separator) is only appended when non-empty, since an empty entry would
+# put the current working directory on Python's module search path.
+# TODO(benh): Remove this if/when we install the 'mesos' module via
+# PIP and setuptools.
+PYTHONPATH=@abs_top_srcdir@/src/cli/python${PYTHONPATH:+:${PYTHONPATH}}
+
+export PYTHONPATH
+
 exec @abs_top_builddir@/src/mesos "${@}"

http://git-wip-us.apache.org/repos/asf/mesos/blob/bb8a627c/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 9780d07..a73d6e1 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -379,6 +379,19 @@ mesos_resolve_LDADD = libmesos.la
 dist_bin_SCRIPTS +=							\
   cli/mesos-ps
 
+# Also install the supporting Python modules (the 'mesos' package
+# imported by the Python based CLI tools). They go under
+# $(pkglibexecdir)/python, which the 'mesos' CLI adds to PYTHONPATH.
+# NOTE(review): these are modules rather than executables, yet the
+# _SCRIPTS primary installs them with execute permission.
+# TODO(benh): Use PIP and python/setup.py to do this correctly.
+mesospythonpkglibexecdir = $(pkglibexecdir)/python/mesos
+
+dist_mesospythonpkglibexec_SCRIPTS =					\
+  cli/python/mesos/__init__.py						\
+  cli/python/mesos/cli.py						\
+  cli/python/mesos/futures.py
+
 # Need to distribute/install webui javascript. We use 'pkgdatadir'
 # instead of 'datadir' as the install directory so we get the the
 # package name (i.e., 'mesos') as part of the path (i.e.,

http://git-wip-us.apache.org/repos/asf/mesos/blob/bb8a627c/src/cli/mesos-ps
----------------------------------------------------------------------
diff --git a/src/cli/mesos-ps b/src/cli/mesos-ps
index b7dca14..0d1694e 100755
--- a/src/cli/mesos-ps
+++ b/src/cli/mesos-ps
@@ -2,17 +2,21 @@
 
 import datetime
 import json
-import resource
-import subprocess
+import signal
 import sys
-import time
-import urllib
+import urllib2
 
+from contextlib import closing
 from optparse import OptionParser
 
+from mesos.cli import resolve
+from mesos.futures import ThreadingExecutor, TimeoutError, as_completed
+
+
 if sys.version_info < (2,6,0):
-  sys.stderr.write("You need python 2.6 or later to run\n")
-  sys.exit(1)
+    sys.stderr.write('Expecting Python >= 2.6\n')
+    sys.exit(1)
+
 
 USER_COLUMN_WIDTH = 4
 FRAMEWORK_COLUMN_WIDTH = 4
@@ -22,212 +26,198 @@ MEM_COLUMN_WIDTH = 8
 TIME_COLUMN_WIDTH = 14
 
 
-# Helper that uses 'mesos-resolve' to resolve the master's IP:port.
-def resolve(master):
-  process = subprocess.Popen(
-    ['mesos-resolve', master],
-    stdin=None,
-    stdout=subprocess.PIPE,
-    stderr=subprocess.PIPE,
-    shell=False)
+# Defines the column structure for printing to the terminal.
+class Column:
+    def __init__(self, title, padding):
+        self.title = title
+        self.padding = padding
 
-  status = process.wait()
-  if status != 0:
-    print 'Failed to execute \'mesos-resolve %s\':\n' % master
-    print process.stderr.read()
-    sys.exit(1)
+    def width(self):
+        return len(self.title) + self.padding
 
-  result = process.stdout.read()
-  process.stdout.close()
-  process.stderr.close()
-  return result
-
-
-# Helper to determine the number of open file descriptors specified by
-# ulimit (since resource.RLIMIT_NOFILE is not always accurate).
-def ulimit(args):
-  command = args if isinstance(args, list) else [ args ]
-  command.insert(0, 'ulimit')
-  process = subprocess.Popen(
-    command,
-    stdin=None,
-    stdout=subprocess.PIPE,
-    stderr=subprocess.PIPE,
-    shell=False)
-
-  status = process.wait()
-  if status != 0:
-    print 'Failed to execute \'ulimit %s\':\n' % ' '.join(command)
-    print process.stderr.read()
-    sys.exit(1)
+    def truncate(self, text):
+        if text is None:
+            return ' ' * self.width()
 
-  result = process.stdout.read()
-  process.stdout.close()
-  process.stderr.close()
-  return result
-
-class Slave:
-  def __init__(self, slave):
-    self.slave = slave
-    self.process = None
-    self.statistics = None
-
-  def hostname(self):
-    return self.slave['hostname']
-
-  def curl(self):
-    if self.process is None:
-      pid = self.slave['pid']
-      url = 'http://' + pid[len('slave(1)@'):] + '/monitor/statistics.json'
-      self.process = subprocess.Popen(
-        ['curl', '-s', url],
-        stdin=None,
-        stdout=subprocess.PIPE,
-        stderr=subprocess.PIPE,
-        shell=False)
-
-  def load(self):
-    if self.process is None:
-      self.curl()
-    if self.statistics is None:
-      status = self.process.wait()
-      if status != 0:
-        print 'Failed to execute \'curl\':\n'
-        print self.process.stderr.read()
-        sys.exit(1)
-      self.statistics = json.loads(self.process.stdout.read())
-      self.process.stdout.close()
-      self.process.stderr.close()
-
-  def cpus_time_secs(self, task):
-    self.load()
-    framework_id = task['framework_id']
-    executor_id = task['executor_id']
-    if executor_id == "": executor_id = task['id']
-    for i in range(len(self.statistics)):
-      entry = self.statistics[i]
-      if entry['framework_id'] == framework_id and \
-         entry['executor_id'] == executor_id:
-        return entry['statistics']['cpus_system_time_secs'] + \
-               entry['statistics']['cpus_user_time_secs']
-    return None
+        text = str(text)
+
+        # If 'text' is less than the width then add spaces.
+        # Otherwise, abbreviate and add a space.
+        if len(text) < self.width():
+            spaces = ' ' * (self.width() - len(text))
+            text += spaces
+        else:
+            text = text[:self.width() - 4]
+            text += '... '
+        return text
+
+
+# Helper for formatting the MEM column for a task.
+def mem(task, statistics):
+    if statistics is None:
+        return None
 
-  def mem_rss_bytes(self, task):
-    self.load()
     framework_id = task['framework_id']
     executor_id = task['executor_id']
+
+    # An executorless task has an empty executor ID in the master but
+    # uses the same executor ID as task ID in the slave.
     if executor_id == "": executor_id = task['id']
-    for i in range(len(self.statistics)):
-      entry = self.statistics[i]
-      if entry['framework_id'] == framework_id and \
-         entry['executor_id'] == executor_id:
-        return entry['statistics']['mem_rss_bytes']
+
+    mem_rss_bytes = None
+    for entry in statistics:
+        if (entry['framework_id'] == framework_id and
+            entry['executor_id'] == executor_id):
+            mem_rss_bytes = entry['statistics']['mem_rss_bytes']
+            break
+
+    if mem_rss_bytes is not None:
+        MB = 1024.0 * 1024.0
+        return "{0:.1f} MB".format(mem_rss_bytes / MB)
+
     return None
 
 
-# Define the columns.
-class Column:
-  def __init__(self, title, padding):
-    self.title = title
-    self.padding = padding
+# Helper for formatting the TIME column for a task.
+def time(task, statistics):
+    if statistics is None:
+        return None
+
+    framework_id = task['framework_id']
+    executor_id = task['executor_id']
 
-  def width(self):
-    return len(self.title) + self.padding
+    # An executorless task has an empty executor ID in the master but
+    # uses the same executor ID as task ID in the slave.
+    if executor_id == "": executor_id = task['id']
 
-  def format(self, text):
-    if text is None:
-      return ' ' * self.width()
+    cpus_time_secs = None
+    for entry in statistics:
+        if (entry['framework_id'] == framework_id and
+            entry['executor_id'] == executor_id):
+            cpus_time_secs = (entry['statistics']['cpus_system_time_secs'] +
+                              entry['statistics']['cpus_user_time_secs'])
+            break
 
-    text = str(text)
+    if cpus_time_secs is not None:
+        return (datetime.datetime
+                .utcfromtimestamp(cpus_time_secs)
+                .strftime('%H:%M:%S.%f'))
 
-    # If 'text' is less than the width then add spaces.
-    # Otherwise, abbreviate and add a space.
-    if len(text) < self.width():
-      spaces = ' ' * (self.width() - len(text))
-      text += spaces
-    else:
-      text = text[:self.width() - 4]
-      text += '... '
-    return text
+    return None
 
 
 def main():
-  # Parse options for this script.
-  parser = OptionParser()
-  parser.add_option('--master')
-  (options, args) = parser.parse_args(sys.argv)
-
-  if options.master is None:
-    print "Missing --master\n"
-    parser.print_help()
-    exit(-1)
-
-  url = 'http://' + resolve(options.master) + '/master/state.json'
-  file = urllib.urlopen(url)
-  state = json.loads(file.read())
-  file.close()
-
-  # Build a dict from slave ID to `slaves'.
-  slaves = {}
-  for slave in state['slaves']:
-    slaves[slave['id']] = Slave(slave)
-
-  # Initiate the curl requests in batches less than the open file
-  # descriptor limit.
-  fd_limit = ulimit('-Sn')
-
-  batch = []
-  for slave in slaves.values():
-    if len(batch) == fd_limit:
-      for slave in batch:
-        slave.load() # Forces close of open file descriptors.
-      batch = []
-    slave.curl()
-    batch.append(slave)
-
-  columns = {}
-
-  columns[0] = Column('USER', USER_COLUMN_WIDTH)
-  columns[1] = Column('FRAMEWORK', FRAMEWORK_COLUMN_WIDTH)
-  columns[2] = Column('TASK', TASK_COLUMN_WIDTH)
-  columns[3] = Column('SLAVE', SLAVE_COLUMN_WIDTH)
-  columns[4] = Column('MEM', MEM_COLUMN_WIDTH)
-  columns[5] = Column('TIME', TIME_COLUMN_WIDTH)
-
-  for i in columns:
-    sys.stdout.write(columns[i].title)
-    sys.stdout.write(' ' * columns[i].padding)
-
-  for framework in state['frameworks']:
-    for task in framework['tasks']:
-      sys.stdout.write('\n')
-      sys.stdout.write(columns[0].format(framework['user']))
-      sys.stdout.write(columns[1].format(framework['name']))
-      sys.stdout.write(columns[2].format(task['name']))
-      sys.stdout.write(columns[3].format(slaves[task['slave_id']].hostname()))
-
-      # Get memory usage and cpus *time* from the slave (we can't get
-      # cpus % unless we have a previous value).
-      mem_rss_bytes = slaves[task['slave_id']].mem_rss_bytes(task)
-      cpus_time_secs = slaves[task['slave_id']].cpus_time_secs(task)
-
-      if mem_rss_bytes is not None:
-        MB = 1024.0 * 1024.0
-        mem_rss_megabytes = mem_rss_bytes / MB
-        s = "{0:.1f} MB".format(mem_rss_megabytes)
-        sys.stdout.write(columns[4].format(s))
-      else:
-        sys.stdout.write(columns[4].format(None))
+    # Parse options for this script.
+    parser = OptionParser()
+    parser.add_option('--master')
+    parser.add_option('--timeout', default=5.0)
+    parser.add_option('--verbose', action='store_true', default=False)
+    (options, args) = parser.parse_args(sys.argv)
+
+    if options.master is None:
+        sys.stderr.write('Missing --master\n')
+        parser.print_help()
+        sys.exit(-1)
+
+    try:
+        timeout = float(options.timeout)
+    except (TypeError, ValueError):
+        sys.stderr.write('Expecting --timeout to be a floating point number\n')
+        sys.exit(-1)
+
+    # Resolve the master's IP:port ('resolve' raises on failure).
+    try:
+        master = resolve(options.master).strip()
+    except Exception as e:
+        sys.stderr.write('%s\n' % e)
+        sys.exit(1)
+
+    # Get the master's state. The timeout also bounds this request so
+    # that an unresponsive master cannot hang this tool.
+    url = 'http://' + master + '/master/state.json'
+    try:
+        with closing(urllib2.urlopen(url, timeout=timeout)) as response:
+            state = json.loads(response.read())
+    except (IOError, ValueError):
+        sys.stderr.write('Failed to read the master state\n')
+        sys.exit(1)
+
+    # Collect all the active frameworks and tasks by slave ID.
+    active = {}
+    for framework in state['frameworks']:
+        for task in framework['tasks']:
+            active.setdefault(task['slave_id'], []).append((framework, task))
+
+    # Now set up the columns (a list, so their output order is fixed).
+    columns = [
+        Column('USER', USER_COLUMN_WIDTH),
+        Column('FRAMEWORK', FRAMEWORK_COLUMN_WIDTH),
+        Column('TASK', TASK_COLUMN_WIDTH),
+        Column('SLAVE', SLAVE_COLUMN_WIDTH),
+        Column('MEM', MEM_COLUMN_WIDTH),
+        Column('TIME', TIME_COLUMN_WIDTH),
+    ]
+
+    # Output the header.
+    for column in columns:
+        sys.stdout.write(column.title)
+        sys.stdout.write(' ' * column.padding)
+
+    # Helper to get the statistics from the slave, whose address is
+    # the part of its PID after the '@'. The timeout bounds each
+    # request so that the executor (which joins its threads when it
+    # exits) is not held up indefinitely by an unresponsive slave.
+    def get(slave):
+        address = slave['pid'].split('@', 1)[-1]
+        url = 'http://' + address + '/monitor/statistics.json'
+        with closing(urllib2.urlopen(url, timeout=timeout)) as response:
+            return json.loads(response.read())
+
+    with ThreadingExecutor() as executor:
+        # Grab all the slaves with active tasks.
+        slaves = [slave for slave in state['slaves'] if slave['id'] in active]
+
+        # Now submit calls to get the statistics for each slave.
+        futures = dict((executor.submit(get, slave), slave)
+                       for slave in slaves)
+
+        # And output each slave's tasks as its statistics arrive. Note
+        # that 'as_completed' itself (not 'result') raises TimeoutError
+        # if the slaves do not all respond within 'timeout' seconds.
+        try:
+            for future in as_completed(futures, timeout):
+                slave = futures[future]
+                statistics = None
+                try:
+                    statistics = future.result()
+                except Exception as e:
+                    # Best effort: still output the tasks, just without
+                    # their MEM and TIME columns.
+                    if options.verbose:
+                        sys.stderr.write(
+                            'Failed to get statistics from slave %s: %s\n'
+                            % (slave['hostname'], e))
+                for framework, task in active[slave['id']]:
+                    sys.stdout.write('\n')
+                    sys.stdout.write(columns[0].truncate(framework['user']))
+                    sys.stdout.write(columns[1].truncate(framework['name']))
+                    sys.stdout.write(columns[2].truncate(task['name']))
+                    sys.stdout.write(columns[3].truncate(slave['hostname']))
+                    sys.stdout.write(columns[4].truncate(mem(task, statistics)))
+                    sys.stdout.write(columns[5].truncate(time(task, statistics)))
+        except TimeoutError:
+            sys.stderr.write('Timed out while waiting for slaves\n')
+            sys.exit(1)
+
+    sys.stdout.write('\n')
+    sys.exit(0)
 
-      if cpus_time_secs is not None:
-        dt = datetime.datetime.utcfromtimestamp(cpus_time_secs)
-        sys.stdout.write(columns[5].format(dt.strftime('%H:%M:%S.%f')))
-      else:
-        sys.stdout.write(columns[5].format(None))
 
-  sys.stdout.write('\n')
-  sys.stdout.flush()
+if __name__ == "__main__":
+    def handler(signum, frame):
+        sys.stdout.write('\n')
+        sys.exit(130)
 
+    signal.signal(signal.SIGINT, handler)
 
-if __name__ == "__main__":
-  main()
+    main()

http://git-wip-us.apache.org/repos/asf/mesos/blob/bb8a627c/src/cli/mesos.cpp
----------------------------------------------------------------------
diff --git a/src/cli/mesos.cpp b/src/cli/mesos.cpp
index 17a9f0c..fbdc3b9 100644
--- a/src/cli/mesos.cpp
+++ b/src/cli/mesos.cpp
@@ -68,6 +68,18 @@ int main(int argc, char** argv)
     exit(1);
   }
 
+  // Update PYTHONPATH to include path to installed 'mesos' module.
+  // Only add the ':' separator when PYTHONPATH is already non-empty,
+  // since an empty entry would put the current working directory on
+  // Python's module search path.
+  // TODO(benh): Remove this if/when we install the 'mesos' module via
+  // PIP and setuptools.
+  string pythonModules = path::join(PKGLIBEXECDIR, "python");
+  string pythonPath = os::getenv("PYTHONPATH", false);
+  os::setenv(
+      "PYTHONPATH",
+      pythonPath.empty() ? pythonModules : pythonPath + ":" + pythonModules);
+
   // Now dispatch to any mesos-'command' on PATH.
   if (string(argv[1]) == "help") {
     if (argc == 2) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/bb8a627c/src/cli/python/mesos/__init__.py
----------------------------------------------------------------------
diff --git a/src/cli/python/mesos/__init__.py b/src/cli/python/mesos/__init__.py
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/mesos/blob/bb8a627c/src/cli/python/mesos/cli.py
----------------------------------------------------------------------
diff --git a/src/cli/python/mesos/cli.py b/src/cli/python/mesos/cli.py
new file mode 100644
index 0000000..7712886
--- /dev/null
+++ b/src/cli/python/mesos/cli.py
@@ -0,0 +1,28 @@
+import subprocess
+
+# Helper that uses 'mesos-resolve' to resolve a master IP:port from
+# one of:
+#     zk://host1:port1,host2:port2,.../path
+#     zk://username:password@host1:port1,host2:port2,.../path
+#     file://path/to/file (where file contains one of the above)
+#
+# Returns the resolved IP:port with surrounding whitespace (e.g. a
+# trailing newline) stripped. Raises an Exception that includes the
+# command's stderr if 'mesos-resolve' exits with a non-zero status.
+def resolve(master):
+    # 'communicate' reads both pipes while waiting; calling 'wait' first
+    # can deadlock if the child fills a pipe buffer before it exits.
+    process = subprocess.Popen(
+        ['mesos-resolve', master],
+        stdin=None,
+        stdout=subprocess.PIPE,
+        stderr=subprocess.PIPE,
+        shell=False)
+
+    stdout, stderr = process.communicate()
+
+    if process.returncode != 0:
+        raise Exception('Failed to execute \'mesos-resolve %s\':\n%s'
+                        % (master, stderr))
+
+    return stdout.strip()

http://git-wip-us.apache.org/repos/asf/mesos/blob/bb8a627c/src/cli/python/mesos/futures.py
----------------------------------------------------------------------
diff --git a/src/cli/python/mesos/futures.py b/src/cli/python/mesos/futures.py
new file mode 100644
index 0000000..5486f1f
--- /dev/null
+++ b/src/cli/python/mesos/futures.py
@@ -0,0 +1,187 @@
+import threading
+import time
+
+try:
+    from concurrent.futures import *
+except ImportError:
+    # Minimal fallback for the parts of the 'concurrent.futures'
+    # interface (standard in Python >= 3.2) that the CLI tools use.
+    from Queue import Empty, Queue
+
+    class TimeoutError(Exception):
+        """The operation timed out"""
+
+    class Future(object):
+        def __init__(self):
+            self._lock = threading.RLock()
+            self._condition = threading.Condition(self._lock)
+            self._done = False
+            self._result = None
+            self._exception = None
+            self._callbacks = []
+
+        def cancel(self):
+            # These futures are possibly backed by threads which can
+            # not (easily or portably) be interrupted so instead we
+            # simply don't let people cancel; like concurrent.futures
+            # does for a future it cannot cancel, report failure.
+            return False
+
+        def cancelled(self):
+            return False
+
+        def running(self):
+            return not self.done()
+
+        def done(self):
+            with self._lock:
+                return self._done
+
+        def result(self, timeout=None):
+            # Waits up to 'timeout' seconds (forever if None), raising
+            # TimeoutError if still not done, then returns the result
+            # or raises the exception the call failed with.
+            with self._lock:
+                self._await(timeout)
+                if self._exception is not None:
+                    raise self._exception
+                return self._result
+
+        def exception(self, timeout=None):
+            # Waits like 'result' but returns the exception (or None).
+            with self._lock:
+                self._await(timeout)
+                return self._exception
+
+        def add_done_callback(self, fn):
+            run = False
+            with self._lock:
+                if self._done:
+                    run = True
+                else:
+                    self._callbacks.append(fn)
+            if run:
+                try:
+                    fn(self)
+                except:
+                    # TODO(benh): Log if Exception, but semantics tell
+                    # us to ignore regardless.
+                    pass
+
+        def set_result(self, result):
+            with self._lock:
+                self._result = result
+            self._finish()
+
+        def set_exception(self, exception):
+            # Completes the future just like 'set_result' does, so that
+            # waiters are woken and the done callbacks (which
+            # 'as_completed' relies on) are run.
+            with self._lock:
+                self._exception = exception
+            self._finish()
+
+        def _await(self, timeout):
+            # Condition.wait may return before the future is done, so
+            # loop until it is done or the overall deadline has passed.
+            with self._lock:
+                if timeout is not None:
+                    deadline = time.time() + timeout
+                while not self._done:
+                    if timeout is None:
+                        self._condition.wait()
+                        continue
+                    left = deadline - time.time()
+                    if left <= 0:
+                        raise TimeoutError()
+                    self._condition.wait(left)
+
+        def _finish(self):
+            # Mark the future done and wake any threads blocked in
+            # '_await', then run the callbacks outside of the lock.
+            with self._lock:
+                self._done = True
+                callbacks = self._callbacks
+                self._callbacks = []
+                self._condition.notify_all()
+            for cb in callbacks:
+                try:
+                    cb(self)
+                except:
+                    # TODO(benh): Log if Exception, but semantics tell
+                    # us to ignore regardless.
+                    pass
+
+
+    class Executor(object):
+        def __enter__(self):
+            return self
+
+        def __exit__(self, exc_type, exc_value, traceback):
+            self.shutdown()
+
+
+    def as_completed(futures, timeout=None):
+        # Record the start time in order to determine the remaining
+        # timeout as futures are completed.
+        start = time.time()
+
+        # Like concurrent.futures, yield each distinct future only once.
+        futures = set(futures)
+
+        # Use a queue to collect the done futures.
+        queue = Queue()
+
+        # Define a helper for the future "done callback".
+        def done(future):
+            queue.put(future)
+
+        # Add a "done callback" for each future.
+        for future in futures:
+            future.add_done_callback(done)
+
+        # Helper to determine the remaining timeout.
+        def remaining():
+            if timeout is None:
+                return None
+            left = start + timeout - time.time()
+            return left if left >= 0 else 0
+
+        # Now wait until all the futures have completed or we timeout.
+        for _ in range(len(futures)):
+            try:
+                future = queue.get(timeout=remaining())
+            except Empty:
+                raise TimeoutError()
+            yield future
+
+
+# An executor which runs each submitted call on a new thread. It is
+# defined outside of the fallback above so that it is also available
+# when 'concurrent.futures' is, which is why 'threading' is imported
+# at the top of this module rather than only in the fallback.
+class ThreadingExecutor(Executor):
+    def __init__(self):
+        self._threads = []
+
+    def submit(self, fn, *args, **kwargs):
+        future = Future()
+        def run():
+            try:
+                future.set_result(fn(*args, **kwargs))
+            except Exception as e:
+                future.set_exception(e)
+        thread = threading.Thread(target=run)
+        thread.start()
+        self._threads.append(thread)
+        return future
+
+    def map(self, func, iterables, timeout=None):
+        # TODO(benh): Implement!
+        raise NotImplementedError()
+
+    def shutdown(self, wait=True):
+        if wait:
+            for thread in self._threads:
+                thread.join()
+        self._threads = []