You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2010/02/17 17:13:12 UTC

svn commit: r911048 - in /qpid/trunk/qpid/python/examples/reservations: ./ common.py inventory machine-agent reserve

Author: rhs
Date: Wed Feb 17 16:13:12 2010
New Revision: 911048

URL: http://svn.apache.org/viewvc?rev=911048&view=rev
Log:
added reservations to examples

Added:
    qpid/trunk/qpid/python/examples/reservations/
    qpid/trunk/qpid/python/examples/reservations/common.py
    qpid/trunk/qpid/python/examples/reservations/inventory   (with props)
    qpid/trunk/qpid/python/examples/reservations/machine-agent   (with props)
    qpid/trunk/qpid/python/examples/reservations/reserve   (with props)

Added: qpid/trunk/qpid/python/examples/reservations/common.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/examples/reservations/common.py?rev=911048&view=auto
==============================================================================
--- qpid/trunk/qpid/python/examples/reservations/common.py (added)
+++ qpid/trunk/qpid/python/examples/reservations/common.py Wed Feb 17 16:13:12 2010
@@ -0,0 +1,86 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import traceback
+from fnmatch import fnmatch
+from qpid.messaging import *
+
+class Dispatcher:
+  """Routes a message to the do_<type> method named by its "type" property."""
+
+  def unhandled(self, msg):
+    """Fallback handler used when no do_<type> method exists."""
+    print "UNHANDLED MESSAGE: %s" % msg
+
+  def ignored(self, msg):
+    """Return True to drop msg undispatched; the base class drops nothing."""
+    return False
+
+  def dispatch(self, msg):
+    """Return the handler's replies for msg; () if none, ignored or failed."""
+    try:
+      if self.ignored(msg):
+        return ()
+      kind = msg.properties.get("type")
+      replies = getattr(self, "do_%s" % kind, self.unhandled)(msg)
+      if replies is None:
+        return ()
+      return replies
+    except Exception:
+      # Not a bare except: KeyboardInterrupt/SystemExit must reach the caller.
+      traceback.print_exc()
+      return ()
+
+  def run(self, session):
+    """Serve messages while self.running(), which subclasses must define."""
+    senders = {}  # reply address -> sender, reused across messages
+    while self.running():
+      msg = session.next_receiver().fetch()
+      replies = self.dispatch(msg)
+      count = len(replies)
+      sequence = 1
+      for r in replies:
+        if r.to not in senders:
+          senders[r.to] = session.sender(r.to)
+        rsnd = senders[r.to]
+        # Echo the request's correlation id; number the replies 1..count.
+        r.correlation_id = msg.correlation_id
+        r.properties["count"] = count
+        r.properties["sequence"] = sequence
+        sequence += 1
+        try:
+          rsnd.send(r)
+        except SendError, e:
+          print e
+          del senders[r.to]  # forget the failed sender
+          rsnd.close()
+      session.acknowledge(msg)
+
+def get_status(msg):  # -> (identity, status, owner) read from msg.content
+  return tuple([msg.content[key] for key in ("identity", "status", "owner")])
+
+FREE = "free"  # status of a machine that nobody has reserved
+BUSY = "busy"  # status of a machine that is reserved by an owner
+
+def match(value, patterns):
+  """Return True if value matches any of the fnmatch-style glob patterns.
+
+  An empty patterns sequence matches nothing."""
+  return any(fnmatch(value, p) for p in patterns)

