You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2011/10/20 20:43:26 UTC
svn commit: r1186990 [42/43] - in /qpid/branches/QPID-2519: ./ bin/ cpp/
cpp/bindings/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/
cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/
cpp/bindings/qmf2/python/ cpp/bindings/qmf...
Modified: qpid/branches/QPID-2519/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java (original)
+++ qpid/branches/QPID-2519/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java Thu Oct 20 18:42:46 2011
@@ -20,36 +20,113 @@
*/
package org.apache.qpid.tools;
+import java.net.InetAddress;
import java.text.DecimalFormat;
-import java.util.Hashtable;
+import java.util.UUID;
import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
import javax.jms.Destination;
+import javax.jms.MapMessage;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
import javax.jms.Session;
-import javax.naming.Context;
-import javax.naming.InitialContext;
import org.apache.qpid.client.AMQAnyDestination;
import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession_0_10;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.messaging.Address;
public class PerfBase
{
+ public final static String CODE = "CODE";
+ public final static String ID = "ID";
+ public final static String REPLY_ADDR = "REPLY_ADDR";
+ public final static String MAX_LATENCY = "MAX_LATENCY";
+ public final static String MIN_LATENCY = "MIN_LATENCY";
+ public final static String AVG_LATENCY = "AVG_LATENCY";
+ public final static String STD_DEV = "STD_DEV";
+ public final static String CONS_RATE = "CONS_RATE";
+ public final static String PROD_RATE = "PROD_RATE";
+ public final static String MSG_COUNT = "MSG_COUNT";
+ public final static String TIMESTAMP = "Timestamp";
+
+ String CONTROLLER_ADDR = System.getProperty("CONT_ADDR","CONTROLLER;{create: always, node:{x-declare:{auto-delete:true}}}");
+
TestParams params;
Connection con;
Session session;
+ Session controllerSession;
Destination dest;
- Destination feedbackDest;
+ Destination myControlQueue;
+ Destination controllerQueue;
DecimalFormat df = new DecimalFormat("###.##");
+ String id;
+ String myControlQueueAddr;
+
+ MessageProducer sendToController;
+ MessageConsumer receiveFromController;
+ String prefix = "";
+
+ enum OPCode {
+ REGISTER_CONSUMER, REGISTER_PRODUCER,
+ PRODUCER_STARTWARMUP, CONSUMER_STARTWARMUP,
+ CONSUMER_READY, PRODUCER_READY,
+ PRODUCER_START,
+ RECEIVED_END_MSG, CONSUMER_STOP,
+ RECEIVED_PRODUCER_STATS, RECEIVED_CONSUMER_STATS,
+ CONTINUE_TEST, STOP_TEST
+ };
+
+ enum MessageType {
+ BYTES, TEXT, MAP, OBJECT;
+
+ public static MessageType getType(String s) throws Exception
+ {
+ if ("text".equalsIgnoreCase(s))
+ {
+ return TEXT;
+ }
+ else if ("bytes".equalsIgnoreCase(s))
+ {
+ return BYTES;
+ }
+ /*else if ("map".equalsIgnoreCase(s))
+ {
+ return MAP;
+ }
+ else if ("object".equalsIgnoreCase(s))
+ {
+ return OBJECT;
+ }*/
+ else
+ {
+ throw new Exception("Unsupported message type");
+ }
+ }
+ };
+
+ MessageType msgType = MessageType.BYTES;
- public PerfBase()
+ public PerfBase(String prefix)
{
params = new TestParams();
+ String host = "";
+ try
+ {
+ host = InetAddress.getLocalHost().getHostName();
+ }
+ catch (Exception e)
+ {
+ }
+ id = host + "-" + UUID.randomUUID().toString();
+ this.prefix = prefix;
+ this.myControlQueueAddr = id + ";{create: always}";
}
public void setUp() throws Exception
- {
-
+ {
if (params.getHost().equals("") || params.getPort() == -1)
{
con = new AMQConnection(params.getUrl());
@@ -62,7 +139,78 @@ public class PerfBase
session = con.createSession(params.isTransacted(),
params.isTransacted()? Session.SESSION_TRANSACTED:params.getAckMode());
- dest = new AMQAnyDestination(params.getAddress());
+ controllerSession = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ dest = createDestination();
+ controllerQueue = new AMQAnyDestination(CONTROLLER_ADDR);
+ myControlQueue = session.createQueue(myControlQueueAddr);
+ msgType = MessageType.getType(params.getMessageType());
+ System.out.println("Using " + msgType + " messages");
+
+ sendToController = controllerSession.createProducer(controllerQueue);
+ receiveFromController = controllerSession.createConsumer(myControlQueue);
+ }
+
+ private Destination createDestination() throws Exception
+ {
+ if (params.isUseUniqueDests())
+ {
+ System.out.println("Prefix : " + prefix);
+ Address addr = Address.parse(params.getAddress());
+ AMQAnyDestination temp = new AMQAnyDestination(params.getAddress());
+ int type = ((AMQSession_0_10)session).resolveAddressType(temp);
+
+ if ( type == AMQDestination.TOPIC_TYPE)
+ {
+ addr = new Address(addr.getName(),addr.getSubject() + "." + prefix,addr.getOptions());
+ System.out.println("Setting subject : " + addr);
+ }
+ else
+ {
+ addr = new Address(addr.getName() + "_" + prefix,addr.getSubject(),addr.getOptions());
+ System.out.println("Setting name : " + addr);
+ }
+
+ return new AMQAnyDestination(addr);
+ }
+ else
+ {
+ return new AMQAnyDestination(params.getAddress());
+ }
+ }
+
+ public synchronized void sendMessageToController(MapMessage m) throws Exception
+ {
+ m.setString(ID, id);
+ m.setString(REPLY_ADDR,myControlQueueAddr);
+ sendToController.send(m);
+ }
+
+ public void receiveFromController(OPCode expected) throws Exception
+ {
+ MapMessage m = (MapMessage)receiveFromController.receive();
+ OPCode code = OPCode.values()[m.getInt(CODE)];
+ System.out.println("Received Code : " + code);
+ if (expected != code)
+ {
+ throw new Exception("Expected OPCode : " + expected + " but received : " + code);
+ }
+
+ }
+
+ public boolean continueTest() throws Exception
+ {
+ MapMessage m = (MapMessage)receiveFromController.receive();
+ OPCode code = OPCode.values()[m.getInt(CODE)];
+ System.out.println("Received Code : " + code);
+ return (code == OPCode.CONTINUE_TEST);
+ }
+
+ public void tearDown() throws Exception
+ {
+ session.close();
+ controllerSession.close();
+ con.close();
}
public void handleError(Exception e,String msg)
Modified: qpid/branches/QPID-2519/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java (original)
+++ qpid/branches/QPID-2519/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java Thu Oct 20 18:42:46 2011
@@ -20,13 +20,17 @@
*/
package org.apache.qpid.tools;
-import javax.jms.Destination;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
import javax.jms.TextMessage;
+import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.thread.Threading;
/**
@@ -47,7 +51,7 @@ import org.apache.qpid.thread.Threading;
* b) They are on separate machines that have their time synced via a Time Server
*
* In order to calculate latency the producer inserts a timestamp
- * hen the message is sent. The consumer will note the current time the message is
+ * when the message is sent. The consumer will note the current time the message is
* received and will calculate the latency as follows
* latency = rcvdTime - msg.getJMSTimestamp()
*
@@ -55,13 +59,9 @@ import org.apache.qpid.thread.Threading;
* variance in latencies.
*
* Avg latency is measured by adding all latencies and dividing by the total msgs.
- * You can also compute this by (rcvdTime - testStartTime)/rcvdMsgCount
*
* Throughput
* ===========
- * System throughput is calculated as follows
- * rcvdMsgCount/(rcvdTime - testStartTime)
- *
* Consumer rate is calculated as
* rcvdMsgCount/(rcvdTime - startTime)
*
@@ -81,130 +81,160 @@ public class PerfConsumer extends PerfBa
long minLatency = Long.MAX_VALUE;
long totalLatency = 0; // to calculate avg latency.
int rcvdMsgCount = 0;
- long testStartTime = 0; // to measure system throughput
long startTime = 0; // to measure consumer throughput
long rcvdTime = 0;
boolean transacted = false;
int transSize = 0;
+ boolean printStdDev = false;
+ List<Long> sample;
+
final Object lock = new Object();
- public PerfConsumer()
+ public PerfConsumer(String prefix)
{
- super();
+ super(prefix);
+ System.out.println("Consumer ID : " + id);
}
public void setUp() throws Exception
{
super.setUp();
consumer = session.createConsumer(dest);
+ System.out.println("Consumer: " + id + " Receiving messages from : " + ((AMQDestination)dest).getQueueName() + "\n");
// Storing the following two for efficiency
transacted = params.isTransacted();
transSize = params.getTransactionSize();
+ printStdDev = params.isPrintStdDev();
+ MapMessage m = controllerSession.createMapMessage();
+ m.setInt(CODE, OPCode.REGISTER_CONSUMER.ordinal());
+ sendMessageToController(m);
}
public void warmup()throws Exception
{
- System.out.println("Warming up......");
-
- boolean start = false;
- while (!start)
+ receiveFromController(OPCode.CONSUMER_STARTWARMUP);
+ Message msg = consumer.receive();
+ // This is to ensure we drain the queue before we start the actual test.
+ while ( msg != null)
{
- Message msg = consumer.receive();
- if (msg instanceof TextMessage)
+ if (msg.getBooleanProperty("End") == true)
{
- if (((TextMessage)msg).getText().equals("End"))
- {
- start = true;
- MessageProducer temp = session.createProducer(msg.getJMSReplyTo());
- temp.send(session.createMessage());
- if (params.isTransacted())
- {
- session.commit();
- }
- temp.close();
- }
+ // It's more realistic for the consumer to signal this.
+ MapMessage m = controllerSession.createMapMessage();
+ m.setInt(CODE, OPCode.PRODUCER_READY.ordinal());
+ sendMessageToController(m);
}
+ msg = consumer.receive(1000);
+ }
+
+ if (params.isTransacted())
+ {
+ session.commit();
}
+
+ MapMessage m = controllerSession.createMapMessage();
+ m.setInt(CODE, OPCode.CONSUMER_READY.ordinal());
+ sendMessageToController(m);
+ consumer.setMessageListener(this);
}
public void startTest() throws Exception
{
- System.out.println("Starting test......");
- consumer.setMessageListener(this);
+ System.out.println("Consumer: " + id + " Starting test......" + "\n");
+ resetCounters();
}
- public void printResults() throws Exception
+ public void resetCounters()
{
- synchronized (lock)
+ rcvdMsgCount = 0;
+ maxLatency = 0;
+ minLatency = Long.MAX_VALUE;
+ totalLatency = 0;
+ if (printStdDev)
{
- lock.wait();
+ sample = null;
+ sample = new ArrayList<Long>(params.getMsgCount());
}
+ }
+
+ public void sendResults() throws Exception
+ {
+ receiveFromController(OPCode.CONSUMER_STOP);
double avgLatency = (double)totalLatency/(double)rcvdMsgCount;
- double throughput = ((double)rcvdMsgCount/(double)(rcvdTime - testStartTime))*1000;
- double consRate = ((double)rcvdMsgCount/(double)(rcvdTime - startTime))*1000;
+ double consRate = (double)rcvdMsgCount*Clock.convertToSecs()/(double)(rcvdTime - startTime);
+ double stdDev = 0.0;
+ if (printStdDev)
+ {
+ stdDev = calculateStdDev(avgLatency);
+ }
+ MapMessage m = controllerSession.createMapMessage();
+ m.setInt(CODE, OPCode.RECEIVED_CONSUMER_STATS.ordinal());
+ m.setDouble(AVG_LATENCY, avgLatency/Clock.convertToMiliSecs());
+ m.setDouble(MIN_LATENCY,minLatency/Clock.convertToMiliSecs());
+ m.setDouble(MAX_LATENCY,maxLatency/Clock.convertToMiliSecs());
+ m.setDouble(STD_DEV, stdDev/Clock.convertToMiliSecs());
+ m.setDouble(CONS_RATE, consRate);
+ m.setLong(MSG_COUNT, rcvdMsgCount);
+ sendMessageToController(m);
+
System.out.println(new StringBuilder("Total Msgs Received : ").append(rcvdMsgCount).toString());
System.out.println(new StringBuilder("Consumer rate : ").
append(df.format(consRate)).
append(" msg/sec").toString());
- System.out.println(new StringBuilder("System Throughput : ").
- append(df.format(throughput)).
- append(" msg/sec").toString());
System.out.println(new StringBuilder("Avg Latency : ").
- append(df.format(avgLatency)).
+ append(df.format(avgLatency/Clock.convertToMiliSecs())).
append(" ms").toString());
System.out.println(new StringBuilder("Min Latency : ").
- append(minLatency).
+ append(df.format(minLatency/Clock.convertToMiliSecs())).
append(" ms").toString());
System.out.println(new StringBuilder("Max Latency : ").
- append(maxLatency).
+ append(df.format(maxLatency/Clock.convertToMiliSecs())).
append(" ms").toString());
- System.out.println("Completed the test......\n");
- }
-
- public void notifyCompletion(Destination replyTo) throws Exception
- {
- MessageProducer tmp = session.createProducer(replyTo);
- Message endMsg = session.createMessage();
- tmp.send(endMsg);
- if (params.isTransacted())
+ if (printStdDev)
{
- session.commit();
+ System.out.println(new StringBuilder("Std Dev : ").
+ append(stdDev/Clock.convertToMiliSecs()).toString());
}
- tmp.close();
}
- public void tearDown() throws Exception
+ public double calculateStdDev(double mean)
{
- consumer.close();
- session.close();
- con.close();
+ double v = 0;
+ for (double latency: sample)
+ {
+ v = v + Math.pow((latency-mean), 2);
+ }
+ v = v/sample.size();
+ return Math.round(Math.sqrt(v));
}
public void onMessage(Message msg)
{
try
{
- if (msg instanceof TextMessage && ((TextMessage)msg).getText().equals("End"))
+ // To figure out the decoding overhead of text
+ if (msgType == MessageType.TEXT)
{
- notifyCompletion(msg.getJMSReplyTo());
+ ((TextMessage)msg).getText();
+ }
- synchronized (lock)
- {
- lock.notifyAll();
- }
+ if (msg.getBooleanProperty("End"))
+ {
+ MapMessage m = controllerSession.createMapMessage();
+ m.setInt(CODE, OPCode.RECEIVED_END_MSG.ordinal());
+ sendMessageToController(m);
}
else
{
- rcvdTime = System.currentTimeMillis();
+ rcvdTime = Clock.getTime();
rcvdMsgCount ++;
if (rcvdMsgCount == 1)
{
startTime = rcvdTime;
- testStartTime = msg.getJMSTimestamp();
}
if (transacted && (rcvdMsgCount % transSize == 0))
@@ -212,10 +242,14 @@ public class PerfConsumer extends PerfBa
session.commit();
}
- long latency = rcvdTime - msg.getJMSTimestamp();
+ long latency = rcvdTime - msg.getLongProperty(TIMESTAMP);
maxLatency = Math.max(maxLatency, latency);
minLatency = Math.min(minLatency, latency);
totalLatency = totalLatency + latency;
+ if (printStdDev)
+ {
+ sample.add(latency);
+ }
}
}
@@ -226,14 +260,21 @@ public class PerfConsumer extends PerfBa
}
- public void test()
+ public void run()
{
try
{
setUp();
warmup();
- startTest();
- printResults();
+ boolean nextIteration = true;
+ while (nextIteration)
+ {
+ System.out.println("=========================================================\n");
+ System.out.println("Consumer: " + id + " starting a new iteration ......\n");
+ startTest();
+ sendResults();
+ nextIteration = continueTest();
+ }
tearDown();
}
catch(Exception e)
@@ -242,26 +283,43 @@ public class PerfConsumer extends PerfBa
}
}
- public static void main(String[] args)
+ @Override
+ public void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
+ public static void main(String[] args) throws InterruptedException
{
- final PerfConsumer cons = new PerfConsumer();
- Runnable r = new Runnable()
+ String scriptId = (args.length == 1) ? args[0] : "";
+ int conCount = Integer.getInteger("con_count",1);
+ final CountDownLatch testCompleted = new CountDownLatch(conCount);
+ for (int i=0; i < conCount; i++)
{
- public void run()
+
+ final PerfConsumer cons = new PerfConsumer(scriptId + i);
+ Runnable r = new Runnable()
{
- cons.test();
+ public void run()
+ {
+ cons.run();
+ testCompleted.countDown();
+ }
+ };
+
+ Thread t;
+ try
+ {
+ t = Threading.getThreadFactory().createThread(r);
}
- };
-
- Thread t;
- try
- {
- t = Threading.getThreadFactory().createThread(r);
- }
- catch(Exception e)
- {
- throw new Error("Error creating consumer thread",e);
+ catch(Exception e)
+ {
+ throw new Error("Error creating consumer thread",e);
+ }
+ t.start();
+
}
- t.start();
+ testCompleted.await();
+ System.out.println("Consumers have completed the test......\n");
}
}
\ No newline at end of file
Modified: qpid/branches/QPID-2519/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java (original)
+++ qpid/branches/QPID-2519/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java Thu Oct 20 18:42:46 2011
@@ -23,13 +23,15 @@ package org.apache.qpid.tools;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
+import java.util.concurrent.CountDownLatch;
import javax.jms.BytesMessage;
import javax.jms.DeliveryMode;
+import javax.jms.MapMessage;
import javax.jms.Message;
-import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
+import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.thread.Threading;
/**
@@ -51,38 +53,52 @@ import org.apache.qpid.thread.Threading;
* System throughput and latencies calculated by the PerfConsumer are more realistic
* numbers.
*
+ * Answer by rajith : I agree about in memory buffering affecting rates. But Based on test runs
+ * I have done so far, it seems quite useful to compute the producer rate as it gives an
+ * indication of how the system behaves. For ex if there is a gap between producer and consumer rates
+ * you could clearly see the higher latencies and when producer and consumer rates are very close,
+ * latency is good.
+ *
*/
public class PerfProducer extends PerfBase
{
+ private static long SEC = 60000;
+
MessageProducer producer;
Message msg;
- byte[] payload;
- List<byte[]> payloads;
+ Object payload;
+ List<Object> payloads;
boolean cacheMsg = false;
boolean randomMsgSize = false;
boolean durable = false;
Random random;
int msgSizeRange = 1024;
-
- public PerfProducer()
+ boolean rateLimitProducer = false;
+ double rateFactor = 0.4;
+ double rate = 0.0;
+
+ public PerfProducer(String prefix)
{
- super();
+ super(prefix);
+ System.out.println("Producer ID : " + id);
}
public void setUp() throws Exception
{
super.setUp();
- feedbackDest = session.createTemporaryQueue();
-
durable = params.isDurable();
-
+ rateLimitProducer = params.getRate() > 0 ? true : false;
+ if (rateLimitProducer)
+ {
+ System.out.println("The test will attempt to limit the producer to " + params.getRate() + " msg/sec");
+ }
+
// if message caching is enabled we pre create the message
// else we pre create the payload
if (params.isCacheMessage())
{
cacheMsg = true;
-
- msg = MessageFactory.createBytesMessage(session, params.getMsgSize());
+ msg = createMessage(createPayload(params.getMsgSize()));
msg.setJMSDeliveryMode(durable?
DeliveryMode.PERSISTENT :
DeliveryMode.NON_PERSISTENT
@@ -93,21 +109,52 @@ public class PerfProducer extends PerfBa
random = new Random(20080921);
randomMsgSize = true;
msgSizeRange = params.getMsgSize();
- payloads = new ArrayList<byte[]>(msgSizeRange);
-
+ payloads = new ArrayList<Object>(msgSizeRange);
+
for (int i=0; i < msgSizeRange; i++)
{
- payloads.add(MessageFactory.createMessagePayload(i).getBytes());
+ payloads.add(createPayload(i));
}
- }
+ }
else
{
- payload = MessageFactory.createMessagePayload(params.getMsgSize()).getBytes();
+ payload = createPayload(params.getMsgSize());
}
producer = session.createProducer(dest);
+ System.out.println("Producer: " + id + " Sending messages to: " + ((AMQDestination)dest).getQueueName());
producer.setDisableMessageID(params.isDisableMessageID());
producer.setDisableMessageTimestamp(params.isDisableTimestamp());
+
+ MapMessage m = controllerSession.createMapMessage();
+ m.setInt(CODE, OPCode.REGISTER_PRODUCER.ordinal());
+ sendMessageToController(m);
+ }
+
+ Object createPayload(int size)
+ {
+ if (msgType == MessageType.TEXT)
+ {
+ return MessageFactory.createMessagePayload(size);
+ }
+ else
+ {
+ return MessageFactory.createMessagePayload(size).getBytes();
+ }
+ }
+
+ Message createMessage(Object payload) throws Exception
+ {
+ if (msgType == MessageType.TEXT)
+ {
+ return session.createTextMessage((String)payload);
+ }
+ else
+ {
+ BytesMessage m = session.createBytesMessage();
+ m.writeBytes((byte[])payload);
+ return m;
+ }
}
protected Message getNextMessage() throws Exception
@@ -117,117 +164,130 @@ public class PerfProducer extends PerfBa
return msg;
}
else
- {
- msg = session.createBytesMessage();
-
+ {
+ Message m;
+
if (!randomMsgSize)
{
- ((BytesMessage)msg).writeBytes(payload);
+ m = createMessage(payload);
}
else
{
- ((BytesMessage)msg).writeBytes(payloads.get(random.nextInt(msgSizeRange)));
+ m = createMessage(payloads.get(random.nextInt(msgSizeRange)));
}
- msg.setJMSDeliveryMode(durable?
+ m.setJMSDeliveryMode(durable?
DeliveryMode.PERSISTENT :
DeliveryMode.NON_PERSISTENT
);
- return msg;
+ return m;
}
}
public void warmup()throws Exception
{
- System.out.println("Warming up......");
- MessageConsumer tmp = session.createConsumer(feedbackDest);
+ receiveFromController(OPCode.PRODUCER_STARTWARMUP);
+ System.out.println("Producer: " + id + " Warming up......");
for (int i=0; i < params.getWarmupCount() -1; i++)
{
producer.send(getNextMessage());
}
- Message msg = session.createTextMessage("End");
- msg.setJMSReplyTo(feedbackDest);
- producer.send(msg);
-
- if (params.isTransacted())
- {
- session.commit();
- }
-
- tmp.receive();
+ sendEndMessage();
if (params.isTransacted())
{
session.commit();
}
-
- tmp.close();
}
public void startTest() throws Exception
{
- System.out.println("Starting test......");
+ resetCounters();
+ receiveFromController(OPCode.PRODUCER_START);
int count = params.getMsgCount();
boolean transacted = params.isTransacted();
int tranSize = params.getTransactionSize();
- long start = System.currentTimeMillis();
+ long limit = (long)(params.getRate() * rateFactor); // in msecs
+ long timeLimit = (long)(SEC * rateFactor); // in msecs
+
+ long start = Clock.getTime(); // defaults to nano secs
+ long interval = start;
for(int i=0; i < count; i++ )
{
Message msg = getNextMessage();
- msg.setJMSTimestamp(System.currentTimeMillis());
+ msg.setLongProperty(TIMESTAMP, Clock.getTime());
producer.send(msg);
if ( transacted && ((i+1) % tranSize == 0))
{
session.commit();
}
+
+ if (rateLimitProducer && i%limit == 0)
+ {
+ long elapsed = (Clock.getTime() - interval)*Clock.convertToMiliSecs(); // in msecs
+ if (elapsed < timeLimit)
+ {
+ Thread.sleep(elapsed);
+ }
+ interval = Clock.getTime();
+
+ }
+ }
+ sendEndMessage();
+ if ( transacted)
+ {
+ session.commit();
}
- long time = System.currentTimeMillis() - start;
- double rate = ((double)count/(double)time)*1000;
+ long time = Clock.getTime() - start;
+ rate = (double)count*Clock.convertToSecs()/(double)time;
System.out.println(new StringBuilder("Producer rate: ").
append(df.format(rate)).
append(" msg/sec").
toString());
}
- public void waitForCompletion() throws Exception
+ public void resetCounters()
{
- MessageConsumer tmp = session.createConsumer(feedbackDest);
- Message msg = session.createTextMessage("End");
- msg.setJMSReplyTo(feedbackDest);
- producer.send(msg);
-
- if (params.isTransacted())
- {
- session.commit();
- }
- tmp.receive();
+ }
- if (params.isTransacted())
- {
- session.commit();
- }
+ public void sendEndMessage() throws Exception
+ {
+ Message msg = session.createMessage();
+ msg.setBooleanProperty("End", true);
+ producer.send(msg);
+ }
- tmp.close();
- System.out.println("Consumer has completed the test......");
+ public void sendResults() throws Exception
+ {
+ MapMessage msg = controllerSession.createMapMessage();
+ msg.setInt(CODE, OPCode.RECEIVED_PRODUCER_STATS.ordinal());
+ msg.setDouble(PROD_RATE, rate);
+ sendMessageToController(msg);
}
+ @Override
public void tearDown() throws Exception
{
- producer.close();
- session.close();
- con.close();
+ super.tearDown();
}
- public void test()
+ public void run()
{
try
{
setUp();
warmup();
- startTest();
- waitForCompletion();
+ boolean nextIteration = true;
+ while (nextIteration)
+ {
+ System.out.println("=========================================================\n");
+ System.out.println("Producer: " + id + " starting a new iteration ......\n");
+ startTest();
+ sendResults();
+ nextIteration = continueTest();
+ }
tearDown();
}
catch(Exception e)
@@ -236,27 +296,63 @@ public class PerfProducer extends PerfBa
}
}
-
- public static void main(String[] args)
+ public void startControllerIfNeeded()
{
- final PerfProducer prod = new PerfProducer();
- Runnable r = new Runnable()
+ if (!params.isExternalController())
{
- public void run()
+ final PerfTestController controller = new PerfTestController();
+ Runnable r = new Runnable()
{
- prod.test();
+ public void run()
+ {
+ controller.run();
+ }
+ };
+
+ Thread t;
+ try
+ {
+ t = Threading.getThreadFactory().createThread(r);
}
- };
-
- Thread t;
- try
- {
- t = Threading.getThreadFactory().createThread(r);
+ catch(Exception e)
+ {
+ throw new Error("Error creating controller thread",e);
+ }
+ t.start();
}
- catch(Exception e)
- {
- throw new Error("Error creating producer thread",e);
+ }
+
+
+ public static void main(String[] args) throws InterruptedException
+ {
+ String scriptId = (args.length == 1) ? args[0] : "";
+ int conCount = Integer.getInteger("con_count",1);
+ final CountDownLatch testCompleted = new CountDownLatch(conCount);
+ for (int i=0; i < conCount; i++)
+ {
+ final PerfProducer prod = new PerfProducer(scriptId + i);
+ prod.startControllerIfNeeded();
+ Runnable r = new Runnable()
+ {
+ public void run()
+ {
+ prod.run();
+ testCompleted.countDown();
+ }
+ };
+
+ Thread t;
+ try
+ {
+ t = Threading.getThreadFactory().createThread(r);
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error creating producer thread",e);
+ }
+ t.start();
}
- t.start();
+ testCompleted.await();
+ System.out.println("Producers have completed the test......");
}
}
\ No newline at end of file
Modified: qpid/branches/QPID-2519/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java (original)
+++ qpid/branches/QPID-2519/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java Thu Oct 20 18:42:46 2011
@@ -25,25 +25,25 @@ import javax.jms.Session;
public class TestParams
{
/*
- * By default the connection URL is used.
+ * By default the connection URL is used.
* This allows a user to easily specify a fully fledged URL any given property.
* Ex. SSL parameters
- *
+ *
* By providing a host & port allows a user to simply override the URL.
* This allows to create multiple clients in test scripts easily,
- * without having to deal with the long URL format.
+ * without having to deal with the long URL format.
*/
private String url = "amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'";
-
+
private String host = "";
-
+
private int port = -1;
private String address = "queue; {create : always}";
private int msg_size = 1024;
- private int msg_type = 1; // not used yet
+ private int random_msg_size_start_from = 1;
private boolean cacheMessage = false;
@@ -62,19 +62,28 @@ public class TestParams
private int msg_count = 10;
private int warmup_count = 1;
-
+
private boolean random_msg_size = false;
+ private String msgType = "bytes";
+
+ private boolean printStdDev = false;
+
+ private long rate = -1;
+
+ private boolean externalController = false;
+
+ private boolean useUniqueDest = false; // useful when using multiple connections.
+
public TestParams()
{
-
+
url = System.getProperty("url",url);
host = System.getProperty("host","");
port = Integer.getInteger("port", -1);
- address = System.getProperty("address","queue");
+ address = System.getProperty("address",address);
msg_size = Integer.getInteger("msg_size", 1024);
- msg_type = Integer.getInteger("msg_type",1);
cacheMessage = Boolean.getBoolean("cache_msg");
disableMessageID = Boolean.getBoolean("disableMessageID");
disableTimestamp = Boolean.getBoolean("disableTimestamp");
@@ -85,6 +94,12 @@ public class TestParams
msg_count = Integer.getInteger("msg_count",msg_count);
warmup_count = Integer.getInteger("warmup_count",warmup_count);
random_msg_size = Boolean.getBoolean("random_msg_size");
+ msgType = System.getProperty("msg_type","bytes");
+ printStdDev = Boolean.getBoolean("print_std_dev");
+ rate = Long.getLong("rate",-1);
+ externalController = Boolean.getBoolean("ext_controller");
+ useUniqueDest = Boolean.getBoolean("use_unique_dest");
+ random_msg_size_start_from = Integer.getInteger("random_msg_size_start_from", 1);
}
public String getUrl()
@@ -122,9 +137,9 @@ public class TestParams
return msg_size;
}
- public int getMsgType()
+ public int getRandomMsgSizeStartFrom()
{
- return msg_type;
+ return random_msg_size_start_from;
}
public boolean isDurable()
@@ -161,10 +176,39 @@ public class TestParams
{
return disableTimestamp;
}
-
+
public boolean isRandomMsgSize()
{
return random_msg_size;
}
+ public String getMessageType()
+ {
+ return msgType;
+ }
+
+ public boolean isPrintStdDev()
+ {
+ return printStdDev;
+ }
+
+ public long getRate()
+ {
+ return rate;
+ }
+
+ public boolean isExternalController()
+ {
+ return externalController;
+ }
+
+ public void setAddress(String addr)
+ {
+ address = addr;
+ }
+
+ public boolean isUseUniqueDests()
+ {
+ return useUniqueDest;
+ }
}
Propchange: qpid/branches/QPID-2519/packaging/windows/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Oct 20 18:42:46 2011
@@ -1,4 +1,7 @@
+/qpid/branches/0.10/qpid/packaging/windows:1103083
/qpid/branches/0.5.x-dev/qpid/packaging/windows:892761,894875
/qpid/branches/0.6-release-windows-installer/packaging/windows:926803
-/qpid/branches/0.6-release-windows-installer/qpid/packaging/windows:926803,926865,927233
+/qpid/branches/0.6-release-windows-installer/qpid/packaging/windows:926865,927233
/qpid/branches/java-network-refactor/qpid/packaging/windows:805429-825319
+/qpid/branches/qpid-2935/qpid/packaging/windows:1061302-1072333
+/qpid/trunk/qpid/packaging/windows:1072051-1185907
Modified: qpid/branches/QPID-2519/packaging/windows/INSTALL_NOTES.html
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/packaging/windows/INSTALL_NOTES.html?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/packaging/windows/INSTALL_NOTES.html (original)
+++ qpid/branches/QPID-2519/packaging/windows/INSTALL_NOTES.html Thu Oct 20 18:42:46 2011
@@ -1,11 +1,11 @@
<html>
<head>
-<title>Apache Qpid C++ 0.9 Installation Notes</title>
+<title>Apache Qpid C++ 0.10 Installation Notes</title>
</head>
<body>
-<H1>Apache Qpid C++ 0.9 Installation Notes</H1>
+<H1>Apache Qpid C++ 0.10 Installation Notes</H1>
-<p>Thank you for installing Apache Qpid version 0.9 for Windows.
+<p>Thank you for installing Apache Qpid version 0.10 for Windows.
If the requisite features were installed, you can now run a broker,
use the example programs, and design your own messaging programs while
reading the Qpid C++ API reference documentation.</p>
Modified: qpid/branches/QPID-2519/packaging/windows/installer.proj
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/packaging/windows/installer.proj?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/packaging/windows/installer.proj (original)
+++ qpid/branches/QPID-2519/packaging/windows/installer.proj Thu Oct 20 18:42:46 2011
@@ -20,7 +20,7 @@
<!--
Packaging script for Apache Qpid on Windows
- Builds the C++ and WCF components, and packages those along with user
+ Builds the C++, .NET, and WCF components, and packages those along with user
documentation and the python pieces needed to generate QMF stuff.
-->
@@ -87,7 +87,8 @@
<ItemGroup>
<WcfProjects Include="$(source_root)\wcf\src\Apache\Qpid\**\*.csproj"/>
- <WcfArtifacts Include="$(source_root)\wcf\src\Apache\Qpid\Channel\bin\Release\*.dll"/>
+ <WcfArtifacts Include="$(source_root)\wcf\src\Apache\Qpid\Channel\bin\Release\Apache.Qpid.Channel.dll"/>
+ <WcfArtifacts Include="$(source_root)\wcf\src\Apache\Qpid\Channel\bin\Release\Apache.Qpid.Interop.dll"/>
<WcfExamples Include="$(source_root)\wcf\samples\**\*"
Exclude="$(source_root)\wcf\samples\**\.svn\**"/>
</ItemGroup>
@@ -217,9 +218,9 @@
OutputFile="boost_dlls.wxs" />
<Candle
ToolPath="$(WixToolPath)"
- DefineConstants="qpidc_version=0.9"
+ DefineConstants="qpidc_version=0.13"
InstallerPlatform="x64"
- OutputFile="qpidc-0.9-x64.msi" />
+ OutputFile="qpidc-0.13-x64.msi" />
-->
<Exec
Command="heat dir $(staging_dir)\include\qpid -var var.qpid_headers_dir -dr QpidInclude -gg -cg group_QpidHeaders -out qpid_headers.wxs" />
@@ -229,13 +230,19 @@
<Exec
Command="heat dir $(staging_dir)\bin\boost -var var.boost_dll_dir -dr QpidBin -srd -gg -cg group_BoostDlls -sw5150 -out boost_dlls.wxs" />
<Exec
- Command="heat dir $(staging_dir)\examples -var var.examples_dir -dr INSTALLLOCATION -gg -cg group_Examples -out examples.wxs" />
+ Command="heat file $(staging_dir)\examples\README.txt -var var.examples_dir -dr INSTALLLOCATION -gg -cg group_Examples -out examples_README.wxs" />
+ <Exec
+ Command="heat file $(staging_dir)\examples\examples.sln -var var.examples_dir -dr INSTALLLOCATION -gg -cg group_Examples -out examples_examples.wxs" />
+ <Exec
+ Command="heat dir $(staging_dir)\examples\messaging -var var.examples_dir -dr INSTALLLOCATION -gg -cg group_Examples -out examples_messaging.wxs" />
+ <Exec
+ Command="heat dir $(staging_dir)\examples\qmf-console -var var.examples_dir -dr INSTALLLOCATION -gg -cg group_Examples -out examples_qmf-console.wxs" />
<Exec
Command="heat dir $(staging_dir)\docs\api -var var.api_docs_dir -dr QpidDoc -gg -cg group_APIDocs -out api_docs.wxs" />
<Exec
- Command="candle -dqpidc_version=0.9 -dProgramFiles=$(ProgramFiles) -dstaging_dir=$(staging_dir) -dqpid_headers_dir=$(staging_dir)\include\qpid -dboost_headers_dir=$(staging_dir)\include\boost -dboost_dll_dir=$(staging_dir)\bin\boost -dexamples_dir=$(staging_dir)\examples -dapi_docs_dir=$(staging_dir)\docs\api -ext WiXNetFxExtension qpidc.wxs qpid_headers.wxs boost_headers.wxs boost_dlls.wxs examples.wxs api_docs.wxs -arch $(Architecture)" />
+ Command="candle -dqpidc_version=0.13 -dProgramFiles=$(ProgramFiles) -dstaging_dir=$(staging_dir) -dqpid_headers_dir=$(staging_dir)\include\qpid -dboost_headers_dir=$(staging_dir)\include\boost -dboost_dll_dir=$(staging_dir)\bin\boost -dexamples_dir=$(staging_dir)\examples -dapi_docs_dir=$(staging_dir)\docs\api -ext WiXNetFxExtension qpidc.wxs qpid_headers.wxs boost_headers.wxs boost_dlls.wxs examples.wxs api_docs.wxs -arch $(Architecture)" />
<Exec
- Command="light -ext WiXNetFxExtension -ext WixUtilExtension -ext WixUIExtension -cultures:en-us -out qpidc-0.9-$(Architecture).msi qpidc.wixobj qpid_headers.wixobj boost_headers.wixobj boost_dlls.wixobj examples.wixobj api_docs.wixobj" />
+ Command="light -ext WiXNetFxExtension -ext WixUtilExtension -ext WixUIExtension -cultures:en-us -out qpidc-0.13-$(Architecture).msi qpidc.wixobj qpid_headers.wixobj boost_headers.wixobj boost_dlls.wixobj examples_README.wixobj examples_examples.wixobj examples_messaging.wixobj examples_qmf-console.wixobj api_docs.wixobj" />
</Target>
</Project>
Propchange: qpid/branches/QPID-2519/python/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Oct 20 18:42:46 2011
@@ -1,3 +1,5 @@
/qpid/branches/0.5.x-dev/qpid/python:892761,894875
/qpid/branches/java-network-refactor/qpid/python:805429-825319
/qpid/branches/qmfv2/qpid/python:902858,902894
+/qpid/branches/qpid-2935/qpid/python:1061302-1072333
+/qpid/trunk/qpid/python:1072051-1185907
Propchange: qpid/branches/QPID-2519/python/examples/api/spout
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Oct 20 18:42:46 2011
@@ -1,2 +1,4 @@
/qpid/branches/qmfv2/qpid/python/examples/api/spout:902858,902894
+/qpid/branches/qpid-2935/qpid/python/examples/api/spout:1061302-1072333
/qpid/branches/qpid.rnr/python/examples/api/spout:894071-896158
+/qpid/trunk/qpid/python/examples/api/spout:1072051-1185907
Modified: qpid/branches/QPID-2519/python/qpid/client.py
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/python/qpid/client.py?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/python/qpid/client.py (original)
+++ qpid/branches/QPID-2519/python/qpid/client.py Thu Oct 20 18:42:46 2011
@@ -106,7 +106,7 @@ class Client:
try:
id = None
for i in xrange(1, 64*1024):
- if not self.sessions.has_key(id):
+ if not self.sessions.has_key(i):
id = i
break
finally:
Modified: qpid/branches/QPID-2519/python/qpid/codec010.py
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/python/qpid/codec010.py?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/python/qpid/codec010.py (original)
+++ qpid/branches/QPID-2519/python/qpid/codec010.py Thu Oct 20 18:42:46 2011
@@ -17,7 +17,7 @@
# under the License.
#
-import datetime
+import datetime, string
from packer import Packer
from datatypes import serial, timestamp, RangedSet, Struct, UUID
from ops import Compound, PRIMITIVE, COMPOUND
@@ -241,15 +241,20 @@ class Codec(Packer):
v = sc.read_primitive(type)
result[k] = v
return result
+
+ def _write_map_elem(self, k, v):
+ type = self.encoding(v)
+ sc = StringCodec()
+ sc.write_str8(k)
+ sc.write_uint8(type.CODE)
+ sc.write_primitive(type, v)
+ return sc.encoded
+
def write_map(self, m):
sc = StringCodec()
if m is not None:
sc.write_uint32(len(m))
- for k, v in m.items():
- type = self.encoding(v)
- sc.write_str8(k)
- sc.write_uint8(type.CODE)
- sc.write_primitive(type, v)
+ sc.write(string.joinfields(map(self._write_map_elem, m.keys(), m.values()), ""))
self.write_vbin32(sc.encoded)
def read_array(self):
Propchange: qpid/branches/QPID-2519/python/qpid/concurrency.py
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Oct 20 18:42:46 2011
@@ -1,2 +1,4 @@
/qpid/branches/qmfv2/qpid/python/qpid/concurrency.py:902858,902894
+/qpid/branches/qpid-2935/qpid/python/qpid/concurrency.py:1061302-1072333
/qpid/branches/qpid.rnr/python/qpid/concurrency.py:894071-896158
+/qpid/trunk/qpid/python/qpid/concurrency.py:1072051-1185907
Modified: qpid/branches/QPID-2519/python/qpid/messaging/driver.py
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/python/qpid/messaging/driver.py?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/python/qpid/messaging/driver.py (original)
+++ qpid/branches/QPID-2519/python/qpid/messaging/driver.py Thu Oct 20 18:42:46 2011
@@ -66,7 +66,7 @@ class Attachment:
# XXX
-DURABLE_DEFAULT=True
+DURABLE_DEFAULT=False
# XXX
@@ -526,7 +526,7 @@ class Driver:
rawlog.debug("OPEN[%s]: %s:%s", self.log_id, host, port)
trans = transports.TRANSPORTS.get(self.connection.transport)
if trans:
- self._transport = trans(host, port)
+ self._transport = trans(self.connection, host, port)
else:
raise ConnectError("no such transport: %s" % self.connection.transport)
if self._retrying and self._reconnect_log:
@@ -828,8 +828,9 @@ class Engine:
self._closing = True
def attach(self, ssn):
+ if ssn.closed: return
sst = self._attachments.get(ssn)
- if sst is None and not ssn.closed:
+ if sst is None:
for i in xrange(0, self.channel_max):
if not self._sessions.has_key(i):
ch = i
@@ -930,6 +931,7 @@ class Engine:
def resolve_declare(self, sst, lnk, dir, action):
declare = lnk.options.get("create") in ("always", dir)
+ assrt = lnk.options.get("assert") in ("always", dir)
def do_resolved(type, subtype):
err = None
if type is None:
@@ -938,7 +940,12 @@ class Engine:
else:
err = NotFound(text="no such queue: %s" % lnk.name)
else:
- action(type, subtype)
+ if assrt:
+ expected = lnk.options.get("node", {}).get("type")
+ if expected and type != expected:
+ err = AssertionFailed(text="expected %s, got %s" % (expected, type))
+ if err is None:
+ action(type, subtype)
if err:
tgt = lnk.target
Modified: qpid/branches/QPID-2519/python/qpid/messaging/endpoints.py
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/python/qpid/messaging/endpoints.py?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/python/qpid/messaging/endpoints.py (original)
+++ qpid/branches/QPID-2519/python/qpid/messaging/endpoints.py Thu Oct 20 18:42:46 2011
@@ -158,6 +158,7 @@ class Connection(Endpoint):
self.reconnect_log = options.get("reconnect_log", True)
self.address_ttl = options.get("address_ttl", 60)
+ self.tcp_nodelay = options.get("tcp_nodelay", False)
self.options = options
@@ -197,7 +198,7 @@ class Connection(Endpoint):
return result
def check_closed(self):
- if self.closed:
+ if not self._connected:
self._condition.gc()
raise ConnectionClosed()
@@ -1006,9 +1007,9 @@ class Receiver(Endpoint, object):
self.draining = True
self._wakeup()
self._ecwait(lambda: not self.draining)
+ msg = self.session._get(self, timeout=0)
self._grant()
self._wakeup()
- msg = self.session._get(self, timeout=0)
if msg is None:
raise Empty()
elif self._capacity not in (0, UNLIMITED.value):
Modified: qpid/branches/QPID-2519/python/qpid/messaging/transports.py
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/python/qpid/messaging/transports.py?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/python/qpid/messaging/transports.py (original)
+++ qpid/branches/QPID-2519/python/qpid/messaging/transports.py Thu Oct 20 18:42:46 2011
@@ -17,18 +17,23 @@
# under the License.
#
+import socket
from qpid.util import connect
TRANSPORTS = {}
-class tcp:
+class SocketTransport:
- def __init__(self, host, port):
+ def __init__(self, conn, host, port):
self.socket = connect(host, port)
+ if conn.tcp_nodelay:
+ self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
def fileno(self):
return self.socket.fileno()
+class tcp(SocketTransport):
+
def reading(self, reading):
return reading
@@ -52,17 +57,14 @@ try:
except ImportError:
pass
else:
- class tls:
+ class tls(SocketTransport):
- def __init__(self, host, port):
- self.socket = connect(host, port)
+ def __init__(self, conn, host, port):
+ SocketTransport.__init__(self, conn, host, port)
self.tls = wrap_socket(self.socket)
self.socket.setblocking(0)
self.state = None
- def fileno(self):
- return self.socket.fileno()
-
def reading(self, reading):
if self.state is None:
return reading
Modified: qpid/branches/QPID-2519/python/qpid/tests/messaging/endpoints.py
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/python/qpid/tests/messaging/endpoints.py?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/python/qpid/tests/messaging/endpoints.py (original)
+++ qpid/branches/QPID-2519/python/qpid/tests/messaging/endpoints.py Thu Oct 20 18:42:46 2011
@@ -46,6 +46,10 @@ class SetupTests(Base):
self.conn.open()
self.ping(self.conn.session())
+ def testTcpNodelay(self):
+ self.conn = Connection.establish(self.broker, tcp_nodelay=True)
+ assert self.conn._driver._transport.socket.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY)
+
def testConnectError(self):
try:
# Specifying port 0 yields a bad address on Windows; port 4 is unassigned
@@ -111,8 +115,8 @@ class SetupTests(Base):
class flaky:
- def __init__(self, host, port):
- self.real = real(host, port)
+ def __init__(self, conn, host, port):
+ self.real = real(conn, host, port)
self.sent_count = 0
self.recv_count = 0
@@ -186,6 +190,9 @@ class ConnectionTests(Base):
def setup_connection(self):
return Connection.establish(self.broker, **self.connection_options())
+ def testCheckClosed(self):
+ assert not self.conn.check_closed()
+
def testSessionAnon(self):
ssn1 = self.conn.session()
ssn2 = self.conn.session()
@@ -248,8 +255,8 @@ class ConnectionTests(Base):
class hangable:
- def __init__(self, host, port):
- self.tcp = TRANSPORTS["tcp"](host, port)
+ def __init__(self, conn, host, port):
+ self.tcp = TRANSPORTS["tcp"](conn, host, port)
self.hung = False
def hang(self):
@@ -1179,6 +1186,16 @@ test-link-bindings-queue; {
snd.send(m)
self.drain(qrcv, expected=msgs)
+ def testAssert1(self):
+ try:
+ snd = self.ssn.sender("amq.topic; {assert: always, node: {type: queue}}")
+ assert 0, "assertion failed to trigger"
+ except AssertionFailed, e:
+ pass
+
+ def testAssert2(self):
+ snd = self.ssn.sender("amq.topic; {assert: always}")
+
NOSUCH_Q = "this-queue-should-not-exist"
UNPARSEABLE_ADDR = "name/subject; {bad options"
UNLEXABLE_ADDR = "\0x0\0x1\0x2\0x3"
Modified: qpid/branches/QPID-2519/python/qpid/util.py
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/python/qpid/util.py?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/python/qpid/util.py (original)
+++ qpid/branches/QPID-2519/python/qpid/util.py Thu Oct 20 18:42:46 2011
@@ -39,12 +39,17 @@ except ImportError:
self.sock.close()
def connect(host, port):
- sock = socket.socket()
- sock.connect((host, port))
- sock.setblocking(1)
- # XXX: we could use this on read, but we'd have to put write in a
- # loop as well
- # sock.settimeout(1)
+ for res in socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM):
+ af, socktype, proto, canonname, sa = res
+ sock = socket.socket(af, socktype, proto)
+ try:
+ sock.connect(sa)
+ break
+ except socket.error, msg:
+ sock.close
+ else:
+ # If we got here then we couldn't connect (yet)
+ raise
return sock
def listen(host, port, predicate = lambda: True, bound = lambda: None):
@@ -101,15 +106,23 @@ def fill(text, indent, heading = None):
class URL:
RE = re.compile(r"""
- # [ <scheme>:// ] [ <user> [ / <password> ] @] <host> [ :<port> ]
- ^ (?: ([^:/@]+)://)? (?: ([^:/@]+) (?: / ([^:/@]+) )? @)? ([^@:/]+) (?: :([0-9]+))?$
-""", re.X)
+ # [ <scheme>:// ] [ <user> [ / <password> ] @] ( <host4> | \[ <host6> \] ) [ :<port> ]
+ ^ (?: ([^:/@]+)://)? (?: ([^:/@]+) (?: / ([^:/@]+) )? @)? (?: ([^@:/\[]+) | \[ ([a-f0-9:.]+) \] ) (?: :([0-9]+))?$
+""", re.X | re.I)
AMQPS = "amqps"
AMQP = "amqp"
- def __init__(self, s):
- if isinstance(s, URL):
+ def __init__(self, s=None, **kwargs):
+ if s is None:
+ self.scheme = kwargs.get('scheme', None)
+ self.user = kwargs.get('user', None)
+ self.password = kwargs.get('password', None)
+ self.host = kwargs.get('host', None)
+ self.port = kwargs.get('port', None)
+ if self.host is None:
+ raise ValueError('Host required for url')
+ elif isinstance(s, URL):
self.scheme = s.scheme
self.user = s.user
self.password = s.password
@@ -119,7 +132,8 @@ class URL:
match = URL.RE.match(s)
if match is None:
raise ValueError(s)
- self.scheme, self.user, self.password, self.host, port = match.groups()
+ self.scheme, self.user, self.password, host4, host6, port = match.groups()
+ self.host = host4 or host6
if port is None:
self.port = None
else:
@@ -137,11 +151,25 @@ class URL:
if self.password:
s += "/%s" % self.password
s += "@"
- s += self.host
+ if ':' not in self.host:
+ s += self.host
+ else:
+ s += "[%s]" % self.host
if self.port:
s += ":%s" % self.port
return s
+ def __eq__(self, url):
+ if isinstance(url, basestring):
+ url = URL(url)
+ return \
+ self.scheme==url.scheme and \
+ self.user==url.user and self.password==url.password and \
+ self.host==url.host and self.port==url.port
+
+ def __ne__(self, url):
+ return not self.__eq__(url)
+
def default(value, default):
if value is None:
return default
Modified: qpid/branches/QPID-2519/python/setup.py
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/python/setup.py?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/python/setup.py (original)
+++ qpid/branches/QPID-2519/python/setup.py Thu Oct 20 18:42:46 2011
@@ -298,7 +298,7 @@ class install_lib(_install_lib):
return outfiles + extra
setup(name="qpid-python",
- version="0.9",
+ version="0.13",
author="Apache Qpid",
author_email="dev@qpid.apache.org",
packages=["mllib", "qpid", "qpid.messaging", "qpid.tests",
Modified: qpid/branches/QPID-2519/specs/management-schema.xml
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/specs/management-schema.xml?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/specs/management-schema.xml (original)
+++ qpid/branches/QPID-2519/specs/management-schema.xml Thu Oct 20 18:42:46 2011
@@ -92,6 +92,7 @@
<arg name="srcQueue" dir="I" type="sstr" desc="Source queue"/>
<arg name="destQueue" dir="I" type="sstr" desc="Destination queue"/>
<arg name="qty" dir="I" type="uint32" desc="# of messages to move. 0 means all messages"/>
+ <arg name="filter" dir="I" type="map" default="{}" desc="if specified, move only those messages matching this filter"/>
</method>
<method name="setLogLevel" desc="Set the log level">
@@ -102,6 +103,34 @@
<arg name="level" dir="O" type="sstr"/>
</method>
+ <method name="getTimestampConfig" desc="Get the message timestamping configuration">
+ <arg name="receive" dir="O" type="bool" desc="True if received messages are timestamped."/>
+ </method>
+
+ <method name="setTimestampConfig" desc="Set the message timestamping configuration">
+ <arg name="receive" dir="I" type="bool" desc="Set true to enable timestamping received messages."/>
+ </method>
+
+ <method name="create" desc="Create an object of the specified type">
+ <arg name="type" dir="I" type="sstr" desc="The type of object to create"/>
+ <arg name="name" dir="I" type="sstr" desc="The name of the object to create"/>
+ <arg name="properties" dir="I" type="map" desc="Type specific object properties"/>
+ <arg name="strict" dir="I" type="bool" desc="If specified, treat unrecognised object properties as an error"/>
+ </method>
+
+ <method name="delete" desc="Delete an object of the specified type">
+ <arg name="type" dir="I" type="sstr" desc="The type of object to delete"/>
+ <arg name="name" dir="I" type="sstr" desc="The name of the object to delete"/>
+ <arg name="options" dir="I" type="map" desc="Type specific object options for deletion"/>
+ </method>
+
+ <method name="query" desc="Query the current state of an object.">
+ <arg name="type" dir="I" type="sstr" desc="The type of object to query."/>
+ <arg name="name" dir="I" type="sstr" desc="The name of the object to query"/>
+ <arg name="results" dir="O" type="map" desc="A snapshot of the object's state."/>
+ </method>
+
+
</class>
<!--
@@ -150,8 +179,8 @@
<statistic name="msgTxnDequeues" type="count64" unit="message" desc="Transactional messages dequeued"/>
<statistic name="msgPersistEnqueues" type="count64" unit="message" desc="Persistent messages enqueued"/>
<statistic name="msgPersistDequeues" type="count64" unit="message" desc="Persistent messages dequeued"/>
- <statistic name="msgDepth" type="count32" unit="message" desc="Current size of queue in messages" assign="msgTotalEnqueues - msgTotalDequeues"/>
- <statistic name="byteDepth" type="count32" unit="octet" desc="Current size of queue in bytes" assign="byteTotalEnqueues - byteTotalDequeues"/>
+ <statistic name="msgDepth" type="count64" unit="message" desc="Current size of queue in messages" assign="msgTotalEnqueues - msgTotalDequeues"/>
+ <statistic name="byteDepth" type="count64" unit="octet" desc="Current size of queue in bytes" assign="byteTotalEnqueues - byteTotalDequeues"/>
<statistic name="byteTotalEnqueues" type="count64" unit="octet" desc="Total messages enqueued"/>
<statistic name="byteTotalDequeues" type="count64" unit="octet" desc="Total messages dequeued"/>
<statistic name="byteTxnEnqueues" type="count64" unit="octet" desc="Transactional messages enqueued"/>
@@ -162,15 +191,19 @@
<statistic name="bindingCount" type="hilo32" unit="binding" desc="Current bindings"/>
<statistic name="unackedMessages" type="hilo32" unit="message" desc="Messages consumed but not yet acked"/>
<statistic name="messageLatency" type="mmaTime" unit="nanosecond" desc="Broker latency through this queue"/>
+ <statistic name="flowStopped" type="bool" desc="Flow control active."/>
+ <statistic name="flowStoppedCount" type="count32" desc="Number of times flow control was activated for this queue"/>
<method name="purge" desc="Discard all or some messages on a queue">
<arg name="request" dir="I" type="uint32" desc="0 for all messages or n>0 for n messages"/>
+ <arg name="filter" dir="I" type="map" default="{}" desc="if specified, purge only those messages matching this filter"/>
</method>
<method name="reroute" desc="Remove all or some messages on this queue and route them to an exchange">
<arg name="request" dir="I" type="uint32" desc="0 for all messages or n>0 for n messages"/>
<arg name="useAltExchange" dir="I" type="bool" desc="Iff true, use the queue's configured alternate exchange; iff false, use exchange named in the 'exchange' argument"/>
<arg name="exchange" dir="I" type="sstr" desc="Name of the exchange to route the messages through"/>
+ <arg name="filter" dir="I" type="map" default="{}" desc="if specified, reroute only those messages matching this filter"/>
</method>
</class>
@@ -239,13 +272,16 @@
<property name="vhostRef" type="objId" references="Vhost" access="RC" index="y" parentRef="y"/>
<property name="address" type="sstr" access="RC" index="y"/>
<property name="incoming" type="bool" access="RC"/>
- <property name="SystemConnection" type="bool" access="RC" desc="Infrastucture/ Inter-system connection (Cluster, Federation, ...)"/>
+ <property name="SystemConnection" type="bool" access="RC" desc="Infrastructure/ Inter-system connection (Cluster, Federation, ...)"/>
+ <property name="userProxyAuth" type="bool" access="RO" desc="Authorization to proxy for users not on broker"/>
<property name="federationLink" type="bool" access="RO" desc="Is this a federation link"/>
<property name="authIdentity" type="sstr" access="RO" desc="authId of connection if authentication enabled"/>
<property name="remoteProcessName" type="lstr" access="RO" optional="y" desc="Name of executable running as remote client"/>
<property name="remotePid" type="uint32" access="RO" optional="y" desc="Process ID of remote client"/>
<property name="remoteParentPid" type="uint32" access="RO" optional="y" desc="Parent Process ID of remote client"/>
<property name="shadow" type="bool" access="RO" desc="True for shadow connections"/>
+ <property name="saslMechanism" type="sstr" access="RO" desc="SASL mechanism"/>
+ <property name="saslSsf" type="uint16" access="RO" desc="SASL security strength factor"/>
<statistic name="closing" type="bool" desc="This client is closing by management request"/>
<statistic name="framesFromClient" type="count64"/>
<statistic name="framesToClient" type="count64"/>
@@ -380,8 +416,8 @@
<arg name="reason" type="lstr" desc="Reason for a failure"/>
<arg name="rhost" type="sstr" desc="Address (i.e. DNS name, IP address, etc.) of a remotely connected host"/>
<arg name="user" type="sstr" desc="Authentication identity"/>
- <arg name="msgDepth" type="count32" desc="Current size of queue in messages"/>
- <arg name="byteDepth" type="count32" desc="Current size of queue in bytes"/>
+ <arg name="msgDepth" type="count64" desc="Current size of queue in messages"/>
+ <arg name="byteDepth" type="count64" desc="Current size of queue in bytes"/>
</eventArguments>
<event name="clientConnect" sev="inform" args="rhost, user"/>
Modified: qpid/branches/QPID-2519/tests/setup.py
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/tests/setup.py?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/tests/setup.py (original)
+++ qpid/branches/QPID-2519/tests/setup.py Thu Oct 20 18:42:46 2011
@@ -20,7 +20,7 @@
from distutils.core import setup
setup(name="qpid-tests",
- version="0.9",
+ version="0.13",
author="Apache Qpid",
author_email="dev@qpid.apache.org",
packages=["qpid_tests", "qpid_tests.broker_0_10", "qpid_tests.broker_0_9",
Modified: qpid/branches/QPID-2519/tests/src/py/qpid_tests/broker_0_10/__init__.py
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/tests/src/py/qpid_tests/broker_0_10/__init__.py?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/tests/src/py/qpid_tests/broker_0_10/__init__.py (original)
+++ qpid/branches/QPID-2519/tests/src/py/qpid_tests/broker_0_10/__init__.py Thu Oct 20 18:42:46 2011
@@ -33,3 +33,4 @@ from lvq import *
from priority import *
from threshold import *
from extensions import *
+from msg_groups import *
Modified: qpid/branches/QPID-2519/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py (original)
+++ qpid/branches/QPID-2519/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py Thu Oct 20 18:42:46 2011
@@ -18,7 +18,7 @@
#
import traceback
from qpid.queue import Empty
-from qpid.datatypes import Message
+from qpid.datatypes import Message, RangedSet
from qpid.testlib import TestBase010
from qpid.session import SessionException
@@ -77,13 +77,7 @@ class AlternateExchangeTests(TestBase010
"""
session = self.session
#set up a 'dead letter queue':
- session.exchange_declare(exchange="dlq", type="fanout")
- session.queue_declare(queue="deleted", exclusive=True, auto_delete=True)
- session.exchange_bind(exchange="dlq", queue="deleted")
- session.message_subscribe(destination="dlq", queue="deleted")
- session.message_flow(destination="dlq", unit=session.credit_unit.message, value=0xFFFFFFFFL)
- session.message_flow(destination="dlq", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
- dlq = session.incoming("dlq")
+ dlq = self.setup_dlq()
#create a queue using the dlq as its alternate exchange:
session.queue_declare(queue="delete-me", alternate_exchange="dlq")
@@ -236,6 +230,121 @@ class AlternateExchangeTests(TestBase010
self.assertEqual("Three", dlq.get(timeout=1).body)
self.assertEmpty(dlq)
+ def test_queue_delete_loop(self):
+ """
+ Test that if a queue is bound to its own alternate exchange,
+ then on deletion there is no infinite looping
+ """
+ session = self.session
+ dlq = self.setup_dlq()
+
+ #create a queue using the dlq as its alternate exchange:
+ session.queue_declare(queue="delete-me", alternate_exchange="dlq")
+ #bind that queue to the dlq as well:
+ session.exchange_bind(exchange="dlq", queue="delete-me")
+ #send it some messages:
+ dp=self.session.delivery_properties(routing_key="delete-me")
+ for m in ["One", "Two", "Three"]:
+ session.message_transfer(message=Message(dp, m))
+ #delete it:
+ session.queue_delete(queue="delete-me")
+ #cleanup:
+ session.exchange_delete(exchange="dlq")
+
+ #check the messages were delivered to the dlq:
+ for m in ["One", "Two", "Three"]:
+ self.assertEqual(m, dlq.get(timeout=1).body)
+ self.assertEmpty(dlq)
+
+ def test_queue_delete_no_match(self):
+ """
+ Test that on queue deletion, if the queues own alternate
+ exchange cannot find a match for the message, the
+ alternate-exchange of that exchange will be tried. Note:
+ though the spec rules out going to the alternate-exchanges
+ alternate exchange when sending to an exchange, it does not
+ cover this case.
+ """
+ session = self.session
+ dlq = self.setup_dlq()
+
+ #setu up an 'intermediary' exchange
+ session.exchange_declare(exchange="my-exchange", type="direct", alternate_exchange="dlq")
+
+ #create a queue using the intermediary as its alternate exchange:
+ session.queue_declare(queue="delete-me", alternate_exchange="my-exchange")
+ #bind that queue to the dlq as well:
+ session.exchange_bind(exchange="dlq", queue="delete-me")
+ #send it some messages:
+ dp=self.session.delivery_properties(routing_key="delete-me")
+ for m in ["One", "Two", "Three"]:
+ session.message_transfer(message=Message(dp, m))
+
+ #delete it:
+ session.queue_delete(queue="delete-me")
+ #cleanup:
+ session.exchange_delete(exchange="my-exchange")
+ session.exchange_delete(exchange="dlq")
+
+ #check the messages were delivered to the dlq:
+ for m in ["One", "Two", "Three"]:
+ self.assertEqual(m, dlq.get(timeout=1).body)
+ self.assertEmpty(dlq)
+
+ def test_reject_no_match(self):
+ """
+ Test that on rejecting a message, if the queues own alternate
+ exchange cannot find a match for the message, the
+ alternate-exchange of that exchange will be tried. Note:
+ though the spec rules out going to the alternate-exchanges
+ alternate exchange when sending to an exchange, it does not
+ cover this case.
+ """
+ session = self.session
+ dlq = self.setup_dlq()
+
+ #setu up an 'intermediary' exchange
+ session.exchange_declare(exchange="my-exchange", type="direct", alternate_exchange="dlq")
+
+ #create a queue using the intermediary as its alternate exchange:
+ session.queue_declare(queue="delivery-queue", alternate_exchange="my-exchange", auto_delete=True)
+ #bind that queue to the dlq as well:
+ session.exchange_bind(exchange="dlq", queue="delivery-queue")
+ #send it some messages:
+ dp=self.session.delivery_properties(routing_key="delivery-queue")
+ for m in ["One", "Two", "Three"]:
+ session.message_transfer(message=Message(dp, m))
+
+ #get and reject those messages:
+ session.message_subscribe(destination="a", queue="delivery-queue")
+ session.message_flow(destination="a", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+ session.message_flow(destination="a", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+ incoming = session.incoming("a")
+ for m in ["One", "Two", "Three"]:
+ msg = incoming.get(timeout=1)
+ self.assertEqual(m, msg.body)
+ session.message_reject(RangedSet(msg.id))
+ session.message_cancel(destination="a")
+
+ #check the messages were delivered to the dlq:
+ for m in ["One", "Two", "Three"]:
+ self.assertEqual(m, dlq.get(timeout=1).body)
+ self.assertEmpty(dlq)
+ #cleanup:
+ session.exchange_delete(exchange="my-exchange")
+ session.exchange_delete(exchange="dlq")
+
+ def setup_dlq(self):
+ session = self.session
+ #set up 'dead-letter' handling:
+ session.exchange_declare(exchange="dlq", type="fanout")
+ session.queue_declare(queue="deleted", exclusive=True, auto_delete=True)
+ session.exchange_bind(exchange="dlq", queue="deleted")
+ session.message_subscribe(destination="dlq", queue="deleted")
+ session.message_flow(destination="dlq", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+ session.message_flow(destination="dlq", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+ dlq = session.incoming("dlq")
+ return dlq
def assertEmpty(self, queue):
try:
Modified: qpid/branches/QPID-2519/tests/src/py/qpid_tests/broker_0_10/dtx.py
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/tests/src/py/qpid_tests/broker_0_10/dtx.py?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/tests/src/py/qpid_tests/broker_0_10/dtx.py (original)
+++ qpid/branches/QPID-2519/tests/src/py/qpid_tests/broker_0_10/dtx.py Thu Oct 20 18:42:46 2011
@@ -6,9 +6,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -36,7 +36,7 @@ class DtxTests(TestBase010):
and the appropriate result verified.
The other tests enforce more specific rules and behaviour on a
- per-method or per-field basis.
+ per-method or per-field basis.
"""
XA_RBROLLBACK = 1
@@ -49,8 +49,8 @@ class DtxTests(TestBase010):
self.session = self.conn.session("dtx-session", 1)
def test_simple_commit(self):
- """
- Test basic one-phase commit behaviour.
+ """
+ Test basic one-phase commit behaviour.
"""
guard = self.keepQueuesAlive(["queue-a", "queue-b"])
session = self.session
@@ -73,8 +73,8 @@ class DtxTests(TestBase010):
self.assertMessageId("commit", "queue-b")
def test_simple_prepare_commit(self):
- """
- Test basic two-phase commit behaviour.
+ """
+ Test basic two-phase commit behaviour.
"""
guard = self.keepQueuesAlive(["queue-a", "queue-b"])
session = self.session
@@ -100,8 +100,8 @@ class DtxTests(TestBase010):
def test_simple_rollback(self):
- """
- Test basic rollback behaviour.
+ """
+ Test basic rollback behaviour.
"""
guard = self.keepQueuesAlive(["queue-a", "queue-b"])
session = self.session
@@ -123,8 +123,8 @@ class DtxTests(TestBase010):
self.assertMessageId("rollback", "queue-a")
def test_simple_prepare_rollback(self):
- """
- Test basic rollback behaviour after the transaction has been prepared.
+ """
+ Test basic rollback behaviour after the transaction has been prepared.
"""
guard = self.keepQueuesAlive(["queue-a", "queue-b"])
session = self.session
@@ -146,18 +146,18 @@ class DtxTests(TestBase010):
#check result
self.assertMessageCount(1, "queue-a")
self.assertMessageCount(0, "queue-b")
- self.assertMessageId("prepare-rollback", "queue-a")
+ self.assertMessageId("prepare-rollback", "queue-a")
def test_select_required(self):
"""
check that an error is flagged if select is not issued before
- start or end
+ start or end
"""
session = self.session
tx = self.xid("dummy")
try:
session.dtx_start(xid=tx)
-
+
#if we get here we have failed, but need to do some cleanup:
session.dtx_end(xid=tx)
session.dtx_rollback(xid=tx)
@@ -197,10 +197,10 @@ class DtxTests(TestBase010):
other.close()
session1.dtx_end(xid=tx)
session1.dtx_rollback(xid=tx)
-
+
#verification:
if failed: self.assertEquals(530, error.args[0].error_code)
- else: self.fail("Xid already known, expected exception!")
+ else: self.fail("Xid already known, expected exception!")
def test_forget_xid_on_completion(self):
"""
@@ -210,8 +210,8 @@ class DtxTests(TestBase010):
#do some transactional work & complete the transaction
self.test_simple_commit()
# session has been reset, so reselect for use with dtx
- self.session.dtx_select()
-
+ self.session.dtx_select()
+
#start association for the same xid as the previously completed txn
tx = self.xid("my-xid")
self.session.dtx_start(xid=tx)
@@ -237,9 +237,9 @@ class DtxTests(TestBase010):
self.assertEquals(503, e.args[0].error_code)
def test_start_join(self):
- """
+ """
Verify 'join' behaviour, where a session is associated with a
- transaction that is already associated with another session.
+ transaction that is already associated with another session.
"""
guard = self.keepQueuesAlive(["one", "two"])
#create two sessions & select them for use with dtx:
@@ -269,14 +269,14 @@ class DtxTests(TestBase010):
#mark end on both sessions
session1.dtx_end(xid=tx)
session2.dtx_end(xid=tx)
-
+
#commit and check
session1.dtx_commit(xid=tx, one_phase=True)
self.assertMessageCount(1, "one")
self.assertMessageCount(1, "two")
self.assertMessageId("a", "two")
self.assertMessageId("b", "one")
-
+
def test_suspend_resume(self):
"""
@@ -300,7 +300,7 @@ class DtxTests(TestBase010):
session.dtx_start(xid=tx, resume=True)
self.swap(session, "two", "one")#swap 'b' from 'two' to 'one'
session.dtx_end(xid=tx)
-
+
#commit and check
session.dtx_commit(xid=tx, one_phase=True)
self.assertMessageCount(1, "one")
@@ -308,7 +308,7 @@ class DtxTests(TestBase010):
self.assertMessageId("a", "two")
self.assertMessageId("b", "one")
- def test_suspend_start_end_resume(self):
+ def test_suspend_start_end_resume(self):
"""
Test suspension and resumption of an association with work
done on another transaction when the first transaction is
@@ -332,7 +332,7 @@ class DtxTests(TestBase010):
session.dtx_start(xid=tx, resume=True)
self.swap(session, "two", "one")#swap 'b' from 'two' to 'one'
session.dtx_end(xid=tx)
-
+
#commit and check
session.dtx_commit(xid=tx, one_phase=True)
self.assertMessageCount(1, "one")
@@ -341,10 +341,10 @@ class DtxTests(TestBase010):
self.assertMessageId("b", "one")
def test_end_suspend_and_fail(self):
- """
+ """
Verify that the correct error is signalled if the suspend and
fail flag are both set when disassociating a transaction from
- the session
+ the session
"""
session = self.session
session.dtx_select()
@@ -356,16 +356,16 @@ class DtxTests(TestBase010):
except SessionException, e:
self.assertEquals(503, e.args[0].error_code)
- #cleanup
+ #cleanup
other = self.connect()
session = other.session("cleanup", 1)
session.dtx_rollback(xid=tx)
session.close()
other.close()
-
+
def test_end_unknown_xid(self):
- """
+ """
Verifies that the correct exception is thrown when an attempt
is made to end the association for a xid not previously
associated with the session
@@ -382,7 +382,7 @@ class DtxTests(TestBase010):
def test_end(self):
"""
Verify that the association is terminated by end and subsequent
- operations are non-transactional
+ operations are non-transactional
"""
guard = self.keepQueuesAlive(["tx-queue"])
session = self.conn.session("alternate", 1)
@@ -408,7 +408,7 @@ class DtxTests(TestBase010):
session.message_accept(RangedSet(msg.id))
session.close()
- session = self.session
+ session = self.session
#commit the transaction and check that the first message (and
#only the first message) is then delivered
session.dtx_commit(xid=tx, one_phase=True)
@@ -418,7 +418,7 @@ class DtxTests(TestBase010):
def test_invalid_commit_one_phase_true(self):
"""
Test that a commit with one_phase = True is rejected if the
- transaction in question has already been prepared.
+ transaction in question has already been prepared.
"""
other = self.connect()
tester = other.session("tester", 1)
@@ -447,7 +447,7 @@ class DtxTests(TestBase010):
def test_invalid_commit_one_phase_false(self):
"""
Test that a commit with one_phase = False is rejected if the
- transaction in question has not yet been prepared.
+ transaction in question has not yet been prepared.
"""
other = self.connect()
tester = other.session("tester", 1)
@@ -474,7 +474,7 @@ class DtxTests(TestBase010):
def test_invalid_commit_not_ended(self):
"""
- Test that a commit fails if the xid is still associated with a session.
+ Test that a commit fails if the xid is still associated with a session.
"""
other = self.connect()
tester = other.session("tester", 1)
@@ -502,7 +502,7 @@ class DtxTests(TestBase010):
def test_invalid_rollback_not_ended(self):
"""
- Test that a rollback fails if the xid is still associated with a session.
+ Test that a rollback fails if the xid is still associated with a session.
"""
other = self.connect()
tester = other.session("tester", 1)
@@ -531,7 +531,7 @@ class DtxTests(TestBase010):
def test_invalid_prepare_not_ended(self):
"""
- Test that a prepare fails if the xid is still associated with a session.
+ Test that a prepare fails if the xid is still associated with a session.
"""
other = self.connect()
tester = other.session("tester", 1)
@@ -586,9 +586,9 @@ class DtxTests(TestBase010):
session1.dtx_rollback(xid=tx)
def test_get_timeout(self):
- """
+ """
Check that get-timeout returns the correct value, (and that a
- transaction with a timeout can complete normally)
+ transaction with a timeout can complete normally)
"""
session = self.session
tx = self.xid("dummy")
@@ -599,12 +599,12 @@ class DtxTests(TestBase010):
session.dtx_set_timeout(xid=tx, timeout=60)
self.assertEqual(60, session.dtx_get_timeout(xid=tx).timeout)
self.assertEqual(self.XA_OK, session.dtx_end(xid=tx).status)
- self.assertEqual(self.XA_OK, session.dtx_rollback(xid=tx).status)
-
+ self.assertEqual(self.XA_OK, session.dtx_rollback(xid=tx).status)
+
def test_set_timeout(self):
- """
+ """
Test the timeout of a transaction results in the expected
- behaviour
+ behaviour
"""
guard = self.keepQueuesAlive(["queue-a", "queue-b"])
@@ -627,7 +627,7 @@ class DtxTests(TestBase010):
self.assertMessageId("timeout", "queue-a")
#check the correct codes are returned when we try to complete the txn
self.assertEqual(self.XA_RBTIMEOUT, session.dtx_end(xid=tx).status)
- self.assertEqual(self.XA_RBTIMEOUT, session.dtx_rollback(xid=tx).status)
+ self.assertEqual(self.XA_RBTIMEOUT, session.dtx_rollback(xid=tx).status)
@@ -649,20 +649,20 @@ class DtxTests(TestBase010):
if i in [2, 5, 6, 8]:
session.dtx_prepare(xid=tx)
prepared.append(tx)
- else:
+ else:
session.dtx_rollback(xid=tx)
xids = session.dtx_recover().in_doubt
-
+
#rollback the prepared transactions returned by recover
for x in xids:
- session.dtx_rollback(xid=x)
+ session.dtx_rollback(xid=x)
#validate against the expected list of prepared transactions
actual = set([x.global_id for x in xids]) #TODO: come up with nicer way to test these
expected = set([x.global_id for x in prepared])
intersection = actual.intersection(expected)
-
+
if intersection != expected:
missing = expected.difference(actual)
extra = actual.difference(expected)
@@ -723,7 +723,7 @@ class DtxTests(TestBase010):
session.message_transfer(message=Message(dp, mp, "DtxMessage"))
#start the transaction:
- session.dtx_select()
+ session.dtx_select()
self.assertEqual(self.XA_OK, self.session.dtx_start(xid=tx).status)
#'swap' the message from one queue to the other, under that transaction:
@@ -760,7 +760,7 @@ class DtxTests(TestBase010):
def getMessageProperty(self, msg, prop):
for h in msg.headers:
if hasattr(h, prop): return getattr(h, prop)
- return None
+ return None
def keepQueuesAlive(self, names):
session = self.conn.session("nasty", 99)
@@ -768,7 +768,7 @@ class DtxTests(TestBase010):
session.queue_declare(queue=n, auto_delete=True)
session.message_subscribe(destination=n, queue=n)
return session
-
+
def createMessage(self, session, key, id, body):
dp=session.delivery_properties(routing_key=key)
mp=session.message_properties(correlation_id=id)
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org