You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/09/28 12:42:11 UTC
svn commit: r580293 [2/2] - in /incubator/qpid/branches/M2: ./ java/
java/broker/ java/broker/etc/
java/broker/src/main/java/org/apache/qpid/server/
java/broker/src/main/java/org/apache/qpid/server/queue/
java/broker/src/test/java/org/apache/qpid/serve...
Modified: incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java?rev=580293&r1=580292&r2=580293&view=diff
==============================================================================
--- incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java (original)
+++ incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java Fri Sep 28 03:41:49 2007
@@ -149,19 +149,21 @@
{
int port = details.getPort();
- if (!_inVmPipeAddress.containsKey(port))
+ synchronized (_inVmPipeAddress)
{
- if (AutoCreate)
+ if (!_inVmPipeAddress.containsKey(port))
{
- createVMBroker(port);
- }
- else
- {
- throw new AMQVMBrokerCreationException(null, port, "VM Broker on port " + port
- + " does not exist. Auto create disabled.", null);
+ if (AutoCreate)
+ {
+ createVMBroker(port);
+ }
+ else
+ {
+ throw new AMQVMBrokerCreationException(null, port, "VM Broker on port " + port
+ + " does not exist. Auto create disabled.", null);
+ }
}
}
-
return new VmPipeTransportConnection(port);
}
@@ -176,69 +178,71 @@
config.setThreadModel(ReadWriteThreadModel.getInstance());
}
- if (!_inVmPipeAddress.containsKey(port))
+ synchronized (_inVmPipeAddress)
{
- _logger.info("Creating InVM Qpid.AMQP listening on port " + port);
- IoHandlerAdapter provider = null;
- try
+ if (!_inVmPipeAddress.containsKey(port))
{
- VmPipeAddress pipe = new VmPipeAddress(port);
-
- provider = createBrokerInstance(port);
-
- _acceptor.bind(pipe, provider);
-
- _inVmPipeAddress.put(port, pipe);
- _logger.info("Created InVM Qpid.AMQP listening on port " + port);
- }
- catch (IOException e)
- {
- _logger.error("Got IOException.", e);
-
- // Try and unbind provider
+ _logger.info("Creating InVM Qpid.AMQP listening on port " + port);
+ IoHandlerAdapter provider = null;
try
{
VmPipeAddress pipe = new VmPipeAddress(port);
- try
- {
- _acceptor.unbind(pipe);
- }
- catch (Exception ignore)
- {
- // ignore
- }
-
- if (provider == null)
- {
- provider = createBrokerInstance(port);
- }
+ provider = createBrokerInstance(port);
_acceptor.bind(pipe, provider);
+
_inVmPipeAddress.put(port, pipe);
_logger.info("Created InVM Qpid.AMQP listening on port " + port);
}
- catch (IOException justUseFirstException)
+ catch (IOException e)
{
- String because;
- if (e.getCause() == null)
+ _logger.error("Got IOException.", e);
+
+ // Try and unbind provider
+ try
{
- because = e.toString();
+ VmPipeAddress pipe = new VmPipeAddress(port);
+
+ try
+ {
+ _acceptor.unbind(pipe);
+ }
+ catch (Exception ignore)
+ {
+ // ignore
+ }
+
+ if (provider == null)
+ {
+ provider = createBrokerInstance(port);
+ }
+
+ _acceptor.bind(pipe, provider);
+ _inVmPipeAddress.put(port, pipe);
+ _logger.info("Created InVM Qpid.AMQP listening on port " + port);
}
- else
+ catch (IOException justUseFirstException)
{
- because = e.getCause().toString();
- }
+ String because;
+ if (e.getCause() == null)
+ {
+ because = e.toString();
+ }
+ else
+ {
+ because = e.getCause().toString();
+ }
- throw new AMQVMBrokerCreationException(null, port, because + " Stopped binding of InVM Qpid.AMQP", e);
+ throw new AMQVMBrokerCreationException(null, port, because + " Stopped binding of InVM Qpid.AMQP", e);
+ }
}
}
+ else
+ {
+ _logger.info("InVM Qpid.AMQP on port " + port + " already exits.");
+ }
}
- else
- {
- _logger.info("InVM Qpid.AMQP on port " + port + " already exits.");
- }
-
}
private static IoHandlerAdapter createBrokerInstance(int port) throws AMQVMBrokerCreationException
@@ -285,25 +289,29 @@
{
_logger.info("Killing all VM Brokers");
_acceptor.unbindAll();
-
- Iterator keys = _inVmPipeAddress.keySet().iterator();
-
- while (keys.hasNext())
+ synchronized (_inVmPipeAddress)
{
- int id = (Integer) keys.next();
- _inVmPipeAddress.remove(id);
- }
+ Iterator keys = _inVmPipeAddress.keySet().iterator();
+ while (keys.hasNext())
+ {
+ int id = (Integer) keys.next();
+ _inVmPipeAddress.remove(id);
+ }
+ }
}
public static void killVMBroker(int port)
{
- VmPipeAddress pipe = (VmPipeAddress) _inVmPipeAddress.get(port);
- if (pipe != null)
+ synchronized (_inVmPipeAddress)
{
- _logger.info("Killing VM Broker:" + port);
- _inVmPipeAddress.remove(port);
- _acceptor.unbind(pipe);
+ VmPipeAddress pipe = (VmPipeAddress) _inVmPipeAddress.get(port);
+ if (pipe != null)
+ {
+ _logger.info("Killing VM Broker:" + port);
+ _inVmPipeAddress.remove(port);
+ _acceptor.unbind(pipe);
+ }
}
}
Modified: incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/basic/LargeMessageTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/basic/LargeMessageTest.java?rev=580293&r1=580292&r2=580293&view=diff
==============================================================================
--- incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/basic/LargeMessageTest.java (original)
+++ incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/basic/LargeMessageTest.java Fri Sep 28 03:41:49 2007
@@ -68,6 +68,7 @@
protected void tearDown() throws Exception
{
super.tearDown();
+ TransportConnection.killAllVMBrokers();
}
private void init(AMQConnection connection) throws Exception
Modified: incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java?rev=580293&r1=580292&r2=580293&view=diff
==============================================================================
--- incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java (original)
+++ incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java Fri Sep 28 03:41:49 2007
@@ -48,6 +48,7 @@
protected void tearDown() throws Exception
{
super.tearDown();
+ TransportConnection.killAllVMBrokers();
}
/**
Modified: incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java?rev=580293&r1=580292&r2=580293&view=diff
==============================================================================
--- incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java (original)
+++ incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java Fri Sep 28 03:41:49 2007
@@ -125,6 +125,7 @@
protected void tearDown() throws Exception
{
closeConnection();
+ TransportConnection.killAllVMBrokers();
super.tearDown();
}
Modified: incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java?rev=580293&r1=580292&r2=580293&view=diff
==============================================================================
--- incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java (original)
+++ incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java Fri Sep 28 03:41:49 2007
@@ -20,8 +20,11 @@
*/
package org.apache.qpid.test.unit.client.connection;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
+import junit.framework.TestCase;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.transport.TransportConnection;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -30,14 +33,20 @@
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
+import javax.jms.Queue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
-import junit.framework.TestCase;
-
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.transport.TransportConnection;
-
+/**
+ * ConnectionStartTest:
+ * This test verifies that a fresh connection is not started and no messages are delivered until the connection is
+ * started.
+ *
+ * After the connection is started then the message should be there, and the connection started.
+ *
+ * This Test verifies that using receive() and a messageListener does not cause message delivery before start is called.
+ *
+ */
public class ConnectionStartTest extends TestCase
{
@@ -54,11 +63,18 @@
try
{
+ // Create Consumer Connection
+ _connection = new AMQConnection(_broker, "guest", "guest", "fred", "test");
+ _consumerSess = _connection.createSession(false, AMQSession.AUTO_ACKNOWLEDGE);
- AMQConnection pubCon = new AMQConnection(_broker, "guest", "guest", "fred", "test");
+ Queue queue = _consumerSess.createQueue("ConnectionStartTest");
- AMQQueue queue = new AMQQueue(pubCon,"ConnectionStartTest");
+ _consumer = _consumerSess.createConsumer(queue);
+
+
+ // Create Producer Connection to send message
+ AMQConnection pubCon = new AMQConnection(_broker, "guest", "guest", "fred", "test");
Session pubSess = pubCon.createSession(false, AMQSession.AUTO_ACKNOWLEDGE);
@@ -66,12 +82,6 @@
pub.send(pubSess.createTextMessage("Initial Message"));
- _connection = new AMQConnection(_broker, "guest", "guest", "fred", "test");
-
- _consumerSess = _connection.createSession(false, AMQSession.AUTO_ACKNOWLEDGE);
-
- _consumer = _consumerSess.createConsumer(queue);
-
pubCon.close();
}
@@ -85,6 +95,7 @@
{
_connection.close();
TransportConnection.killVMBroker(1);
+ super.tearDown();
}
public void testSimpleReceiveConnection()
@@ -94,9 +105,9 @@
assertTrue("Connection should not be started", !_connection.started());
//Note that this next line will start the dispatcher in the session
// should really not be called before _connection start
- assertTrue("There should not be messages waiting for the consumer", _consumer.receiveNoWait() == null);
+ assertNull("There should not be messages waiting for the consumer", _consumer.receiveNoWait());
_connection.start();
- assertTrue("There should be messages waiting for the consumer", _consumer.receive(1000) == null);
+ assertNotNull("There should be messages waiting for the consumer", _consumer.receive(1000));
assertTrue("Connection should be started", _connection.started());
}
@@ -131,7 +142,11 @@
}
});
+ // Ensure that setting a ML doesn't start the connection
assertTrue("Connection should not be started", !_connection.started());
+ // Ensure that the message wasn't delivered while the connection was stopped.
+ assertEquals("Message latch should still be set",1,_gotMessage.getCount());
+
_connection.start();
assertTrue("Connection should be started", _connection.started());
Modified: incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java?rev=580293&r1=580292&r2=580293&view=diff
==============================================================================
--- incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java (original)
+++ incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java Fri Sep 28 03:41:49 2007
@@ -434,6 +434,13 @@
verifyMessages(_consumer.receive(1000));
}
+ /**
+ * This test sends two messages receives on of them but doesn't ack it.
+ * The consumer is then closed
+ * the first message should be returned as redelivered.
+ * the second message should be delivered normally.
+ * @throws Exception
+ */
public void testSend2ThenCloseAfter1andTryAgain() throws Exception
{
assertTrue("session is not transacted", _session.getTransacted());
Modified: incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java?rev=580293&r1=580292&r2=580293&view=diff
==============================================================================
--- incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java (original)
+++ incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java Fri Sep 28 03:41:49 2007
@@ -711,7 +711,10 @@
if (trace)
{
_logger.trace("FieldTable::writeToBuffer: Writing encoded length of " + getEncodedSize() + "...");
- _logger.trace(_properties.toString());
+ if (_properties != null)
+ {
+ _logger.trace(_properties.toString());
+ }
}
EncodingUtils.writeUnsignedInteger(buffer, getEncodedSize());
Modified: incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java?rev=580293&r1=580292&r2=580293&view=diff
==============================================================================
--- incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java (original)
+++ incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java Fri Sep 28 03:41:49 2007
@@ -885,24 +885,8 @@
synchronized (_sendPauseMonitor)
{
if ((_maxPendingSize > 0) && (unreceivedSize < _maxPendingSize))
- // && (_sendPauseBarrier.getNumberWaiting() == 1))
{
- // log.debug("unreceived size estimate under limit = " + unreceivedSize);
-
- // Wait on the send pause barrier for the limit to be re-established.
- /*try
- {*/
- // _sendPauseBarrier.await();
_sendPauseMonitor.notify();
- /*}
- catch (InterruptedException e)
- {
- throw new RuntimeException(e);
- }
- catch (BrokenBarrierException e)
- {
- throw new RuntimeException(e);
- }*/
}
}
@@ -1159,12 +1143,23 @@
// If necessary, wait until the max pending message size comes within its limit.
synchronized (_sendPauseMonitor)
{
+ // Used to keep track of the number of times that send has to wait.
+ int numWaits = 0;
+
+ // The maximum number of waits before the test gives up and fails. This has been chosen to correspond with
+ // the test timeout.
+ int waitLimit = (int) (TIMEOUT_DEFAULT / 10000);
+
while ((_maxPendingSize > 0))
{
// Get the size estimate of sent but not yet received messages.
int unreceived = _unreceived.get();
int unreceivedSize = (unreceived * ((_messageSize == 0) ? 1 : _messageSize));
+ // log.debug("unreceived = " + unreceived);
+ // log.debug("unreceivedSize = " + unreceivedSize);
+ // log.debug("_maxPendingSize = " + _maxPendingSize);
+
if (unreceivedSize > _maxPendingSize)
{
// log.debug("unreceived size estimate over limit = " + unreceivedSize);
@@ -1172,8 +1167,8 @@
// Wait on the send pause barrier for the limit to be re-established.
try
{
- // _sendPauseBarrier.await();
- _sendPauseMonitor.wait(1000);
+ _sendPauseMonitor.wait(10000);
+ numWaits++;
}
catch (InterruptedException e)
{
@@ -1181,10 +1176,17 @@
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
- /*catch (BrokenBarrierException e)
+
+ // Fail the test if the send has had to wait more than the maximum allowed number of times.
+ if (numWaits >= waitLimit)
{
- throw new RuntimeException(e);
- }*/
+ String errorMessage =
+ "Send has had to wait for the unreceivedSize (" + unreceivedSize
+ + ") to come below the maxPendingSize (" + _maxPendingSize + ") more that " + waitLimit
+ + " times.";
+ log.warn(errorMessage);
+ throw new RuntimeException(errorMessage);
+ }
}
else
{
Modified: incubator/qpid/branches/M2/java/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/pom.xml?rev=580293&r1=580292&r2=580293&view=diff
==============================================================================
--- incubator/qpid/branches/M2/java/pom.xml (original)
+++ incubator/qpid/branches/M2/java/pom.xml Fri Sep 28 03:41:49 2007
@@ -506,7 +506,7 @@
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
- <version>1.0</version>
+ <version>1.4.3</version>
</dependency>
<dependency>
<groupId>org.apache.mina</groupId>
Modified: incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java?rev=580293&r1=580292&r2=580293&view=diff
==============================================================================
--- incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java (original)
+++ incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java Fri Sep 28 03:41:49 2007
@@ -33,6 +33,14 @@
*/
public class TestableMemoryMessageStore extends MemoryMessageStore
{
+
+ MemoryMessageStore _mms = null;
+
+ public TestableMemoryMessageStore(MemoryMessageStore mms)
+ {
+ _mms = mms;
+ }
+
public TestableMemoryMessageStore()
{
_metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>();
@@ -41,11 +49,25 @@
public ConcurrentMap<Long, MessageMetaData> getMessageMetaDataMap()
{
- return _metaDataMap;
+ if (_mms != null)
+ {
+ return _mms._metaDataMap;
+ }
+ else
+ {
+ return _metaDataMap;
+ }
}
public ConcurrentMap<Long, List<ContentChunk>> getContentBodyMap()
{
- return _contentBodyMap;
+ if (_mms != null)
+ {
+ return _mms._contentBodyMap;
+ }
+ else
+ {
+ return _contentBodyMap;
+ }
}
}
Modified: incubator/qpid/branches/M2/python/tests/basic.py
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/python/tests/basic.py?rev=580293&r1=580292&r2=580293&view=diff
==============================================================================
--- incubator/qpid/branches/M2/python/tests/basic.py (original)
+++ incubator/qpid/branches/M2/python/tests/basic.py Fri Sep 28 03:41:49 2007
@@ -339,9 +339,11 @@
channel = self.channel
channel.queue_declare(queue="test-get", exclusive=True)
- #publish some messages (no_ack=True)
+ #publish some messages (no_ack=True) with persistent messaging
for i in range(1, 11):
- channel.basic_publish(routing_key="test-get", content=Content("Message %d" % i))
+ msg=Content("Message %d" % i)
+ msg["delivery mode"] = 2
+ channel.basic_publish(routing_key="test-get",content=msg )
#use basic_get to read back the messages, and check that we get an empty at the end
for i in range(1, 11):
@@ -354,18 +356,53 @@
self.assertEqual(reply.method.klass.name, "basic")
self.assertEqual(reply.method.name, "get-empty")
- #repeat for no_ack=False
+
+ #publish some messages (no_ack=True) transient messaging
for i in range(11, 21):
channel.basic_publish(routing_key="test-get", content=Content("Message %d" % i))
+ #use basic_get to read back the messages, and check that we get an empty at the end
for i in range(11, 21):
+ reply = channel.basic_get(no_ack=True)
+ self.assertEqual(reply.method.klass.name, "basic")
+ self.assertEqual(reply.method.name, "get-ok")
+ self.assertEqual("Message %d" % i, reply.content.body)
+
+ reply = channel.basic_get(no_ack=True)
+ self.assertEqual(reply.method.klass.name, "basic")
+ self.assertEqual(reply.method.name, "get-empty")
+
+ #repeat for no_ack=False
+
+ #publish some messages (no_ack=False) with persistent messaging
+ for i in range(21, 31):
+ msg=Content("Message %d" % i)
+ msg["delivery mode"] = 2
+ channel.basic_publish(routing_key="test-get",content=msg )
+
+ #use basic_get to read back the messages, and check that we get an empty at the end
+ for i in range(21, 31):
+ reply = channel.basic_get(no_ack=False)
+ self.assertEqual(reply.method.klass.name, "basic")
+ self.assertEqual(reply.method.name, "get-ok")
+ self.assertEqual("Message %d" % i, reply.content.body)
+
+ reply = channel.basic_get(no_ack=True)
+ self.assertEqual(reply.method.klass.name, "basic")
+ self.assertEqual(reply.method.name, "get-empty")
+
+ #public some messages (no_ack=False) with transient messaging
+ for i in range(31, 41):
+ channel.basic_publish(routing_key="test-get", content=Content("Message %d" % i))
+
+ for i in range(31, 41):
reply = channel.basic_get(no_ack=False)
self.assertEqual(reply.method.klass.name, "basic")
self.assertEqual(reply.method.name, "get-ok")
self.assertEqual("Message %d" % i, reply.content.body)
- if(i == 13):
+ if(i == 33):
channel.basic_ack(delivery_tag=reply.delivery_tag, multiple=True)
- if(i in [15, 17, 19]):
+ if(i in [35, 37, 39]):
channel.basic_ack(delivery_tag=reply.delivery_tag)
reply = channel.basic_get(no_ack=True)
@@ -375,8 +412,8 @@
#recover(requeue=True)
channel.basic_recover(requeue=True)
- #get the unacked messages again (14, 16, 18, 20)
- for i in [14, 16, 18, 20]:
+ #get the unacked messages again (34, 36, 38, 40)
+ for i in [34, 36, 38, 40]:
reply = channel.basic_get(no_ack=False)
self.assertEqual(reply.method.klass.name, "basic")
self.assertEqual(reply.method.name, "get-ok")