You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2008/03/07 17:57:46 UTC

svn commit: r634744 - in /incubator/qpid/trunk/qpid/python: cpp_failing_0-10.txt hello-010-world qpid/connection010.py qpid/datatypes.py qpid/delegates.py qpid/exceptions.py qpid/session.py run-tests server010 tests/connection010.py tests/datatypes.py

Author: rhs
Date: Fri Mar  7 08:57:43 2008
New Revision: 634744

URL: http://svn.apache.org/viewvc?rev=634744&view=rev
Log:
Added session.sync(); renamed the session's sync flag to session.auto_sync; made transfers not auto-complete; fixed a bug in RangedSet when adding overlapping ranges.

Added:
    incubator/qpid/trunk/qpid/python/qpid/exceptions.py   (with props)
Modified:
    incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt
    incubator/qpid/trunk/qpid/python/hello-010-world
    incubator/qpid/trunk/qpid/python/qpid/connection010.py
    incubator/qpid/trunk/qpid/python/qpid/datatypes.py
    incubator/qpid/trunk/qpid/python/qpid/delegates.py
    incubator/qpid/trunk/qpid/python/qpid/session.py
    incubator/qpid/trunk/qpid/python/run-tests
    incubator/qpid/trunk/qpid/python/server010
    incubator/qpid/trunk/qpid/python/tests/connection010.py
    incubator/qpid/trunk/qpid/python/tests/datatypes.py

Modified: incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt?rev=634744&r1=634743&r2=634744&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt (original)
+++ incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt Fri Mar  7 08:57:43 2008
@@ -63,3 +63,5 @@
 tests_0-10.queue.QueueTests.test_purge
 tests_0-10.queue.QueueTests.test_bind
 tests_0-10.queue.QueueTests.test_unbind_headers
+tests_0-10.exchange.RecommendedTypesRuleTests.testTopic
+tests_0-10.exchange.RequiredInstancesRuleTests.testAmqTopic

Modified: incubator/qpid/trunk/qpid/python/hello-010-world
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/hello-010-world?rev=634744&r1=634743&r2=634744&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/hello-010-world (original)
+++ incubator/qpid/trunk/qpid/python/hello-010-world Fri Mar  7 08:57:43 2008
@@ -38,6 +38,12 @@
 m4 = ssn.incoming("test").get(timeout=10)
 print m4
 
+print ssn.sender._completed, ssn.sender.next_id
+ssn.sync(10)
+print ssn.sender.segments
+
+ssn.channel.session_flush(completed=True)
+
 ssn.message_accept(RangedSet(m1.id, m2.id, m3.id, m4.id))
 
 print ssn.queue_query("testing")

Modified: incubator/qpid/trunk/qpid/python/qpid/connection010.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/connection010.py?rev=634744&r1=634743&r2=634744&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/connection010.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/connection010.py Fri Mar  7 08:57:43 2008
@@ -26,9 +26,8 @@
 from session import Session
 from invoker import Invoker
 from spec010 import Control, Command
+from exceptions import *
 import delegates
-
-class Timeout(Exception): pass
 
 class ChannelBusy(Exception): pass
 

Modified: incubator/qpid/trunk/qpid/python/qpid/datatypes.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/datatypes.py?rev=634744&r1=634743&r2=634744&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/datatypes.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/datatypes.py Fri Mar  7 08:57:43 2008
@@ -67,10 +67,15 @@
     return self.lower <= n and n <= self.upper
 
   def touches(self, r):
+    # XXX
     return (self.lower - 1 in r or
             self.upper + 1 in r or
             r.lower - 1 in self or
-            r.upper + 1 in self)
+            r.upper + 1 in self or
+            self.lower in r or
+            self.upper in r or
+            r.lower in self or
+            r.upper in self)
 
   def span(self, r):
     return Range(min(self.lower, r.lower), max(self.upper, r.upper))

Modified: incubator/qpid/trunk/qpid/python/qpid/delegates.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/delegates.py?rev=634744&r1=634743&r2=634744&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/delegates.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/delegates.py Fri Mar  7 08:57:43 2008
@@ -20,6 +20,7 @@
 import connection010
 import session
 from util import notify
