You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2010/03/19 16:16:47 UTC
svn commit: r925261 - in
/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport: ./
network/ network/io/ network/security/
Author: rajith
Date: Fri Mar 19 15:16:46 2010
New Revision: 925261
URL: http://svn.apache.org/viewvc?rev=925261&view=rev
Log:
Did some general refactoring to support QPID-2444 QPID-2445 QPID-2446 QPID-2447
1. Provided a mechanism to register any transport using the qpid.transport JVM argument.
2. Created a TransportBuilder class which abstracts the building of receiver and sender pipes.
3. Created a ConnectionSettings class that contains all connection level parameters. This enhancement is also required to support QPID-2343
4. Added a SecurityLayer class that adds the necessary codecs and coordination to retrieve the userID for mechanisms like EXTERNAL
Added:
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/TransportBuilder.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoContext.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java
Modified:
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java?rev=925261&r1=925260&r2=925261&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java Fri Mar 19 15:16:46 2010
@@ -25,14 +25,7 @@ import static org.apache.qpid.transport.
import static org.apache.qpid.transport.Connection.State.NEW;
import static org.apache.qpid.transport.Connection.State.OPEN;
import static org.apache.qpid.transport.Connection.State.OPENING;
-import org.apache.qpid.transport.network.ConnectionBinding;
-import org.apache.qpid.transport.network.io.IoTransport;
-import org.apache.qpid.transport.util.Logger;
-import org.apache.qpid.transport.util.Waiter;
-import org.apache.qpid.util.Strings;
-import javax.security.sasl.SaslClient;
-import javax.security.sasl.SaslServer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -40,6 +33,14 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslServer;
+
+import org.apache.qpid.transport.network.security.SecurityLayer;
+import org.apache.qpid.transport.util.Logger;
+import org.apache.qpid.transport.util.Waiter;
+import org.apache.qpid.util.Strings;
+
/**
* Connection
@@ -109,7 +110,8 @@ public class Connection extends Connecti
private Map<String,Object> _serverProperties;
private String userID;
private ConnectionSettings conSettings;
-
+ private SecurityLayer securityLayer;
+
// want to make this final
private int _connectionId;
@@ -215,10 +217,17 @@ public class Connection extends Connecti
userID = settings.getUsername();
delegate = new ClientDelegate(settings);
- IoTransport.connect(settings.getHost(),
+ /*IoTransport.connect(settings.getHost(),
settings.getPort(),
ConnectionBinding.get(this),
- settings.isUseSSL());
+ settings.isUseSSL());*/
+
+ TransportBuilder transport = new TransportBuilder();
+ transport.init(this);
+ this.sender = transport.buildSenderPipe();
+ transport.buildReceiverPipe(this);
+ this.securityLayer = transport.getSecurityLayer();
+
send(new ProtocolHeader(1, 0, 10));
Waiter w = new Waiter(lock, timeout);
@@ -633,5 +642,10 @@ public class Connection extends Connecti
{
return conSettings;
}
+
+ public SecurityLayer getSecurityLayer()
+ {
+ return securityLayer;
+ }
}
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java?rev=925261&r1=925260&r2=925261&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java Fri Mar 19 15:16:46 2010
@@ -36,47 +36,32 @@ public class ConnectionSettings
String username = "guest";
String password = "guest";
int port = 5672;
- boolean tcpNodelay;
+ boolean tcpNodelay = Boolean.getBoolean("amqj.tcp_nodelay");
int maxChannelCount = 32767;
int maxFrameSize = 65535;
int heartbeatInterval;
+ int readBufferSize = 65535;
+ int writeBufferSize = 65535;
+ long transportTimeout = 60000;
// SSL props
boolean useSSL;
- String keyStorePath;
- String keyStorePassword;
- String keyStoreCertType;
- String trustStoreCertType;
- String trustStorePath;
- String trustStorePassword;
+ String keyStorePath = System.getProperty("javax.net.ssl.keyStore");
+ String keyStorePassword = System.getProperty("javax.net.ssl.keyStorePassword");
+ String keyStoreCertType = System.getProperty("qpid.ssl.keyStoreCertType","SunX509");;
+ String trustStoreCertType = System.getProperty("qpid.ssl.trustStoreCertType","SunX509");;
+ String trustStorePath = System.getProperty("javax.net.ssl.trustStore");;
+ String trustStorePassword = System.getProperty("javax.net.ssl.trustStorePassword");;
String certAlias;
boolean verifyHostname;
// SASL props
- String saslMechs = "PLAIN";
- String saslProtocol = "AMQP";
- String saslServerName = "localhost";
+ String saslMechs = System.getProperty("qpid.sasl_mechs", "PLAIN");
+ String saslProtocol = System.getProperty("qpid.sasl_protocol", "AMQP");
+ String saslServerName = System.getProperty("qpid.sasl_server_name", "localhost");
boolean useSASLEncryption;
-
- private Connection owner;
-
+
private Map<String, Object> _clientProperties;
-
- public Connection getConnection()
- {
- return owner;
- }
-
- public void setConnection(Connection owner)
- {
- if (this.owner != null)
- {
- throw new IllegalStateException(
- "A ConnectionSettings instance can be associated" +
- " with one and only one Connection instance");
- }
- this.owner = owner;
- }
public boolean isTcpNodelay()
{
@@ -318,4 +303,34 @@ public class ConnectionSettings
this.trustStoreCertType = trustStoreCertType;
}
+ public int getReadBufferSize()
+ {
+ return readBufferSize;
+ }
+
+ public void setReadBufferSize(int readBufferSize)
+ {
+ this.readBufferSize = readBufferSize;
+ }
+
+ public int getWriteBufferSize()
+ {
+ return writeBufferSize;
+ }
+
+ public void setWriteBufferSize(int writeBufferSize)
+ {
+ this.writeBufferSize = writeBufferSize;
+ }
+
+ public long getTransportTimeout()
+ {
+ return transportTimeout;
+ }
+
+ public void setTransportTimeout(long transportTimeout)
+ {
+ this.transportTimeout = transportTimeout;
+ }
+
}
Added: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/TransportBuilder.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/TransportBuilder.java?rev=925261&view=auto
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/TransportBuilder.java (added)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/TransportBuilder.java Fri Mar 19 15:16:46 2010
@@ -0,0 +1,80 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpid.transport.network.Assembler;
+import org.apache.qpid.transport.network.Disassembler;
+import org.apache.qpid.transport.network.InputHandler;
+import org.apache.qpid.transport.network.NetworkTransport;
+import org.apache.qpid.transport.network.Transport;
+import org.apache.qpid.transport.network.security.SecurityLayer;
+
+public class TransportBuilder
+{
+ private Connection con;
+ private ConnectionSettings settings;
+ private NetworkTransport transport;
+ private SecurityLayer securityLayer = new SecurityLayer();
+
+ public void init(Connection con) throws TransportException
+ {
+ this.con = con;
+ this.settings = con.getConnectionSettings();
+ transport = Transport.getTransport();
+ transport.init(settings);
+ securityLayer.init(con);
+ }
+
+ public Sender<ProtocolEvent> buildSenderPipe()
+ {
+ ConnectionSettings settings = con.getConnectionSettings();
+
+ // Io layer
+ Sender<ByteBuffer> sender = transport.sender();
+
+ // Security layer
+ sender = securityLayer.sender(sender);
+
+ Disassembler dis = new Disassembler(sender, settings.getMaxFrameSize());
+ return dis;
+ }
+
+ public void buildReceiverPipe(Receiver<ProtocolEvent> delegate)
+ {
+ ConnectionSettings settings = con.getConnectionSettings();
+
+ Receiver<ByteBuffer> receiver = new InputHandler(new Assembler(delegate));
+
+ // Security layer
+ receiver = securityLayer.receiver(receiver);
+
+ //Io layer
+ transport.receiver(receiver);
+ }
+
+ public SecurityLayer getSecurityLayer()
+ {
+ return securityLayer;
+ }
+
+}
\ No newline at end of file
Added: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java?rev=925261&view=auto
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java (added)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java Fri Mar 19 15:16:46 2010
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.ConnectionSettings;
+
+public interface NetworkTransport
+{
+ public void init(ConnectionSettings settings);
+
+ public Sender<ByteBuffer> sender();
+
+ public void receiver(Receiver<ByteBuffer> delegate);
+
+ public void close();
+}
\ No newline at end of file
Added: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java?rev=925261&view=auto
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java (added)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java Fri Mar 19 15:16:46 2010
@@ -0,0 +1,56 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.transport.network;
+
+import org.apache.qpid.transport.TransportException;
+
+public class Transport
+{
+ private final static Class<?> transportClass;
+
+ static
+ {
+ try
+ {
+ transportClass =
+ Class.forName(System.getProperty("qpid.transport",
+ "org.apache.qpid.transport.network.io.IoNetworkTransport"));
+
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error occured while loading Qpid Transport",e);
+ }
+ }
+
+ public static NetworkTransport getTransport() throws TransportException
+ {
+ try
+ {
+ return (NetworkTransport)transportClass.newInstance();
+ }
+ catch (Exception e)
+ {
+ throw new TransportException("Error while creating a new transport instance",e);
+ }
+ }
+}
\ No newline at end of file
Added: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoContext.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoContext.java?rev=925261&view=auto
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoContext.java (added)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoContext.java Fri Mar 19 15:16:46 2010
@@ -0,0 +1,15 @@
+package org.apache.qpid.transport.network.io;
+
+import java.net.Socket;
+import java.nio.ByteBuffer;
+
+import org.apache.qpid.transport.Sender;
+
+public interface IoContext
+{
+ Sender<ByteBuffer> getSender();
+
+ IoReceiver getReceiver();
+
+ Socket getSocket();
+}
Added: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java?rev=925261&view=auto
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java (added)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java Fri Mar 19 15:16:46 2010
@@ -0,0 +1,120 @@
+/*
+*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network.io;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketException;
+import java.nio.ByteBuffer;
+
+import org.apache.qpid.transport.ConnectionSettings;
+import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.network.NetworkTransport;
+import org.apache.qpid.transport.util.Logger;
+
+public class IoNetworkTransport implements NetworkTransport, IoContext
+{
+ static
+ {
+ org.apache.mina.common.ByteBuffer.setAllocator
+ (new org.apache.mina.common.SimpleByteBufferAllocator());
+ org.apache.mina.common.ByteBuffer.setUseDirectBuffers
+ (Boolean.getBoolean("amqj.enableDirectBuffers"));
+ }
+
+ private static final Logger log = Logger.get(IoNetworkTransport.class);
+
+ private Socket socket;
+ private Sender<ByteBuffer> sender;
+ private IoReceiver receiver;
+ private long timeout = 60000;
+ private ConnectionSettings settings;
+
+ @Override
+ public void init(ConnectionSettings settings)
+ {
+ try
+ {
+ this.settings = settings;
+ InetAddress address = InetAddress.getByName(settings.getHost());
+ socket = new Socket();
+ socket.setReuseAddress(true);
+ socket.setTcpNoDelay(settings.isTcpNodelay());
+
+ log.debug("default-SO_RCVBUF : %s", socket.getReceiveBufferSize());
+ log.debug("default-SO_SNDBUF : %s", socket.getSendBufferSize());
+
+ socket.setSendBufferSize(settings.getWriteBufferSize());
+ socket.setReceiveBufferSize(settings.getReadBufferSize());
+
+ log.debug("new-SO_RCVBUF : %s", socket.getReceiveBufferSize());
+ log.debug("new-SO_SNDBUF : %s", socket.getSendBufferSize());
+
+ socket.connect(new InetSocketAddress(address, settings.getPort()));
+ }
+ catch (SocketException e)
+ {
+ throw new TransportException("Error connecting to broker", e);
+ }
+ catch (IOException e)
+ {
+ throw new TransportException("Error connecting to broker", e);
+ }
+ }
+
+ @Override
+ public void receiver(Receiver<ByteBuffer> delegate)
+ {
+ receiver = new IoReceiver(this, delegate,
+ 2*settings.getReadBufferSize() , timeout);
+ }
+
+ @Override
+ public Sender<ByteBuffer> sender()
+ {
+ return new IoSender(this, 2*settings.getWriteBufferSize(), timeout);
+ }
+
+ @Override
+ public void close()
+ {
+
+ }
+
+ public Sender<ByteBuffer> getSender()
+ {
+ return sender;
+ }
+
+ public IoReceiver getReceiver()
+ {
+ return receiver;
+ }
+
+ public Socket getSocket()
+ {
+ return socket;
+ }
+}
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java?rev=925261&r1=925260&r2=925261&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java Fri Mar 19 15:16:46 2010
@@ -42,7 +42,7 @@ final class IoReceiver implements Runnab
private static final Logger log = Logger.get(IoReceiver.class);
- private final IoTransport transport;
+ private final IoContext ioCtx;
private final Receiver<ByteBuffer> receiver;
private final int bufferSize;
private final Socket socket;
@@ -52,13 +52,13 @@ final class IoReceiver implements Runnab
private final boolean shutdownBroken =
((String) System.getProperties().get("os.name")).matches("(?i).*windows.*");
- public IoReceiver(IoTransport transport, Receiver<ByteBuffer> receiver,
+ public IoReceiver(IoContext ioCtx, Receiver<ByteBuffer> receiver,
int bufferSize, long timeout)
{
- this.transport = transport;
+ this.ioCtx = ioCtx;
this.receiver = receiver;
this.bufferSize = bufferSize;
- this.socket = transport.getSocket();
+ this.socket = ioCtx.getSocket();
this.timeout = timeout;
try
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java?rev=925261&r1=925260&r2=925261&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java Fri Mar 19 15:16:46 2010
@@ -43,7 +43,7 @@ public final class IoSender implements R
// we can test other cases as well
private final static int START = Integer.MAX_VALUE - 10;
- private final IoTransport transport;
+ private final IoContext ioCtx;
private final long timeout;
private final Socket socket;
private final OutputStream out;
@@ -60,10 +60,10 @@ public final class IoSender implements R
private volatile Throwable exception = null;
- public IoSender(IoTransport transport, int bufferSize, long timeout)
+ public IoSender(IoContext ioCtx, int bufferSize, long timeout)
{
- this.transport = transport;
- this.socket = transport.getSocket();
+ this.ioCtx = ioCtx;
+ this.socket = ioCtx.getSocket();
this.buffer = new byte[pof2(bufferSize)]; // buffer size must be a power of 2
this.timeout = timeout;
@@ -207,7 +207,7 @@ public final class IoSender implements R
throw new SenderException("join timed out");
}
}
- transport.getReceiver().close(false);
+ ioCtx.getReceiver().close(false);
}
catch (InterruptedException e)
{
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java?rev=925261&r1=925260&r2=925261&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java Fri Mar 19 15:16:46 2010
@@ -51,7 +51,7 @@ import org.apache.qpid.transport.util.Lo
* SO_RCVBUF - amqj.receiveBufferSize
* SO_SNDBUF - amqj.sendBufferSize
*/
-public final class IoTransport<E>
+public final class IoTransport<E> implements IoContext
{
static
@@ -119,17 +119,17 @@ public final class IoTransport<E>
}
}
- Sender<ByteBuffer> getSender()
+ public Sender<ByteBuffer> getSender()
{
return sender;
}
- IoReceiver getReceiver()
+ public IoReceiver getReceiver()
{
return receiver;
}
- Socket getSocket()
+ public Socket getSocket()
{
return socket;
}
Added: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java?rev=925261&view=auto
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java (added)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java Fri Mar 19 15:16:46 2010
@@ -0,0 +1,185 @@
+/*
+*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network.security;
+
+import java.nio.ByteBuffer;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+
+import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.ConnectionListener;
+import org.apache.qpid.transport.ConnectionSettings;
+import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.network.security.sasl.SASLReceiver;
+import org.apache.qpid.transport.network.security.sasl.SASLSender;
+import org.apache.qpid.transport.network.security.ssl.SSLReceiver;
+import org.apache.qpid.transport.network.security.ssl.SSLSender;
+import org.apache.qpid.transport.network.security.ssl.SSLUtil;
+
+public class SecurityLayer
+{
+ ConnectionSettings settings;
+ Connection con;
+ SSLSecurityLayer sslLayer;
+ SASLSecurityLayer saslLayer;
+
+ public void init(Connection con) throws TransportException
+ {
+ this.con = con;
+ this.settings = con.getConnectionSettings();
+ if (settings.isUseSSL())
+ {
+ sslLayer = new SSLSecurityLayer();
+ }
+ if (settings.isUseSASLEncryption())
+ {
+ saslLayer = new SASLSecurityLayer();
+ }
+
+ }
+
+ public Sender<ByteBuffer> sender(Sender<ByteBuffer> delegate)
+ {
+ Sender<ByteBuffer> sender = delegate;
+
+ if (settings.isUseSSL())
+ {
+ sender = sslLayer.sender(sender);
+ }
+
+ if (settings.isUseSASLEncryption())
+ {
+ sender = saslLayer.sender(sender);
+ }
+
+ return sender;
+ }
+
+ public Receiver<ByteBuffer> receiver(Receiver<ByteBuffer> delegate)
+ {
+ Receiver<ByteBuffer> receiver = delegate;
+
+ if (settings.isUseSSL())
+ {
+ receiver = sslLayer.receiver(receiver);
+ }
+
+ if (settings.isUseSASLEncryption())
+ {
+ receiver = saslLayer.receiver(receiver);
+ }
+
+ return receiver;
+ }
+
+ public String getUserID()
+ {
+ if (settings.isUseSSL())
+ {
+ return sslLayer.getUserID();
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ class SSLSecurityLayer
+ {
+ SSLEngine engine;
+ SSLSender sender;
+
+ public SSLSecurityLayer()
+ {
+ SSLContext sslCtx;
+ try
+ {
+ sslCtx = SSLUtil.createSSLContext(settings);
+ }
+ catch (Exception e)
+ {
+ throw new TransportException("Error creating SSL Context", e);
+ }
+
+ try
+ {
+ engine = sslCtx.createSSLEngine();
+ engine.setUseClientMode(true);
+ }
+ catch(Exception e)
+ {
+ throw new TransportException("Error creating SSL Engine", e);
+ }
+ }
+
+ public SSLSender sender(Sender<ByteBuffer> delegate)
+ {
+ sender = new SSLSender(engine,delegate);
+ sender.setConnectionSettings(settings);
+ return sender;
+ }
+
+ public SSLReceiver receiver(Receiver<ByteBuffer> delegate)
+ {
+ if (sender == null)
+ {
+ throw new
+ IllegalStateException("SecurityLayer.sender method should be " +
+ "invoked before SecurityLayer.receiver");
+ }
+
+ SSLReceiver receiver = new SSLReceiver(engine,delegate,sender);
+ receiver.setConnectionSettings(settings);
+ return receiver;
+ }
+
+ public String getUserID()
+ {
+ return null;
+ }
+
+ }
+
+ class SASLSecurityLayer
+ {
+ public SASLSecurityLayer()
+ {
+ }
+
+ public SASLSender sender(Sender<ByteBuffer> delegate)
+ {
+ SASLSender sender = new SASLSender(delegate);
+ con.addConnectionListener((ConnectionListener)sender);
+ return sender;
+ }
+
+ public SASLReceiver receiver(Receiver<ByteBuffer> delegate)
+ {
+ SASLReceiver receiver = new SASLReceiver(delegate);
+ con.addConnectionListener((ConnectionListener)receiver);
+ return receiver;
+ }
+
+ }
+}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org