You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2007/11/07 23:30:40 UTC

svn commit: r592927 - in /incubator/qpid/trunk/qpid/python: hello-world qpid/client.py qpid/delegate.py qpid/peer.py

Author: rhs
Date: Wed Nov  7 14:30:40 2007
New Revision: 592927

URL: http://svn.apache.org/viewvc?rev=592927&view=rev
Log:
python API updates

Modified:
    incubator/qpid/trunk/qpid/python/hello-world
    incubator/qpid/trunk/qpid/python/qpid/client.py
    incubator/qpid/trunk/qpid/python/qpid/delegate.py
    incubator/qpid/trunk/qpid/python/qpid/peer.py

Modified: incubator/qpid/trunk/qpid/python/hello-world
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/hello-world?rev=592927&r1=592926&r2=592927&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/hello-world (original)
+++ incubator/qpid/trunk/qpid/python/hello-world Wed Nov  7 14:30:40 2007
@@ -5,20 +5,21 @@
 
 client = Client("127.0.0.1", 5672)
 client.start({"LOGIN": "guest", "PASSWORD": "guest"})
-ch = client.channel(1)
-ch.session_open()
-ch.queue_declare(queue="test")
-ch.queue_bind(exchange="amq.direct", queue="test", routing_key="test")
-#print ch.queue_query(queue="test")
-ch.message_subscribe(queue="test", destination="amq.direct")
-ch.message_flow("amq.direct", 0, 0xFFFFFFFF)
-ch.message_flow("amq.direct", 1, 0xFFFFFFFF)
+ssn = client.session()
+ssn.open()
+ssn.queue_declare(queue="test")
+ssn.queue_bind(exchange="amq.direct", queue="test", routing_key="test")
+#print ssn.queue_query(queue="test")
+ssn.message_subscribe(queue="test", destination="amq.direct")
+ssn.message_flow("amq.direct", 0, 0xFFFFFFFF)
+ssn.message_flow("amq.direct", 1, 0xFFFFFFFF)
 msg = Content("hello world")
 msg["content_type"] = "text/plain"
 msg["routing_key"] = "test"
 msg["reply_to"] = client.structs.reply_to("asdf", "fdsa")
 msg["application_headers"] = {"x": 1, "y": 2, "z": "zee"}
-ch.message_transfer(destination="amq.direct", content=msg)
+ssn.message_transfer(destination="amq.direct", content=msg)
 queue = client.queue("amq.direct")
 msg = queue.get(timeout=10)
 print msg
+ssn.close()