+from datatypes import RangedSet
 
 class Delegate:
 
@@ -78,6 +79,23 @@
     ssn = ch.session
     ssn.receiver.next_id = cp.command_id
     ssn.receiver.next_offset = cp.command_offset
+
+  def session_completed(self, ch, cmp):
+    ch.session.sender.completed(cmp.commands)
+    notify(ch.session.condition)
+
+  def session_flush(self, ch, f):
+    rcv = ch.session.receiver
+    if f.expected:
+      if rcv.next_id == None:
+        exp = None
+      else:
+        exp = RangedSet(rcv.next_id)
+      ch.session_expected(exp)
+    if f.confirmed:
+      ch.session_confirmed(rcv._completed)
+    if f.completed:
+      ch.session_completed(rcv._completed)
 
 class Server(Delegate):
 

Added: incubator/qpid/trunk/qpid/python/qpid/exceptions.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/exceptions.py?rev=634744&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/exceptions.py (added)
+++ incubator/qpid/trunk/qpid/python/qpid/exceptions.py Fri Mar  7 08:57:43 2008
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+class Timeout(Exception): pass

Propchange: incubator/qpid/trunk/qpid/python/qpid/exceptions.py
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/qpid/trunk/qpid/python/qpid/session.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/session.py?rev=634744&r1=634743&r2=634744&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/session.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/session.py Fri Mar  7 08:57:43 2008
@@ -17,14 +17,15 @@
 # under the License.
 #
 
-from threading import Condition, RLock
+from threading import Condition, RLock, currentThread
 from invoker import Invoker
 from datatypes import RangedSet, Struct, Future
 from codec010 import StringCodec
 from assembler import Segment
 from queue import Queue
 from datatypes import Message
-from util import wait
+from util import wait, notify
+from exceptions import *
 from logging import getLogger
 
 class SessionDetached(Exception): pass
@@ -37,12 +38,14 @@
 
 class SessionException(Exception): pass
 
+INCOMPLETE = object()
+
 class Session(Invoker):
 
-  def __init__(self, name, spec, sync=True, timeout=10, delegate=client):
+  def __init__(self, name, spec, auto_sync=True, timeout=10, delegate=client):
     self.name = name
     self.spec = spec
-    self.sync = sync
+    self.auto_sync = auto_sync
     self.timeout = timeout
     self.channel = None
 
@@ -72,9 +75,29 @@
     finally:
       self.lock.release()
 
+  def error(self):
+    exc = self.exceptions[:]
+    if len(exc) == 1:
+      return exc[0]
+    else:
+      return tuple(exc)
+
+  def sync(self, timeout=None):
+    if currentThread() == self.channel.connection.thread:
+      raise SessionException("deadlock detected")
+    self.channel.session_flush(completed=True)
+    last = self.sender.next_id - 1
+    if not wait(self.condition, lambda:
+                  last in self.sender._completed or self.exceptions,
+                timeout):
+      raise Timeout()
+    if self.exceptions:
+      raise SessionException(self.error())
+
   def close(self, timeout=None):
     self.channel.session_detach(self.name)
-    wait(self.condition, lambda: self.channel is None, timeout)
+    if not wait(self.condition, lambda: self.channel is None, timeout):
+      raise Timeout()
 
   def resolve_method(self, name):
     cmd = self.spec.instructions.get(name)
@@ -132,10 +155,12 @@
         self.send(seg)
 
     if type.result:
-      if self.sync:
+      if self.auto_sync:
         return result.get(self.timeout)
       else:
         return result
+    elif self.auto_sync:
+      self.sync(self.timeout)
 
   def received(self, seg):
     self.receiver.received(seg)
@@ -148,6 +173,7 @@
       self.assembly = None
 
   def dispatch(self, assembly):
+    segments = assembly[:]
     cmd = assembly.pop(0).decode(self.spec)
     args = []
 
@@ -168,8 +194,9 @@
     if cmd.type.result:
       self.execution_result(cmd.id, result)
 
-    for seg in assembly:
-      self.receiver.completed(seg)
+    if result is not INCOMPLETE:
+      for seg in segments:
+        self.receiver.completed(seg)
 
   def send(self, seg):
     self.sender.send(seg)