Added: qpid/trunk/qpid/python/examples/reservations/inventory
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/examples/reservations/inventory?rev=911048&view=auto
==============================================================================
--- qpid/trunk/qpid/python/examples/reservations/inventory (added)
+++ qpid/trunk/qpid/python/examples/reservations/inventory Wed Feb 17 16:13:12 2010
@@ -0,0 +1,100 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import optparse, traceback
+from qpid.messaging import *
+from qpid.util import URL
+from qpid.log import enable, DEBUG, WARN
+from common import *
+
+# Command-line options.
+# NOTE(review): --database is parsed but never read in this script.
+parser = optparse.OptionParser(description="machine inventory agent",
+                               usage="usage: %prog [options]")
+parser.add_option("-b", "--broker", default="localhost",
+                  help="connect to specified BROKER (default %default)")
+parser.add_option("-d", "--database",
+                  help="database file for persistent machine status")
+parser.add_option("-a", "--address", default="reservations",
+                  help="address for reservation requests")
+parser.add_option("-v", dest="verbose", action="store_true",
+                  help="enable verbose logging")
+opts, args = parser.parse_args()
+
+# Log the qpid library at DEBUG when -v is given, at WARN otherwise.
+enable("qpid", DEBUG if opts.verbose else WARN)
+
+# Open the broker connection; AMQP_PORT is used when the URL names no port.
+url = URL(opts.broker)
+conn = Connection.open(url.host, url.port or AMQP_PORT,
+                       username=url.user, password=url.password,
+                       reconnect=True,
+                       reconnect_delay=1)
+
+class Inventory(Dispatcher):
+  """Caches the latest status reported per machine and answers queries."""
+
+  def __init__(self):
+    self.agents = {}  # identity -> (status, owner), filled by do_status()
+
+  def running(self):
+    """Keep Dispatcher.run() looping indefinitely."""
+    return True
+
+  def do_status(self, msg):
+    """Record the (status, owner) pair reported for a machine identity."""
+    identity, status, owner = get_status(msg)
+    self.agents[identity] = (status, owner)
+
+  def do_query(self, msg):
+    """Reply with the status of each cached machine matching the patterns in
+    msg.content["identity"], or with a single "empty" message if none does."""
+    patterns = msg.content["identity"]
+    result = []
+    for identity, (status, owner) in self.agents.items():
+      if not match(identity, patterns):
+        continue
+      result.append(Message(to = msg.reply_to,
+                            properties = {"type": "status"},
+                            content = {"identity": identity,
+                                       "status": status,
+                                       "owner": owner}))
+    if not result:
+      result.append(Message(to = msg.reply_to,
+                            properties = {"type": "empty"}))
+    return result
+
+  def ignored(self, msg):
+    """Ignore every message whose type is neither "status" nor "query"."""
+    return msg.properties.get("type") not in ("status", "query")
+
+try:
+  ssn = conn.session()
+  rcv = ssn.receiver(opts.address, capacity = 10)
+  snd = ssn.sender(opts.address)
+  # Ask all agents ("*") to report their status to the shared address.
+  discover = Message(reply_to = opts.address,
+                     properties = {"type": "discover", "identity": ["*"]})
+  snd.send(discover)
+  Inventory().run(ssn)
+except KeyboardInterrupt:
+  pass  # Ctrl-C ends the service; the connection is still closed below.
+finally:
+  conn.close()

Propchange: qpid/trunk/qpid/python/examples/reservations/inventory
------------------------------------------------------------------------------
    svn:executable = *

Added: qpid/trunk/qpid/python/examples/reservations/machine-agent
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/examples/reservations/machine-agent?rev=911048&view=auto
==============================================================================
--- qpid/trunk/qpid/python/examples/reservations/machine-agent (added)
+++ qpid/trunk/qpid/python/examples/reservations/machine-agent Wed Feb 17 16:13:12 2010
@@ -0,0 +1,109 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import optparse, socket
+from qpid.messaging import *
+from qpid.util import URL
+from qpid.log import enable, DEBUG, WARN
+from common import *
+
+# The local host name is the default resource identity (see --identity).
+host = socket.gethostname()
+
+# NOTE(review): --database is parsed but never read in this script.
+parser = optparse.OptionParser(description="machine reservation agent",
+                               usage="usage: %prog [options]")
+parser.add_option("-b", "--broker", default="localhost",
+                  help="connect to specified BROKER (default %default)")
+parser.add_option("-d", "--database",
+                  help="database file for persistent machine status")
+parser.add_option("-a", "--address", default="reservations",
+                  help="address for reservation requests")
+parser.add_option("-i", "--identity", default=host,
+                  help="resource id (default %default)")
+parser.add_option("-v", dest="verbose", action="store_true",
+                  help="enable verbose logging")
+opts, args = parser.parse_args()
+
+# Log the qpid library at DEBUG when -v is given, at WARN otherwise.
+enable("qpid", DEBUG if opts.verbose else WARN)
+
+# Open the broker connection; AMQP_PORT is used when the URL names no port.
+url = URL(opts.broker)
+conn = Connection.open(url.host, url.port or AMQP_PORT,
+                       username=url.user, password=url.password,
+                       reconnect=True,
+                       reconnect_delay=1)
+
+
+class Agent(Dispatcher):
+  """Tracks one machine's reservation state and answers requests for it."""
+
+  def __init__(self, identity):
+    self.identity = identity
+    self.status, self.owner = FREE, None  # starts out unreserved
+
+  def running(self):
+    return True  # keep Dispatcher.run() looping indefinitely
+
+  def get_status(self):
+    """Build an unaddressed "status" message describing this machine."""
+    report = {"identity": self.identity, "status": self.status,
+              "owner": self.owner}
+    return Message(properties = {"type": "status"}, content = report)
+
+  def do_discover(self, msg):
+    """Reply to msg.reply_to with the current status."""
+    reply = self.get_status()
+    reply.to = msg.reply_to
+    return [reply]
+
+  def do_reserve(self, msg):
+    """Take the reservation for the requester if free; report the result."""
+    if self.status == FREE:
+      self.owner = msg.content["owner"]
+      self.status = BUSY
+    return self.do_discover(msg)
+
+  def do_release(self, msg):
+    """Free the machine if the requester is its owner; report the result."""
+    if self.owner == msg.content["owner"]:
+      self.status, self.owner = FREE, None
+    return self.do_discover(msg)
+
+  def ignored(self, msg):
+    """Ignore status reports and messages whose patterns don't match us."""
+    patterns = msg.properties.get("identity")
+    if not patterns or not match(self.identity, patterns):
+      return True
+    return msg.properties.get("type") == "status"
+
+try:
+  ssn = conn.session()
+  rcv = ssn.receiver(opts.address)
+  rcv.capacity = 10
+  snd = ssn.sender(opts.address)
+  agent = Agent(opts.identity)
+  snd.send(agent.get_status())  # announce this machine's initial status
+  agent.run(ssn)  # serve requests until interrupted or an error propagates
+except KeyboardInterrupt:
+  pass  # Ctrl-C: fall through to the cleanup below
+finally:
+  conn.close()

