You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2011/10/20 20:43:26 UTC
svn commit: r1186990 [35/43] - in /qpid/branches/QPID-2519: ./ bin/ cpp/
cpp/bindings/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/
cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/
cpp/bindings/qmf2/python/ cpp/bindings/qmf...
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java Thu Oct 20 18:42:46 2011
@@ -20,11 +20,11 @@
*/
package org.apache.qpid.framing;
-import org.apache.mina.common.ByteBuffer;
+import java.io.DataInputStream;
public class HeartbeatBodyFactory implements BodyFactory
{
- public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException
+ public AMQBody createBody(DataInputStream in, long bodySize) throws AMQFrameDecodingException
{
return new HeartbeatBody();
}
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java Thu Oct 20 18:42:46 2011
@@ -22,6 +22,10 @@ package org.apache.qpid.framing;
import org.apache.qpid.AMQException;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Arrays;
@@ -62,35 +66,30 @@ public class ProtocolInitiation extends
pv.equals(ProtocolVersion.v0_91) ? 1 : pv.getMinorVersion());
}
- public ProtocolInitiation(ByteBuffer in)
+ public ProtocolInitiation(DataInputStream in) throws IOException
{
_protocolHeader = new byte[4];
- in.get(_protocolHeader);
+ in.read(_protocolHeader);
- _protocolClass = in.get();
- _protocolInstance = in.get();
- _protocolMajor = in.get();
- _protocolMinor = in.get();
+ _protocolClass = in.readByte();
+ _protocolInstance = in.readByte();
+ _protocolMajor = in.readByte();
+ _protocolMinor = in.readByte();
}
- public void writePayload(org.apache.mina.common.ByteBuffer buffer)
- {
- writePayload(buffer.buf());
- }
-
public long getSize()
{
return 4 + 1 + 1 + 1 + 1;
}
- public void writePayload(ByteBuffer buffer)
+ public void writePayload(DataOutputStream buffer) throws IOException
{
- buffer.put(_protocolHeader);
- buffer.put(_protocolClass);
- buffer.put(_protocolInstance);
- buffer.put(_protocolMajor);
- buffer.put(_protocolMinor);
+ buffer.write(_protocolHeader);
+ buffer.write(_protocolClass);
+ buffer.write(_protocolInstance);
+ buffer.write(_protocolMajor);
+ buffer.write(_protocolMinor);
}
public boolean equals(Object o)
@@ -144,9 +143,9 @@ public class ProtocolInitiation extends
* @return true if we have enough data to decode the PI frame fully, false if more
* data is required
*/
- public boolean decodable(ByteBuffer in)
+ public boolean decodable(DataInputStream in) throws IOException
{
- return (in.remaining() >= 8);
+ return (in.available() >= 8);
}
}
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java Thu Oct 20 18:42:46 2011
@@ -21,7 +21,8 @@
package org.apache.qpid.framing;
-import org.apache.mina.common.ByteBuffer;
+import java.io.DataOutputStream;
+import java.io.IOException;
public class SmallCompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQDataBlock
{
@@ -68,7 +69,7 @@ public class SmallCompositeAMQDataBlock
return frameSize;
}
- public void writePayload(ByteBuffer buffer)
+ public void writePayload(DataOutputStream buffer) throws IOException
{
if (_firstFrame != null)
{
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java Thu Oct 20 18:42:46 2011
@@ -20,7 +20,8 @@
*/
package org.apache.qpid.framing;
-import org.apache.mina.common.ByteBuffer;
+import java.io.DataInputStream;
+import java.io.IOException;
import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
@@ -144,7 +145,7 @@ public class VersionSpecificRegistry
}
- public AMQMethodBody get(short classID, short methodID, ByteBuffer in, long size) throws AMQFrameDecodingException
+ public AMQMethodBody get(short classID, short methodID, DataInputStream in, long size) throws AMQFrameDecodingException, IOException
{
AMQMethodBodyInstanceFactory bodyFactory;
try
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/abstraction/ContentChunk.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/abstraction/ContentChunk.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/abstraction/ContentChunk.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/abstraction/ContentChunk.java Thu Oct 20 18:42:46 2011
@@ -21,12 +21,10 @@
package org.apache.qpid.framing.abstraction;
-import org.apache.mina.common.ByteBuffer;
-
public interface ContentChunk
{
int getSize();
- ByteBuffer getData();
+ byte[] getData();
void reduceToFit();
}
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java Thu Oct 20 18:42:46 2011
@@ -23,8 +23,6 @@ package org.apache.qpid.framing.abstract
import org.apache.qpid.framing.AMQBody;
-import java.nio.ByteBuffer;
-
public interface ProtocolVersionMethodConverter extends MessagePublishInfoConverter
{
AMQBody convertToBody(ContentChunk contentBody);
@@ -32,5 +30,5 @@ public interface ProtocolVersionMethodCo
void configure();
- AMQBody convertToBody(ByteBuffer buf);
+ AMQBody convertToBody(byte[] input);
}
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java Thu Oct 20 18:42:46 2011
@@ -21,16 +21,13 @@
package org.apache.qpid.framing.amqp_0_9;
-import org.apache.mina.common.ByteBuffer;
-
import org.apache.qpid.framing.abstraction.AbstractMethodConverter;
import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
import org.apache.qpid.framing.*;
-import org.apache.qpid.framing.amqp_0_9.*;
-import org.apache.qpid.framing.amqp_0_9.BasicPublishBodyImpl;
+
public class MethodConverter_0_9 extends AbstractMethodConverter implements ProtocolVersionMethodConverter
{
@@ -72,9 +69,9 @@ public class MethodConverter_0_9 extends
}
- public AMQBody convertToBody(java.nio.ByteBuffer buf)
+ public AMQBody convertToBody(byte[] data)
{
- return new ContentBody(ByteBuffer.wrap(buf));
+ return new ContentBody(data);
}
public MessagePublishInfo convertToInfo(AMQMethodBody methodBody)
@@ -116,9 +113,9 @@ public class MethodConverter_0_9 extends
return _contentBodyChunk.getSize();
}
- public ByteBuffer getData()
+ public byte[] getData()
{
- return _contentBodyChunk.payload;
+ return _contentBodyChunk._payload;
}
public void reduceToFit()
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java Thu Oct 20 18:42:46 2011
@@ -21,8 +21,6 @@
package org.apache.qpid.framing.amqp_0_91;
-import org.apache.mina.common.ByteBuffer;
-
import org.apache.qpid.framing.abstraction.AbstractMethodConverter;
import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
import org.apache.qpid.framing.abstraction.ContentChunk;
@@ -70,9 +68,9 @@ public class MethodConverter_0_91 extend
}
- public AMQBody convertToBody(java.nio.ByteBuffer buf)
+ public AMQBody convertToBody(byte[] data)
{
- return new ContentBody(ByteBuffer.wrap(buf));
+ return new ContentBody(data);
}
public MessagePublishInfo convertToInfo(AMQMethodBody methodBody)
@@ -114,9 +112,9 @@ public class MethodConverter_0_91 extend
return _contentBodyChunk.getSize();
}
- public ByteBuffer getData()
+ public byte[] getData()
{
- return _contentBodyChunk.payload;
+ return _contentBodyChunk._payload;
}
public void reduceToFit()
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java Thu Oct 20 18:42:46 2011
@@ -26,11 +26,8 @@ import org.apache.qpid.framing.abstracti
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.AbstractMethodConverter;
import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
-import org.apache.qpid.framing.amqp_8_0.BasicPublishBodyImpl;
import org.apache.qpid.framing.*;
-import org.apache.mina.common.ByteBuffer;
-
public class MethodConverter_8_0 extends AbstractMethodConverter implements ProtocolVersionMethodConverter
{
private int _basicPublishClassId;
@@ -60,9 +57,9 @@ public class MethodConverter_8_0 extends
return contentBodyChunk.getSize();
}
- public ByteBuffer getData()
+ public byte[] getData()
{
- return contentBodyChunk.payload;
+ return contentBodyChunk._payload;
}
public void reduceToFit()
@@ -81,9 +78,9 @@ public class MethodConverter_8_0 extends
}
- public AMQBody convertToBody(java.nio.ByteBuffer buf)
+ public AMQBody convertToBody(byte[] data)
{
- return new ContentBody(ByteBuffer.wrap(buf));
+ return new ContentBody(data);
}
public MessagePublishInfo convertToInfo(AMQMethodBody methodBody)
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java Thu Oct 20 18:42:46 2011
@@ -80,7 +80,7 @@ public final class AMQConstant
/**
* An operator intervened to close the connection for some reason. The client may retry at some later date.
*/
- public static final AMQConstant CONTEXT_IN_USE = new AMQConstant(320, "context in use", true);
+ public static final AMQConstant CONNECTION_FORCED = new AMQConstant(320, "connection forced", true);
/** The client tried to work with an unknown virtual host or cluster. */
public static final AMQConstant INVALID_PATH = new AMQConstant(402, "invalid path", true);
@@ -104,7 +104,7 @@ public final class AMQConstant
public static final AMQConstant REQUEST_TIMEOUT = new AMQConstant(408, "Request Timeout", true);
- public static final AMQConstant INVALID_ARGUMENT = new AMQConstant(409, "argument invalid", true);
+ public static final AMQConstant ARGUMENT_INVALID = new AMQConstant(409, "argument invalid", true);
/**
* The client sent a malformed frame that the server could not decode. This strongly implies a programming error
@@ -153,10 +153,7 @@ public final class AMQConstant
public static final AMQConstant FRAME_MIN_SIZE = new AMQConstant(4096, "frame min size", true);
- /**
- * The server does not support the protocol version
- */
- public static final AMQConstant UNSUPPORTED_BROKER_PROTOCOL_ERROR = new AMQConstant(542, "broker unsupported protocol", true);
+ public static final AMQConstant INVALID_ARGUMENT = new AMQConstant(542, "invalid argument", true);
/**
* The client imp does not support the protocol version
*/
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java Thu Oct 20 18:42:46 2011
@@ -21,10 +21,11 @@
package org.apache.qpid.protocol;
import java.net.SocketAddress;
+import java.nio.ByteBuffer;
-import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.transport.NetworkDriver;
import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.network.NetworkConnection;
/**
* A ProtocolEngine is a Receiver for java.nio.ByteBuffers. It takes the data passed to it in the received
@@ -32,9 +33,6 @@ import org.apache.qpid.transport.Receive
*/
public interface ProtocolEngine extends Receiver<java.nio.ByteBuffer>
{
- // Sets the network driver providing data for this ProtocolEngine
- void setNetworkDriver (NetworkDriver driver);
-
// Returns the remote address of the NetworkDriver
SocketAddress getRemoteAddress();
@@ -58,4 +56,6 @@ public interface ProtocolEngine extends
void readerIdle();
+ public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender);
+
}
\ No newline at end of file
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java Thu Oct 20 18:42:46 2011
@@ -20,12 +20,12 @@
*/
package org.apache.qpid.protocol;
-import org.apache.qpid.transport.NetworkDriver;
+import org.apache.qpid.transport.network.NetworkConnection;
public interface ProtocolEngineFactory
{
// Returns a new instance of a ProtocolEngine
- ProtocolEngine newProtocolEngine(NetworkDriver networkDriver);
+ ProtocolEngine newProtocolEngine();
}
\ No newline at end of file
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/ssl/SSLContextFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/ssl/SSLContextFactory.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/ssl/SSLContextFactory.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/ssl/SSLContextFactory.java Thu Oct 20 18:42:46 2011
@@ -20,18 +20,17 @@
*/
package org.apache.qpid.ssl;
-import java.io.File;
-import java.io.FileInputStream;
import java.io.IOException;
-import java.io.InputStream;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
+import org.apache.qpid.transport.network.security.ssl.QpidClientX509KeyManager;
import org.apache.qpid.transport.network.security.ssl.SSLUtil;
/**
@@ -39,157 +38,92 @@ import org.apache.qpid.transport.network
* before this will work.
*
*/
-public class SSLContextFactory {
-
- /**
- * Path to the Java keystore file
- */
- private String _keyStorePath;
-
- /**
- * Password for the keystore
- */
- private String _keyStorePassword;
-
- /**
- * Cert type to use in keystore
- */
- private String _keyStoreCertType;
-
- /**
- * Path to the Java truststore file
- */
- private String _trustStorePath;
-
- /**
- * Password for the truststore
- */
- private String _trustStorePassword;
-
- /**
- * Cert type to use in truststore
- */
- private String _trustStoreCertType;
-
- private KeyManager customKeyManager;
-
- public SSLContextFactory(String trustStorePath, String trustStorePassword,
- String trustStoreCertType)
+public class SSLContextFactory
+{
+ public static final String JAVA_KEY_STORE_CODE = "JKS";
+ public static final String TRANSPORT_LAYER_SECURITY_CODE = "TLS";
+ public static final String KEY_STORE_CERTIFICATE_TYPE = "SunX509";
+
+ private SSLContextFactory()
{
- this(trustStorePath,trustStorePassword,trustStoreCertType,
- trustStorePath,trustStorePassword,trustStoreCertType);
+ //no instances
}
- /**
- * Create a factory instance
- * @param keystorePath path to the Java keystore file
- * @param keystorePassword password for the Java keystore
- * @param certType certificate type
- */
- public SSLContextFactory(String trustStorePath, String trustStorePassword, String trustStoreCertType,
- String keyStorePath, String keyStorePassword, String keyStoreCertType)
- {
-
- _trustStorePath = trustStorePath;
- _trustStorePassword = trustStorePassword;
-
- if (_trustStorePassword != null && _trustStorePassword.equals("none"))
- {
- _trustStorePassword = null;
- }
- _trustStoreCertType = trustStoreCertType;
-
- _keyStorePath = keyStorePath;
- _keyStorePassword = keyStorePassword;
-
- if (_keyStorePassword != null && _keyStorePassword.equals("none"))
- {
- _keyStorePassword = null;
- }
- _keyStoreCertType = keyStoreCertType;
-
- if (_trustStorePath == null) {
- throw new IllegalArgumentException("A TrustStore path or KeyStore path must be specified");
- }
- if (_trustStoreCertType == null) {
- throw new IllegalArgumentException("Cert type must be specified");
- }
- }
-
- public SSLContextFactory(String trustStorePath, String trustStorePassword, String trustStoreCertType,
- KeyManager customKeyManager)
+ public static SSLContext buildServerContext(final String keyStorePath,
+ final String keyStorePassword, final String keyStoreCertType)
+ throws GeneralSecurityException, IOException
{
+ return buildContext(null, null, null, keyStorePath, keyStorePassword,
+ keyStoreCertType, null);
+ }
- _trustStorePath = trustStorePath;
- _trustStorePassword = trustStorePassword;
-
- if (_trustStorePassword != null && _trustStorePassword.equals("none"))
- {
- _trustStorePassword = null;
- }
- _trustStoreCertType = trustStoreCertType;
-
- if (_trustStorePath == null) {
- throw new IllegalArgumentException("A TrustStore path or KeyStore path must be specified");
- }
- if (_trustStoreCertType == null) {
- throw new IllegalArgumentException("Cert type must be specified");
- }
-
- this.customKeyManager = customKeyManager;
+ public static SSLContext buildClientContext(final String trustStorePath,
+ final String trustStorePassword, final String trustStoreCertType,
+ final String keyStorePath, final String keyStorePassword,
+ final String keyStoreCertType, final String certAlias)
+ throws GeneralSecurityException, IOException
+ {
+ return buildContext(trustStorePath, trustStorePassword,
+ trustStoreCertType, keyStorePath, keyStorePassword,
+ keyStoreCertType, certAlias);
}
-
-
- /**
- * Builds a SSLContext appropriate for use with a server
- * @return SSLContext
- * @throws GeneralSecurityException
- * @throws IOException
- */
-
- public SSLContext buildServerContext() throws GeneralSecurityException, IOException
- {
- KeyStore ts = SSLUtil.getInitializedKeyStore(_trustStorePath,_trustStorePassword);
- TrustManagerFactory tmf = TrustManagerFactory.getInstance(_trustStoreCertType);
- tmf.init(ts);
-
+
+ private static SSLContext buildContext(final String trustStorePath,
+ final String trustStorePassword, final String trustStoreCertType,
+ final String keyStorePath, final String keyStorePassword,
+ final String keyStoreCertType, final String certAlias)
+ throws GeneralSecurityException, IOException
+ {
// Initialize the SSLContext to work with our key managers.
- SSLContext sslContext = SSLContext.getInstance("TLS");
-
- if (customKeyManager != null)
+ final SSLContext sslContext = SSLContext
+ .getInstance(TRANSPORT_LAYER_SECURITY_CODE);
+
+ final TrustManager[] trustManagers;
+ final KeyManager[] keyManagers;
+
+ if (trustStorePath != null)
{
- sslContext.init(new KeyManager[]{customKeyManager},
- tmf.getTrustManagers(), null);
-
+ final KeyStore ts = SSLUtil.getInitializedKeyStore(trustStorePath,
+ trustStorePassword);
+ final TrustManagerFactory tmf = TrustManagerFactory
+ .getInstance(trustStoreCertType);
+ tmf.init(ts);
+
+ trustManagers = tmf.getTrustManagers();
}
else
{
- // Create keystore
- KeyStore ks = SSLUtil.getInitializedKeyStore(_keyStorePath,_keyStorePassword);
- // Set up key manager factory to use our key store
- KeyManagerFactory kmf = KeyManagerFactory.getInstance(_keyStoreCertType);
- kmf.init(ks, _keyStorePassword.toCharArray());
+ trustManagers = null;
+ }
- sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
+ if (keyStorePath != null)
+ {
+ if (certAlias != null)
+ {
+ keyManagers = new KeyManager[] { new QpidClientX509KeyManager(
+ certAlias, keyStorePath, keyStorePassword,
+ keyStoreCertType) };
+ }
+ else
+ {
+ final KeyStore ks = SSLUtil.getInitializedKeyStore(
+ keyStorePath, keyStorePassword);
+
+ char[] keyStoreCharPassword = keyStorePassword == null ? null : keyStorePassword.toCharArray();
+ // Set up key manager factory to use our key store
+ final KeyManagerFactory kmf = KeyManagerFactory
+ .getInstance(keyStoreCertType);
+ kmf.init(ks, keyStoreCharPassword);
+ keyManagers = kmf.getKeyManagers();
+ }
}
-
- return sslContext;
- }
-
- /**
- * Creates a SSLContext factory appropriate for use with a client
- * @return SSLContext
- * @throws GeneralSecurityException
- * @throws IOException
- */
- public SSLContext buildClientContext() throws GeneralSecurityException, IOException
- {
- KeyStore ks = SSLUtil.getInitializedKeyStore(_trustStorePath,_trustStorePassword);
- TrustManagerFactory tmf = TrustManagerFactory.getInstance(_trustStoreCertType);
- tmf.init(ks);
- SSLContext context = SSLContext.getInstance("TLS");
- context.init(null, tmf.getTrustManagers(), null);
- return context;
- }
-
+ else
+ {
+ keyManagers = null;
+ }
+
+ sslContext.init(keyManagers, trustManagers, null);
+
+ return sslContext;
+ }
}
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/thread/QpidThreadExecutor.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/thread/QpidThreadExecutor.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/thread/QpidThreadExecutor.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/thread/QpidThreadExecutor.java Thu Oct 20 18:42:46 2011
@@ -23,7 +23,7 @@ package org.apache.qpid.thread;
import org.apache.qpid.thread.Threading;
-import edu.emory.mathcs.backport.java.util.concurrent.Executor;
+import java.util.concurrent.Executor;
public class QpidThreadExecutor implements Executor
{
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java Thu Oct 20 18:42:46 2011
@@ -20,28 +20,20 @@
*/
package org.apache.qpid.transport;
-import org.ietf.jgss.GSSContext;
-import org.ietf.jgss.GSSException;
-import org.ietf.jgss.GSSManager;
-import org.ietf.jgss.GSSName;
-import org.ietf.jgss.Oid;
-
-import org.apache.qpid.security.UsernamePasswordCallbackHandler;
import static org.apache.qpid.transport.Connection.State.OPEN;
import static org.apache.qpid.transport.Connection.State.RESUMING;
-import org.apache.qpid.transport.util.Logger;
-import javax.security.sasl.Sasl;
-import javax.security.sasl.SaslClient;
-import javax.security.sasl.SaslException;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
-import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+
+import org.apache.qpid.transport.util.Logger;
+
/**
* ClientDelegate
@@ -52,31 +44,13 @@ public class ClientDelegate extends Conn
{
private static final Logger log = Logger.get(ClientDelegate.class);
- private static final String KRB5_OID_STR = "1.2.840.113554.1.2.2";
- protected static final Oid KRB5_OID;
- static
- {
- Oid oid;
- try
- {
- oid = new Oid(KRB5_OID_STR);
- }
- catch (GSSException ignore)
- {
- oid = null;
- }
- KRB5_OID = oid;
- }
-
- private List<String> clientMechs;
- private ConnectionSettings conSettings;
+ protected final ConnectionSettings _conSettings;
public ClientDelegate(ConnectionSettings settings)
{
- this.conSettings = settings;
- this.clientMechs = Arrays.asList(settings.getSaslMechs().split(" "));
+ this._conSettings = settings;
}
public void init(Connection conn, ProtocolHeader hdr)
@@ -92,9 +66,9 @@ public class ClientDelegate extends Conn
{
Map<String,Object> clientProperties = new HashMap<String,Object>();
- if(this.conSettings.getClientProperties() != null)
+ if(this._conSettings.getClientProperties() != null)
{
- clientProperties.putAll(this.conSettings.getClientProperties());
+ clientProperties.putAll(_conSettings.getClientProperties());
}
clientProperties.put("qpid.session_flow", 1);
@@ -109,41 +83,12 @@ public class ClientDelegate extends Conn
(clientProperties, null, null, conn.getLocale());
return;
}
-
- List<String> choosenMechs = new ArrayList<String>();
- for (String mech:clientMechs)
- {
- if (brokerMechs.contains(mech))
- {
- choosenMechs.add(mech);
- }
- }
-
- if (choosenMechs.size() == 0)
- {
- conn.exception(new ConnectionException("The following SASL mechanisms " +
- clientMechs.toString() +
- " specified by the client are not supported by the broker"));
- return;
- }
-
- String[] mechs = new String[choosenMechs.size()];
- choosenMechs.toArray(mechs);
-
conn.setServerProperties(start.getServerProperties());
try
{
- Map<String,Object> saslProps = new HashMap<String,Object>();
- if (conSettings.isUseSASLEncryption())
- {
- saslProps.put(Sasl.QOP, "auth-conf");
- }
- UsernamePasswordCallbackHandler handler =
- new UsernamePasswordCallbackHandler();
- handler.initialise(conSettings.getUsername(), conSettings.getPassword());
- SaslClient sc = Sasl.createSaslClient
- (mechs, null, conSettings.getSaslProtocol(), conSettings.getSaslServerName(), saslProps, handler);
+ final SaslClient sc = createSaslClient(brokerMechs);
+
conn.setSaslClient(sc);
byte[] response = sc.hasInitialResponse() ?
@@ -152,12 +97,22 @@ public class ClientDelegate extends Conn
(clientProperties, sc.getMechanismName(), response,
conn.getLocale());
}
+ catch (ConnectionException ce)
+ {
+ conn.exception(ce);
+ }
catch (SaslException e)
{
conn.exception(e);
}
}
+
+ protected SaslClient createSaslClient(List<Object> brokerMechs) throws ConnectionException, SaslException
+ {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public void connectionSecure(Connection conn, ConnectionSecure secure)
{
@@ -176,7 +131,7 @@ public class ClientDelegate extends Conn
@Override
public void connectionTune(Connection conn, ConnectionTune tune)
{
- int hb_interval = calculateHeartbeatInterval(conSettings.getHeartbeatInterval(),
+ int hb_interval = calculateHeartbeatInterval(_conSettings.getHeartbeatInterval(),
tune.getHeartbeatMin(),
tune.getHeartbeatMax()
);
@@ -191,32 +146,12 @@ public class ClientDelegate extends Conn
//(or that forced by protocol limitations [0xFFFF])
conn.setChannelMax(channelMax == 0 ? Connection.MAX_CHANNEL_MAX : channelMax);
- conn.connectionOpen(conSettings.getVhost(), null, Option.INSIST);
+ conn.connectionOpen(_conSettings.getVhost(), null, Option.INSIST);
}
@Override
public void connectionOpenOk(Connection conn, ConnectionOpenOk ok)
{
- SaslClient sc = conn.getSaslClient();
- if (sc != null)
- {
- if (sc.getMechanismName().equals("GSSAPI"))
- {
- String id = getKerberosUser();
- if (id != null)
- {
- conn.setUserID(id);
- }
- }
- else if (sc.getMechanismName().equals("EXTERNAL"))
- {
- if (conn.getSecurityLayer() != null)
- {
- conn.setUserID(conn.getSecurityLayer().getUserID());
- }
- }
- }
-
if (conn.isConnectionResuming())
{
conn.setState(RESUMING);
@@ -247,7 +182,7 @@ public class ClientDelegate extends Conn
int i = heartbeat;
if (i == 0)
{
- log.warn("Idle timeout is zero. Heartbeats are disabled");
+ log.info("Idle timeout is 0 sec. Heartbeats are disabled.");
return 0; // heartbeats are disabled.
}
else if (i >= min && i <= max)
@@ -256,8 +191,8 @@ public class ClientDelegate extends Conn
}
else
{
- log.warn("Ignoring the idle timeout %s set by the connection," +
- " using the brokers max value %s", i,max);
+ log.info("The broker does not support the configured connection idle timeout of %s sec," +
+ " using the brokers max supported value of %s sec instead.", i,max);
return max;
}
}
@@ -286,35 +221,7 @@ public class ClientDelegate extends Conn
}
- private String getKerberosUser()
- {
- log.debug("Obtaining userID from kerberos");
- String service = conSettings.getSaslProtocol() + "@" + conSettings.getSaslServerName();
- GSSManager manager = GSSManager.getInstance();
-
- try
- {
- GSSName acceptorName = manager.createName(service,
- GSSName.NT_HOSTBASED_SERVICE, KRB5_OID);
-
- GSSContext secCtx = manager.createContext(acceptorName,
- KRB5_OID,
- null,
- GSSContext.INDEFINITE_LIFETIME);
- secCtx.initSecContext(new byte[0], 0, 1);
- if (secCtx.getSrcName() != null)
- {
- return secCtx.getSrcName().toString();
- }
- }
- catch (GSSException e)
- {
- log.warn("Unable to retrieve userID from Kerberos due to error",e);
- }
-
- return null;
- }
}
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/Connection.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/Connection.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/Connection.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/Connection.java Thu Oct 20 18:42:46 2011
@@ -25,21 +25,29 @@ import static org.apache.qpid.transport.
import static org.apache.qpid.transport.Connection.State.NEW;
import static org.apache.qpid.transport.Connection.State.OPEN;
import static org.apache.qpid.transport.Connection.State.OPENING;
-import static org.apache.qpid.transport.Connection.State.RESUMING;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslServer;
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.transport.network.Assembler;
+import org.apache.qpid.transport.network.Disassembler;
+import org.apache.qpid.transport.network.InputHandler;
+import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.network.OutgoingNetworkTransport;
+import org.apache.qpid.transport.network.Transport;
import org.apache.qpid.transport.network.security.SecurityLayer;
+import org.apache.qpid.transport.network.security.SecurityLayerFactory;
import org.apache.qpid.transport.util.Logger;
import org.apache.qpid.transport.util.Waiter;
import org.apache.qpid.util.Strings;
@@ -65,6 +73,7 @@ public class Connection extends Connecti
public static final int MAX_CHANNEL_MAX = 0xFFFF;
public static final int MIN_USABLE_CHANNEL_NUM = 0;
+
public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD, RESUMING }
static class DefaultConnectionListener implements ConnectionListener
@@ -112,17 +121,14 @@ public class Connection extends Connecti
private SaslServer saslServer;
private SaslClient saslClient;
private int idleTimeout = 0;
- private String _authorizationID;
private Map<String,Object> _serverProperties;
private String userID;
private ConnectionSettings conSettings;
private SecurityLayer securityLayer;
private String _clientId;
-
- private static final AtomicLong idGenerator = new AtomicLong(0);
- private final long _connectionId = idGenerator.incrementAndGet();
+
private final AtomicBoolean connectionLost = new AtomicBoolean(false);
-
+
public Connection() {}
public void setConnectionDelegate(ConnectionDelegate delegate)
@@ -233,14 +239,24 @@ public class Connection extends Connecti
conSettings = settings;
state = OPENING;
userID = settings.getUsername();
- delegate = new ClientDelegate(settings);
-
- TransportBuilder transport = new TransportBuilder();
- transport.init(this);
- this.sender = transport.buildSenderPipe();
- transport.buildReceiverPipe(this);
- this.securityLayer = transport.getSecurityLayer();
-
+
+ securityLayer = SecurityLayerFactory.newInstance(getConnectionSettings());
+
+ OutgoingNetworkTransport transport = Transport.getOutgoingTransportInstance(ProtocolVersion.v0_10);
+ Receiver<ByteBuffer> secureReceiver = securityLayer.receiver(new InputHandler(new Assembler(this)));
+ if(secureReceiver instanceof ConnectionListener)
+ {
+ addConnectionListener((ConnectionListener)secureReceiver);
+ }
+
+ NetworkConnection network = transport.connect(settings, secureReceiver, null);
+ final Sender<ByteBuffer> secureSender = securityLayer.sender(network.getSender());
+ if(secureSender instanceof ConnectionListener)
+ {
+ addConnectionListener((ConnectionListener)secureSender);
+ }
+ sender = new Disassembler(secureSender, settings.getMaxFrameSize());
+
send(new ProtocolHeader(1, 0, 10));
Waiter w = new Waiter(lock, timeout);
@@ -321,23 +337,31 @@ public class Connection extends Connecti
Waiter w = new Waiter(lock, timeout);
while (w.hasTime() && state != OPEN && error == null)
{
- w.await();
+ w.await();
}
-
+
if (state != OPEN)
{
throw new ConnectionException("Timed out waiting for connection to be ready. Current state is :" + state);
}
-
+
Session ssn = _sessionFactory.newSession(this, name, expiry);
- sessions.put(name, ssn);
+ registerSession(ssn);
map(ssn);
ssn.attach();
return ssn;
}
}
- void removeSession(Session ssn)
+ public void registerSession(Session ssn)
+ {
+ synchronized (lock)
+ {
+ sessions.put(ssn.getName(),ssn);
+ }
+ }
+
+ public void removeSession(Session ssn)
{
synchronized (lock)
{
@@ -352,11 +376,6 @@ public class Connection extends Connecti
_sessionFactory = sessionFactory;
}
- public long getConnectionId()
- {
- return _connectionId;
- }
-
public ConnectionDelegate getConnectionDelegate()
{
return delegate;
@@ -405,7 +424,7 @@ public class Connection extends Connecti
else
{
throw new ProtocolViolationException(
- "Received frames for an already dettached session", null);
+ "Received frames for an already detached session", null);
}
}
@@ -454,7 +473,7 @@ public class Connection extends Connecti
}
}
- protected Session getSession(int channel)
+ public Session getSession(int channel)
{
synchronized (lock)
{
@@ -468,18 +487,10 @@ public class Connection extends Connecti
{
for (Session ssn : sessions.values())
{
- if (ssn.isTransacted())
- {
- removeSession(ssn);
- ssn.setState(Session.State.CLOSED);
- }
- else
- {
- map(ssn);
- ssn.attach();
- ssn.resume();
- }
+ map(ssn);
+ ssn.resume();
}
+
setState(OPEN);
}
}
@@ -566,12 +577,12 @@ public class Connection extends Connecti
{
close(ConnectionCloseCode.NORMAL, null);
}
-
+
public void mgmtClose()
{
close(ConnectionCloseCode.CONNECTION_FORCED, "The connection was closed using the broker's management interface.");
}
-
+
public void close(ConnectionCloseCode replyCode, String replyText, Option ... _options)
{
synchronized (lock)
@@ -645,16 +656,6 @@ public class Connection extends Connecti
return idleTimeout;
}
- public void setAuthorizationID(String authorizationID)
- {
- _authorizationID = authorizationID;
- }
-
- public String getAuthorizationID()
- {
- return _authorizationID;
- }
-
public String getUserID()
{
return userID;
@@ -684,15 +685,24 @@ public class Connection extends Connecti
{
return conSettings;
}
-
+
public SecurityLayer getSecurityLayer()
{
return securityLayer;
}
-
+
public boolean isConnectionResuming()
{
return connectionLost.get();
}
+ protected Collection<Session> getChannels()
+ {
+ return channels.values();
+ }
+
+ public boolean hasSessionWithName(final String name)
+ {
+ return sessions.containsKey(new Binary(name.getBytes()));
+ }
}
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java Thu Oct 20 18:42:46 2011
@@ -85,7 +85,7 @@ public abstract class ConnectionDelegate
@Override public void sessionDetach(Connection conn, SessionDetach dtc)
{
Session ssn = conn.getSession(dtc.getChannel());
- ssn.sessionDetached(dtc.getName(), SessionDetachCode.NORMAL);
+ ssn.sessionDetached(dtc.getName(), ssn.getDetachCode() == null? SessionDetachCode.NORMAL: ssn.getDetachCode());
conn.unmap(ssn);
ssn.closed();
}
@@ -95,6 +95,7 @@ public abstract class ConnectionDelegate
Session ssn = conn.getSession(dtc.getChannel());
if (ssn != null)
{
+ ssn.setDetachCode(dtc.getCode());
conn.unmap(ssn);
ssn.closed();
}
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java Thu Oct 20 18:42:46 2011
@@ -30,6 +30,8 @@ import java.util.Map;
*/
public class ConnectionSettings
{
+ public static final String WILDCARD_ADDRESS = "*";
+
String protocol = "tcp";
String host = "localhost";
String vhost;
@@ -56,7 +58,7 @@ public class ConnectionSettings
boolean verifyHostname;
// SASL props
- String saslMechs = System.getProperty("qpid.sasl_mechs", "PLAIN");
+ String saslMechs = System.getProperty("qpid.sasl_mechs", null);
String saslProtocol = System.getProperty("qpid.sasl_protocol", "AMQP");
String saslServerName = System.getProperty("qpid.sasl_server_name", "localhost");
boolean useSASLEncryption;
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java Thu Oct 20 18:42:46 2011
@@ -75,10 +75,7 @@ public class ServerDelegate extends Conn
if (mechanism == null || mechanism.length() == 0)
{
- conn.connectionTune
- (getChannelMax(),
- org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE,
- 0, getHeartbeatMax());
+ tuneAuthorizedConnection(conn);
return;
}
@@ -97,8 +94,7 @@ public class ServerDelegate extends Conn
}
catch (SaslException e)
{
- conn.exception(e);
- conn.connectionClose(ConnectionCloseCode.CONNECTION_FORCED, e.getMessage());
+ connectionAuthFailed(conn, e);
}
}
@@ -109,33 +105,52 @@ public class ServerDelegate extends Conn
return ss;
}
- private void secure(Connection conn, byte[] response)
+ protected void secure(final SaslServer ss, final Connection conn, final byte[] response)
{
- SaslServer ss = conn.getSaslServer();
try
{
byte[] challenge = ss.evaluateResponse(response);
if (ss.isComplete())
{
ss.dispose();
- conn.connectionTune
- (getChannelMax(),
- org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE,
- 0, getHeartbeatMax());
- conn.setAuthorizationID(ss.getAuthorizationID());
+ tuneAuthorizedConnection(conn);
}
else
{
- conn.connectionSecure(challenge);
+ connectionAuthContinue(conn, challenge);
}
}
catch (SaslException e)
{
- conn.exception(e);
- conn.connectionClose(ConnectionCloseCode.CONNECTION_FORCED, e.getMessage());
+ connectionAuthFailed(conn, e);
}
}
+ protected void connectionAuthFailed(final Connection conn, Exception e)
+ {
+ conn.exception(e);
+ conn.connectionClose(ConnectionCloseCode.CONNECTION_FORCED, e.getMessage());
+ }
+
+ protected void connectionAuthContinue(final Connection conn, byte[] challenge)
+ {
+ conn.connectionSecure(challenge);
+ }
+
+ protected void tuneAuthorizedConnection(final Connection conn)
+ {
+ conn.connectionTune
+ (getChannelMax(),
+ org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE,
+ 0, getHeartbeatMax());
+ }
+
+ protected void secure(final Connection conn, final byte[] response)
+ {
+ final SaslServer ss = conn.getSaslServer();
+ secure(ss, conn, response);
+ }
+
protected int getHeartbeatMax()
{
return 0xFFFF;
@@ -155,22 +170,7 @@ public class ServerDelegate extends Conn
@Override
public void connectionTuneOk(Connection conn, ConnectionTuneOk ok)
{
- int okChannelMax = ok.getChannelMax();
-
- if (okChannelMax > getChannelMax())
- {
- _logger.error("Connection '" + conn.getConnectionId() + "' being severed, " +
- "client connectionTuneOk returned a channelMax (" + okChannelMax +
- ") above the servers offered limit (" + getChannelMax() +")");
- //Due to the error we must forcefully close the connection without negotiation
- conn.getSender().close();
- return;
- }
-
- //0 means no implied limit, except available server resources
- //(or that forced by protocol limitations [0xFFFF])
- conn.setChannelMax(okChannelMax == 0 ? Connection.MAX_CHANNEL_MAX : okChannelMax);
}
@Override
@@ -200,4 +200,11 @@ public class ServerDelegate extends Conn
ssn.sessionAttached(atc.getName());
ssn.setState(Session.State.OPEN);
}
+
+ protected void setConnectionTuneOkChannelMax(final Connection conn, final int okChannelMax)
+ {
+ //0 means no implied limit, except available server resources
+ //(or that forced by protocol limitations [0xFFFF])
+ conn.setChannelMax(okChannelMax == 0 ? Connection.MAX_CHANNEL_MAX : okChannelMax);
+ }
}
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/Session.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/Session.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/Session.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/Session.java Thu Oct 20 18:42:46 2011
@@ -30,6 +30,8 @@ import static org.apache.qpid.transport.
import static org.apache.qpid.transport.Session.State.NEW;
import static org.apache.qpid.transport.Session.State.OPEN;
import static org.apache.qpid.transport.Session.State.RESUMING;
+
+import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.transport.network.Frame;
import static org.apache.qpid.transport.util.Functions.mod;
import org.apache.qpid.transport.util.Logger;
@@ -42,7 +44,9 @@ import static org.apache.qpid.util.Seria
import static org.apache.qpid.util.Strings.toUTF8;
import java.nio.ByteBuffer;
+import java.util.Arrays;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@@ -55,7 +59,6 @@ import java.util.concurrent.TimeUnit;
public class Session extends SessionInvoker
{
-
private static final Logger log = Logger.get(Session.class);
public enum State { NEW, DETACHED, RESUMING, OPEN, CLOSING, CLOSED }
@@ -89,7 +92,9 @@ public class Session extends SessionInvo
private int channel;
private SessionDelegate delegate;
private SessionListener listener = new DefaultSessionListener();
- private long timeout = 60000;
+ private final long timeout = Long.getLong(ClientProperties.QPID_SYNC_OP_TIMEOUT,
+ Long.getLong(ClientProperties.AMQJ_DEFAULT_SYNCWRITE_TIMEOUT,
+ ClientProperties.DEFAULT_SYNC_OPERATION_TIMEOUT));
private boolean autoSync = false;
private boolean incomingInit;
@@ -117,7 +122,9 @@ public class Session extends SessionInvo
private Thread resumer = null;
private boolean transacted = false;
-
+ private SessionDetachCode detachCode;
+ private final Object stateLock = new Object();
+
protected Session(Connection connection, Binary name, long expiry)
{
this(connection, new SessionDelegate(), name, expiry);
@@ -252,6 +259,8 @@ public class Session extends SessionInvo
{
synchronized (commands)
{
+ attach();
+
for (int i = maxComplete + 1; lt(i, commandsOut); i++)
{
Method m = commands[mod(i, commands.length)];
@@ -262,16 +271,48 @@ public class Session extends SessionInvo
}
else if (m instanceof MessageTransfer)
{
- ((MessageTransfer)m).getHeader().get(DeliveryProperties.class).setRedelivered(true);
+ MessageTransfer xfr = (MessageTransfer)m;
+
+ if (xfr.getHeader() != null)
+ {
+ if (xfr.getHeader().get(DeliveryProperties.class) != null)
+ {
+ xfr.getHeader().get(DeliveryProperties.class).setRedelivered(true);
+ }
+ else
+ {
+ Struct[] structs = xfr.getHeader().getStructs();
+ DeliveryProperties deliveryProps = new DeliveryProperties();
+ deliveryProps.setRedelivered(true);
+
+ List<Struct> list = Arrays.asList(structs);
+ list.add(deliveryProps);
+ xfr.setHeader(new Header(list));
+ }
+
+ }
+ else
+ {
+ DeliveryProperties deliveryProps = new DeliveryProperties();
+ deliveryProps.setRedelivered(true);
+ xfr.setHeader(new Header(deliveryProps));
+ }
}
sessionCommandPoint(m.getId(), 0);
send(m);
}
-
+
sessionCommandPoint(commandsOut, 0);
+
sessionFlush(COMPLETED);
resumer = Thread.currentThread();
state = RESUMING;
+
+ if(isTransacted())
+ {
+ txSelect();
+ }
+
listener.resumed(this);
resumer = null;
}
@@ -422,7 +463,10 @@ public class Session extends SessionInvo
{
return;
}
- sessionCompleted(copy, options);
+ if (copy.size() > 0)
+ {
+ sessionCompleted(copy, options);
+ }
}
}
@@ -532,17 +576,6 @@ public class Session extends SessionInvo
{
if (m.getEncodedTrack() == Frame.L4)
{
-
- if (state == DETACHED && transacted)
- {
- state = CLOSED;
- delegate.closed(this);
- connection.removeSession(this);
- throw new SessionException(
- "Session failed over, possibly in the middle of a transaction. " +
- "Closing the session. Any Transaction in progress will be rolledback.");
- }
-
if (m.hasPayload())
{
acquireCredit();
@@ -550,24 +583,30 @@ public class Session extends SessionInvo
synchronized (commands)
{
- if (state == DETACHED && m.isUnreliable())
+ //allow the txSelect operation to be invoked during resume
+ boolean skipWait = m instanceof TxSelect && state == RESUMING;
+
+ if(!skipWait)
{
- Thread current = Thread.currentThread();
- if (!current.equals(resumer))
+ if (state == DETACHED && m.isUnreliable())
{
- return;
+ Thread current = Thread.currentThread();
+ if (!current.equals(resumer))
+ {
+ return;
+ }
}
- }
- if (state != OPEN && state != CLOSED && state != CLOSING)
- {
- Thread current = Thread.currentThread();
- if (!current.equals(resumer))
+ if (state != OPEN && state != CLOSED && state != CLOSING)
{
- Waiter w = new Waiter(commands, timeout);
- while (w.hasTime() && (state != OPEN && state != CLOSED))
+ Thread current = Thread.currentThread();
+ if (!current.equals(resumer))
{
- w.await();
+ Waiter w = new Waiter(commands, timeout);
+ while (w.hasTime() && (state != OPEN && state != CLOSED))
+ {
+ w.await();
+ }
}
}
}
@@ -661,7 +700,12 @@ public class Session extends SessionInvo
{
sessionCommandPoint(0, 0);
}
- if ((!closing && !transacted && m instanceof MessageTransfer) || m.hasCompletionListener())
+
+ boolean replayTransfer = !closing && !transacted &&
+ m instanceof MessageTransfer &&
+ ! m.isUnreliable();
+
+ if ((replayTransfer) || m.hasCompletionListener())
{
commands[mod(next, commands.length)] = m;
commandBytes += m.getBodySize();
@@ -926,16 +970,29 @@ public class Session extends SessionInvo
public void close()
{
+ if (log.isDebugEnabled())
+ {
+ log.debug("Closing [%s] in state [%s]", this, state);
+ }
synchronized (commands)
{
- state = CLOSING;
- setClose(true);
- sessionRequestTimeout(0);
- sessionDetach(name.getBytes());
-
- awaitClose();
-
-
+ switch(state)
+ {
+ case DETACHED:
+ state = CLOSED;
+ delegate.closed(this);
+ connection.removeSession(this);
+ listener.closed(this);
+ break;
+ case CLOSED:
+ break;
+ default:
+ state = CLOSING;
+ setClose(true);
+ sessionRequestTimeout(0);
+ sessionDetach(name.getBytes());
+ awaitClose();
+ }
}
}
@@ -995,7 +1052,8 @@ public class Session extends SessionInvo
if(state == CLOSED)
{
- connection.removeSession(this);
+ connection.removeSession(this);
+ listener.closed(this);
}
}
@@ -1008,13 +1066,55 @@ public class Session extends SessionInvo
{
return String.format("ssn:%s", name);
}
-
+
public void setTransacted(boolean b) {
this.transacted = b;
}
-
+
public boolean isTransacted(){
return transacted;
}
-
+
+ public void setDetachCode(SessionDetachCode dtc)
+ {
+ this.detachCode = dtc;
+ }
+
+ public SessionDetachCode getDetachCode()
+ {
+ return this.detachCode;
+ }
+
+ public void awaitOpen()
+ {
+ switch (state)
+ {
+ case NEW:
+ synchronized(stateLock)
+ {
+ Waiter w = new Waiter(stateLock, timeout);
+ while (w.hasTime() && state == NEW)
+ {
+ w.await();
+ }
+ }
+
+ if (state != OPEN)
+ {
+ throw new SessionException("Timed out waiting for Session to open");
+ }
+ break;
+ case DETACHED:
+ case CLOSING:
+ case CLOSED:
+ throw new SessionException("Session closed");
+ default :
+ break;
+ }
+ }
+
+ public Object getStateLock()
+ {
+ return stateLock;
+ }
}
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java Thu Oct 20 18:42:46 2011
@@ -76,6 +76,10 @@ public class SessionDelegate
@Override public void sessionAttached(Session ssn, SessionAttached atc)
{
ssn.setState(Session.State.OPEN);
+ synchronized (ssn.getStateLock())
+ {
+ ssn.getStateLock().notifyAll();
+ }
}
@Override public void sessionTimeout(Session ssn, SessionTimeout t)
@@ -202,11 +206,19 @@ public class SessionDelegate
public void closed(Session session)
{
- log.warn("CLOSED: [%s]", session);
+ log.debug("CLOSED: [%s]", session);
+ synchronized (session.getStateLock())
+ {
+ session.getStateLock().notifyAll();
+ }
}
public void detached(Session session)
{
- log.warn("DETACHED: [%s]", session);
+ log.debug("DETACHED: [%s]", session);
+ synchronized (session.getStateLock())
+ {
+ session.getStateLock().notifyAll();
+ }
}
}
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java Thu Oct 20 18:42:46 2011
@@ -63,6 +63,7 @@ abstract class AbstractEncoder implement
ENCODINGS.put(Double.class, Type.DOUBLE);
ENCODINGS.put(Character.class, Type.CHAR);
ENCODINGS.put(byte[].class, Type.VBIN32);
+ ENCODINGS.put(UUID.class, Type.UUID);
}
private final Map<String,byte[]> str8cache = new LinkedHashMap<String,byte[]>()
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java Thu Oct 20 18:42:46 2011
@@ -20,19 +20,11 @@
*/
package org.apache.qpid.transport.network;
-import java.nio.ByteBuffer;
-
-import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.Sender;
-import org.apache.qpid.transport.ConnectionSettings;
-
+/**
+ * A network transport is responsible for the establishment of network connections.
+ * NetworkTransport implementations are pluggable via the {@link Transport} class.
+ */
public interface NetworkTransport
{
- public void init(ConnectionSettings settings);
-
- public Sender<ByteBuffer> sender();
-
- public void receiver(Receiver<ByteBuffer> delegate);
-
public void close();
-}
\ No newline at end of file
+}
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java Thu Oct 20 18:42:46 2011
@@ -21,57 +21,49 @@
package org.apache.qpid.transport.network.io;
import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.SocketException;
+import java.net.*;
import java.nio.ByteBuffer;
-import org.apache.qpid.transport.ConnectionSettings;
-import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.Sender;
-import org.apache.qpid.transport.TransportException;
-import org.apache.qpid.transport.network.NetworkTransport;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLServerSocketFactory;
+
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.transport.*;
+import org.apache.qpid.transport.network.IncomingNetworkTransport;
+import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.network.OutgoingNetworkTransport;
import org.apache.qpid.transport.util.Logger;
-public class IoNetworkTransport implements NetworkTransport, IoContext
+public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNetworkTransport
{
- static
- {
- org.apache.mina.common.ByteBuffer.setAllocator
- (new org.apache.mina.common.SimpleByteBufferAllocator());
- org.apache.mina.common.ByteBuffer.setUseDirectBuffers
- (Boolean.getBoolean("amqj.enableDirectBuffers"));
- }
- private static final Logger log = Logger.get(IoNetworkTransport.class);
+ private static final Logger LOGGER = Logger.get(IoNetworkTransport.class);
- private Socket socket;
- private Sender<ByteBuffer> sender;
- private IoReceiver receiver;
- private long timeout = 60000;
- private ConnectionSettings settings;
-
- public void init(ConnectionSettings settings)
+ private Socket _socket;
+ private IoNetworkConnection _connection;
+ private long _timeout = 60000;
+ private AcceptingThread _acceptor;
+
+ public NetworkConnection connect(ConnectionSettings settings, Receiver<ByteBuffer> delegate, SSLContext sslContext)
{
+ int sendBufferSize = settings.getWriteBufferSize();
+ int receiveBufferSize = settings.getReadBufferSize();
+
try
{
- this.settings = settings;
- InetAddress address = InetAddress.getByName(settings.getHost());
- socket = new Socket();
- socket.setReuseAddress(true);
- socket.setTcpNoDelay(settings.isTcpNodelay());
+ _socket = new Socket();
+ _socket.setReuseAddress(true);
+ _socket.setTcpNoDelay(settings.isTcpNodelay());
+ _socket.setSendBufferSize(sendBufferSize);
+ _socket.setReceiveBufferSize(receiveBufferSize);
- log.debug("default-SO_RCVBUF : %s", socket.getReceiveBufferSize());
- log.debug("default-SO_SNDBUF : %s", socket.getSendBufferSize());
+ LOGGER.debug("SO_RCVBUF : %s", _socket.getReceiveBufferSize());
+ LOGGER.debug("SO_SNDBUF : %s", _socket.getSendBufferSize());
- socket.setSendBufferSize(settings.getWriteBufferSize());
- socket.setReceiveBufferSize(settings.getReadBufferSize());
-
- log.debug("new-SO_RCVBUF : %s", socket.getReceiveBufferSize());
- log.debug("new-SO_SNDBUF : %s", socket.getSendBufferSize());
+ InetAddress address = InetAddress.getByName(settings.getHost());
- socket.connect(new InetSocketAddress(address, settings.getPort()));
+ _socket.connect(new InetSocketAddress(address, settings.getPort()));
}
catch (SocketException e)
{
@@ -81,36 +73,159 @@ public class IoNetworkTransport implemen
{
throw new TransportException("Error connecting to broker", e);
}
- }
- public void receiver(Receiver<ByteBuffer> delegate)
- {
- receiver = new IoReceiver(this, delegate,
- 2*settings.getReadBufferSize() , timeout);
- }
+ try
+ {
+ _connection = new IoNetworkConnection(_socket, delegate, sendBufferSize, receiveBufferSize, _timeout);
+ _connection.start();
+ }
+ catch(Exception e)
+ {
+ try
+ {
+ _socket.close();
+ }
+ catch(IOException ioe)
+ {
+ //ignored, throw based on original exception
+ }
- public Sender<ByteBuffer> sender()
- {
- return new IoSender(this, 2*settings.getWriteBufferSize(), timeout);
+ throw new TransportException("Error creating network connection", e);
+ }
+
+ return _connection;
}
public void close()
{
-
+ if(_connection != null)
+ {
+ _connection.close();
+ }
+ if(_acceptor != null)
+ {
+ _acceptor.close();
+ }
}
- public Sender<ByteBuffer> getSender()
+ public NetworkConnection getConnection()
{
- return sender;
+ return _connection;
}
- public IoReceiver getReceiver()
+ public void accept(NetworkTransportConfiguration config, ProtocolEngineFactory factory, SSLContext sslContext)
{
- return receiver;
+
+ try
+ {
+ _acceptor = new AcceptingThread(config, factory, sslContext);
+
+ _acceptor.start();
+ }
+ catch (IOException e)
+ {
+ throw new TransportException("Unable to start server socket", e);
+ }
+
+
}
- public Socket getSocket()
+ private class AcceptingThread extends Thread
{
- return socket;
+ private NetworkTransportConfiguration _config;
+ private ProtocolEngineFactory _factory;
+ private SSLContext _sslContent;
+ private ServerSocket _serverSocket;
+
+ private AcceptingThread(NetworkTransportConfiguration config,
+ ProtocolEngineFactory factory,
+ SSLContext sslContext)
+ throws IOException
+ {
+ _config = config;
+ _factory = factory;
+ _sslContent = sslContext;
+
+ InetSocketAddress address = new InetSocketAddress(config.getHost(), config.getPort());
+
+ if(sslContext == null)
+ {
+ _serverSocket = new ServerSocket();
+ }
+ else
+ {
+ SSLServerSocketFactory socketFactory = sslContext.getServerSocketFactory();
+ _serverSocket = socketFactory.createServerSocket();
+ }
+
+ _serverSocket.bind(address);
+ _serverSocket.setReuseAddress(true);
+
+
+ }
+
+
+ /**
+ Close the underlying ServerSocket if it has not already been closed.
+ */
+ public void close()
+ {
+ if (!_serverSocket.isClosed())
+ {
+ try
+ {
+ _serverSocket.close();
+ }
+ catch (IOException e)
+ {
+ throw new TransportException(e);
+ }
+ }
+ }
+
+ @Override
+ public void run()
+ {
+ try
+ {
+ while (true)
+ {
+ try
+ {
+ Socket socket = _serverSocket.accept();
+ socket.setTcpNoDelay(_config.getTcpNoDelay());
+
+ final Integer sendBufferSize = _config.getSendBufferSize();
+ final Integer receiveBufferSize = _config.getReceiveBufferSize();
+
+ socket.setSendBufferSize(sendBufferSize);
+ socket.setReceiveBufferSize(receiveBufferSize);
+
+ ProtocolEngine engine = _factory.newProtocolEngine();
+
+ NetworkConnection connection = new IoNetworkConnection(socket, engine, sendBufferSize, receiveBufferSize, _timeout);
+
+
+ engine.setNetworkConnection(connection, connection.getSender());
+
+ connection.start();
+
+
+ }
+ catch(RuntimeException e)
+ {
+ LOGGER.error(e, "Error in Acceptor thread " + _config.getPort());
+ }
+ }
+ }
+ catch (IOException e)
+ {
+ LOGGER.debug(e, "SocketException - no new connections will be accepted on port "
+ + _config.getPort());
+ }
+ }
+
+
}
+
}
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java Thu Oct 20 18:42:46 2011
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.transport.network.io;
+import org.apache.qpid.common.Closeable;
import org.apache.qpid.thread.Threading;
import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.TransportException;
@@ -37,56 +38,77 @@ import java.util.concurrent.atomic.Atomi
*
*/
-final class IoReceiver implements Runnable
+final class IoReceiver implements Runnable, Closeable
{
private static final Logger log = Logger.get(IoReceiver.class);
- private final IoContext ioCtx;
private final Receiver<ByteBuffer> receiver;
private final int bufferSize;
private final Socket socket;
private final long timeout;
private final AtomicBoolean closed = new AtomicBoolean(false);
private final Thread receiverThread;
- private final boolean shutdownBroken =
- ((String) System.getProperties().get("os.name")).matches("(?i).*windows.*");
+ private static final boolean shutdownBroken;
+ static
+ {
+ String osName = System.getProperty("os.name");
+ shutdownBroken = osName == null ? false : osName.matches("(?i).*windows.*");
+ }
- public IoReceiver(IoContext ioCtx, Receiver<ByteBuffer> receiver,
- int bufferSize, long timeout)
+ public IoReceiver(Socket socket, Receiver<ByteBuffer> receiver, int bufferSize, long timeout)
{
- this.ioCtx = ioCtx;
this.receiver = receiver;
this.bufferSize = bufferSize;
- this.socket = ioCtx.getSocket();
+ this.socket = socket;
this.timeout = timeout;
try
{
+ //Create but deliberately don't start the thread.
receiverThread = Threading.getThreadFactory().createThread(this);
}
catch(Exception e)
{
- throw new Error("Error creating IOReceiver thread",e);
+ throw new RuntimeException("Error creating IOReceiver thread",e);
}
receiverThread.setDaemon(true);
receiverThread.setName(String.format("IoReceiver - %s", socket.getRemoteSocketAddress()));
+ }
+
+ public void initiate()
+ {
receiverThread.start();
}
+ public void close()
+ {
+ close(false);
+ }
+
void close(boolean block)
{
if (!closed.getAndSet(true))
{
try
{
- if (shutdownBroken)
+ try
{
- socket.close();
+ if (shutdownBroken)
+ {
+ socket.close();
+ }
+ else
+ {
+ socket.shutdownInput();
+ }
}
- else
+ catch(SocketException se)
{
- socket.shutdownInput();
+ if(!socket.isClosed() && !socket.isInputShutdown())
+ {
+ throw se;
+ }
}
if (block && Thread.currentThread() != receiverThread)
{
@@ -105,6 +127,7 @@ final class IoReceiver implements Runnab
{
throw new TransportException(e);
}
+
}
}
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java Thu Oct 20 18:42:46 2011
@@ -24,10 +24,14 @@ import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.qpid.common.Closeable;
import org.apache.qpid.thread.Threading;
import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.SenderClosedException;
import org.apache.qpid.transport.SenderException;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.util.Logger;
@@ -43,7 +47,6 @@ public final class IoSender implements R
// we can test other cases as well
private final static int START = Integer.MAX_VALUE - 10;
- private final IoContext ioCtx;
private final long timeout;
private final Socket socket;
private final OutputStream out;
@@ -56,14 +59,13 @@ public final class IoSender implements R
private final Object notEmpty = new Object();
private final AtomicBoolean closed = new AtomicBoolean(false);
private final Thread senderThread;
-
- private volatile Throwable exception = null;
+ private final List<Closeable> _listeners = new ArrayList<Closeable>();
+ private volatile Throwable exception = null;
- public IoSender(IoContext ioCtx, int bufferSize, long timeout)
+ public IoSender(Socket socket, int bufferSize, long timeout)
{
- this.ioCtx = ioCtx;
- this.socket = ioCtx.getSocket();
+ this.socket = socket;
this.buffer = new byte[pof2(bufferSize)]; // buffer size must be a power of 2
this.timeout = timeout;
@@ -78,15 +80,20 @@ public final class IoSender implements R
try
{
- senderThread = Threading.getThreadFactory().createThread(this);
+ //Create but deliberately don't start the thread.
+ senderThread = Threading.getThreadFactory().createThread(this);
}
catch(Exception e)
{
throw new Error("Error creating IOSender thread",e);
}
-
+
senderThread.setDaemon(true);
senderThread.setName(String.format("IoSender - %s", socket.getRemoteSocketAddress()));
+ }
+
+ public void initiate()
+ {
senderThread.start();
}
@@ -104,7 +111,11 @@ public final class IoSender implements R
{
if (closed.get())
{
- throw new SenderException("sender is closed", exception);
+ throw new SenderClosedException("sender is closed", exception);
+ }
+ if(!senderThread.isAlive())
+ {
+ throw new SenderException("sender thread not alive");
}
final int size = buffer.length;
@@ -137,7 +148,7 @@ public final class IoSender implements R
if (closed.get())
{
- throw new SenderException("sender is closed", exception);
+ throw new SenderClosedException("sender is closed", exception);
}
if (head - tail >= size)
@@ -204,16 +215,20 @@ public final class IoSender implements R
senderThread.join(timeout);
if (senderThread.isAlive())
{
+ log.error("join timed out");
throw new SenderException("join timed out");
}
}
- ioCtx.getReceiver().close(false);
}
catch (InterruptedException e)
{
+ log.error("interrupted whilst waiting for sender thread to stop");
throw new SenderException(e);
}
-
+ finally
+ {
+ closeListeners();
+ }
if (reportException && exception != null)
{
throw new SenderException(exception);
@@ -221,9 +236,31 @@ public final class IoSender implements R
}
}
+ private void closeListeners()
+ {
+ Exception ex = null;
+ for(Closeable listener : _listeners)
+ {
+ try
+ {
+ listener.close();
+ }
+ catch(Exception e)
+ {
+ log.error("Exception closing listener: " + e.getMessage());
+ ex = e;
+ }
+ }
+
+ if (ex != null)
+ {
+ throw new SenderException(ex.getMessage(), ex);
+ }
+ }
+
public void run()
{
- final int size = buffer.length;
+ final int size = buffer.length;
while (true)
{
final int hd = head;
@@ -304,4 +341,9 @@ public final class IoSender implements R
throw new SenderException(e);
}
}
+
+ public void registerCloseListener(Closeable listener)
+ {
+ _listeners.add(listener);
+ }
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org