You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@qpid.apache.org by kg...@apache.org on 2011/09/19 17:13:38 UTC
svn commit: r1172657 [17/21] - in /qpid/branches/qpid-3346/qpid: ./ cpp/
cpp/bindings/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qpid/dotnet/
cpp/bindings/qpid/dotnet/examples/csharp.direct.receiver/Properties/
cpp/bindings/qpid/dotnet/examples/csha...
Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java Mon Sep 19 15:13:18 2011
@@ -30,6 +30,8 @@ import static org.apache.qpid.transport.
import static org.apache.qpid.transport.Session.State.NEW;
import static org.apache.qpid.transport.Session.State.OPEN;
import static org.apache.qpid.transport.Session.State.RESUMING;
+
+import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.transport.network.Frame;
import static org.apache.qpid.transport.util.Functions.mod;
import org.apache.qpid.transport.util.Logger;
@@ -42,7 +44,6 @@ import static org.apache.qpid.util.Seria
import static org.apache.qpid.util.Strings.toUTF8;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@@ -58,7 +59,6 @@ import java.util.concurrent.TimeUnit;
public class Session extends SessionInvoker
{
-
private static final Logger log = Logger.get(Session.class);
public enum State { NEW, DETACHED, RESUMING, OPEN, CLOSING, CLOSED }
@@ -92,7 +92,9 @@ public class Session extends SessionInvo
private int channel;
private SessionDelegate delegate;
private SessionListener listener = new DefaultSessionListener();
- private long timeout = 60000;
+ private final long timeout = Long.getLong(ClientProperties.QPID_SYNC_OP_TIMEOUT,
+ Long.getLong(ClientProperties.AMQJ_DEFAULT_SYNCWRITE_TIMEOUT,
+ ClientProperties.DEFAULT_SYNC_OPERATION_TIMEOUT));
private boolean autoSync = false;
private boolean incomingInit;
@@ -120,7 +122,9 @@ public class Session extends SessionInvo
private Thread resumer = null;
private boolean transacted = false;
-
+ private SessionDetachCode detachCode;
+ private final Object stateLock = new Object();
+
protected Session(Connection connection, Binary name, long expiry)
{
this(connection, new SessionDelegate(), name, expiry);
@@ -962,16 +966,29 @@ public class Session extends SessionInvo
public void close()
{
+ if (log.isDebugEnabled())
+ {
+ log.debug("Closing [%s] in state [%s]", this, state);
+ }
synchronized (commands)
{
- state = CLOSING;
- setClose(true);
- sessionRequestTimeout(0);
- sessionDetach(name.getBytes());
-
- awaitClose();
-
-
+ switch(state)
+ {
+ case DETACHED:
+ state = CLOSED;
+ delegate.closed(this);
+ connection.removeSession(this);
+ listener.closed(this);
+ break;
+ case CLOSED:
+ break;
+ default:
+ state = CLOSING;
+ setClose(true);
+ sessionRequestTimeout(0);
+ sessionDetach(name.getBytes());
+ awaitClose();
+ }
}
}
@@ -1045,13 +1062,55 @@ public class Session extends SessionInvo
{
return String.format("ssn:%s", name);
}
-
+
public void setTransacted(boolean b) {
this.transacted = b;
}
-
+
public boolean isTransacted(){
return transacted;
}
-
+
+ public void setDetachCode(SessionDetachCode dtc)
+ {
+ this.detachCode = dtc;
+ }
+
+ public SessionDetachCode getDetachCode()
+ {
+ return this.detachCode;
+ }
+
+ public void awaitOpen()
+ {
+ switch (state)
+ {
+ case NEW:
+ synchronized(stateLock)
+ {
+ Waiter w = new Waiter(stateLock, timeout);
+ while (w.hasTime() && state == NEW)
+ {
+ w.await();
+ }
+ }
+
+ if (state != OPEN)
+ {
+ throw new SessionException("Timed out waiting for Session to open");
+ }
+ break;
+ case DETACHED:
+ case CLOSING:
+ case CLOSED:
+ throw new SessionException("Session closed");
+ default :
+ break;
+ }
+ }
+
+ public Object getStateLock()
+ {
+ return stateLock;
+ }
}
Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java Mon Sep 19 15:13:18 2011
@@ -76,6 +76,10 @@ public class SessionDelegate
@Override public void sessionAttached(Session ssn, SessionAttached atc)
{
ssn.setState(Session.State.OPEN);
+ synchronized (ssn.getStateLock())
+ {
+ ssn.getStateLock().notifyAll();
+ }
}
@Override public void sessionTimeout(Session ssn, SessionTimeout t)
@@ -203,10 +207,18 @@ public class SessionDelegate
public void closed(Session session)
{
log.debug("CLOSED: [%s]", session);
+ synchronized (session.getStateLock())
+ {
+ session.getStateLock().notifyAll();
+ }
}
public void detached(Session session)
{
log.debug("DETACHED: [%s]", session);
+ synchronized (session.getStateLock())
+ {
+ session.getStateLock().notifyAll();
+ }
}
}
Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java Mon Sep 19 15:13:18 2011
@@ -20,11 +20,12 @@
*/
package org.apache.qpid.transport.network;
+import javax.net.ssl.SSLContext;
+
import org.apache.qpid.protocol.ProtocolEngineFactory;
-import org.apache.qpid.ssl.SSLContextFactory;
import org.apache.qpid.transport.NetworkTransportConfiguration;
public interface IncomingNetworkTransport extends NetworkTransport
{
- public void accept(NetworkTransportConfiguration config, ProtocolEngineFactory factory, SSLContextFactory sslFactory);
+ public void accept(NetworkTransportConfiguration config, ProtocolEngineFactory factory, SSLContext sslContext);
}
\ No newline at end of file
Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java Mon Sep 19 15:13:18 2011
@@ -29,6 +29,8 @@ public interface NetworkConnection
{
Sender<ByteBuffer> getSender();
+ void start();
+
void close();
/**
Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java Mon Sep 19 15:13:18 2011
@@ -20,9 +20,11 @@
*/
package org.apache.qpid.transport.network;
+/**
+ * A network transport is responsible for the establishment of network connections.
+ * NetworkTransport implementations are pluggable via the {@link Transport} class.
+ */
public interface NetworkTransport
{
public void close();
-
- public NetworkConnection getConnection();
}
Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java Mon Sep 19 15:13:18 2011
@@ -22,11 +22,14 @@ package org.apache.qpid.transport.networ
import java.nio.ByteBuffer;
-import org.apache.qpid.ssl.SSLContextFactory;
+import javax.net.ssl.SSLContext;
+
import org.apache.qpid.transport.ConnectionSettings;
import org.apache.qpid.transport.Receiver;
public interface OutgoingNetworkTransport extends NetworkTransport
{
- public NetworkConnection connect(ConnectionSettings settings, Receiver<ByteBuffer> delegate, SSLContextFactory sslFactory);
+ public NetworkConnection getConnection();
+
+ public NetworkConnection connect(ConnectionSettings settings, Receiver<ByteBuffer> delegate, SSLContext sslContext);
}
\ No newline at end of file
Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java Mon Sep 19 15:13:18 2011
@@ -20,7 +20,115 @@
*/
package org.apache.qpid.transport.network;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.transport.TransportException;
+
public class Transport
{
+ public static final String QPID_TRANSPORT_PROPNAME = "qpid.transport";
+ public static final String QPID_TRANSPORT_V0_8_PROPNAME = "qpid.transport.v0_8";
+ public static final String QPID_TRANSPORT_V0_9_PROPNAME = "qpid.transport.v0_9";
+ public static final String QPID_TRANSPORT_V0_9_1_PROPNAME = "qpid.transport.v0_9_1";
+ public static final String QPID_TRANSPORT_V0_10_PROPNAME = "qpid.transport.v0_10";
+ public static final String QPID_BROKER_TRANSPORT_PROPNAME = "qpid.broker.transport";
+
+ // Can't reference the class directly here, as this would preclude the ability to bundle transports separately.
+ private static final String IO_TRANSPORT_CLASSNAME = "org.apache.qpid.transport.network.io.IoNetworkTransport";
+
public static final String TCP = "tcp";
+
+ private final static Map<ProtocolVersion,String> OUTGOING_PROTOCOL_TO_IMPLDEFAULTS_MAP;
+
+ static
+ {
+ final Map<ProtocolVersion,String> map = new HashMap<ProtocolVersion, String>();
+ map.put(ProtocolVersion.v8_0, IO_TRANSPORT_CLASSNAME);
+ map.put(ProtocolVersion.v0_9, IO_TRANSPORT_CLASSNAME);
+ map.put(ProtocolVersion.v0_91, IO_TRANSPORT_CLASSNAME);
+ map.put(ProtocolVersion.v0_10, IO_TRANSPORT_CLASSNAME);
+
+ OUTGOING_PROTOCOL_TO_IMPLDEFAULTS_MAP = Collections.unmodifiableMap(map);
+ }
+
+ public static IncomingNetworkTransport getIncomingTransportInstance()
+ {
+ return (IncomingNetworkTransport) loadTransportClass(
+ System.getProperty(QPID_BROKER_TRANSPORT_PROPNAME, IO_TRANSPORT_CLASSNAME));
+ }
+
+ public static OutgoingNetworkTransport getOutgoingTransportInstance(
+ final ProtocolVersion protocolVersion)
+ {
+
+ final String overrride = getOverrideClassNameFromSystemProperty(protocolVersion);
+ final String networkTransportClassName;
+ if (overrride != null)
+ {
+ networkTransportClassName = overrride;
+ }
+ else
+ {
+ networkTransportClassName = OUTGOING_PROTOCOL_TO_IMPLDEFAULTS_MAP.get(protocolVersion);
+ }
+
+ return (OutgoingNetworkTransport) loadTransportClass(networkTransportClassName);
+ }
+
+ private static NetworkTransport loadTransportClass(final String networkTransportClassName)
+ {
+ if (networkTransportClassName == null)
+ {
+ throw new IllegalArgumentException("transport class name must not be null");
+ }
+
+ try
+ {
+ final Class<?> clazz = Class.forName(networkTransportClassName);
+ return (NetworkTransport) clazz.newInstance();
+ }
+ catch (InstantiationException e)
+ {
+ throw new TransportException("Unable to instantiate transport class " + networkTransportClassName, e);
+ }
+ catch (IllegalAccessException e)
+ {
+ throw new TransportException("Access exception " + networkTransportClassName, e);
+ }
+ catch (ClassNotFoundException e)
+ {
+ throw new TransportException("Unable to load transport class " + networkTransportClassName, e);
+ }
+ }
+
+ private static String getOverrideClassNameFromSystemProperty(final ProtocolVersion protocolVersion)
+ {
+ final String protocolSpecificSystemProperty;
+
+ if (ProtocolVersion.v0_10.equals(protocolVersion))
+ {
+ protocolSpecificSystemProperty = QPID_TRANSPORT_V0_10_PROPNAME;
+ }
+ else if (ProtocolVersion.v0_91.equals(protocolVersion))
+ {
+ protocolSpecificSystemProperty = QPID_TRANSPORT_V0_9_1_PROPNAME;
+ }
+ else if (ProtocolVersion.v0_9.equals(protocolVersion))
+ {
+ protocolSpecificSystemProperty = QPID_TRANSPORT_V0_9_PROPNAME;
+ }
+ else if (ProtocolVersion.v8_0.equals(protocolVersion))
+ {
+ protocolSpecificSystemProperty = QPID_TRANSPORT_V0_8_PROPNAME;
+ }
+ else
+ {
+ throw new IllegalArgumentException("Unknown ProtocolVersion " + protocolVersion);
+ }
+
+ return System.getProperty(protocolSpecificSystemProperty, System.getProperty(QPID_TRANSPORT_PROPNAME));
+ }
}
Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java Mon Sep 19 15:13:18 2011
@@ -37,7 +37,7 @@ public class IoNetworkConnection impleme
private final long _timeout;
private final IoSender _ioSender;
private final IoReceiver _ioReceiver;
-
+
public IoNetworkConnection(Socket socket, Receiver<ByteBuffer> delegate,
int sendBufferSize, int receiveBufferSize, long timeout)
{
@@ -45,9 +45,15 @@ public class IoNetworkConnection impleme
_timeout = timeout;
_ioReceiver = new IoReceiver(_socket, delegate, receiveBufferSize,_timeout);
+
_ioSender = new IoSender(_socket, 2 * sendBufferSize, _timeout);
+
_ioSender.registerCloseListener(_ioReceiver);
+ }
+
+ public void start()
+ {
_ioReceiver.initiate();
_ioSender.initiate();
}
Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java Mon Sep 19 15:13:18 2011
@@ -21,41 +21,35 @@
package org.apache.qpid.transport.network.io;
import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.SocketException;
+import java.net.*;
import java.nio.ByteBuffer;
-import org.apache.qpid.ssl.SSLContextFactory;
-import org.apache.qpid.transport.ConnectionSettings;
-import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.TransportException;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLServerSocketFactory;
+
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.transport.*;
+import org.apache.qpid.transport.network.IncomingNetworkTransport;
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.transport.network.OutgoingNetworkTransport;
import org.apache.qpid.transport.util.Logger;
-public class IoNetworkTransport implements OutgoingNetworkTransport
+public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNetworkTransport
{
- static
- {
- org.apache.mina.common.ByteBuffer.setAllocator
- (new org.apache.mina.common.SimpleByteBufferAllocator());
- org.apache.mina.common.ByteBuffer.setUseDirectBuffers
- (Boolean.getBoolean("amqj.enableDirectBuffers"));
- }
private static final Logger LOGGER = Logger.get(IoNetworkTransport.class);
private Socket _socket;
private IoNetworkConnection _connection;
private long _timeout = 60000;
-
- public NetworkConnection connect(ConnectionSettings settings, Receiver<ByteBuffer> delegate, SSLContextFactory sslFactory)
+ private AcceptingThread _acceptor;
+
+ public NetworkConnection connect(ConnectionSettings settings, Receiver<ByteBuffer> delegate, SSLContext sslContext)
{
int sendBufferSize = settings.getWriteBufferSize();
int receiveBufferSize = settings.getReadBufferSize();
-
+
try
{
_socket = new Socket();
@@ -83,6 +77,7 @@ public class IoNetworkTransport implemen
try
{
_connection = new IoNetworkConnection(_socket, delegate, sendBufferSize, receiveBufferSize, _timeout);
+ _connection.start();
}
catch(Exception e)
{
@@ -103,11 +98,134 @@ public class IoNetworkTransport implemen
public void close()
{
- _connection.close();
+ if(_connection != null)
+ {
+ _connection.close();
+ }
+ if(_acceptor != null)
+ {
+ _acceptor.close();
+ }
}
public NetworkConnection getConnection()
{
return _connection;
}
+
+ public void accept(NetworkTransportConfiguration config, ProtocolEngineFactory factory, SSLContext sslContext)
+ {
+
+ try
+ {
+ _acceptor = new AcceptingThread(config, factory, sslContext);
+
+ _acceptor.start();
+ }
+ catch (IOException e)
+ {
+ throw new TransportException("Unable to start server socket", e);
+ }
+
+
+ }
+
+ private class AcceptingThread extends Thread
+ {
+ private NetworkTransportConfiguration _config;
+ private ProtocolEngineFactory _factory;
+ private SSLContext _sslContent;
+ private ServerSocket _serverSocket;
+
+ private AcceptingThread(NetworkTransportConfiguration config,
+ ProtocolEngineFactory factory,
+ SSLContext sslContext)
+ throws IOException
+ {
+ _config = config;
+ _factory = factory;
+ _sslContent = sslContext;
+
+ InetSocketAddress address = new InetSocketAddress(config.getHost(), config.getPort());
+
+ if(sslContext == null)
+ {
+ _serverSocket = new ServerSocket();
+ }
+ else
+ {
+ SSLServerSocketFactory socketFactory = sslContext.getServerSocketFactory();
+ _serverSocket = socketFactory.createServerSocket();
+ }
+
+ _serverSocket.bind(address);
+ _serverSocket.setReuseAddress(true);
+
+
+ }
+
+
+ /**
+ Close the underlying ServerSocket if it has not already been closed.
+ */
+ public void close()
+ {
+ if (!_serverSocket.isClosed())
+ {
+ try
+ {
+ _serverSocket.close();
+ }
+ catch (IOException e)
+ {
+ throw new TransportException(e);
+ }
+ }
+ }
+
+ @Override
+ public void run()
+ {
+ try
+ {
+ while (true)
+ {
+ try
+ {
+ Socket socket = _serverSocket.accept();
+ socket.setTcpNoDelay(_config.getTcpNoDelay());
+
+ final Integer sendBufferSize = _config.getSendBufferSize();
+ final Integer receiveBufferSize = _config.getReceiveBufferSize();
+
+ socket.setSendBufferSize(sendBufferSize);
+ socket.setReceiveBufferSize(receiveBufferSize);
+
+ ProtocolEngine engine = _factory.newProtocolEngine();
+
+ NetworkConnection connection = new IoNetworkConnection(socket, engine, sendBufferSize, receiveBufferSize, _timeout);
+
+
+ engine.setNetworkConnection(connection, connection.getSender());
+
+ connection.start();
+
+
+ }
+ catch(RuntimeException e)
+ {
+ LOGGER.error(e, "Error in Acceptor thread " + _config.getPort());
+ }
+ }
+ }
+ catch (IOException e)
+ {
+ LOGGER.debug(e, "SocketException - no new connections will be accepted on port "
+ + _config.getPort());
+ }
+ }
+
+
+ }
+
}
Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java Mon Sep 19 15:13:18 2011
@@ -92,13 +92,23 @@ final class IoReceiver implements Runnab
{
try
{
- if (shutdownBroken)
+ try
{
- socket.close();
+ if (shutdownBroken)
+ {
+ socket.close();
+ }
+ else
+ {
+ socket.shutdownInput();
+ }
}
- else
+ catch(SocketException se)
{
- socket.shutdownInput();
+ if(!socket.isClosed() && !socket.isInputShutdown())
+ {
+ throw se;
+ }
}
if (block && Thread.currentThread() != receiverThread)
{
@@ -117,6 +127,7 @@ final class IoReceiver implements Runnab
{
throw new TransportException(e);
}
+
}
}
Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java Mon Sep 19 15:13:18 2011
@@ -31,6 +31,7 @@ import java.util.concurrent.atomic.Atomi
import org.apache.qpid.common.Closeable;
import org.apache.qpid.thread.Threading;
import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.SenderClosedException;
import org.apache.qpid.transport.SenderException;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.util.Logger;
@@ -59,7 +60,7 @@ public final class IoSender implements R
private final AtomicBoolean closed = new AtomicBoolean(false);
private final Thread senderThread;
private final List<Closeable> _listeners = new ArrayList<Closeable>();
-
+
private volatile Throwable exception = null;
public IoSender(Socket socket, int bufferSize, long timeout)
@@ -80,13 +81,13 @@ public final class IoSender implements R
try
{
//Create but deliberately don't start the thread.
- senderThread = Threading.getThreadFactory().createThread(this);
+ senderThread = Threading.getThreadFactory().createThread(this);
}
catch(Exception e)
{
throw new Error("Error creating IOSender thread",e);
}
-
+
senderThread.setDaemon(true);
senderThread.setName(String.format("IoSender - %s", socket.getRemoteSocketAddress()));
}
@@ -110,7 +111,7 @@ public final class IoSender implements R
{
if (closed.get())
{
- throw new SenderException("sender is closed", exception);
+ throw new SenderClosedException("sender is closed", exception);
}
final int size = buffer.length;
@@ -143,7 +144,7 @@ public final class IoSender implements R
if (closed.get())
{
- throw new SenderException("sender is closed", exception);
+ throw new SenderClosedException("sender is closed", exception);
}
if (head - tail >= size)
@@ -255,7 +256,7 @@ public final class IoSender implements R
public void run()
{
- final int size = buffer.length;
+ final int size = buffer.length;
while (true)
{
final int hd = head;
Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java Mon Sep 19 15:13:18 2011
@@ -25,8 +25,8 @@ import java.nio.ByteBuffer;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
+import org.apache.qpid.ssl.SSLContextFactory;
import org.apache.qpid.transport.Connection;
-import org.apache.qpid.transport.ConnectionListener;
import org.apache.qpid.transport.ConnectionSettings;
import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.Sender;
@@ -37,149 +37,12 @@ import org.apache.qpid.transport.network
import org.apache.qpid.transport.network.security.ssl.SSLSender;
import org.apache.qpid.transport.network.security.ssl.SSLUtil;
-public class SecurityLayer
+public interface SecurityLayer
{
- ConnectionSettings settings;
- Connection con;
- SSLSecurityLayer sslLayer;
- SASLSecurityLayer saslLayer;
-
- public void init(Connection con) throws TransportException
- {
- this.con = con;
- this.settings = con.getConnectionSettings();
- if (settings.isUseSSL())
- {
- sslLayer = new SSLSecurityLayer();
- }
- if (settings.isUseSASLEncryption())
- {
- saslLayer = new SASLSecurityLayer();
- }
-
- }
-
- public Sender<ByteBuffer> sender(Sender<ByteBuffer> delegate)
- {
- Sender<ByteBuffer> sender = delegate;
-
- if (settings.isUseSSL())
- {
- sender = sslLayer.sender(sender);
- }
-
- if (settings.isUseSASLEncryption())
- {
- sender = saslLayer.sender(sender);
- }
-
- return sender;
- }
-
- public Receiver<ByteBuffer> receiver(Receiver<ByteBuffer> delegate)
- {
- Receiver<ByteBuffer> receiver = delegate;
-
- if (settings.isUseSSL())
- {
- receiver = sslLayer.receiver(receiver);
- }
-
- if (settings.isUseSASLEncryption())
- {
- receiver = saslLayer.receiver(receiver);
- }
-
- return receiver;
- }
-
- public String getUserID()
- {
- if (settings.isUseSSL())
- {
- return sslLayer.getUserID();
- }
- else
- {
- return null;
- }
- }
-
- class SSLSecurityLayer
- {
- SSLEngine engine;
- SSLSender sender;
-
- public SSLSecurityLayer()
- {
- SSLContext sslCtx;
- try
- {
- sslCtx = SSLUtil.createSSLContext(settings);
- }
- catch (Exception e)
- {
- throw new TransportException("Error creating SSL Context", e);
- }
-
- try
- {
- engine = sslCtx.createSSLEngine();
- engine.setUseClientMode(true);
- }
- catch(Exception e)
- {
- throw new TransportException("Error creating SSL Engine", e);
- }
- }
-
- public SSLSender sender(Sender<ByteBuffer> delegate)
- {
- sender = new SSLSender(engine,delegate);
- sender.setConnectionSettings(settings);
- return sender;
- }
-
- public SSLReceiver receiver(Receiver<ByteBuffer> delegate)
- {
- if (sender == null)
- {
- throw new
- IllegalStateException("SecurityLayer.sender method should be " +
- "invoked before SecurityLayer.receiver");
- }
-
- SSLReceiver receiver = new SSLReceiver(engine,delegate,sender);
- receiver.setConnectionSettings(settings);
- return receiver;
- }
-
- public String getUserID()
- {
- return SSLUtil.retriveIdentity(engine);
- }
-
- }
-
- class SASLSecurityLayer
- {
- public SASLSecurityLayer()
- {
- }
-
- public SASLSender sender(Sender<ByteBuffer> delegate)
- {
- SASLSender sender = new SASLSender(delegate);
- con.addConnectionListener((ConnectionListener)sender);
- return sender;
- }
-
- public SASLReceiver receiver(Receiver<ByteBuffer> delegate)
- {
- SASLReceiver receiver = new SASLReceiver(delegate);
- con.addConnectionListener((ConnectionListener)receiver);
- return receiver;
- }
-
- }
+
+ public Sender<ByteBuffer> sender(Sender<ByteBuffer> delegate);
+ public Receiver<ByteBuffer> receiver(Receiver<ByteBuffer> delegate);
+ public String getUserID();
+
}
+
Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java Mon Sep 19 15:13:18 2011
@@ -20,7 +20,9 @@
*/
package org.apache.qpid.transport.network.security.ssl;
+import java.io.IOException;
import java.net.Socket;
+import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.security.Principal;
import java.security.PrivateKey;
@@ -40,7 +42,7 @@ public class QpidClientX509KeyManager ex
String alias;
public QpidClientX509KeyManager(String alias, String keyStorePath,
- String keyStorePassword,String keyStoreCertType) throws Exception
+ String keyStorePassword,String keyStoreCertType) throws GeneralSecurityException, IOException
{
this.alias = alias;
KeyStore ks = SSLUtil.getInitializedKeyStore(keyStorePath,keyStorePassword);
Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java Mon Sep 19 15:13:18 2011
@@ -24,43 +24,43 @@ import java.nio.ByteBuffer;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult;
-import javax.net.ssl.SSLException;
import javax.net.ssl.SSLEngineResult.HandshakeStatus;
import javax.net.ssl.SSLEngineResult.Status;
+import javax.net.ssl.SSLException;
import org.apache.qpid.transport.ConnectionSettings;
import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.network.security.SSLStatus;
import org.apache.qpid.transport.util.Logger;
public class SSLReceiver implements Receiver<ByteBuffer>
{
- private Receiver<ByteBuffer> delegate;
- private SSLEngine engine;
- private SSLSender sender;
- private int sslBufSize;
+ private static final Logger log = Logger.get(SSLReceiver.class);
+
+ private final Receiver<ByteBuffer> delegate;
+ private final SSLEngine engine;
+ private final int sslBufSize;
+ private final ByteBuffer localBuffer;
+ private final SSLStatus _sslStatus;
private ByteBuffer appData;
- private ByteBuffer localBuffer;
private boolean dataCached = false;
- private final Object notificationToken;
- private ConnectionSettings settings;
-
- private static final Logger log = Logger.get(SSLReceiver.class);
- public SSLReceiver(SSLEngine engine, Receiver<ByteBuffer> delegate,SSLSender sender)
+ private String _hostname;
+
+ public SSLReceiver(final SSLEngine engine, final Receiver<ByteBuffer> delegate, final SSLStatus sslStatus)
{
this.engine = engine;
this.delegate = delegate;
- this.sender = sender;
this.sslBufSize = engine.getSession().getApplicationBufferSize();
appData = ByteBuffer.allocate(sslBufSize);
localBuffer = ByteBuffer.allocate(sslBufSize);
- notificationToken = sender.getNotificationToken();
+ _sslStatus = sslStatus;
}
- public void setConnectionSettings(ConnectionSettings settings)
+ public void setHostname(String hostname)
{
- this.settings = settings;
+ _hostname = hostname;
}
public void closed()
@@ -102,9 +102,9 @@ public class SSLReceiver implements Rece
try
{
SSLEngineResult result = engine.unwrap(netData, appData);
- synchronized (notificationToken)
+ synchronized (_sslStatus.getSslLock())
{
- notificationToken.notifyAll();
+ _sslStatus.getSslLock().notifyAll();
}
int read = result.bytesProduced();
@@ -129,9 +129,9 @@ public class SSLReceiver implements Rece
switch(status)
{
case CLOSED:
- synchronized(notificationToken)
+ synchronized(_sslStatus.getSslLock())
{
- notificationToken.notifyAll();
+ _sslStatus.getSslLock().notifyAll();
}
return;
@@ -163,20 +163,20 @@ public class SSLReceiver implements Rece
break;
case NEED_TASK:
- sender.doTasks();
+ doTasks();
handshakeStatus = engine.getHandshakeStatus();
case FINISHED:
- if (this.settings != null && this.settings.isVerifyHostname() )
+ if (_hostname != null)
{
- SSLUtil.verifyHostname(engine, this.settings.getHost());
+ SSLUtil.verifyHostname(engine, _hostname);
}
case NEED_WRAP:
case NOT_HANDSHAKING:
- synchronized(notificationToken)
+ synchronized(_sslStatus.getSslLock())
{
- notificationToken.notifyAll();
+ _sslStatus.getSslLock().notifyAll();
}
break;
@@ -189,14 +189,23 @@ public class SSLReceiver implements Rece
catch(SSLException e)
{
log.error(e, "Error caught in SSLReceiver");
- sender.setErrorFlag();
- synchronized(notificationToken)
+ _sslStatus.setSslErrorFlag();
+ synchronized(_sslStatus.getSslLock())
{
- notificationToken.notifyAll();
+ _sslStatus.getSslLock().notifyAll();
}
exception(new TransportException("Error in SSLReceiver",e));
}
}
}
+
+ private void doTasks()
+ {
+ Runnable runnable;
+ while ((runnable = engine.getDelegatedTask()) != null) {
+ runnable.run();
+ }
+ }
+
}
Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java Mon Sep 19 15:13:18 2011
@@ -31,35 +31,38 @@ import javax.net.ssl.SSLEngineResult.Sta
import org.apache.qpid.transport.ConnectionSettings;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.SenderException;
+import org.apache.qpid.transport.network.security.SSLStatus;
import org.apache.qpid.transport.util.Logger;
public class SSLSender implements Sender<ByteBuffer>
{
- private Sender<ByteBuffer> delegate;
- private SSLEngine engine;
- private int sslBufSize;
- private ByteBuffer netData;
- private long timeout = 30000;
- private ConnectionSettings settings;
-
- private final Object engineState = new Object();
+ private static final Logger log = Logger.get(SSLSender.class);
+
+ private final Sender<ByteBuffer> delegate;
+ private final SSLEngine engine;
+ private final int sslBufSize;
+ private final ByteBuffer netData;
+ private final long timeout;
+ private final SSLStatus _sslStatus;
+
+ private String _hostname;
+
private final AtomicBoolean closed = new AtomicBoolean(false);
- private final AtomicBoolean error = new AtomicBoolean(false);
- private static final Logger log = Logger.get(SSLSender.class);
- public SSLSender(SSLEngine engine, Sender<ByteBuffer> delegate)
+ public SSLSender(SSLEngine engine, Sender<ByteBuffer> delegate, SSLStatus sslStatus)
{
this.engine = engine;
this.delegate = delegate;
sslBufSize = engine.getSession().getPacketBufferSize();
netData = ByteBuffer.allocate(sslBufSize);
timeout = Long.getLong("qpid.ssl_timeout", 60000);
+ _sslStatus = sslStatus;
}
- public void setConnectionSettings(ConnectionSettings settings)
+ public void setHostname(String hostname)
{
- this.settings = settings;
+ _hostname = hostname;
}
public void close()
@@ -83,13 +86,13 @@ public class SSLSender implements Sender
}
- synchronized(engineState)
+ synchronized(_sslStatus.getSslLock())
{
while (!engine.isOutboundDone())
{
try
{
- engineState.wait();
+ _sslStatus.getSslLock().wait();
}
catch(InterruptedException e)
{
@@ -148,7 +151,7 @@ public class SSLSender implements Sender
HandshakeStatus handshakeStatus;
Status status;
- while(appData.hasRemaining() && !error.get())
+ while(appData.hasRemaining() && !_sslStatus.getSslErrorFlag())
{
int read = 0;
try
@@ -160,6 +163,7 @@ public class SSLSender implements Sender
}
catch(SSLException e)
{
+ // Should this set _sslError??
throw new SenderException("SSL, Error occurred while encrypting data",e);
}
@@ -207,7 +211,7 @@ public class SSLSender implements Sender
case NEED_UNWRAP:
flush();
- synchronized(engineState)
+ synchronized(_sslStatus.getSslLock())
{
switch (engine.getHandshakeStatus())
{
@@ -215,7 +219,7 @@ public class SSLSender implements Sender
long start = System.currentTimeMillis();
try
{
- engineState.wait(timeout);
+ _sslStatus.getSslLock().wait(timeout);
}
catch(InterruptedException e)
{
@@ -234,9 +238,9 @@ public class SSLSender implements Sender
break;
case FINISHED:
- if (this.settings != null && this.settings.isVerifyHostname() )
+ if (_hostname != null)
{
- SSLUtil.verifyHostname(engine, this.settings.getHost());
+ SSLUtil.verifyHostname(engine, _hostname);
}
case NOT_HANDSHAKING:
@@ -249,7 +253,7 @@ public class SSLSender implements Sender
}
}
- public void doTasks()
+ private void doTasks()
{
Runnable runnable;
while ((runnable = engine.getDelegatedTask()) != null) {
@@ -257,16 +261,6 @@ public class SSLSender implements Sender
}
}
- public Object getNotificationToken()
- {
- return engineState;
- }
-
- public void setErrorFlag()
- {
- error.set(true);
- }
-
public void setIdleTimeout(int i)
{
delegate.setIdleTimeout(i);
Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLUtil.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLUtil.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLUtil.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLUtil.java Mon Sep 19 15:13:18 2011
@@ -125,38 +125,6 @@ public class SSLUtil
return id.toString();
}
- public static SSLContext createSSLContext(ConnectionSettings settings) throws Exception
- {
- SSLContextFactory sslContextFactory;
-
- if (settings.getCertAlias() == null)
- {
- sslContextFactory =
- new SSLContextFactory(settings.getTrustStorePath(),
- settings.getTrustStorePassword(),
- settings.getTrustStoreCertType(),
- settings.getKeyStorePath(),
- settings.getKeyStorePassword(),
- settings.getKeyStoreCertType());
-
- } else
- {
- sslContextFactory =
- new SSLContextFactory(settings.getTrustStorePath(),
- settings.getTrustStorePassword(),
- settings.getTrustStoreCertType(),
- new QpidClientX509KeyManager(settings.getCertAlias(),
- settings.getKeyStorePath(),
- settings.getKeyStorePassword(),
- settings.getKeyStoreCertType()));
-
- log.debug("Using custom key manager");
- }
-
- return sslContextFactory.buildServerContext();
-
- }
-
public static KeyStore getInitializedKeyStore(String storePath, String storePassword) throws GeneralSecurityException, IOException
{
KeyStore ks = KeyStore.getInstance("JKS");
@@ -176,7 +144,10 @@ public class SSLUtil
{
throw new IOException("Unable to load keystore resource: " + storePath);
}
- ks.load(in, storePassword.toCharArray());
+
+ char[] storeCharPassword = storePassword == null ? null : storePassword.toCharArray();
+
+ ks.load(in, storeCharPassword);
}
finally
{
Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/util/FileUtils.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/util/FileUtils.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/util/FileUtils.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/util/FileUtils.java Mon Sep 19 15:13:18 2011
@@ -143,8 +143,9 @@ public class FileUtils
}
/**
- * Either opens the specified filename as an input stream, or uses the default resource loaded using the
- * specified class loader, if opening the file fails or no file name is specified.
+ * Either opens the specified filename as an input stream from either the filesystem or the classpath,
+ * or uses the default resource loaded using the specified class loader, if opening the file fails
+ * or no file name is specified.
*
* @param filename The name of the file to open.
* @param defaultResource The name of the default resource on the classpath if the file cannot be opened.
@@ -156,28 +157,28 @@ public class FileUtils
{
InputStream is = null;
- // Flag to indicate whether the default resource should be used. By default this is true, so that the default
- // is used when opening the file fails.
- boolean useDefault = true;
-
// Try to open the file if one was specified.
if (filename != null)
{
+ // try on filesystem
try
{
is = new BufferedInputStream(new FileInputStream(new File(filename)));
-
- // Clear the default flag because the file was succesfully opened.
- useDefault = false;
}
catch (FileNotFoundException e)
{
- // Ignore this exception, the default will be used instead.
+ is = null;
+ }
+
+ if (is == null)
+ {
+ // failed on filesystem, so try on classpath
+ is = cl.getResourceAsStream(filename);
}
}
// Load the default resource if a file was not specified, or if opening the file failed.
- if (useDefault)
+ if (is == null)
{
is = cl.getResourceAsStream(defaultResource);
}
Modified: qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java Mon Sep 19 15:13:18 2011
@@ -21,6 +21,9 @@ package org.apache.qpid.codec;
*/
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -46,9 +49,16 @@ public class AMQDecoderTest extends Test
}
- public void testSingleFrameDecode() throws AMQProtocolVersionException, AMQFrameDecodingException
+ private ByteBuffer getHeartbeatBodyBuffer() throws IOException
{
- ByteBuffer msg = HeartbeatBody.FRAME.toNioByteBuffer();
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ HeartbeatBody.FRAME.writePayload(new DataOutputStream(baos));
+ return ByteBuffer.wrap(baos.toByteArray());
+ }
+
+ public void testSingleFrameDecode() throws AMQProtocolVersionException, AMQFrameDecodingException, IOException
+ {
+ ByteBuffer msg = getHeartbeatBodyBuffer();
ArrayList<AMQDataBlock> frames = _decoder.decodeBuffer(msg);
if (frames.get(0) instanceof AMQFrame)
{
@@ -60,9 +70,9 @@ public class AMQDecoderTest extends Test
}
}
- public void testPartialFrameDecode() throws AMQProtocolVersionException, AMQFrameDecodingException
+ public void testPartialFrameDecode() throws AMQProtocolVersionException, AMQFrameDecodingException, IOException
{
- ByteBuffer msg = HeartbeatBody.FRAME.toNioByteBuffer();
+ ByteBuffer msg = getHeartbeatBodyBuffer();
ByteBuffer msgA = msg.slice();
int msgbPos = msg.remaining() / 2;
int msgaLimit = msg.remaining() - msgbPos;
@@ -83,10 +93,10 @@ public class AMQDecoderTest extends Test
}
}
- public void testMultipleFrameDecode() throws AMQProtocolVersionException, AMQFrameDecodingException
+ public void testMultipleFrameDecode() throws AMQProtocolVersionException, AMQFrameDecodingException, IOException
{
- ByteBuffer msgA = HeartbeatBody.FRAME.toNioByteBuffer();
- ByteBuffer msgB = HeartbeatBody.FRAME.toNioByteBuffer();
+ ByteBuffer msgA = getHeartbeatBodyBuffer();
+ ByteBuffer msgB = getHeartbeatBodyBuffer();
ByteBuffer msg = ByteBuffer.allocate(msgA.remaining() + msgB.remaining());
msg.put(msgA);
msg.put(msgB);
@@ -106,11 +116,11 @@ public class AMQDecoderTest extends Test
}
}
- public void testMultiplePartialFrameDecode() throws AMQProtocolVersionException, AMQFrameDecodingException
+ public void testMultiplePartialFrameDecode() throws AMQProtocolVersionException, AMQFrameDecodingException, IOException
{
- ByteBuffer msgA = HeartbeatBody.FRAME.toNioByteBuffer();
- ByteBuffer msgB = HeartbeatBody.FRAME.toNioByteBuffer();
- ByteBuffer msgC = HeartbeatBody.FRAME.toNioByteBuffer();
+ ByteBuffer msgA = getHeartbeatBodyBuffer();
+ ByteBuffer msgB = getHeartbeatBodyBuffer();
+ ByteBuffer msgC = getHeartbeatBodyBuffer();
ByteBuffer sliceA = ByteBuffer.allocate(msgA.remaining() + msgB.remaining() / 2);
sliceA.put(msgA);
Modified: qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java Mon Sep 19 15:13:18 2011
@@ -20,10 +20,10 @@
*/
package org.apache.qpid.framing;
-import org.apache.mina.common.ByteBuffer;
-
import junit.framework.TestCase;
+import java.io.*;
+
public class BasicContentHeaderPropertiesTest extends TestCase
{
@@ -76,15 +76,14 @@ public class BasicContentHeaderPropertie
assertEquals(99, _testProperties.getPropertyFlags());
}
- public void testWritePropertyListPayload()
+ public void testWritePropertyListPayload() throws IOException
{
- ByteBuffer buf = ByteBuffer.allocate(300);
- _testProperties.writePropertyListPayload(buf);
+ _testProperties.writePropertyListPayload(new DataOutputStream(new ByteArrayOutputStream(300)));
}
public void testPopulatePropertiesFromBuffer() throws Exception
{
- ByteBuffer buf = ByteBuffer.allocate(300);
+ DataInputStream buf = new DataInputStream(new ByteArrayInputStream(new byte[300]));
_testProperties.populatePropertiesFromBuffer(buf, 99, 99);
}
Modified: qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java Mon Sep 19 15:13:18 2011
@@ -23,14 +23,14 @@ package org.apache.qpid.framing;
import junit.framework.Assert;
import junit.framework.TestCase;
-import org.apache.mina.common.ByteBuffer;
-
import org.apache.qpid.AMQInvalidArgumentException;
import org.apache.qpid.AMQPInvalidClassException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.*;
+
public class PropertyFieldTableTest extends TestCase
{
private static final Logger _logger = LoggerFactory.getLogger(PropertyFieldTableTest.class);
@@ -441,7 +441,7 @@ public class PropertyFieldTableTest exte
}
/** Check that a nested field table parameter correctly encodes and decodes to a byte buffer. */
- public void testNestedFieldTable()
+ public void testNestedFieldTable() throws IOException
{
byte[] testBytes = new byte[] { 0, 1, 2, 3, 4, 5 };
@@ -465,14 +465,16 @@ public class PropertyFieldTableTest exte
outerTable.setFieldTable("innerTable", innerTable);
// Write the outer table into the buffer.
- final ByteBuffer buffer = ByteBuffer.allocate((int) outerTable.getEncodedSize() + 4);
- outerTable.writeToBuffer(buffer);
- buffer.flip();
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+ outerTable.writeToBuffer(new DataOutputStream(baos));
+
+ byte[] data = baos.toByteArray();
// Extract the table back from the buffer again.
try
{
- FieldTable extractedOuterTable = EncodingUtils.readFieldTable(buffer);
+ FieldTable extractedOuterTable = EncodingUtils.readFieldTable(new DataInputStream(new ByteArrayInputStream(data)));
FieldTable extractedTable = extractedOuterTable.getFieldTable("innerTable");
@@ -567,7 +569,7 @@ public class PropertyFieldTableTest exte
Assert.assertEquals("Hello", table.getObject("object-string"));
}
- public void testwriteBuffer()
+ public void testwriteBuffer() throws IOException
{
byte[] bytes = { 99, 98, 97, 96, 95 };
@@ -585,15 +587,17 @@ public class PropertyFieldTableTest exte
table.setString("string", "hello");
table.setString("null-string", null);
- final ByteBuffer buffer = ByteBuffer.allocate((int) table.getEncodedSize() + 4); // FIXME XXX: Is cast a problem?
- table.writeToBuffer(buffer);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream((int) table.getEncodedSize() + 4);
+ table.writeToBuffer(new DataOutputStream(baos));
+
+ ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+ DataInputStream dis = new DataInputStream(bais);
- buffer.flip();
- long length = buffer.getUnsignedInt();
+ long length = dis.readInt() & 0xFFFFFFFFL;
- FieldTable table2 = new FieldTable(buffer, length);
+ FieldTable table2 = new FieldTable(dis, length);
Assert.assertEquals((Boolean) true, table2.getBoolean("bool"));
Assert.assertEquals((Byte) Byte.MAX_VALUE, table2.getByte("byte"));
Modified: qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java Mon Sep 19 15:13:18 2011
@@ -24,18 +24,28 @@ import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
+import java.net.DatagramSocket;
+import java.net.ServerSocket;
+import java.util.*;
import junit.framework.TestCase;
import junit.framework.TestResult;
+import org.apache.log4j.Level;
import org.apache.log4j.Logger;
-import org.apache.mina.util.AvailablePortFinder;
+
public class QpidTestCase extends TestCase
{
- protected static final Logger _logger = Logger.getLogger(QpidTestCase.class);
+ public static final String QPID_HOME = System.getProperty("QPID_HOME");
+ public static final String TEST_RESOURCES_DIR = QPID_HOME + "/../test-profiles/test_resources/";
+
+ private static final Logger _logger = Logger.getLogger(QpidTestCase.class);
+
+ private final Map<Logger, Level> _loggerLevelSetForTest = new HashMap<Logger, Level>();
+ private final Map<String, String> _propertiesSetForTest = new HashMap<String, String>();
+
+ private String _testName;
/**
* Some tests are excluded when the property test.excludes is set to true.
@@ -129,8 +139,186 @@ public class QpidTestCase extends TestCa
return storeClass != null ? storeClass : MEMORY_STORE_CLASS_NAME ;
}
+
+ public static final int MIN_PORT_NUMBER = 1;
+ public static final int MAX_PORT_NUMBER = 49151;
+
+
+ /**
+ * Gets the next available port, scanning upwards from the given port.
+ *
+ * @param fromPort the first port to check for availability
+ * @throws NoSuchElementException if there are no ports available
+ */
+ protected int getNextAvailable(int fromPort)
+ {
+ if ((fromPort < MIN_PORT_NUMBER) || (fromPort > MAX_PORT_NUMBER))
+ {
+ throw new IllegalArgumentException("Invalid start port: " + fromPort);
+ }
+
+ for (int i = fromPort; i <= MAX_PORT_NUMBER; i++)
+ {
+ if (available(i)) {
+ return i;
+ }
+ }
+
+ throw new NoSuchElementException("Could not find an available port above " + fromPort);
+ }
+
+ /**
+ * Checks to see if a specific port is available.
+ *
+ * @param port the port to check for availability
+ */
+ private boolean available(int port)
+ {
+ if ((port < MIN_PORT_NUMBER) || (port > MAX_PORT_NUMBER))
+ {
+ throw new IllegalArgumentException("Invalid start port: " + port);
+ }
+
+ ServerSocket ss = null;
+ DatagramSocket ds = null;
+ try
+ {
+ ss = new ServerSocket(port);
+ ss.setReuseAddress(true);
+ ds = new DatagramSocket(port);
+ ds.setReuseAddress(true);
+ return true;
+ }
+ catch (IOException e)
+ {
+ }
+ finally
+ {
+ if (ds != null)
+ {
+ ds.close();
+ }
+
+ if (ss != null)
+ {
+ try
+ {
+ ss.close();
+ }
+ catch (IOException e)
+ {
+ /* should not be thrown */
+ }
+ }
+ }
+
+ return false;
+ }
+
public int findFreePort()
{
- return AvailablePortFinder.getNextAvailable(10000);
+ return getNextAvailable(10000);
+ }
+
+ /**
+ * Set a System property for the duration of this test only. The tearDown
+ * method is guaranteed to reset the property to its previous value after
+ * the test completes.
+ *
+ * @param property The property to set
+ * @param value the value to set it to, if null, the property will be cleared
+ */
+ protected void setTestSystemProperty(final String property, final String value)
+ {
+ if (!_propertiesSetForTest.containsKey(property))
+ {
+ // Record the current value so we can revert it later.
+ _propertiesSetForTest.put(property, System.getProperty(property));
+ }
+
+ if (value == null)
+ {
+ System.clearProperty(property);
+ }
+ else
+ {
+ System.setProperty(property, value);
+ }
+ }
+
+ /**
+ * Restore the System property values that were set by this test run.
+ */
+ protected void revertTestSystemProperties()
+ {
+ if(!_propertiesSetForTest.isEmpty())
+ {
+ _logger.debug("reverting " + _propertiesSetForTest.size() + " test properties");
+ for (String key : _propertiesSetForTest.keySet())
+ {
+ String value = _propertiesSetForTest.get(key);
+ if (value != null)
+ {
+ System.setProperty(key, value);
+ }
+ else
+ {
+ System.clearProperty(key);
+ }
+ }
+
+ _propertiesSetForTest.clear();
+ }
+ }
+
+ /**
+ * Adjust the VM's Log4j settings just for this test run.
+ *
+ * @param logger the logger to change
+ * @param level the level to set
+ */
+ protected void setLoggerLevel(Logger logger, Level level)
+ {
+ assertNotNull("Cannot set level of null logger", logger);
+ assertNotNull("Cannot set Logger("+logger.getName()+") to null level.",level);
+
+ if (!_loggerLevelSetForTest.containsKey(logger))
+ {
+ // Record the current value so we can revert it later.
+ _loggerLevelSetForTest.put(logger, logger.getLevel());
+ }
+
+ logger.setLevel(level);
+ }
+
+ /**
+ * Restore the logging levels defined by this test.
+ */
+ protected void revertLoggingLevels()
+ {
+ for (Logger logger : _loggerLevelSetForTest.keySet())
+ {
+ logger.setLevel(_loggerLevelSetForTest.get(logger));
+ }
+
+ _loggerLevelSetForTest.clear();
+ }
+
+ protected void tearDown() throws java.lang.Exception
+ {
+ _logger.info("========== tearDown " + _testName + " ==========");
+ revertTestSystemProperties();
+ revertLoggingLevels();
+ }
+
+ protected void setUp() throws Exception
+ {
+ _testName = getClass().getSimpleName() + "." + getName();
+ _logger.info("========== start " + _testName + " ==========");
+ }
+
+ protected String getTestName()
+ {
+ return _testName;
}
}
Modified: qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java Mon Sep 19 15:13:18 2011
@@ -20,32 +20,27 @@
*/
package org.apache.qpid.transport;
-import org.apache.mina.util.AvailablePortFinder;
-
-import org.apache.qpid.test.utils.QpidTestCase;
-import org.apache.qpid.transport.network.ConnectionBinding;
-import org.apache.qpid.transport.network.io.IoAcceptor;
-import org.apache.qpid.transport.util.Logger;
-import org.apache.qpid.transport.util.Waiter;
+import static org.apache.qpid.transport.Option.EXPECTED;
+import static org.apache.qpid.transport.Option.NONE;
+import static org.apache.qpid.transport.Option.SYNC;
+import java.io.IOException;
import java.util.ArrayList;
-import java.util.List;
import java.util.Collections;
+import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import java.io.IOException;
-import static org.apache.qpid.transport.Option.*;
+import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.transport.network.ConnectionBinding;
+import org.apache.qpid.transport.network.io.IoAcceptor;
+import org.apache.qpid.transport.util.Waiter;
/**
* ConnectionTest
*/
-
public class ConnectionTest extends QpidTestCase implements SessionListener
{
-
- private static final Logger log = Logger.get(ConnectionTest.class);
-
private int port;
private volatile boolean queue = false;
private List<MessageTransfer> messages = new ArrayList<MessageTransfer>();
@@ -58,7 +53,7 @@ public class ConnectionTest extends Qpid
{
super.setUp();
- port = AvailablePortFinder.getNextAvailable(12000);
+ port = findFreePort();
}
protected void tearDown() throws Exception
@@ -158,7 +153,8 @@ public class ConnectionTest extends Qpid
private Connection connect(final CountDownLatch closed)
{
- Connection conn = new Connection();
+ final Connection conn = new Connection();
+ conn.setConnectionDelegate(new ClientDelegate(new ConnectionSettings()));
conn.addConnectionListener(new ConnectionListener()
{
public void opened(Connection conn) {}
@@ -182,9 +178,9 @@ public class ConnectionTest extends Qpid
{
// Force os.name to be windows to exercise code in IoReceiver
// that looks for the value of os.name
- System.setProperty("os.name","windows");
+ setTestSystemProperty("os.name","windows");
- // Start server as 0-9 to froce a ProtocolVersionException
+ // Start server as 0-9 to force a ProtocolVersionException
startServer(new ProtocolHeader(1, 0, 9));
CountDownLatch closed = new CountDownLatch(1);
@@ -219,7 +215,7 @@ public class ConnectionTest extends Qpid
conn.send(protocolHeader);
List<Object> utf8 = new ArrayList<Object>();
utf8.add("utf8");
- conn.connectionStart(null, Collections.EMPTY_LIST, utf8);
+ conn.connectionStart(null, Collections.emptyList(), utf8);
}
@Override
@@ -270,40 +266,7 @@ public class ConnectionTest extends Qpid
}
}
- class FailoverConnectionListener implements ConnectionListener
- {
- public void opened(Connection conn) {}
-
- public void exception(Connection conn, ConnectionException e)
- {
- throw e;
- }
-
- public void closed(Connection conn)
- {
- queue = true;
- conn.connect("localhost", port, null, "guest", "guest");
- conn.resume();
- }
- }
-
- class TestSessionListener implements SessionListener
- {
- public void opened(Session s) {}
- public void resumed(Session s) {}
- public void exception(Session s, SessionException e) {}
- public void message(Session s, MessageTransfer xfr)
- {
- synchronized (incoming)
- {
- incoming.add(xfr);
- incoming.notifyAll();
- }
- s.processed(xfr);
- }
- public void closed(Session s) {}
- }
public void testResumeNonemptyReplayBuffer() throws Exception
{
@@ -311,6 +274,7 @@ public class ConnectionTest extends Qpid
Connection conn = new Connection();
conn.addConnectionListener(new FailoverConnectionListener());
+ conn.setConnectionDelegate(new ClientDelegate(new ConnectionSettings()));
conn.connect("localhost", port, null, "guest", "guest");
Session ssn = conn.createSession(1);
ssn.setSessionListener(new TestSessionListener());
@@ -365,6 +329,7 @@ public class ConnectionTest extends Qpid
startServer();
Connection conn = new Connection();
+ conn.setConnectionDelegate(new ClientDelegate(new ConnectionSettings()));
conn.addConnectionListener(new FailoverConnectionListener());
conn.connect("localhost", port, null, "guest", "guest");
Session ssn = conn.createSession(1);
@@ -387,6 +352,7 @@ public class ConnectionTest extends Qpid
startServer();
Connection conn = new Connection();
+ conn.setConnectionDelegate(new ClientDelegate(new ConnectionSettings()));
conn.connect("localhost", port, null, "guest", "guest");
Session ssn = conn.createSession();
ssn.sessionFlush(EXPECTED);
@@ -400,6 +366,7 @@ public class ConnectionTest extends Qpid
{
startServer();
Connection conn = new Connection();
+ conn.setConnectionDelegate(new ClientDelegate(new ConnectionSettings()));
conn.connect("localhost", port, null, "guest", "guest");
conn.connectionHeartbeat();
conn.close();
@@ -410,6 +377,7 @@ public class ConnectionTest extends Qpid
startServer();
Connection conn = new Connection();
+ conn.setConnectionDelegate(new ClientDelegate(new ConnectionSettings()));
conn.connect("localhost", port, null, "guest", "guest");
Session ssn = conn.createSession();
send(ssn, "EXCP 0");
@@ -429,6 +397,7 @@ public class ConnectionTest extends Qpid
startServer();
Connection conn = new Connection();
+ conn.setConnectionDelegate(new ClientDelegate(new ConnectionSettings()));
conn.connect("localhost", port, null, "guest", "guest");
Session ssn = conn.createSession();
send(ssn, "EXCP 0", true);
@@ -443,4 +412,38 @@ public class ConnectionTest extends Qpid
}
}
+ class FailoverConnectionListener implements ConnectionListener
+ {
+ public void opened(Connection conn) {}
+
+ public void exception(Connection conn, ConnectionException e)
+ {
+ throw e;
+ }
+
+ public void closed(Connection conn)
+ {
+ queue = true;
+ conn.connect("localhost", port, null, "guest", "guest");
+ conn.resume();
+ }
+ }
+
+ class TestSessionListener implements SessionListener
+ {
+ public void opened(Session s) {}
+ public void resumed(Session s) {}
+ public void exception(Session s, SessionException e) {}
+ public void message(Session s, MessageTransfer xfr)
+ {
+ synchronized (incoming)
+ {
+ incoming.add(xfr);
+ incoming.notifyAll();
+ }
+
+ s.processed(xfr);
+ }
+ public void closed(Session s) {}
+ }
}
Modified: qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkConnection.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkConnection.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkConnection.java Mon Sep 19 15:13:18 2011
@@ -49,10 +49,12 @@ public class TestNetworkConnection imple
_sender = new MockSender();
}
+
+
public void bind(int port, InetAddress[] addresses, ProtocolEngineFactory protocolFactory,
NetworkTransportConfiguration config, SSLContextFactory sslFactory) throws BindException
{
-
+
}
public SocketAddress getLocalAddress()
@@ -68,37 +70,37 @@ public class TestNetworkConnection imple
public void open(int port, InetAddress destination, ProtocolEngine engine, NetworkTransportConfiguration config,
SSLContextFactory sslFactory) throws OpenException
{
-
+
}
public void setMaxReadIdle(int idleTime)
{
-
+
}
public void setMaxWriteIdle(int idleTime)
{
-
+
}
public void close()
{
-
+
}
public void flush()
{
-
+
}
public void send(ByteBuffer msg)
{
-
+
}
public void setIdleTimeout(int i)
{
-
+
}
public void setPort(int port)
@@ -135,4 +137,8 @@ public class TestNetworkConnection imple
{
return _sender;
}
+
+ public void start()
+ {
+ }
}
Copied: qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java (from r1156187, qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java)
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java?p2=qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java&p1=qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java&r1=1156187&r2=1172657&rev=1172657&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java Mon Sep 19 15:13:18 2011
@@ -23,16 +23,16 @@ package org.apache.qpid.transport.networ
import java.nio.ByteBuffer;
+import javax.net.ssl.SSLContext;
+
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.protocol.ProtocolEngineFactory;
-import org.apache.qpid.ssl.SSLContextFactory;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.transport.ConnectionSettings;
import org.apache.qpid.transport.NetworkTransportConfiguration;
import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.io.IoNetworkTransport;
-import org.apache.qpid.transport.network.mina.MinaNetworkTransport;
public class TransportTest extends QpidTestCase
{
@@ -43,7 +43,7 @@ public class TransportTest extends QpidT
{
final OutgoingNetworkTransport networkTransport = Transport.getOutgoingTransportInstance(ProtocolVersion.v8_0);
assertNotNull(networkTransport);
- assertTrue(networkTransport instanceof MinaNetworkTransport);
+ assertTrue(networkTransport instanceof IoNetworkTransport);
}
public void testGloballyOverriddenOutgoingTransportForv0_8() throws Exception
@@ -75,7 +75,7 @@ public class TransportTest extends QpidT
{
final IncomingNetworkTransport networkTransport = Transport.getIncomingTransportInstance();
assertNotNull(networkTransport);
- assertTrue(networkTransport instanceof MinaNetworkTransport);
+ assertTrue(networkTransport instanceof IoNetworkTransport);
}
public void testOverriddenGetIncomingTransport() throws Exception
@@ -129,7 +129,7 @@ public class TransportTest extends QpidT
}
public NetworkConnection connect(ConnectionSettings settings,
- Receiver<ByteBuffer> delegate, SSLContextFactory sslFactory)
+ Receiver<ByteBuffer> delegate, SSLContext sslContext)
{
throw new UnsupportedOperationException();
}
@@ -149,7 +149,7 @@ public class TransportTest extends QpidT
}
public void accept(NetworkTransportConfiguration config,
- ProtocolEngineFactory factory, SSLContextFactory sslFactory)
+ ProtocolEngineFactory factory, SSLContext sslContext)
{
throw new UnsupportedOperationException();
}
Modified: qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoAcceptor.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoAcceptor.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoAcceptor.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoAcceptor.java Mon Sep 19 15:13:18 2011
@@ -80,7 +80,7 @@ public class IoAcceptor<E> extends Threa
try
{
Socket sock = socket.accept();
- IoTransport<E> transport = new IoTransport<E>(sock, binding,false);
+ IoTransport<E> transport = new IoTransport<E>(sock, binding);
}
catch (IOException e)
{
Copied: qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java (from r1156187, qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java)
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java?p2=qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java&p1=qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java&r1=1156187&r2=1172657&rev=1172657&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java Mon Sep 19 15:13:18 2011
@@ -22,15 +22,8 @@ package org.apache.qpid.transport.networ
import java.net.Socket;
import java.nio.ByteBuffer;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
-
-import org.apache.qpid.ssl.SSLContextFactory;
import org.apache.qpid.transport.Binding;
import org.apache.qpid.transport.Sender;
-import org.apache.qpid.transport.TransportException;
-import org.apache.qpid.transport.network.security.ssl.SSLReceiver;
-import org.apache.qpid.transport.network.security.ssl.SSLSender;
import org.apache.qpid.transport.util.Logger;
/**
@@ -45,13 +38,6 @@ import org.apache.qpid.transport.util.Lo
public final class IoTransport<E>
{
- static
- {
- org.apache.mina.common.ByteBuffer.setAllocator
- (new org.apache.mina.common.SimpleByteBufferAllocator());
- org.apache.mina.common.ByteBuffer.setUseDirectBuffers
- (Boolean.getBoolean("amqj.enableDirectBuffers"));
- }
private static final Logger log = Logger.get(IoTransport.class);
@@ -67,18 +53,10 @@ public final class IoTransport<E>
private IoReceiver receiver;
private long timeout = 60000;
- IoTransport(Socket socket, Binding<E,ByteBuffer> binding, boolean ssl)
+ IoTransport(Socket socket, Binding<E,ByteBuffer> binding)
{
this.socket = socket;
-
- if (ssl)
- {
- setupSSLTransport(socket, binding);
- }
- else
- {
- setupTransport(socket, binding);
- }
+ setupTransport(socket, binding);
}
private void setupTransport(Socket socket, Binding<E, ByteBuffer> binding)
@@ -95,40 +73,6 @@ public final class IoTransport<E>
ios.registerCloseListener(this.receiver);
}
- private void setupSSLTransport(Socket socket, Binding<E, ByteBuffer> binding)
- {
- SSLEngine engine = null;
- SSLContext sslCtx;
- try
- {
- sslCtx = createSSLContext();
- }
- catch (Exception e)
- {
- throw new TransportException("Error creating SSL Context", e);
- }
-
- try
- {
- engine = sslCtx.createSSLEngine();
- engine.setUseClientMode(true);
- }
- catch(Exception e)
- {
- throw new TransportException("Error creating SSL Engine", e);
- }
- IoSender ios = new IoSender(socket, 2*writeBufferSize, timeout);
- ios.initiate();
- this.sender = new SSLSender(engine,ios);
- this.endpoint = binding.endpoint(sender);
- this.receiver = new IoReceiver(socket, new SSLReceiver(engine,binding.receiver(endpoint),(SSLSender)sender),
- 2*readBufferSize, timeout);
- this.receiver.initiate();
- ios.registerCloseListener(this.receiver);
-
- log.info("SSL Sender and Receiver initiated");
- }
-
public Sender<ByteBuffer> getSender()
{
return sender;
@@ -144,22 +88,4 @@ public final class IoTransport<E>
return socket;
}
- private SSLContext createSSLContext() throws Exception
- {
- String trustStorePath = System.getProperty("javax.net.ssl.trustStore");
- String trustStorePassword = System.getProperty("javax.net.ssl.trustStorePassword");
- String trustStoreCertType = System.getProperty("qpid.ssl.trustStoreCertType","SunX509");
-
- String keyStorePath = System.getProperty("javax.net.ssl.keyStore",trustStorePath);
- String keyStorePassword = System.getProperty("javax.net.ssl.keyStorePassword",trustStorePassword);
- String keyStoreCertType = System.getProperty("qpid.ssl.keyStoreCertType","SunX509");
-
- SSLContextFactory sslContextFactory = new SSLContextFactory(trustStorePath,trustStorePassword,
- trustStoreCertType,keyStorePath,
- keyStorePassword,keyStoreCertType);
-
- return sslContextFactory.buildServerContext();
-
- }
-
}
Modified: qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MinaNetworkHandlerTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MinaNetworkHandlerTest.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MinaNetworkHandlerTest.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MinaNetworkHandlerTest.java Mon Sep 19 15:13:18 2011
@@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.transport.ConnectionSettings;
import org.apache.qpid.transport.NetworkTransportConfiguration;
@@ -333,7 +334,7 @@ public class MinaNetworkHandlerTest exte
}
}
- public class CountingProtocolEngine implements ProtocolEngine
+ public class CountingProtocolEngine implements ServerProtocolEngine
{
public ArrayList<ByteBuffer> _receivedBytes = new ArrayList<ByteBuffer>();
private int _readBytes;
@@ -447,6 +448,11 @@ public class MinaNetworkHandlerTest exte
return _closed;
}
+ public long getConnectionId()
+ {
+ return -1;
+ }
+
}
private class EchoProtocolEngine extends CountingProtocolEngine
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org