@@ -212,6 +239,7 @@
     self.next_id = 0
     self.next_offset = 0
     self.segments = []
+    self._completed = RangedSet()
 
   def send(self, seg):
     seg.id = self.next_id
@@ -235,6 +263,8 @@
         del self.segments[idx]
       else:
         idx += 1
+    for range in commands.ranges:
+      self._completed.add(range.lower, range.upper)
 
 class Delegate:
 
@@ -249,17 +279,14 @@
     self.session.lock.acquire()
     try:
       self.session.exceptions.append(ex)
-      excs = self.session.exceptions[:]
-      if len(excs) == 1:
-        error = excs[0]
-      else:
-        error = tuple(excs)
+      error = self.session.error()
       for id in self.session.results:
         f = self.session.results.pop(id)
         f.error(error)
 
       for q in self.session._incoming.values():
         q.close(error)
+      notify(self.session.condition)
     finally:
       self.session.lock.release()
 
@@ -274,3 +301,4 @@
     messages = self.session.incoming(cmd.destination)
     messages.put(m)
     msg.debug("RECV: %s", m)
+    return INCOMPLETE

Modified: incubator/qpid/trunk/qpid/python/run-tests
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/run-tests?rev=634744&r1=634743&r2=634744&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/run-tests (original)
+++ incubator/qpid/trunk/qpid/python/run-tests Fri Mar  7 08:57:43 2008
@@ -18,8 +18,16 @@
 # under the License.
 #
 
-import sys
+import sys, logging
 from qpid.testlib import testrunner
+
+if "-v" in sys.argv:
+  level = logging.DEBUG
+else:
+  level = logging.WARN
+
+format = "%(asctime)s %(name)-12s %(levelname)-8s %(message)s"
+logging.basicConfig(level=level, format=format, datefmt='%H:%M:%S')
 
 if not testrunner.run(): sys.exit(1)
 

Modified: incubator/qpid/trunk/qpid/python/server010
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/server010?rev=634744&r1=634743&r2=634744&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/server010 (original)
+++ incubator/qpid/trunk/qpid/python/server010 Fri Mar  7 08:57:43 2008
@@ -7,6 +7,16 @@
 from qpid.session import Client
 from qpid.datatypes import Message
 
+import logging, sys
+
+if "-v" in sys.argv:
+  level = logging.DEBUG
+else:
+  level = logging.WARN
+
+format = "%(asctime)s %(name)-12s %(levelname)-8s %(message)s"
+logging.basicConfig(level=level, format=format, datefmt='%H:%M:%S')
+
 spec = load("../specs/amqp.0-10.xml")
 
 class Server:
@@ -15,6 +25,7 @@
     return delegates.Server(connection, self.session)
 
   def session(self, session):
+    session.auto_sync = False
     return SessionDelegate(session)
 
 class SessionDelegate(Client):

Modified: incubator/qpid/trunk/qpid/python/tests/connection010.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests/connection010.py?rev=634744&r1=634743&r2=634744&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests/connection010.py (original)
+++ incubator/qpid/trunk/qpid/python/tests/connection010.py Fri Mar  7 08:57:43 2008
@@ -39,6 +39,7 @@
     return Server(connection, delegate=self.session)
 
   def session(self, session):
+    session.auto_sync = False
     return TestSession(session, self.queue)
 
 class TestSession(Delegate):

Modified: incubator/qpid/trunk/qpid/python/tests/datatypes.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests/datatypes.py?rev=634744&r1=634743&r2=634744&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests/datatypes.py (original)
+++ incubator/qpid/trunk/qpid/python/tests/datatypes.py Fri Mar  7 08:57:43 2008
@@ -89,3 +89,14 @@
     assert 21 not in rs
     assert 20 in rs
     self.check(rs.ranges)
+
+  def testAddSelf(self):
+    a = RangedSet()
+    a.add(0, 8)
+    self.check(a.ranges)
+    a.add(0, 8)
+    self.check(a.ranges)
+    assert len(a.ranges) == 1
+    range = a.ranges[0]
+    assert range.lower == 0
+    assert range.upper == 8