You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2011/10/21 03:20:13 UTC
svn commit: r1187150 [32/43] - in /qpid/branches/QPID-2519: ./ bin/ cpp/
cpp/bindings/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/
cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/
cpp/bindings/qmf2/python/ cpp/bindings/qmf...
Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java Fri Oct 21 01:19:00 2011
@@ -27,7 +27,6 @@ import java.util.Map;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQDestination.Binding;
-import org.apache.qpid.client.messaging.address.Link.Reliability;
import org.apache.qpid.client.messaging.address.Link.Subscription;
import org.apache.qpid.client.messaging.address.Node.ExchangeNode;
import org.apache.qpid.client.messaging.address.Node.QueueNode;
@@ -55,7 +54,7 @@ public class AddressHelper
public static final String EXCLUSIVE = "exclusive";
public static final String AUTO_DELETE = "auto-delete";
public static final String TYPE = "type";
- public static final String ALT_EXCHANGE = "alternate-exchange";
+ public static final String ALT_EXCHANGE = "alt-exchange";
public static final String BINDINGS = "bindings";
public static final String BROWSE = "browse";
public static final String MODE = "mode";
@@ -232,9 +231,14 @@ public class AddressHelper
private boolean getDurability(Map map)
{
- Accessor access = new MapAccessor(map);
- Boolean result = access.getBoolean(DURABLE);
- return (result == null) ? false : result.booleanValue();
+ if (map != null && map.get(DURABLE) != null)
+ {
+ return Boolean.parseBoolean((String)map.get(DURABLE));
+ }
+ else
+ {
+ return false;
+ }
}
/**
@@ -258,7 +262,7 @@ public class AddressHelper
}
}
- public Link getLink() throws Exception
+ public Link getLink()
{
Link link = new Link();
link.setSubscription(new Subscription());
@@ -268,25 +272,6 @@ public class AddressHelper
: linkProps.getBoolean(DURABLE));
link.setName(linkProps.getString(NAME));
- String reliability = linkProps.getString(RELIABILITY);
- if ( reliability != null)
- {
- if (reliability.equalsIgnoreCase("unreliable"))
- {
- link.setReliability(Reliability.UNRELIABLE);
- }
- else if (reliability.equalsIgnoreCase("at-least-once"))
- {
- link.setReliability(Reliability.AT_LEAST_ONCE);
- }
- else
- {
- throw new Exception("The reliability mode '" +
- reliability + "' is not yet supported");
- }
-
- }
-
if (((Map) address.getOptions().get(LINK)).get(CAPACITY) instanceof Map)
{
MapAccessor capacityProps = new MapAccessor(
Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java Fri Oct 21 01:19:00 2011
@@ -20,8 +20,6 @@
*/
package org.apache.qpid.client.messaging.address;
-import static org.apache.qpid.client.messaging.address.Link.Reliability.UNSPECIFIED;
-
import java.util.HashMap;
import java.util.Map;
@@ -31,8 +29,6 @@ public class Link
{
public enum FilterType { SQL92, XQUERY, SUBJECT }
- public enum Reliability { UNRELIABLE, AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE, UNSPECIFIED }
-
protected String name;
protected String _filter;
protected FilterType _filterType = FilterType.SUBJECT;
@@ -42,18 +38,7 @@ public class Link
protected int _producerCapacity = 0;
protected Node node;
protected Subscription subscription;
- protected Reliability reliability = UNSPECIFIED;
- public Reliability getReliability()
- {
- return reliability;
- }
-
- public void setReliability(Reliability reliability)
- {
- this.reliability = reliability;
- }
-
public Node getNode()
{
return node;
Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Fri Oct 21 01:19:00 2011
@@ -20,9 +20,7 @@
*/
package org.apache.qpid.client.protocol;
-import java.io.DataOutputStream;
import java.io.IOException;
-import java.io.OutputStream;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -30,8 +28,10 @@ import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import org.apache.mina.filter.codec.ProtocolCodecException;
import org.apache.qpid.AMQConnectionClosedException;
import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
@@ -46,7 +46,6 @@ import org.apache.qpid.client.state.AMQS
import org.apache.qpid.client.state.StateWaiter;
import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.framing.AMQBody;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
@@ -58,13 +57,16 @@ import org.apache.qpid.framing.Heartbeat
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.framing.ProtocolInitiation;
import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.jms.BrokerDetails;
+import org.apache.qpid.pool.Job;
+import org.apache.qpid.pool.ReferenceCountingExecutorService;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.thread.Threading;
-import org.apache.qpid.transport.Sender;
-import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.NetworkDriver;
+import org.apache.qpid.transport.network.io.IoTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -162,22 +164,20 @@ public class AMQProtocolHandler implemen
private FailoverException _lastFailoverException;
/** Defines the default timeout to use for synchronous protocol commands. */
- private final long DEFAULT_SYNC_TIMEOUT = Long.getLong(ClientProperties.QPID_SYNC_OP_TIMEOUT,
- Long.getLong(ClientProperties.AMQJ_DEFAULT_SYNCWRITE_TIMEOUT,
- ClientProperties.DEFAULT_SYNC_OPERATION_TIMEOUT));
+ private final long DEFAULT_SYNC_TIMEOUT = Long.getLong("amqj.default_syncwrite_timeout", 1000 * 30);
/** Object to lock on when changing the latch */
private Object _failoverLatchChange = new Object();
private AMQCodecFactory _codecFactory;
-
+ private Job _readJob;
+ private Job _writeJob;
+ private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance();
+ private NetworkDriver _networkDriver;
private ProtocolVersion _suggestedProtocolVersion;
private long _writtenBytes;
private long _readBytes;
- private NetworkConnection _network;
- private Sender<ByteBuffer> _sender;
-
/**
* Creates a new protocol handler, associated with the specified client connection instance.
*
@@ -189,10 +189,43 @@ public class AMQProtocolHandler implemen
_protocolSession = new AMQProtocolSession(this, _connection);
_stateManager = new AMQStateManager(_protocolSession);
_codecFactory = new AMQCodecFactory(false, _protocolSession);
+ _poolReference.setThreadFactory(new ThreadFactory()
+ {
+
+ public Thread newThread(final Runnable runnable)
+ {
+ try
+ {
+ return Threading.getThreadFactory().createThread(runnable);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException("Failed to create thread", e);
+ }
+ }
+ });
+ _readJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, true);
+ _writeJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, false);
+ _poolReference.acquireExecutorService();
_failoverHandler = new FailoverHandler(this);
}
/**
+ * Called when we want to create a new IoTransport session
+ * @param brokerDetail
+ */
+ public void createIoTransportSession(BrokerDetails brokerDetail)
+ {
+ _protocolSession = new AMQProtocolSession(this, _connection);
+ _stateManager.setProtocolSession(_protocolSession);
+ IoTransport.connect_0_9(getProtocolSession(),
+ brokerDetail.getHost(),
+ brokerDetail.getPort(),
+ brokerDetail.getBooleanProperty(BrokerDetails.OPTIONS_SSL));
+ _protocolSession.init();
+ }
+
+ /**
* Called when the network connection is closed. This can happen, either because the client explicitly requested
* that the connection be closed, in which case nothing is done, or because the connection died. In the case
* where the connection died, an attempt to failover automatically to a new connection may be started. The failover
@@ -282,7 +315,7 @@ public class AMQProtocolHandler implemen
// failover:
HeartbeatDiagnostics.timeout();
_logger.warn("Timed out while waiting for heartbeat from peer.");
- _network.close();
+ _networkDriver.close();
}
public void writerIdle()
@@ -304,12 +337,22 @@ public class AMQProtocolHandler implemen
{
_logger.info("Exception caught therefore going to attempt failover: " + cause, cause);
// this will attempt failover
- _network.close();
+ _networkDriver.close();
closed();
}
else
{
+
+ if (cause instanceof ProtocolCodecException)
+ {
+ _logger.info("Protocol Exception caught NOT going to attempt failover as " +
+ "cause isn't AMQConnectionClosedException: " + cause, cause);
+
+ AMQException amqe = new AMQException("Protocol handler error: " + cause, cause);
+ propagateExceptionToAllWaiters(amqe);
+ }
_connection.exceptionReceived(cause);
+
}
// FIXME Need to correctly handle other exceptions. Things like ...
@@ -403,63 +446,76 @@ public class AMQProtocolHandler implemen
public void received(ByteBuffer msg)
{
- _readBytes += msg.remaining();
try
{
+ _readBytes += msg.remaining();
final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
- // Decode buffer
-
- for (AMQDataBlock message : dataBlocks)
+ Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Runnable()
{
- if (PROTOCOL_DEBUG)
- {
- _protocolLogger.info(String.format("RECV: [%s] %s", this, message));
- }
+ public void run()
+ {
+ // Decode buffer
- if(message instanceof AMQFrame)
+ for (AMQDataBlock message : dataBlocks)
{
- final boolean debug = _logger.isDebugEnabled();
- final long msgNumber = ++_messageReceivedCount;
- if (debug && ((msgNumber % 1000) == 0))
+ try
{
- _logger.debug("Received " + _messageReceivedCount + " protocol messages");
+ if (PROTOCOL_DEBUG)
+ {
+ _protocolLogger.info(String.format("RECV: [%s] %s", this, message));
+ }
+
+ if(message instanceof AMQFrame)
+ {
+ final boolean debug = _logger.isDebugEnabled();
+ final long msgNumber = ++_messageReceivedCount;
+
+ if (debug && ((msgNumber % 1000) == 0))
+ {
+ _logger.debug("Received " + _messageReceivedCount + " protocol messages");
+ }
+
+ AMQFrame frame = (AMQFrame) message;
+
+ final AMQBody bodyFrame = frame.getBodyFrame();
+
+ HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
+
+ bodyFrame.handle(frame.getChannel(), _protocolSession);
+
+ _connection.bytesReceived(_readBytes);
+ }
+ else if (message instanceof ProtocolInitiation)
+ {
+ // We get here if the server sends a response to our initial protocol header
+ // suggesting an alternate ProtocolVersion; the server will then close the
+ // connection.
+ ProtocolInitiation protocolInit = (ProtocolInitiation) message;
+ _suggestedProtocolVersion = protocolInit.checkVersion();
+ _logger.info("Broker suggested using protocol version:" + _suggestedProtocolVersion);
+
+ // get round a bug in old versions of qpid whereby the connection is not closed
+ _stateManager.changeState(AMQState.CONNECTION_CLOSED);
+ }
+ }
+ catch (Exception e)
+ {
+ _logger.error("Exception processing frame", e);
+ propagateExceptionToFrameListeners(e);
+ exception(e);
}
-
- AMQFrame frame = (AMQFrame) message;
-
- final AMQBody bodyFrame = frame.getBodyFrame();
-
- HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
-
- bodyFrame.handle(frame.getChannel(), _protocolSession);
-
- _connection.bytesReceived(_readBytes);
- }
- else if (message instanceof ProtocolInitiation)
- {
- // We get here if the server sends a response to our initial protocol header
- // suggesting an alternate ProtocolVersion; the server will then close the
- // connection.
- ProtocolInitiation protocolInit = (ProtocolInitiation) message;
- _suggestedProtocolVersion = protocolInit.checkVersion();
- _logger.info("Broker suggested using protocol version:" + _suggestedProtocolVersion);
-
- // get round a bug in old versions of qpid whereby the connection is not closed
- _stateManager.changeState(AMQState.CONNECTION_CLOSED);
}
}
+ });
}
catch (Exception e)
{
- _logger.error("Exception processing frame", e);
propagateExceptionToFrameListeners(e);
exception(e);
}
-
-
}
public void methodBodyReceived(final int channelId, final AMQBody bodyFrame)
@@ -514,13 +570,28 @@ public class AMQProtocolHandler implemen
return getStateManager().createWaiter(states);
}
- public synchronized void writeFrame(AMQDataBlock frame)
+ /**
+ * Convenience method that writes a frame to the protocol session. Equivalent to calling
+ * getProtocolSession().write().
+ *
+ * @param frame the frame to write
+ */
+ public void writeFrame(AMQDataBlock frame)
{
- final ByteBuffer buf = asByteBuffer(frame);
- _writtenBytes += buf.remaining();
- _sender.send(buf);
- _sender.flush();
+ writeFrame(frame, false);
+ }
+ public void writeFrame(AMQDataBlock frame, boolean wait)
+ {
+ final ByteBuffer buf = frame.toNioByteBuffer();
+ _writtenBytes += buf.remaining();
+ Job.fireAsynchEvent(_poolReference.getPool(), _writeJob, new Runnable()
+ {
+ public void run()
+ {
+ _networkDriver.send(buf);
+ }
+ });
if (PROTOCOL_DEBUG)
{
_protocolLogger.debug(String.format("SEND: [%s] %s", this, frame));
@@ -537,41 +608,12 @@ public class AMQProtocolHandler implemen
_connection.bytesSent(_writtenBytes);
- }
-
- private ByteBuffer asByteBuffer(AMQDataBlock block)
- {
- final ByteBuffer buf = ByteBuffer.allocate((int) block.getSize());
-
- try
+ if (wait)
{
- block.writePayload(new DataOutputStream(new OutputStream()
- {
-
-
- @Override
- public void write(int b) throws IOException
- {
- buf.put((byte) b);
- }
-
- @Override
- public void write(byte[] b, int off, int len) throws IOException
- {
- buf.put(b, off, len);
- }
- }));
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
+ _networkDriver.flush();
}
-
- buf.flip();
- return buf;
}
-
/**
* Convenience method that writes a frame to the protocol session and waits for a particular response. Equivalent to
* calling getProtocolSession().write() then waiting for the response.
@@ -665,23 +707,24 @@ public class AMQProtocolHandler implemen
* <p/>If a failover exception occurs whilst closing the connection it is ignored, as the connection is closed
* anyway.
*
- * @param timeout The timeout to wait for an acknowledgment to the close request.
+ * @param timeout The timeout to wait for an acknowledgement to the close request.
*
* @throws AMQException If the close fails for any reason.
*/
public void closeConnection(long timeout) throws AMQException
{
+ ConnectionCloseBody body = _protocolSession.getMethodRegistry().createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
+ new AMQShortString("JMS client is closing the connection."), 0, 0);
+
+ final AMQFrame frame = body.generateFrame(0);
+
+ //If the connection is already closed then don't do a syncWrite
if (!getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSED))
{
- // Connection is already closed then don't do a syncWrite
try
{
- final ConnectionCloseBody body = _protocolSession.getMethodRegistry().createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
- new AMQShortString("JMS client is closing the connection."), 0, 0);
- final AMQFrame frame = body.generateFrame(0);
-
syncWrite(frame, ConnectionCloseOkBody.class, timeout);
- _network.close();
+ _networkDriver.close();
closed();
}
catch (AMQTimeoutException e)
@@ -690,9 +733,10 @@ public class AMQProtocolHandler implemen
}
catch (FailoverException e)
{
- _logger.debug("FailoverException interrupted connection close, ignoring as connection closed anyway.");
+ _logger.debug("FailoverException interrupted connection close, ignoring as connection close anyway.");
}
}
+ _poolReference.releaseExecutorService();
}
/** @return the number of bytes read from this protocol session */
@@ -800,23 +844,17 @@ public class AMQProtocolHandler implemen
public SocketAddress getRemoteAddress()
{
- return _network.getRemoteAddress();
+ return _networkDriver.getRemoteAddress();
}
public SocketAddress getLocalAddress()
{
- return _network.getLocalAddress();
- }
-
- public void setNetworkConnection(NetworkConnection network)
- {
- setNetworkConnection(network, network.getSender());
+ return _networkDriver.getLocalAddress();
}
- public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
+ public void setNetworkDriver(NetworkDriver driver)
{
- _network = network;
- _sender = sender;
+ _networkDriver = driver;
}
/** @param delay delay in seconds (not ms) */
@@ -824,15 +862,15 @@ public class AMQProtocolHandler implemen
{
if (delay > 0)
{
- _network.setMaxWriteIdle(delay);
- _network.setMaxReadIdle(HeartbeatConfig.CONFIG.getTimeout(delay));
+ getNetworkDriver().setMaxWriteIdle(delay);
+ getNetworkDriver().setMaxReadIdle(HeartbeatConfig.CONFIG.getTimeout(delay));
HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay));
}
}
- public NetworkConnection getNetworkConnection()
+ public NetworkDriver getNetworkDriver()
{
- return _network;
+ return _networkDriver;
}
public ProtocolVersion getSuggestedProtocolVersion()
Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Fri Oct 21 01:19:00 2011
@@ -20,36 +20,27 @@
*/
package org.apache.qpid.client.protocol;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.jms.JMSException;
import javax.security.sasl.SaslClient;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.ConnectionTuneParameters;
-import org.apache.qpid.client.handler.ClientMethodDispatcherImpl;
import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.message.UnprocessedMessage_0_8;
import org.apache.qpid.client.state.AMQStateManager;
-import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.HeartbeatBody;
-import org.apache.qpid.framing.MethodDispatcher;
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.framing.ProtocolInitiation;
-import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.client.state.AMQState;
+import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.transport.Sender;
-import org.apache.qpid.transport.TransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.qpid.client.handler.ClientMethodDispatcherImpl;
/**
* Wrapper for protocol session that provides type-safe access to session attributes. <p/> The underlying protocol
@@ -157,6 +148,16 @@ public class AMQProtocolSession implemen
return getAMQConnection().getVirtualHost();
}
+ public String getUsername()
+ {
+ return getAMQConnection().getUsername();
+ }
+
+ public String getPassword()
+ {
+ return getAMQConnection().getPassword();
+ }
+
public SaslClient getSaslClient()
{
return _saslClient;
@@ -298,11 +299,22 @@ public class AMQProtocolSession implemen
return _connection.getSession(channelId);
}
+ /**
+ * Convenience method that writes a frame to the protocol session. Equivalent to calling
+ * getProtocolSession().write().
+ *
+ * @param frame the frame to write
+ */
public void writeFrame(AMQDataBlock frame)
{
_protocolHandler.writeFrame(frame);
}
+ public void writeFrame(AMQDataBlock frame, boolean wait)
+ {
+ _protocolHandler.writeFrame(frame, wait);
+ }
+
/**
* Starts the process of closing a session
*
@@ -363,15 +375,7 @@ public class AMQProtocolSession implemen
public void closeProtocolSession() throws AMQException
{
- try
- {
- _protocolHandler.getNetworkConnection().close();
- }
- catch(TransportException e)
- {
- //ignore such exceptions, they were already logged
- //and this is a forcible close.
- }
+ _protocolHandler.closeConnection(0);
}
public void failover(String host, int port)
Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/security/AMQCallbackHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/security/AMQCallbackHandler.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/security/AMQCallbackHandler.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/security/AMQCallbackHandler.java Fri Oct 21 01:19:00 2011
@@ -22,9 +22,9 @@ package org.apache.qpid.client.security;
import javax.security.auth.callback.CallbackHandler;
-import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
public interface AMQCallbackHandler extends CallbackHandler
{
- void initialise(ConnectionURL connectionURL);
+ void initialise(AMQProtocolSession protocolSession);
}
Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.java Fri Oct 21 01:19:00 2011
@@ -20,22 +20,17 @@
*/
package org.apache.qpid.client.security;
+import org.apache.qpid.util.FileUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.IOException;
import java.io.InputStream;
-import java.util.Collection;
-import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
-import java.util.Set;
-import java.util.StringTokenizer;
-import java.util.TreeMap;
-
-import org.apache.qpid.util.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* CallbackHandlerRegistry is a registry for call back handlers for user authentication and interaction during user
@@ -47,7 +42,7 @@ import org.slf4j.LoggerFactory;
* "amp.callbackhandler.properties". The format of the properties file is:
*
* <p/><pre>
- * CallbackHanlder.n.mechanism=fully.qualified.class.name where n is an ordinal
+ * CallbackHanlder.mechanism=fully.qualified.class.name
* </pre>
*
* <p/>Where mechanism is an IANA-registered mechanism name and the fully qualified class name refers to a
@@ -71,15 +66,51 @@ public class CallbackHandlerRegistry
public static final String DEFAULT_RESOURCE_NAME = "org/apache/qpid/client/security/CallbackHandlerRegistry.properties";
/** A static reference to the singleton instance of this registry. */
- private static final CallbackHandlerRegistry _instance;
+ private static CallbackHandlerRegistry _instance = new CallbackHandlerRegistry();
/** Holds a map from SASL mechanism names to call back handlers. */
- private Map<String, Class<AMQCallbackHandler>> _mechanismToHandlerClassMap = new HashMap<String, Class<AMQCallbackHandler>>();
+ private Map<String, Class> _mechanismToHandlerClassMap = new HashMap<String, Class>();
+
+ /** Holds a space delimited list of mechanisms that callback handlers exist for. */
+ private String _mechanisms;
+
+ /**
+ * Gets the singleton instance of this registry.
+ *
+ * @return The singleton instance of this registry.
+ */
+ public static CallbackHandlerRegistry getInstance()
+ {
+ return _instance;
+ }
- /** Ordered collection of mechanisms for which callback handlers exist. */
- private Collection<String> _mechanisms;
+ /**
+ * Gets the callback handler class for a given SASL mechanism name.
+ *
+ * @param mechanism The SASL mechanism name.
+ *
+ * @return The callback handler class for the mechanism, or null if none is configured for that mechanism.
+ */
+ public Class getCallbackHandlerClass(String mechanism)
+ {
+ return (Class) _mechanismToHandlerClassMap.get(mechanism);
+ }
- static
+ /**
+ * Gets a space delimited list of supported SASL mechanisms.
+ *
+ * @return A space delimited list of supported SASL mechanisms.
+ */
+ public String getMechanisms()
+ {
+ return _mechanisms;
+ }
+
+ /**
+ * Creates the call back handler registry from its configuration resource or file. This also has the side effect
+ * of configuring and registering the SASL client factory implementations using {@link DynamicSaslRegistrar}.
+ */
+ private CallbackHandlerRegistry()
{
// Register any configured SASL client factories.
DynamicSaslRegistrar.registerSaslProviders();
@@ -89,12 +120,12 @@ public class CallbackHandlerRegistry
FileUtils.openFileOrDefaultResource(filename, DEFAULT_RESOURCE_NAME,
CallbackHandlerRegistry.class.getClassLoader());
- final Properties props = new Properties();
-
try
{
-
+ Properties props = new Properties();
props.load(is);
+ parseProperties(props);
+ _logger.info("Callback handlers available for SASL mechanisms: " + _mechanisms);
}
catch (IOException e)
{
@@ -115,68 +146,32 @@ public class CallbackHandlerRegistry
}
}
}
-
- _instance = new CallbackHandlerRegistry(props);
- _logger.info("Callback handlers available for SASL mechanisms: " + _instance._mechanisms);
-
}
- /**
- * Gets the singleton instance of this registry.
- *
- * @return The singleton instance of this registry.
- */
- public static CallbackHandlerRegistry getInstance()
- {
- return _instance;
- }
-
- public AMQCallbackHandler createCallbackHandler(final String mechanism)
+ /*private InputStream openPropertiesInputStream(String filename)
{
- final Class<AMQCallbackHandler> mechanismClass = _mechanismToHandlerClassMap.get(mechanism);
-
- if (mechanismClass == null)
+ boolean useDefault = true;
+ InputStream is = null;
+ if (filename != null)
{
- throw new IllegalArgumentException("Mechanism " + mechanism + " not known");
+ try
+ {
+ is = new BufferedInputStream(new FileInputStream(new File(filename)));
+ useDefault = false;
+ }
+ catch (FileNotFoundException e)
+ {
+ _logger.error("Unable to read from file " + filename + ": " + e, e);
+ }
}
- try
- {
- return mechanismClass.newInstance();
- }
- catch (InstantiationException e)
- {
- throw new IllegalArgumentException("Unable to create an instance of mechanism " + mechanism, e);
- }
- catch (IllegalAccessException e)
+ if (useDefault)
{
- throw new IllegalArgumentException("Unable to create an instance of mechanism " + mechanism, e);
+ is = CallbackHandlerRegistry.class.getResourceAsStream(DEFAULT_RESOURCE_NAME);
}
- }
- /**
- * Gets collections of supported SASL mechanism names, ordered by preference
- *
- * @return collection of SASL mechanism names.
- */
- public Collection<String> getMechanisms()
- {
- return Collections.unmodifiableCollection(_mechanisms);
- }
-
- /**
- * Creates the call back handler registry from its configuration resource or file.
- *
- * This also has the side effect of configuring and registering the SASL client factory
- * implementations using {@link DynamicSaslRegistrar}.
- *
- * This constructor is default protection to allow for effective unit testing. Clients must use
- * {@link #getInstance()} to obtain the singleton instance.
- */
- CallbackHandlerRegistry(final Properties props)
- {
- parseProperties(props);
- }
+ return is;
+ }*/
/**
* Scans the specified properties as a mapping from IANA registered SASL mechanism to call back handler
@@ -188,20 +183,20 @@ public class CallbackHandlerRegistry
*/
private void parseProperties(Properties props)
{
-
- final Map<Integer, String> mechanisms = new TreeMap<Integer, String>();
-
Enumeration e = props.propertyNames();
while (e.hasMoreElements())
{
- final String propertyName = (String) e.nextElement();
- final String[] parts = propertyName.split("\\.", 2);
+ String propertyName = (String) e.nextElement();
+ int period = propertyName.indexOf(".");
+ if (period < 0)
+ {
+ _logger.warn("Unable to parse property " + propertyName + " when configuring SASL providers");
- checkPropertyNameFormat(propertyName, parts);
+ continue;
+ }
- final String mechanism = parts[0];
- final int ordinal = getPropertyOrdinal(propertyName, parts);
- final String className = props.getProperty(propertyName);
+ String mechanism = propertyName.substring(period + 1);
+ String className = props.getProperty(propertyName);
Class clazz = null;
try
{
@@ -210,11 +205,20 @@ public class CallbackHandlerRegistry
{
_logger.warn("SASL provider " + clazz + " does not implement " + AMQCallbackHandler.class
+ ". Skipping");
+
continue;
}
- _mechanismToHandlerClassMap.put(mechanism, clazz);
- mechanisms.put(ordinal, mechanism);
+ _mechanismToHandlerClassMap.put(mechanism, clazz);
+ if (_mechanisms == null)
+ {
+ _mechanisms = mechanism;
+ }
+ else
+ {
+ // one time cost
+ _mechanisms = _mechanisms + " " + mechanism;
+ }
}
catch (ClassNotFoundException ex)
{
@@ -223,91 +227,5 @@ public class CallbackHandlerRegistry
continue;
}
}
-
- _mechanisms = mechanisms.values(); // order guaranteed by keys of treemap (i.e. our ordinals)
-
-
- }
-
- private void checkPropertyNameFormat(final String propertyName, final String[] parts)
- {
- if (parts.length != 2)
- {
- throw new IllegalArgumentException("Unable to parse property " + propertyName + " when configuring SASL providers");
- }
- }
-
- private int getPropertyOrdinal(final String propertyName, final String[] parts)
- {
- try
- {
- return Integer.parseInt(parts[1]);
- }
- catch(NumberFormatException nfe)
- {
- throw new IllegalArgumentException("Unable to parse property " + propertyName + " when configuring SASL providers", nfe);
- }
- }
-
- /**
- * Selects a SASL mechanism that is mutually available to both parties. If more than one
- * mechanism is mutually available the one appearing first (by ordinal) will be returned.
- *
- * @param peerMechanismList space separated list of mechanisms
- * @return selected mechanism, or null if none available
- */
- public String selectMechanism(final String peerMechanismList)
- {
- final Set<String> peerList = mechListToSet(peerMechanismList);
-
- return selectMechInternal(peerList, Collections.<String>emptySet());
- }
-
- /**
- * Selects a SASL mechanism that is mutually available to both parties.
- *
- * @param peerMechanismList space separated list of mechanisms
- * @param restrictionList space separated list of mechanisms
- * @return selected mechanism, or null if none available
- */
- public String selectMechanism(final String peerMechanismList, final String restrictionList)
- {
- final Set<String> peerList = mechListToSet(peerMechanismList);
- final Set<String> restrictionSet = mechListToSet(restrictionList);
-
- return selectMechInternal(peerList, restrictionSet);
- }
-
- private String selectMechInternal(final Set<String> peerSet, final Set<String> restrictionSet)
- {
- for (final String mech : _mechanisms)
- {
- if (peerSet.contains(mech))
- {
- if (restrictionSet.isEmpty() || restrictionSet.contains(mech))
- {
- return mech;
- }
- }
- }
-
- return null;
- }
-
- private Set<String> mechListToSet(final String mechanismList)
- {
- if (mechanismList == null)
- {
- return Collections.emptySet();
- }
-
- final StringTokenizer tokenizer = new StringTokenizer(mechanismList, " ");
- final Set<String> mechanismSet = new HashSet<String>(tokenizer.countTokens());
- while (tokenizer.hasMoreTokens())
- {
- mechanismSet.add(tokenizer.nextToken());
- }
- return Collections.unmodifiableSet(mechanismSet);
}
-
}
Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.properties
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.properties?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.properties (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.properties Fri Oct 21 01:19:00 2011
@@ -16,17 +16,7 @@
# specific language governing permissions and limitations
# under the License.
#
-
-#
-# Format:
-# <mechanism name>.ordinal=<implementation>
-#
-# @see CallbackHandlerRegistry
-#
-
-EXTERNAL.1=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
-GSSAPI.2=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
-CRAM-MD5-HASHED.3=org.apache.qpid.client.security.UsernameHashedPasswordCallbackHandler
-CRAM-MD5.4=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
-AMQPLAIN.5=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
-PLAIN.6=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
+CallbackHandler.CRAM-MD5-HASHED=org.apache.qpid.client.security.UsernameHashedPasswordCallbackHandler
+CallbackHandler.CRAM-MD5=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
+CallbackHandler.AMQPLAIN=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
+CallbackHandler.PLAIN=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/security/DynamicSaslRegistrar.properties
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/security/DynamicSaslRegistrar.properties?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/security/DynamicSaslRegistrar.properties (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/security/DynamicSaslRegistrar.properties Fri Oct 21 01:19:00 2011
@@ -18,4 +18,3 @@
#
AMQPLAIN=org.apache.qpid.client.security.amqplain.AmqPlainSaslClientFactory
CRAM-MD5-HASHED=org.apache.qpid.client.security.crammd5hashed.CRAMMD5HashedSaslClientFactory
-ANONYMOUS=org.apache.qpid.client.security.anonymous.AnonymousSaslClientFactory
Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/security/UsernameHashedPasswordCallbackHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/security/UsernameHashedPasswordCallbackHandler.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/security/UsernameHashedPasswordCallbackHandler.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/security/UsernameHashedPasswordCallbackHandler.java Fri Oct 21 01:19:00 2011
@@ -20,29 +20,30 @@
*/
package org.apache.qpid.client.security;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.PasswordCallback;
import javax.security.auth.callback.UnsupportedCallbackException;
-import org.apache.qpid.jms.ConnectionURL;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
public class UsernameHashedPasswordCallbackHandler implements AMQCallbackHandler
{
- private ConnectionURL _connectionURL;
+ private static final Logger _logger = LoggerFactory.getLogger(UsernameHashedPasswordCallbackHandler.class);
- /**
- * @see org.apache.qpid.client.security.AMQCallbackHandler#initialise(org.apache.qpid.jms.ConnectionURL)
- */
- @Override
- public void initialise(ConnectionURL connectionURL)
+ private AMQProtocolSession _protocolSession;
+
+ public void initialise(AMQProtocolSession protocolSession)
{
- _connectionURL = connectionURL;
+ _protocolSession = protocolSession;
}
public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException
@@ -52,13 +53,13 @@ public class UsernameHashedPasswordCallb
Callback cb = callbacks[i];
if (cb instanceof NameCallback)
{
- ((NameCallback) cb).setName(_connectionURL.getUsername());
+ ((NameCallback) cb).setName(_protocolSession.getUsername());
}
else if (cb instanceof PasswordCallback)
{
try
{
- ((PasswordCallback) cb).setPassword(getHash(_connectionURL.getPassword()));
+ ((PasswordCallback) cb).setPassword(getHash(_protocolSession.getPassword()));
}
catch (NoSuchAlgorithmException e)
{
@@ -98,5 +99,4 @@ public class UsernameHashedPasswordCallb
return hash;
}
-
}
Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/security/UsernamePasswordCallbackHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/security/UsernamePasswordCallbackHandler.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/security/UsernamePasswordCallbackHandler.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/security/UsernamePasswordCallbackHandler.java Fri Oct 21 01:19:00 2011
@@ -27,19 +27,15 @@ import javax.security.auth.callback.Name
import javax.security.auth.callback.PasswordCallback;
import javax.security.auth.callback.UnsupportedCallbackException;
-import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
public class UsernamePasswordCallbackHandler implements AMQCallbackHandler
{
- private ConnectionURL _connectionURL;
+ private AMQProtocolSession _protocolSession;
- /**
- * @see org.apache.qpid.client.security.AMQCallbackHandler#initialise(org.apache.qpid.jms.ConnectionURL)
- */
- @Override
- public void initialise(final ConnectionURL connectionURL)
+ public void initialise(AMQProtocolSession protocolSession)
{
- _connectionURL = connectionURL;
+ _protocolSession = protocolSession;
}
public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException
@@ -49,11 +45,11 @@ public class UsernamePasswordCallbackHan
Callback cb = callbacks[i];
if (cb instanceof NameCallback)
{
- ((NameCallback)cb).setName(_connectionURL.getUsername());
+ ((NameCallback)cb).setName(_protocolSession.getUsername());
}
else if (cb instanceof PasswordCallback)
{
- ((PasswordCallback)cb).setPassword(_connectionURL.getPassword().toCharArray());
+ ((PasswordCallback)cb).setPassword(_protocolSession.getPassword().toCharArray());
}
else
{
@@ -61,5 +57,4 @@ public class UsernamePasswordCallbackHan
}
}
}
-
}
Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java Fri Oct 21 01:19:00 2011
@@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory;
import java.util.Set;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.io.IOException;
/**
* The state manager is responsible for managing the state of the protocol session. <p/>
@@ -47,7 +48,7 @@ import java.util.concurrent.CopyOnWriteA
*
* The two step process is required as there is an inherit race condition between starting a process that will cause
* the state to change and then attempting to wait for that change. The interest in the change must be first set up so
- * that any asynchronous errors that occur can be delivered to the correct waiters.
+ * that any asynchrous errors that occur can be delivered to the correct waiters.
*/
public class AMQStateManager implements AMQMethodListener
{
@@ -83,10 +84,7 @@ public class AMQStateManager implements
public AMQState getCurrentState()
{
- synchronized (_stateLock)
- {
- return _currentState;
- }
+ return _currentState;
}
public void changeState(AMQState newState)
@@ -116,7 +114,7 @@ public class AMQStateManager implements
}
/**
- * Setting of the ProtocolSession will be required when Failover has been successfully completed.
+ * Setting of the ProtocolSession will be required when Failover has been successfuly compeleted.
*
* The new {@link AMQProtocolSession} that has been re-established needs to be provided as that is now the
* connection to the network.
@@ -133,9 +131,9 @@ public class AMQStateManager implements
}
/**
- * Propagate error to waiters
+ * Propogate error to waiters
*
- * @param error The error to propagate.
+ * @param error The error to propogate.
*/
public void error(Exception error)
{
@@ -179,7 +177,7 @@ public class AMQStateManager implements
}
/**
- * Create and add a new waiter to the notification list.
+ * Create and add a new waiter to the notifcation list.
*
* @param states The waiter will attempt to wait for one of these desired set states to be achived.
*
Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java Fri Oct 21 01:19:00 2011
@@ -34,7 +34,7 @@ import java.util.Set;
*
* On construction the current state and a set of States to await for is provided.
*
- * When await() is called the state at construction is compared against the awaitStates. If the state at construction is
+ * When await() is called the state at constuction is compared against the awaitStates. If the state at construction is
* a desired state then await() returns immediately.
*
* Otherwise it will block for the set timeout for a desired state to be achieved.
@@ -48,9 +48,9 @@ public class StateWaiter extends Blockin
{
private static final Logger _logger = LoggerFactory.getLogger(StateWaiter.class);
- private final Set<AMQState> _awaitStates;
- private final AMQState _startState;
- private final AMQStateManager _stateManager;
+ Set<AMQState> _awaitStates;
+ private AMQState _startState;
+ private AMQStateManager _stateManager;
/**
*
@@ -78,9 +78,9 @@ public class StateWaiter extends Blockin
}
/**
- * Await for the required State to be achieved within the default timeout.
+ * Await for the requried State to be achieved within the default timeout.
* @return The achieved state that was requested.
- * @throws AMQException The exception that prevented the required state from being achieved.
+ * @throws AMQException The exception that prevented the required state from being achived.
*/
public AMQState await() throws AMQException
{
@@ -88,13 +88,13 @@ public class StateWaiter extends Blockin
}
/**
- * Await for the required State to be achieved.
+ * Await for the requried State to be achieved.
*
* <b>It is the responsibility of this class to remove the waiter from the StateManager
*
- * @param timeout The time in milliseconds to wait for any of the states to be achieved.
+ * @param timeout The time in milliseconds to wait for any of the states to be achived.
* @return The achieved state that was requested.
- * @throws AMQException The exception that prevented the required state from being achieved.
+ * @throws AMQException The exception that prevented the required state from being achived.
*/
public AMQState await(long timeout) throws AMQException
{
Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/url/URLParser.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/url/URLParser.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/url/URLParser.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/url/URLParser.java Fri Oct 21 01:19:00 2011
@@ -45,7 +45,7 @@ public class URLParser
private void parseURL(String fullURL) throws URLSyntaxException
{
// Connection URL format
- // amqp://[user:pass@][clientid]/virtualhost?brokerlist='tcp://host:port?option=\'value\',option=\'value\';tcp://host:port?option=\'value\'',failover='method?option=\'value\',option='value''"
+ // amqp://[user:pass@][clientid]/virtualhost?brokerlist='tcp://host:port?option=\'value\',option=\'value\';vm://:3/virtualpath?option=\'value\'',failover='method?option=\'value\',option='value''"
// Options are of course optional except for requiring a single broker in the broker list.
try
{
@@ -195,7 +195,7 @@ public class URLParser
{
String brokerlist = _url.getOptions().get(AMQConnectionURL.OPTIONS_BROKERLIST);
- // brokerlist tcp://host:port?option='value',option='value';tcp://host:port/virtualpath?option='value'
+ // brokerlist tcp://host:port?option='value',option='value';vm://:3/virtualpath?option='value'
StringTokenizer st = new StringTokenizer(brokerlist, "" + URLHelper.BROKER_SEPARATOR);
while (st.hasMoreTokens())
Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java Fri Oct 21 01:19:00 2011
@@ -28,8 +28,9 @@ import java.util.concurrent.locks.Reentr
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQTimeoutException;
import org.apache.qpid.client.failover.FailoverException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQMethodListener;
/**
* BlockingWaiter is a 'rendezvous' which delegates handling of
@@ -63,8 +64,6 @@ import org.slf4j.LoggerFactory;
*/
public abstract class BlockingWaiter<T>
{
- private static final Logger _logger = LoggerFactory.getLogger(BlockingWaiter.class);
-
/** This flag is used to indicate that the blocked for method has been received. */
private volatile boolean _ready = false;
@@ -181,7 +180,7 @@ public abstract class BlockingWaiter<T>
}
catch (InterruptedException e)
{
- _logger.error(e.getMessage(), e);
+ System.err.println(e.getMessage());
// IGNORE -- //fixme this isn't ideal as being interrupted isn't equivellant to sucess
// if (!_ready && timeout != -1)
// {
@@ -229,12 +228,12 @@ public abstract class BlockingWaiter<T>
}
/**
- * This is a callback, called when an error has occurred that should interrupt any waiter.
+ * This is a callback, called when an error has occured that should interupt any waiter.
* It is also called from within this class to avoid code repetition but it should only be called by the MINA threads.
*
* Once closed any notification of an exception will be ignored.
*
- * @param e The exception being propagated.
+ * @param e The exception being propogated.
*/
public void error(Exception e)
{
@@ -256,7 +255,7 @@ public abstract class BlockingWaiter<T>
}
else
{
- _logger.error("WARNING: new error '" + e == null ? "null" : e.getMessage() + "' arrived while old one not yet processed:" + _error.getMessage());
+ System.err.println("WARNING: new error '" + e == null ? "null" : e.getMessage() + "' arrived while old one not yet processed:" + _error.getMessage());
}
if (_waiting.get())
@@ -273,7 +272,7 @@ public abstract class BlockingWaiter<T>
}
catch (InterruptedException e1)
{
- _logger.error(e1.getMessage(), e1);
+ System.err.println(e.getMessage());
}
}
_errorAck = false;
Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java Fri Oct 21 01:19:00 2011
@@ -37,9 +37,9 @@ public class JMSSelectorFilter implement
public JMSSelectorFilter(String selector) throws AMQInternalException
{
_selector = selector;
- if (_logger.isDebugEnabled())
+ if (JMSSelectorFilter._logger.isDebugEnabled())
{
- _logger.debug("Created JMSSelectorFilter with selector:" + _selector);
+ JMSSelectorFilter._logger.debug("Created JMSSelectorFilter with selector:" + _selector);
}
_matcher = new SelectorParser().parse(selector);
}
@@ -49,16 +49,16 @@ public class JMSSelectorFilter implement
try
{
boolean match = _matcher.matches(message);
- if (_logger.isDebugEnabled())
+ if (JMSSelectorFilter._logger.isDebugEnabled())
{
- _logger.debug(message + " match(" + match + ") selector(" + System
+ JMSSelectorFilter._logger.debug(message + " match(" + match + ") selector(" + System
.identityHashCode(_selector) + "):" + _selector);
}
return match;
}
catch (AMQInternalException e)
{
- _logger.warn("Caught exception when evaluating message selector for message " + message, e);
+ JMSSelectorFilter._logger.warn("Caght exception when evaluating message selector for message " + message, e);
}
return false;
}
Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/filter/PropertyExpression.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/filter/PropertyExpression.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/filter/PropertyExpression.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/filter/PropertyExpression.java Fri Oct 21 01:19:00 2011
@@ -19,7 +19,6 @@ package org.apache.qpid.filter;
import java.util.HashMap;
-import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import org.apache.qpid.AMQInternalException;
@@ -33,7 +32,7 @@ import org.slf4j.LoggerFactory;
public class PropertyExpression implements Expression
{
// Constants - defined the same as JMS
- private static enum JMSDeliveryMode { NON_PERSISTENT, PERSISTENT }
+ private static final int NON_PERSISTENT = 1;
private static final int DEFAULT_PRIORITY = 4;
private static final Logger _logger = LoggerFactory.getLogger(PropertyExpression.class);
@@ -80,24 +79,22 @@ public class PropertyExpression implemen
{
public Object evaluate(AbstractJMSMessage message)
{
-
- JMSDeliveryMode mode = JMSDeliveryMode.NON_PERSISTENT;
try
{
- mode = message.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ?
- JMSDeliveryMode.PERSISTENT : JMSDeliveryMode.NON_PERSISTENT;
-
+ int mode = message.getJMSDeliveryMode();
if (_logger.isDebugEnabled())
{
_logger.debug("JMSDeliveryMode is :" + mode);
}
+
+ return mode;
}
catch (JMSException e)
{
_logger.warn("Error evaluating property",e);
}
- return mode.toString();
+ return NON_PERSISTENT;
}
});
Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java Fri Oct 21 01:19:00 2011
@@ -22,7 +22,7 @@ package org.apache.qpid.jms;
import java.util.Map;
-import org.apache.qpid.transport.ConnectionSettings;
+import org.apache.qpid.client.SSLConfiguration;
public interface BrokerDetails
{
@@ -52,7 +52,9 @@ public interface BrokerDetails
public static final int DEFAULT_PORT = 5672;
+ public static final String SOCKET = "socket";
public static final String TCP = "tcp";
+ public static final String VM = "vm";
public static final String DEFAULT_TRANSPORT = TCP;
@@ -104,12 +106,14 @@ public interface BrokerDetails
long getTimeout();
void setTimeout(long timeout);
+
+ SSLConfiguration getSSLConfiguration();
+
+ void setSSLConfiguration(SSLConfiguration sslConfiguration);
boolean getBooleanProperty(String propName);
String toString();
boolean equals(Object o);
-
- ConnectionSettings buildConnectionSettings();
}
Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java Fri Oct 21 01:19:00 2011
@@ -27,7 +27,7 @@ import java.util.List;
/**
Connection URL format
- amqp://[user:pass@][clientid]/virtualhost?brokerlist='tcp://host:port?option=\'value\'&option=\'value\';tcp://host:port/virtualpath?option=\'value\''&failover='method?option=\'value\'&option='value''"
+ amqp://[user:pass@][clientid]/virtualhost?brokerlist='tcp://host:port?option=\'value\'&option=\'value\';vm://:3/virtualpath?option=\'value\''&failover='method?option=\'value\'&option='value''"
Options are of course optional except for requiring a single broker in the broker list.
The option seperator is defined to be either '&' or ','
*/
Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java Fri Oct 21 01:19:00 2011
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.jms;
+import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.jms.failover.FailoverExchangeMethod;
import org.apache.qpid.jms.failover.FailoverMethod;
import org.apache.qpid.jms.failover.FailoverRoundRobinServers;
@@ -50,7 +51,7 @@ public class FailoverPolicy
private long _lastMethodTime;
private long _lastFailTime;
- public FailoverPolicy(ConnectionURL connectionDetails, Connection conn)
+ public FailoverPolicy(ConnectionURL connectionDetails, AMQConnection conn)
{
FailoverMethod method;
@@ -82,7 +83,7 @@ public class FailoverPolicy
*/
if (failoverMethod.equals(FailoverMethod.SINGLE_BROKER))
{
- method = new FailoverSingleServer(connectionDetails);
+ method = new FailoverRoundRobinServers(connectionDetails);
}
else
{
Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/jms/MessageProducer.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/jms/MessageProducer.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/jms/MessageProducer.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/jms/MessageProducer.java Fri Oct 21 01:19:00 2011
@@ -51,4 +51,7 @@ public interface MessageProducer extends
int priority, long timeToLive, boolean mandatory, boolean immediate)
throws JMSException;
+ void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive,
+ boolean mandatory, boolean immediate, boolean waitUntilSent) throws JMSException;
+
}
Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java Fri Oct 21 01:19:00 2011
@@ -32,9 +32,9 @@ import javax.jms.Session;
import org.apache.qpid.client.AMQAnyDestination;
import org.apache.qpid.client.AMQBrokerDetails;
+import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.jms.Connection;
import org.apache.qpid.jms.ConnectionURL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,7 +58,7 @@ public class FailoverExchangeMethod impl
private static final Logger _logger = LoggerFactory.getLogger(FailoverExchangeMethod.class);
/** This is not safe to use until attainConnection is called */
- private Connection _conn;
+ private AMQConnection _conn;
/** Protects the broker list when modifications happens */
private Object _brokerListLock = new Object();
@@ -80,7 +80,7 @@ public class FailoverExchangeMethod impl
/** Denotes the number of failed attempts **/
private int _failedAttemps = 0;
- public FailoverExchangeMethod(ConnectionURL connectionDetails, Connection conn)
+ public FailoverExchangeMethod(ConnectionURL connectionDetails, AMQConnection conn)
{
_connectionDetails = connectionDetails;
_originalBrokerDetail = _connectionDetails.getBrokerDetails(0);
@@ -140,6 +140,7 @@ public class FailoverExchangeMethod impl
broker.setHost(tokens[1]);
broker.setPort(Integer.parseInt(tokens[2]));
broker.setProperties(_originalBrokerDetail.getProperties());
+ broker.setSSLConfiguration(_originalBrokerDetail.getSSLConfiguration());
brokerList.add(broker);
if (currentBrokerIP.equals(broker.getHost()) &&
Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java Fri Oct 21 01:19:00 2011
@@ -36,7 +36,6 @@ import javax.jms.Queue;
import javax.jms.Topic;
import javax.naming.Context;
import javax.naming.NamingException;
-import javax.naming.ConfigurationException;
import javax.naming.spi.InitialContextFactory;
import org.apache.qpid.client.AMQConnectionFactory;
@@ -140,7 +139,7 @@ public class PropertiesFileInitialContex
return new ReadOnlyContext(environment, data);
}
- protected void createConnectionFactories(Map data, Hashtable environment) throws ConfigurationException
+ protected void createConnectionFactories(Map data, Hashtable environment)
{
for (Iterator iter = environment.entrySet().iterator(); iter.hasNext();)
{
@@ -158,7 +157,7 @@ public class PropertiesFileInitialContex
}
}
- protected void createDestinations(Map data, Hashtable environment) throws ConfigurationException
+ protected void createDestinations(Map data, Hashtable environment)
{
for (Iterator iter = environment.entrySet().iterator(); iter.hasNext();)
{
@@ -226,7 +225,7 @@ public class PropertiesFileInitialContex
/**
* Factory method to create new Connection Factory instances
*/
- protected ConnectionFactory createFactory(String url) throws ConfigurationException
+ protected ConnectionFactory createFactory(String url)
{
try
{
@@ -234,18 +233,16 @@ public class PropertiesFileInitialContex
}
catch (URLSyntaxException urlse)
{
- _logger.warn("Unable to create factory:" + urlse);
-
- ConfigurationException ex = new ConfigurationException("Failed to parse entry: " + urlse + " due to : " + urlse.getMessage());
- ex.initCause(urlse);
- throw ex;
+ _logger.warn("Unable to createFactories:" + urlse);
}
+
+ return null;
}
/**
* Factory method to create new Destination instances from an AMQP BindingURL
*/
- protected Destination createDestination(String str) throws ConfigurationException
+ protected Destination createDestination(String str)
{
try
{
@@ -255,9 +252,7 @@ public class PropertiesFileInitialContex
{
_logger.warn("Unable to create destination:" + e, e);
- ConfigurationException ex = new ConfigurationException("Failed to parse entry: " + str + " due to : " + e.getMessage());
- ex.initCause(e);
- throw ex;
+ return null;
}
}
Modified: qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java (original)
+++ qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java Fri Oct 21 01:19:00 2011
@@ -23,6 +23,7 @@ package org.apache.qpid.client;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.url.URLSyntaxException;
@@ -36,18 +37,53 @@ public class MockAMQConnection extends A
super(broker, username, password, clientName, virtualHost);
}
+ public MockAMQConnection(String broker, String username, String password, String clientName, String virtualHost, SSLConfiguration sslConfig)
+ throws AMQException, URLSyntaxException
+ {
+ super(broker, username, password, clientName, virtualHost, sslConfig);
+ }
+
public MockAMQConnection(String host, int port, String username, String password, String clientName, String virtualHost)
throws AMQException, URLSyntaxException
{
super(host, port, username, password, clientName, virtualHost);
}
+ public MockAMQConnection(String host, int port, String username, String password, String clientName, String virtualHost, SSLConfiguration sslConfig)
+ throws AMQException, URLSyntaxException
+ {
+ super(host, port, username, password, clientName, virtualHost, sslConfig);
+ }
+
+ public MockAMQConnection(String host, int port, boolean useSSL, String username, String password, String clientName, String virtualHost, SSLConfiguration sslConfig)
+ throws AMQException, URLSyntaxException
+ {
+ super(host, port, useSSL, username, password, clientName, virtualHost, sslConfig);
+ }
+
public MockAMQConnection(String connection)
throws AMQException, URLSyntaxException
{
super(connection);
}
+ public MockAMQConnection(String connection, SSLConfiguration sslConfig)
+ throws AMQException, URLSyntaxException
+ {
+ super(connection, sslConfig);
+ }
+
+ public MockAMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig)
+ throws AMQException
+ {
+ super(connectionURL, sslConfig);
+ }
+
+ protected MockAMQConnection(String username, String password, String clientName, String virtualHost)
+ {
+ super(username, password, clientName, virtualHost);
+ }
+
@Override
public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException
{
Modified: qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java (original)
+++ qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java Fri Oct 21 01:19:00 2011
@@ -43,9 +43,4 @@ public class TestMessageHelper
{
return new JMSStreamMessage(AMQMessageDelegateFactory.FACTORY_0_8);
}
-
- public static JMSObjectMessage newJMSObjectMessage()
- {
- return new JMSObjectMessage(AMQMessageDelegateFactory.FACTORY_0_8);
- }
}
Modified: qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java (original)
+++ qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java Fri Oct 21 01:19:00 2011
@@ -20,24 +20,23 @@
*/
package org.apache.qpid.client.protocol;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
import junit.framework.TestCase;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQAuthenticationException;
-import org.apache.qpid.client.MockAMQConnection;
-import org.apache.qpid.client.state.AMQState;
-import org.apache.qpid.framing.AMQBody;
import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQBody;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.amqp_8_0.BasicRecoverOkBodyImpl;
+import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.transport.TestNetworkConnection;
+import org.apache.qpid.transport.TestNetworkDriver;
+import org.apache.qpid.client.MockAMQConnection;
+import org.apache.qpid.client.AMQAuthenticationException;
+import org.apache.qpid.client.state.AMQState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
/**
* This is a test address QPID-1431 where frame listeners would fail to be notified of an incomming exception.
*
@@ -73,8 +72,8 @@ public class AMQProtocolHandlerTest exte
public void setUp() throws Exception
{
//Create a new ProtocolHandler with a fake connection.
- _handler = new AMQProtocolHandler(new MockAMQConnection("amqp://guest:guest@client/test?brokerlist='tcp://localhost:1'"));
- _handler.setNetworkConnection(new TestNetworkConnection());
+ _handler = new AMQProtocolHandler(new MockAMQConnection("amqp://guest:guest@client/test?brokerlist='vm://:1'"));
+ _handler.setNetworkDriver(new TestNetworkDriver());
AMQBody body = BasicRecoverOkBodyImpl.getFactory().newInstance(null, 1);
_blockFrame = new AMQFrame(0, body);
Modified: qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java (original)
+++ qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java Fri Oct 21 01:19:00 2011
@@ -43,6 +43,15 @@ public class BrokerDetailsTest extends T
assertTrue(broker.getProperty("immediatedelivery").equals("true"));
}
+ public void testVMBroker() throws URLSyntaxException
+ {
+ String url = "vm://:2";
+
+ AMQBrokerDetails broker = new AMQBrokerDetails(url);
+ assertTrue(broker.getTransport().equals("vm"));
+ assertEquals(broker.getPort(), 2);
+ }
+
public void testTransportsDefaultToTCP() throws URLSyntaxException
{
String url = "localhost:5672";
Modified: qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java (original)
+++ qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java Fri Oct 21 01:19:00 2011
@@ -73,7 +73,7 @@ public class ChannelCloseMethodHandlerNo
{
throw new AMQNoRouteException("Error: " + reason, null, null);
}
- else if (errorCode == AMQConstant.ARGUMENT_INVALID)
+ else if (errorCode == AMQConstant.INVALID_ARGUMENT)
{
_logger.debug("Broker responded with Invalid Argument.");
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org