Modified: incubator/qpid/trunk/qpid/python/qpid/client.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/client.py?rev=592927&r1=592926&r2=592927&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/client.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/client.py Wed Nov  7 14:30:40 2007
@@ -23,7 +23,7 @@
 """
 
 import os, threading
-from peer import Peer, Closed
+from peer import Peer, Channel, Closed
 from delegate import Delegate
 from connection import Connection, Frame, connect
 from spec import load
@@ -45,6 +45,7 @@
         raise EnvironmentError("environment variable AMQP_SPEC must be set")
       self.spec = load(name)
     self.structs = StructFactory(self.spec)
+    self.sessions = {}
 
     self.mechanism = None
     self.response = None
@@ -86,7 +87,7 @@
 
     self.socket = connect(self.host, self.port)
     self.conn = Connection(self.socket, self.spec)
-    self.peer = Peer(self.conn, ClientDelegate(self), self.opened)
+    self.peer = Peer(self.conn, ClientDelegate(self), Session)
 
     self.conn.init()
     self.peer.start()
@@ -94,10 +95,29 @@
     self.channel(0).connection_open(self.vhost)
 
   def channel(self, id):
-    return self.peer.channel(id)
+    self.lock.acquire()
+    try:
+      ssn = self.peer.channel(id)
+      ssn.client = self
+      self.sessions[id] = ssn
+    finally:
+      self.lock.release()
+    return ssn
 
-  def opened(self, ch):
-    ch.references = References()
+  def session(self):
+    """Return the session for the lowest channel id in 1..65535 not yet in self.sessions."""
+    self.lock.acquire()
+    try:
+      id = None
+      for i in xrange(1, 64*1024):
+        # probe the candidate i; probing id (still None here) always selected channel 1
+        if not self.sessions.has_key(i):
+          id = i
+          break
+    finally:
+      self.lock.release()
+    if id is None: raise RuntimeError("out of channels")  # every id in 1..65535 is taken
+    return self.channel(id)  # channel() acquires self.lock itself, so call it after release
 
   def close(self):
     self.socket.close()
@@ -144,16 +164,16 @@
     msg.ok()
 
   def channel_close(self, ch, msg):
-    ch.close(msg)
+    ch.closed(msg)
 
   def session_ack(self, ch, msg):
     pass
 
   def session_closed(self, ch, msg):
-    ch.close(msg)
+    ch.closed(msg)
 
   def connection_close(self, ch, msg):
-    self.client.peer.close(msg)
+    self.client.peer.closed(msg)
 
   def execution_complete(self, ch, msg):
     ch.completion.complete(msg.cumulative_execution_mark)
@@ -162,7 +182,7 @@
     future = ch.futures[msg.command_id]
     future.put_response(ch, msg.data)
 
-  def close(self, reason):
+  def closed(self, reason):
     self.client.closed = True
     self.client.reason = reason
     self.client.started.set()
@@ -185,3 +205,21 @@
 
   def struct(self, name, *args, **kwargs):
     return self.spec.struct(name, *args, **kwargs)
+
+class Session(Channel):
+  """A Channel that also carries a References object and a back-pointer to its client."""
+  def __init__(self, *args):
+    Channel.__init__(self, *args)
+    self.references = References()
+    self.client = None  # stays None until Client.channel() assigns the owning client
+
+  def open(self):
+    self.session_open()  # presumably issues the protocol's session open -- TODO confirm
+
+  def close(self):
+    self.session_close()  # NOTE(review): if this raises, the sessions entry below is kept
+    self.client.lock.acquire()  # requires self.client to be set; None fails here
+    try:
+      del self.client.sessions[self.id]  # unregister; KeyError if already removed
+    finally:
+      self.client.lock.release()

Modified: incubator/qpid/trunk/qpid/python/qpid/delegate.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/delegate.py?rev=592927&r1=592926&r2=592927&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/delegate.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/delegate.py Wed Nov  7 14:30:40 2007
@@ -49,5 +49,5 @@
       print >> sys.stderr, "Error in handler: %s\n\n%s" % \
             (_handler_name(method), traceback.format_exc())
 
-  def close(self, reason):
+  def closed(self, reason):
     print "Connection closed: %s" % reason

Modified: incubator/qpid/trunk/qpid/python/qpid/peer.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/peer.py?rev=592927&r1=592926&r2=592927&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/peer.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/peer.py Wed Nov  7 14:30:40 2007
@@ -51,14 +51,17 @@
 
 class Peer:
 
-  def __init__(self, conn, delegate, channel_callback=None):
+  def __init__(self, conn, delegate, channel_factory=None):
     self.conn = conn
     self.delegate = delegate
     self.outgoing = Queue(0)
     self.work = Queue(0)
     self.channels = {}
     self.lock = thread.allocate_lock()
-    self.channel_callback = channel_callback #notified when channels are created
+    if channel_factory:
+      self.channel_factory = channel_factory
+    else:
+      self.channel_factory = Channel
 
   def channel(self, id):
     self.lock.acquire()
@@ -66,10 +69,8 @@
       try:
         ch = self.channels[id]
       except KeyError:
-        ch = Channel(id, self.outgoing, self.conn.spec)
+        ch = self.channel_factory(id, self.outgoing, self.conn.spec)
         self.channels[id] = ch
-        if self.channel_callback:
-          self.channel_callback(ch)
     finally:
       self.lock.release()
     return ch
@@ -82,7 +83,7 @@
   def fatal(self, message=None):
     """Call when an unexpected exception occurs that will kill a thread."""
     if message: print >> sys.stderr, message
-    self.close("Fatal error: %s\n%s" % (message or "", traceback.format_exc()))
+    self.closed("Fatal error: %s\n%s" % (message or "", traceback.format_exc()))
 
   def reader(self):
     try:
@@ -97,13 +98,13 @@
     except:
       self.fatal()
 
-  def close(self, reason):
+  def closed(self, reason):
     # We must close the delegate first because closing channels
     # may wake up waiting threads and we don't want them to see
     # the delegate as open.
-    self.delegate.close(reason)
+    self.delegate.closed(reason)
     for ch in self.channels.values():
-      ch.close(reason)
+      ch.closed(reason)
 
   def writer(self):
     try:
@@ -112,7 +113,7 @@
           message = self.outgoing.get()
           self.conn.write(message)
         except socket.error, e:
-          self.close(e)
+          self.closed(e)
           break
         self.conn.flush()
     except:
@@ -131,7 +132,7 @@
 
         self.delegate(channel, Message(channel, frame, content))
     except QueueClosed:
-      self.close("worker closed")
+      self.closed("worker closed")
     except:
       self.fatal()
 
@@ -181,7 +182,7 @@
     self.incoming = Queue(0)
     self.responses = Queue(0)
     self.queue = None
-    self.closed = False
+    self._closed = False
     self.reason = None
 
     self.requester = Requester(self.write)
@@ -200,10 +201,10 @@
     self.use_execution_layer = (spec.major == 0 and spec.minor == 10)
     self.synchronous = True
 
-  def close(self, reason):
-    if self.closed:
+  def closed(self, reason):
+    if self._closed:
       return
-    self.closed = True
+    self._closed = True
     self.reason = reason
     self.incoming.close()
     self.responses.close()
@@ -213,7 +214,7 @@
       f.put_response(self, reason)
 
   def write(self, frame, content = None):
-    if self.closed:
+    if self._closed:
       raise Closed(self.reason)
     frame.channel = self.id
     self.outgoing.put(frame)
@@ -283,7 +284,7 @@
       if self.use_execution_layer and frame.method_type.is_l4_command():
         self.execution_sync()
         self.completion.wait()
-        if self.closed:
+        if self._closed:
           raise Closed(self.reason)
       return None
     try:
@@ -293,7 +294,7 @@
       else:
         return Message(self, resp)
     except QueueClosed, e:
-      if self.closed:
+      if self._closed:
         raise Closed(self.reason)
       else:
         raise e
@@ -328,7 +329,7 @@
       elif frame.method.result:
         if self.synchronous:
           fr = future.get_response(timeout=10)
-          if self.closed:
+          if self._closed:
             raise Closed(self.reason)
           return fr
         else:
@@ -337,12 +338,12 @@
                and self.use_execution_layer and frame.method.is_l4_command():
         self.execution_sync()
         completed = self.completion.wait(timeout=10)
-        if self.closed:
+        if self._closed:
           raise Closed(self.reason)
         if not completed:
-          self.close("Timed-out waiting for completion of %s" % frame)
+          self.closed("Timed-out waiting for completion of %s" % frame)
     except QueueClosed, e:
-      if self.closed:
+      if self._closed:
         raise Closed(self.reason)
       else:
         raise e
@@ -399,7 +400,7 @@
     self.sequence = Sequence(0) #issues ids for outgoing commands
     self.command_id = -1        #last issued id
     self.mark = -1              #commands up to this mark are known to be complete
-    self.closed = False
+    self._closed = False
 
   def next_command(self, method):
     #the following test is a hack until the track/sub-channel is available
@@ -413,7 +414,7 @@
     self.reset()
     self.condition.acquire()
     try:
-      self.closed = True
+      self._closed = True
       self.condition.notifyAll()
     finally:
       self.condition.release()
@@ -433,10 +434,10 @@
     remaining = timeout
     self.condition.acquire()
     try:
-      while not self.closed and point_of_interest > self.mark:
+      while not self._closed and point_of_interest > self.mark:
         #print "waiting for %s, mark = %s [%s]" % (point_of_interest, self.mark, self)
         self.condition.wait(remaining)
-        if not self.closed and point_of_interest > self.mark and timeout:
+        if not self._closed and point_of_interest > self.mark and timeout:
           if (start_time + timeout) < time(): break
           else: remaining = timeout - (time() - start_time)
     finally: