You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2011/10/21 03:20:13 UTC
svn commit: r1187150 [35/43] - in /qpid/branches/QPID-2519: ./ bin/ cpp/
cpp/bindings/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/
cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/
cpp/bindings/qmf2/python/ cpp/bindings/qmf...
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java Fri Oct 21 01:19:00 2011
@@ -26,8 +26,11 @@ import org.apache.qpid.framing.abstracti
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.AbstractMethodConverter;
import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
+import org.apache.qpid.framing.amqp_8_0.BasicPublishBodyImpl;
import org.apache.qpid.framing.*;
+import org.apache.mina.common.ByteBuffer;
+
public class MethodConverter_8_0 extends AbstractMethodConverter implements ProtocolVersionMethodConverter
{
private int _basicPublishClassId;
@@ -57,9 +60,9 @@ public class MethodConverter_8_0 extends
return contentBodyChunk.getSize();
}
- public byte[] getData()
+ public ByteBuffer getData()
{
- return contentBodyChunk._payload;
+ return contentBodyChunk.payload;
}
public void reduceToFit()
@@ -78,9 +81,9 @@ public class MethodConverter_8_0 extends
}
- public AMQBody convertToBody(byte[] data)
+ public AMQBody convertToBody(java.nio.ByteBuffer buf)
{
- return new ContentBody(data);
+ return new ContentBody(ByteBuffer.wrap(buf));
}
public MessagePublishInfo convertToInfo(AMQMethodBody methodBody)
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java Fri Oct 21 01:19:00 2011
@@ -80,7 +80,7 @@ public final class AMQConstant
/**
* An operator intervened to close the connection for some reason. The client may retry at some later date.
*/
- public static final AMQConstant CONNECTION_FORCED = new AMQConstant(320, "connection forced", true);
+ public static final AMQConstant CONTEXT_IN_USE = new AMQConstant(320, "context in use", true);
/** The client tried to work with an unknown virtual host or cluster. */
public static final AMQConstant INVALID_PATH = new AMQConstant(402, "invalid path", true);
@@ -104,7 +104,7 @@ public final class AMQConstant
public static final AMQConstant REQUEST_TIMEOUT = new AMQConstant(408, "Request Timeout", true);
- public static final AMQConstant ARGUMENT_INVALID = new AMQConstant(409, "argument invalid", true);
+ public static final AMQConstant INVALID_ARGUMENT = new AMQConstant(409, "argument invalid", true);
/**
* The client sent a malformed frame that the server could not decode. This strongly implies a programming error
@@ -153,7 +153,10 @@ public final class AMQConstant
public static final AMQConstant FRAME_MIN_SIZE = new AMQConstant(4096, "frame min size", true);
- public static final AMQConstant INVALID_ARGUMENT = new AMQConstant(542, "invalid argument", true);
+ /**
+ * The server does not support the protocol version
+ */
+ public static final AMQConstant UNSUPPORTED_BROKER_PROTOCOL_ERROR = new AMQConstant(542, "broker unsupported protocol", true);
/**
* The client imp does not support the protocol version
*/
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java Fri Oct 21 01:19:00 2011
@@ -21,11 +21,10 @@
package org.apache.qpid.protocol;
import java.net.SocketAddress;
-import java.nio.ByteBuffer;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.transport.NetworkDriver;
import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.Sender;
-import org.apache.qpid.transport.network.NetworkConnection;
/**
* A ProtocolEngine is a Receiver for java.nio.ByteBuffers. It takes the data passed to it in the received
@@ -33,6 +32,9 @@ import org.apache.qpid.transport.network
*/
public interface ProtocolEngine extends Receiver<java.nio.ByteBuffer>
{
+ // Sets the network driver providing data for this ProtocolEngine
+ void setNetworkDriver (NetworkDriver driver);
+
// Returns the remote address of the NetworkDriver
SocketAddress getRemoteAddress();
@@ -56,6 +58,4 @@ public interface ProtocolEngine extends
void readerIdle();
- public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender);
-
}
\ No newline at end of file
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java Fri Oct 21 01:19:00 2011
@@ -20,12 +20,12 @@
*/
package org.apache.qpid.protocol;
-import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.NetworkDriver;
public interface ProtocolEngineFactory
{
// Returns a new instance of a ProtocolEngine
- ProtocolEngine newProtocolEngine();
+ ProtocolEngine newProtocolEngine(NetworkDriver networkDriver);
}
\ No newline at end of file
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/ssl/SSLContextFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/ssl/SSLContextFactory.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/ssl/SSLContextFactory.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/ssl/SSLContextFactory.java Fri Oct 21 01:19:00 2011
@@ -20,17 +20,18 @@
*/
package org.apache.qpid.ssl;
+import java.io.File;
+import java.io.FileInputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
-import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
-import org.apache.qpid.transport.network.security.ssl.QpidClientX509KeyManager;
import org.apache.qpid.transport.network.security.ssl.SSLUtil;
/**
@@ -38,92 +39,157 @@ import org.apache.qpid.transport.network
* before this will work.
*
*/
-public class SSLContextFactory
-{
- public static final String JAVA_KEY_STORE_CODE = "JKS";
- public static final String TRANSPORT_LAYER_SECURITY_CODE = "TLS";
- public static final String KEY_STORE_CERTIFICATE_TYPE = "SunX509";
-
- private SSLContextFactory()
- {
- //no instances
- }
-
- public static SSLContext buildServerContext(final String keyStorePath,
- final String keyStorePassword, final String keyStoreCertType)
- throws GeneralSecurityException, IOException
+public class SSLContextFactory {
+
+ /**
+ * Path to the Java keystore file
+ */
+ private String _keyStorePath;
+
+ /**
+ * Password for the keystore
+ */
+ private String _keyStorePassword;
+
+ /**
+ * Cert type to use in keystore
+ */
+ private String _keyStoreCertType;
+
+ /**
+ * Path to the Java truststore file
+ */
+ private String _trustStorePath;
+
+ /**
+ * Password for the truststore
+ */
+ private String _trustStorePassword;
+
+ /**
+ * Cert type to use in truststore
+ */
+ private String _trustStoreCertType;
+
+ private KeyManager customKeyManager;
+
+ public SSLContextFactory(String trustStorePath, String trustStorePassword,
+ String trustStoreCertType)
{
- return buildContext(null, null, null, keyStorePath, keyStorePassword,
- keyStoreCertType, null);
+ this(trustStorePath,trustStorePassword,trustStoreCertType,
+ trustStorePath,trustStorePassword,trustStoreCertType);
}
- public static SSLContext buildClientContext(final String trustStorePath,
- final String trustStorePassword, final String trustStoreCertType,
- final String keyStorePath, final String keyStorePassword,
- final String keyStoreCertType, final String certAlias)
- throws GeneralSecurityException, IOException
- {
- return buildContext(trustStorePath, trustStorePassword,
- trustStoreCertType, keyStorePath, keyStorePassword,
- keyStoreCertType, certAlias);
- }
-
- private static SSLContext buildContext(final String trustStorePath,
- final String trustStorePassword, final String trustStoreCertType,
- final String keyStorePath, final String keyStorePassword,
- final String keyStoreCertType, final String certAlias)
- throws GeneralSecurityException, IOException
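+ /**
+ * Create a factory instance
+ * @param trustStorePath path to the Java truststore file
+ * @param trustStorePassword password for the truststore; the literal "none" is treated as no password
+ * @param trustStoreCertType cert type to use in the truststore
+ * @param keyStorePath path to the Java keystore file
+ * @param keyStorePassword password for the keystore; the literal "none" is treated as no password
+ * @param keyStoreCertType cert type to use in the keystore
+ */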
+ public SSLContextFactory(String trustStorePath, String trustStorePassword, String trustStoreCertType,
+ String keyStorePath, String keyStorePassword, String keyStoreCertType)
+ {
+
+ _trustStorePath = trustStorePath;
+ _trustStorePassword = trustStorePassword;
+
+ if (_trustStorePassword != null && _trustStorePassword.equals("none"))
+ {
+ _trustStorePassword = null;
+ }
+ _trustStoreCertType = trustStoreCertType;
+
+ _keyStorePath = keyStorePath;
+ _keyStorePassword = keyStorePassword;
+
+ if (_keyStorePassword != null && _keyStorePassword.equals("none"))
+ {
+ _keyStorePassword = null;
+ }
+ _keyStoreCertType = keyStoreCertType;
+
+ if (_trustStorePath == null) {
+ throw new IllegalArgumentException("A TrustStore path or KeyStore path must be specified");
+ }
+ if (_trustStoreCertType == null) {
+ throw new IllegalArgumentException("Cert type must be specified");
+ }
+ }
+
+ public SSLContextFactory(String trustStorePath, String trustStorePassword, String trustStoreCertType,
+ KeyManager customKeyManager)
{
- // Initialize the SSLContext to work with our key managers.
- final SSLContext sslContext = SSLContext
- .getInstance(TRANSPORT_LAYER_SECURITY_CODE);
-
- final TrustManager[] trustManagers;
- final KeyManager[] keyManagers;
- if (trustStorePath != null)
+ _trustStorePath = trustStorePath;
+ _trustStorePassword = trustStorePassword;
+
+ if (_trustStorePassword != null && _trustStorePassword.equals("none"))
{
- final KeyStore ts = SSLUtil.getInitializedKeyStore(trustStorePath,
- trustStorePassword);
- final TrustManagerFactory tmf = TrustManagerFactory
- .getInstance(trustStoreCertType);
- tmf.init(ts);
-
- trustManagers = tmf.getTrustManagers();
+ _trustStorePassword = null;
}
- else
- {
- trustManagers = null;
+ _trustStoreCertType = trustStoreCertType;
+
+ if (_trustStorePath == null) {
+ throw new IllegalArgumentException("A TrustStore path or KeyStore path must be specified");
}
-
- if (keyStorePath != null)
+ if (_trustStoreCertType == null) {
+ throw new IllegalArgumentException("Cert type must be specified");
+ }
+
+ this.customKeyManager = customKeyManager;
+ }
+
+
+ /**
+ * Builds a SSLContext appropriate for use with a server
+ * @return SSLContext
+ * @throws GeneralSecurityException
+ * @throws IOException
+ */
+
+ public SSLContext buildServerContext() throws GeneralSecurityException, IOException
+ {
+ KeyStore ts = SSLUtil.getInitializedKeyStore(_trustStorePath,_trustStorePassword);
+ TrustManagerFactory tmf = TrustManagerFactory.getInstance(_trustStoreCertType);
+ tmf.init(ts);
+
+ // Initialize the SSLContext to work with our key managers.
+ SSLContext sslContext = SSLContext.getInstance("TLS");
+
+ if (customKeyManager != null)
{
- if (certAlias != null)
- {
- keyManagers = new KeyManager[] { new QpidClientX509KeyManager(
- certAlias, keyStorePath, keyStorePassword,
- keyStoreCertType) };
- }
- else
- {
- final KeyStore ks = SSLUtil.getInitializedKeyStore(
- keyStorePath, keyStorePassword);
-
- char[] keyStoreCharPassword = keyStorePassword == null ? null : keyStorePassword.toCharArray();
- // Set up key manager factory to use our key store
- final KeyManagerFactory kmf = KeyManagerFactory
- .getInstance(keyStoreCertType);
- kmf.init(ks, keyStoreCharPassword);
- keyManagers = kmf.getKeyManagers();
- }
+ sslContext.init(new KeyManager[]{customKeyManager},
+ tmf.getTrustManagers(), null);
+
}
else
{
- keyManagers = null;
- }
+ // Create keystore
+ KeyStore ks = SSLUtil.getInitializedKeyStore(_keyStorePath,_keyStorePassword);
+ // Set up key manager factory to use our key store
+ KeyManagerFactory kmf = KeyManagerFactory.getInstance(_keyStoreCertType);
+ kmf.init(ks, _keyStorePassword.toCharArray());
- sslContext.init(keyManagers, trustManagers, null);
-
- return sslContext;
- }
+ sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
+ }
+
+ return sslContext;
+ }
+
+ /**
+ * Builds an SSLContext appropriate for use with a client
+ * @return SSLContext
+ * @throws GeneralSecurityException
+ * @throws IOException
+ */
+ public SSLContext buildClientContext() throws GeneralSecurityException, IOException
+ {
+ KeyStore ks = SSLUtil.getInitializedKeyStore(_trustStorePath,_trustStorePassword);
+ TrustManagerFactory tmf = TrustManagerFactory.getInstance(_trustStoreCertType);
+ tmf.init(ks);
+ SSLContext context = SSLContext.getInstance("TLS");
+ context.init(null, tmf.getTrustManagers(), null);
+ return context;
+ }
+
}
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/thread/QpidThreadExecutor.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/thread/QpidThreadExecutor.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/thread/QpidThreadExecutor.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/thread/QpidThreadExecutor.java Fri Oct 21 01:19:00 2011
@@ -23,7 +23,7 @@ package org.apache.qpid.thread;
import org.apache.qpid.thread.Threading;
-import java.util.concurrent.Executor;
+import edu.emory.mathcs.backport.java.util.concurrent.Executor;
public class QpidThreadExecutor implements Executor
{
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java Fri Oct 21 01:19:00 2011
@@ -20,20 +20,28 @@
*/
package org.apache.qpid.transport;
+import org.ietf.jgss.GSSContext;
+import org.ietf.jgss.GSSException;
+import org.ietf.jgss.GSSManager;
+import org.ietf.jgss.GSSName;
+import org.ietf.jgss.Oid;
+
+import org.apache.qpid.security.UsernamePasswordCallbackHandler;
import static org.apache.qpid.transport.Connection.State.OPEN;
import static org.apache.qpid.transport.Connection.State.RESUMING;
+import org.apache.qpid.transport.util.Logger;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import javax.security.sasl.SaslClient;
-import javax.security.sasl.SaslException;
-
-import org.apache.qpid.transport.util.Logger;
-
/**
* ClientDelegate
@@ -44,13 +52,31 @@ public class ClientDelegate extends Conn
{
private static final Logger log = Logger.get(ClientDelegate.class);
+ private static final String KRB5_OID_STR = "1.2.840.113554.1.2.2";
+ protected static final Oid KRB5_OID;
+ static
+ {
+ Oid oid;
+ try
+ {
+ oid = new Oid(KRB5_OID_STR);
+ }
+ catch (GSSException ignore)
+ {
+ oid = null;
+ }
- protected final ConnectionSettings _conSettings;
+ KRB5_OID = oid;
+ }
+
+ private List<String> clientMechs;
+ private ConnectionSettings conSettings;
public ClientDelegate(ConnectionSettings settings)
{
- this._conSettings = settings;
+ this.conSettings = settings;
+ this.clientMechs = Arrays.asList(settings.getSaslMechs().split(" "));
}
public void init(Connection conn, ProtocolHeader hdr)
@@ -66,9 +92,9 @@ public class ClientDelegate extends Conn
{
Map<String,Object> clientProperties = new HashMap<String,Object>();
- if(this._conSettings.getClientProperties() != null)
+ if(this.conSettings.getClientProperties() != null)
{
- clientProperties.putAll(_conSettings.getClientProperties());
+ clientProperties.putAll(this.conSettings.getClientProperties());
}
clientProperties.put("qpid.session_flow", 1);
@@ -83,12 +109,41 @@ public class ClientDelegate extends Conn
(clientProperties, null, null, conn.getLocale());
return;
}
+
+ List<String> choosenMechs = new ArrayList<String>();
+ for (String mech:clientMechs)
+ {
+ if (brokerMechs.contains(mech))
+ {
+ choosenMechs.add(mech);
+ }
+ }
+
+ if (choosenMechs.size() == 0)
+ {
+ conn.exception(new ConnectionException("The following SASL mechanisms " +
+ clientMechs.toString() +
+ " specified by the client are not supported by the broker"));
+ return;
+ }
+
+ String[] mechs = new String[choosenMechs.size()];
+ choosenMechs.toArray(mechs);
+
conn.setServerProperties(start.getServerProperties());
try
{
- final SaslClient sc = createSaslClient(brokerMechs);
-
+ Map<String,Object> saslProps = new HashMap<String,Object>();
+ if (conSettings.isUseSASLEncryption())
+ {
+ saslProps.put(Sasl.QOP, "auth-conf");
+ }
+ UsernamePasswordCallbackHandler handler =
+ new UsernamePasswordCallbackHandler();
+ handler.initialise(conSettings.getUsername(), conSettings.getPassword());
+ SaslClient sc = Sasl.createSaslClient
+ (mechs, null, conSettings.getSaslProtocol(), conSettings.getSaslServerName(), saslProps, handler);
conn.setSaslClient(sc);
byte[] response = sc.hasInitialResponse() ?
@@ -97,22 +152,12 @@ public class ClientDelegate extends Conn
(clientProperties, sc.getMechanismName(), response,
conn.getLocale());
}
- catch (ConnectionException ce)
- {
- conn.exception(ce);
- }
catch (SaslException e)
{
conn.exception(e);
}
}
-
- protected SaslClient createSaslClient(List<Object> brokerMechs) throws ConnectionException, SaslException
- {
- throw new UnsupportedOperationException();
- }
-
@Override
public void connectionSecure(Connection conn, ConnectionSecure secure)
{
@@ -131,7 +176,7 @@ public class ClientDelegate extends Conn
@Override
public void connectionTune(Connection conn, ConnectionTune tune)
{
- int hb_interval = calculateHeartbeatInterval(_conSettings.getHeartbeatInterval(),
+ int hb_interval = calculateHeartbeatInterval(conSettings.getHeartbeatInterval(),
tune.getHeartbeatMin(),
tune.getHeartbeatMax()
);
@@ -146,12 +191,32 @@ public class ClientDelegate extends Conn
//(or that forced by protocol limitations [0xFFFF])
conn.setChannelMax(channelMax == 0 ? Connection.MAX_CHANNEL_MAX : channelMax);
- conn.connectionOpen(_conSettings.getVhost(), null, Option.INSIST);
+ conn.connectionOpen(conSettings.getVhost(), null, Option.INSIST);
}
@Override
public void connectionOpenOk(Connection conn, ConnectionOpenOk ok)
{
+ SaslClient sc = conn.getSaslClient();
+ if (sc != null)
+ {
+ if (sc.getMechanismName().equals("GSSAPI"))
+ {
+ String id = getKerberosUser();
+ if (id != null)
+ {
+ conn.setUserID(id);
+ }
+ }
+ else if (sc.getMechanismName().equals("EXTERNAL"))
+ {
+ if (conn.getSecurityLayer() != null)
+ {
+ conn.setUserID(conn.getSecurityLayer().getUserID());
+ }
+ }
+ }
+
if (conn.isConnectionResuming())
{
conn.setState(RESUMING);
@@ -182,7 +247,7 @@ public class ClientDelegate extends Conn
int i = heartbeat;
if (i == 0)
{
- log.info("Idle timeout is 0 sec. Heartbeats are disabled.");
+ log.warn("Idle timeout is zero. Heartbeats are disabled");
return 0; // heartbeats are disabled.
}
else if (i >= min && i <= max)
@@ -191,8 +256,8 @@ public class ClientDelegate extends Conn
}
else
{
- log.info("The broker does not support the configured connection idle timeout of %s sec," +
- " using the brokers max supported value of %s sec instead.", i,max);
+ log.warn("Ignoring the idle timeout %s set by the connection," +
+ " using the brokers max value %s", i,max);
return max;
}
}
@@ -221,7 +286,35 @@ public class ClientDelegate extends Conn
}
+ private String getKerberosUser()
+ {
+ log.debug("Obtaining userID from kerberos");
+ String service = conSettings.getSaslProtocol() + "@" + conSettings.getSaslServerName();
+ GSSManager manager = GSSManager.getInstance();
+
+ try
+ {
+ GSSName acceptorName = manager.createName(service,
+ GSSName.NT_HOSTBASED_SERVICE, KRB5_OID);
+
+ GSSContext secCtx = manager.createContext(acceptorName,
+ KRB5_OID,
+ null,
+ GSSContext.INDEFINITE_LIFETIME);
+ secCtx.initSecContext(new byte[0], 0, 1);
+ if (secCtx.getSrcName() != null)
+ {
+ return secCtx.getSrcName().toString();
+ }
+ }
+ catch (GSSException e)
+ {
+ log.warn("Unable to retrieve userID from Kerberos due to error",e);
+ }
+
+ return null;
+ }
}
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/Connection.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/Connection.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/Connection.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/Connection.java Fri Oct 21 01:19:00 2011
@@ -25,29 +25,21 @@ import static org.apache.qpid.transport.
import static org.apache.qpid.transport.Connection.State.NEW;
import static org.apache.qpid.transport.Connection.State.OPEN;
import static org.apache.qpid.transport.Connection.State.OPENING;
+import static org.apache.qpid.transport.Connection.State.RESUMING;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslServer;
-import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.transport.network.Assembler;
-import org.apache.qpid.transport.network.Disassembler;
-import org.apache.qpid.transport.network.InputHandler;
-import org.apache.qpid.transport.network.NetworkConnection;
-import org.apache.qpid.transport.network.OutgoingNetworkTransport;
-import org.apache.qpid.transport.network.Transport;
import org.apache.qpid.transport.network.security.SecurityLayer;
-import org.apache.qpid.transport.network.security.SecurityLayerFactory;
import org.apache.qpid.transport.util.Logger;
import org.apache.qpid.transport.util.Waiter;
import org.apache.qpid.util.Strings;
@@ -73,7 +65,6 @@ public class Connection extends Connecti
public static final int MAX_CHANNEL_MAX = 0xFFFF;
public static final int MIN_USABLE_CHANNEL_NUM = 0;
-
public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD, RESUMING }
static class DefaultConnectionListener implements ConnectionListener
@@ -121,14 +112,17 @@ public class Connection extends Connecti
private SaslServer saslServer;
private SaslClient saslClient;
private int idleTimeout = 0;
+ private String _authorizationID;
private Map<String,Object> _serverProperties;
private String userID;
private ConnectionSettings conSettings;
private SecurityLayer securityLayer;
private String _clientId;
-
+
+ private static final AtomicLong idGenerator = new AtomicLong(0);
+ private final long _connectionId = idGenerator.incrementAndGet();
private final AtomicBoolean connectionLost = new AtomicBoolean(false);
-
+
public Connection() {}
public void setConnectionDelegate(ConnectionDelegate delegate)
@@ -239,24 +233,14 @@ public class Connection extends Connecti
conSettings = settings;
state = OPENING;
userID = settings.getUsername();
-
- securityLayer = SecurityLayerFactory.newInstance(getConnectionSettings());
-
- OutgoingNetworkTransport transport = Transport.getOutgoingTransportInstance(ProtocolVersion.v0_10);
- Receiver<ByteBuffer> secureReceiver = securityLayer.receiver(new InputHandler(new Assembler(this)));
- if(secureReceiver instanceof ConnectionListener)
- {
- addConnectionListener((ConnectionListener)secureReceiver);
- }
-
- NetworkConnection network = transport.connect(settings, secureReceiver, null);
- final Sender<ByteBuffer> secureSender = securityLayer.sender(network.getSender());
- if(secureSender instanceof ConnectionListener)
- {
- addConnectionListener((ConnectionListener)secureSender);
- }
- sender = new Disassembler(secureSender, settings.getMaxFrameSize());
-
+ delegate = new ClientDelegate(settings);
+
+ TransportBuilder transport = new TransportBuilder();
+ transport.init(this);
+ this.sender = transport.buildSenderPipe();
+ transport.buildReceiverPipe(this);
+ this.securityLayer = transport.getSecurityLayer();
+
send(new ProtocolHeader(1, 0, 10));
Waiter w = new Waiter(lock, timeout);
@@ -337,31 +321,23 @@ public class Connection extends Connecti
Waiter w = new Waiter(lock, timeout);
while (w.hasTime() && state != OPEN && error == null)
{
- w.await();
+ w.await();
}
-
+
if (state != OPEN)
{
throw new ConnectionException("Timed out waiting for connection to be ready. Current state is :" + state);
}
-
+
Session ssn = _sessionFactory.newSession(this, name, expiry);
- registerSession(ssn);
+ sessions.put(name, ssn);
map(ssn);
ssn.attach();
return ssn;
}
}
- public void registerSession(Session ssn)
- {
- synchronized (lock)
- {
- sessions.put(ssn.getName(),ssn);
- }
- }
-
- public void removeSession(Session ssn)
+ void removeSession(Session ssn)
{
synchronized (lock)
{
@@ -376,6 +352,11 @@ public class Connection extends Connecti
_sessionFactory = sessionFactory;
}
+ public long getConnectionId()
+ {
+ return _connectionId;
+ }
+
public ConnectionDelegate getConnectionDelegate()
{
return delegate;
@@ -424,7 +405,7 @@ public class Connection extends Connecti
else
{
throw new ProtocolViolationException(
- "Received frames for an already detached session", null);
+ "Received frames for an already dettached session", null);
}
}
@@ -473,7 +454,7 @@ public class Connection extends Connecti
}
}
- public Session getSession(int channel)
+ protected Session getSession(int channel)
{
synchronized (lock)
{
@@ -487,10 +468,18 @@ public class Connection extends Connecti
{
for (Session ssn : sessions.values())
{
- map(ssn);
- ssn.resume();
+ if (ssn.isTransacted())
+ {
+ removeSession(ssn);
+ ssn.setState(Session.State.CLOSED);
+ }
+ else
+ {
+ map(ssn);
+ ssn.attach();
+ ssn.resume();
+ }
}
-
setState(OPEN);
}
}
@@ -577,12 +566,12 @@ public class Connection extends Connecti
{
close(ConnectionCloseCode.NORMAL, null);
}
-
+
public void mgmtClose()
{
close(ConnectionCloseCode.CONNECTION_FORCED, "The connection was closed using the broker's management interface.");
}
-
+
public void close(ConnectionCloseCode replyCode, String replyText, Option ... _options)
{
synchronized (lock)
@@ -656,6 +645,16 @@ public class Connection extends Connecti
return idleTimeout;
}
+ public void setAuthorizationID(String authorizationID)
+ {
+ _authorizationID = authorizationID;
+ }
+
+ public String getAuthorizationID()
+ {
+ return _authorizationID;
+ }
+
public String getUserID()
{
return userID;
@@ -685,24 +684,15 @@ public class Connection extends Connecti
{
return conSettings;
}
-
+
public SecurityLayer getSecurityLayer()
{
return securityLayer;
}
-
+
public boolean isConnectionResuming()
{
return connectionLost.get();
}
- protected Collection<Session> getChannels()
- {
- return channels.values();
- }
-
- public boolean hasSessionWithName(final String name)
- {
- return sessions.containsKey(new Binary(name.getBytes()));
- }
}
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java Fri Oct 21 01:19:00 2011
@@ -85,7 +85,7 @@ public abstract class ConnectionDelegate
@Override public void sessionDetach(Connection conn, SessionDetach dtc)
{
Session ssn = conn.getSession(dtc.getChannel());
- ssn.sessionDetached(dtc.getName(), ssn.getDetachCode() == null? SessionDetachCode.NORMAL: ssn.getDetachCode());
+ ssn.sessionDetached(dtc.getName(), SessionDetachCode.NORMAL);
conn.unmap(ssn);
ssn.closed();
}
@@ -95,7 +95,6 @@ public abstract class ConnectionDelegate
Session ssn = conn.getSession(dtc.getChannel());
if (ssn != null)
{
- ssn.setDetachCode(dtc.getCode());
conn.unmap(ssn);
ssn.closed();
}
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java Fri Oct 21 01:19:00 2011
@@ -30,8 +30,6 @@ import java.util.Map;
*/
public class ConnectionSettings
{
- public static final String WILDCARD_ADDRESS = "*";
-
String protocol = "tcp";
String host = "localhost";
String vhost;
@@ -58,7 +56,7 @@ public class ConnectionSettings
boolean verifyHostname;
// SASL props
- String saslMechs = System.getProperty("qpid.sasl_mechs", null);
+ String saslMechs = System.getProperty("qpid.sasl_mechs", "PLAIN");
String saslProtocol = System.getProperty("qpid.sasl_protocol", "AMQP");
String saslServerName = System.getProperty("qpid.sasl_server_name", "localhost");
boolean useSASLEncryption;
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java Fri Oct 21 01:19:00 2011
@@ -75,7 +75,10 @@ public class ServerDelegate extends Conn
if (mechanism == null || mechanism.length() == 0)
{
- tuneAuthorizedConnection(conn);
+ conn.connectionTune
+ (getChannelMax(),
+ org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE,
+ 0, getHeartbeatMax());
return;
}
@@ -94,7 +97,8 @@ public class ServerDelegate extends Conn
}
catch (SaslException e)
{
- connectionAuthFailed(conn, e);
+ conn.exception(e);
+ conn.connectionClose(ConnectionCloseCode.CONNECTION_FORCED, e.getMessage());
}
}
@@ -105,52 +109,33 @@ public class ServerDelegate extends Conn
return ss;
}
- protected void secure(final SaslServer ss, final Connection conn, final byte[] response)
+ private void secure(Connection conn, byte[] response)
{
+ SaslServer ss = conn.getSaslServer();
try
{
byte[] challenge = ss.evaluateResponse(response);
if (ss.isComplete())
{
ss.dispose();
- tuneAuthorizedConnection(conn);
+ conn.connectionTune
+ (getChannelMax(),
+ org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE,
+ 0, getHeartbeatMax());
+ conn.setAuthorizationID(ss.getAuthorizationID());
}
else
{
- connectionAuthContinue(conn, challenge);
+ conn.connectionSecure(challenge);
}
}
catch (SaslException e)
{
- connectionAuthFailed(conn, e);
+ conn.exception(e);
+ conn.connectionClose(ConnectionCloseCode.CONNECTION_FORCED, e.getMessage());
}
}
- protected void connectionAuthFailed(final Connection conn, Exception e)
- {
- conn.exception(e);
- conn.connectionClose(ConnectionCloseCode.CONNECTION_FORCED, e.getMessage());
- }
-
- protected void connectionAuthContinue(final Connection conn, byte[] challenge)
- {
- conn.connectionSecure(challenge);
- }
-
- protected void tuneAuthorizedConnection(final Connection conn)
- {
- conn.connectionTune
- (getChannelMax(),
- org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE,
- 0, getHeartbeatMax());
- }
-
- protected void secure(final Connection conn, final byte[] response)
- {
- final SaslServer ss = conn.getSaslServer();
- secure(ss, conn, response);
- }
-
protected int getHeartbeatMax()
{
return 0xFFFF;
@@ -170,7 +155,22 @@ public class ServerDelegate extends Conn
@Override
public void connectionTuneOk(Connection conn, ConnectionTuneOk ok)
{
+ int okChannelMax = ok.getChannelMax();
+
+ if (okChannelMax > getChannelMax())
+ {
+ _logger.error("Connection '" + conn.getConnectionId() + "' being severed, " +
+ "client connectionTuneOk returned a channelMax (" + okChannelMax +
+ ") above the servers offered limit (" + getChannelMax() +")");
+ //Due to the error we must forcefully close the connection without negotiation
+ conn.getSender().close();
+ return;
+ }
+
+ //0 means no implied limit, except available server resources
+ //(or that forced by protocol limitations [0xFFFF])
+ conn.setChannelMax(okChannelMax == 0 ? Connection.MAX_CHANNEL_MAX : okChannelMax);
}
@Override
@@ -200,11 +200,4 @@ public class ServerDelegate extends Conn
ssn.sessionAttached(atc.getName());
ssn.setState(Session.State.OPEN);
}
-
- protected void setConnectionTuneOkChannelMax(final Connection conn, final int okChannelMax)
- {
- //0 means no implied limit, except available server resources
- //(or that forced by protocol limitations [0xFFFF])
- conn.setChannelMax(okChannelMax == 0 ? Connection.MAX_CHANNEL_MAX : okChannelMax);
- }
}
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/Session.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/Session.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/Session.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/Session.java Fri Oct 21 01:19:00 2011
@@ -30,8 +30,6 @@ import static org.apache.qpid.transport.
import static org.apache.qpid.transport.Session.State.NEW;
import static org.apache.qpid.transport.Session.State.OPEN;
import static org.apache.qpid.transport.Session.State.RESUMING;
-
-import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.transport.network.Frame;
import static org.apache.qpid.transport.util.Functions.mod;
import org.apache.qpid.transport.util.Logger;
@@ -44,9 +42,7 @@ import static org.apache.qpid.util.Seria
import static org.apache.qpid.util.Strings.toUTF8;
import java.nio.ByteBuffer;
-import java.util.Arrays;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@@ -59,6 +55,7 @@ import java.util.concurrent.TimeUnit;
public class Session extends SessionInvoker
{
+
private static final Logger log = Logger.get(Session.class);
public enum State { NEW, DETACHED, RESUMING, OPEN, CLOSING, CLOSED }
@@ -92,9 +89,7 @@ public class Session extends SessionInvo
private int channel;
private SessionDelegate delegate;
private SessionListener listener = new DefaultSessionListener();
- private final long timeout = Long.getLong(ClientProperties.QPID_SYNC_OP_TIMEOUT,
- Long.getLong(ClientProperties.AMQJ_DEFAULT_SYNCWRITE_TIMEOUT,
- ClientProperties.DEFAULT_SYNC_OPERATION_TIMEOUT));
+ private long timeout = 60000;
private boolean autoSync = false;
private boolean incomingInit;
@@ -122,9 +117,7 @@ public class Session extends SessionInvo
private Thread resumer = null;
private boolean transacted = false;
- private SessionDetachCode detachCode;
- private final Object stateLock = new Object();
-
+
protected Session(Connection connection, Binary name, long expiry)
{
this(connection, new SessionDelegate(), name, expiry);
@@ -259,8 +252,6 @@ public class Session extends SessionInvo
{
synchronized (commands)
{
- attach();
-
for (int i = maxComplete + 1; lt(i, commandsOut); i++)
{
Method m = commands[mod(i, commands.length)];
@@ -271,48 +262,16 @@ public class Session extends SessionInvo
}
else if (m instanceof MessageTransfer)
{
- MessageTransfer xfr = (MessageTransfer)m;
-
- if (xfr.getHeader() != null)
- {
- if (xfr.getHeader().get(DeliveryProperties.class) != null)
- {
- xfr.getHeader().get(DeliveryProperties.class).setRedelivered(true);
- }
- else
- {
- Struct[] structs = xfr.getHeader().getStructs();
- DeliveryProperties deliveryProps = new DeliveryProperties();
- deliveryProps.setRedelivered(true);
-
- List<Struct> list = Arrays.asList(structs);
- list.add(deliveryProps);
- xfr.setHeader(new Header(list));
- }
-
- }
- else
- {
- DeliveryProperties deliveryProps = new DeliveryProperties();
- deliveryProps.setRedelivered(true);
- xfr.setHeader(new Header(deliveryProps));
- }
+ ((MessageTransfer)m).getHeader().get(DeliveryProperties.class).setRedelivered(true);
}
sessionCommandPoint(m.getId(), 0);
send(m);
}
-
+
sessionCommandPoint(commandsOut, 0);
-
sessionFlush(COMPLETED);
resumer = Thread.currentThread();
state = RESUMING;
-
- if(isTransacted())
- {
- txSelect();
- }
-
listener.resumed(this);
resumer = null;
}
@@ -463,10 +422,7 @@ public class Session extends SessionInvo
{
return;
}
- if (copy.size() > 0)
- {
- sessionCompleted(copy, options);
- }
+ sessionCompleted(copy, options);
}
}
@@ -576,6 +532,17 @@ public class Session extends SessionInvo
{
if (m.getEncodedTrack() == Frame.L4)
{
+
+ if (state == DETACHED && transacted)
+ {
+ state = CLOSED;
+ delegate.closed(this);
+ connection.removeSession(this);
+ throw new SessionException(
+ "Session failed over, possibly in the middle of a transaction. " +
+ "Closing the session. Any Transaction in progress will be rolledback.");
+ }
+
if (m.hasPayload())
{
acquireCredit();
@@ -583,30 +550,24 @@ public class Session extends SessionInvo
synchronized (commands)
{
- //allow the txSelect operation to be invoked during resume
- boolean skipWait = m instanceof TxSelect && state == RESUMING;
-
- if(!skipWait)
+ if (state == DETACHED && m.isUnreliable())
{
- if (state == DETACHED && m.isUnreliable())
+ Thread current = Thread.currentThread();
+ if (!current.equals(resumer))
{
- Thread current = Thread.currentThread();
- if (!current.equals(resumer))
- {
- return;
- }
+ return;
}
+ }
- if (state != OPEN && state != CLOSED && state != CLOSING)
+ if (state != OPEN && state != CLOSED && state != CLOSING)
+ {
+ Thread current = Thread.currentThread();
+ if (!current.equals(resumer))
{
- Thread current = Thread.currentThread();
- if (!current.equals(resumer))
+ Waiter w = new Waiter(commands, timeout);
+ while (w.hasTime() && (state != OPEN && state != CLOSED))
{
- Waiter w = new Waiter(commands, timeout);
- while (w.hasTime() && (state != OPEN && state != CLOSED))
- {
- w.await();
- }
+ w.await();
}
}
}
@@ -700,12 +661,7 @@ public class Session extends SessionInvo
{
sessionCommandPoint(0, 0);
}
-
- boolean replayTransfer = !closing && !transacted &&
- m instanceof MessageTransfer &&
- ! m.isUnreliable();
-
- if ((replayTransfer) || m.hasCompletionListener())
+ if ((!closing && !transacted && m instanceof MessageTransfer) || m.hasCompletionListener())
{
commands[mod(next, commands.length)] = m;
commandBytes += m.getBodySize();
@@ -970,29 +926,16 @@ public class Session extends SessionInvo
public void close()
{
- if (log.isDebugEnabled())
- {
- log.debug("Closing [%s] in state [%s]", this, state);
- }
synchronized (commands)
{
- switch(state)
- {
- case DETACHED:
- state = CLOSED;
- delegate.closed(this);
- connection.removeSession(this);
- listener.closed(this);
- break;
- case CLOSED:
- break;
- default:
- state = CLOSING;
- setClose(true);
- sessionRequestTimeout(0);
- sessionDetach(name.getBytes());
- awaitClose();
- }
+ state = CLOSING;
+ setClose(true);
+ sessionRequestTimeout(0);
+ sessionDetach(name.getBytes());
+
+ awaitClose();
+
+
}
}
@@ -1052,8 +995,7 @@ public class Session extends SessionInvo
if(state == CLOSED)
{
- connection.removeSession(this);
- listener.closed(this);
+ connection.removeSession(this);
}
}
@@ -1066,55 +1008,13 @@ public class Session extends SessionInvo
{
return String.format("ssn:%s", name);
}
-
+
public void setTransacted(boolean b) {
this.transacted = b;
}
-
+
public boolean isTransacted(){
return transacted;
}
-
- public void setDetachCode(SessionDetachCode dtc)
- {
- this.detachCode = dtc;
- }
-
- public SessionDetachCode getDetachCode()
- {
- return this.detachCode;
- }
-
- public void awaitOpen()
- {
- switch (state)
- {
- case NEW:
- synchronized(stateLock)
- {
- Waiter w = new Waiter(stateLock, timeout);
- while (w.hasTime() && state == NEW)
- {
- w.await();
- }
- }
-
- if (state != OPEN)
- {
- throw new SessionException("Timed out waiting for Session to open");
- }
- break;
- case DETACHED:
- case CLOSING:
- case CLOSED:
- throw new SessionException("Session closed");
- default :
- break;
- }
- }
-
- public Object getStateLock()
- {
- return stateLock;
- }
+
}
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java Fri Oct 21 01:19:00 2011
@@ -76,10 +76,6 @@ public class SessionDelegate
@Override public void sessionAttached(Session ssn, SessionAttached atc)
{
ssn.setState(Session.State.OPEN);
- synchronized (ssn.getStateLock())
- {
- ssn.getStateLock().notifyAll();
- }
}
@Override public void sessionTimeout(Session ssn, SessionTimeout t)
@@ -206,19 +202,11 @@ public class SessionDelegate
public void closed(Session session)
{
- log.debug("CLOSED: [%s]", session);
- synchronized (session.getStateLock())
- {
- session.getStateLock().notifyAll();
- }
+ log.warn("CLOSED: [%s]", session);
}
public void detached(Session session)
{
- log.debug("DETACHED: [%s]", session);
- synchronized (session.getStateLock())
- {
- session.getStateLock().notifyAll();
- }
+ log.warn("DETACHED: [%s]", session);
}
}
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java Fri Oct 21 01:19:00 2011
@@ -63,7 +63,6 @@ abstract class AbstractEncoder implement
ENCODINGS.put(Double.class, Type.DOUBLE);
ENCODINGS.put(Character.class, Type.CHAR);
ENCODINGS.put(byte[].class, Type.VBIN32);
- ENCODINGS.put(UUID.class, Type.UUID);
}
private final Map<String,byte[]> str8cache = new LinkedHashMap<String,byte[]>()
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java Fri Oct 21 01:19:00 2011
@@ -20,11 +20,19 @@
*/
package org.apache.qpid.transport.network;
-/**
- * A network transport is responsible for the establishment of network connections.
- * NetworkTransport implementations are pluggable via the {@link Transport} class.
- */
+import java.nio.ByteBuffer;
+
+import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.ConnectionSettings;
+
public interface NetworkTransport
{
+ public void init(ConnectionSettings settings);
+
+ public Sender<ByteBuffer> sender();
+
+ public void receiver(Receiver<ByteBuffer> delegate);
+
public void close();
-}
+}
\ No newline at end of file
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java Fri Oct 21 01:19:00 2011
@@ -21,49 +21,57 @@
package org.apache.qpid.transport.network.io;
import java.io.IOException;
-import java.net.*;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketException;
import java.nio.ByteBuffer;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLServerSocketFactory;
-
-import org.apache.qpid.protocol.ProtocolEngine;
-import org.apache.qpid.protocol.ProtocolEngineFactory;
-import org.apache.qpid.transport.*;
-import org.apache.qpid.transport.network.IncomingNetworkTransport;
-import org.apache.qpid.transport.network.NetworkConnection;
-import org.apache.qpid.transport.network.OutgoingNetworkTransport;
+import org.apache.qpid.transport.ConnectionSettings;
+import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.network.NetworkTransport;
import org.apache.qpid.transport.util.Logger;
-public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNetworkTransport
+public class IoNetworkTransport implements NetworkTransport, IoContext
{
+ static
+ {
+ org.apache.mina.common.ByteBuffer.setAllocator
+ (new org.apache.mina.common.SimpleByteBufferAllocator());
+ org.apache.mina.common.ByteBuffer.setUseDirectBuffers
+ (Boolean.getBoolean("amqj.enableDirectBuffers"));
+ }
- private static final Logger LOGGER = Logger.get(IoNetworkTransport.class);
-
- private Socket _socket;
- private IoNetworkConnection _connection;
- private long _timeout = 60000;
- private AcceptingThread _acceptor;
+ private static final Logger log = Logger.get(IoNetworkTransport.class);
- public NetworkConnection connect(ConnectionSettings settings, Receiver<ByteBuffer> delegate, SSLContext sslContext)
+ private Socket socket;
+ private Sender<ByteBuffer> sender;
+ private IoReceiver receiver;
+ private long timeout = 60000;
+ private ConnectionSettings settings;
+
+ public void init(ConnectionSettings settings)
{
- int sendBufferSize = settings.getWriteBufferSize();
- int receiveBufferSize = settings.getReadBufferSize();
-
try
{
- _socket = new Socket();
- _socket.setReuseAddress(true);
- _socket.setTcpNoDelay(settings.isTcpNodelay());
- _socket.setSendBufferSize(sendBufferSize);
- _socket.setReceiveBufferSize(receiveBufferSize);
+ this.settings = settings;
+ InetAddress address = InetAddress.getByName(settings.getHost());
+ socket = new Socket();
+ socket.setReuseAddress(true);
+ socket.setTcpNoDelay(settings.isTcpNodelay());
- LOGGER.debug("SO_RCVBUF : %s", _socket.getReceiveBufferSize());
- LOGGER.debug("SO_SNDBUF : %s", _socket.getSendBufferSize());
+ log.debug("default-SO_RCVBUF : %s", socket.getReceiveBufferSize());
+ log.debug("default-SO_SNDBUF : %s", socket.getSendBufferSize());
- InetAddress address = InetAddress.getByName(settings.getHost());
+ socket.setSendBufferSize(settings.getWriteBufferSize());
+ socket.setReceiveBufferSize(settings.getReadBufferSize());
+
+ log.debug("new-SO_RCVBUF : %s", socket.getReceiveBufferSize());
+ log.debug("new-SO_SNDBUF : %s", socket.getSendBufferSize());
- _socket.connect(new InetSocketAddress(address, settings.getPort()));
+ socket.connect(new InetSocketAddress(address, settings.getPort()));
}
catch (SocketException e)
{
@@ -73,159 +81,36 @@ public class IoNetworkTransport implemen
{
throw new TransportException("Error connecting to broker", e);
}
+ }
- try
- {
- _connection = new IoNetworkConnection(_socket, delegate, sendBufferSize, receiveBufferSize, _timeout);
- _connection.start();
- }
- catch(Exception e)
- {
- try
- {
- _socket.close();
- }
- catch(IOException ioe)
- {
- //ignored, throw based on original exception
- }
-
- throw new TransportException("Error creating network connection", e);
- }
+ public void receiver(Receiver<ByteBuffer> delegate)
+ {
+ receiver = new IoReceiver(this, delegate,
+ 2*settings.getReadBufferSize() , timeout);
+ }
- return _connection;
+ public Sender<ByteBuffer> sender()
+ {
+ return new IoSender(this, 2*settings.getWriteBufferSize(), timeout);
}
public void close()
{
- if(_connection != null)
- {
- _connection.close();
- }
- if(_acceptor != null)
- {
- _acceptor.close();
- }
+
}
- public NetworkConnection getConnection()
+ public Sender<ByteBuffer> getSender()
{
- return _connection;
+ return sender;
}
- public void accept(NetworkTransportConfiguration config, ProtocolEngineFactory factory, SSLContext sslContext)
+ public IoReceiver getReceiver()
{
-
- try
- {
- _acceptor = new AcceptingThread(config, factory, sslContext);
-
- _acceptor.start();
- }
- catch (IOException e)
- {
- throw new TransportException("Unable to start server socket", e);
- }
-
-
+ return receiver;
}
- private class AcceptingThread extends Thread
+ public Socket getSocket()
{
- private NetworkTransportConfiguration _config;
- private ProtocolEngineFactory _factory;
- private SSLContext _sslContent;
- private ServerSocket _serverSocket;
-
- private AcceptingThread(NetworkTransportConfiguration config,
- ProtocolEngineFactory factory,
- SSLContext sslContext)
- throws IOException
- {
- _config = config;
- _factory = factory;
- _sslContent = sslContext;
-
- InetSocketAddress address = new InetSocketAddress(config.getHost(), config.getPort());
-
- if(sslContext == null)
- {
- _serverSocket = new ServerSocket();
- }
- else
- {
- SSLServerSocketFactory socketFactory = sslContext.getServerSocketFactory();
- _serverSocket = socketFactory.createServerSocket();
- }
-
- _serverSocket.bind(address);
- _serverSocket.setReuseAddress(true);
-
-
- }
-
-
- /**
- Close the underlying ServerSocket if it has not already been closed.
- */
- public void close()
- {
- if (!_serverSocket.isClosed())
- {
- try
- {
- _serverSocket.close();
- }
- catch (IOException e)
- {
- throw new TransportException(e);
- }
- }
- }
-
- @Override
- public void run()
- {
- try
- {
- while (true)
- {
- try
- {
- Socket socket = _serverSocket.accept();
- socket.setTcpNoDelay(_config.getTcpNoDelay());
-
- final Integer sendBufferSize = _config.getSendBufferSize();
- final Integer receiveBufferSize = _config.getReceiveBufferSize();
-
- socket.setSendBufferSize(sendBufferSize);
- socket.setReceiveBufferSize(receiveBufferSize);
-
- ProtocolEngine engine = _factory.newProtocolEngine();
-
- NetworkConnection connection = new IoNetworkConnection(socket, engine, sendBufferSize, receiveBufferSize, _timeout);
-
-
- engine.setNetworkConnection(connection, connection.getSender());
-
- connection.start();
-
-
- }
- catch(RuntimeException e)
- {
- LOGGER.error(e, "Error in Acceptor thread " + _config.getPort());
- }
- }
- }
- catch (IOException e)
- {
- LOGGER.debug(e, "SocketException - no new connections will be accepted on port "
- + _config.getPort());
- }
- }
-
-
+ return socket;
}
-
}
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java Fri Oct 21 01:19:00 2011
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.transport.network.io;
-import org.apache.qpid.common.Closeable;
import org.apache.qpid.thread.Threading;
import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.TransportException;
@@ -38,77 +37,56 @@ import java.util.concurrent.atomic.Atomi
*
*/
-final class IoReceiver implements Runnable, Closeable
+final class IoReceiver implements Runnable
{
private static final Logger log = Logger.get(IoReceiver.class);
+ private final IoContext ioCtx;
private final Receiver<ByteBuffer> receiver;
private final int bufferSize;
private final Socket socket;
private final long timeout;
private final AtomicBoolean closed = new AtomicBoolean(false);
private final Thread receiverThread;
- private static final boolean shutdownBroken;
- static
- {
- String osName = System.getProperty("os.name");
- shutdownBroken = osName == null ? false : osName.matches("(?i).*windows.*");
- }
+ private final boolean shutdownBroken =
+ ((String) System.getProperties().get("os.name")).matches("(?i).*windows.*");
- public IoReceiver(Socket socket, Receiver<ByteBuffer> receiver, int bufferSize, long timeout)
+ public IoReceiver(IoContext ioCtx, Receiver<ByteBuffer> receiver,
+ int bufferSize, long timeout)
{
+ this.ioCtx = ioCtx;
this.receiver = receiver;
this.bufferSize = bufferSize;
- this.socket = socket;
+ this.socket = ioCtx.getSocket();
this.timeout = timeout;
try
{
- //Create but deliberately don't start the thread.
receiverThread = Threading.getThreadFactory().createThread(this);
}
catch(Exception e)
{
- throw new RuntimeException("Error creating IOReceiver thread",e);
+ throw new Error("Error creating IOReceiver thread",e);
}
receiverThread.setDaemon(true);
receiverThread.setName(String.format("IoReceiver - %s", socket.getRemoteSocketAddress()));
- }
-
- public void initiate()
- {
receiverThread.start();
}
- public void close()
- {
- close(false);
- }
-
void close(boolean block)
{
if (!closed.getAndSet(true))
{
try
{
- try
+ if (shutdownBroken)
{
- if (shutdownBroken)
- {
- socket.close();
- }
- else
- {
- socket.shutdownInput();
- }
+ socket.close();
}
- catch(SocketException se)
+ else
{
- if(!socket.isClosed() && !socket.isInputShutdown())
- {
- throw se;
- }
+ socket.shutdownInput();
}
if (block && Thread.currentThread() != receiverThread)
{
@@ -127,7 +105,6 @@ final class IoReceiver implements Runnab
{
throw new TransportException(e);
}
-
}
}
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java Fri Oct 21 01:19:00 2011
@@ -24,14 +24,10 @@ import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.qpid.common.Closeable;
import org.apache.qpid.thread.Threading;
import org.apache.qpid.transport.Sender;
-import org.apache.qpid.transport.SenderClosedException;
import org.apache.qpid.transport.SenderException;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.util.Logger;
@@ -47,6 +43,7 @@ public final class IoSender implements R
// we can test other cases as well
private final static int START = Integer.MAX_VALUE - 10;
+ private final IoContext ioCtx;
private final long timeout;
private final Socket socket;
private final OutputStream out;
@@ -59,13 +56,14 @@ public final class IoSender implements R
private final Object notEmpty = new Object();
private final AtomicBoolean closed = new AtomicBoolean(false);
private final Thread senderThread;
- private final List<Closeable> _listeners = new ArrayList<Closeable>();
-
+
private volatile Throwable exception = null;
- public IoSender(Socket socket, int bufferSize, long timeout)
+
+ public IoSender(IoContext ioCtx, int bufferSize, long timeout)
{
- this.socket = socket;
+ this.ioCtx = ioCtx;
+ this.socket = ioCtx.getSocket();
this.buffer = new byte[pof2(bufferSize)]; // buffer size must be a power of 2
this.timeout = timeout;
@@ -80,20 +78,15 @@ public final class IoSender implements R
try
{
- //Create but deliberately don't start the thread.
- senderThread = Threading.getThreadFactory().createThread(this);
+ senderThread = Threading.getThreadFactory().createThread(this);
}
catch(Exception e)
{
throw new Error("Error creating IOSender thread",e);
}
-
+
senderThread.setDaemon(true);
senderThread.setName(String.format("IoSender - %s", socket.getRemoteSocketAddress()));
- }
-
- public void initiate()
- {
senderThread.start();
}
@@ -111,11 +104,7 @@ public final class IoSender implements R
{
if (closed.get())
{
- throw new SenderClosedException("sender is closed", exception);
- }
- if(!senderThread.isAlive())
- {
- throw new SenderException("sender thread not alive");
+ throw new SenderException("sender is closed", exception);
}
final int size = buffer.length;
@@ -148,7 +137,7 @@ public final class IoSender implements R
if (closed.get())
{
- throw new SenderClosedException("sender is closed", exception);
+ throw new SenderException("sender is closed", exception);
}
if (head - tail >= size)
@@ -215,20 +204,16 @@ public final class IoSender implements R
senderThread.join(timeout);
if (senderThread.isAlive())
{
- log.error("join timed out");
throw new SenderException("join timed out");
}
}
+ ioCtx.getReceiver().close(false);
}
catch (InterruptedException e)
{
- log.error("interrupted whilst waiting for sender thread to stop");
throw new SenderException(e);
}
- finally
- {
- closeListeners();
- }
+
if (reportException && exception != null)
{
throw new SenderException(exception);
@@ -236,31 +221,9 @@ public final class IoSender implements R
}
}
- private void closeListeners()
- {
- Exception ex = null;
- for(Closeable listener : _listeners)
- {
- try
- {
- listener.close();
- }
- catch(Exception e)
- {
- log.error("Exception closing listener: " + e.getMessage());
- ex = e;
- }
- }
-
- if (ex != null)
- {
- throw new SenderException(ex.getMessage(), ex);
- }
- }
-
public void run()
{
- final int size = buffer.length;
+ final int size = buffer.length;
while (true)
{
final int hd = head;
@@ -341,9 +304,4 @@ public final class IoSender implements R
throw new SenderException(e);
}
}
-
- public void registerCloseListener(Closeable listener)
- {
- _listeners.add(listener);
- }
}
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java Fri Oct 21 01:19:00 2011
@@ -25,8 +25,8 @@ import java.nio.ByteBuffer;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
-import org.apache.qpid.ssl.SSLContextFactory;
import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.ConnectionListener;
import org.apache.qpid.transport.ConnectionSettings;
import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.Sender;
@@ -37,12 +37,149 @@ import org.apache.qpid.transport.network
import org.apache.qpid.transport.network.security.ssl.SSLSender;
import org.apache.qpid.transport.network.security.ssl.SSLUtil;
-public interface SecurityLayer
+public class SecurityLayer
{
-
- public Sender<ByteBuffer> sender(Sender<ByteBuffer> delegate);
- public Receiver<ByteBuffer> receiver(Receiver<ByteBuffer> delegate);
- public String getUserID();
-
+ ConnectionSettings settings;
+ Connection con;
+ SSLSecurityLayer sslLayer;
+ SASLSecurityLayer saslLayer;
+
+ public void init(Connection con) throws TransportException
+ {
+ this.con = con;
+ this.settings = con.getConnectionSettings();
+ if (settings.isUseSSL())
+ {
+ sslLayer = new SSLSecurityLayer();
+ }
+ if (settings.isUseSASLEncryption())
+ {
+ saslLayer = new SASLSecurityLayer();
+ }
+
+ }
+
+ public Sender<ByteBuffer> sender(Sender<ByteBuffer> delegate)
+ {
+ Sender<ByteBuffer> sender = delegate;
+
+ if (settings.isUseSSL())
+ {
+ sender = sslLayer.sender(sender);
+ }
+
+ if (settings.isUseSASLEncryption())
+ {
+ sender = saslLayer.sender(sender);
+ }
+
+ return sender;
+ }
+
+ public Receiver<ByteBuffer> receiver(Receiver<ByteBuffer> delegate)
+ {
+ Receiver<ByteBuffer> receiver = delegate;
+
+ if (settings.isUseSSL())
+ {
+ receiver = sslLayer.receiver(receiver);
+ }
+
+ if (settings.isUseSASLEncryption())
+ {
+ receiver = saslLayer.receiver(receiver);
+ }
+
+ return receiver;
+ }
+
+ public String getUserID()
+ {
+ if (settings.isUseSSL())
+ {
+ return sslLayer.getUserID();
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ class SSLSecurityLayer
+ {
+ SSLEngine engine;
+ SSLSender sender;
+
+ public SSLSecurityLayer()
+ {
+ SSLContext sslCtx;
+ try
+ {
+ sslCtx = SSLUtil.createSSLContext(settings);
+ }
+ catch (Exception e)
+ {
+ throw new TransportException("Error creating SSL Context", e);
+ }
+
+ try
+ {
+ engine = sslCtx.createSSLEngine();
+ engine.setUseClientMode(true);
+ }
+ catch(Exception e)
+ {
+ throw new TransportException("Error creating SSL Engine", e);
+ }
+ }
+
+ public SSLSender sender(Sender<ByteBuffer> delegate)
+ {
+ sender = new SSLSender(engine,delegate);
+ sender.setConnectionSettings(settings);
+ return sender;
+ }
+
+ public SSLReceiver receiver(Receiver<ByteBuffer> delegate)
+ {
+ if (sender == null)
+ {
+ throw new
+ IllegalStateException("SecurityLayer.sender method should be " +
+ "invoked before SecurityLayer.receiver");
+ }
+
+ SSLReceiver receiver = new SSLReceiver(engine,delegate,sender);
+ receiver.setConnectionSettings(settings);
+ return receiver;
+ }
+
+ public String getUserID()
+ {
+ return SSLUtil.retriveIdentity(engine);
+ }
+
+ }
+
+ class SASLSecurityLayer
+ {
+ public SASLSecurityLayer()
+ {
+ }
+
+ public SASLSender sender(Sender<ByteBuffer> delegate)
+ {
+ SASLSender sender = new SASLSender(delegate);
+ con.addConnectionListener((ConnectionListener)sender);
+ return sender;
+ }
+
+ public SASLReceiver receiver(Receiver<ByteBuffer> delegate)
+ {
+ SASLReceiver receiver = new SASLReceiver(delegate);
+ con.addConnectionListener((ConnectionListener)receiver);
+ return receiver;
+ }
+
+ }
}
-
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java Fri Oct 21 01:19:00 2011
@@ -43,7 +43,8 @@ public class SASLSender extends SASLEncr
this.delegate = delegate;
log.debug("SASL Sender enabled");
}
-
+
+ @Override
public void close()
{
@@ -64,11 +65,13 @@ public class SASLSender extends SASLEncr
}
}
+ @Override
public void flush()
{
delegate.flush();
}
+ @Override
public void send(ByteBuffer buf)
{
if (closed.get())
@@ -105,6 +108,7 @@ public class SASLSender extends SASLEncr
}
}
+ @Override
public void setIdleTimeout(int i)
{
delegate.setIdleTimeout(i);
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java Fri Oct 21 01:19:00 2011
@@ -20,9 +20,7 @@
*/
package org.apache.qpid.transport.network.security.ssl;
-import java.io.IOException;
import java.net.Socket;
-import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.security.Principal;
import java.security.PrivateKey;
@@ -42,7 +40,7 @@ public class QpidClientX509KeyManager ex
String alias;
public QpidClientX509KeyManager(String alias, String keyStorePath,
- String keyStorePassword,String keyStoreCertType) throws GeneralSecurityException, IOException
+ String keyStorePassword,String keyStoreCertType) throws Exception
{
this.alias = alias;
KeyStore ks = SSLUtil.getInitializedKeyStore(keyStorePath,keyStorePassword);
@@ -50,45 +48,51 @@ public class QpidClientX509KeyManager ex
kmf.init(ks, keyStorePassword.toCharArray());
this.delegate = (X509ExtendedKeyManager)kmf.getKeyManagers()[0];
}
-
+
+ @Override
public String chooseClientAlias(String[] keyType, Principal[] issuers, Socket socket)
{
log.debug("chooseClientAlias:Returning alias " + alias);
return alias;
}
+ @Override
public String chooseServerAlias(String keyType, Principal[] issuers, Socket socket)
{
return delegate.chooseServerAlias(keyType, issuers, socket);
}
+ @Override
public X509Certificate[] getCertificateChain(String alias)
{
return delegate.getCertificateChain(alias);
}
+ @Override
public String[] getClientAliases(String keyType, Principal[] issuers)
{
log.debug("getClientAliases:Returning alias " + alias);
return new String[]{alias};
}
+ @Override
public PrivateKey getPrivateKey(String alias)
{
return delegate.getPrivateKey(alias);
}
+ @Override
public String[] getServerAliases(String keyType, Principal[] issuers)
{
return delegate.getServerAliases(keyType, issuers);
}
-
+
public String chooseEngineClientAlias(String[] keyType, Principal[] issuers, SSLEngine engine)
{
log.debug("chooseEngineClientAlias:Returning alias " + alias);
return alias;
}
-
+
public String chooseEngineServerAlias(String keyType, Principal[] issuers, SSLEngine engine)
{
return delegate.chooseEngineServerAlias(keyType, issuers, engine);
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org