You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gr...@apache.org on 2010/10/13 17:06:27 UTC
svn commit: r1022127 [8/15] - in
/qpid/branches/grkvlt-network-20101013/qpid/java: ./
broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/
broker-plugins/access-control/src/test/java/org/apache/qpid/server/securit...
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Wed Oct 13 15:05:29 2010
@@ -47,7 +47,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
-public abstract class BasicMessageConsumer<U> extends Closeable implements MessageConsumer
+public abstract class BasicMessageConsumer<U extends UnprocessedMessage & AMQSession.Dispatchable> extends Closeable implements MessageConsumer
{
private static final Logger _logger = LoggerFactory.getLogger(BasicMessageConsumer.class);
@@ -805,21 +805,13 @@ public abstract class BasicMessageConsum
*/
Long getLastDelivered()
{
- if (!_receivedDeliveryTags.isEmpty())
+ Long lastDeliveryTag = null;
+ while (!_receivedDeliveryTags.isEmpty());
{
- Long lastDeliveryTag = _receivedDeliveryTags.poll();
-
- while (!_receivedDeliveryTags.isEmpty())
- {
- lastDeliveryTag = _receivedDeliveryTags.poll();
- }
-
- assert _receivedDeliveryTags.isEmpty();
-
- return lastDeliveryTag;
+ lastDeliveryTag = _receivedDeliveryTags.poll();
}
- return null;
+ return lastDeliveryTag;
}
/**
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Wed Oct 13 15:05:29 2010
@@ -19,6 +19,7 @@ package org.apache.qpid.client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.slf4j.Marker;
import org.apache.qpid.client.AMQDestination.DestSyntax;
import org.apache.qpid.client.message.*;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Wed Oct 13 15:05:29 2010
@@ -274,93 +274,55 @@ public abstract class BasicMessageProduc
public void send(Message message) throws JMSException
{
- checkPreConditions();
- checkInitialDestination();
-
-
- synchronized (_connection.getFailoverMutex())
- {
- sendImpl(_destination, message, _deliveryMode, _messagePriority, _timeToLive, _mandatory, _immediate);
- }
+ send(message, _deliveryMode);
}
public void send(Message message, int deliveryMode) throws JMSException
{
- checkPreConditions();
- checkInitialDestination();
-
- synchronized (_connection.getFailoverMutex())
- {
- sendImpl(_destination, message, deliveryMode, _messagePriority, _timeToLive, _mandatory, _immediate);
- }
+ send(message, deliveryMode, _immediate);
}
public void send(Message message, int deliveryMode, boolean immediate) throws JMSException
{
- checkPreConditions();
- checkInitialDestination();
- synchronized (_connection.getFailoverMutex())
- {
- sendImpl(_destination, message, deliveryMode, _messagePriority, _timeToLive, _mandatory, immediate);
- }
+ send(message, deliveryMode, _messagePriority, _timeToLive, _mandatory, immediate);
}
public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException
{
+ send(message, deliveryMode, priority, timeToLive, _mandatory, _immediate);
+ }
+
+ public void send(Message message, int deliveryMode, int priority, long timeToLive, boolean mandatory, boolean immediate) throws JMSException
+ {
checkPreConditions();
checkInitialDestination();
synchronized (_connection.getFailoverMutex())
{
- sendImpl(_destination, message, deliveryMode, priority, timeToLive, _mandatory, _immediate);
+ sendImpl(_destination, message, deliveryMode, priority, timeToLive, mandatory, immediate, _waitUntilSent);
}
}
public void send(Destination destination, Message message) throws JMSException
{
- checkPreConditions();
- checkDestination(destination);
- synchronized (_connection.getFailoverMutex())
- {
- validateDestination(destination);
- sendImpl((AMQDestination) destination, message, _deliveryMode, _messagePriority, _timeToLive, _mandatory,
- _immediate);
- }
+ send(destination, message, _deliveryMode, _messagePriority, _timeToLive);
}
public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive)
throws JMSException
{
- checkPreConditions();
- checkDestination(destination);
- synchronized (_connection.getFailoverMutex())
- {
- validateDestination(destination);
- sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, _mandatory, _immediate);
- }
+ send((AMQDestination) destination, message, deliveryMode, priority, timeToLive, _mandatory);
}
public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive,
boolean mandatory) throws JMSException
{
- checkPreConditions();
- checkDestination(destination);
- synchronized (_connection.getFailoverMutex())
- {
- validateDestination(destination);
- sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, mandatory, _immediate);
- }
+ send((AMQDestination) destination, message, deliveryMode, priority, timeToLive, mandatory, _immediate);
}
public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive,
boolean mandatory, boolean immediate) throws JMSException
{
- checkPreConditions();
- checkDestination(destination);
- synchronized (_connection.getFailoverMutex())
- {
- validateDestination(destination);
- sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, mandatory, immediate);
- }
+ send((AMQDestination) destination, message, deliveryMode, priority, timeToLive, mandatory, immediate, _waitUntilSent);
}
public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive,
@@ -439,12 +401,6 @@ public abstract class BasicMessageProduc
}
}
- protected void sendImpl(AMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive,
- boolean mandatory, boolean immediate) throws JMSException
- {
- sendImpl(destination, message, deliveryMode, priority, timeToLive, mandatory, immediate, _waitUntilSent);
- }
-
/**
* The caller of this method must hold the failover mutex.
*
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java Wed Oct 13 15:05:29 2010
@@ -231,14 +231,7 @@ public class FailoverHandler implements
{
_logger.info("Failover process failed - exception being propagated by protocol handler");
_amqProtocolHandler.setFailoverState(FailoverState.FAILED);
- /*try
- {*/
_amqProtocolHandler.exception(e);
- /*}
- catch (Exception ex)
- {
- _logger.error("Error notifying protocol session of error: " + ex, ex);
- }*/
}
}
}
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java Wed Oct 13 15:05:29 2010
@@ -77,7 +77,7 @@ public class ConnectionCloseMethodHandle
{
_logger.info("Error :" + errorCode + ":" + Thread.currentThread().getName());
- error = new AMQAuthenticationException(errorCode, reason == null ? null : reason.toString(), null);
+ error = new AMQAuthenticationException(reason == null ? null : reason.toString(), null);
}
else if (errorCode == AMQConstant.ACCESS_REFUSED)
{
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java Wed Oct 13 15:05:29 2010
@@ -21,6 +21,7 @@
package org.apache.qpid.client.message;
+import java.lang.ref.SoftReference;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
@@ -38,7 +39,6 @@ import javax.jms.Session;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQPInvalidClassException;
-import org.apache.qpid.collections.ReferenceMap;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQSession_0_10;
@@ -61,7 +61,7 @@ import org.apache.qpid.transport.ReplyTo
*/
public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate
{
- private static final Map<ReplyTo, Destination> _destinationCache = Collections.synchronizedMap(new ReferenceMap());
+ private static final Map<ReplyTo, SoftReference<Destination>> _destinationCache = Collections.synchronizedMap(new HashMap<ReplyTo, SoftReference<Destination>>());
public static final String JMS_TYPE = "x-jms-type";
@@ -216,7 +216,8 @@ public class AMQMessageDelegate_0_10 ext
}
else
{
- Destination dest = _destinationCache.get(replyTo);
+ SoftReference<Destination> ref = _destinationCache.get(replyTo);
+ Destination dest = ref.get();
if (dest == null)
{
String exchange = replyTo.getExchange();
@@ -225,11 +226,7 @@ public class AMQMessageDelegate_0_10 ext
dest = generateDestination(exchange == null ? null : new AMQShortString(exchange),
routingKey == null ? null : new AMQShortString(routingKey));
-
-
-
-
- _destinationCache.put(replyTo, dest);
+ _destinationCache.put(replyTo, new SoftReference<Destination>(dest));
}
return dest;
@@ -276,7 +273,7 @@ public class AMQMessageDelegate_0_10 ext
}
final ReplyTo replyTo = new ReplyTo(amqd.getExchangeName().toString(), amqd.getRoutingKey().toString());
- _destinationCache.put(replyTo, destination);
+ _destinationCache.put(replyTo, new SoftReference<Destination>(destination));
_messageProps.setReplyTo(replyTo);
}
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java Wed Oct 13 15:05:29 2010
@@ -21,9 +21,11 @@
package org.apache.qpid.client.message;
+import java.lang.ref.SoftReference;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.Enumeration;
+import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
@@ -38,7 +40,6 @@ import org.apache.qpid.client.AMQSession
import org.apache.qpid.client.AMQTopic;
import org.apache.qpid.client.CustomJMSXProperty;
import org.apache.qpid.client.JMSAMQException;
-import org.apache.qpid.collections.ReferenceMap;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderProperties;
@@ -47,7 +48,7 @@ import org.apache.qpid.url.BindingURL;
public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
{
- private static final Map _destinationCache = Collections.synchronizedMap(new ReferenceMap());
+ private static final Map<String, SoftReference<Destination>> _destinationCache = Collections.synchronizedMap(new HashMap<String, SoftReference<Destination>>());
public static final String JMS_TYPE = "x-jms-type";
@@ -181,7 +182,8 @@ public class AMQMessageDelegate_0_8 exte
}
else
{
- Destination dest = (Destination) _destinationCache.get(replyToEncoding);
+ SoftReference<Destination> ref = _destinationCache.get(replyToEncoding);
+ Destination dest = ref.get();
if (dest == null)
{
try
@@ -194,7 +196,7 @@ public class AMQMessageDelegate_0_8 exte
throw new JMSAMQException("Illegal value in JMS_ReplyTo property: " + replyToEncoding, e);
}
- _destinationCache.put(replyToEncoding, dest);
+ _destinationCache.put(replyToEncoding, new SoftReference<Destination>(dest));
}
return dest;
@@ -218,7 +220,7 @@ public class AMQMessageDelegate_0_8 exte
final AMQDestination amqd = (AMQDestination) destination;
final AMQShortString encodedDestination = amqd.getEncodedName();
- _destinationCache.put(encodedDestination, destination);
+ _destinationCache.put(encodedDestination.asString(), new SoftReference<Destination>(amqd));
getContentHeaderProperties().setReplyTo(encodedDestination);
}
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Wed Oct 13 15:05:29 2010
@@ -56,42 +56,44 @@ import org.apache.qpid.framing.Heartbeat
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.framing.ProtocolInitiation;
import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.pool.Job;
import org.apache.qpid.pool.ReferenceCountingExecutorService;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
-import org.apache.qpid.protocol.ProtocolEngine;
-import org.apache.qpid.transport.NetworkDriver;
-import org.apache.qpid.transport.network.io.IoTransport;
+import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.network.NetworkTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* AMQProtocolHandler is the client side protocol handler for AMQP, it handles all protocol events received from the
- * network by MINA. The primary purpose of AMQProtocolHandler is to translate the generic event model of MINA into the
+ * network by MINA.
+ *
+ * The primary purpose of AMQProtocolHandler is to translate the generic event model of MINA into the
* specific event model of AMQP, by revealing the type of the received events (from decoded data), and passing the
* event on to more specific handlers for the type. In this sense, it channels the richer event model of AMQP,
* expressed in terms of methods and so on, through the cruder, general purpose event model of MINA, expressed in
* terms of "message received" and so on.
- *
- * <p/>There is a 1:1 mapping between an AMQProtocolHandler and an {@link AMQConnection}. The connection class is
+ * <p>
+ * There is a 1:1 mapping between an AMQProtocolHandler and an {@link AMQConnection}. The connection class is
* exposed to the end user of the AMQP client API, and also implements the JMS Connection API, so provides the public
* API calls through which an individual connection can be manipulated. This protocol handler talks to the network
* through MINA, in a behind the scenes role; it is not an exposed part of the client API.
- *
- * <p/>There is a 1:many mapping between an AMQProtocolHandler and a set of {@link AMQSession}s. At the MINA level,
+ * <p>
+ * There is a 1:many mapping between an AMQProtocolHandler and a set of {@link AMQSession}s. At the MINA level,
* there is one session per connection. At the AMQP level there can be many channels which are also called sessions in
* JMS parlance. The {@link AMQSession}s are managed through an {@link AMQProtocolSession} instance. The protocol
* session is similar to the MINA per-connection session, except that it can span the lifecycle of multiple MINA sessions
* in the event of failover. See below for more information about this.
- *
- * <p/>Mina provides a session container that can be used to store/retrieve arbitrary objects as String named
+ * <p>
+ * Mina provides a session container that can be used to store/retrieve arbitrary objects as String named
* attributes. A more convenient, type-safe, container for session data is provided in the form of
* {@link AMQProtocolSession}.
- *
- * <p/>A common way to use MINA is to have a single instance of the event handler, and for MINA to pass in its session
+ * <p>
+ * A common way to use MINA is to have a single instance of the event handler, and for MINA to pass in its session
* object with every event, and for per-connection data to be held in the MINA session (perhaps using a type-safe wrapper
* as described above). This event handler is different, because dealing with failover complicates things. To the
* end client of an AMQConnection, a failed over connection is still handled through the same connection instance, but
@@ -99,8 +101,8 @@ import org.slf4j.LoggerFactory;
* be used to track the state of the fail-over process, because it is destroyed and a new one is created, as the old
* connection is shutdown and a new one created. For this reason, an AMQProtocolHandler is created per AMQConnection
* and the protocol session data is held outside of the MINA IOSession.
- *
- * <p/>This handler is responsibile for setting up the filter chain to filter all events for this handler through.
+ * <p>
+ * This handler is responsibile for setting up the filter chain to filter all events for this handler through.
* The filter chain is set up as a stack of event handers that perform the following functions (working upwards from
* the network traffic at the bottom), handing off incoming events to an asynchronous thread pool to do the work,
* optionally handling secure sockets encoding/decoding, encoding/decoding the AMQP format itself.
@@ -118,7 +120,7 @@ import org.slf4j.LoggerFactory;
* held per protocol handler, per protocol session, per network connection, per channel, in seperate classes, so
* that lifecycles of the fields match lifecycles of their containing objects.
*/
-public class AMQProtocolHandler implements ProtocolEngine
+public class AMQProtocolHandler implements Receiver<java.nio.ByteBuffer>
{
/** Used for debugging. */
private static final Logger _logger = LoggerFactory.getLogger(AMQProtocolHandler.class);
@@ -170,7 +172,9 @@ public class AMQProtocolHandler implemen
private Job _readJob;
private Job _writeJob;
private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance();
- private NetworkDriver _networkDriver;
+ private Sender<ByteBuffer> _sender;
+ private NetworkConnection _network;
+ private NetworkTransport _transport;
private ProtocolVersion _suggestedProtocolVersion;
private long _writtenBytes;
@@ -194,21 +198,6 @@ public class AMQProtocolHandler implemen
}
/**
- * Called when we want to create a new IoTransport session
- * @param brokerDetail
- */
- public void createIoTransportSession(BrokerDetails brokerDetail)
- {
- _protocolSession = new AMQProtocolSession(this, _connection);
- _stateManager.setProtocolSession(_protocolSession);
- IoTransport.connect_0_9(getProtocolSession(),
- brokerDetail.getHost(),
- brokerDetail.getPort(),
- brokerDetail.getBooleanProperty(BrokerDetails.OPTIONS_SSL));
- _protocolSession.init();
- }
-
- /**
* Called when the network connection is closed. This can happen, either because the client explicitly requested
* that the connection be closed, in which case nothing is done, or because the connection died. In the case
* where the connection died, an attempt to failover automatically to a new connection may be started. The failover
@@ -290,7 +279,7 @@ public class AMQProtocolHandler implemen
// failover:
HeartbeatDiagnostics.timeout();
_logger.warn("Timed out while waiting for heartbeat from peer.");
- _networkDriver.close();
+ _sender.close();
}
public void writerIdle()
@@ -312,7 +301,10 @@ public class AMQProtocolHandler implemen
{
_logger.info("Exception caught therefore going to attempt failover: " + cause, cause);
// this will attempt failover
- _networkDriver.close();
+ if (_sender != null)
+ {
+ _sender.close();
+ }
closed();
}
else
@@ -564,7 +556,7 @@ public class AMQProtocolHandler implemen
{
public void run()
{
- _networkDriver.send(buf);
+ _sender.send(buf);
}
});
if (PROTOCOL_DEBUG)
@@ -585,7 +577,7 @@ public class AMQProtocolHandler implemen
if (wait)
{
- _networkDriver.flush();
+ _sender.flush();
}
}
@@ -699,7 +691,7 @@ public class AMQProtocolHandler implemen
try
{
syncWrite(frame, ConnectionCloseOkBody.class, timeout);
- _networkDriver.close();
+ _sender.close();
closed();
}
catch (AMQTimeoutException e)
@@ -714,18 +706,6 @@ public class AMQProtocolHandler implemen
_poolReference.releaseExecutorService();
}
- /** @return the number of bytes read from this protocol session */
- public long getReadBytes()
- {
- return _readBytes;
- }
-
- /** @return the number of bytes written to this protocol session */
- public long getWrittenBytes()
- {
- return _writtenBytes;
- }
-
public void failover(String host, int port)
{
_failoverHandler.setHost(host);
@@ -819,17 +799,19 @@ public class AMQProtocolHandler implemen
public SocketAddress getRemoteAddress()
{
- return _networkDriver.getRemoteAddress();
+ return _network.getRemoteAddress();
}
public SocketAddress getLocalAddress()
{
- return _networkDriver.getLocalAddress();
+ return _transport.getAddress();
}
- public void setNetworkDriver(NetworkDriver driver)
+ public void connect(NetworkTransport transport, NetworkConnection network)
{
- _networkDriver = driver;
+ _transport = transport;
+ _network = network;
+ _sender = network.getSender();
}
/** @param delay delay in seconds (not ms) */
@@ -837,17 +819,13 @@ public class AMQProtocolHandler implemen
{
if (delay > 0)
{
- getNetworkDriver().setMaxWriteIdle(delay);
- getNetworkDriver().setMaxReadIdle(HeartbeatConfig.CONFIG.getTimeout(delay));
+// FIXME
+// _sender.setMaxWriteIdle(delay);
+// _sender.setMaxReadIdle(HeartbeatConfig.CONFIG.getTimeout(delay));
HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay));
}
}
- public NetworkDriver getNetworkDriver()
- {
- return _networkDriver;
- }
-
public ProtocolVersion getSuggestedProtocolVersion()
{
return _suggestedProtocolVersion;
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Wed Oct 13 15:05:29 2010
@@ -20,27 +20,36 @@
*/
package org.apache.qpid.client.protocol;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
import javax.jms.JMSException;
import javax.security.sasl.SaslClient;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.ConnectionTuneParameters;
+import org.apache.qpid.client.handler.ClientMethodDispatcherImpl;
import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.message.UnprocessedMessage_0_8;
import org.apache.qpid.client.state.AMQStateManager;
-import org.apache.qpid.client.state.AMQState;
-import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.HeartbeatBody;
+import org.apache.qpid.framing.MethodDispatcher;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.ProtocolInitiation;
+import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.transport.Sender;
-import org.apache.qpid.client.handler.ClientMethodDispatcherImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Wrapper for protocol session that provides type-safe access to session attributes. <p/> The underlying protocol
@@ -79,8 +88,7 @@ public class AMQProtocolSession implemen
private final UnprocessedMessage[] _channelId2UnprocessedMsgArray = new UnprocessedMessage[16];
/** Counter to ensure unique queue names */
- protected int _queueId = 1;
- protected final Object _queueIdLock = new Object();
+ protected static final AtomicLong _queueIdGenerator = new AtomicLong(0);
private ProtocolVersion _protocolVersion;
// private VersionSpecificRegistry _registry =
@@ -380,11 +388,7 @@ public class AMQProtocolSession implemen
protected AMQShortString generateQueueName()
{
- int id;
- synchronized (_queueIdLock)
- {
- id = _queueId++;
- }
+ long id = _queueIdGenerator.incrementAndGet();
// convert '.', '/', ':' and ';' to single '_', for spec compliance and readability
String localAddress = _protocolHandler.getLocalAddress().toString().replaceAll("[./:;]", "_");
String queueName = "tmp_" + localAddress + "_" + id;
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/security/crammd5hashed/CRAMMD5HashedSaslClientFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/security/crammd5hashed/CRAMMD5HashedSaslClientFactory.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/security/crammd5hashed/CRAMMD5HashedSaslClientFactory.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/security/crammd5hashed/CRAMMD5HashedSaslClientFactory.java Wed Oct 13 15:05:29 2010
@@ -36,7 +36,7 @@ public class CRAMMD5HashedSaslClientFact
public static final String MECHANISM = "CRAM-MD5-HASHED";
- public SaslClient createSaslClient(String[] mechanisms, String authorizationId, String protocol, String serverName, Map<String, ?> props, CallbackHandler cbh) throws SaslException
+ public SaslClient createSaslClient(String[] mechanisms, String authorizationId, String protocol, String serverName, Map props, CallbackHandler cbh) throws SaslException
{
for (int i = 0; i < mechanisms.length; i++)
{
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java Wed Oct 13 15:05:29 2010
@@ -20,42 +20,21 @@
*/
package org.apache.qpid.client.util;
-import java.util.Iterator;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.concurrent.LinkedBlockingQueue;
/**
- * A blocking queue that emits events above a user specified threshold allowing the caller to take action (e.g. flow
+ * A {@link BlockingQueue} that emits events above a user specified threshold allowing the caller to take action (e.g. flow
* control) to try to prevent the queue growing (much) further. The underlying queue itself is not bounded therefore the
- * caller is not obliged to react to the events. <p/> This implementation is <b>only</b> safe where we have a single
- * thread adding items and a single (different) thread removing items.
- *
- * @todo Make this implement java.util.Queue and hide the implementation. Then different queue types can be substituted.
+ * caller is not obliged to react to the events.
*/
-public class FlowControllingBlockingQueue
+public class FlowControllingBlockingQueue<E> extends LinkedBlockingQueue<E>
{
- private static final Logger _logger = LoggerFactory.getLogger(FlowControllingBlockingQueue.class);
-
- /** This queue is bounded and is used to store messages before being dispatched to the consumer */
- private final Queue _queue = new ConcurrentLinkedQueue();
-
private final int _flowControlHighThreshold;
private final int _flowControlLowThreshold;
private final ThresholdListener _listener;
-
- /** We require a separate count so we can track whether we have reached the threshold */
- private int _count;
- private boolean disableFlowControl;
-
- public boolean isEmpty()
- {
- return _queue.isEmpty();
- }
+ private boolean _disableFlowControl;
public interface ThresholdListener
{
@@ -63,6 +42,11 @@ public class FlowControllingBlockingQueu
void underThreshold(int currentValue);
}
+
+ public FlowControllingBlockingQueue()
+ {
+ this(0, null);
+ }
public FlowControllingBlockingQueue(int threshold, ThresholdListener listener)
{
@@ -71,65 +55,52 @@ public class FlowControllingBlockingQueu
public FlowControllingBlockingQueue(int highThreshold, int lowThreshold, ThresholdListener listener)
{
+ super();
+
_flowControlHighThreshold = highThreshold;
_flowControlLowThreshold = lowThreshold;
_listener = listener;
+
if (highThreshold == 0)
{
- disableFlowControl = true;
+ _disableFlowControl = true;
}
}
- public Object take() throws InterruptedException
+ public E take() throws InterruptedException
{
- Object o = _queue.poll();
- if(o == null)
- {
- synchronized(this)
- {
- while((o = _queue.poll())==null)
- {
- wait();
- }
- }
- }
- if (!disableFlowControl && _listener != null)
+ E e = super.take();
+
+ if (!_disableFlowControl && _listener != null)
{
synchronized (_listener)
{
- if (_count-- == _flowControlLowThreshold)
+ if (size() == _flowControlLowThreshold)
{
- _listener.underThreshold(_count);
+ _listener.underThreshold(size());
}
}
}
- return o;
+ return e;
}
- public void add(Object o)
+ public boolean add(E e)
{
- synchronized(this)
- {
- _queue.add(o);
-
- notifyAll();
- }
- if (!disableFlowControl && _listener != null)
+ super.add(e);
+
+ if (!_disableFlowControl && _listener != null)
{
synchronized (_listener)
{
- if (++_count == _flowControlHighThreshold)
+ if (size() == _flowControlHighThreshold)
{
- _listener.aboveThreshold(_count);
+ _listener.aboveThreshold(size());
}
}
}
- }
-
- public Iterator iterator()
- {
- return _queue.iterator();
+
+ return true;
}
}
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java Wed Oct 13 15:05:29 2010
@@ -20,17 +20,19 @@
*/
package org.apache.qpid.jms;
+import java.util.Arrays;
+import java.util.List;
import java.util.Map;
import org.apache.qpid.client.SSLConfiguration;
public interface BrokerDetails
{
-
/*
* Known URL Options
* @see ConnectionURL
- */
+ */
+
public static final String OPTIONS_RETRY = "retries";
public static final String OPTIONS_CONNECT_TIMEOUT = "connecttimeout";
public static final String OPTIONS_CONNECT_DELAY = "connectdelay";
@@ -54,6 +56,8 @@ public interface BrokerDetails
public static final String SOCKET = "socket";
public static final String TCP = "tcp";
+ public static final String UDP = "udp";
+ public static final String MULTICAST = "multicast";
public static final String VM = "vm";
public static final String DEFAULT_TRANSPORT = TCP;
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/old_test/java/org/apache/qpid/mina/AcceptorTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/old_test/java/org/apache/qpid/mina/AcceptorTest.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/old_test/java/org/apache/qpid/mina/AcceptorTest.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/old_test/java/org/apache/qpid/mina/AcceptorTest.java Wed Oct 13 15:05:29 2010
@@ -39,7 +39,7 @@ import junit.framework.TestCase;
* Tests MINA socket performance. This acceptor simply reads data from the network and writes it back again.
*
*/
-public class AcceptorTest extends TestCase
+public class AcceptorTest extends QpidTestCase
{
private static final Logger _logger = Logger.getLogger(AcceptorTest.class);
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/old_test/java/org/apache/qpid/mina/BlockingAcceptorTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/old_test/java/org/apache/qpid/mina/BlockingAcceptorTest.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/old_test/java/org/apache/qpid/mina/BlockingAcceptorTest.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/old_test/java/org/apache/qpid/mina/BlockingAcceptorTest.java Wed Oct 13 15:05:29 2010
@@ -30,7 +30,7 @@ import java.net.Socket;
import junit.framework.TestCase;
-public class BlockingAcceptorTest extends TestCase
+public class BlockingAcceptorTest extends QpidTestCase
{
private static final Logger _logger = Logger.getLogger(BlockingAcceptorTest.class);
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/old_test/java/org/apache/qpid/mina/WriterTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/old_test/java/org/apache/qpid/mina/WriterTest.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/old_test/java/org/apache/qpid/mina/WriterTest.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/old_test/java/org/apache/qpid/mina/WriterTest.java Wed Oct 13 15:05:29 2010
@@ -32,7 +32,7 @@ import java.util.concurrent.CountDownLat
import junit.framework.TestCase;
-public class WriterTest extends TestCase
+public class WriterTest extends QpidTestCase
{
private static final Logger _logger = Logger.getLogger(WriterTest.class);
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/old_test/java/org/apache/qpid/multiconsumer/AMQTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/old_test/java/org/apache/qpid/multiconsumer/AMQTest.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/old_test/java/org/apache/qpid/multiconsumer/AMQTest.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/old_test/java/org/apache/qpid/multiconsumer/AMQTest.java Wed Oct 13 15:05:29 2010
@@ -47,7 +47,7 @@ import org.apache.qpid.jms.Session;
/**
* Test AMQ.
*/
-public class AMQTest extends TestCase implements ExceptionListener
+public class AMQTest extends QpidTestCase implements ExceptionListener
{
private final static String COMPRESSION_PROPNAME = "_MSGAPI_COMP";
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/PropertiesFileInitialContextFactoryTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/PropertiesFileInitialContextFactoryTest.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/PropertiesFileInitialContextFactoryTest.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/PropertiesFileInitialContextFactoryTest.java Wed Oct 13 15:05:29 2010
@@ -34,7 +34,7 @@ import java.io.InputStream;
import junit.framework.TestCase;
-public class PropertiesFileInitialContextFactoryTest extends TestCase
+public class PropertiesFileInitialContextFactoryTest extends QpidTestCase
{
InitialContextFactory contextFactory;
Properties _properties;
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/client/AMQQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/client/AMQQueueTest.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/client/AMQQueueTest.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/client/AMQQueueTest.java Wed Oct 13 15:05:29 2010
@@ -21,10 +21,9 @@
package org.apache.qpid.client;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.test.utils.QpidTestCase;
-import junit.framework.TestCase;
-
-public class AMQQueueTest extends TestCase
+public class AMQQueueTest extends QpidTestCase
{
AMQShortString exchange = new AMQShortString("test.exchange");
AMQShortString routingkey = new AMQShortString("test-route");
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java Wed Oct 13 15:05:29 2010
@@ -29,6 +29,7 @@ import org.apache.qpid.url.URLSyntaxExce
import java.io.IOException;
+// FIXME
public class MockAMQConnection extends AMQConnection
{
public MockAMQConnection(String broker, String username, String password, String clientName, String virtualHost)
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/client/message/AbstractJMSMessageTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/client/message/AbstractJMSMessageTest.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/client/message/AbstractJMSMessageTest.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/client/message/AbstractJMSMessageTest.java Wed Oct 13 15:05:29 2010
@@ -23,9 +23,9 @@ package org.apache.qpid.client.message;
import javax.jms.JMSException;
-import junit.framework.TestCase;
+import org.apache.qpid.test.utils.QpidTestCase;
-public class AbstractJMSMessageTest extends TestCase
+public class AbstractJMSMessageTest extends QpidTestCase
{
public void testSetNullJMSReplyTo08() throws JMSException
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java Wed Oct 13 15:05:29 2010
@@ -27,7 +27,6 @@ import org.apache.qpid.framing.AMQMethod
import org.apache.qpid.framing.amqp_8_0.BasicRecoverOkBodyImpl;
import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.transport.TestNetworkDriver;
import org.apache.qpid.client.MockAMQConnection;
import org.apache.qpid.client.AMQAuthenticationException;
import org.apache.qpid.client.state.AMQState;
@@ -73,7 +72,8 @@ public class AMQProtocolHandlerTest exte
{
//Create a new ProtocolHandler with a fake connection.
_handler = new AMQProtocolHandler(new MockAMQConnection("amqp://guest:guest@client/test?brokerlist='vm://:1'"));
- _handler.setNetworkDriver(new TestNetworkDriver());
+ // FIXME
+// _handler.setSender(new TestNetworkDriver());
AMQBody body = BasicRecoverOkBodyImpl.getFactory().newInstance(null, 1);
_blockFrame = new AMQFrame(0, body);
@@ -93,16 +93,14 @@ public class AMQProtocolHandlerTest exte
*/
public void testFrameListenerUpdateWithAMQException() throws InterruptedException
{
- AMQException trigger = new AMQAuthenticationException(AMQConstant.ACCESS_REFUSED,
- "AMQPHTest", new RuntimeException());
+ AMQException trigger = new AMQAuthenticationException("AMQPHTest", new RuntimeException());
performWithException(trigger);
AMQException receivedException = (AMQException) _listener.getReceivedException();
- assertEquals("Return exception was not the expected type",
- AMQAuthenticationException.class, receivedException.getClass());
+ assertTrue("Return exception was not the expected type", receivedException instanceof AMQException);
assertEquals("The _Listener did not receive the correct error code",
trigger.getErrorCode(), receivedException.getErrorCode());
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableKeyEnumeratorTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableKeyEnumeratorTest.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableKeyEnumeratorTest.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableKeyEnumeratorTest.java Wed Oct 13 15:05:29 2010
@@ -26,14 +26,13 @@ import java.util.NoSuchElementException;
import javax.jms.JMSException;
-import junit.framework.TestCase;
-
import org.apache.qpid.client.message.JMSTextMessage;
import org.apache.qpid.client.message.TestMessageHelper;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.FieldTableFactory;
+import org.apache.qpid.test.utils.QpidTestCase;
-public class FieldTableKeyEnumeratorTest extends TestCase
+public class FieldTableKeyEnumeratorTest extends QpidTestCase
{
public void testTrue()
{
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTablePropertyTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTablePropertyTest.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTablePropertyTest.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTablePropertyTest.java Wed Oct 13 15:05:29 2010
@@ -24,12 +24,11 @@ import java.util.Enumeration;
import javax.jms.JMSException;
-import junit.framework.TestCase;
-
import org.apache.qpid.client.message.JMSTextMessage;
import org.apache.qpid.client.message.TestMessageHelper;
+import org.apache.qpid.test.utils.QpidTestCase;
-public class FieldTablePropertyTest extends TestCase
+public class FieldTablePropertyTest extends QpidTestCase
{
public void testPropertyNames()
{
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java Wed Oct 13 15:05:29 2010
@@ -20,18 +20,11 @@
*/
package org.apache.qpid.test.unit.client.BrokerDetails;
-import java.util.HashMap;
-import java.util.Map;
-
-import junit.framework.TestCase;
-
import org.apache.qpid.client.AMQBrokerDetails;
-import org.apache.qpid.client.AMQConnectionURL;
-import org.apache.qpid.jms.ConnectionURL;
-import org.apache.qpid.jms.BrokerDetails;
+import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.url.URLSyntaxException;
-public class BrokerDetailsTest extends TestCase
+public class BrokerDetailsTest extends QpidTestCase
{
public void testMultiParameters() throws URLSyntaxException
{
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java Wed Oct 13 15:05:29 2010
@@ -20,15 +20,14 @@
*/
package org.apache.qpid.test.unit.client.connectionurl;
-import junit.framework.TestCase;
-
import org.apache.qpid.client.AMQBrokerDetails;
import org.apache.qpid.client.AMQConnectionURL;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.url.URLSyntaxException;
-public class ConnectionURLTest extends TestCase
+public class ConnectionURLTest extends QpidTestCase
{
public void testFailoverURL() throws URLSyntaxException
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java Wed Oct 13 15:05:29 2010
@@ -20,16 +20,15 @@
*/
package org.apache.qpid.test.unit.client.destinationurl;
-import junit.framework.TestCase;
+import java.net.URISyntaxException;
import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.url.AMQBindingURL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.net.URISyntaxException;
-
-public class DestinationURLTest extends TestCase
+public class DestinationURLTest extends QpidTestCase
{
private static final Logger _logger = LoggerFactory.getLogger(DestinationURLTest.class);
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/BytesMessageTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/BytesMessageTest.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/BytesMessageTest.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/BytesMessageTest.java Wed Oct 13 15:05:29 2010
@@ -27,12 +27,11 @@ import javax.jms.MessageFormatException;
import javax.jms.MessageNotReadableException;
import javax.jms.MessageNotWriteableException;
-import junit.framework.TestCase;
-
import org.apache.qpid.client.message.JMSBytesMessage;
import org.apache.qpid.client.message.TestMessageHelper;
+import org.apache.qpid.test.utils.QpidTestCase;
-public class BytesMessageTest extends TestCase
+public class BytesMessageTest extends QpidTestCase
{
/**
* Tests that on creation a call to getBodyLength() throws an exception
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/MapMessageTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/MapMessageTest.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/MapMessageTest.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/MapMessageTest.java Wed Oct 13 15:05:29 2010
@@ -24,13 +24,13 @@ import javax.jms.JMSException;
import javax.jms.MessageFormatException;
import junit.framework.Assert;
-import junit.framework.TestCase;
import org.apache.qpid.client.message.JMSMapMessage;
import org.apache.qpid.client.message.TestMessageHelper;
+import org.apache.qpid.test.utils.QpidTestCase;
-public class MapMessageTest extends TestCase
+public class MapMessageTest extends QpidTestCase
{
//Test Lookups
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/StreamMessageTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/StreamMessageTest.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/StreamMessageTest.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/StreamMessageTest.java Wed Oct 13 15:05:29 2010
@@ -29,15 +29,14 @@ import javax.jms.MessageNotReadableExcep
import javax.jms.MessageNotWriteableException;
import javax.jms.StreamMessage;
-import junit.framework.TestCase;
-
import org.apache.qpid.client.message.JMSStreamMessage;
import org.apache.qpid.client.message.TestMessageHelper;
+import org.apache.qpid.test.utils.QpidTestCase;
/**
* @author Apache Software Foundation
*/
-public class StreamMessageTest extends TestCase
+public class StreamMessageTest extends QpidTestCase
{
/**
* Tests that on creation a call to getBodyLength() throws an exception
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/TextMessageTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/TextMessageTest.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/TextMessageTest.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/TextMessageTest.java Wed Oct 13 15:05:29 2010
@@ -23,13 +23,13 @@ package org.apache.qpid.test.unit.client
import javax.jms.JMSException;
import junit.framework.Assert;
-import junit.framework.TestCase;
import org.apache.qpid.client.message.JMSMapMessage;
import org.apache.qpid.client.message.JMSTextMessage;
import org.apache.qpid.client.message.TestMessageHelper;
+import org.apache.qpid.test.utils.QpidTestCase;
-public class TextMessageTest extends TestCase
+public class TextMessageTest extends QpidTestCase
{
public void testTextOnConstruction() throws Exception
{
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/ConnectionFactoryTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/ConnectionFactoryTest.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/ConnectionFactoryTest.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/ConnectionFactoryTest.java Wed Oct 13 15:05:29 2010
@@ -20,13 +20,13 @@
*/
package org.apache.qpid.test.unit.jndi;
-import junit.framework.TestCase;
import org.apache.qpid.client.AMQConnectionFactory;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.url.URLSyntaxException;
-public class ConnectionFactoryTest extends TestCase
+public class ConnectionFactoryTest extends QpidTestCase
{
//URL will be returned with the password field swapped for '********'
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDIPropertyFileTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDIPropertyFileTest.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDIPropertyFileTest.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDIPropertyFileTest.java Wed Oct 13 15:05:29 2010
@@ -29,10 +29,9 @@ import javax.naming.InitialContext;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.test.utils.QpidTestCase;
-import junit.framework.TestCase;
-
-public class JNDIPropertyFileTest extends TestCase
+public class JNDIPropertyFileTest extends QpidTestCase
{
Context ctx;
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java Wed Oct 13 15:05:29 2010
@@ -20,23 +20,24 @@
*/
package org.apache.qpid.test.unit.message;
-import javax.jms.*;
-
-import junit.framework.TestCase;
-
-import org.apache.qpid.client.*;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.client.failover.FailoverException;
-import org.apache.qpid.client.message.*;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.message.AMQMessageDelegateFactory;
+import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.client.message.JMSMapMessage;
+import org.apache.qpid.client.message.JMSTextMessage;
+import org.apache.qpid.client.message.MessageConverter;
import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.AMQException;
-
-import java.util.Map;
+import org.apache.qpid.test.utils.QpidTestCase;
-public class MessageConverterTest extends TestCase
+public class MessageConverterTest extends QpidTestCase
{
public static final String JMS_CORR_ID = "QPIDID_01";
Added: qpid/branches/grkvlt-network-20101013/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/ExistingSocketConnector.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/ExistingSocketConnector.java?rev=1022127&view=auto
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/ExistingSocketConnector.java (added)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/ExistingSocketConnector.java Wed Oct 13 15:05:29 2010
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.nio.channels.SocketChannel;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.ExceptionMonitor;
+import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoServiceConfig;
+import org.apache.mina.common.support.AbstractIoFilterChain;
+import org.apache.mina.common.support.DefaultConnectFuture;
+import org.apache.mina.util.NewThreadExecutor;
+
+/**
+ * Extension of {@link SocketConnector} using an existing open socket.
+ */
+public class ExistingSocketConnector extends SocketConnector
+{
+ private static final Map<String, Socket> OPEN_SOCKET_REGISTER = new ConcurrentHashMap<String, Socket>();
+
+ private static final AtomicInteger nextId = new AtomicInteger();
+ private final int id = nextId.getAndIncrement();
+ private final SocketIoProcessor[] ioProcessors;
+ private final int processorCount;
+ private int processorDistributor = 0;
+
+ private Socket _openSocket = null;
+
+ public static void registerOpenSocket(String socketID, Socket openSocket)
+ {
+ OPEN_SOCKET_REGISTER.put(socketID, openSocket);
+ }
+
+ public static Socket removeOpenSocket(String socketID)
+ {
+ return OPEN_SOCKET_REGISTER.remove(socketID);
+ }
+
+ public void setOpenSocket(Socket openSocket)
+ {
+ _openSocket = openSocket;
+ }
+
+ /**
+ * Create a connector with a single processing thread using a NewThreadExecutor
+ */
+ public ExistingSocketConnector()
+ {
+ this(1, new NewThreadExecutor());
+ }
+
+ /**
+ * Create a connector with the desired number of processing threads
+ *
+ * @param processorCount Number of processing threads
+ * @param executor Executor to use for launching threads
+ */
+ public ExistingSocketConnector(int processorCount, Executor executor) {
+ if (processorCount < 1)
+ {
+ throw new IllegalArgumentException("Must have at least one processor");
+ }
+
+ this.processorCount = processorCount;
+ ioProcessors = new SocketIoProcessor[processorCount];
+
+ for (int i = 0; i < processorCount; i++)
+ {
+ ioProcessors[i] = new SocketIoProcessor("SocketConnectorIoProcessor-" + id + "." + i, executor);
+ }
+ }
+
+ /**
+ * Changes here from the Mina OpenSocketConnector.
+ *
+ * Ignoring all address as they are not needed.
+ */
+ public ConnectFuture connect(SocketAddress address, SocketAddress localAddress, IoHandler handler, IoServiceConfig config)
+ {
+ if (handler == null)
+ {
+ throw new NullPointerException("handler");
+ }
+ if (config == null)
+ {
+ config = getDefaultConfig();
+ }
+ if (_openSocket == null)
+ {
+ throw new IllegalArgumentException("Specifed Socket not active");
+ }
+
+ boolean success = false;
+
+ try
+ {
+ DefaultConnectFuture future = new DefaultConnectFuture();
+ newSession(_openSocket.getChannel(), handler, config, future);
+ success = true;
+ return future;
+ }
+ catch (IOException e)
+ {
+ return DefaultConnectFuture.newFailedFuture(e);
+ }
+ finally
+ {
+ if (!success && _openSocket != null)
+ {
+ try
+ {
+ _openSocket.close();
+ }
+ catch (IOException e)
+ {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+ }
+ }
+ }
+ }
+
+ private void newSession(SocketChannel ch, IoHandler handler, IoServiceConfig config, ConnectFuture connectFuture)
+ throws IOException
+ {
+ SocketSessionImpl session = new SocketSessionImpl(this,
+ nextProcessor(), getListeners(), config, ch, handler,
+ ch.socket().getRemoteSocketAddress());
+ try
+ {
+ getFilterChainBuilder().buildFilterChain(session.getFilterChain());
+ config.getFilterChainBuilder().buildFilterChain(session.getFilterChain());
+ config.getThreadModel().buildFilterChain(session.getFilterChain());
+ }
+ catch (Throwable e)
+ {
+ throw (IOException) new IOException("Failed to create a session.").initCause(e);
+ }
+
+ // Set the ConnectFuture of the specified session, which will be
+ // removed and notified by AbstractIoFilterChain eventually.
+ session.setAttribute(AbstractIoFilterChain.CONNECT_FUTURE, connectFuture);
+
+ // Forward the remaining process to the SocketIoProcessor.
+ session.getIoProcessor().addNew(session);
+ }
+
+ private SocketIoProcessor nextProcessor()
+ {
+ if (processorDistributor == Integer.MAX_VALUE)
+ {
+ processorDistributor = Integer.MAX_VALUE % processorCount;
+ }
+
+ return ioProcessors[processorDistributor++ % processorCount];
+ }
+}
\ No newline at end of file
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/common/src/main/java/org/apache/qpid/AMQDisconnectedException.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/common/src/main/java/org/apache/qpid/AMQDisconnectedException.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/common/src/main/java/org/apache/qpid/AMQDisconnectedException.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/common/src/main/java/org/apache/qpid/AMQDisconnectedException.java Wed Oct 13 15:05:29 2010
@@ -32,6 +32,11 @@ package org.apache.qpid;
*/
public class AMQDisconnectedException extends AMQException
{
+ public AMQDisconnectedException(String msg)
+ {
+ super(null, msg);
+ }
+
public AMQDisconnectedException(String msg, Throwable cause)
{
super(null, msg, cause);
Added: qpid/branches/grkvlt-network-20101013/qpid/java/common/src/main/java/org/apache/qpid/BrokerOptions.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/common/src/main/java/org/apache/qpid/BrokerOptions.java?rev=1022127&view=auto
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/common/src/main/java/org/apache/qpid/BrokerOptions.java (added)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/common/src/main/java/org/apache/qpid/BrokerOptions.java Wed Oct 13 15:05:29 2010
@@ -0,0 +1,230 @@
+package org.apache.qpid;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+
+public class BrokerOptions extends HashMap<String, List<String>>
+{
+ /** serialVersionUID */
+ private static final long serialVersionUID = 8051825964945442234L;
+
+ public static final Integer DEFAULT_PORT = 5672;
+ public static final String DEFAULT_CONFIG_FILE = "etc/config.xml";
+ public static final String DEFAULT_LOG_CONFIG_FILENAME = "log4j.xml";
+ public static final String QPID_HOME = "QPID_HOME";
+
+ public static final String PORTS = "p";
+ public static final String EXCLUDE_0_10 = "exclude-0-10";
+ public static final String EXCLUDE_0_9_1 = "exclude-0-9-1";
+ public static final String EXCLUDE_0_9 = "exclude-0-9";
+ public static final String EXCLUDE_0_8 = "exclude-0-8";
+ public static final String BIND = "b";
+ public static final String MANAGEMENT = "m";
+ public static final String LOG4J = "l";
+ public static final String WATCH = "w";
+ public static final String CONFIG = "c";
+ public static final String PROTOCOL = "t";
+
+ public static final String[] COMMAND_LINE_OPTIONS = new String[] {
+ PORTS, EXCLUDE_0_10, EXCLUDE_0_9_1, EXCLUDE_0_9, EXCLUDE_0_8,
+ BIND, MANAGEMENT, LOG4J, WATCH, CONFIG, PROTOCOL,
+ };
+
+ public void setPorts(Integer...ports)
+ {
+ put(PORTS, ports);
+ }
+
+ public List<Integer> getPorts()
+ {
+ return getList(PORTS);
+ }
+
+ public void setExclude_0_10Ports(Integer...ports)
+ {
+ put(EXCLUDE_0_10, ports);
+ }
+
+ public List<Integer> getExclude_0_10Ports()
+ {
+ return getList(EXCLUDE_0_10);
+ }
+
+ public void setExclude_0_9_1Ports(Integer...ports)
+ {
+ put(EXCLUDE_0_9_1, ports);
+ }
+
+ public List<Integer> getExclude_0_9_1Ports()
+ {
+ return getList(EXCLUDE_0_9_1);
+ }
+
+ public void setExclude_0_9Ports(Integer...ports)
+ {
+ put(EXCLUDE_0_9, ports);
+ }
+
+ public List<Integer> getExclude_0_9Ports()
+ {
+ return getList(EXCLUDE_0_9);
+ }
+
+ public void setExclude_0_8Ports(Integer...ports)
+ {
+ put(EXCLUDE_0_8, ports);
+ }
+
+ public List<Integer> getExclude_0_8Ports()
+ {
+ return getList(EXCLUDE_0_8);
+ }
+
+ public void setManagementPort(Integer management)
+ {
+ put(MANAGEMENT, Integer.toString(management));
+ }
+
+ public Integer getManagementPort()
+ {
+ return getInteger(MANAGEMENT);
+ }
+
+ public void setBind(String bind)
+ {
+ put(BIND, bind);
+ }
+
+ public String getBind()
+ {
+ return getValue(BIND);
+ }
+
+ public void setLog4JFile(String log4j)
+ {
+ put(LOG4J, log4j);
+ }
+
+ public String getLog4JFile()
+ {
+ return getValue(LOG4J);
+ }
+
+ public void setLog4JWatch(Integer watch)
+ {
+ put(WATCH, Integer.toString(watch));
+ }
+
+ public Integer getLog4JWatch()
+ {
+ return getInteger(WATCH);
+ }
+
+ public void setConfigFile(String config)
+ {
+ put(CONFIG, config);
+ }
+
+ public String getConfigFile()
+ {
+ return getValue(CONFIG);
+ }
+
+ public void setProtocol(String protocol)
+ {
+ put(PROTOCOL, protocol);
+ }
+
+ public String getProtocol()
+ {
+ return getValue(PROTOCOL);
+ }
+
+ public void put(String key, String value)
+ {
+ if (value != null)
+ {
+ put(key, Collections.singletonList(value));
+ }
+ }
+
+ public void put(String key, String...values)
+ {
+ if (values != null)
+ {
+ put(key, Arrays.asList(values));
+ }
+ }
+
+ public void put(String key, Integer...values)
+ {
+ List<String> list = new ArrayList<String>();
+ for (Integer i : values)
+ {
+ list.add(Integer.toString(i));
+ }
+ put(key, list);
+ }
+
+ public Integer getInteger(Object key)
+ {
+ return getInteger(key, null);
+ }
+
+ public Integer getInteger(Object key, Integer defaultValue)
+ {
+ if (!containsKey(key))
+ {
+ return defaultValue;
+ }
+ List<String> values = get(key);
+ return Integer.valueOf(values.get(0));
+ }
+
+ public List<Integer> getList(Object key)
+ {
+ return getList(key, null);
+ }
+
+ public List<Integer> getList(Object key, List<Integer> defaultValues)
+ {
+ if (!containsKey(key))
+ {
+ return defaultValues;
+ }
+ List<String> list = get(key);
+ List<Integer> values = new ArrayList<Integer>();
+ for (String s : list)
+ {
+ values.add(Integer.valueOf(s));
+ }
+ return values;
+ }
+
+ public String getValue(Object key)
+ {
+ return getValue(key, null);
+ }
+
+ public String getValue(Object key, String defaultValue)
+ {
+ if (!containsKey(key))
+ {
+ return defaultValue;
+ }
+ List<String> values = get(key);
+ return values.get(0);
+ }
+
+ public List<String> get(Object key, List<String> defaultValues)
+ {
+ if (!containsKey(key))
+ {
+ return defaultValues;
+ }
+ return get(key);
+ }
+}
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/common/src/main/java/org/apache/qpid/common/Closeable.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/common/src/main/java/org/apache/qpid/common/Closeable.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/common/src/main/java/org/apache/qpid/common/Closeable.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/common/src/main/java/org/apache/qpid/common/Closeable.java Wed Oct 13 15:05:29 2010
@@ -20,7 +20,11 @@
*/
package org.apache.qpid.common;
-
+/**
+ * Interface indicating an object can be closed.
+ *
+ * Used as a marker for various components of the broker application registry, to allow clean shutdown.
+ */
public interface Closeable
{
public void close();
Added: qpid/branches/grkvlt-network-20101013/qpid/java/common/src/main/java/org/apache/qpid/protocol/ReceiverFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/common/src/main/java/org/apache/qpid/protocol/ReceiverFactory.java?rev=1022127&view=auto
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/common/src/main/java/org/apache/qpid/protocol/ReceiverFactory.java (added)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/common/src/main/java/org/apache/qpid/protocol/ReceiverFactory.java Wed Oct 13 15:05:29 2010
@@ -0,0 +1,35 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.protocol;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.network.NetworkTransport;
+
+public interface ReceiverFactory
+{
+ /**
+ * Returns a new instance of a {@link Receiver}.
+ */
+ Receiver<ByteBuffer> newReceiver(NetworkTransport transport, NetworkConnection network);
+}
\ No newline at end of file
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/common/src/main/java/org/apache/qpid/thread/RealtimeThreadFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/common/src/main/java/org/apache/qpid/thread/RealtimeThreadFactory.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/common/src/main/java/org/apache/qpid/thread/RealtimeThreadFactory.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/common/src/main/java/org/apache/qpid/thread/RealtimeThreadFactory.java Wed Oct 13 15:05:29 2010
@@ -20,49 +20,55 @@ package org.apache.qpid.thread;
*
*/
-
import java.lang.reflect.Constructor;
+import java.util.concurrent.ThreadFactory;
public class RealtimeThreadFactory implements ThreadFactory
{
- private Class threadClass;
- private Constructor threadConstructor;
- private Constructor priorityParameterConstructor;
- private int defaultRTThreadPriority = 20;
+ private Class<?> _threadClass;
+ private Constructor<?> _threadConstructor;
+ private Constructor<?> _priorityParameterConstructor;
+ private int _defaultRTThreadPriority = 20;
public RealtimeThreadFactory() throws Exception
{
- defaultRTThreadPriority = Integer.getInteger("qpid.rt_thread_priority",20);
- threadClass = Class.forName("javax.realtime.RealtimeThread");
+ _defaultRTThreadPriority = Integer.getInteger("qpid.rt_thread_priority", 20);
+ _threadClass = Class.forName("javax.realtime.RealtimeThread");
- Class schedulingParametersClass = Class.forName("javax.realtime.SchedulingParameters");
- Class releaseParametersClass = Class.forName("javax.realtime.ReleaseParameters");
- Class memoryParametersClass = Class.forName("javax.realtime.MemoryParameters");
- Class memoryAreaClass = Class.forName("javax.realtime.MemoryArea");
- Class processingGroupParametersClass = Class.forName("javax.realtime.ProcessingGroupParameters");
+ Class<?> schedulingParametersClass = Class.forName("javax.realtime.SchedulingParameters");
+ Class<?> releaseParametersClass = Class.forName("javax.realtime.ReleaseParameters");
+ Class<?> memoryParametersClass = Class.forName("javax.realtime.MemoryParameters");
+ Class<?> memoryAreaClass = Class.forName("javax.realtime.MemoryArea");
+ Class<?> processingGroupParametersClass = Class.forName("javax.realtime.ProcessingGroupParameters");
- Class[] paramTypes = new Class[]{schedulingParametersClass,
- releaseParametersClass,
- memoryParametersClass,
- memoryAreaClass,
- processingGroupParametersClass,
- java.lang.Runnable.class};
+ Class<?>[] paramTypes = new Class[] { schedulingParametersClass,
+ releaseParametersClass,
+ memoryParametersClass,
+ memoryAreaClass,
+ processingGroupParametersClass,
+ java.lang.Runnable.class };
- threadConstructor = threadClass.getConstructor(paramTypes);
+ _threadConstructor = _threadClass.getConstructor(paramTypes);
- Class priorityParameterClass = Class.forName("javax.realtime.PriorityParameters");
- priorityParameterConstructor = priorityParameterClass.getConstructor(new Class[]{int.class});
+ Class<?> priorityParameterClass = Class.forName("javax.realtime.PriorityParameters");
+ _priorityParameterConstructor = priorityParameterClass.getConstructor(new Class<?>[] { Integer.TYPE });
}
- public Thread createThread(Runnable r) throws Exception
+ public Thread newThread(Runnable r)
{
- return createThread(r,defaultRTThreadPriority);
+ return createThread(r,_defaultRTThreadPriority);
}
- public Thread createThread(Runnable r, int priority) throws Exception
+ public Thread createThread(Runnable r, int priority)
{
- Object priorityParams = priorityParameterConstructor.newInstance(priority);
- return (Thread)threadConstructor.newInstance(priorityParams,null,null,null,null,r);
+ try
+ {
+ Object priorityParams = _priorityParameterConstructor.newInstance(priority);
+ return (Thread) _threadConstructor.newInstance(priorityParams, null, null, null, null, r);
+ }
+ catch (Exception e)
+ {
+ return null;
+ }
}
-
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org