You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2011/10/21 03:20:13 UTC

svn commit: r1187150 [42/43] - in /qpid/branches/QPID-2519: ./ bin/ cpp/ cpp/bindings/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qmf2/python/ cpp/bindings/qmf...

Modified: qpid/branches/QPID-2519/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java (original)
+++ qpid/branches/QPID-2519/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java Fri Oct 21 01:19:00 2011
@@ -20,113 +20,36 @@
  */
 package org.apache.qpid.tools;
 
-import java.net.InetAddress;
 import java.text.DecimalFormat;
-import java.util.UUID;
+import java.util.Hashtable;
 
 import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
-import javax.jms.MapMessage;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
 import javax.jms.Session;
+import javax.naming.Context;
+import javax.naming.InitialContext;
 
 import org.apache.qpid.client.AMQAnyDestination;
 import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQSession_0_10;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.messaging.Address;
 
 public class PerfBase
 {
-    public final static String CODE = "CODE";
-    public final static String ID = "ID";
-    public final static String REPLY_ADDR = "REPLY_ADDR";
-    public final static String MAX_LATENCY = "MAX_LATENCY";
-    public final static String MIN_LATENCY = "MIN_LATENCY";
-    public final static String AVG_LATENCY = "AVG_LATENCY";
-    public final static String STD_DEV = "STD_DEV";
-    public final static String CONS_RATE = "CONS_RATE";
-    public final static String PROD_RATE = "PROD_RATE";
-    public final static String MSG_COUNT = "MSG_COUNT";
-    public final static String TIMESTAMP = "Timestamp";
-
-    String CONTROLLER_ADDR = System.getProperty("CONT_ADDR","CONTROLLER;{create: always, node:{x-declare:{auto-delete:true}}}");
-
     TestParams params;
     Connection con;
     Session session;
-    Session controllerSession;
     Destination dest;
-    Destination myControlQueue;
-    Destination controllerQueue;
+    Destination feedbackDest;
     DecimalFormat df = new DecimalFormat("###.##");
-    String id;
-    String myControlQueueAddr;
-
-    MessageProducer sendToController;
-    MessageConsumer receiveFromController;
-    String prefix = "";
-
-    enum OPCode {
-        REGISTER_CONSUMER, REGISTER_PRODUCER,
-        PRODUCER_STARTWARMUP, CONSUMER_STARTWARMUP,
-        CONSUMER_READY, PRODUCER_READY,
-        PRODUCER_START,
-        RECEIVED_END_MSG, CONSUMER_STOP,
-        RECEIVED_PRODUCER_STATS, RECEIVED_CONSUMER_STATS,
-        CONTINUE_TEST, STOP_TEST
-    };
-
-    enum MessageType {
-        BYTES, TEXT, MAP, OBJECT;
-
-        public static MessageType getType(String s) throws Exception
-        {
-            if ("text".equalsIgnoreCase(s))
-            {
-                return TEXT;
-            }
-            else if ("bytes".equalsIgnoreCase(s))
-            {
-                return BYTES;
-            }
-            /*else if ("map".equalsIgnoreCase(s))
-            {
-                return MAP;
-            }
-            else if ("object".equalsIgnoreCase(s))
-            {
-                return OBJECT;
-            }*/
-            else
-            {
-                throw new Exception("Unsupported message type");
-            }
-        }
-    };
-
-    MessageType msgType = MessageType.BYTES;
 
-    public PerfBase(String prefix)
+    public PerfBase()
     {
         params = new TestParams();
-        String host = "";
-        try
-        {
-            host = InetAddress.getLocalHost().getHostName();
-        }
-        catch (Exception e)
-        {
-        }
-        id = host + "-" + UUID.randomUUID().toString();
-        this.prefix = prefix;
-        this.myControlQueueAddr = id + ";{create: always}";
     }
 
     public void setUp() throws Exception
-    {
+    {        
+
         if (params.getHost().equals("") || params.getPort() == -1)
         {
             con = new AMQConnection(params.getUrl());
@@ -139,78 +62,7 @@ public class PerfBase
         session = con.createSession(params.isTransacted(),
                                     params.isTransacted()? Session.SESSION_TRANSACTED:params.getAckMode());
 
-        controllerSession = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-        dest = createDestination();
-        controllerQueue = new AMQAnyDestination(CONTROLLER_ADDR);
-        myControlQueue = session.createQueue(myControlQueueAddr);
-        msgType = MessageType.getType(params.getMessageType());
-        System.out.println("Using " + msgType + " messages");
-
-        sendToController = controllerSession.createProducer(controllerQueue);
-        receiveFromController = controllerSession.createConsumer(myControlQueue);
-    }
-
-    private Destination createDestination() throws Exception
-    {
-        if (params.isUseUniqueDests())
-        {
-            System.out.println("Prefix : " + prefix);
-            Address addr = Address.parse(params.getAddress());
-            AMQAnyDestination temp = new AMQAnyDestination(params.getAddress());
-            int type = ((AMQSession_0_10)session).resolveAddressType(temp);
-
-            if ( type == AMQDestination.TOPIC_TYPE)
-            {
-                addr = new Address(addr.getName(),addr.getSubject() + "." + prefix,addr.getOptions());
-                System.out.println("Setting subject : " + addr);
-            }
-            else
-            {
-                addr = new Address(addr.getName() + "_" + prefix,addr.getSubject(),addr.getOptions());
-                System.out.println("Setting name : " + addr);
-            }
-
-            return new AMQAnyDestination(addr);
-        }
-        else
-        {
-            return new AMQAnyDestination(params.getAddress());
-        }
-    }
-
-    public synchronized void sendMessageToController(MapMessage m) throws Exception
-    {
-        m.setString(ID, id);
-        m.setString(REPLY_ADDR,myControlQueueAddr);
-        sendToController.send(m);
-    }
-
-    public void receiveFromController(OPCode expected) throws Exception
-    {
-        MapMessage m = (MapMessage)receiveFromController.receive();
-        OPCode code = OPCode.values()[m.getInt(CODE)];
-        System.out.println("Received Code : " + code);
-        if (expected != code)
-        {
-            throw new Exception("Expected OPCode : " + expected + " but received : " + code);
-        }
-
-    }
-
-    public boolean continueTest() throws Exception
-    {
-        MapMessage m = (MapMessage)receiveFromController.receive();
-        OPCode code = OPCode.values()[m.getInt(CODE)];
-        System.out.println("Received Code : " + code);
-        return (code == OPCode.CONTINUE_TEST);
-    }
-
-    public void tearDown() throws Exception
-    {
-        session.close();
-        controllerSession.close();
-        con.close();
+        dest = new AMQAnyDestination(params.getAddress());
     }
 
     public void handleError(Exception e,String msg)

Modified: qpid/branches/QPID-2519/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java (original)
+++ qpid/branches/QPID-2519/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java Fri Oct 21 01:19:00 2011
@@ -20,17 +20,13 @@
  */
 package org.apache.qpid.tools;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-
-import javax.jms.MapMessage;
+import javax.jms.Destination;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
 import javax.jms.TextMessage;
 
-import org.apache.qpid.client.AMQDestination;
 import org.apache.qpid.thread.Threading;
 
 /**
@@ -51,7 +47,7 @@ import org.apache.qpid.thread.Threading;
  * b) They are on separate machines that have their time synced via a Time Server
  *
  * In order to calculate latency the producer inserts a timestamp
- * when the message is sent. The consumer will note the current time the message is
+ * hen the message is sent. The consumer will note the current time the message is
  * received and will calculate the latency as follows
  * latency = rcvdTime - msg.getJMSTimestamp()
  *
@@ -59,9 +55,13 @@ import org.apache.qpid.thread.Threading;
  * variance in latencies.
  *
  * Avg latency is measured by adding all latencies and dividing by the total msgs.
+ * You can also compute this by (rcvdTime - testStartTime)/rcvdMsgCount
  *
  * Throughput
  * ===========
+ * System throughput is calculated as follows
+ * rcvdMsgCount/(rcvdTime - testStartTime)
+ *
  * Consumer rate is calculated as
  * rcvdMsgCount/(rcvdTime - startTime)
  *
@@ -81,160 +81,130 @@ public class PerfConsumer extends PerfBa
     long minLatency = Long.MAX_VALUE;
     long totalLatency = 0;  // to calculate avg latency.
     int rcvdMsgCount = 0;
+    long testStartTime = 0; // to measure system throughput
     long startTime = 0;     // to measure consumer throughput
     long rcvdTime = 0;
     boolean transacted = false;
     int transSize = 0;
 
-    boolean printStdDev = false;
-    List<Long> sample;
-
     final Object lock = new Object();
 
-    public PerfConsumer(String prefix)
+    public PerfConsumer()
     {
-        super(prefix);
-        System.out.println("Consumer ID : " + id);
+        super();
     }
 
     public void setUp() throws Exception
     {
         super.setUp();
         consumer = session.createConsumer(dest);
-        System.out.println("Consumer: " + id + " Receiving messages from : " + ((AMQDestination)dest).getQueueName() + "\n");
 
         // Storing the following two for efficiency
         transacted = params.isTransacted();
         transSize = params.getTransactionSize();
-        printStdDev = params.isPrintStdDev();
-        MapMessage m = controllerSession.createMapMessage();
-        m.setInt(CODE, OPCode.REGISTER_CONSUMER.ordinal());
-        sendMessageToController(m);
     }
 
     public void warmup()throws Exception
     {
-        receiveFromController(OPCode.CONSUMER_STARTWARMUP);
-        Message msg = consumer.receive();
-        // This is to ensure we drain the queue before we start the actual test.
-        while ( msg != null)
+        System.out.println("Warming up......");
+
+        boolean start = false;
+        while (!start)
         {
-            if (msg.getBooleanProperty("End") == true)
+            Message msg = consumer.receive();
+            if (msg instanceof TextMessage)
             {
-                // It's more realistic for the consumer to signal this.
-                MapMessage m = controllerSession.createMapMessage();
-                m.setInt(CODE, OPCode.PRODUCER_READY.ordinal());
-                sendMessageToController(m);
+                if (((TextMessage)msg).getText().equals("End"))
+                {
+                    start = true;
+                    MessageProducer temp = session.createProducer(msg.getJMSReplyTo());
+                    temp.send(session.createMessage());
+                    if (params.isTransacted())
+                    {
+                        session.commit();
+                    }
+                    temp.close();
+                }
             }
-            msg = consumer.receive(1000);
-        }
-
-        if (params.isTransacted())
-        {
-            session.commit();
         }
-
-        MapMessage m = controllerSession.createMapMessage();
-        m.setInt(CODE, OPCode.CONSUMER_READY.ordinal());
-        sendMessageToController(m);
-        consumer.setMessageListener(this);
     }
 
     public void startTest() throws Exception
     {
-        System.out.println("Consumer: " + id + " Starting test......" + "\n");
-        resetCounters();
+        System.out.println("Starting test......");
+        consumer.setMessageListener(this);
     }
 
-    public void resetCounters()
+    public void printResults() throws Exception
     {
-        rcvdMsgCount = 0;
-        maxLatency = 0;
-        minLatency = Long.MAX_VALUE;
-        totalLatency = 0;
-        if (printStdDev)
+        synchronized (lock)
         {
-            sample = null;
-            sample = new ArrayList<Long>(params.getMsgCount());
+            lock.wait();
         }
-    }
-
-    public void sendResults() throws Exception
-    {
-        receiveFromController(OPCode.CONSUMER_STOP);
 
         double avgLatency = (double)totalLatency/(double)rcvdMsgCount;
-        double consRate   = (double)rcvdMsgCount*Clock.convertToSecs()/(double)(rcvdTime - startTime);
-        double stdDev = 0.0;
-        if (printStdDev)
-        {
-            stdDev = calculateStdDev(avgLatency);
-        }
-        MapMessage m  = controllerSession.createMapMessage();
-        m.setInt(CODE, OPCode.RECEIVED_CONSUMER_STATS.ordinal());
-        m.setDouble(AVG_LATENCY, avgLatency/Clock.convertToMiliSecs());
-        m.setDouble(MIN_LATENCY,minLatency/Clock.convertToMiliSecs());
-        m.setDouble(MAX_LATENCY,maxLatency/Clock.convertToMiliSecs());
-        m.setDouble(STD_DEV, stdDev/Clock.convertToMiliSecs());
-        m.setDouble(CONS_RATE, consRate);
-        m.setLong(MSG_COUNT, rcvdMsgCount);
-        sendMessageToController(m);
-
+        double throughput = ((double)rcvdMsgCount/(double)(rcvdTime - testStartTime))*1000;
+        double consRate   = ((double)rcvdMsgCount/(double)(rcvdTime - startTime))*1000;
         System.out.println(new StringBuilder("Total Msgs Received : ").append(rcvdMsgCount).toString());
         System.out.println(new StringBuilder("Consumer rate       : ").
                            append(df.format(consRate)).
                            append(" msg/sec").toString());
+        System.out.println(new StringBuilder("System Throughput   : ").
+                           append(df.format(throughput)).
+                           append(" msg/sec").toString());
         System.out.println(new StringBuilder("Avg Latency         : ").
-                           append(df.format(avgLatency/Clock.convertToMiliSecs())).
+                           append(df.format(avgLatency)).
                            append(" ms").toString());
         System.out.println(new StringBuilder("Min Latency         : ").
-                           append(df.format(minLatency/Clock.convertToMiliSecs())).
+                           append(minLatency).
                            append(" ms").toString());
         System.out.println(new StringBuilder("Max Latency         : ").
-                           append(df.format(maxLatency/Clock.convertToMiliSecs())).
+                           append(maxLatency).
                            append(" ms").toString());
-        if (printStdDev)
-        {
-            System.out.println(new StringBuilder("Std Dev             : ").
-                               append(stdDev/Clock.convertToMiliSecs()).toString());
-        }
+        System.out.println("Completed the test......\n");
     }
 
-    public double calculateStdDev(double mean)
+    public void notifyCompletion(Destination replyTo) throws Exception
     {
-        double v = 0;
-        for (double latency: sample)
+        MessageProducer tmp = session.createProducer(replyTo);
+        Message endMsg = session.createMessage();
+        tmp.send(endMsg);
+        if (params.isTransacted())
         {
-            v = v + Math.pow((latency-mean), 2);
+            session.commit();
         }
-        v = v/sample.size();
-        return Math.round(Math.sqrt(v));
+        tmp.close();
+    }
+
+    public void tearDown() throws Exception
+    {
+        consumer.close();
+        session.close();
+        con.close();
     }
 
     public void onMessage(Message msg)
     {
         try
         {
-            // To figure out the decoding overhead of text
-            if (msgType == MessageType.TEXT)
+            if (msg instanceof TextMessage && ((TextMessage)msg).getText().equals("End"))
             {
-                ((TextMessage)msg).getText();
-            }
+                notifyCompletion(msg.getJMSReplyTo());
 
-            if (msg.getBooleanProperty("End"))
-            {
-                MapMessage m = controllerSession.createMapMessage();
-                m.setInt(CODE, OPCode.RECEIVED_END_MSG.ordinal());
-                sendMessageToController(m);
+                synchronized (lock)
+                {
+                   lock.notifyAll();
+                }
             }
             else
             {
-                rcvdTime = Clock.getTime();
+                rcvdTime = System.currentTimeMillis();
                 rcvdMsgCount ++;
 
                 if (rcvdMsgCount == 1)
                 {
                     startTime = rcvdTime;
+                    testStartTime = msg.getJMSTimestamp();
                 }
 
                 if (transacted && (rcvdMsgCount % transSize == 0))
@@ -242,14 +212,10 @@ public class PerfConsumer extends PerfBa
                     session.commit();
                 }
 
-                long latency = rcvdTime - msg.getLongProperty(TIMESTAMP);
+                long latency = rcvdTime - msg.getJMSTimestamp();
                 maxLatency = Math.max(maxLatency, latency);
                 minLatency = Math.min(minLatency, latency);
                 totalLatency = totalLatency + latency;
-                if (printStdDev)
-                {
-                    sample.add(latency);
-                }
             }
 
         }
@@ -260,21 +226,14 @@ public class PerfConsumer extends PerfBa
 
     }
 
-    public void run()
+    public void test()
     {
         try
         {
             setUp();
             warmup();
-            boolean nextIteration = true;
-            while (nextIteration)
-            {
-                System.out.println("=========================================================\n");
-                System.out.println("Consumer: " + id + " starting a new iteration ......\n");
-                startTest();
-                sendResults();
-                nextIteration = continueTest();
-            }
+            startTest();
+            printResults();
             tearDown();
         }
         catch(Exception e)
@@ -283,43 +242,26 @@ public class PerfConsumer extends PerfBa
         }
     }
 
-        @Override
-    public void tearDown() throws Exception
-    {
-        super.tearDown();
-    }
-
-    public static void main(String[] args) throws InterruptedException
+    public static void main(String[] args)
     {
-        String scriptId = (args.length == 1) ? args[0] : "";
-        int conCount = Integer.getInteger("con_count",1);
-        final CountDownLatch testCompleted = new CountDownLatch(conCount);
-        for (int i=0; i < conCount; i++)
+        final PerfConsumer cons = new PerfConsumer();
+        Runnable r = new Runnable()
         {
-
-            final PerfConsumer cons = new PerfConsumer(scriptId + i);
-            Runnable r = new Runnable()
-            {
-                public void run()
-                {
-                    cons.run();
-                    testCompleted.countDown();
-                }
-            };
-
-            Thread t;
-            try
-            {
-                t = Threading.getThreadFactory().createThread(r);
-            }
-            catch(Exception e)
+            public void run()
             {
-                throw new Error("Error creating consumer thread",e);
+                cons.test();
             }
-            t.start();
-
+        };
+        
+        Thread t;
+        try
+        {
+            t = Threading.getThreadFactory().createThread(r);                      
+        }
+        catch(Exception e)
+        {
+            throw new Error("Error creating consumer thread",e);
         }
-        testCompleted.await();
-        System.out.println("Consumers have completed the test......\n");
+        t.start(); 
     }
 }
\ No newline at end of file

Modified: qpid/branches/QPID-2519/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java (original)
+++ qpid/branches/QPID-2519/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java Fri Oct 21 01:19:00 2011
@@ -23,15 +23,13 @@ package org.apache.qpid.tools;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
-import java.util.concurrent.CountDownLatch;
 
 import javax.jms.BytesMessage;
 import javax.jms.DeliveryMode;
-import javax.jms.MapMessage;
 import javax.jms.Message;
+import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 
-import org.apache.qpid.client.AMQDestination;
 import org.apache.qpid.thread.Threading;
 
 /**
@@ -53,52 +51,38 @@ import org.apache.qpid.thread.Threading;
  * System throughput and latencies calculated by the PerfConsumer are more realistic
  * numbers.
  *
- * Answer by rajith : I agree about in memory buffering affecting rates. But Based on test runs
- * I have done so far, it seems quite useful to compute the producer rate as it gives an
- * indication of how the system behaves. For ex if there is a gap between producer and consumer rates
- * you could clearly see the higher latencies and when producer and consumer rates are very close,
- * latency is good.
- *
  */
 public class PerfProducer extends PerfBase
 {
-    private static long SEC = 60000;
-
     MessageProducer producer;
     Message msg;
-    Object payload;
-    List<Object> payloads;
+    byte[] payload;
+    List<byte[]> payloads;
     boolean cacheMsg = false;
     boolean randomMsgSize = false;
     boolean durable = false;
     Random random;
     int msgSizeRange = 1024;
-    boolean rateLimitProducer = false;
-    double rateFactor = 0.4;
-    double rate = 0.0;
-
-    public PerfProducer(String prefix)
+    
+    public PerfProducer()
     {
-        super(prefix);
-        System.out.println("Producer ID : " + id);
+        super();
     }
 
     public void setUp() throws Exception
     {
         super.setUp();
-        durable = params.isDurable();
-        rateLimitProducer = params.getRate() > 0 ? true : false;
-        if (rateLimitProducer)
-        {
-            System.out.println("The test will attempt to limit the producer to " + params.getRate() + " msg/sec");
-        }
+        feedbackDest = session.createTemporaryQueue();
 
+        durable = params.isDurable();
+        
         // if message caching is enabled we pre create the message
         // else we pre create the payload
         if (params.isCacheMessage())
         {
             cacheMsg = true;
-            msg = createMessage(createPayload(params.getMsgSize()));
+            
+            msg = MessageFactory.createBytesMessage(session, params.getMsgSize());
             msg.setJMSDeliveryMode(durable?
                                    DeliveryMode.PERSISTENT :
                                    DeliveryMode.NON_PERSISTENT
@@ -109,52 +93,21 @@ public class PerfProducer extends PerfBa
             random = new Random(20080921);
             randomMsgSize = true;
             msgSizeRange = params.getMsgSize();
-            payloads = new ArrayList<Object>(msgSizeRange);
-
+            payloads = new ArrayList<byte[]>(msgSizeRange);
+            
             for (int i=0; i < msgSizeRange; i++)
             {
-                payloads.add(createPayload(i));
+                payloads.add(MessageFactory.createMessagePayload(i).getBytes());
             }
-        }
+        }        
         else
         {
-            payload = createPayload(params.getMsgSize());
+            payload = MessageFactory.createMessagePayload(params.getMsgSize()).getBytes();
         }
 
         producer = session.createProducer(dest);
-        System.out.println("Producer: " + id + " Sending messages to: " + ((AMQDestination)dest).getQueueName());
         producer.setDisableMessageID(params.isDisableMessageID());
         producer.setDisableMessageTimestamp(params.isDisableTimestamp());
-
-        MapMessage m = controllerSession.createMapMessage();
-        m.setInt(CODE, OPCode.REGISTER_PRODUCER.ordinal());
-        sendMessageToController(m);
-    }
-
-    Object createPayload(int size)
-    {
-        if (msgType == MessageType.TEXT)
-        {
-           return MessageFactory.createMessagePayload(size);
-        }
-        else
-        {
-            return MessageFactory.createMessagePayload(size).getBytes();
-        }
-    }
-
-    Message createMessage(Object payload) throws Exception
-    {
-        if (msgType == MessageType.TEXT)
-        {
-            return session.createTextMessage((String)payload);
-        }
-        else
-        {
-            BytesMessage m = session.createBytesMessage();
-            m.writeBytes((byte[])payload);
-            return m;
-        }
     }
 
     protected Message getNextMessage() throws Exception
@@ -164,130 +117,117 @@ public class PerfProducer extends PerfBa
             return msg;
         }
         else
-        {
-            Message m;
-
+        {            
+            msg = session.createBytesMessage();
+            
             if (!randomMsgSize)
             {
-                m = createMessage(payload);
+                ((BytesMessage)msg).writeBytes(payload);
             }
             else
             {
-                m = createMessage(payloads.get(random.nextInt(msgSizeRange)));
+                ((BytesMessage)msg).writeBytes(payloads.get(random.nextInt(msgSizeRange)));
             }
-            m.setJMSDeliveryMode(durable?
+            msg.setJMSDeliveryMode(durable?
                     DeliveryMode.PERSISTENT :
                     DeliveryMode.NON_PERSISTENT
                    );
-            return m;
+            return msg;
         }
     }
 
     public void warmup()throws Exception
     {
-        receiveFromController(OPCode.PRODUCER_STARTWARMUP);
-        System.out.println("Producer: " + id + " Warming up......");
+        System.out.println("Warming up......");
+        MessageConsumer tmp = session.createConsumer(feedbackDest);
 
         for (int i=0; i < params.getWarmupCount() -1; i++)
         {
             producer.send(getNextMessage());
         }
-        sendEndMessage();
+        Message msg = session.createTextMessage("End");
+        msg.setJMSReplyTo(feedbackDest);
+        producer.send(msg);
+
+        if (params.isTransacted())
+        {
+            session.commit();
+        }
+
+        tmp.receive();
 
         if (params.isTransacted())
         {
             session.commit();
         }
+
+        tmp.close();
     }
 
     public void startTest() throws Exception
     {
-        resetCounters();
-        receiveFromController(OPCode.PRODUCER_START);
+        System.out.println("Starting test......");
         int count = params.getMsgCount();
         boolean transacted = params.isTransacted();
         int tranSize =  params.getTransactionSize();
 
-        long limit = (long)(params.getRate() * rateFactor); // in msecs
-        long timeLimit = (long)(SEC * rateFactor); // in msecs
-
-        long start = Clock.getTime(); // defaults to nano secs
-        long interval = start;
+        long start = System.currentTimeMillis();
         for(int i=0; i < count; i++ )
         {
             Message msg = getNextMessage();
-            msg.setLongProperty(TIMESTAMP, Clock.getTime());
+            msg.setJMSTimestamp(System.currentTimeMillis());
             producer.send(msg);
             if ( transacted && ((i+1) % tranSize == 0))
             {
                 session.commit();
             }
-
-            if (rateLimitProducer && i%limit == 0)
-            {
-                long elapsed = (Clock.getTime() - interval)*Clock.convertToMiliSecs(); // in msecs
-                if (elapsed < timeLimit)
-                {
-                    Thread.sleep(elapsed);
-                }
-                interval = Clock.getTime();
-
-            }
-        }
-        sendEndMessage();
-        if ( transacted)
-        {
-            session.commit();
         }
-        long time = Clock.getTime() - start;
-        rate = (double)count*Clock.convertToSecs()/(double)time;
+        long time = System.currentTimeMillis() - start;
+        double rate = ((double)count/(double)time)*1000;
         System.out.println(new StringBuilder("Producer rate: ").
                                append(df.format(rate)).
                                append(" msg/sec").
                                toString());
     }
 
-    public void resetCounters()
+    public void waitForCompletion() throws Exception
     {
+        MessageConsumer tmp = session.createConsumer(feedbackDest);
+        Message msg = session.createTextMessage("End");
+        msg.setJMSReplyTo(feedbackDest);
+        producer.send(msg);
 
-    }
+        if (params.isTransacted())
+        {
+            session.commit();
+        }
 
-    public void sendEndMessage() throws Exception
-    {
-        Message msg = session.createMessage();
-        msg.setBooleanProperty("End", true);
-        producer.send(msg);
-    }
+        tmp.receive();
 
-    public void sendResults() throws Exception
-    {
-        MapMessage msg = controllerSession.createMapMessage();
-        msg.setInt(CODE, OPCode.RECEIVED_PRODUCER_STATS.ordinal());
-        msg.setDouble(PROD_RATE, rate);
-        sendMessageToController(msg);
+        if (params.isTransacted())
+        {
+            session.commit();
+        }
+
+        tmp.close();
+        System.out.println("Consumer has completed the test......");
     }
 
-    @Override
     public void tearDown() throws Exception
     {
-        super.tearDown();
+        producer.close();
+        session.close();
+        con.close();
     }
 
-    public void run()
+    public void test()
     {
         try
         {
             setUp();
             warmup();
-            boolean nextIteration = true;
-            while (nextIteration)
-            {
-                System.out.println("=========================================================\n");
-                System.out.println("Producer: " + id + " starting a new iteration ......\n");
-                startTest();
-                sendResults();
-                nextIteration = continueTest();
-            }
+            startTest();
+            waitForCompletion();
             tearDown();
         }
         catch(Exception e)
@@ -296,63 +236,27 @@ public class PerfProducer extends PerfBa
         }
     }
 
-    public void startControllerIfNeeded()
+
+    public static void main(String[] args)
     {
-        if (!params.isExternalController())
+        final PerfProducer prod = new PerfProducer();
+        Runnable r = new Runnable()
         {
-            final PerfTestController controller = new PerfTestController();
-            Runnable r = new Runnable()
-            {
-                public void run()
-                {
-                    controller.run();
-                }
-            };
-
-            Thread t;
-            try
-            {
-                t = Threading.getThreadFactory().createThread(r);
-            }
-            catch(Exception e)
+            public void run()
             {
-                throw new Error("Error creating controller thread",e);
+                prod.test();
             }
-            t.start();
+        };
+        
+        Thread t;
+        try
+        {
+            t = Threading.getThreadFactory().createThread(r);                      
         }
-    }
-
-
-    public static void main(String[] args) throws InterruptedException
-    {
-        String scriptId = (args.length == 1) ? args[0] : "";
-        int conCount = Integer.getInteger("con_count",1);
-        final CountDownLatch testCompleted = new CountDownLatch(conCount);
-        for (int i=0; i < conCount; i++)
-        {
-            final PerfProducer prod = new PerfProducer(scriptId + i);
-            prod.startControllerIfNeeded();
-            Runnable r = new Runnable()
-            {
-                public void run()
-                {
-                    prod.run();
-                    testCompleted.countDown();
-                }
-            };
-
-            Thread t;
-            try
-            {
-                t = Threading.getThreadFactory().createThread(r);
-            }
-            catch(Exception e)
-            {
-                throw new Error("Error creating producer thread",e);
-            }
-            t.start();
+        catch(Exception e)
+        {
+            throw new Error("Error creating producer thread",e);
         }
-        testCompleted.await();
-        System.out.println("Producers have completed the test......");
+        t.start();            
     }
 }
\ No newline at end of file

Modified: qpid/branches/QPID-2519/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java (original)
+++ qpid/branches/QPID-2519/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java Fri Oct 21 01:19:00 2011
@@ -25,25 +25,25 @@ import javax.jms.Session;
 public class TestParams
 {
     /*
-     * By default the connection URL is used.
+     * By default the connection URL is used. 
      * This allows a user to easily specify a fully fledged URL any given property.
      * Ex. SSL parameters
-     *
+     *  
      * By providing a host & port allows a user to simply override the URL.
      * This allows to create multiple clients in test scripts easily,
-     * without having to deal with the long URL format.
+     * without having to deal with the long URL format. 
      */
     private String url = "amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'";
-
+    
     private String host = "";
-
+    
     private int port = -1;
 
     private String address = "queue; {create : always}";
 
     private int msg_size = 1024;
 
-    private int random_msg_size_start_from = 1;
+    private int msg_type = 1;   // not used yet
 
     private boolean cacheMessage = false;
 
@@ -62,28 +62,19 @@ public class TestParams
     private int msg_count = 10;
 
     private int warmup_count = 1;
-
+    
     private boolean random_msg_size = false;
 
-    private String msgType = "bytes";
-
-    private boolean printStdDev = false;
-
-    private long rate = -1;
-
-    private boolean externalController = false;
-
-    private boolean useUniqueDest = false; // useful when using multiple connections.
-
     public TestParams()
     {
-
+     
         url = System.getProperty("url",url);
         host = System.getProperty("host","");
         port = Integer.getInteger("port", -1);
-        address = System.getProperty("address",address);
+        address = System.getProperty("address","queue");
 
         msg_size  = Integer.getInteger("msg_size", 1024);
+        msg_type = Integer.getInteger("msg_type",1);
         cacheMessage = Boolean.getBoolean("cache_msg");
         disableMessageID = Boolean.getBoolean("disableMessageID");
         disableTimestamp = Boolean.getBoolean("disableTimestamp");
@@ -94,12 +85,6 @@ public class TestParams
         msg_count = Integer.getInteger("msg_count",msg_count);
         warmup_count = Integer.getInteger("warmup_count",warmup_count);
         random_msg_size = Boolean.getBoolean("random_msg_size");
-        msgType = System.getProperty("msg_type","bytes");
-        printStdDev = Boolean.getBoolean("print_std_dev");
-        rate = Long.getLong("rate",-1);
-        externalController = Boolean.getBoolean("ext_controller");
-        useUniqueDest = Boolean.getBoolean("use_unique_dest");
-        random_msg_size_start_from = Integer.getInteger("random_msg_size_start_from", 1);
     }
 
     public String getUrl()
@@ -137,9 +122,9 @@ public class TestParams
         return msg_size;
     }
 
-    public int getRandomMsgSizeStartFrom()
+    public int getMsgType()
     {
-        return random_msg_size_start_from;
+        return msg_type;
     }
 
     public boolean isDurable()
@@ -176,39 +161,10 @@ public class TestParams
     {
         return disableTimestamp;
     }
-
+    
     public boolean isRandomMsgSize()
     {
         return random_msg_size;
     }
 
-    public String getMessageType()
-    {
-        return msgType;
-    }
-
-    public boolean isPrintStdDev()
-    {
-        return printStdDev;
-    }
-
-    public long getRate()
-    {
-        return rate;
-    }
-
-    public boolean isExternalController()
-    {
-        return externalController;
-    }
-
-    public void setAddress(String addr)
-    {
-        address = addr;
-    }
-
-    public boolean isUseUniqueDests()
-    {
-        return useUniqueDest;
-    }
 }

Propchange: qpid/branches/QPID-2519/packaging/windows/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 21 01:19:00 2011
@@ -1,7 +1,4 @@
-/qpid/branches/0.10/qpid/packaging/windows:1103083
 /qpid/branches/0.5.x-dev/qpid/packaging/windows:892761,894875
 /qpid/branches/0.6-release-windows-installer/packaging/windows:926803
-/qpid/branches/0.6-release-windows-installer/qpid/packaging/windows:926865,927233
+/qpid/branches/0.6-release-windows-installer/qpid/packaging/windows:926803,926865,927233
 /qpid/branches/java-network-refactor/qpid/packaging/windows:805429-825319
-/qpid/branches/qpid-2935/qpid/packaging/windows:1061302-1072333
-/qpid/trunk/qpid/packaging/windows:1072051-1185907

Modified: qpid/branches/QPID-2519/packaging/windows/INSTALL_NOTES.html
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/packaging/windows/INSTALL_NOTES.html?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/packaging/windows/INSTALL_NOTES.html (original)
+++ qpid/branches/QPID-2519/packaging/windows/INSTALL_NOTES.html Fri Oct 21 01:19:00 2011
@@ -1,11 +1,11 @@
 <html>
 <head>
-<title>Apache Qpid C++ 0.10 Installation Notes</title>
+<title>Apache Qpid C++ 0.9 Installation Notes</title>
 </head>
 <body>
-<H1>Apache Qpid C++ 0.10 Installation Notes</H1>
+<H1>Apache Qpid C++ 0.9 Installation Notes</H1>
 
-<p>Thank you for installing Apache Qpid version 0.10 for Windows.
+<p>Thank you for installing Apache Qpid version 0.9 for Windows.
 If the requisite features were installed, you can now run a broker,
 use the example programs, and design your own messaging programs while
 reading the Qpid C++ API reference documentation.</p>

Modified: qpid/branches/QPID-2519/packaging/windows/installer.proj
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/packaging/windows/installer.proj?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/packaging/windows/installer.proj (original)
+++ qpid/branches/QPID-2519/packaging/windows/installer.proj Fri Oct 21 01:19:00 2011
@@ -20,7 +20,7 @@
 <!--
  Packaging script for Apache Qpid on Windows
 
- Builds the C++, .NET, and WCF components, and packages those along with user
+ Builds the C++ and WCF components, and packages those along with user
  documentation and the python pieces needed to generate QMF stuff.
 -->
 
@@ -87,8 +87,7 @@
 
     <ItemGroup>
 	<WcfProjects Include="$(source_root)\wcf\src\Apache\Qpid\**\*.csproj"/>
-	<WcfArtifacts Include="$(source_root)\wcf\src\Apache\Qpid\Channel\bin\Release\Apache.Qpid.Channel.dll"/>
-	<WcfArtifacts Include="$(source_root)\wcf\src\Apache\Qpid\Channel\bin\Release\Apache.Qpid.Interop.dll"/>
+	<WcfArtifacts Include="$(source_root)\wcf\src\Apache\Qpid\Channel\bin\Release\*.dll"/>
 	<WcfExamples Include="$(source_root)\wcf\samples\**\*"
 	             Exclude="$(source_root)\wcf\samples\**\.svn\**"/>
     </ItemGroup>
@@ -218,9 +217,9 @@
 	    OutputFile="boost_dlls.wxs" />
 	<Candle
 	    ToolPath="$(WixToolPath)"
-	    DefineConstants="qpidc_version=0.13"
+	    DefineConstants="qpidc_version=0.9"
 	    InstallerPlatform="x64"
-	    OutputFile="qpidc-0.13-x64.msi" />
+	    OutputFile="qpidc-0.9-x64.msi" />
     -->
         <Exec
 	    Command="heat dir $(staging_dir)\include\qpid -var var.qpid_headers_dir -dr QpidInclude -gg -cg group_QpidHeaders -out qpid_headers.wxs" />
@@ -230,19 +229,13 @@
 	<Exec
 	    Command="heat dir $(staging_dir)\bin\boost -var var.boost_dll_dir -dr QpidBin -srd -gg -cg group_BoostDlls -sw5150 -out boost_dlls.wxs" />
 	<Exec
-	    Command="heat file $(staging_dir)\examples\README.txt -var var.examples_dir -dr INSTALLLOCATION -gg -cg group_Examples -out examples_README.wxs" />
-	<Exec
-	    Command="heat file $(staging_dir)\examples\examples.sln -var var.examples_dir -dr INSTALLLOCATION -gg -cg group_Examples -out examples_examples.wxs" />
-	<Exec
-	    Command="heat dir $(staging_dir)\examples\messaging -var var.examples_dir -dr INSTALLLOCATION -gg -cg group_Examples -out examples_messaging.wxs" />
-	<Exec
-	    Command="heat dir $(staging_dir)\examples\qmf-console -var var.examples_dir -dr INSTALLLOCATION -gg -cg group_Examples -out examples_qmf-console.wxs" />
+	    Command="heat dir $(staging_dir)\examples -var var.examples_dir -dr INSTALLLOCATION -gg -cg group_Examples -out examples.wxs" />
 	<Exec
 	    Command="heat dir $(staging_dir)\docs\api  -var var.api_docs_dir -dr QpidDoc -gg -cg group_APIDocs -out api_docs.wxs" />
 	<Exec
-	    Command="candle -dqpidc_version=0.13 -dProgramFiles=$(ProgramFiles) -dstaging_dir=$(staging_dir) -dqpid_headers_dir=$(staging_dir)\include\qpid -dboost_headers_dir=$(staging_dir)\include\boost -dboost_dll_dir=$(staging_dir)\bin\boost -dexamples_dir=$(staging_dir)\examples -dapi_docs_dir=$(staging_dir)\docs\api  -ext WiXNetFxExtension qpidc.wxs qpid_headers.wxs boost_headers.wxs boost_dlls.wxs examples.wxs api_docs.wxs -arch $(Architecture)" />
+	    Command="candle -dqpidc_version=0.9 -dProgramFiles=$(ProgramFiles) -dstaging_dir=$(staging_dir) -dqpid_headers_dir=$(staging_dir)\include\qpid -dboost_headers_dir=$(staging_dir)\include\boost -dboost_dll_dir=$(staging_dir)\bin\boost -dexamples_dir=$(staging_dir)\examples -dapi_docs_dir=$(staging_dir)\docs\api  -ext WiXNetFxExtension qpidc.wxs qpid_headers.wxs boost_headers.wxs boost_dlls.wxs examples.wxs api_docs.wxs -arch $(Architecture)" />
 	<Exec
-	    Command="light -ext WiXNetFxExtension -ext WixUtilExtension -ext WixUIExtension -cultures:en-us -out qpidc-0.13-$(Architecture).msi qpidc.wixobj qpid_headers.wixobj boost_headers.wixobj boost_dlls.wixobj examples_README.wixobj examples_examples.wixobj examples_messaging.wixobj examples_qmf-console.wixobj api_docs.wixobj" />
+	    Command="light -ext WiXNetFxExtension -ext WixUtilExtension -ext WixUIExtension -cultures:en-us -out qpidc-0.9-$(Architecture).msi qpidc.wixobj qpid_headers.wixobj boost_headers.wixobj boost_dlls.wixobj examples.wixobj api_docs.wixobj" />
     </Target>
 
 </Project>

Propchange: qpid/branches/QPID-2519/python/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 21 01:19:00 2011
@@ -1,5 +1,3 @@
 /qpid/branches/0.5.x-dev/qpid/python:892761,894875
 /qpid/branches/java-network-refactor/qpid/python:805429-825319
 /qpid/branches/qmfv2/qpid/python:902858,902894
-/qpid/branches/qpid-2935/qpid/python:1061302-1072333
-/qpid/trunk/qpid/python:1072051-1185907

Propchange: qpid/branches/QPID-2519/python/examples/api/spout
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 21 01:19:00 2011
@@ -1,4 +1,2 @@
 /qpid/branches/qmfv2/qpid/python/examples/api/spout:902858,902894
-/qpid/branches/qpid-2935/qpid/python/examples/api/spout:1061302-1072333
 /qpid/branches/qpid.rnr/python/examples/api/spout:894071-896158
-/qpid/trunk/qpid/python/examples/api/spout:1072051-1185907

Modified: qpid/branches/QPID-2519/python/qpid/client.py
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/python/qpid/client.py?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/python/qpid/client.py (original)
+++ qpid/branches/QPID-2519/python/qpid/client.py Fri Oct 21 01:19:00 2011
@@ -106,7 +106,7 @@ class Client:
     try:
       id = None
       for i in xrange(1, 64*1024):
-        if not self.sessions.has_key(i):
+        if not self.sessions.has_key(id):
           id = i
           break
     finally:

Modified: qpid/branches/QPID-2519/python/qpid/codec010.py
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/python/qpid/codec010.py?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/python/qpid/codec010.py (original)
+++ qpid/branches/QPID-2519/python/qpid/codec010.py Fri Oct 21 01:19:00 2011
@@ -17,7 +17,7 @@
 # under the License.
 #
 
-import datetime, string
+import datetime
 from packer import Packer
 from datatypes import serial, timestamp, RangedSet, Struct, UUID
 from ops import Compound, PRIMITIVE, COMPOUND
@@ -241,20 +241,15 @@ class Codec(Packer):
       v = sc.read_primitive(type)
       result[k] = v
     return result
-
-  def _write_map_elem(self, k, v):
-    type = self.encoding(v)
-    sc = StringCodec()
-    sc.write_str8(k)
-    sc.write_uint8(type.CODE)
-    sc.write_primitive(type, v)
-    return sc.encoded
-
   def write_map(self, m):
     sc = StringCodec()
     if m is not None:
       sc.write_uint32(len(m))
-      sc.write(string.joinfields(map(self._write_map_elem, m.keys(), m.values()), ""))
+      for k, v in m.items():
+        type = self.encoding(v)
+        sc.write_str8(k)
+        sc.write_uint8(type.CODE)
+        sc.write_primitive(type, v)
     self.write_vbin32(sc.encoded)
 
   def read_array(self):

Propchange: qpid/branches/QPID-2519/python/qpid/concurrency.py
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 21 01:19:00 2011
@@ -1,4 +1,2 @@
 /qpid/branches/qmfv2/qpid/python/qpid/concurrency.py:902858,902894
-/qpid/branches/qpid-2935/qpid/python/qpid/concurrency.py:1061302-1072333
 /qpid/branches/qpid.rnr/python/qpid/concurrency.py:894071-896158
-/qpid/trunk/qpid/python/qpid/concurrency.py:1072051-1185907

Modified: qpid/branches/QPID-2519/python/qpid/messaging/driver.py
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/python/qpid/messaging/driver.py?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/python/qpid/messaging/driver.py (original)
+++ qpid/branches/QPID-2519/python/qpid/messaging/driver.py Fri Oct 21 01:19:00 2011
@@ -66,7 +66,7 @@ class Attachment:
 
 # XXX
 
-DURABLE_DEFAULT=False
+DURABLE_DEFAULT=True
 
 # XXX
 
@@ -526,7 +526,7 @@ class Driver:
       rawlog.debug("OPEN[%s]: %s:%s", self.log_id, host, port)
       trans = transports.TRANSPORTS.get(self.connection.transport)
       if trans:
-        self._transport = trans(self.connection, host, port)
+        self._transport = trans(host, port)
       else:
         raise ConnectError("no such transport: %s" % self.connection.transport)
       if self._retrying and self._reconnect_log:
@@ -828,9 +828,8 @@ class Engine:
     self._closing = True
 
   def attach(self, ssn):
-    if ssn.closed: return
     sst = self._attachments.get(ssn)
-    if sst is None:
+    if sst is None and not ssn.closed:
       for i in xrange(0, self.channel_max):
         if not self._sessions.has_key(i):
           ch = i
@@ -931,7 +930,6 @@ class Engine:
 
   def resolve_declare(self, sst, lnk, dir, action):
     declare = lnk.options.get("create") in ("always", dir)
-    assrt = lnk.options.get("assert") in ("always", dir)
     def do_resolved(type, subtype):
       err = None
       if type is None:
@@ -940,12 +938,7 @@ class Engine:
         else:
           err = NotFound(text="no such queue: %s" % lnk.name)
       else:
-        if assrt:
-          expected = lnk.options.get("node", {}).get("type")
-          if expected and type != expected:
-            err = AssertionFailed(text="expected %s, got %s" % (expected, type))
-        if err is None:
-          action(type, subtype)
+        action(type, subtype)
 
       if err:
         tgt = lnk.target

Modified: qpid/branches/QPID-2519/python/qpid/messaging/endpoints.py
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/python/qpid/messaging/endpoints.py?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/python/qpid/messaging/endpoints.py (original)
+++ qpid/branches/QPID-2519/python/qpid/messaging/endpoints.py Fri Oct 21 01:19:00 2011
@@ -158,7 +158,6 @@ class Connection(Endpoint):
     self.reconnect_log = options.get("reconnect_log", True)
 
     self.address_ttl = options.get("address_ttl", 60)
-    self.tcp_nodelay = options.get("tcp_nodelay", False)
 
     self.options = options
 
@@ -198,7 +197,7 @@ class Connection(Endpoint):
     return result
 
   def check_closed(self):
-    if not self._connected:
+    if self.closed:
       self._condition.gc()
       raise ConnectionClosed()
 
@@ -1007,9 +1006,9 @@ class Receiver(Endpoint, object):
       self.draining = True
       self._wakeup()
       self._ecwait(lambda: not self.draining)
-      msg = self.session._get(self, timeout=0)
       self._grant()
       self._wakeup()
+      msg = self.session._get(self, timeout=0)
       if msg is None:
         raise Empty()
     elif self._capacity not in (0, UNLIMITED.value):

Modified: qpid/branches/QPID-2519/python/qpid/messaging/transports.py
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/python/qpid/messaging/transports.py?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/python/qpid/messaging/transports.py (original)
+++ qpid/branches/QPID-2519/python/qpid/messaging/transports.py Fri Oct 21 01:19:00 2011
@@ -17,23 +17,18 @@
 # under the License.
 #
 
-import socket
 from qpid.util import connect
 
 TRANSPORTS = {}
 
-class SocketTransport:
+class tcp:
 
-  def __init__(self, conn, host, port):
+  def __init__(self, host, port):
     self.socket = connect(host, port)
-    if conn.tcp_nodelay:
-      self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
 
   def fileno(self):
     return self.socket.fileno()
 
-class tcp(SocketTransport):
-
   def reading(self, reading):
     return reading
 
@@ -57,14 +52,17 @@ try:
 except ImportError:
   pass
 else:
-  class tls(SocketTransport):
+  class tls:
 
-    def __init__(self, conn, host, port):
-      SocketTransport.__init__(self, conn, host, port)
+    def __init__(self, host, port):
+      self.socket = connect(host, port)
       self.tls = wrap_socket(self.socket)
       self.socket.setblocking(0)
       self.state = None
 
+    def fileno(self):
+      return self.socket.fileno()
+
     def reading(self, reading):
       if self.state is None:
         return reading

Modified: qpid/branches/QPID-2519/python/qpid/tests/messaging/endpoints.py
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/python/qpid/tests/messaging/endpoints.py?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/python/qpid/tests/messaging/endpoints.py (original)
+++ qpid/branches/QPID-2519/python/qpid/tests/messaging/endpoints.py Fri Oct 21 01:19:00 2011
@@ -46,10 +46,6 @@ class SetupTests(Base):
     self.conn.open()
     self.ping(self.conn.session())
 
-  def testTcpNodelay(self):
-    self.conn = Connection.establish(self.broker, tcp_nodelay=True)
-    assert self.conn._driver._transport.socket.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY)
-
   def testConnectError(self):
     try:
       # Specifying port 0 yields a bad address on Windows; port 4 is unassigned
@@ -115,8 +111,8 @@ class SetupTests(Base):
 
     class flaky:
 
-      def __init__(self, conn, host, port):
-        self.real = real(conn, host, port)
+      def __init__(self, host, port):
+        self.real = real(host, port)
         self.sent_count = 0
         self.recv_count = 0
 
@@ -190,9 +186,6 @@ class ConnectionTests(Base):
   def setup_connection(self):
     return Connection.establish(self.broker, **self.connection_options())
 
-  def testCheckClosed(self):
-    assert not self.conn.check_closed()
-
   def testSessionAnon(self):
     ssn1 = self.conn.session()
     ssn2 = self.conn.session()
@@ -255,8 +248,8 @@ class ConnectionTests(Base):
 
 class hangable:
 
-  def __init__(self, conn, host, port):
-    self.tcp = TRANSPORTS["tcp"](conn, host, port)
+  def __init__(self, host, port):
+    self.tcp = TRANSPORTS["tcp"](host, port)
     self.hung = False
 
   def hang(self):
@@ -1186,16 +1179,6 @@ test-link-bindings-queue; {
       snd.send(m)
     self.drain(qrcv, expected=msgs)
 
-  def testAssert1(self):
-    try:
-      snd = self.ssn.sender("amq.topic; {assert: always, node: {type: queue}}")
-      assert 0, "assertion failed to trigger"
-    except AssertionFailed, e:
-      pass
-
-  def testAssert2(self):
-    snd = self.ssn.sender("amq.topic; {assert: always}")
-
 NOSUCH_Q = "this-queue-should-not-exist"
 UNPARSEABLE_ADDR = "name/subject; {bad options"
 UNLEXABLE_ADDR = "\0x0\0x1\0x2\0x3"

Modified: qpid/branches/QPID-2519/python/qpid/util.py
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/python/qpid/util.py?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/python/qpid/util.py (original)
+++ qpid/branches/QPID-2519/python/qpid/util.py Fri Oct 21 01:19:00 2011
@@ -39,17 +39,12 @@ except ImportError:
       self.sock.close()
 
 def connect(host, port):
-  for res in socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM):
-    af, socktype, proto, canonname, sa = res
-    sock = socket.socket(af, socktype, proto)
-    try:
-      sock.connect(sa)
-      break
-    except socket.error, msg:
-      sock.close
-  else:
-    # If we got here then we couldn't connect (yet)
-    raise
+  sock = socket.socket()
+  sock.connect((host, port))
+  sock.setblocking(1)
+  # XXX: we could use this on read, but we'd have to put write in a
+  # loop as well
+  # sock.settimeout(1)
   return sock
 
 def listen(host, port, predicate = lambda: True, bound = lambda: None):
@@ -106,23 +101,15 @@ def fill(text, indent, heading = None):
 class URL:
 
   RE = re.compile(r"""
-        # [   <scheme>://  ] [    <user>   [   / <password>   ] @]    ( <host4>     | \[    <host6>    \] )  [   :<port>   ]
-        ^ (?: ([^:/@]+)://)? (?: ([^:/@]+) (?: / ([^:/@]+)   )? @)? (?: ([^@:/\[]+) | \[ ([a-f0-9:.]+) \] ) (?: :([0-9]+))?$
-""", re.X | re.I)
+        # [   <scheme>://  ] [    <user>   [   / <password>   ] @]   <host>   [   :<port>   ]
+        ^ (?: ([^:/@]+)://)? (?: ([^:/@]+) (?: / ([^:/@]+)   )? @)? ([^@:/]+) (?: :([0-9]+))?$
+""", re.X)
 
   AMQPS = "amqps"
   AMQP = "amqp"
 
-  def __init__(self, s=None, **kwargs):
-    if s is None:
-      self.scheme = kwargs.get('scheme', None)
-      self.user = kwargs.get('user', None)
-      self.password = kwargs.get('password', None)
-      self.host = kwargs.get('host', None)
-      self.port = kwargs.get('port', None)
-      if self.host is None:
-        raise ValueError('Host required for url')
-    elif isinstance(s, URL):
+  def __init__(self, s):
+    if isinstance(s, URL):
       self.scheme = s.scheme
       self.user = s.user
       self.password = s.password
@@ -132,8 +119,7 @@ class URL:
       match = URL.RE.match(s)
       if match is None:
         raise ValueError(s)
-      self.scheme, self.user, self.password, host4, host6, port = match.groups()
-      self.host = host4 or host6
+      self.scheme, self.user, self.password, self.host, port = match.groups()
       if port is None:
         self.port = None
       else:
@@ -151,25 +137,11 @@ class URL:
       if self.password:
         s += "/%s" % self.password
       s += "@"
-    if ':' not in self.host:
-      s += self.host
-    else:
-      s += "[%s]" % self.host
+    s += self.host
     if self.port:
       s += ":%s" % self.port
     return s
 
-  def __eq__(self, url):
-    if isinstance(url, basestring):
-      url = URL(url)
-    return \
-      self.scheme==url.scheme and \
-      self.user==url.user and self.password==url.password and \
-      self.host==url.host and self.port==url.port
-
-  def __ne__(self, url):
-    return not self.__eq__(url)
-
 def default(value, default):
   if value is None:
     return default

Modified: qpid/branches/QPID-2519/python/setup.py
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/python/setup.py?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/python/setup.py (original)
+++ qpid/branches/QPID-2519/python/setup.py Fri Oct 21 01:19:00 2011
@@ -298,7 +298,7 @@ class install_lib(_install_lib):
     return outfiles + extra
 
 setup(name="qpid-python",
-      version="0.13",
+      version="0.9",
       author="Apache Qpid",
       author_email="dev@qpid.apache.org",
       packages=["mllib", "qpid", "qpid.messaging", "qpid.tests",

Modified: qpid/branches/QPID-2519/specs/management-schema.xml
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/specs/management-schema.xml?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/specs/management-schema.xml (original)
+++ qpid/branches/QPID-2519/specs/management-schema.xml Fri Oct 21 01:19:00 2011
@@ -92,7 +92,6 @@
       <arg name="srcQueue"          dir="I" type="sstr" desc="Source queue"/>
       <arg name="destQueue"         dir="I" type="sstr" desc="Destination queue"/>
       <arg name="qty"               dir="I" type="uint32" desc="# of messages to move. 0 means all messages"/>
-      <arg name="filter"  dir="I" type="map" default="{}"   desc="if specified, move only those messages matching this filter"/>
     </method>
 
     <method name="setLogLevel" desc="Set the log level">
@@ -103,34 +102,6 @@
       <arg name="level"     dir="O" type="sstr"/>
     </method>
 
-    <method name="getTimestampConfig" desc="Get the message timestamping configuration">
-      <arg name="receive" dir="O" type="bool"  desc="True if received messages are timestamped."/>
-    </method>
-
-    <method name="setTimestampConfig" desc="Set the message timestamping configuration">
-      <arg name="receive"  dir="I" type="bool" desc="Set true to enable timestamping received messages."/>
-    </method>
-
-    <method name="create" desc="Create an object of the specified type">
-      <arg name="type" dir="I" type="sstr" desc="The type of object to create"/>
-      <arg name="name" dir="I" type="sstr" desc="The name of the object to create"/> 
-      <arg name="properties" dir="I" type="map" desc="Type specific object properties"/> 
-      <arg name="strict" dir="I" type="bool" desc="If specified, treat unrecognised object properties as an error"/> 
-    </method>
-
-    <method name="delete" desc="Delete an object of the specified type">
-      <arg name="type" dir="I" type="sstr" desc="The type of object to delete"/>
-      <arg name="name" dir="I" type="sstr" desc="The name of the object to delete"/> 
-      <arg name="options" dir="I" type="map" desc="Type specific object options for deletion"/> 
-    </method>
-
-    <method name="query" desc="Query the current state of an object.">
-      <arg name="type" dir="I" type="sstr" desc="The type of object to query."/>
-      <arg name="name" dir="I" type="sstr" desc="The name of the object to query"/> 
-      <arg name="results" dir="O" type="map"  desc="A snapshot of the object's state."/>
-    </method>
-
-
   </class>
 
   <!--
@@ -179,8 +150,8 @@
     <statistic name="msgTxnDequeues"      type="count64"  unit="message"     desc="Transactional messages dequeued"/>
     <statistic name="msgPersistEnqueues"  type="count64"  unit="message"     desc="Persistent messages enqueued"/>
     <statistic name="msgPersistDequeues"  type="count64"  unit="message"     desc="Persistent messages dequeued"/>
-    <statistic name="msgDepth"            type="count64"  unit="message"     desc="Current size of queue in messages" assign="msgTotalEnqueues - msgTotalDequeues"/>
-    <statistic name="byteDepth"           type="count64"  unit="octet"       desc="Current size of queue in bytes"    assign="byteTotalEnqueues - byteTotalDequeues"/>
+    <statistic name="msgDepth"            type="count32"  unit="message"     desc="Current size of queue in messages" assign="msgTotalEnqueues - msgTotalDequeues"/>
+    <statistic name="byteDepth"           type="count32"  unit="octet"       desc="Current size of queue in bytes"    assign="byteTotalEnqueues - byteTotalDequeues"/>
     <statistic name="byteTotalEnqueues"   type="count64"  unit="octet"       desc="Total messages enqueued"/>
     <statistic name="byteTotalDequeues"   type="count64"  unit="octet"       desc="Total messages dequeued"/>
     <statistic name="byteTxnEnqueues"     type="count64"  unit="octet"       desc="Transactional messages enqueued"/>
@@ -191,19 +162,15 @@
     <statistic name="bindingCount"        type="hilo32"   unit="binding"     desc="Current bindings"/>
     <statistic name="unackedMessages"     type="hilo32"   unit="message"     desc="Messages consumed but not yet acked"/>
     <statistic name="messageLatency"      type="mmaTime"  unit="nanosecond"  desc="Broker latency through this queue"/>
-    <statistic name="flowStopped"         type="bool"     desc="Flow control active."/>
-    <statistic name="flowStoppedCount"    type="count32"  desc="Number of times flow control was activated for this queue"/>
 
     <method name="purge" desc="Discard all or some messages on a queue">
       <arg name="request" dir="I" type="uint32" desc="0 for all messages or n>0 for n messages"/>
-      <arg name="filter"  dir="I" type="map" default="{}"  desc="if specified, purge only those messages matching this filter"/>
     </method>
 
     <method name="reroute" desc="Remove all or some messages on this queue and route them to an exchange">
       <arg name="request"        dir="I" type="uint32" desc="0 for all messages or n>0 for n messages"/>
       <arg name="useAltExchange" dir="I" type="bool"   desc="Iff true, use the queue's configured alternate exchange; iff false, use exchange named in the 'exchange' argument"/>
       <arg name="exchange"       dir="I" type="sstr"   desc="Name of the exchange to route the messages through"/>
-      <arg name="filter"  dir="I" type="map" default="{}" desc="if specified, reroute only those messages matching this filter"/>
     </method>
   </class>
 
@@ -272,16 +239,13 @@
     <property name="vhostRef" type="objId"  references="Vhost" access="RC" index="y" parentRef="y"/>
     <property name="address"  type="sstr"   access="RC" index="y"/>
     <property name="incoming" type="bool"   access="RC"/>
-    <property name="SystemConnection"   type="bool"   access="RC" desc="Infrastructure/ Inter-system connection (Cluster, Federation, ...)"/>
-    <property name="userProxyAuth"      type="bool"   access="RO" desc="Authorization to proxy for users not on broker"/>
+    <property name="SystemConnection"   type="bool"   access="RC" desc="Infrastucture/ Inter-system connection (Cluster, Federation, ...)"/>
     <property name="federationLink"     type="bool"   access="RO" desc="Is this a federation link"/>
     <property name="authIdentity"       type="sstr"   access="RO" desc="authId of connection if authentication enabled"/>
     <property name="remoteProcessName"  type="lstr"   access="RO" optional="y" desc="Name of executable running as remote client"/>
     <property name="remotePid"          type="uint32" access="RO" optional="y" desc="Process ID of remote client"/>
     <property name="remoteParentPid"    type="uint32" access="RO" optional="y" desc="Parent Process ID of remote client"/>
     <property name="shadow"             type="bool"   access="RO" desc="True for shadow connections"/>
-    <property name="saslMechanism"      type="sstr"   access="RO" desc="SASL mechanism"/>
-    <property name="saslSsf"            type="uint16"   access="RO" desc="SASL security strength factor"/>
     <statistic name="closing"          type="bool" desc="This client is closing by management request"/>
     <statistic name="framesFromClient" type="count64"/>
     <statistic name="framesToClient"   type="count64"/>
@@ -416,8 +380,8 @@
     <arg name="reason"  type="lstr"   desc="Reason for a failure"/>
     <arg name="rhost"   type="sstr"   desc="Address (i.e. DNS name, IP address, etc.) of a remotely connected host"/>
     <arg name="user"    type="sstr"   desc="Authentication identity"/>
-    <arg name="msgDepth" type="count64" desc="Current size of queue in messages"/>
-    <arg name="byteDepth" type="count64" desc="Current size of queue in bytes"/>
+    <arg name="msgDepth" type="count32" desc="Current size of queue in messages"/>
+    <arg name="byteDepth" type="count32" desc="Current size of queue in bytes"/>
   </eventArguments>
 
   <event name="clientConnect"     sev="inform" args="rhost, user"/>

Modified: qpid/branches/QPID-2519/tests/setup.py
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/tests/setup.py?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/tests/setup.py (original)
+++ qpid/branches/QPID-2519/tests/setup.py Fri Oct 21 01:19:00 2011
@@ -20,7 +20,7 @@
 from distutils.core import setup
 
 setup(name="qpid-tests",
-      version="0.13",
+      version="0.9",
       author="Apache Qpid",
       author_email="dev@qpid.apache.org",
       packages=["qpid_tests", "qpid_tests.broker_0_10", "qpid_tests.broker_0_9",

Modified: qpid/branches/QPID-2519/tests/src/py/qpid_tests/broker_0_10/__init__.py
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/tests/src/py/qpid_tests/broker_0_10/__init__.py?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/tests/src/py/qpid_tests/broker_0_10/__init__.py (original)
+++ qpid/branches/QPID-2519/tests/src/py/qpid_tests/broker_0_10/__init__.py Fri Oct 21 01:19:00 2011
@@ -33,4 +33,3 @@ from lvq import *
 from priority import *
 from threshold import *
 from extensions import *
-from msg_groups import *

Modified: qpid/branches/QPID-2519/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py (original)
+++ qpid/branches/QPID-2519/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py Fri Oct 21 01:19:00 2011
@@ -18,7 +18,7 @@
 #
 import traceback
 from qpid.queue import Empty
-from qpid.datatypes import Message, RangedSet
+from qpid.datatypes import Message
 from qpid.testlib import TestBase010
 from qpid.session import SessionException
 
@@ -77,7 +77,13 @@ class AlternateExchangeTests(TestBase010
         """
         session = self.session
         #set up a 'dead letter queue':
-        dlq = self.setup_dlq()
+        session.exchange_declare(exchange="dlq", type="fanout")
+        session.queue_declare(queue="deleted", exclusive=True, auto_delete=True)
+        session.exchange_bind(exchange="dlq", queue="deleted")
+        session.message_subscribe(destination="dlq", queue="deleted")
+        session.message_flow(destination="dlq", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+        session.message_flow(destination="dlq", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+        dlq = session.incoming("dlq")
 
         #create a queue using the dlq as its alternate exchange:
         session.queue_declare(queue="delete-me", alternate_exchange="dlq")
@@ -230,121 +236,6 @@ class AlternateExchangeTests(TestBase010
             self.assertEqual("Three", dlq.get(timeout=1).body)
             self.assertEmpty(dlq)
 
-    def test_queue_delete_loop(self):
-        """
-        Test that if a queue is bound to its own alternate exchange,
-        then on deletion there is no infinite looping
-        """
-        session = self.session
-        dlq = self.setup_dlq()
-
-        #create a queue using the dlq as its alternate exchange:
-        session.queue_declare(queue="delete-me", alternate_exchange="dlq")
-        #bind that queue to the dlq as well:
-        session.exchange_bind(exchange="dlq", queue="delete-me")
-        #send it some messages:
-        dp=self.session.delivery_properties(routing_key="delete-me")
-        for m in ["One", "Two", "Three"]:
-            session.message_transfer(message=Message(dp, m))
-        #delete it:
-        session.queue_delete(queue="delete-me")
-        #cleanup:
-        session.exchange_delete(exchange="dlq")
-
-        #check the messages were delivered to the dlq:
-        for m in ["One", "Two", "Three"]:
-            self.assertEqual(m, dlq.get(timeout=1).body)
-        self.assertEmpty(dlq)
-
-    def test_queue_delete_no_match(self):
-        """
-        Test that on queue deletion, if the queues own alternate
-        exchange cannot find a match for the message, the
-        alternate-exchange of that exchange will be tried. Note:
-        though the spec rules out going to the alternate-exchanges
-        alternate exchange when sending to an exchange, it does not
-        cover this case.
-        """
-        session = self.session
-        dlq = self.setup_dlq()
-
-        #setu up an 'intermediary' exchange
-        session.exchange_declare(exchange="my-exchange", type="direct", alternate_exchange="dlq")
-
-        #create a queue using the intermediary as its alternate exchange:
-        session.queue_declare(queue="delete-me", alternate_exchange="my-exchange")
-        #bind that queue to the dlq as well:
-        session.exchange_bind(exchange="dlq", queue="delete-me")
-        #send it some messages:
-        dp=self.session.delivery_properties(routing_key="delete-me")
-        for m in ["One", "Two", "Three"]:
-            session.message_transfer(message=Message(dp, m))
-
-        #delete it:
-        session.queue_delete(queue="delete-me")
-        #cleanup:
-        session.exchange_delete(exchange="my-exchange")
-        session.exchange_delete(exchange="dlq")
-
-        #check the messages were delivered to the dlq:
-        for m in ["One", "Two", "Three"]:
-            self.assertEqual(m, dlq.get(timeout=1).body)
-        self.assertEmpty(dlq)
-
-    def test_reject_no_match(self):
-        """
-        Test that on rejecting a message, if the queues own alternate
-        exchange cannot find a match for the message, the
-        alternate-exchange of that exchange will be tried. Note:
-        though the spec rules out going to the alternate-exchanges
-        alternate exchange when sending to an exchange, it does not
-        cover this case.
-        """
-        session = self.session
-        dlq = self.setup_dlq()
-
-        #setu up an 'intermediary' exchange
-        session.exchange_declare(exchange="my-exchange", type="direct", alternate_exchange="dlq")
-
-        #create a queue using the intermediary as its alternate exchange:
-        session.queue_declare(queue="delivery-queue", alternate_exchange="my-exchange", auto_delete=True)
-        #bind that queue to the dlq as well:
-        session.exchange_bind(exchange="dlq", queue="delivery-queue")
-        #send it some messages:
-        dp=self.session.delivery_properties(routing_key="delivery-queue")
-        for m in ["One", "Two", "Three"]:
-            session.message_transfer(message=Message(dp, m))
-
-        #get and reject those messages:
-        session.message_subscribe(destination="a", queue="delivery-queue")
-        session.message_flow(destination="a", unit=session.credit_unit.message, value=0xFFFFFFFFL)
-        session.message_flow(destination="a", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
-        incoming = session.incoming("a")
-        for m in ["One", "Two", "Three"]:
-            msg = incoming.get(timeout=1)
-            self.assertEqual(m, msg.body)
-            session.message_reject(RangedSet(msg.id))
-        session.message_cancel(destination="a")
-
-        #check the messages were delivered to the dlq:
-        for m in ["One", "Two", "Three"]:
-            self.assertEqual(m, dlq.get(timeout=1).body)
-        self.assertEmpty(dlq)
-        #cleanup:
-        session.exchange_delete(exchange="my-exchange")
-        session.exchange_delete(exchange="dlq")
-
-    def setup_dlq(self):
-        session = self.session
-        #set up 'dead-letter' handling:
-        session.exchange_declare(exchange="dlq", type="fanout")
-        session.queue_declare(queue="deleted", exclusive=True, auto_delete=True)
-        session.exchange_bind(exchange="dlq", queue="deleted")
-        session.message_subscribe(destination="dlq", queue="deleted")
-        session.message_flow(destination="dlq", unit=session.credit_unit.message, value=0xFFFFFFFFL)
-        session.message_flow(destination="dlq", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
-        dlq = session.incoming("dlq")
-        return dlq
 
     def assertEmpty(self, queue):
         try:

Modified: qpid/branches/QPID-2519/tests/src/py/qpid_tests/broker_0_10/dtx.py
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/tests/src/py/qpid_tests/broker_0_10/dtx.py?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/tests/src/py/qpid_tests/broker_0_10/dtx.py (original)
+++ qpid/branches/QPID-2519/tests/src/py/qpid_tests/broker_0_10/dtx.py Fri Oct 21 01:19:00 2011
@@ -6,9 +6,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-#
+# 
 #   http://www.apache.org/licenses/LICENSE-2.0
-#
+# 
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -36,7 +36,7 @@ class DtxTests(TestBase010):
     and the appropriate result verified.
 
     The other tests enforce more specific rules and behaviour on a
-    per-method or per-field basis.
+    per-method or per-field basis.        
     """
 
     XA_RBROLLBACK = 1
@@ -49,8 +49,8 @@ class DtxTests(TestBase010):
         self.session = self.conn.session("dtx-session", 1)
 
     def test_simple_commit(self):
-        """
-        Test basic one-phase commit behaviour.
+        """        
+        Test basic one-phase commit behaviour.     
         """
         guard = self.keepQueuesAlive(["queue-a", "queue-b"])
         session = self.session
@@ -73,8 +73,8 @@ class DtxTests(TestBase010):
         self.assertMessageId("commit", "queue-b")
 
     def test_simple_prepare_commit(self):
-        """
-        Test basic two-phase commit behaviour.
+        """        
+        Test basic two-phase commit behaviour.     
         """
         guard = self.keepQueuesAlive(["queue-a", "queue-b"])
         session = self.session
@@ -100,8 +100,8 @@ class DtxTests(TestBase010):
 
 
     def test_simple_rollback(self):
-        """
-        Test basic rollback behaviour.
+        """        
+        Test basic rollback behaviour.     
         """
         guard = self.keepQueuesAlive(["queue-a", "queue-b"])
         session = self.session
@@ -123,8 +123,8 @@ class DtxTests(TestBase010):
         self.assertMessageId("rollback", "queue-a")
 
     def test_simple_prepare_rollback(self):
-        """
-        Test basic rollback behaviour after the transaction has been prepared.
+        """        
+        Test basic rollback behaviour after the transaction has been prepared.     
         """
         guard = self.keepQueuesAlive(["queue-a", "queue-b"])
         session = self.session
@@ -146,18 +146,18 @@ class DtxTests(TestBase010):
         #check result
         self.assertMessageCount(1, "queue-a")
         self.assertMessageCount(0, "queue-b")
-        self.assertMessageId("prepare-rollback", "queue-a")
+        self.assertMessageId("prepare-rollback", "queue-a")    
 
     def test_select_required(self):
         """
         check that an error is flagged if select is not issued before
-        start or end
+        start or end        
         """
         session = self.session
         tx = self.xid("dummy")
         try:
             session.dtx_start(xid=tx)
-
+            
             #if we get here we have failed, but need to do some cleanup:
             session.dtx_end(xid=tx)
             session.dtx_rollback(xid=tx)
@@ -197,10 +197,10 @@ class DtxTests(TestBase010):
             other.close()
         session1.dtx_end(xid=tx)
         session1.dtx_rollback(xid=tx)
-
+        
         #verification:
         if failed: self.assertEquals(530, error.args[0].error_code)
-        else: self.fail("Xid already known, expected exception!")
+        else: self.fail("Xid already known, expected exception!")                    
 
     def test_forget_xid_on_completion(self):
         """
@@ -210,8 +210,8 @@ class DtxTests(TestBase010):
         #do some transactional work & complete the transaction
         self.test_simple_commit()
         # session has been reset, so reselect for use with dtx
-        self.session.dtx_select()
-
+        self.session.dtx_select()        
+        
         #start association for the same xid as the previously completed txn
         tx = self.xid("my-xid")
         self.session.dtx_start(xid=tx)
@@ -237,9 +237,9 @@ class DtxTests(TestBase010):
             self.assertEquals(503, e.args[0].error_code)
 
     def test_start_join(self):
-        """
+        """        
         Verify 'join' behaviour, where a session is associated with a
-        transaction that is already associated with another session.
+        transaction that is already associated with another session.        
         """
         guard = self.keepQueuesAlive(["one", "two"])
         #create two sessions & select them for use with dtx:
@@ -269,14 +269,14 @@ class DtxTests(TestBase010):
         #mark end on both sessions
         session1.dtx_end(xid=tx)
         session2.dtx_end(xid=tx)
-
+        
         #commit and check
         session1.dtx_commit(xid=tx, one_phase=True)
         self.assertMessageCount(1, "one")
         self.assertMessageCount(1, "two")
         self.assertMessageId("a", "two")
         self.assertMessageId("b", "one")
-
+        
 
     def test_suspend_resume(self):
         """
@@ -300,7 +300,7 @@ class DtxTests(TestBase010):
         session.dtx_start(xid=tx, resume=True)
         self.swap(session, "two", "one")#swap 'b' from 'two' to 'one'
         session.dtx_end(xid=tx)
-
+        
         #commit and check
         session.dtx_commit(xid=tx, one_phase=True)
         self.assertMessageCount(1, "one")
@@ -308,7 +308,7 @@ class DtxTests(TestBase010):
         self.assertMessageId("a", "two")
         self.assertMessageId("b", "one")
 
-    def test_suspend_start_end_resume(self):
+    def test_suspend_start_end_resume(self):        
         """
         Test suspension and resumption of an association with work
         done on another transaction when the first transaction is
@@ -332,7 +332,7 @@ class DtxTests(TestBase010):
         session.dtx_start(xid=tx, resume=True)
         self.swap(session, "two", "one")#swap 'b' from 'two' to 'one'
         session.dtx_end(xid=tx)
-
+        
         #commit and check
         session.dtx_commit(xid=tx, one_phase=True)
         self.assertMessageCount(1, "one")
@@ -341,10 +341,10 @@ class DtxTests(TestBase010):
         self.assertMessageId("b", "one")
 
     def test_end_suspend_and_fail(self):
-        """
+        """        
         Verify that the correct error is signalled if the suspend and
         fail flag are both set when disassociating a transaction from
-        the session
+        the session        
         """
         session = self.session
         session.dtx_select()
@@ -356,16 +356,16 @@ class DtxTests(TestBase010):
         except SessionException, e:
             self.assertEquals(503, e.args[0].error_code)
 
-        #cleanup
+        #cleanup    
         other = self.connect()
         session = other.session("cleanup", 1)
         session.dtx_rollback(xid=tx)
         session.close()
         other.close()
-
+    
 
     def test_end_unknown_xid(self):
-        """
+        """        
         Verifies that the correct exception is thrown when an attempt
         is made to end the association for a xid not previously
         associated with the session
@@ -382,7 +382,7 @@ class DtxTests(TestBase010):
     def test_end(self):
         """
         Verify that the association is terminated by end and subsequent
-        operations are non-transactional
+        operations are non-transactional        
         """
         guard = self.keepQueuesAlive(["tx-queue"])
         session = self.conn.session("alternate", 1)
@@ -408,7 +408,7 @@ class DtxTests(TestBase010):
         session.message_accept(RangedSet(msg.id))
         session.close()
 
-        session = self.session
+        session = self.session        
         #commit the transaction and check that the first message (and
         #only the first message) is then delivered
         session.dtx_commit(xid=tx, one_phase=True)
@@ -418,7 +418,7 @@ class DtxTests(TestBase010):
     def test_invalid_commit_one_phase_true(self):
         """
         Test that a commit with one_phase = True is rejected if the
-        transaction in question has already been prepared.
+        transaction in question has already been prepared.        
         """
         other = self.connect()
         tester = other.session("tester", 1)
@@ -447,7 +447,7 @@ class DtxTests(TestBase010):
     def test_invalid_commit_one_phase_false(self):
         """
         Test that a commit with one_phase = False is rejected if the
-        transaction in question has not yet been prepared.
+        transaction in question has not yet been prepared.        
         """
         other = self.connect()
         tester = other.session("tester", 1)
@@ -474,7 +474,7 @@ class DtxTests(TestBase010):
 
     def test_invalid_commit_not_ended(self):
         """
-        Test that a commit fails if the xid is still associated with a session.
+        Test that a commit fails if the xid is still associated with a session.        
         """
         other = self.connect()
         tester = other.session("tester", 1)
@@ -502,7 +502,7 @@ class DtxTests(TestBase010):
 
     def test_invalid_rollback_not_ended(self):
         """
-        Test that a rollback fails if the xid is still associated with a session.
+        Test that a rollback fails if the xid is still associated with a session.        
         """
         other = self.connect()
         tester = other.session("tester", 1)
@@ -531,7 +531,7 @@ class DtxTests(TestBase010):
 
     def test_invalid_prepare_not_ended(self):
         """
-        Test that a prepare fails if the xid is still associated with a session.
+        Test that a prepare fails if the xid is still associated with a session.        
         """
         other = self.connect()
         tester = other.session("tester", 1)
@@ -586,9 +586,9 @@ class DtxTests(TestBase010):
         session1.dtx_rollback(xid=tx)
 
     def test_get_timeout(self):
-        """
+        """        
         Check that get-timeout returns the correct value, (and that a
-        transaction with a timeout can complete normally)
+        transaction with a timeout can complete normally)        
         """
         session = self.session
         tx = self.xid("dummy")
@@ -599,12 +599,12 @@ class DtxTests(TestBase010):
         session.dtx_set_timeout(xid=tx, timeout=60)
         self.assertEqual(60, session.dtx_get_timeout(xid=tx).timeout)
         self.assertEqual(self.XA_OK, session.dtx_end(xid=tx).status)
-        self.assertEqual(self.XA_OK, session.dtx_rollback(xid=tx).status)
-
+        self.assertEqual(self.XA_OK, session.dtx_rollback(xid=tx).status)        
+        
     def test_set_timeout(self):
-        """
+        """        
         Test the timeout of a transaction results in the expected
-        behaviour
+        behaviour        
         """
 
         guard = self.keepQueuesAlive(["queue-a", "queue-b"])
@@ -627,7 +627,7 @@ class DtxTests(TestBase010):
         self.assertMessageId("timeout", "queue-a")
         #check the correct codes are returned when we try to complete the txn
         self.assertEqual(self.XA_RBTIMEOUT, session.dtx_end(xid=tx).status)
-        self.assertEqual(self.XA_RBTIMEOUT, session.dtx_rollback(xid=tx).status)
+        self.assertEqual(self.XA_RBTIMEOUT, session.dtx_rollback(xid=tx).status)        
 
 
 
@@ -649,20 +649,20 @@ class DtxTests(TestBase010):
             if i in [2, 5, 6, 8]:
                 session.dtx_prepare(xid=tx)
                 prepared.append(tx)
-            else:
+            else:    
                 session.dtx_rollback(xid=tx)
 
         xids = session.dtx_recover().in_doubt
-
+        
         #rollback the prepared transactions returned by recover
         for x in xids:
-            session.dtx_rollback(xid=x)
+            session.dtx_rollback(xid=x)            
 
         #validate against the expected list of prepared transactions
         actual = set([x.global_id for x in xids]) #TODO: come up with nicer way to test these
         expected = set([x.global_id for x in prepared])
         intersection = actual.intersection(expected)
-
+        
         if intersection != expected:
             missing = expected.difference(actual)
             extra = actual.difference(expected)
@@ -723,7 +723,7 @@ class DtxTests(TestBase010):
         session.message_transfer(message=Message(dp, mp, "DtxMessage"))
 
         #start the transaction:
-        session.dtx_select()
+        session.dtx_select()        
         self.assertEqual(self.XA_OK, self.session.dtx_start(xid=tx).status)
 
         #'swap' the message from one queue to the other, under that transaction:
@@ -760,7 +760,7 @@ class DtxTests(TestBase010):
     def getMessageProperty(self, msg, prop):
         for h in msg.headers:
             if hasattr(h, prop): return getattr(h, prop)
-        return None
+        return None            
 
     def keepQueuesAlive(self, names):
         session = self.conn.session("nasty", 99)
@@ -768,7 +768,7 @@ class DtxTests(TestBase010):
             session.queue_declare(queue=n, auto_delete=True)
             session.message_subscribe(destination=n, queue=n)
         return session
-
+        
     def createMessage(self, session, key, id, body):
         dp=session.delivery_properties(routing_key=key)
         mp=session.message_properties(correlation_id=id)



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org