You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2013/12/05 23:34:00 UTC

git commit: Added 'mesos-scp' CLI command.

Updated Branches:
  refs/heads/master 3f176033d -> cac2d3995


Added 'mesos-scp' CLI command.

The CLI command mesos-scp uses scp to copy the specified local file(s)
to a specified directory on all slaves known by the current master.

From: Du Li <li...@gmail.com>
Review: https://reviews.apache.org/r/14963


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/cac2d399
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/cac2d399
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/cac2d399

Branch: refs/heads/master
Commit: cac2d39958afe95aeb465a939bb19c3ea6b1ac1a
Parents: 3f17603
Author: Benjamin Hindman <be...@gmail.com>
Authored: Wed Dec 4 12:16:47 2013 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Thu Dec 5 14:29:39 2013 -0800

----------------------------------------------------------------------
 src/Makefile.am             |   1 +
 src/cli/mesos-cat           |  36 +++++--------
 src/cli/mesos-ps            |  24 ++++-----
 src/cli/mesos-scp           | 109 +++++++++++++++++++++++++++++++++++++++
 src/cli/mesos-tail          |  34 +++++-------
 src/cli/python/mesos/cli.py |   7 +++
 6 files changed, 153 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/cac2d399/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 42dafbc..5f211a2 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -405,6 +405,7 @@ mesos_resolve_LDADD = libmesos.la
 dist_bin_SCRIPTS +=							\
   cli/mesos-cat								\
   cli/mesos-ps								\
+  cli/mesos-scp								\
   cli/mesos-tail
 
 # Also install the supporting scripts for the Python based CLI tools.

http://git-wip-us.apache.org/repos/asf/mesos/blob/cac2d399/src/cli/mesos-cat
----------------------------------------------------------------------
diff --git a/src/cli/mesos-cat b/src/cli/mesos-cat
index f898e82..73dc63e 100755
--- a/src/cli/mesos-cat
+++ b/src/cli/mesos-cat
@@ -6,9 +6,8 @@ import signal
 import sys
 import urllib2
 
-from contextlib import closing
 from optparse import OptionParser
-from urllib2 import HTTPError, urlopen
+from urllib2 import HTTPError
 
 from mesos import http
 from mesos.cli import *
@@ -16,8 +15,7 @@ from mesos.futures import *
 
 
 if sys.version_info < (2,6,0):
-    sys.stderr.write('Expecting Python >= 2.6\n')
-    sys.exit(1)
+    fatal('Expecting Python >= 2.6')
 
 
 def read(slave, task, file):
@@ -32,8 +30,7 @@ def read(slave, task, file):
     try:
         state = json.loads(http.get(slave['pid'], '/state.json'))
     except:
-        sys.stderr.write('Failed to get state from slave\n')
-        sys.exit(1)
+        fatal('Failed to get state from slave')
 
     directory = None
 
@@ -56,8 +53,7 @@ def read(slave, task, file):
                     break
 
     if directory is None:
-        sys.stderr.write('File not found\n')
-        sys.exit(1)
+        fatal('File not found')
 
     path = os.path.join(directory, file)
 
@@ -70,10 +66,9 @@ def read(slave, task, file):
              'offset': -1}))
     except HTTPError as error:
         if error.code == 404:
-            sys.stderr.write('No such file or directory\n')
+            fatal('No such file or directory')
         else:
-            sys.stderr.write('Failed to determine length of file\n')
-        sys.exit(1)
+            fatal('Failed to determine length of file')
 
     length = result['offset']
 
@@ -94,8 +89,7 @@ def read(slave, task, file):
             if offset == length:
                 return
         except:
-            sys.stderr.write('Failed to read file from slave\n')
-            sys.exit(1)
+            fatal('Failed to read file from slave')
 
 
 def main():
@@ -124,8 +118,7 @@ def main():
         state = json.loads(http.get(resolve(options.master),
                                     '/master/state.json'))
     except:
-        sys.stderr.write('Failed to get the master state\n')
-        sys.exit(1)
+        fatal('Failed to get the master state')
 
     # Build a dict from slave ID to slaves.
     slaves = {}
@@ -155,15 +148,14 @@ def main():
                     cat(slaves[task['slave_id']], task)
                     sys.exit(0)
 
-    sys.stderr.write('No task found!\n')
-    sys.exit(-1)
+    fatal('No task found!')
 
 
 if __name__ == '__main__':
-  def signal_handler(signal, frame):
-    sys.stdout.write('\n')
-    sys.exit(130)
+    def signal_handler(signal, frame):
+        sys.stdout.write('\n')
+        sys.exit(130)
 
-  signal.signal(signal.SIGINT, signal_handler)
+    signal.signal(signal.SIGINT, signal_handler)
 
