You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2008/07/15 18:58:30 UTC
svn commit: r676969 [5/5] - in /incubator/qpid/trunk/qpid/java:
client/src/main/java/org/apache/qpid/testutil/
client/src/test/java/org/apache/qpid/client/
client/src/test/java/org/apache/qpid/test/unit/ack/
client/src/test/java/org/apache/qpid/test/un...
Added: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/ConversationFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/ConversationFactory.java?rev=676969&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/ConversationFactory.java (added)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/ConversationFactory.java Tue Jul 15 09:58:26 2008
@@ -0,0 +1,480 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.utils;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.util.ReflectionUtils;
+
+import javax.jms.*;
+
+import java.util.*;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * A conversation helper, uses a message correlation id pattern to match up sent and received messages as a conversation
+ * over JMS messaging. Incoming message traffic is divided up by correlation id. Each id has a queue (behaviour dependent
+ * on the queue implementation). Clients of this de-multiplexer can wait on messages, defined by message correlation ids.
+ *
+ * <p/>One use of this is as a conversation synchronizer where multiple threads are carrying out conversations over a
+ * multiplexed messaging route. This can be useful, as JMS sessions are not multi-threaded. Setting up the conversation
+ * with synchronous queues will allow these threads to be written in a synchronous style, but with their execution order
+ * governed by the asynchronous message flow. For example, something like the following code could run a multi-threaded
+ * conversation (the conversation methods can be called many times in parallel):
+ *
+ * <p/><pre>
+ * class Initiator
+ * {
+ * ConversationHelper conversation = new ConversationHelper(connection, null,
+ * java.util.concurrent.LinkedBlockingQueue.class);
+ *
+ * initiateConversation()
+ * {
+ * try {
+ * // Exchange greetings.
+ * conversation.send(sendDestination, conversation.getSession().createTextMessage("Hello."));
+ * Message greeting = conversation.receive();
+ *
+ * // Exchange goodbyes.
+ * conversation.send(conversation.getSession().createTextMessage("Goodbye."));
+ * Message goodbye = conversation.receive();
+ * } finally {
+ * conversation.end();
+ * }
+ * }
+ * }
+ *
+ * class Responder
+ * {
+ * ConversationHelper conversation = new ConversationHelper(connection, receiveDestination,
+ * java.util.concurrent.LinkedBlockingQueue.class);
+ *
+ * respondToConversation()
+ * {
+ * try {
+ * // Exchange greetings.
+ * Message greeting = conversation.receive();
+ * conversation.send(conversation.getSession().createTextMessage("Hello."));
+ *
+ * // Exchange goodbyes.
+ * Message goodbye = conversation.receive();
+ * conversation.send(conversation.getSession().createTextMessage("Goodbye."));
+ * } finally {
+ * conversation.end();
+ * }
+ * }
+ * }
+ * </pre>
+ *
+ * <p/>Conversation correlation id's are generated on a per thread basis.
+ *
+ * <p/>The same controlSession is shared amongst all conversations. Calls to send are therefore synchronized because JMS
+ * sessions are not multi-threaded.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Associate messages to an ongoing conversation using correlation ids.
+ * <tr><td> Auto manage sessions for conversations.
+ * <tr><td> Store messages not in a conversation in dead letter box.
+ * </table>
+ */
+public class ConversationFactory
+{
+ /** Used for debugging. */
+ private static final Logger log = Logger.getLogger(ConversationFactory.class);
+
+ /**
+ * Holds a map from correlation id's to queues. The map is synchronized because it is read by the message listener
+ * in onMessage while conversations add and remove their queues from the threads that call them.
+ */
+ private Map<Long, BlockingQueue<Message>> idsToQueues =
+ Collections.synchronizedMap(new HashMap<Long, BlockingQueue<Message>>());
+
+ /** Holds the connection over which the conversation is conducted. */
+ private Connection connection;
+
+ /** Holds the controlSession over which the conversation is conducted. */
+ private Session session;
+
+ /** The message consumer for incoming messages. */
+ MessageConsumer consumer;
+
+ /** The message producer for outgoing messages. */
+ MessageProducer producer;
+
+ /** The well-known or temporary destination to receive replies on. */
+ Destination receiveDestination;
+
+ /** Holds the queue implementation class for the reply queue. */
+ Class<? extends BlockingQueue> queueClass;
+
+ /** Used to hold any replies that are received outside of the context of a conversation. */
+ BlockingQueue<Message> deadLetterBox = new LinkedBlockingQueue<Message>();
+
+ /* Used to hold conversation state on a per thread basis. */
+ /*
+ ThreadLocal<Conversation> threadLocals =
+ new ThreadLocal<Conversation>()
+ {
+ protected Conversation initialValue()
+ {
+ Conversation settings = new Conversation();
+ settings.conversationId = conversationIdGenerator.getAndIncrement();
+
+ return settings;
+ }
+ };
+ */
+
+ /** Generates new conversation id's as needed. */
+ AtomicLong conversationIdGenerator = new AtomicLong();
+
+ /**
+ * Creates a conversation helper on the specified connection with the default sending destination, and listening
+ * to the specified receiving destination.
+ *
+ * @param connection The connection to build the conversation helper on.
+ * @param receiveDestination The destination to listen to for incoming messages. This may be null to use a temporary
+ * queue, in which case the consumer listens on that temporary queue.
+ * @param queueClass The queue implementation class.
+ *
+ * @throws JMSException All underlying JMSExceptions are allowed to fall through.
+ */
+ public ConversationFactory(Connection connection, Destination receiveDestination,
+ Class<? extends BlockingQueue> queueClass) throws JMSException
+ {
+ log.debug("public ConversationFactory(Connection connection, Destination receiveDestination = " + receiveDestination
+ + ", Class<? extends BlockingQueue> queueClass = " + queueClass + "): called");
+
+ this.connection = connection;
+ this.queueClass = queueClass;
+
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // Check if a well-known receive destination has been provided, or use a temporary queue if not.
+ this.receiveDestination = (receiveDestination != null) ? receiveDestination : session.createTemporaryQueue();
+
+ // Listen on the resolved destination held in the field, as the constructor argument may legitimately be null.
+ consumer = session.createConsumer(this.receiveDestination);
+ producer = session.createProducer(null);
+
+ consumer.setMessageListener(new Receiver());
+ }
+
+ /**
+ * Starts a new conversation, giving it the next unused correlation id.
+ *
+ * @return A new conversation context.
+ */
+ public Conversation startConversation()
+ {
+ log.debug("public Conversation startConversation(): called");
+
+ // Each conversation takes the next value from the shared id generator as its correlation id.
+ Conversation context = new Conversation();
+ context.conversationId = conversationIdGenerator.getAndIncrement();
+
+ return context;
+ }
+
+ /**
+ * Creates the reply queue for a conversation, unless one has already been created for its correlation id. The
+ * queue is instantiated reflectively from the configured queue implementation class.
+ *
+ * @param conversationId The conversation correlation id.
+ */
+ private void initQueueForId(long conversationId)
+ {
+ // Nothing to do when the conversation already has its queue.
+ if (idsToQueues.containsKey(conversationId))
+ {
+ return;
+ }
+
+ idsToQueues.put(conversationId, ReflectionUtils.<BlockingQueue>newInstance(queueClass));
+ }
+
+ /**
+ * Drains the dead letter box, handing back every message that was held in it at the time of the call.
+ *
+ * @return All messages in the dead letter box.
+ */
+ public Collection<Message> emptyDeadLetterBox()
+ {
+ log.debug("public Collection<Message> emptyDeadLetterBox(): called");
+
+ // drainTo removes the messages from the dead letter box as it copies them across.
+ Collection<Message> drained = new ArrayList<Message>();
+ deadLetterBox.drainTo(drained);
+
+ return drained;
+ }
+
+ /**
+ * Provides the single controlSession that was created by this factory and on which its consumer and producer
+ * were created.
+ *
+ * @return The controlSession over which the conversation is conducted.
+ */
+ public Session getSession()
+ {
+ return this.session;
+ }
+
+ /**
+ * Used to hold a conversation context. This consists of a correlating id for the conversation, and a reply
+ * destination automatically updated to the last received reply-to destination.
+ */
+ public class Conversation
+ {
+ /** Holds the correlation id for the context. */
+ long conversationId;
+
+ /**
+ * Holds the send destination for the context. This will automatically be updated to the most recently received
+ * reply-to destination.
+ */
+ Destination sendDestination;
+
+ /**
+ * Sends a message as part of this conversation. The correlation id of the message will be assigned by this
+ * method, overriding any previously set value, and its reply-to is set to the receive destination of the factory.
+ *
+ * @param sendDestination The destination to send to. This may be null to use the last received reply-to
+ * destination.
+ * @param message The message to send.
+ *
+ * @throws JMSException All underlying JMSExceptions are allowed to fall through. This will also be thrown if no
+ * send destination is specified and there is no most recent reply-to destination available
+ * to use.
+ */
+ public void send(Destination sendDestination, Message message) throws JMSException
+ {
+ log.debug("public void send(Destination sendDestination = " + sendDestination + ", Message message = " + message
+ + "): called");
+
+ message.setJMSCorrelationID(Long.toString(conversationId));
+ message.setJMSReplyTo(receiveDestination);
+
+ // Ensure that the reply queue for this conversation exists.
+ initQueueForId(conversationId);
+
+ // Check if an overriding send to destination has been set or use the last reply-to if not.
+ Destination sendTo = null;
+
+ if (sendDestination != null)
+ {
+ sendTo = sendDestination;
+ }
+ // The parameter shadows the field of the same name, so the field must be qualified to reach the last reply-to.
+ else if (this.sendDestination != null)
+ {
+ sendTo = this.sendDestination;
+ }
+ else
+ {
+ throw new JMSException("The send destination was not specified, and no most recent reply-to available to use.");
+ }
+
+ // Send the message. The producer is shared by every conversation created by the factory, so the lock is taken
+ // on the factory rather than on this conversation, which would not exclude other conversations.
+ synchronized (ConversationFactory.this)
+ {
+ producer.send(sendTo, message);
+ }
+ }
+
+ /**
+ * Gets the next message in an ongoing conversation. This method may block until such a message is received.
+ * The reply-to destination of the received message is remembered as the destination for subsequent sends.
+ *
+ * @return The next incoming message in the conversation, or null if the calling thread was interrupted while
+ * waiting, in which case the interrupted status of the thread is restored.
+ *
+ * @throws JMSException All underlying JMSExceptions, such as a failure to read the reply-to destination of the
+ * received message, are allowed to fall through.
+ */
+ public Message receive() throws JMSException
+ {
+ log.debug("public Message receive(): called");
+
+ // Ensure that the reply queue for this conversation exists.
+ initQueueForId(conversationId);
+
+ BlockingQueue<Message> queue = idsToQueues.get(conversationId);
+
+ try
+ {
+ Message result = queue.take();
+
+ // Keep the reply-to destination to send replies to.
+ sendDestination = result.getJMSReplyTo();
+
+ return result;
+ }
+ catch (InterruptedException e)
+ {
+ // Restore the threads interrupted status, so that callers can detect why no message was returned.
+ Thread.currentThread().interrupt();
+
+ return null;
+ }
+ }
+
+ /**
+ * Gets many messages in an ongoing conversation. If a limit is specified, then once that many messages are
+ * received they will be returned. If a timeout is specified, receiving stops the first time a wait for a message
+ * exceeds it. At least one of the message count or timeout should be set to a value of 1 or greater.
+ *
+ * <p/>Note that the timeout is passed to every individual wait for a message; it is not a limit on the duration
+ * of the call as a whole. If the calling thread is interrupted, its interrupted status is restored and the
+ * messages received so far are returned.
+ *
+ * @param num The number of messages to receive, or all if this is less than 1.
+ * @param timeout The timeout in milliseconds to wait for each message, or forever if this is less than 1.
+ *
+ * @return All messages received within the count limit and the timeout.
+ *
+ * @throws JMSException All underlying JMSExceptions are allowed to fall through.
+ * @throws IllegalArgumentException If both the message count and the timeout are less than 1.
+ */
+ public Collection<Message> receiveAll(int num, long timeout) throws JMSException
+ {
+ log.debug("public Collection<Message> receiveAll(int num = " + num + ", long timeout = " + timeout
+ + "): called");
+
+ // Check that a timeout or message count was set, as otherwise the loop below could never terminate.
+ if ((num < 1) && (timeout < 1))
+ {
+ throw new IllegalArgumentException("At least one of message count (num) or timeout must be set.");
+ }
+
+ // Ensure that the reply queue for this conversation exists.
+ initQueueForId(conversationId);
+ BlockingQueue<Message> queue = idsToQueues.get(conversationId);
+
+ // Used to collect the received messages in.
+ Collection<Message> result = new ArrayList<Message>();
+
+ // Used to indicate when the timeout or message count has expired.
+ boolean receiveMore = true;
+
+ int messageCount = 0;
+
+ // Receive messages until the timeout or message count expires.
+ do
+ {
+ try
+ {
+ Message next = null;
+
+ // Try to receive the message with a timeout if one has been set.
+ if (timeout > 0)
+ {
+ next = queue.poll(timeout, TimeUnit.MILLISECONDS);
+
+ // Check if the timeout expired, and stop receiving if so.
+ if (next == null)
+ {
+ receiveMore = false;
+ }
+ }
+ // Receive the message without a timeout.
+ else
+ {
+ next = queue.take();
+ }
+
+ // Increment the message count if a message was received.
+ messageCount += (next != null) ? 1 : 0;
+
+ // Check if all the requested messages were received, and stop receiving if so.
+ if ((num > 0) && (messageCount >= num))
+ {
+ receiveMore = false;
+ }
+
+ // Keep the reply-to destination to send replies to, leaving it unchanged when the wait timed out.
+ sendDestination = (next != null) ? next.getJMSReplyTo() : sendDestination;
+
+ if (next != null)
+ {
+ result.add(next);
+ }
+ }
+ catch (InterruptedException e)
+ {
+ // Restore the threads interrupted status.
+ Thread.currentThread().interrupt();
+
+ // Stop receiving but return the messages received so far.
+ receiveMore = false;
+ }
+ }
+ while (receiveMore);
+
+ return result;
+ }
+
+ /**
+ * Completes the conversation. Any correlation id's pertaining to the conversation are no longer valid, and any
+ * incoming messages using them will go to the dead letter box. Ending a conversation that never sent or received
+ * a message, or that has already been ended, has no effect.
+ */
+ public void end()
+ {
+ log.debug("public void end(): called");
+
+ // Ensure that its queue is removed from the queue map.
+ BlockingQueue<Message> queue = idsToQueues.remove(conversationId);
+
+ // The queue is only created on the first send or receive, so there may be nothing to clean up.
+ if (queue != null)
+ {
+ // Move any outstanding messages on the conversation id into the dead letter box.
+ queue.drainTo(deadLetterBox);
+ }
+ }
+ }
+
+ /**
+ * Implements the message listener for this conversation handler.
+ */
+ protected class Receiver implements MessageListener
+ {
+ /**
+ * Handles all incoming messages in the ongoing conversations. These messages are split up by correlation id
+ * and placed into queues. Messages with a missing or non-numeric correlation id, or with an id for which no
+ * conversation queue exists, are placed in the dead letter box.
+ *
+ * @param message The incoming message.
+ */
+ public void onMessage(Message message)
+ {
+ log.debug("public void onMessage(Message message = " + message + "): called");
+
+ try
+ {
+ // Find the conversation queue to place the message on. If there is no conversation for the message id,
+ // then the dead letter box queue is used.
+ BlockingQueue<Message> queue = null;
+
+ try
+ {
+ Long conversationId = Long.parseLong(message.getJMSCorrelationID());
+ queue = idsToQueues.get(conversationId);
+ }
+ catch (NumberFormatException e)
+ {
+ // A missing or non-numeric correlation id cannot belong to any conversation, so the message is left
+ // to fall through to the dead letter box instead of failing the listener.
+ queue = null;
+ }
+
+ queue = (queue == null) ? deadLetterBox : queue;
+
+ queue.put(message);
+ }
+ catch (JMSException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (InterruptedException e)
+ {
+ // Restore the threads interrupted status before reporting the failure.
+ Thread.currentThread().interrupt();
+
+ throw new RuntimeException(e);
+ }
+ }
+ }
+}
Copied: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java (from r676963, incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/FailoverBaseCase.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java?p2=incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java&p1=incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/FailoverBaseCase.java&r1=676963&r2=676969&rev=676969&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/FailoverBaseCase.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java Tue Jul 15 09:58:26 2008
@@ -18,25 +18,31 @@
* under the License.
*
*/
-package org.apache.qpid.test;
+package org.apache.qpid.test.utils;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.testutil.QpidTestCase;
import javax.jms.Connection;
public class FailoverBaseCase extends QpidTestCase
{
- protected long RECEIVE_TIMEOUT = 1000l;
+ /** Set once failBroker() has killed broker 2, so that tearDown() only kills it when that has not yet happened. */
+ private boolean failedOver = false;
+
protected void setUp() throws java.lang.Exception
{
super.setUp();
- if( _broker.equals(VM) )
+
+ try
+ {
+ TransportConnection.createVMBroker(2);
+ }
+ catch (Exception e)
{
- System.getProperties().setProperty("amqj.AutoCreateVMBroker", "true");
+ fail("Unable to create broker: " + e);
}
+
}
/**
@@ -60,13 +66,24 @@
return conn;
}
+ public void tearDown() throws Exception
+ {
+ if (!failedOver)
+ {
+ TransportConnection.killVMBroker(2);
+ ApplicationRegistry.remove(2);
+ }
+ super.tearDown();
+ }
+
+
/**
* Only used of VM borker.
- * // TODO: update the failover mechanism once 0.10 provides support for failover.
*/
public void failBroker()
{
- TransportConnection.killVMBroker(1);
- ApplicationRegistry.remove(1);
+ failedOver = true;
+ TransportConnection.killVMBroker(2);
+ ApplicationRegistry.remove(2);
}
}
Copied: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidClientConnection.java (from r676963, incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidClientConnection.java?p2=incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidClientConnection.java&p1=incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java&r1=676963&r2=676969&rev=676969&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidClientConnection.java Tue Jul 15 09:58:26 2008
@@ -19,13 +19,11 @@
*
*/
-package org.apache.qpid.testutil;
+package org.apache.qpid.test.utils;
import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQConnectionFactory;
-import org.apache.qpid.client.AMQConnectionURL;
import org.apache.qpid.client.JMSAMQException;
-import org.apache.qpid.url.URLSyntaxException;
+import org.apache.qpid.test.utils.QpidTestCase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Copied: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidClientConnectionHelper.java (from r676963, incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidClientConnectionHelper.java?p2=incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidClientConnectionHelper.java&p1=incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java&r1=676963&r2=676969&rev=676969&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidClientConnectionHelper.java Tue Jul 15 09:58:26 2008
@@ -19,7 +19,9 @@
*
*/
-package org.apache.qpid.testutil;
+package org.apache.qpid.test.utils;
+
+import org.apache.log4j.Logger;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQConnectionFactory;
@@ -27,9 +29,6 @@
import org.apache.qpid.client.JMSAMQException;
import org.apache.qpid.url.URLSyntaxException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import javax.jms.Connection;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
@@ -40,9 +39,16 @@
import javax.jms.Session;
import javax.jms.TextMessage;
-public class QpidClientConnection extends QpidTestCase implements ExceptionListener
+/**
+ * @todo This was originally cut and paste from the client module leading to a duplicate class, then altered very
+ * slightly. To avoid the duplicate class the name was altered slightly to have 'Helper' on the end in order
+ * to distinguish it from the original. Delete this class and use the original instead, just upgrade it to
+ * provide the new features needed.
+ */
+public class QpidClientConnectionHelper implements ExceptionListener
{
- private static final Logger _logger = LoggerFactory.getLogger(QpidClientConnection.class);
+
+ private static final Logger _logger = Logger.getLogger(QpidClientConnectionHelper.class);
private boolean transacted = true;
private int ackMode = Session.CLIENT_ACKNOWLEDGE;
@@ -54,7 +60,7 @@
protected Session session;
protected boolean connected;
- public QpidClientConnection(String broker)
+ public QpidClientConnectionHelper(String broker)
{
super();
setVirtualHost("/test");
@@ -62,12 +68,6 @@
setPrefetch(5000);
}
-
- public Connection getConnection()
- {
- return connection;
- }
-
public void connect() throws JMSException
{
if (!connected)
@@ -81,8 +81,10 @@
String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'";
try
{
+ AMQConnectionFactory factory = new AMQConnectionFactory(new AMQConnectionURL(brokerUrl));
_logger.info("connecting to Qpid :" + brokerUrl);
- connection = getConnection("guest", "guest") ;
+ connection = factory.createConnection();
+
// register exception listener
connection.setExceptionListener(this);
@@ -93,14 +95,14 @@
connected = true;
}
- catch (Exception e)
+ catch (URLSyntaxException e)
{
throw new JMSAMQException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage(), e);
}
}
}
- public void disconnect() throws Exception
+ public void disconnect() throws JMSException
{
if (connected)
{
@@ -173,7 +175,7 @@
*
* @throws javax.jms.JMSException any exception that occurs
*/
- public void put(String queueName, String payload, int copies) throws JMSException
+ public void put(String queueName, String payload, int copies, int deliveryMode) throws JMSException
{
if (!connected)
{
@@ -185,6 +187,8 @@
final MessageProducer sender = session.createProducer(queue);
+ sender.setDeliveryMode(deliveryMode);
+
for (int i = 0; i < copies; i++)
{
Message m = session.createTextMessage(payload + i);
Copied: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java (from r676963, incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/testutil/QpidTestCase.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java?p2=incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java&p1=incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/testutil/QpidTestCase.java&r1=676963&r2=676969&rev=676969&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/testutil/QpidTestCase.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java Tue Jul 15 09:58:26 2008
@@ -15,7 +15,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.qpid.testutil;
+package org.apache.qpid.test.utils;
import junit.framework.TestCase;
import junit.framework.TestResult;
@@ -46,6 +46,8 @@
private static final Logger _logger = LoggerFactory.getLogger(QpidTestCase.class);
+ protected long RECEIVE_TIMEOUT = 1000l;
+
/**
* Some tests are excluded when the property test.excludes is set to true.
* An exclusion list is either a file (prop test.excludesfile) which contains one test name
@@ -333,6 +335,7 @@
else if (_broker.equals(VM))
{
TransportConnection.killAllVMBrokers();
+ //ApplicationRegistry.removeAll();
}
_brokerStarted = false;
}
Added: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/protocol/TestIoSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/protocol/TestIoSession.java?rev=676969&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/protocol/TestIoSession.java (added)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/protocol/TestIoSession.java Tue Jul 15 09:58:26 2008
@@ -0,0 +1,104 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.utils.protocol;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+
+import org.apache.mina.common.IoFilterChain;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoService;
+import org.apache.mina.common.IoServiceConfig;
+import org.apache.mina.common.IoSessionConfig;
+import org.apache.mina.common.TransportType;
+import org.apache.mina.common.support.BaseIoSession;
+
+/**
+ * A stub IoSession for tests, built on the MINA BaseIoSession. Only the local address is meaningful: it is assembled
+ * from a host string and a port that the test sets through the accessors below. Every other accessor is a dummy
+ * that returns null or zero.
+ */
+public class TestIoSession extends BaseIoSession {
+
+ /** The host name or address string used to build the local address. */
+ private String _stringLocalAddress;
+
+ /** The port used to build the local address. */
+ private int _localPort;
+
+ /**
+ * Builds the local address from the configured host string and port.
+ *
+ * @return A new socket address on each call.
+ */
+ public SocketAddress getLocalAddress()
+ {
+ //create a new address for testing purposes using member variables
+ return new InetSocketAddress(_stringLocalAddress,_localPort);
+ }
+
+ /** Does nothing; there is no underlying transport to update. */
+ protected void updateTrafficMask() {
+ //dummy
+ }
+
+ /** @return Always null in this stub. */
+ public IoService getService() {
+ return null;
+ }
+
+ /** @return Always null in this stub. */
+ public IoServiceConfig getServiceConfig() {
+ return null;
+ }
+
+ /** @return Always null in this stub. */
+ public IoHandler getHandler() {
+ return null;
+ }
+
+ /** @return Always null in this stub. */
+ public IoSessionConfig getConfig() {
+ return null;
+ }
+
+ /** @return Always null in this stub. */
+ public IoFilterChain getFilterChain() {
+ return null;
+ }
+
+ /** @return Always null in this stub. */
+ public TransportType getTransportType() {
+ return null;
+ }
+
+ /** @return Always null in this stub. */
+ public SocketAddress getRemoteAddress() {
+ return null;
+ }
+
+ /** @return Always null in this stub. */
+ public SocketAddress getServiceAddress() {
+ return null;
+ }
+
+ /** @return Always zero in this stub. */
+ public int getScheduledWriteRequests() {
+ return 0;
+ }
+
+ /** @return Always zero in this stub. */
+ public int getScheduledWriteBytes() {
+ return 0;
+ }
+
+ /** @return The host string used to build the local address. */
+ public String getStringLocalAddress() {
+ return _stringLocalAddress;
+ }
+
+ /** @param _stringLocalAddress The host string to use when building the local address. */
+ public void setStringLocalAddress(String _stringLocalAddress) {
+ this._stringLocalAddress = _stringLocalAddress;
+ }
+
+ /** @return The port used to build the local address. */
+ public int getLocalPort() {
+ return _localPort;
+ }
+
+ /** @param _localPort The port to use when building the local address. */
+ public void setLocalPort(int _localPort) {
+ this._localPort = _localPort;
+ }
+}