Propchange: qpid/trunk/qpid/python/examples/reservations/machine-agent
------------------------------------------------------------------------------
    svn:executable = *

Added: qpid/trunk/qpid/python/examples/reservations/reserve
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/examples/reservations/reserve?rev=911048&view=auto
==============================================================================
--- qpid/trunk/qpid/python/examples/reservations/reserve (added)
+++ qpid/trunk/qpid/python/examples/reservations/reserve Wed Feb 17 16:13:12 2010
@@ -0,0 +1,200 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import optparse, os, sys, time
+from uuid import uuid4
+from qpid.messaging import *
+from qpid.util import URL
+from qpid.log import enable, DEBUG, WARN
+from common import *
+
+parser = optparse.OptionParser(usage="usage: %prog [options] PATTERN ...",
+                               description="reserve a machine")
+parser.add_option("-b", "--broker", default="localhost",
+                  help="connect to specified BROKER (default %default)")
+parser.add_option("-a", "--address", default="reservations",
+                  help="address for reservation requests")
+parser.add_option("-r", "--release", action="store_true",
+                  help="release any machines matching the pattern")
+parser.add_option("-s", "--status", action="store_true",
+                  help="list machine status")
+parser.add_option("-d", "--discover", action="store_true",
+                  help="use discovery instead of inventory")
+# USER may be unset; look it up without raising so that --owner, --status
+# and --help keep working, and reject a missing owner after parsing instead.
+parser.add_option("-o", "--owner", default=os.environ.get("USER"),
+                  help="the holder of the reservation")
+parser.add_option("-n", "--number", type=int, default=1,
+                  help="the number of machines to reserve")
+parser.add_option("-t", "--timeout", type=float, default=10,
+                  help="timeout in seconds to wait for resources")
+parser.add_option("-v", dest="verbose", action="store_true",
+                  help="enable verbose logging")
+
+opts, args = parser.parse_args()
+if opts.owner is None and not opts.status:
+  parser.error("no owner: set USER or pass --owner")
+
+# Log the qpid library at DEBUG when -v is given, at WARN otherwise.
+enable("qpid", DEBUG if opts.verbose else WARN)
+
+# Machine name patterns come from the arguments; none means every machine.
+patterns = args or ["*"]
+
+url = URL(opts.broker)
+conn = Connection.open(url.host, url.port or AMQP_PORT,
+                       username=url.user, password=url.password)
+
+# Release targets machines this owner holds; reserve targets free ones.
+if opts.release:
+  request_type = "release"
+  candidate_status = BUSY
+  candidate_owner = opts.owner
+else:
+  request_type = "reserve"
+  candidate_status = FREE
+  candidate_owner = None
+
+class Requester(Dispatcher):
+  """Tracks this script's requests and the machine states reported back."""
+
+  def __init__(self):
+    self.agents = {}          # identity -> (status, owner)
+    self.requests = set()     # correlation ids of everything we sent
+    self.outstanding = set()  # ids still waiting for their last reply
+
+  def agent_status(self, id):
+    status, owner = self.agents[id]
+    if not owner:
+      return "%s %s" % (id, status)
+    return "%s %s(%s)" % (id, status, owner)  # owner shown only when set
+
+  def correlation(self, cid):
+    self.requests.add(cid)     # replies carrying cid are accepted from now on
+    self.outstanding.add(cid)  # ...and cid counts as unanswered
+
+  def ignored(self, msg):
+    """Accept only "status"/"empty" messages that answer our own requests."""
+    return (msg.properties.get("type") not in ("status", "empty")
+            or msg.correlation_id not in self.requests)
+
+  def do_status(self, msg):
+    identity, status, owner = get_status(msg)
+    self.agents[identity] = (status, owner)
+    if opts.status:  # --status: list each machine as its report arrives
+      print self.agent_status(identity)
+
+  def do_empty(self, msg):
+    print "no matching resources"
+
+  def candidates(self, candidate_status, candidate_owner):
+    """Yield ids of known machines with exactly this status and owner."""
+    for identity, (status, owner) in self.agents.items():
+      if status == candidate_status and owner == candidate_owner:
+        yield identity
+
+  def dispatch(self, msg):
+    result = Dispatcher.dispatch(self, msg)
+    count = msg.properties.get("count")
+    if count and msg.properties.get("sequence") == count:  # last reply seen
+      self.outstanding.discard(msg.correlation_id)
+    return result
+
+try:
+  ssn = conn.session()
+  rcv = ssn.receiver(opts.address, capacity=10)
+  snd = ssn.sender(opts.address)
+
+  # The opening request lists the matching machines: --discover asks the
+  # agents directly, the default queries the inventory.
+  correlation_id = str(uuid4())
+  if opts.discover:
+    properties, content = {"type": "discover", "identity": patterns}, None
+  else:
+    properties, content = {"type": "query"}, {"identity": patterns}
+  snd.send(Message(reply_to = opts.address,
+                   correlation_id = correlation_id,
+                   properties = properties,
+                   content = content))
+
+  req = Requester()
+  req.correlation(correlation_id)
+
+  started = time.time()
+  elapsed = 0
+  requested = set()  # ids of machines a reserve/release request was sent to
+  # With --discover keep listening even when no request is outstanding, until
+  # enough machines have been requested.
+  discovering = opts.discover
+
+  # Handle replies until the timeout, requesting candidates as they appear.
+  while elapsed <= opts.timeout and (discovering or req.outstanding):
+    try:
+      msg = rcv.fetch(opts.timeout - elapsed)
+      ssn.acknowledge(msg)
+    except Empty:
+      continue  # nothing arrived; the loop condition decides whether to go on
+    finally:
+      elapsed = time.time() - started
+
+    req.dispatch(msg)
+    if opts.status:
+      continue  # listing only: never send reserve/release requests
+    if len(requested) >= opts.number:
+      discovering = False
+      continue
+    # Ask every suitable machine that has not been asked yet.
+    for cid in req.candidates(candidate_status, candidate_owner):
+      if cid in requested:
+        continue
+      req_msg = Message(reply_to = opts.address,
+                        correlation_id = str(uuid4()),
+                        properties = {"type": request_type,
+                                      "identity": [cid]},
+                        content = {"owner": opts.owner})
+      if not requested:
+        print "requesting %s:" % request_type,
+      print cid,
+      sys.stdout.flush()
+      req.correlation(req_msg.correlation_id)
+      snd.send(req_msg)
+      requested.add(cid)
+
+  if requested:
+    print
+    # Group the requested machines by the owner last reported for them.
+    owners = {}
+    for machine in requested:
+      owners.setdefault(req.agents[machine][1], []).append(machine)
+    for owner in sorted(owners):
+      machines = ", ".join(sorted(owners[owner]))
+      if owner is None:
+        print "free: %s" % machines
+      else:
+        print "owner %s: %s" % (owner, machines)
+  elif req.agents and not opts.status:
+    print "no available resources"
+
+  # Some request never received its final reply.
+  if req.outstanding:
+    print "request timed out"
+except KeyboardInterrupt:
+  pass
+finally:
+  conn.close()

Propchange: qpid/trunk/qpid/python/examples/reservations/reserve
------------------------------------------------------------------------------
    svn:executable = *



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org