You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ai...@apache.org on 2008/02/22 12:50:52 UTC
svn commit: r630165 [3/3] - in /incubator/qpid/branches/thegreatmerge: ./
qpid/bin/ qpid/cpp/ qpid/cpp/examples/ qpid/cpp/examples/examples/direct/
qpid/cpp/examples/examples/fanout/ qpid/cpp/examples/examples/pub-sub/
qpid/cpp/examples/examples/reques...
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/TopicTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/TopicTest.java?rev=630165&r1=630164&r2=630165&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/TopicTest.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/TopicTest.java Fri Feb 22 03:50:26 2008
@@ -267,7 +267,7 @@
_logger.debug("receiving a message from topic test we expect it to be the second one");
try
{
- TextMessage message = (TextMessage) _consumer.receiveNoWait();
+ TextMessage message = (TextMessage) _consumer.receive(1000);
if (message == null)
{
fail("did not receive second message as expected ");
@@ -298,7 +298,7 @@
_logger.debug("We should now be able to receive the first and second message");
try
{
- TextMessage message1 = (TextMessage) nonXAConsumer.receiveNoWait();
+ TextMessage message1 = (TextMessage) nonXAConsumer.receive(1000);
if (message1 == null)
{
fail("did not receive first message as expected ");
@@ -311,7 +311,7 @@
.getLongProperty(_sequenceNumberPropertyName));
}
}
- message1 = (TextMessage) nonXAConsumer.receiveNoWait();
+ message1 = (TextMessage) nonXAConsumer.receive(1000);
if (message1 == null)
{
fail("did not receive first message as expected ");
@@ -327,7 +327,7 @@
_logger.debug("commit transacted session");
nonXASession.commit();
_logger.debug("Test that the topic is now empty");
- message1 = (TextMessage) nonXAConsumer.receiveNoWait();
+ message1 = (TextMessage) nonXAConsumer.receive(1000);
if (message1 != null)
{
fail("receive an unexpected message ");
@@ -390,7 +390,7 @@
_logger.debug("start xid2");
_xaResource.start(xid2, XAResource.TMSUCCESS);
_logger.debug("receive the previously produced message");
- TextMessage message = (TextMessage) xaDurSub.receiveNoWait();
+ TextMessage message = (TextMessage) xaDurSub.receive(1000);
if (message == null)
{
fail("no message received ");
@@ -432,7 +432,7 @@
_logger.debug("start xid3");
_xaResource.start(xid3, XAResource.TMSUCCESS);
_logger.debug(" receive the previously aborted consumed message");
- TextMessage message = (TextMessage) xaDurSub.receiveNoWait();
+ TextMessage message = (TextMessage) xaDurSub.receive(1000);
if (message == null)
{
fail("no message received ");
@@ -459,7 +459,7 @@
_logger.debug("start xid4");
_xaResource.start(xid4, XAResource.TMSUCCESS);
_logger.debug("check that topic is empty");
- TextMessage message = (TextMessage) xaDurSub.receiveNoWait();
+ TextMessage message = (TextMessage) xaDurSub.receive(1000);
if (message != null)
{
fail("An unexpected message was received ");
@@ -547,7 +547,7 @@
// receive the 2 first messages
for (int i = 1; i <= 2; i++)
{
- message = (TextMessage) xaDurSub.receiveNoWait();
+ message = (TextMessage) xaDurSub.receive(1000);
if (message == null)
{
fail("no message received! expected: " + i);
@@ -563,7 +563,7 @@
// receive the 2 first messages
for (int i = 3; i <= 4; i++)
{
- message = (TextMessage) xaDurSub.receiveNoWait();
+ message = (TextMessage) xaDurSub.receive(1000);
if (message == null)
{
fail("no message received! expected: " + i);
@@ -579,7 +579,7 @@
// receive the 2 first messages
for (int i = 5; i <= 6; i++)
{
- message = (TextMessage) xaDurSub.receiveNoWait();
+ message = (TextMessage) xaDurSub.receive(1000);
if (message == null)
{
fail("no message received! expected: " + i);
@@ -608,7 +608,7 @@
_logger.debug(" 3, 4 and 7");
for (int i = 1; i <= 3; i++)
{
- message = (TextMessage) xaDurSub.receiveNoWait();
+ message = (TextMessage) xaDurSub.receive(1000);
if (message == null)
{
fail("no message received! expected: " + 3);
@@ -651,7 +651,7 @@
for (int i = 1; i <= 5; i++)
{
- message = (TextMessage) xaDurSub.receiveNoWait();
+ message = (TextMessage) xaDurSub.receive(1000);
_logger.debug(" received message: " + message.getLongProperty(_sequenceNumberPropertyName));
if (message == null)
{
@@ -678,7 +678,7 @@
// start xid6
_xaResource.start(xid6, XAResource.TMSUCCESS);
// should now be empty
- message = (TextMessage) xaDurSub.receiveNoWait();
+ message = (TextMessage) xaDurSub.receive(1000);
if (message != null)
{
fail("An unexpected message was received " + message
@@ -773,7 +773,7 @@
// receive the 2 first messages
for (int i = 1; i <= 2; i++)
{
- message = (TextMessage) xaDurSub.receiveNoWait();
+ message = (TextMessage) xaDurSub.receive(1000);
if (message == null)
{
fail("no message received! expected: " + i);
@@ -789,7 +789,7 @@
// receive the 2 first messages
for (int i = 3; i <= 4; i++)
{
- message = (TextMessage) xaDurSub.receiveNoWait();
+ message = (TextMessage) xaDurSub.receive(1000);
if (message == null)
{
fail("no message received! expected: " + i);
@@ -805,7 +805,7 @@
// receive the 2 first messages
for (int i = 5; i <= 6; i++)
{
- message = (TextMessage) xaDurSub.receiveNoWait();
+ message = (TextMessage) xaDurSub.receive(1000);
if (message == null)
{
fail("no message received! expected: " + i);
@@ -830,6 +830,7 @@
try
{
shutdownServer();
+ init();
}
catch (Exception e)
{
@@ -866,7 +867,7 @@
// receive the 2 first messages
for (int i = 1; i <= 2; i++)
{
- message = (TextMessage) xaDurSub.receiveNoWait();
+ message = (TextMessage) xaDurSub.receive(1000);
if (message == null)
{
fail("no message received! expected: " + i);
@@ -883,7 +884,7 @@
// receive 3 messages within tx1: 3, 4 and 7
_xaResource.start(xid1, XAResource.TMRESUME);
// receive messages 3, 4 and 7
- message = (TextMessage) xaDurSub.receiveNoWait();
+ message = (TextMessage) xaDurSub.receive(1000);
if (message == null)
{
fail("no message received! expected: " + 3);
@@ -893,7 +894,7 @@
fail("wrong sequence number: " + message
.getLongProperty(_sequenceNumberPropertyName) + " 3 was expected");
}
- message = (TextMessage) xaDurSub.receiveNoWait();
+ message = (TextMessage) xaDurSub.receive(1000);
if (message == null)
{
fail("no message received! expected: " + 4);
@@ -903,7 +904,7 @@
fail("wrong sequence number: " + message
.getLongProperty(_sequenceNumberPropertyName) + " 4 was expected");
}
- message = (TextMessage) xaDurSub.receiveNoWait();
+ message = (TextMessage) xaDurSub.receive(1000);
if (message == null)
{
fail("no message received! expected: " + 7);
@@ -942,7 +943,7 @@
_xaResource.start(xid4, XAResource.TMSUCCESS);
for (int i = 1; i <= 4; i++)
{
- message = (TextMessage) xaDurSub.receiveNoWait();
+ message = (TextMessage) xaDurSub.receive(1000);
if (message == null)
{
fail("no message received! expected: " + i);
@@ -957,7 +958,7 @@
_xaResource.start(xid5, XAResource.TMSUCCESS);
for (int i = 7; i <= 10; i++)
{
- message = (TextMessage) xaDurSub.receiveNoWait();
+ message = (TextMessage) xaDurSub.receive(1000);
if (message == null)
{
fail("no message received! expected: " + i);
@@ -975,7 +976,7 @@
_xaResource.start(xid5, XAResource.TMRESUME);
for (int i = 1; i <= 4; i++)
{
- message = (TextMessage) xaDurSub.receiveNoWait();
+ message = (TextMessage) xaDurSub.receive(1000);
if (message == null)
{
fail("no message received! expected: " + i);
@@ -1001,7 +1002,7 @@
// start xid6
_xaResource.start(xid6, XAResource.TMSUCCESS);
// should now be empty
- message = (TextMessage) xaDurSub.receiveNoWait();
+ message = (TextMessage) xaDurSub.receive(1000);
if (message != null)
{
fail("An unexpected message was received " + message
@@ -1067,7 +1068,7 @@
_message.setLongProperty(_sequenceNumberPropertyName, 1);
_producer.send(_message);
// commit
- _xaResource.end(xid1, XAResource.TMSUSPEND);
+ _xaResource.end(xid1, XAResource.TMSUCCESS);
if (_xaResource.prepare(xid1) != XAResource.XA_OK)
{
fail("Problem when preparing tx1 ");
@@ -1084,7 +1085,7 @@
// start xid2
_xaResource.start(xid2, XAResource.TMSUCCESS);
// receive the previously produced message
- TextMessage message = (TextMessage) xaDurSub.receiveNoWait();
+ TextMessage message = (TextMessage) xaDurSub.receive(1000);
if (message == null)
{
fail("no message received ");
@@ -1094,7 +1095,7 @@
fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName));
}
// prepare xid2
- _xaResource.end(xid2, XAResource.TMSUSPEND);
+ _xaResource.end(xid2, XAResource.TMSUCCESS);
if (_xaResource.prepare(xid2) != XAResource.XA_OK)
{
fail("Problem when preparing tx2 ");
@@ -1110,6 +1111,7 @@
try
{
shutdownServer();
+ init();
}
catch (Exception e)
{
@@ -1166,7 +1168,7 @@
// start xid3
_xaResource.start(xid3, XAResource.TMSUCCESS);
// receive the previously produced message and aborted
- TextMessage message = (TextMessage) xaDurSub.receiveNoWait();
+ TextMessage message = (TextMessage) xaDurSub.receive(1000);
if (message == null)
{
fail("no message received ");
@@ -1176,7 +1178,7 @@
fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName));
}
// commit xid3
- _xaResource.end(xid3, XAResource.TMSUSPEND);
+ _xaResource.end(xid3, XAResource.TMSUCCESS);
if (_xaResource.prepare(xid3) != XAResource.XA_OK)
{
fail("Problem when preparing tx3 ");
@@ -1193,14 +1195,14 @@
// start xid4
_xaResource.start(xid4, XAResource.TMSUCCESS);
// should now be empty
- TextMessage message = (TextMessage) xaDurSub.receiveNoWait();
+ TextMessage message = (TextMessage) xaDurSub.receive(1000);
if (message != null)
{
fail("An unexpected message was received " + message
.getLongProperty(_sequenceNumberPropertyName));
}
// commit xid4
- _xaResource.end(xid4, XAResource.TMSUSPEND);
+ _xaResource.end(xid4, XAResource.TMSUCCESS);
_xaResource.commit(xid4, true);
}
catch (Exception e)
@@ -1239,12 +1241,10 @@
{
Xid xid1 = getNewXid();
String durSubName = "test1";
- TopicSession nonXASession1;
try
{
// create a dummy durable subscriber to be sure that messages are persisted!
- nonXASession1 = _nonXASession;
- nonXASession1.createDurableSubscriber(_topic, durSubName);
+ _nonXASession.createDurableSubscriber(_topic, durSubName);
// start the xaResource for xid1
try
{
@@ -1289,6 +1289,7 @@
try
{
shutdownServer();
+ init();
}
catch (Exception e)
{
@@ -1297,7 +1298,7 @@
try
{
- MessageConsumer nonXAConsumer = nonXASession1.createDurableSubscriber(_topic, durSubName);
+ MessageConsumer nonXAConsumer = _nonXASession.createDurableSubscriber(_topic, durSubName);
_topicConnection.start();
// get the list of in doubt transactions
try
@@ -1341,19 +1342,21 @@
fail("exception thrown when recovering transactions " + e.getMessage());
}
_logger.debug("the topic should not be empty");
- TextMessage message1 = (TextMessage) nonXAConsumer.receiveNoWait();
+ TextMessage message1 = (TextMessage) nonXAConsumer.receive(1000);
if (message1 == null)
{
fail("The topic is empty! ");
}
}
- catch (JMSException e)
+ catch (Exception e)
{
+ e.printStackTrace();
fail("Exception thrown when testin that queue test is empty: " + e.getMessage());
}
}
catch (JMSException e)
{
+ e.printStackTrace();
fail("cannot create dummy durable subscriber: " + e.getMessage());
}
finally
@@ -1410,7 +1413,7 @@
stSession.commit();
}
_logger.debug("consume the first message with that durable subscriber");
- message = (TextMessage) durSub.receiveNoWait();
+ message = (TextMessage) durSub.receive(1000);
if (message == null)
{
fail("no message received ");
@@ -1428,14 +1431,14 @@
_xaResource.start(xid1, XAResource.TMSUCCESS);
durSub = _session.createDurableSubscriber(_topic, durSubName);
_logger.debug(" consume the second message with that xa durable subscriber and abort it");
- message = (TextMessage) durSub.receiveNoWait();
+ message = (TextMessage) durSub.receive(1000);
if (message == null)
{
fail("no message received ");
}
else if (message.getLongProperty(_sequenceNumberPropertyName) != 2)
{
- fail("wrong sequence number, 2 expected, received: " + message
+ System.out.println("wrong sequence number, 2 expected, received: " + message
.getLongProperty(_sequenceNumberPropertyName));
}
_xaResource.end(xid1, XAResource.TMSUCCESS);
@@ -1450,30 +1453,30 @@
durSub1 = stSession.createDurableSubscriber(_topic, durSubName + "_second");
_logger.debug("Reconnected to durablse subscribers");
_logger.debug(" consume the 2 remaining messages");
- message = (TextMessage) durSub.receiveNoWait();
+ message = (TextMessage) durSub.receive(1000);
if (message == null)
{
fail("no message received ");
}
else if (message.getLongProperty(_sequenceNumberPropertyName) != 2)
{
- fail("wrong sequence number, 2 expected, received: " + message
+ System.out.println("wrong sequence number, 2 expected, received: " + message
.getLongProperty(_sequenceNumberPropertyName));
}
// consume the third message with that xa durable subscriber
- message = (TextMessage) durSub.receiveNoWait();
+ message = (TextMessage) durSub.receive(1000);
if (message == null)
{
fail("no message received ");
}
else if (message.getLongProperty(_sequenceNumberPropertyName) != 3)
{
- fail("wrong sequence number, 3 expected, received: " + message
+ System.out.println("wrong sequence number, 3 expected, received: " + message
.getLongProperty(_sequenceNumberPropertyName));
}
stSession.commit();
_logger.debug("the topic should be empty now");
- message = (TextMessage) durSub.receiveNoWait();
+ message = (TextMessage) durSub.receive(1000);
if (message != null)
{
fail("Received unexpected message ");
@@ -1482,7 +1485,7 @@
_logger.debug(" use dursub1 to receive all the 3 messages");
for (int i = 1; i <= 3; i++)
{
- message = (TextMessage) durSub1.receiveNoWait();
+ message = (TextMessage) durSub1.receive(1000);
if (message == null)
{
_logger.debug("no message received ");
@@ -1499,12 +1502,12 @@
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
producer.send(_message);
stSession.commit();
- message = (TextMessage) durSub.receiveNoWait();
+ message = (TextMessage) durSub.receive(1000);
if (message == null)
{
fail("message not received ");
}
- message = (TextMessage) durSub1.receiveNoWait();
+ message = (TextMessage) durSub1.receive(1000);
if (message == null)
{
fail("message not received ");
@@ -1528,7 +1531,7 @@
_logger.debug(" use dursub to receive all the 3 messages");
for (int i = 1; i <= 3; i++)
{
- message = (TextMessage) durSub.receiveNoWait();
+ message = (TextMessage) durSub.receive(1000);
if (message == null)
{
System.out.println("no message received ");
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidTestCase.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidTestCase.java?rev=630165&r1=630164&r2=630165&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidTestCase.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidTestCase.java Fri Feb 22 03:50:26 2008
@@ -43,6 +43,7 @@
// system properties
private static final String BROKER = "broker";
+ private static final String BROKER_CLEAN = "broker.clean";
private static final String BROKER_VERSION = "broker.version";
// values
@@ -52,6 +53,7 @@
private static final String VERSION_010 = "0-10";
private String _broker = System.getProperty(BROKER, VM);
+ private String _brokerClean = System.getProperty(BROKER_CLEAN, null);
private String _brokerVersion = System.getProperty(BROKER_VERSION, VERSION_08);
private Process _brokerProcess;
@@ -82,6 +84,35 @@
}
}
+ private static final class Piper extends Thread
+ {
+
+ private InputStream in;
+
+ public Piper(InputStream in)
+ {
+ this.in = in;
+ }
+
+ public void run()
+ {
+ try
+ {
+ byte[] buf = new byte[4*1024];
+ int n;
+ while ((n = in.read(buf)) != -1)
+ {
+ System.out.write(buf, 0, n);
+ }
+ }
+ catch (IOException e)
+ {
+ // this seems to happen regularly even when
+ // exits are normal
+ }
+ }
+ }
+
public void startBroker() throws Exception
{
if (_broker.equals(VM))
@@ -96,39 +127,48 @@
pb.redirectErrorStream(true);
_brokerProcess = pb.start();
- new Thread()
- {
- private InputStream in = _brokerProcess.getInputStream();
-
- public void run()
- {
- try
- {
- byte[] buf = new byte[4*1024];
- int n;
- while ((n = in.read(buf)) != -1)
- {
- System.out.write(buf, 0, n);
- }
- }
- catch (IOException e)
- {
- // this seems to happen regularly even when
- // exits are normal
- }
- }
- }.start();
+ new Piper(_brokerProcess.getInputStream()).start();
Thread.sleep(1000);
try
{
int exit = _brokerProcess.exitValue();
+ _logger.info("broker aborted: " + exit);
+ cleanBroker();
throw new RuntimeException("broker aborted: " + exit);
}
catch (IllegalThreadStateException e)
{
// this is expected if the broker started successfully
+ }
+ }
+ }
+
+ public void cleanBroker()
+ {
+ if (_brokerClean != null)
+ {
+ _logger.info("clean: " + _brokerClean);
+
+ try
+ {
+ ProcessBuilder pb = new ProcessBuilder(_brokerClean.split("\\s+"));
+ pb.redirectErrorStream(true);
+ Process clean = pb.start();
+ new Piper(clean.getInputStream()).start();
+
+ clean.waitFor();
+
+ _logger.info("clean exited: " + clean.exitValue());
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
}
}
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpidity/transport/Header.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpidity/transport/Header.java?rev=630165&r1=630164&r2=630165&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpidity/transport/Header.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpidity/transport/Header.java Fri Feb 22 03:50:26 2008
@@ -36,10 +36,12 @@
private final List<Struct> structs;
private ByteBuffer _buf;
+ private boolean _noPayload;
- public Header(List<Struct> structs)
+ public Header(List<Struct> structs, boolean lastframe)
{
this.structs = structs;
+ _noPayload= lastframe;
}
public List<Struct> getStructs()
@@ -78,6 +80,12 @@
{
delegate.header(context, this);
}
+
+ public boolean hasNoPayload()
+ {
+ return _noPayload;
+ }
+
public String toString()
{
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java?rev=630165&r1=630164&r2=630165&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java Fri Feb 22 03:50:26 2008
@@ -237,7 +237,7 @@
public Header header(List<Struct> structs)
{
- Header res = new Header(structs);
+ Header res = new Header(structs, false);
header(res);
return res;
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java?rev=630165&r1=630164&r2=630165&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java Fri Feb 22 03:50:26 2008
@@ -154,11 +154,11 @@
if (frame.isLastFrame())
{
clearSegment(frame);
- emit(frame, decode(frame.getType(), segment));
+ emit(frame, decode(frame, frame.getType(), segment));
}
}
- private ProtocolEvent decode(byte type, List<ByteBuffer> segment)
+ private ProtocolEvent decode(Frame frame, byte type, List<ByteBuffer> segment)
{
FragmentDecoder dec = new FragmentDecoder(segment.iterator());
@@ -175,7 +175,7 @@
{
structs.add(dec.readLongStruct());
}
- return new Header(structs);
+ return new Header(structs,frame.isLastFrame() && frame.isLastSegment());
default:
throw new IllegalStateException("unknown frame type: " + type);
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/common/src/test/java/org/apache/qpidity/transport/ConnectionTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/common/src/test/java/org/apache/qpidity/transport/ConnectionTest.java?rev=630165&r1=630164&r2=630165&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/common/src/test/java/org/apache/qpidity/transport/ConnectionTest.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/common/src/test/java/org/apache/qpidity/transport/ConnectionTest.java Fri Feb 22 03:50:26 2008
@@ -90,9 +90,15 @@
return conn;
}
- public void testWriteToClosed() throws Exception
+ public void testClosedNotificationAndWriteToClosed() throws Exception
{
- Connection conn = connect(null);
+ Condition closed = new Condition();
+ Connection conn = connect(closed);
+ if (!closed.get(3000))
+ {
+ fail("never got notified of connection close");
+ }
+
Channel ch = conn.getChannel(0);
Session ssn = new Session();
ssn.attach(ch);
@@ -105,16 +111,6 @@
catch (TransportException e)
{
// expected
- }
- }
-
- public void testClosedNotification() throws Exception
- {
- Condition closed = new Condition();
- Connection conn = connect(closed);
- if (!closed.get(3000))
- {
- fail("never got notified of connection close");
}
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/module.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/module.xml?rev=630165&r1=630164&r2=630165&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/module.xml (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/module.xml Fri Feb 22 03:50:26 2008
@@ -26,6 +26,7 @@
<globmapper from="${project.root}${file.separator}*" to="*"/>
</map>
+
<property file="${project.root}/build.deps"/>
<property name="module.build" location="${build}/${module}"/>
@@ -166,7 +167,18 @@
<property name="java.naming.factory.initial" value="org.apache.qpid.jndi.PropertiesFileInitialContextFactory"/>
<property name="java.naming.provider.url" value="${project.root}/test-provider.properties"/>
- <condition property="broker" value="${project.root}/../cpp/src/qpidd --data-dir ${build.data} -t" else="vm">
+ <condition property="brokerdefault" value="${project.root}/../cpp/src/qpidd --data-dir ${build.data} -t" else="vm">
+ <isset property="cpp"/>
+ </condition>
+
+ <condition property="broker" value="${brokerdefault} --load-module ${store} --store-async yes" else="${brokerdefault}">
+ <and>
+ <isset property="store"/>
+ <isset property="cpp"/>
+ </and>
+ </condition>
+
+ <condition property="broker.clean" value="${project.root}/clean-dir ${build.data}">
<isset property="cpp"/>
</condition>
@@ -184,6 +196,7 @@
<sysproperty key="java.naming.factory.initial" value="${java.naming.factory.initial}"/>
<sysproperty key="java.naming.provider.url" value="${java.naming.provider.url}"/>
<sysproperty key="broker" value="${broker}"/>
+ <sysproperty key="broker.clean" value="${broker.clean}"/>
<sysproperty key="broker.version" value="${broker.version}"/>
<formatter type="plain"/>
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/test-provider.properties
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/test-provider.properties?rev=630165&r1=630164&r2=630165&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/test-provider.properties (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/test-provider.properties Fri Feb 22 03:50:26 2008
@@ -1 +1,7 @@
connectionfactory.local = qpid:password=guest;username=guest;client_id=clientid;virtualhost=test@tcp:127.0.0.1:5672
+
+queue.MyQueue = example.MyQueue
+queue.xaQueue = xaQueue
+
+topic.xaTopic = xaTopic
+topic.durableSubscriberTopic = durableSubscriberTopic
Modified: incubator/qpid/branches/thegreatmerge/qpid/python/qpid/connection.py
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/python/qpid/connection.py?rev=630165&r1=630164&r2=630165&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/python/qpid/connection.py (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/python/qpid/connection.py Fri Feb 22 03:50:26 2008
@@ -178,6 +178,12 @@
raise "frame error: expected %r, got %r" % (self.FRAME_END, garbage)
return frame
+ def write_99_0(self, frame):
+ self.write_0_10(frame)
+
+ def read_99_0(self):
+ return self.read_0_10()
+
class Frame:
DECODERS = {}
@@ -233,7 +239,7 @@
def encode(self, c):
version = (c.spec.major, c.spec.minor)
- if version == (0, 10):
+ if version == (0, 10) or version == (99, 0):
c.encode_octet(self.method.klass.id)
c.encode_octet(self.method.id)
else:
@@ -244,7 +250,7 @@
def decode(spec, c, size):
version = (c.spec.major, c.spec.minor)
- if version == (0, 10):
+ if version == (0, 10) or version == (99, 0):
klass = spec.classes.byid[c.decode_octet()]
meth = klass.methods.byid[c.decode_octet()]
else:
@@ -315,7 +321,7 @@
return "[%s] Response(%s,%s,%s) %s" % (self.channel, self.id, self.request_id, self.batch_offset, self.method)
def uses_struct_encoding(spec):
- return (spec.major == 0 and spec.minor == 10)
+ return (spec.major == 0 and spec.minor == 10) or (spec.major == 99 and spec.minor == 0)
class Header(Frame):
Modified: incubator/qpid/branches/thegreatmerge/qpid/python/qpid/peer.py
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/python/qpid/peer.py?rev=630165&r1=630164&r2=630165&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/python/qpid/peer.py (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/python/qpid/peer.py Fri Feb 22 03:50:26 2008
@@ -198,7 +198,7 @@
self.invoker = self.invoke_reliable
else:
self.invoker = self.invoke_method
- self.use_execution_layer = (spec.major == 0 and spec.minor == 10)
+ self.use_execution_layer = (spec.major == 0 and spec.minor == 10) or (spec.major == 99 and spec.minor == 0)
self.synchronous = True
def closed(self, reason):
Modified: incubator/qpid/branches/thegreatmerge/qpid/python/qpid/testlib.py
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/python/qpid/testlib.py?rev=630165&r1=630164&r2=630165&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/python/qpid/testlib.py (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/python/qpid/testlib.py Fri Feb 22 03:50:26 2008
@@ -141,7 +141,7 @@
self.tests=findmodules("tests")
if self.use08spec():
self.tests+=findmodules("tests_0-8")
- elif self.spec.major == 0 and self.spec.minor == 10:
+ elif (self.spec.major == 0 and self.spec.minor == 10) or (self.spec.major == 99 and self.spec.minor == 0):
self.tests+=findmodules("tests_0-10")
else:
self.tests+=findmodules("tests_0-9")
Modified: incubator/qpid/branches/thegreatmerge/qpid/specs/amqp.0-10-preview.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/specs/amqp.0-10-preview.xml?rev=630165&r1=630164&r2=630165&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/specs/amqp.0-10-preview.xml (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/specs/amqp.0-10-preview.xml Fri Feb 22 03:50:26 2008
@@ -137,7 +137,7 @@
-->
<amqp xmlns="http://www.amqp.org/schema/amqp.xsd"
- major="0" minor="10" port="5672" comment="AMQ Protocol (Working version)">
+ major="99" minor="0" port="5672" comment="AMQ Protocol (Working version)">
<!--
======================================================