-  main()
+    main()

http://git-wip-us.apache.org/repos/asf/mesos/blob/cac2d399/src/cli/mesos-ps
----------------------------------------------------------------------
diff --git a/src/cli/mesos-ps b/src/cli/mesos-ps
index 83cde13..ddd9ec5 100755
--- a/src/cli/mesos-ps
+++ b/src/cli/mesos-ps
@@ -5,9 +5,7 @@ import json
 import signal
 import sys
 
-from contextlib import closing
 from optparse import OptionParser
-from urllib2 import urlopen
 
 from mesos import http
 from mesos.cli import *
@@ -15,8 +13,7 @@ from mesos.futures import *
 
 
 if sys.version_info < (2,6,0):
-    sys.stderr.write('Expecting Python >= 2.6\n')
-    sys.exit(1)
+    fatal('Expecting Python >= 2.6')
 
 
 USER_COLUMN_PADDING = 4
@@ -158,16 +155,14 @@ def main():
     try:
         timeout = float(options.timeout)
     except:
-        sys.stderr.write('Expecting --timeout to be a floating point number\n')
-        sys.exit(-1)
+        fatal('Expecting --timeout to be a floating point number')
 
     # Get the master's state.
     try:
         state = json.loads(http.get(resolve(options.master),
                                     '/master/state.json'))
     except:
-        sys.stderr.write('Failed to get the master state\n')
-        sys.exit(1)
+        fatal('Failed to get the master state')
 
     # Collect all the active frameworks and tasks by slave ID.
     active = {}
@@ -208,8 +203,7 @@ def main():
             try:
                 statistics = json.loads(future.result())
             except TimeoutError:
-                sys.stderr.write('Timed out while waiting for slaves\n')
-                sys.exit(1)
+                fatal('Timed out while waiting for slaves')
             except Exception as e:
                 # TODO(benh): Print error if 'verbose'.
                 pass
@@ -229,10 +223,10 @@ def main():
 
 
 if __name__ == '__main__':
-  def handler(signal, frame):
-    sys.stdout.write('\n')
-    sys.exit(130)
+    def handler(signal, frame):
+        sys.stdout.write('\n')
+        sys.exit(130)
 
-  signal.signal(signal.SIGINT, handler)
+    signal.signal(signal.SIGINT, handler)
 
-  main()
+    main()

http://git-wip-us.apache.org/repos/asf/mesos/blob/cac2d399/src/cli/mesos-scp
----------------------------------------------------------------------
diff --git a/src/cli/mesos-scp b/src/cli/mesos-scp
new file mode 100755
index 0000000..77b8557
--- /dev/null
+++ b/src/cli/mesos-scp
@@ -0,0 +1,109 @@
+#!/usr/bin/env python
+
+# Uses 'scp' to copy local files to all slaves reported by the master.
+
+import json
+import signal
+import subprocess
+import sys
+
+from optparse import OptionParser
+
+from mesos import http
+from mesos.cli import *
+from mesos.futures import *
+
+
+if sys.version_info < (2,6,0):
+    fatal('Expecting Python >= 2.6')
+
+
+def scp(host, src, dst):
+    cmd = 'scp -pr %s %s' % (src, host + ':' + dst)
+    try:
+        process = subprocess.Popen(
+            cmd,
+            stdin=None,
+            stdout=None,
+            stderr=None,
+            shell=True)
+        return process.wait() == 0
+    except Exception as e:
+        sys.stderr.write('Exception %s when doing %s\n' % (e, cmd))
+        return False
+
+
+def main():
+    # Parse options for this script.
+    parser = OptionParser()
+    parser.add_option('--master')
+    parser.usage = '%prog [options] local-file(s) remote-directory'
+    parser.epilog = ('This command uploads the specified local file(s) '
+                     'to a remote directory on all slaves known by the '
+                     'master. The current implementation assumes '
+                     'passwordless scp')
+    (options, args) = parser.parse_args(sys.argv)
+
+    if options.master is None:
+        usage('Missing --master', parser)
+
+    # Get the master's state.
+    try:
+        state = json.loads(http.get(resolve(options.master),
+                                    '/master/state.json'))
+    except:
+        fatal('Failed to get the master state')
+
+    # All slaves that the master is aware of.
+    slaves = set(slave['hostname'] for slave in state['slaves'])
+
+    if len(args) < 3:
+        usage('Missing arguments', parser)
+
+    # All arguments after args[0] until the last argument are the
+    # local files.
+    src = " ".join(args[1:-1])
+
+    # Remote directory is the very last argument.
+    dst = args[-1]
+
+    success = set()
+    with ThreadingExecutor() as executor:
+        futures = dict((executor.submit(scp, slave, src, dst), slave)
+                       for slave in slaves)
+        for future in as_completed(futures):
+            slave = futures[future]
+            try:
+                status = future.result()
+                if status:
+                    success.add(slave)
+            except Exception as e:
+                sys.stderr.write('Failed to copy to %s: %s\n' % (slave, e))
+
+    print
+
+    for slave in success:
+        print '%s\t%s' % (slave, 'uploaded')
+
+    print
+
+    failed = slaves - success
+    for slave in failed:
+        print '%s\t%s' % (slave, 'failed')
+
+    print
+
+    print ('----- %d uploaded, %d failed of total %d slaves'
+           % (len(success), len(failed), len(slaves)))
+
+    print
+
+
+if __name__ == '__main__':
+    def handler(signal, frame):
+        sys.stdout.write('\n')
+        sys.exit(130)
+
+    signal.signal(signal.SIGINT, handler)
+
+    main()

