You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ai...@apache.org on 2009/09/01 18:27:55 UTC
svn commit: r810110 [1/2] - in
/qpid/branches/java-network-refactor/qpid/java:
broker/src/main/java/org/apache/qpid/server/
broker/src/main/java/org/apache/qpid/server/configuration/
broker/src/main/java/org/apache/qpid/server/connection/ broker/src/ma...
Author: aidan
Date: Tue Sep 1 16:27:52 2009
New Revision: 810110
URL: http://svn.apache.org/viewvc?rev=810110&view=rev
Log:
QPID-2025: Add a AMQProtocolEngine from the de-MINAfied AMQMinaProtocolSession. Remove various now-unused classes and update references. Add tests for AMQDecoder. Net -1500 lines, +25% performance on transient messaging. Nice.
Added:
qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
- copied, changed from r810108, qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngineFactory.java
qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestNetworkDriver.java
qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/codec/
qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java
qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/codec/MockAMQVersionAwareProtocolSession.java
Removed:
qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestIoSession.java
Modified:
qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java
qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java
qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java
qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java
qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java
qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java
qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java
qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java
qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java
qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java
qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java
qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java
qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java
qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java
Modified: qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java?rev=810110&r1=810109&r2=810110&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java Tue Sep 1 16:27:52 2009
@@ -20,6 +20,13 @@
*/
package org.apache.qpid.server;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Properties;
+
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
@@ -30,16 +37,9 @@
import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;
import org.apache.log4j.xml.QpidLog4JConfigurator;
-import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.FixedSizeByteBufferAllocator;
-import org.apache.mina.common.IoAcceptor;
-import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
-import org.apache.mina.transport.socket.nio.SocketSessionConfig;
-import org.apache.mina.util.NewThreadExecutor;
import org.apache.qpid.AMQException;
import org.apache.qpid.common.QpidProperties;
import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.pool.ReadWriteThreadModel;
import org.apache.qpid.server.configuration.ServerConfiguration;
import org.apache.qpid.server.configuration.management.ConfigurationManagementMBean;
import org.apache.qpid.server.information.management.ServerInformationMBean;
@@ -48,19 +48,13 @@
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.management.LoggingManagementMBean;
import org.apache.qpid.server.logging.messages.BrokerMessages;
-import org.apache.qpid.server.protocol.AMQPFastProtocolHandler;
-import org.apache.qpid.server.protocol.AMQPProtocolProvider;
+import org.apache.qpid.server.protocol.AMQProtocolEngineFactory;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
import org.apache.qpid.server.transport.QpidAcceptor;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.BindException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.Properties;
+import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.transport.NetworkDriver;
+import org.apache.qpid.transport.network.mina.MINANetworkDriver;
/**
* Main entry point for AMQPD.
@@ -300,20 +294,6 @@
_brokerLogger.info("Starting Qpid Broker " + QpidProperties.getReleaseVersion()
+ " build: " + QpidProperties.getBuildVersion());
- ByteBuffer.setUseDirectBuffers(serverConfig.getEnableDirectBuffers());
-
- // the MINA default is currently to use the pooled allocator although this may change in future
- // once more testing of the performance of the simple allocator has been done
- if (!serverConfig.getEnablePooledAllocator())
- {
- ByteBuffer.setAllocator(new FixedSizeByteBufferAllocator());
- }
-
- if (serverConfig.getUseBiasedWrites())
- {
- System.setProperty("org.apache.qpid.use_write_biased_pool", "true");
- }
-
int port = serverConfig.getPort();
String portStr = commandLine.getOptionValue("p");
@@ -329,7 +309,54 @@
}
}
- bind(port, serverConfig);
+ String bindAddr = commandLine.getOptionValue("b");
+ if (bindAddr == null)
+ {
+ bindAddr = serverConfig.getBind();
+ }
+ InetAddress bindAddress = null;
+ if (bindAddr.equals("wildcard"))
+ {
+ bindAddress = new InetSocketAddress(port).getAddress();
+ }
+ else
+ {
+ bindAddress = InetAddress.getByAddress(parseIP(bindAddr));
+ }
+
+ String keystorePath = serverConfig.getKeystorePath();
+ String keystorePassword = serverConfig.getKeystorePassword();
+ String certType = serverConfig.getCertType();
+ SSLContextFactory sslFactory = null;
+ boolean isSsl = false;
+
+ if (!serverConfig.getSSLOnly())
+ {
+ NetworkDriver driver = new MINANetworkDriver();
+ driver.bind(port, new InetAddress[]{bindAddress}, new AMQProtocolEngineFactory(),
+ serverConfig.getNetworkConfiguration(), null);
+ ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, port),
+ new QpidAcceptor(driver,"TCP"));
+ CurrentActor.get().message(BrokerMessages.BRK_1002("TCP", port));
+ }
+
+ if (serverConfig.getEnableSSL())
+ {
+ sslFactory = new SSLContextFactory(keystorePath, keystorePassword, certType);
+ NetworkDriver driver = new MINANetworkDriver();
+ driver.bind(serverConfig.getSSLPort(), new InetAddress[]{bindAddress},
+ new AMQProtocolEngineFactory(), serverConfig.getNetworkConfiguration(), sslFactory);
+ ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, port),
+ new QpidAcceptor(driver,"TCP"));
+ CurrentActor.get().message(BrokerMessages.BRK_1002("TCP/SSL", serverConfig.getSSLPort()));
+ }
+
+ //fixme qpid.AMQP should be using qpidproperties to get value
+ _brokerLogger.info("Qpid Broker Ready :" + QpidProperties.getReleaseVersion()
+ + " build: " + QpidProperties.getBuildVersion());
+
+ CurrentActor.get().message(BrokerMessages.BRK_1004());
+
}
finally
{
@@ -358,114 +385,6 @@
}
}
- protected void bind(int port, ServerConfiguration config) throws BindException
- {
- String bindAddr = commandLine.getOptionValue("b");
- if (bindAddr == null)
- {
- bindAddr = config.getBind();
- }
-
- try
- {
- IoAcceptor acceptor;
-
- if (ApplicationRegistry.getInstance().getConfiguration().getQpidNIO())
- {
- _logger.warn("Using Qpid Multithreaded IO Processing");
- acceptor = new org.apache.mina.transport.socket.nio.MultiThreadSocketAcceptor(config.getProcessors(), new NewThreadExecutor());
- }
- else
- {
- _logger.warn("Using Mina IO Processing");
- acceptor = new org.apache.mina.transport.socket.nio.SocketAcceptor(config.getProcessors(), new NewThreadExecutor());
- }
-
- SocketAcceptorConfig sconfig = (SocketAcceptorConfig) acceptor.getDefaultConfig();
- SocketSessionConfig sc = (SocketSessionConfig) sconfig.getSessionConfig();
-
- sc.setReceiveBufferSize(config.getReceiveBufferSize());
- sc.setSendBufferSize(config.getWriteBufferSize());
- sc.setTcpNoDelay(config.getTcpNoDelay());
-
- // if we do not use the executor pool threading model we get the default leader follower
- // implementation provided by MINA
- if (config.getEnableExecutorPool())
- {
- sconfig.setThreadModel(ReadWriteThreadModel.getInstance());
- }
-
- if (!config.getEnableSSL() || !config.getSSLOnly())
- {
- AMQPFastProtocolHandler handler = new AMQPProtocolProvider().getHandler();
- InetSocketAddress bindAddress;
- if (bindAddr.equals("wildcard"))
- {
- bindAddress = new InetSocketAddress(port);
- }
- else
- {
- bindAddress = new InetSocketAddress(InetAddress.getByAddress(parseIP(bindAddr)), port);
- }
-
- bind(new QpidAcceptor(acceptor,"TCP"), bindAddress, handler, sconfig);
-
- //fixme qpid.AMQP should be using qpidproperties to get value
- _brokerLogger.info("Qpid.AMQP listening on non-SSL address " + bindAddress);
- }
-
- if (config.getEnableSSL())
- {
- AMQPFastProtocolHandler handler = new AMQPProtocolProvider().getHandler();
- try
- {
-
- bind(new QpidAcceptor(acceptor, "TCP/SSL"), new InetSocketAddress(config.getSSLPort()), handler, sconfig);
-
- //fixme qpid.AMQP should be using qpidproperties to get value
- _brokerLogger.info("Qpid.AMQP listening on SSL port " + config.getSSLPort());
-
- }
- catch (IOException e)
- {
- _brokerLogger.error("Unable to listen on SSL port: " + e, e);
- }
- }
-
- //fixme qpid.AMQP should be using qpidproperties to get value
- _brokerLogger.info("Qpid Broker Ready :" + QpidProperties.getReleaseVersion()
- + " build: " + QpidProperties.getBuildVersion());
-
- CurrentActor.get().message(BrokerMessages.BRK_1004());
-
- }
- catch (Exception e)
- {
- _logger.error("Unable to bind service to registry: " + e, e);
- //fixme this need tidying up
- throw new BindException(e.getMessage());
- }
- }
-
- /**
- * Ensure that any bound Acceptors are recorded in the registry so they can be closed later.
- *
- * @param acceptor
- * @param bindAddress
- * @param handler
- * @param sconfig
- *
- * @throws IOException from the acceptor.bind command
- */
- private void bind(QpidAcceptor acceptor, InetSocketAddress bindAddress, AMQPFastProtocolHandler handler, SocketAcceptorConfig sconfig) throws IOException
- {
- acceptor.getIoAcceptor().bind(bindAddress, handler, sconfig);
-
- CurrentActor.get().message(BrokerMessages.BRK_1002(acceptor.toString(), bindAddress.getPort()));
-
- ApplicationRegistry.getInstance().addAcceptor(bindAddress, acceptor);
- }
-
public static void main(String[] args)
{
//if the -Dlog4j.configuration property has not been set, enable the init override
Modified: qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java?rev=810110&r1=810109&r2=810110&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java Tue Sep 1 16:27:52 2009
@@ -38,6 +38,7 @@
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.transport.NetworkDriverConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -564,7 +565,7 @@
public boolean getSSLOnly()
{
- return getConfig().getBoolean("connector.ssl.sslOnly", true);
+ return getConfig().getBoolean("connector.ssl.sslOnly", false);
}
public int getSSLPort()
@@ -613,4 +614,57 @@
getConfig().getLong("housekeeping.expiredMessageCheckPeriod",
DEFAULT_HOUSEKEEPING_PERIOD));
}
+
+ public NetworkDriverConfiguration getNetworkConfiguration()
+ {
+ return new NetworkDriverConfiguration()
+ {
+
+ public Integer getTrafficClass()
+ {
+ return null;
+ }
+
+ public Boolean getTcpNoDelay()
+ {
+ // Can't call parent getTcpNoDelay since it just calls this one
+ return getConfig().getBoolean("connector.tcpNoDelay", true);
+ }
+
+ public Integer getSoTimeout()
+ {
+ return null;
+ }
+
+ public Integer getSoLinger()
+ {
+ return null;
+ }
+
+ public Integer getSendBufferSize()
+ {
+ return getBufferWriteLimit();
+ }
+
+ public Boolean getReuseAddress()
+ {
+ return null;
+ }
+
+ public Integer getReceiveBufferSize()
+ {
+ return getBufferReadLimit();
+ }
+
+ public Boolean getOOBInline()
+ {
+ return null;
+ }
+
+ public Boolean getKeepAlive()
+ {
+ return null;
+ }
+ };
+ }
}
Modified: qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java?rev=810110&r1=810109&r2=810110&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java Tue Sep 1 16:27:52 2009
@@ -21,7 +21,6 @@
package org.apache.qpid.server.connection;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQConnectionException;
Modified: qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java?rev=810110&r1=810109&r2=810110&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java Tue Sep 1 16:27:52 2009
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.connection;
-import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.AMQException;
Copied: qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java (from r810108, qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java)
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java?p2=qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java&p1=qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java&r1=810108&r2=810110&rev=810110&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java Tue Sep 1 16:27:52 2009
@@ -20,11 +20,25 @@
*/
package org.apache.qpid.server.protocol;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.security.Principal;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.management.JMException;
+import javax.security.sasl.SaslServer;
+
import org.apache.log4j.Logger;
-import org.apache.mina.common.CloseFuture;
-import org.apache.mina.common.IdleStatus;
-import org.apache.mina.common.IoServiceConfig;
-import org.apache.mina.common.IoSession;
import org.apache.mina.transport.vmpipe.VmPipeAddress;
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQConnectionException;
@@ -36,8 +50,11 @@
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQProtocolHeaderException;
+import org.apache.qpid.framing.AMQProtocolVersionException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ChannelCloseOkBody;
+import org.apache.qpid.framing.ConnectionCloseBody;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
@@ -46,18 +63,23 @@
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.framing.ProtocolInitiation;
import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.pool.Event;
+import org.apache.qpid.pool.Job;
+import org.apache.qpid.pool.PoolingFilter;
import org.apache.qpid.pool.ReadWriteThreadModel;
+import org.apache.qpid.pool.ReferenceCountingExecutorService;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
+import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.handler.ServerMethodDispatcherImpl;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.AMQPConnectionActor;
import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.subjects.ConnectionLogSubject;
-import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
+import org.apache.qpid.server.logging.subjects.ConnectionLogSubject;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.output.ProtocolOutputConverter;
@@ -67,22 +89,10 @@
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.transport.NetworkDriver;
import org.apache.qpid.transport.Sender;
-import javax.management.JMException;
-import javax.security.sasl.SaslServer;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.security.Principal;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.atomic.AtomicLong;
-
-public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
+public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocolSession
{
private static final Logger _logger = Logger.getLogger(AMQProtocolSession.class);
@@ -94,8 +104,6 @@
// channels. This value must be of the form 2^x - 1.
private static final int CHANNEL_CACHE_SIZE = 0xff;
- private final IoSession _minaProtocolSession;
-
private AMQShortString _contextKey;
private AMQShortString _clientVersion = null;
@@ -135,47 +143,43 @@
private Principal _authorizedID;
private MethodDispatcher _dispatcher;
private ProtocolSessionIdentifier _sessionIdentifier;
-
- private static final long LAST_WRITE_FUTURE_JOIN_TIMEOUT = 60000L;
- private org.apache.mina.common.WriteFuture _lastWriteFuture;
-
+
// Create a simple ID that increments for ever new Session
private final long _sessionID = idGenerator.getAndIncrement();
private AMQPConnectionActor _actor;
private LogSubject _logSubject;
+ private NetworkDriver _networkDriver;
+
+ private long _lastIoTime;
+
+ private long _writtenBytes;
+ private long _readBytes;
+
+ private Job _readJob;
+ private Job _writeJob;
+
+ private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance();
+
public ManagedObject getManagedObject()
{
return _managedObject;
}
- public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory)
- throws AMQException
+ public AMQProtocolEngine(VirtualHostRegistry virtualHostRegistry, NetworkDriver driver)
{
_stateManager = new AMQStateManager(virtualHostRegistry, this);
- _minaProtocolSession = session;
- session.setAttachment(this);
-
- _codecFactory = codecFactory;
+ _networkDriver = driver;
+
+ _codecFactory = new AMQCodecFactory(true, this);
+
+ ReadWriteThreadModel threadModel = ReadWriteThreadModel.getInstance();
+ _readJob = new Job(threadModel.getAsynchronousReadFilter(), PoolingFilter.MAX_JOB_EVENTS, true);
+ _writeJob = new Job(threadModel.getAsynchronousWriteFilter(), PoolingFilter.MAX_JOB_EVENTS, false);
_actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger());
-
_actor.message(ConnectionMessages.CON_1001(null, null, false, false));
-
- try
- {
- IoServiceConfig config = session.getServiceConfig();
- ReadWriteThreadModel threadModel = (ReadWriteThreadModel) config.getThreadModel();
- threadModel.getAsynchronousReadFilter().createNewJobForSession(session);
- threadModel.getAsynchronousWriteFilter().createNewJobForSession(session);
- }
- catch (RuntimeException e)
- {
- e.printStackTrace();
- throw e;
-
- }
}
private AMQProtocolSessionMBean createMBean() throws AMQException
@@ -191,16 +195,6 @@
}
}
- public IoSession getIOSession()
- {
- return _minaProtocolSession;
- }
-
- public static AMQProtocolSession getAMQProtocolSession(IoSession minaProtocolSession)
- {
- return (AMQProtocolSession) minaProtocolSession.getAttachment();
- }
-
public long getSessionID()
{
return _sessionID;
@@ -211,6 +205,42 @@
return _actor;
}
+ @Override
+ public void received(final ByteBuffer msg)
+ {
+ _lastIoTime = System.currentTimeMillis();
+ try
+ {
+ final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
+ fireAsynchEvent(_readJob, new Event(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ // Decode buffer
+
+ for (AMQDataBlock dataBlock : dataBlocks)
+ {
+ try
+ {
+ dataBlockReceived(dataBlock);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ closeProtocolSession();
+ }
+ }
+ }
+ }));
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ closeProtocolSession();
+ }
+ }
+
public void dataBlockReceived(AMQDataBlock message) throws Exception
{
_lastReceived = message;
@@ -314,21 +344,14 @@
null,
mechanisms.getBytes(),
locales.getBytes());
- _minaProtocolSession.write(responseBody.generateFrame(0));
+ _networkDriver.send(responseBody.generateFrame(0).toNioByteBuffer());
}
catch (AMQException e)
{
_logger.info("Received unsupported protocol initiation for protocol version: " + getProtocolVersion());
- _minaProtocolSession.write(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()));
-
- // TODO: Close connection (but how to wait until message is sent?)
- // ritchiem 2006-12-04 will this not do?
- // WriteFuture future = _minaProtocolSession.write(new ProtocolInitiation(pv[i][PROTOCOLgetProtocolMajorVersion()], pv[i][PROTOCOLgetProtocolMinorVersion()]));
- // future.join();
- // close connection
-
+ _networkDriver.send(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()).toNioByteBuffer());
}
}
@@ -437,8 +460,17 @@
public void writeFrame(AMQDataBlock frame)
{
_lastSent = frame;
-
- _lastWriteFuture = _minaProtocolSession.write(frame);
+ final ByteBuffer buf = frame.toNioByteBuffer();
+ _lastIoTime = System.currentTimeMillis();
+ _writtenBytes += buf.remaining();
+ fireAsynchEvent(_writeJob, new Event(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ _networkDriver.send(buf);
+ }
+ }));
}
public AMQShortString getContextKey()
@@ -539,7 +571,7 @@
{
_maxNoOfChannels = value;
}
-
+
public void commitTransactions(AMQChannel channel) throws AMQException
{
if ((channel != null) && channel.isTransactional())
@@ -555,7 +587,7 @@
channel.rollback();
}
}
-
+
/**
* Close a specific channel. This will remove any resources used by the channel, including: <ul><li>any queue
* subscriptions (this may in turn remove queues if they are auto delete</li> </ul>
@@ -628,8 +660,8 @@
{
if (delay > 0)
{
- _minaProtocolSession.setIdleTime(IdleStatus.WRITER_IDLE, delay);
- _minaProtocolSession.setIdleTime(IdleStatus.READER_IDLE, (int) (ApplicationRegistry.getInstance().getConfiguration().getHeartBeatTimeout() * delay));
+ _networkDriver.setMaxWriteIdle(delay);
+ _networkDriver.setMaxReadIdle((int) (ApplicationRegistry.getInstance().getConfiguration().getHeartBeatTimeout() * delay));
}
}
@@ -672,7 +704,7 @@
{
task.doTask(this);
}
-
+
_closed = true;
CurrentActor.get().message(_logSubject, ConnectionMessages.CON_1002());
@@ -699,21 +731,7 @@
public void closeProtocolSession()
{
- closeProtocolSession(true);
- }
-
- public void closeProtocolSession(boolean waitLast)
- {
- if (waitLast && (_lastWriteFuture != null))
- {
- _logger.debug("Waiting for last write to join.");
- _lastWriteFuture.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
- }
-
- _logger.debug("REALLY Closing protocol session:" + _minaProtocolSession);
- final CloseFuture future = _minaProtocolSession.close();
- future.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
-
+ _networkDriver.close();
try
{
_stateManager.changeState(AMQState.CONNECTION_CLOSED);
@@ -726,7 +744,7 @@
public String toString()
{
- return _minaProtocolSession.getRemoteAddress() + "(" + (getAuthorizedID() == null ? "?" : getAuthorizedID().getName() + ")");
+ return getRemoteAddress() + "(" + (getAuthorizedID() == null ? "?" : getAuthorizedID().getName() + ")");
}
public String dump()
@@ -737,7 +755,7 @@
/** @return an object that can be used to identity */
public Object getKey()
{
- return _minaProtocolSession.getRemoteAddress();
+ return getRemoteAddress();
}
/**
@@ -748,7 +766,7 @@
*/
public String getLocalFQDN()
{
- SocketAddress address = _minaProtocolSession.getLocalAddress();
+ SocketAddress address = _networkDriver.getLocalAddress();
// we use the vmpipe address in some tests hence the need for this rather ugly test. The host
// information is used by SASL primary.
if (address instanceof InetSocketAddress)
@@ -764,7 +782,7 @@
throw new IllegalArgumentException("Unsupported socket address class: " + address);
}
}
-
+
public SaslServer getSaslServer()
{
return _saslServer;
@@ -837,7 +855,7 @@
public Object getClientIdentifier()
{
- return (_minaProtocolSession != null) ? _minaProtocolSession.getRemoteAddress() : null;
+ return (_networkDriver != null) ? _networkDriver.getRemoteAddress() : null;
}
public VirtualHost getVirtualHost()
@@ -867,7 +885,7 @@
{
_taskList.remove(task);
}
-
+
public ProtocolOutputConverter getProtocolOutputConverter()
{
return _protocolOutputConverter;
@@ -888,7 +906,7 @@
public SocketAddress getRemoteAddress()
{
- return _minaProtocolSession.getRemoteAddress();
+ return _networkDriver.getRemoteAddress();
}
public MethodRegistry getMethodRegistry()
@@ -901,23 +919,136 @@
return _dispatcher;
}
- public ProtocolSessionIdentifier getSessionIdentifier()
+ @Override
+ public void closed()
{
- return _sessionIdentifier;
+ try
+ {
+ closeSession();
+ }
+ catch (AMQException e)
+ {
+ _logger.error("Could not close protocol engine", e);
+ }
}
- public String getClientVersion()
+ @Override
+ public void readerIdle()
{
- return (_clientVersion == null) ? null : _clientVersion.toString();
+ // Nothing
+ }
+
+ @Override
+ public void setNetworkDriver(NetworkDriver driver)
+ {
+ _networkDriver = driver;
+ }
+
+ @Override
+ public void writerIdle()
+ {
+ _networkDriver.send(HeartbeatBody.FRAME.toNioByteBuffer());
}
- public void setSender(Sender<java.nio.ByteBuffer> sender)
+ @Override
+ public void exception(Throwable throwable)
{
- // No-op, interface munging between this and AMQProtocolSession
+ if (throwable instanceof AMQProtocolHeaderException)
+ {
+
+ writeFrame(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()));
+ _networkDriver.close();
+
+ _logger.error("Error in protocol initiation " + this + ":" + getRemoteAddress() + " :" + throwable.getMessage(), throwable);
+ }
+ else if (throwable instanceof IOException)
+ {
+ _logger.error("IOException caught in" + this + ", session closed implictly: " + throwable);
+ }
+ else
+ {
+ _logger.error("Exception caught in" + this + ", closing session explictly: " + throwable, throwable);
+
+
+ MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(getProtocolVersion());
+ ConnectionCloseBody closeBody = methodRegistry.createConnectionCloseBody(200,new AMQShortString(throwable.getMessage()),0,0);
+
+ writeFrame(closeBody.generateFrame(0));
+
+ _networkDriver.close();
+ }
}
+ @Override
public void init()
{
- // No-op, interface munging between this and AMQProtocolSession
+ // Do nothing
+ }
+
+ @Override
+ public void setSender(Sender<ByteBuffer> sender)
+ {
+ // Do nothing
+ }
+
+ @Override
+ public long getReadBytes()
+ {
+ return _readBytes;
+ }
+
+ public long getWrittenBytes()
+ {
+ return _writtenBytes;
+ }
+
+ public long getLastIoTime()
+ {
+ return _lastIoTime;
+ }
+
+ public ProtocolSessionIdentifier getSessionIdentifier()
+ {
+ return _sessionIdentifier;
+ }
+
+ public String getClientVersion()
+ {
+ return (_clientVersion == null) ? null : _clientVersion.toString();
+ }
+
+ /**
+ * Adds an {@link Event} to a {@link Job}, triggering the execution of the job if it is not already running.
+ *
+ * @param job The job.
+ * @param event The event to hand off asynchronously.
+ */
+ void fireAsynchEvent(Job job, Event event)
+ {
+
+ job.add(event);
+
+ final ExecutorService pool = _poolReference .getPool();
+
+ if(pool == null)
+ {
+ return;
+ }
+
+ // rather than perform additional checks on pool to check that it hasn't shutdown.
+ // catch the RejectedExecutionException that will result from executing on a shutdown pool
+ if (job.activate())
+ {
+ try
+ {
+ pool.execute(job);
+ }
+ catch(RejectedExecutionException e)
+ {
+ _logger.warn("Thread pool shutdown while tasks still outstanding");
+ }
+ }
+
}
+
}
Added: qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngineFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngineFactory.java?rev=810110&view=auto
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngineFactory.java (added)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngineFactory.java Tue Sep 1 16:27:52 2009
@@ -0,0 +1,29 @@
+package org.apache.qpid.server.protocol;
+
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.transport.NetworkDriver;
+
+public class AMQProtocolEngineFactory implements ProtocolEngineFactory
+{
+ private VirtualHostRegistry _vhosts;
+
+ public AMQProtocolEngineFactory()
+ {
+ this(1);
+ }
+
+ public AMQProtocolEngineFactory(Integer port)
+ {
+ _vhosts = ApplicationRegistry.getInstance(port).getVirtualHostRegistry();
+ }
+
+
+ public ProtocolEngine newProtocolEngine(NetworkDriver networkDriver)
+ {
+ return new AMQProtocolEngine(_vhosts, networkDriver);
+ }
+
+}
Modified: qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?rev=810110&r1=810109&r2=810110&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java Tue Sep 1 16:27:52 2009
@@ -34,6 +34,7 @@
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.security.Principal;
+import java.util.List;
public interface AMQProtocolSession extends AMQVersionAwareProtocolSession
@@ -210,5 +211,19 @@
public MethodDispatcher getMethodDispatcher();
public ProtocolSessionIdentifier getSessionIdentifier();
+
+ String getClientVersion();
+
+ long getLastIoTime();
+
+ long getWrittenBytes();
+
+ Long getMaximumNumberOfChannels();
+
+ void setMaximumNumberOfChannels(Long value);
+
+ void commitTransactions(AMQChannel channel) throws AMQException;
+
+ List<AMQChannel> getChannels();
}
Modified: qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java?rev=810110&r1=810109&r2=810110&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java Tue Sep 1 16:27:52 2009
@@ -79,7 +79,7 @@
@MBeanDescription("Management Bean for an AMQ Broker Connection")
public class AMQProtocolSessionMBean extends AMQManagedObject implements ManagedConnection
{
- private AMQMinaProtocolSession _session = null;
+ private AMQProtocolSession _protocolSession = null;
private String _name = null;
// openmbean data types for representing the channel attributes
@@ -92,10 +92,10 @@
new AMQShortString("Broker Management Console has closed the connection.");
@MBeanConstructor("Creates an MBean exposing an AMQ Broker Connection")
- public AMQProtocolSessionMBean(AMQMinaProtocolSession session) throws NotCompliantMBeanException, OpenDataException
+ public AMQProtocolSessionMBean(AMQProtocolSession amqProtocolSession) throws NotCompliantMBeanException, OpenDataException
{
super(ManagedConnection.class, ManagedConnection.TYPE, ManagedConnection.VERSION);
- _session = session;
+ _protocolSession = amqProtocolSession;
String remote = getRemoteAddress();
remote = "anonymous".equals(remote) ? (remote + hashCode()) : remote;
_name = jmxEncode(new StringBuffer(remote), 0).toString();
@@ -128,52 +128,52 @@
public String getClientId()
{
- return (_session.getContextKey() == null) ? null : _session.getContextKey().toString();
+ return (_protocolSession.getContextKey() == null) ? null : _protocolSession.getContextKey().toString();
}
public String getAuthorizedId()
{
- return (_session.getAuthorizedID() != null ) ? _session.getAuthorizedID().getName() : null;
+ return (_protocolSession.getAuthorizedID() != null ) ? _protocolSession.getAuthorizedID().getName() : null;
}
public String getVersion()
{
- return (_session.getClientVersion() == null) ? null : _session.getClientVersion().toString();
+ return (_protocolSession.getClientVersion() == null) ? null : _protocolSession.getClientVersion().toString();
}
public Date getLastIoTime()
{
- return new Date(_session.getIOSession().getLastIoTime());
+ return new Date(_protocolSession.getLastIoTime());
}
public String getRemoteAddress()
{
- return _session.getIOSession().getRemoteAddress().toString();
+ return _protocolSession.getRemoteAddress().toString();
}
public ManagedObject getParentObject()
{
- return _session.getVirtualHost().getManagedObject();
+ return _protocolSession.getVirtualHost().getManagedObject();
}
public Long getWrittenBytes()
{
- return _session.getIOSession().getWrittenBytes();
+ return _protocolSession.getWrittenBytes();
}
public Long getReadBytes()
{
- return _session.getIOSession().getReadBytes();
+ return _protocolSession.getWrittenBytes();
}
public Long getMaximumNumberOfChannels()
{
- return _session.getMaximumNumberOfChannels();
+ return _protocolSession.getMaximumNumberOfChannels();
}
public void setMaximumNumberOfChannels(Long value)
{
- _session.setMaximumNumberOfChannels(value);
+ _protocolSession.setMaximumNumberOfChannels(value);
}
public String getObjectInstanceName()
@@ -192,13 +192,13 @@
CurrentActor.set(new ManagementActor(_logActor.getRootMessageLogger()));
try
{
- AMQChannel channel = _session.getChannel(channelId);
+ AMQChannel channel = _protocolSession.getChannel(channelId);
if (channel == null)
{
throw new JMException("The channel (channel Id = " + channelId + ") does not exist");
}
- _session.commitTransactions(channel);
+ _protocolSession.commitTransactions(channel);
}
catch (AMQException ex)
{
@@ -221,13 +221,13 @@
CurrentActor.set(new ManagementActor(_logActor.getRootMessageLogger()));
try
{
- AMQChannel channel = _session.getChannel(channelId);
+ AMQChannel channel = _protocolSession.getChannel(channelId);
if (channel == null)
{
throw new JMException("The channel (channel Id = " + channelId + ") does not exist");
}
- _session.rollbackTransactions(channel);
+ _protocolSession.commitTransactions(channel);
}
catch (AMQException ex)
{
@@ -248,7 +248,7 @@
public TabularData channels() throws OpenDataException
{
TabularDataSupport channelsList = new TabularDataSupport(_channelsType);
- List<AMQChannel> list = _session.getChannels();
+ List<AMQChannel> list = _protocolSession.getChannels();
for (AMQChannel channel : list)
{
@@ -274,7 +274,7 @@
public void closeConnection() throws JMException
{
- MethodRegistry methodRegistry = _session.getMethodRegistry();
+ MethodRegistry methodRegistry = _protocolSession.getMethodRegistry();
ConnectionCloseBody responseBody =
methodRegistry.createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(),
// replyCode
@@ -301,12 +301,12 @@
try
{
- _session.writeFrame(responseBody.generateFrame(0));
+ _protocolSession.writeFrame(responseBody.generateFrame(0));
try
{
- _session.closeSession();
+ _protocolSession.closeSession();
}
catch (AMQException ex)
{
Modified: qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java?rev=810110&r1=810109&r2=810110&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java Tue Sep 1 16:27:52 2009
@@ -258,7 +258,7 @@
for (InetSocketAddress bindAddress : _acceptors.keySet())
{
QpidAcceptor acceptor = _acceptors.get(bindAddress);
- acceptor.getIoAcceptor().unbind(bindAddress);
+ acceptor.getNetworkDriver().close();
CurrentActor.get().message(BrokerMessages.BRK_1003(acceptor.toString(), bindAddress.getPort()));
}
}
Modified: qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java?rev=810110&r1=810109&r2=810110&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java Tue Sep 1 16:27:52 2009
@@ -23,7 +23,6 @@
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
-import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
@@ -32,7 +31,6 @@
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.XMLConfiguration;
-import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.security.access.ACLPlugin;
import org.apache.qpid.server.security.access.ACLPluginFactory;
@@ -180,13 +178,13 @@
@Override
public AuthzResult authoriseConnect(AMQProtocolSession session, VirtualHost virtualHost)
{
- if (!(session instanceof AMQMinaProtocolSession))
+ SocketAddress sockAddr = session.getRemoteAddress();
+ if (!(sockAddr instanceof InetSocketAddress))
{
- return AuthzResult.ABSTAIN; // We only deal with tcp sessions, which
- // mean MINA right now
+ return AuthzResult.ABSTAIN; // We only deal with tcp sessions
}
- InetAddress addr = getInetAdressFromMinaSession((AMQMinaProtocolSession) session);
+ InetAddress addr = ((InetSocketAddress) sockAddr).getAddress();
if (addr == null)
{
@@ -213,19 +211,6 @@
}
- private InetAddress getInetAdressFromMinaSession(AMQMinaProtocolSession session)
- {
- SocketAddress remote = session.getIOSession().getRemoteAddress();
- if (remote instanceof InetSocketAddress)
- {
- return ((InetSocketAddress) remote).getAddress();
- }
- else
- {
- return null;
- }
- }
-
@Override
public void setConfiguration(Configuration config) throws ConfigurationException
{
Modified: qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java?rev=810110&r1=810109&r2=810110&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java Tue Sep 1 16:27:52 2009
@@ -20,21 +20,21 @@
*/
package org.apache.qpid.server.transport;
-import org.apache.mina.common.IoAcceptor;
+import org.apache.qpid.transport.NetworkDriver;
public class QpidAcceptor
{
- IoAcceptor _acceptor;
+ NetworkDriver _driver;
String _protocol;
- public QpidAcceptor(IoAcceptor acceptor, String protocol)
+ public QpidAcceptor(NetworkDriver driver, String protocol)
{
- _acceptor = acceptor;
+ _driver = driver;
_protocol = protocol;
}
- public IoAcceptor getIoAcceptor()
+ public NetworkDriver getNetworkDriver()
{
- return _acceptor;
+ return _driver;
}
public String toString()
Modified: qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java?rev=810110&r1=810109&r2=810110&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java Tue Sep 1 16:27:52 2009
@@ -20,27 +20,27 @@
*/
package org.apache.qpid.server.configuration;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+
import junit.framework.TestCase;
+
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.XMLConfiguration;
-import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
+import org.apache.qpid.server.protocol.AMQProtocolEngine;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.protocol.TestIoSession;
+import org.apache.qpid.server.protocol.TestNetworkDriver;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Locale;
-
public class ServerConfigurationTest extends TestCase
{
@@ -589,12 +589,12 @@
{
// Check default
ServerConfiguration serverConfig = new ServerConfiguration(_config);
- assertEquals(true, serverConfig.getSSLOnly());
+ assertEquals(false, serverConfig.getSSLOnly());
// Check value we set
- _config.setProperty("connector.ssl.sslOnly", false);
+ _config.setProperty("connector.ssl.sslOnly", true);
serverConfig = new ServerConfiguration(_config);
- assertEquals(false, serverConfig.getSSLOnly());
+ assertEquals(true, serverConfig.getSSLOnly());
}
public void testGetSSLPort() throws ConfigurationException
@@ -791,16 +791,15 @@
// Test config
VirtualHostRegistry virtualHostRegistry = reg.getVirtualHostRegistry();
VirtualHost virtualHost = virtualHostRegistry.getVirtualHost("test");
- AMQCodecFactory codecFactory = new AMQCodecFactory(true);
-
- TestIoSession iosession = new TestIoSession();
- iosession.setAddress("127.0.0.1");
- AMQProtocolSession session = new AMQMinaProtocolSession(iosession, virtualHostRegistry, codecFactory);
+ TestNetworkDriver testDriver = new TestNetworkDriver();
+ testDriver.setAddress("127.0.0.1");
+
+ AMQProtocolEngine session = new AMQProtocolEngine(virtualHostRegistry, testDriver);
assertFalse(reg.getAccessManager().authoriseConnect(session, virtualHost));
- iosession.setAddress("127.1.2.3");
- session = new AMQMinaProtocolSession(iosession, virtualHostRegistry, codecFactory);
+ testDriver.setAddress("127.1.2.3");
+ session = new AMQProtocolEngine(virtualHostRegistry, testDriver);
assertTrue(reg.getAccessManager().authoriseConnect(session, virtualHost));
}
@@ -866,12 +865,12 @@
// Test config
VirtualHostRegistry virtualHostRegistry = reg.getVirtualHostRegistry();
VirtualHost virtualHost = virtualHostRegistry.getVirtualHost("test");
- AMQCodecFactory codecFactory = new AMQCodecFactory(true);
- TestIoSession iosession = new TestIoSession();
- iosession.setAddress("127.0.0.1");
+ TestNetworkDriver testDriver = new TestNetworkDriver();
+ testDriver.setAddress("127.0.0.1");
- AMQProtocolSession session = new AMQMinaProtocolSession(iosession, virtualHostRegistry, codecFactory);
+ AMQProtocolEngine session = new AMQProtocolEngine(virtualHostRegistry, testDriver);
+ session.setNetworkDriver(testDriver);
assertFalse(reg.getAccessManager().authoriseConnect(session, virtualHost));
}
@@ -935,12 +934,11 @@
ApplicationRegistry.initialise(reg, 1);
// Test config
- TestIoSession iosession = new TestIoSession();
- iosession.setAddress("127.0.0.1");
+ TestNetworkDriver testDriver = new TestNetworkDriver();
+ testDriver.setAddress("127.0.0.1");
VirtualHostRegistry virtualHostRegistry = reg.getVirtualHostRegistry();
VirtualHost virtualHost = virtualHostRegistry.getVirtualHost("test");
- AMQCodecFactory codecFactory = new AMQCodecFactory(true);
- AMQProtocolSession session = new AMQMinaProtocolSession(iosession, virtualHostRegistry, codecFactory);
+ AMQProtocolSession session = new AMQProtocolEngine(virtualHostRegistry, testDriver);
assertFalse(reg.getAccessManager().authoriseConnect(session, virtualHost));
RandomAccessFile fileBRandom = new RandomAccessFile(fileB, "rw");
Modified: qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java?rev=810110&r1=810109&r2=810110&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java Tue Sep 1 16:27:52 2009
@@ -44,7 +44,7 @@
private static final Logger log = Logger.getLogger(AMQProtocolSessionMBeanTest.class);
private MessageStore _messageStore = new SkeletonMessageStore();
- private AMQMinaProtocolSession _protocolSession;
+ private AMQProtocolEngine _protocolSession;
private AMQChannel _channel;
private AMQProtocolSessionMBean _mbean;
Modified: qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java?rev=810110&r1=810109&r2=810110&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java Tue Sep 1 16:27:52 2009
@@ -20,23 +20,22 @@
*/
package org.apache.qpid.server.protocol;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.output.ProtocolOutputConverter;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-
+import java.security.Principal;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
-import java.security.Principal;
-public class InternalTestProtocolSession extends AMQMinaProtocolSession implements ProtocolOutputConverter
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.output.ProtocolOutputConverter;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+public class InternalTestProtocolSession extends AMQProtocolEngine implements ProtocolOutputConverter
{
// ChannelID(LIST) -> LinkedList<Pair>
final Map<Integer, Map<AMQShortString, LinkedList<DeliveryPair>>> _channelDelivers;
@@ -44,9 +43,7 @@
public InternalTestProtocolSession(VirtualHost virtualHost) throws AMQException
{
- super(new TestIoSession(),
- ApplicationRegistry.getInstance().getVirtualHostRegistry(),
- new AMQCodecFactory(true));
+ super(ApplicationRegistry.getInstance().getVirtualHostRegistry(), new TestNetworkDriver());
_channelDelivers = new HashMap<Integer, Map<AMQShortString, LinkedList<DeliveryPair>>>();
Modified: qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java?rev=810110&r1=810109&r2=810110&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java Tue Sep 1 16:27:52 2009
@@ -37,7 +37,7 @@
/** Test class to test MBean operations for AMQMinaProtocolSession. */
public class MaxChannelsTest extends TestCase
{
- private AMQMinaProtocolSession _session;
+ private AMQProtocolEngine _session;
public void testChannels() throws Exception
{
Added: qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestNetworkDriver.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestNetworkDriver.java?rev=810110&view=auto
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestNetworkDriver.java (added)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestNetworkDriver.java Tue Sep 1 16:27:52 2009
@@ -0,0 +1,126 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import java.net.BindException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import javax.net.ssl.SSLEngine;
+
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.transport.NetworkDriver;
+import org.apache.qpid.transport.NetworkDriverConfiguration;
+import org.apache.qpid.transport.OpenException;
+
+/**
+ * Test implementation of IoSession, which is required for some tests. Methods not being used are not implemented,
+ * so if this class is being used and some methods are to be used, then please update those.
+ */
+public class TestNetworkDriver implements NetworkDriver
+{
+ private final ConcurrentMap attributes = new ConcurrentHashMap();
+ private String _address = "127.0.0.1";
+ private int _port = 1;
+
+ public TestNetworkDriver()
+ {
+ }
+
+ public void setAddress(String string)
+ {
+ this._address = string;
+ }
+
+ public String getAddress()
+ {
+ return _address;
+ }
+
+ public void setPort(int _port)
+ {
+ this._port = _port;
+ }
+
+ public int getPort()
+ {
+ return _port;
+ }
+
+ public void bind(int port, InetAddress[] addresses, ProtocolEngineFactory protocolFactory,
+ NetworkDriverConfiguration config, SSLContextFactory sslFactory) throws BindException
+ {
+
+ }
+
+ public SocketAddress getLocalAddress()
+ {
+ return new InetSocketAddress(_address, _port);
+ }
+
+ public SocketAddress getRemoteAddress()
+ {
+ return new InetSocketAddress(_address, _port);
+ }
+
+ public void open(int port, InetAddress destination, ProtocolEngine engine, NetworkDriverConfiguration config,
+ SSLEngine sslEngine) throws OpenException
+ {
+
+ }
+
+ public void setMaxReadIdle(int idleTime)
+ {
+
+ }
+
+ public void setMaxWriteIdle(int idleTime)
+ {
+
+ }
+
+ public void close()
+ {
+
+ }
+
+ public void flush()
+ {
+
+ }
+
+ public void send(ByteBuffer msg)
+ {
+
+ }
+
+ public void setIdleTimeout(long l)
+ {
+
+ }
+
+}
Modified: qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java?rev=810110&r1=810109&r2=810110&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java Tue Sep 1 16:27:52 2009
@@ -20,37 +20,34 @@
*/
package org.apache.qpid.server.queue;
+import java.util.ArrayList;
+import java.util.LinkedList;
+
+import javax.management.Notification;
+
import junit.framework.TestCase;
+
+import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.MemoryMessageStore;
-import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.txn.TransactionalContext;
-import org.apache.qpid.server.txn.NonTransactionalContext;
-import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.protocol.AMQProtocolEngine;
+import org.apache.qpid.server.protocol.InternalTestProtocolSession;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
-import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
-import org.apache.qpid.server.protocol.InternalTestProtocolSession;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.commons.configuration.CompositeConfiguration;
-import org.apache.mina.common.ByteBuffer;
-
-import javax.management.Notification;
-
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.Collections;
-import java.util.Set;
-import java.security.Principal;
+import org.apache.qpid.server.txn.NonTransactionalContext;
+import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.virtualhost.VirtualHost;
/** This class tests all the alerts an AMQQueue can throw based on threshold values of different parameters */
public class AMQQueueAlertTest extends TestCase
@@ -62,7 +59,7 @@
private AMQQueue _queue;
private AMQQueueMBean _queueMBean;
private VirtualHost _virtualHost;
- private AMQMinaProtocolSession _protocolSession;
+ private AMQProtocolEngine _protocolSession;
private MessageStore _messageStore = new MemoryMessageStore();
private StoreContext _storeContext = new StoreContext();
private TransactionalContext _transactionalContext = new NonTransactionalContext(_messageStore, _storeContext,
Modified: qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java?rev=810110&r1=810109&r2=810110&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java Tue Sep 1 16:27:52 2009
@@ -30,17 +30,14 @@
import junit.framework.TestCase;
import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.configuration.XMLConfiguration;
-import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
-import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
-import org.apache.qpid.server.protocol.TestIoSession;
+import org.apache.qpid.server.protocol.AMQProtocolEngine;
+import org.apache.qpid.server.protocol.TestNetworkDriver;
+import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.security.access.ACLPlugin.AuthzResult;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
-import org.apache.qpid.server.registry.ApplicationRegistry;
public class FirewallPluginTest extends TestCase
{
@@ -84,22 +81,22 @@
private TestableMemoryMessageStore _store;
private VirtualHost _virtualHost;
- private AMQMinaProtocolSession _session;
+ private AMQProtocolEngine _session;
+ private TestNetworkDriver _testDriver;
@Override
public void setUp() throws Exception
{
super.setUp();
_store = new TestableMemoryMessageStore();
- TestIoSession iosession = new TestIoSession();
- iosession.setAddress("127.0.0.1");
+ _testDriver = new TestNetworkDriver();
+ _testDriver.setAddress("127.0.0.1");
// Retreive VirtualHost from the Registry
VirtualHostRegistry virtualHostRegistry = ApplicationRegistry.getInstance().getVirtualHostRegistry();
_virtualHost = virtualHostRegistry.getVirtualHost("test");
- AMQCodecFactory codecFactory = new AMQCodecFactory(true);
- _session = new AMQMinaProtocolSession(iosession, virtualHostRegistry, codecFactory);
+ _session = new AMQProtocolEngine(virtualHostRegistry, _testDriver);
}
public void tearDown() throws Exception
@@ -170,7 +167,7 @@
assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
// Set session IP so that we're connected from the right address
- ((TestIoSession) _session.getIOSession()).setAddress("192.168.23.23");
+ _testDriver.setAddress("192.168.23.23");
assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
}
@@ -185,7 +182,7 @@
assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
// Set session IP so that we're connected from the right address
- ((TestIoSession) _session.getIOSession()).setAddress("192.168.23.23");
+ _testDriver.setAddress("192.168.23.23");
assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
}
@@ -198,7 +195,7 @@
FirewallPlugin plugin = initialisePlugin("deny", new RuleInfo[]{rule});
// Set session IP so that we're connected from the right address
- ((TestIoSession) _session.getIOSession()).setAddress("127.0.0.1");
+ _testDriver.setAddress("127.0.0.1");
assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
}
@@ -211,7 +208,7 @@
FirewallPlugin plugin = initialisePlugin("deny", new RuleInfo[]{rule});
// Set session IP so that we're connected from the right address
- ((TestIoSession) _session.getIOSession()).setAddress("127.0.0.1");
+ _testDriver.setAddress("127.0.0.1");
assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
}
@@ -234,7 +231,7 @@
assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
// Set session IP so that we're connected from the right address
- ((TestIoSession) _session.getIOSession()).setAddress("192.168.23.23");
+ _testDriver.setAddress("192.168.23.23");
assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
}
@@ -257,7 +254,7 @@
assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
// Set session IP so that we're connected from the right address
- ((TestIoSession) _session.getIOSession()).setAddress("192.168.23.23");
+ _testDriver.setAddress("192.168.23.23");
assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
}
@@ -271,7 +268,7 @@
assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
// Set session IP so that we're connected from the right address
- ((TestIoSession) _session.getIOSession()).setAddress("192.168.23.23");
+ _testDriver.setAddress("192.168.23.23");
assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
}
@@ -285,7 +282,7 @@
assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
// Set session IP so that we're connected from the right address
- ((TestIoSession) _session.getIOSession()).setAddress("192.168.23.23");
+ _testDriver.setAddress("192.168.23.23");
assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
}
@@ -295,11 +292,11 @@
firstRule.setAccess("allow");
firstRule.setHostname("foo, bar, "+new InetSocketAddress("127.0.0.1", 5672).getHostName());
FirewallPlugin plugin = initialisePlugin("deny", new RuleInfo[]{firstRule});
- ((TestIoSession) _session.getIOSession()).setAddress("10.0.0.1");
+ _testDriver.setAddress("10.0.0.1");
assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
// Set session IP so that we're connected from the right address
- ((TestIoSession) _session.getIOSession()).setAddress("127.0.0.1");
+ _testDriver.setAddress("127.0.0.1");
assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
}
Modified: qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=810110&r1=810109&r2=810110&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Tue Sep 1 16:27:52 2009
@@ -191,7 +191,7 @@
_logger.debug("Protocol session created for session " + System.identityHashCode(session));
_failoverHandler = new FailoverHandler(this, session);
- final ProtocolCodecFilter pcf = new ProtocolCodecFilter(new AMQCodecFactory(false));
+ final ProtocolCodecFilter pcf = new ProtocolCodecFilter(new AMQCodecFactory(false, _protocolSession));
if (Boolean.getBoolean("amqj.shared_read_write_pool"))
{
Modified: qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java?rev=810110&r1=810109&r2=810110&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java Tue Sep 1 16:27:52 2009
@@ -31,7 +31,10 @@
import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.pool.ReadWriteThreadModel;
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.protocol.ProtocolEngineFactory;
import org.apache.qpid.thread.QpidThreadExecutor;
+import org.apache.qpid.transport.network.mina.MINANetworkDriver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,7 +65,7 @@
private static Logger _logger = LoggerFactory.getLogger(TransportConnection.class);
- private static final String DEFAULT_QPID_SERVER = "org.apache.qpid.server.protocol.AMQPFastProtocolHandler";
+ private static final String DEFAULT_QPID_SERVER = "org.apache.qpid.server.protocol.AMQProtocolEngineFactory";
private static Map<String, Socket> _openSocketRegister = new ConcurrentHashMap<String, Socket>();
@@ -190,8 +193,6 @@
_acceptor = new VmPipeAcceptor();
IoServiceConfig config = _acceptor.getDefaultConfig();
-
- config.setThreadModel(ReadWriteThreadModel.getInstance());
}
synchronized (_inVmPipeAddress)
{
@@ -276,7 +277,10 @@
{
Class[] cnstr = {Integer.class};
Object[] params = {port};
- provider = (IoHandlerAdapter) Class.forName(protocolProviderClass).getConstructor(cnstr).newInstance(params);
+
+ provider = new MINANetworkDriver();
+ ProtocolEngineFactory engineFactory = (ProtocolEngineFactory) Class.forName(protocolProviderClass).getConstructor(cnstr).newInstance(params);
+ ((MINANetworkDriver) provider).setProtocolEngineFactory(engineFactory, true);
// Give the broker a second to create
_logger.info("Created VMBroker Instance:" + port);
}
Modified: qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java?rev=810110&r1=810109&r2=810110&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java Tue Sep 1 16:27:52 2009
@@ -23,6 +23,7 @@
import org.apache.mina.filter.codec.ProtocolCodecFactory;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolEncoder;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
/**
* AMQCodecFactory is a Mina codec factory. It supplies the encoders and decoders need to read and write the bytes to
@@ -50,9 +51,9 @@
* @param expectProtocolInitiation <tt>true</tt> if the first frame received is going to be a protocol initiation
* frame, <tt>false</tt> if it is going to be a standard AMQ data block.
*/
- public AMQCodecFactory(boolean expectProtocolInitiation)
+ public AMQCodecFactory(boolean expectProtocolInitiation, AMQVersionAwareProtocolSession session)
{
- _frameDecoder = new AMQDecoder(expectProtocolInitiation);
+ _frameDecoder = new AMQDecoder(expectProtocolInitiation, session);
}
/**
@@ -70,7 +71,7 @@
*
* @return The AMQP decoder.
*/
- public ProtocolDecoder getDecoder()
+ public AMQDecoder getDecoder()
{
return _frameDecoder;
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org