http://git-wip-us.apache.org/repos/asf/mesos/blob/cac2d399/src/cli/mesos-tail
----------------------------------------------------------------------
diff --git a/src/cli/mesos-tail b/src/cli/mesos-tail
index e2aeaac..256a804 100755
--- a/src/cli/mesos-tail
+++ b/src/cli/mesos-tail
@@ -7,9 +7,8 @@ import sys
 import time
 import itertools
 
-from contextlib import closing
 from optparse import OptionParser
-from urllib2 import HTTPError, urlopen
+from urllib2 import HTTPError
 
 from mesos import http
 from mesos.cli import *
@@ -17,8 +16,7 @@ from mesos.futures import *
 
 
 if sys.version_info < (2,6,0):
-    sys.stderr.write('Expecting Python >= 2.6\n')
-    sys.exit(1)
+    fatal('Expecting Python >= 2.6')
 
 def read_forever(slave, task, file):
     framework_id = task['framework_id']
@@ -32,8 +30,7 @@ def read_forever(slave, task, file):
     try:
       state = json.loads(http.get(slave['pid'], '/state.json'))
     except:
-      sys.stderr.write('Failed to get state from slave\n')
-      sys.exit(1)
+      fatal('Failed to get state from slave')
 
     directory = None
 
@@ -47,8 +44,7 @@ def read_forever(slave, task, file):
                     break
 
     if directory is None:
-        sys.stderr.write('Task directory not found\n')
-        sys.exit(1)
+        fatal('Task directory not found')
 
     path = os.path.join(directory, file)
 
@@ -65,10 +61,9 @@ def read_forever(slave, task, file):
                  'length': PAGE_LENGTH}))
         except HTTPError as error:
             if error.code == 404:
-                sys.stderr.write('No such file or directory\n')
+                fatal('No such file or directory')
             else:
-                sys.stderr.write('Failed to read file from slave\n')
-            sys.exit(1)
+                fatal('Failed to read file from slave')
         if len(result['data']) == 0:
             time.sleep(0.5)
             continue
@@ -102,8 +97,7 @@ def main():
         master_state = json.loads(http.get(resolve(options.master),
                                            '/master/state.json'))
     except:
-        sys.stderr.write('Failed to get the master state\n')
-        sys.exit(1)
+        fatal('Failed to get the master state')
 
     # Build a dict from slave ID to `slaves'.
     slaves = {}
@@ -123,16 +117,14 @@ def main():
                     tail(slaves[task['slave_id']], task, options.file)
                     sys.exit(0)
 
-    sys.stderr.write('No task or framework found!\n')
-    sys.stderr.flush()
-    sys.exit(-1)
+    fatal('No task or framework found!')
 
 
 if __name__ == '__main__':
-  def signal_handler(signal, frame):
-    sys.stdout.write('\n')
-    sys.exit(130)
+    def signal_handler(signal, frame):
+        sys.stdout.write('\n')
+        sys.exit(130)
 
-  signal.signal(signal.SIGINT, signal_handler)
+    signal.signal(signal.SIGINT, signal_handler)
 
-  main()
+    main()

http://git-wip-us.apache.org/repos/asf/mesos/blob/cac2d399/src/cli/python/mesos/cli.py
----------------------------------------------------------------------
diff --git a/src/cli/python/mesos/cli.py b/src/cli/python/mesos/cli.py
index 5c11d46..f32ba49 100644
--- a/src/cli/python/mesos/cli.py
+++ b/src/cli/python/mesos/cli.py
@@ -6,6 +6,13 @@ def usage(message, parser):
     sys.exit(-1)
 
 
+# Helper for printing out a message and then exiting.
+def fatal(message):
+    import sys
+    sys.stderr.write(message + '\n')
+    sys.exit(-1)
+
+
 # Helper that uses 'mesos-resolve' to resolve a master IP:port from
 # one of:
 #     zk://host1:port1,host2:port2,.../path