You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ai...@apache.org on 2009/08/18 18:16:11 UTC

svn commit: r805477 - in /qpid/branches/java-network-refactor/qpid/java: client/src/main/java/org/apache/qpid/client/transport/ common/src/main/java/org/apache/qpid/protocol/ common/src/main/java/org/apache/qpid/thread/ common/src/main/java/org/apache/...

Author: aidan
Date: Tue Aug 18 16:16:10 2009
New Revision: 805477

URL: http://svn.apache.org/viewvc?rev=805477&view=rev
Log:
QPID-2024: Add ProtocolEngine and NetworkDriver interfaces and a NetworkDriver implementation that uses MINA. 

Added:
    qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
    qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java
    qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/thread/QpidThreadExecutor.java
      - copied, changed from r805454, qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/QpidThreadExecutor.java
    qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java
    qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java
    qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java
    qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java
    qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/transport/network/
    qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/
    qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java
Removed:
    qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/QpidThreadExecutor.java
Modified:
    qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java

Modified: qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java?rev=805477&r1=805476&r2=805477&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java Tue Aug 18 16:16:10 2009
@@ -31,6 +31,7 @@
 import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
 import org.apache.qpid.jms.BrokerDetails;
 import org.apache.qpid.pool.ReadWriteThreadModel;
+import org.apache.qpid.thread.QpidThreadExecutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

Added: qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java?rev=805477&view=auto
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java (added)
+++ qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java Tue Aug 18 16:16:10 2009
@@ -0,0 +1,62 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.protocol;
+
+import java.net.SocketAddress;
+
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.transport.NetworkDriver;
+import org.apache.qpid.transport.Receiver;
+
+/**
+ * A ProtocolEngine is a Receiver for java.nio.ByteBuffers. It takes the data passed to it via received(),
+ * decodes it and then processes the result.
+ */ 
+public interface ProtocolEngine extends Receiver<java.nio.ByteBuffer> 
+{ 
+   // Sets the network driver providing data for this ProtocolEngine
+   void setNetworkDriver (NetworkDriver driver);
+ 
+   // Returns the remote address of the NetworkDriver
+   SocketAddress getRemoteAddress();
+ 
+   // Returns the number of bytes written
+   long getWrittenBytes();
+ 
+   // Returns the number of bytes read
+   long getReadBytes();
+ 
+   // Called by the NetworkDriver when the socket has been closed for reading
+   void closed();
+ 
+   // Called when the NetworkDriver has written no data for the period set by setMaxWriteIdle() (will trigger a
+   // heartbeat)
+   void writerIdle();
+  
+   // Called when the NetworkDriver has read no data for the period set by setMaxReadIdle() (will close the connection)
+   void readerIdle();
+ 
+   /**
+    * Accepts an AMQDataBlock (a frame) for writing to the network. The ProtocolEngine encodes the frame into
+    * bytes and passes the data on to the NetworkDriver for sending.
+    */ 
+   void writeFrame(AMQDataBlock frame);
+} 
\ No newline at end of file

Added: qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java?rev=805477&view=auto
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java (added)
+++ qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java Tue Aug 18 16:16:10 2009
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.protocol;
+
+public interface ProtocolEngineFactory  
+{ 
+  // Used by a NetworkDriver to obtain the ProtocolEngine that will process an incoming connection.
+  // Returns a new instance of a ProtocolEngine
+  ProtocolEngine newProtocolEngine(); 
+   
+} 
\ No newline at end of file

Copied: qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/thread/QpidThreadExecutor.java (from r805454, qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/QpidThreadExecutor.java)
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/thread/QpidThreadExecutor.java?p2=qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/thread/QpidThreadExecutor.java&p1=qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/QpidThreadExecutor.java&r1=805454&r2=805477&rev=805477&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/transport/QpidThreadExecutor.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/thread/QpidThreadExecutor.java Tue Aug 18 16:16:10 2009
@@ -1,4 +1,25 @@
-package org.apache.qpid.client.transport;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.thread;
 
 import org.apache.qpid.thread.Threading;
 

Added: qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java?rev=805477&view=auto
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java (added)
+++ qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java Tue Aug 18 16:16:10 2009
@@ -0,0 +1,61 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+import java.net.BindException;
+import java.net.InetAddress;
+import java.net.SocketAddress;
+
+import javax.net.ssl.SSLEngine;
+
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.ssl.SSLContextFactory;
+
+public interface NetworkDriver extends Sender<java.nio.ByteBuffer> 
+{ 
+   // Attempts to connect this NetworkDriver to destination on port and attaches the ProtocolEngine to
+   // it, using the SSLEngine if provided
+   void open(int port, InetAddress destination, ProtocolEngine engine,
+           NetworkDriverConfiguration config, SSLEngine sslEngine)
+   throws OpenException; 
+   
+   // Listens for incoming connections on the specified port and addresses and creates a new NetworkDriver which
+   // processes each incoming connection with a ProtocolEngine created from protocolFactory, using sslFactory if provided
+   void bind (int port, InetAddress[] addresses, ProtocolEngineFactory protocolFactory,  
+              NetworkDriverConfiguration config, SSLContextFactory sslFactory) throws BindException; 
+ 
+   // Returns the remote address of the underlying socket
+   SocketAddress getRemoteAddress();
+ 
+   /**
+    * The length of time, in seconds, after which the ProtocolEngine's readerIdle() method should be called
+    * if no data has been read.
+    */  
+   void setMaxReadIdle(int idleTime);
+  
+   /**
+    * The length of time, in seconds, after which the ProtocolEngine's writerIdle() method should be called
+    * if no data has been written.
+    */   
+   void setMaxWriteIdle(int idleTime);
+ 
+} 
\ No newline at end of file

Added: qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java?rev=805477&view=auto
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java (added)
+++ qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java Tue Aug 18 16:16:10 2009
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+/**
+ * This interface provides a means for NetworkDrivers to configure TCP options such as incoming and outgoing
+ * buffer sizes and set particular options on the socket. NetworkDrivers should honour the values returned
+ * from here if the underlying implementation supports them.  
+ */ 
+public interface NetworkDriverConfiguration  
+{  
+    // Socket options; the getter names are taken from Socket
+    boolean getKeepAlive();
+    boolean getOOBInline();
+    boolean getReuseAddress();
+    Integer getSoLinger(); // null means SO_LINGER is off
+    int getSoTimeout(); 
+    boolean getTcpNoDelay(); 
+    int getTrafficClass();
+
+    // The amount of memory, in bytes, to allocate to the incoming buffer
+    int getReceiveBufferSize();  
+
+    // The amount of memory, in bytes, to allocate to the outgoing buffer
+    int getSendBufferSize();  
+} 

Added: qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java?rev=805477&view=auto
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java (added)
+++ qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java Tue Aug 18 16:16:10 2009
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.transport;
+
+public class OpenException extends Exception
+{
+    /** Creates the exception declared by NetworkDriver.open(), with a message and the underlying cause. */
+    public OpenException(String string, Throwable lastException)
+    {
+        super(string, lastException);
+    }
+
+}

Added: qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java?rev=805477&view=auto
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java (added)
+++ qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java Tue Aug 18 16:16:10 2009
@@ -0,0 +1,379 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.transport.network.mina;
+
+import java.io.IOException;
+import java.net.BindException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+
+import javax.net.ssl.SSLEngine;
+
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoAcceptor;
+import org.apache.mina.common.IoFilterChain;
+import org.apache.mina.common.IoHandlerAdapter;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.IoSessionConfig;
+import org.apache.mina.common.SimpleByteBufferAllocator;
+import org.apache.mina.common.WriteFuture;
+import org.apache.mina.filter.ReadThrottleFilterBuilder;
+import org.apache.mina.filter.SSLFilter;
+import org.apache.mina.filter.WriteBufferLimitFilterBuilder;
+import org.apache.mina.filter.executor.ExecutorFilter;
+import org.apache.mina.transport.socket.nio.MultiThreadSocketConnector;
+import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
+import org.apache.mina.transport.socket.nio.SocketConnector;
+import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
+import org.apache.mina.transport.socket.nio.SocketSessionConfig;
+import org.apache.mina.util.NewThreadExecutor;
+import org.apache.mina.util.SessionUtil;
+import org.apache.qpid.pool.ReadWriteThreadModel;
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.thread.QpidThreadExecutor;
+import org.apache.qpid.transport.NetworkDriver;
+import org.apache.qpid.transport.NetworkDriverConfiguration;
+import org.apache.qpid.transport.OpenException;
+
+public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
+{
+
+    private static final int DEFAULT_BUFFER_SIZE = 32 * 1024; // send/receive buffer size used by open() when config is null
+    
+    ProtocolEngine _protocolEngine;                  // set by open() or the six-argument constructor
+    private boolean _useNIO = false;                 // selects the MultiThread* acceptor/connector
+    private int _processors = 4;                     // passed to the acceptor constructors in bind()
+    private boolean _executorPool = false;           // when true bind() installs ReadWriteThreadModel
+    private SSLContextFactory _sslFactory = null;    // set by bind(); used by sessionCreated() to add an SSLFilter
+    private SocketConnector _socketConnector;        // created by open()
+    private IoAcceptor _acceptor;                    // created by bind()
+    private IoSession _ioSession;                    // set by open() or the six-argument constructor
+    private ProtocolEngineFactory _factory;          // set by bind(); creates the engine for each accepted session
+    private boolean _protectIO;                      // when true sessionCreated() adds read/write buffer limit filters
+    private NetworkDriverConfiguration _config;      // set by bind(); read by sessionCreated() for the buffer limits
+    private Throwable _lastException;                // last throwable seen by exceptionCaught(); cause of OpenException
+    private boolean _acceptingConnections = false;   // set to true at the end of a successful bind()
+
+    /** Creates a driver with the given threading and IO-protection options and no engine or session attached. */
+    public MINANetworkDriver(boolean useNIO, int processors, boolean executorPool, boolean protectIO)
+    {
+        // Delegates to the six-argument constructor. Passing null for the engine and the session
+        // leaves those two fields at the same value they had when they were simply not assigned.
+        this(useNIO, processors, executorPool, protectIO, null, null);
+    }
+
+    public MINANetworkDriver(boolean useNIO, int processors, boolean executorPool, boolean protectIO,
+            ProtocolEngine protocolEngine, IoSession session)
+    {
+        _ioSession = session;              // the session and engine this driver instance is tied to
+        _protocolEngine = protocolEngine;
+        _protectIO = protectIO;            // threading / IO-protection options
+        _executorPool = executorPool;
+        _processors = processors;
+        _useNIO = useNIO;
+    }
+    
+    public MINANetworkDriver()
+    {
+        // Every option keeps its field default: no NIO, 4 processors, no executor pool, no IO protection.
+    }
+
+    /**
+     * Binds port on each of addresses (or on the wildcard address when addresses is null or empty), with this driver as the handler; throws BindException if a bind fails.
+     */
+    public void bind(int port, InetAddress[] addresses, ProtocolEngineFactory factory,
+            NetworkDriverConfiguration config, SSLContextFactory sslFactory) throws BindException
+    {
+        _factory = factory;
+        _config = config;
+        if (_useNIO)
+        {
+            _acceptor = new org.apache.mina.transport.socket.nio.MultiThreadSocketAcceptor(_processors,
+                    new NewThreadExecutor());
+        }
+        else
+        {
+            _acceptor = new org.apache.mina.transport.socket.nio.SocketAcceptor(_processors, new NewThreadExecutor());
+        }
+
+        SocketAcceptorConfig sconfig = (SocketAcceptorConfig) _acceptor.getDefaultConfig();
+        SocketSessionConfig sc = (SocketSessionConfig) sconfig.getSessionConfig();
+        if (config != null)
+        {
+            sc.setReceiveBufferSize(config.getReceiveBufferSize());
+            sc.setSendBufferSize(config.getSendBufferSize());
+            sc.setTcpNoDelay(config.getTcpNoDelay());
+        }
+
+        // if we do not use the executor pool threading model we get the default
+        // leader follower implementation provided by MINA
+        if (_executorPool)
+        {
+            sconfig.setThreadModel(ReadWriteThreadModel.getInstance());
+        }
+
+        if (sslFactory != null)
+        {
+            _sslFactory = sslFactory;
+        }
+
+        // String.format needs %-style specifiers; BindException has no cause constructor, hence initCause()
+        if (addresses != null && addresses.length > 0)
+        {
+            for (InetAddress addr : addresses)
+            {
+                try
+                {
+                    _acceptor.bind(new InetSocketAddress(addr, port), this, sconfig);
+                }
+                catch (IOException e)
+                {
+                    throw (BindException) new BindException(String.format("Could not bind to %s:%d", addr, port)).initCause(e);
+                }
+            }
+        }
+        else
+        {
+            try
+            {
+                _acceptor.bind(new InetSocketAddress(port), this, sconfig);
+            }
+            catch (IOException e)
+            {
+                throw (BindException) new BindException(String.format("Could not bind to *:%d", port)).initCause(e);
+            }
+        }
+        _acceptingConnections  = true;
+    }
+
+    public SocketAddress getRemoteAddress()
+    {
+        return _ioSession.getRemoteAddress(); // NOTE(review): NullPointerException if _ioSession was never set (driver only bound)
+    }
+
+    /**
+     * Connects to destination on port and attaches engine to the resulting session. NOTE(review): sslEngine is
+     * never read in this method, and only the TCP_NODELAY and buffer-size values of config are applied.
+     */
+    public void open(int port, InetAddress destination, ProtocolEngine engine, NetworkDriverConfiguration config,
+            SSLEngine sslEngine) throws OpenException
+    {
+        if (_useNIO)
+        {
+            _socketConnector = new MultiThreadSocketConnector(1, new QpidThreadExecutor());
+        }
+        else
+        {
+            _socketConnector = new SocketConnector(1, new QpidThreadExecutor()); // non-blocking connector
+        }
+        
+        org.apache.mina.common.ByteBuffer.setUseDirectBuffers(Boolean.getBoolean("amqj.enableDirectBuffers"));
+        // the MINA default is currently to use the pooled allocator although this may change in future
+        // once more testing of the performance of the simple allocator has been done
+        if (!Boolean.getBoolean("amqj.enablePooledAllocator"))
+        {
+            org.apache.mina.common.ByteBuffer.setAllocator(new SimpleByteBufferAllocator());
+        }
+        
+        SocketConnectorConfig cfg = (SocketConnectorConfig) _socketConnector.getDefaultConfig();
+        // if we do not use our own thread model we get the MINA default which is to use
+        // its own leader-follower model
+        boolean readWriteThreading = Boolean.getBoolean("amqj.shared_read_write_pool");
+        if (readWriteThreading)
+        {
+            cfg.setThreadModel(ReadWriteThreadModel.getInstance());
+        }
+        
+        SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig();
+        scfg.setTcpNoDelay((config != null) ? config.getTcpNoDelay() :  true);
+        scfg.setSendBufferSize((config != null) ? config.getSendBufferSize() : DEFAULT_BUFFER_SIZE);
+        scfg.setReceiveBufferSize((config != null) ? config.getReceiveBufferSize() : DEFAULT_BUFFER_SIZE);
+        
+        // Don't have the connector's worker thread wait around for other connections (we only use
+        // one SocketConnector per connection at the moment anyway). This allows short-running
+        // clients (like unit tests) to complete quickly.
+        _socketConnector.setWorkerTimeout(0);
+        ConnectFuture future = _socketConnector.connect(new InetSocketAddress(destination, port), this, cfg);
+        future.join();
+        if (!future.isConnected())
+        {
+            throw new OpenException("Could not open connection", _lastException); // cause comes from exceptionCaught(); may be null
+        }
+        _ioSession = future.getSession();
+        // NOTE(review): these jobs are created even when amqj.shared_read_write_pool is false - confirm this is intended
+        ReadWriteThreadModel.getInstance().getAsynchronousReadFilter().createNewJobForSession(_ioSession);
+        ReadWriteThreadModel.getInstance().getAsynchronousWriteFilter().createNewJobForSession(_ioSession);
+        _ioSession.setAttachment(engine);
+        engine.setNetworkDriver(this);
+        _protocolEngine = engine;
+    }
+
+    public void setMaxReadIdle(int idleTime)
+    {
+        _ioSession.setIdleTime(IdleStatus.READER_IDLE, idleTime); // seconds, per NetworkDriver; needs _ioSession to be set
+    }
+
+    public void setMaxWriteIdle(int idleTime)
+    {
+        _ioSession.setIdleTime(IdleStatus.WRITER_IDLE, idleTime); // seconds, per NetworkDriver; needs _ioSession to be set
+    }
+
+    public void close()
+    {
+        if (_acceptor != null) // only set once bind() has been called
+        {
+            _acceptor.unbindAll();
+        }
+        if (_ioSession != null) // only set by open() or the six-argument constructor
+        {
+            _ioSession.close();
+        }
+    }
+
+    public void flush()
+    {
+        // No-op: MINA doesn't support flush
+    }
+
+    public void send(ByteBuffer msg)
+    {
+        WriteFuture future = _ioSession.write(org.apache.mina.common.ByteBuffer.wrap(msg)); // wrap the NIO buffer in a MINA buffer
+        future.join(); // presumably blocks until the write has completed - confirm against MINA's WriteFuture
+    }
+
+    public void setIdleTimeout(long l)
+    {
+        // No-op: MINA doesn't support setting SO_TIMEOUT, so the value is ignored
+    }
+
+    public void exceptionCaught(IoSession protocolSession, Throwable throwable) throws Exception
+    {
+        if (_protocolEngine != null) // null until open() completes, unless an engine was passed to the constructor
+        {
+            _protocolEngine.exception(throwable);
+        }
+        _lastException = throwable; // used as the cause of the OpenException thrown by open()
+    }
+
+    /**
+     * Invoked when a message is received on a particular protocol session. Note
+     * that a protocol session is directly tied to a particular physical
+     * connection.
+     * 
+     * @param protocolSession
+     *            the session that received the message; its attachment is the ProtocolEngine that is notified
+     * @param message
+     *            the received data, which must be a MINA ByteBuffer
+     * 
+     * @throws Exception
+     *             if the message cannot be processed
+     */
+    public void messageReceived(IoSession protocolSession, Object message) throws Exception
+    {
+        // Anything that is not a MINA ByteBuffer cannot be handed to the engine.
+        if (!(message instanceof org.apache.mina.common.ByteBuffer))
+        {
+            throw new IllegalStateException("Handed unhandled message. message.class = " + message.getClass() + " message = " + message);
+        }
+
+        org.apache.mina.common.ByteBuffer minaBuffer = (org.apache.mina.common.ByteBuffer) message;
+        ((ProtocolEngine) protocolSession.getAttachment()).received(minaBuffer.buf());
+    }
+
+    public void sessionClosed(IoSession protocolSession) throws Exception
+    {
+        ((ProtocolEngine) protocolSession.getAttachment()).closed(); // the attachment is set in open() / sessionCreated()
+    }
+
+    /**
+     * When this driver is accepting connections (bind() has completed), sets up the new session's filters and
+     * attaches a fresh ProtocolEngine with its own MINANetworkDriver to it; otherwise does nothing.
+     */
+    public void sessionCreated(IoSession protocolSession) throws Exception
+    {
+        if (!_acceptingConnections)
+        {
+            return;
+        }
+
+        SessionUtil.initialize(protocolSession);
+        // Configure the session with SSL if necessary; where the filter goes depends on the threading model
+        if (_sslFactory != null)
+        {
+            IoFilterChain sslChain = protocolSession.getFilterChain();
+            if (_executorPool)
+            {
+                sslChain.addAfter("AsynchronousReadFilter", "sslFilter", new SSLFilter(_sslFactory.buildServerContext()));
+            }
+            else
+            {
+                sslChain.addBefore("protocolFilter", "sslFilter", new SSLFilter(_sslFactory.buildServerContext()));
+            }
+        }
+
+        // Do we want to have read/write buffer limits? If so add the IO protection filters
+        if (_protectIO)
+        {
+            IoFilterChain chain = protocolSession.getFilterChain();
+            chain.addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter());
+
+            ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder();
+            readfilter.setMaximumConnectionBufferSize(_config.getReceiveBufferSize());
+            readfilter.attach(chain);
+
+            WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder();
+            writefilter.setMaximumConnectionBufferSize(_config.getSendBufferSize());
+            writefilter.attach(chain);
+
+            chain.remove("tempExecutorFilterForFilterBuilder");
+        }
+
+        // Set up the protocol engine
+        ProtocolEngine protocolEngine = _factory.newProtocolEngine();
+        MINANetworkDriver newDriver = new MINANetworkDriver(_useNIO, _processors, _executorPool, _protectIO, protocolEngine, protocolSession);
+        protocolEngine.setNetworkDriver(newDriver);
+        protocolSession.setAttachment(protocolEngine);
+    }
+
+    public void sessionIdle(IoSession session, IdleStatus status) throws Exception
+    {
+        if (IdleStatus.READER_IDLE.equals(status)) // nothing read for the configured period
+        {
+            ((ProtocolEngine) session.getAttachment()).readerIdle();
+        }
+        else if (IdleStatus.WRITER_IDLE.equals(status)) // nothing written for the configured period
+        {
+            ((ProtocolEngine) session.getAttachment()).writerIdle();
+        }
+    }
+
+    private ProtocolEngine getProtocolEngine()
+    {
+       return _protocolEngine; // NOTE(review): private and not called anywhere in this class
+    }
+
+}

Added: qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java?rev=805477&view=auto
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java (added)
+++ qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java Tue Aug 18 16:16:10 2009
@@ -0,0 +1,473 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.transport.network.mina;
+
+import java.net.BindException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.TestCase;
+
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.transport.NetworkDriver;
+import org.apache.qpid.transport.OpenException;
+
+public class MINANetworkDriverTest extends TestCase
+{
+ 
+    private static final String TEST_DATA = "YHALOTHAR";
+    private static final int TEST_PORT = 2323;
+    private NetworkDriver _server;
+    private NetworkDriver _client;
+    private CountingProtocolEngine _countingEngine; // Keeps a count of how many bytes it's read
+    private Exception _thrownEx;
+
+    /**
+     * Gives each test a fresh counting engine, a fresh pair of drivers and no recorded exception.
+     */
+    @Override
+    public void setUp()
+    {
+        _thrownEx = null;
+        _countingEngine = new CountingProtocolEngine();
+        _server = new MINANetworkDriver();
+        _client = new MINANetworkDriver();
+    }
+
+    /**
+     * Closes both drivers after each test. The client is closed from a {@code finally}
+     * block so that a failure while closing the server cannot leave the client driver
+     * open for the tests that follow.
+     */
+    @Override
+    public void tearDown()
+    {
+        try
+        {
+            if (_server != null)
+            {
+                _server.close();
+            }
+        }
+        finally
+        {
+            if (_client != null)
+            {
+                _client.close();
+            }
+        }
+    }
+    
+    /**
+     * Verifies that opening a connection fails while no driver is bound to the test port,
+     * and that the same open succeeds once a driver has been bound.
+     *
+     * @throws BindException if the server driver cannot be bound to the test port
+     * @throws UnknownHostException if the local host address cannot be resolved
+     * @throws OpenException if the open that is expected to succeed fails
+     */
+    public void testBindOpen() throws BindException, UnknownHostException, OpenException
+    {
+        // First attempt: nothing has been bound yet, so record the expected failure
+        try
+        {
+            _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null);
+        }
+        catch (OpenException expected)
+        {
+            _thrownEx = expected;
+        }
+        assertNotNull("Open should have failed since no engine bound", _thrownEx);
+
+        // Second attempt: with a bound server the open must go through (it throws otherwise)
+        EchoProtocolEngineSingletonFactory echoFactory = new EchoProtocolEngineSingletonFactory();
+        _server.bind(TEST_PORT, null, echoFactory, null, null);
+        _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null);
+    }
+    
+    /**
+     * Verifies that a connection can no longer be opened once the bound driver
+     * (and the client that was connected to it) has been closed.
+     *
+     * @throws BindException if the server driver cannot be bound to the test port
+     * @throws UnknownHostException if the local host address cannot be resolved
+     * @throws OpenException if the initial open, which is expected to succeed, fails
+     */
+    public void testBindOpenCloseOpen() throws BindException, UnknownHostException, OpenException
+    {
+        // Establish a working connection, then shut both ends down
+        EchoProtocolEngineSingletonFactory echoFactory = new EchoProtocolEngineSingletonFactory();
+        _server.bind(TEST_PORT, null, echoFactory, null, null);
+        _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null);
+        _client.close();
+        _server.close();
+
+        // Nothing is bound any more, so this open is expected to be rejected
+        try
+        {
+            _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null);
+        }
+        catch (OpenException expected)
+        {
+            _thrownEx = expected;
+        }
+
+        assertNotNull("Open should have failed", _thrownEx);
+    }
+    
+    /**
+     * Verifies that binding a second NetworkDriver to a port that already has a driver
+     * bound to it raises a {@link BindException}.
+     */
+    public void testBindPortInUse()
+    {
+        // The first bind is only the precondition of this test, so it has to succeed
+        EchoProtocolEngineSingletonFactory firstFactory = new EchoProtocolEngineSingletonFactory();
+        try
+        {
+            _server.bind(TEST_PORT, null, firstFactory, null, null);
+        }
+        catch (BindException unexpected)
+        {
+            fail("First bind should not fail");
+        }
+
+        // Binding another driver to the same port is what should be rejected
+        EchoProtocolEngineSingletonFactory secondFactory = new EchoProtocolEngineSingletonFactory();
+        try
+        {
+            _client.bind(TEST_PORT, null, secondFactory, null, null);
+        }
+        catch (BindException expected)
+        {
+            _thrownEx = expected;
+        }
+
+        assertNotNull("Second bind should throw BindException", _thrownEx);
+    }
+    
+    /**
+     * tests that bytes sent on a network driver are received at the other end
+     * 
+     * @throws UnknownHostException
+     * @throws OpenException
+     * @throws InterruptedException 
+     * @throws BindException 
+     */
+    public void testSend() throws UnknownHostException, OpenException, InterruptedException, BindException 
+    {
+        // Open a connection from a counting engine to an echo engine
+        _server.bind(TEST_PORT, null,  new EchoProtocolEngineSingletonFactory(), null, null);
+        _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null);
+        
+        // Tell the counting engine how much data we're sending
+        _countingEngine.setNewLatch(TEST_DATA.getBytes().length);
+        
+        // Send the data and wait for up to 2 seconds to get it back 
+        _client.send(ByteBuffer.wrap(TEST_DATA.getBytes()));
+        _countingEngine.getLatch().await(2, TimeUnit.SECONDS);
+        
+        // Check what we got
+        assertEquals("Wrong amount of data recieved", TEST_DATA.getBytes().length, _countingEngine.getReadBytes());
+    } 
+    
+    /**
+     * Opens a connection, sets a low maximum read-idle value on the client and checks
+     * that the client's engine gets told that its reader has been idle.
+     *
+     * @throws BindException if the server driver cannot be bound to the test port
+     * @throws OpenException if the client cannot open the connection
+     * @throws UnknownHostException if the local host address cannot be resolved
+     */
+    public void testSetReadIdle() throws BindException, UnknownHostException, OpenException
+    {
+        // Connect the counting engine (client side) to an echo engine (server side)
+        EchoProtocolEngineSingletonFactory echoFactory = new EchoProtocolEngineSingletonFactory();
+        _server.bind(TEST_PORT, null, echoFactory, null, null);
+        _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null);
+
+        boolean idleBefore = _countingEngine.getReaderHasBeenIdle();
+        assertFalse("Reader should not have been idle", idleBefore);
+
+        _client.setMaxReadIdle(1);
+        try
+        {
+            Thread.sleep(1000);
+        }
+        catch (InterruptedException ignored)
+        {
+            // Deliberately ignored: the assertion below reports the outcome either way
+        }
+
+        boolean idleAfter = _countingEngine.getReaderHasBeenIdle();
+        assertTrue("Reader should have been idle", idleAfter);
+    }
+    
+    /**
+     * Opens a connection, sets a low maximum write-idle value on the client and checks
+     * that the client's engine gets told that its writer has been idle.
+     *
+     * @throws BindException if the server driver cannot be bound to the test port
+     * @throws OpenException if the client cannot open the connection
+     * @throws UnknownHostException if the local host address cannot be resolved
+     */
+    public void testSetWriteIdle() throws BindException, UnknownHostException, OpenException
+    {
+        // Open a connection from a counting engine to an echo engine
+        _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null);
+        _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null);
+
+        // These assertions check the writer flag, so the messages must name the writer
+        assertFalse("Writer should not have been idle", _countingEngine.getWriterHasBeenIdle());
+        _client.setMaxWriteIdle(1);
+        try
+        {
+            Thread.sleep(1000);
+        }
+        catch (InterruptedException e)
+        {
+            // Deliberately ignored: the assertion below reports the outcome either way
+        }
+        assertTrue("Writer should have been idle", _countingEngine.getWriterHasBeenIdle());
+    }
+    
+    
+    /**
+     * Creates and then closes a connection from client to server and checks that the server
+     * engine has its closed() method called. Then opens the client again and closes the server
+     * to check that the client engine has its closed() method called.
+     *
+     * @throws BindException if the server driver cannot be bound to the test port
+     * @throws UnknownHostException if the local host address cannot be resolved
+     * @throws OpenException if the client cannot open a connection
+     */
+    public void testClosed() throws BindException, UnknownHostException, OpenException
+    {
+        // Open a connection from a counting engine to an echo engine
+        EchoProtocolEngineSingletonFactory factory = new EchoProtocolEngineSingletonFactory();
+        _server.bind(TEST_PORT, null, factory, null, null);
+        _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null);
+
+        // The factory only creates its engine when newProtocolEngine() is called, which may
+        // not have happened yet when open() returns. Poll for it, but give up after a
+        // deadline so that a missing engine fails this test instead of hanging the suite.
+        final long maxWaitMillis = 5000;
+        long deadline = System.currentTimeMillis() + maxWaitMillis;
+        EchoProtocolEngine serverEngine = factory.getEngine();
+        while (serverEngine == null && System.currentTimeMillis() < deadline)
+        {
+            try
+            {
+                Thread.sleep(10);
+            }
+            catch (InterruptedException e)
+            {
+                // Keep polling; the deadline still bounds the loop
+            }
+            serverEngine = factory.getEngine();
+        }
+        assertNotNull("Server engine was not created within " + maxWaitMillis + "ms", serverEngine);
+
+        // Closing the client must be reported to the server-side engine
+        assertFalse("Server should not have been closed", serverEngine.getClosed());
+        serverEngine.setNewLatch(1);
+        _client.close();
+        try
+        {
+            serverEngine.getLatch().await(2, TimeUnit.SECONDS);
+        }
+        catch (InterruptedException e)
+        {
+            // Ignored: the assertion below fails if closed() was not seen
+        }
+        assertTrue("Server should have been closed", serverEngine.getClosed());
+
+        // Reconnect, reset the client engine's closed flag, then close the server:
+        // this time the client-side engine must be told
+        _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null);
+        _countingEngine.setClosed(false);
+        assertFalse("Client should not have been closed", _countingEngine.getClosed());
+        _countingEngine.setNewLatch(1);
+        _server.close();
+        try
+        {
+            _countingEngine.getLatch().await(2, TimeUnit.SECONDS);
+        }
+        catch (InterruptedException e)
+        {
+            // Ignored: the assertion below fails if closed() was not seen
+        }
+        assertTrue("Client should have been closed", _countingEngine.getClosed());
+    }
+
+    /**
+     * Creates a connection, instructs the client engine to throw an exception when it next
+     * receives data, and checks that the engine's exception latch gets counted down.
+     *
+     * @throws BindException if the server driver cannot be bound to the test port
+     * @throws UnknownHostException if the local host address cannot be resolved
+     * @throws OpenException if the client cannot open the connection
+     * @throws InterruptedException if the test thread is interrupted while waiting on the latch
+     */
+    public void testExceptionCaught() throws BindException, UnknownHostException, OpenException, InterruptedException
+    {
+        _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null);
+        _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null);
+
+        // Nothing has gone wrong yet, so the exception latch must still be at its initial count
+        assertEquals("Exception should not have been thrown", 1,
+                _countingEngine.getExceptionLatch().getCount());
+
+        // Make the client engine throw when the echoed data arrives
+        _countingEngine.setErrorOnNextRead(true);
+        _countingEngine.setNewLatch(TEST_DATA.getBytes().length);
+        _client.send(ByteBuffer.wrap(TEST_DATA.getBytes()));
+        _countingEngine.getExceptionLatch().await(2, TimeUnit.SECONDS);
+
+        // A count of zero means exception() was called on the engine
+        assertEquals("Exception should have been thrown", 0,
+                _countingEngine.getExceptionLatch().getCount());
+    }
+    
+    /**
+     * Opens a connection and checks that the client reports the address it was asked
+     * to connect to (local host, test port) as its remote address.
+     *
+     * @throws BindException if the server driver cannot be bound to the test port
+     * @throws UnknownHostException if the local host address cannot be resolved
+     * @throws OpenException if the client cannot open the connection
+     */
+    public void testGetRemoteAddress() throws BindException, UnknownHostException, OpenException
+    {
+        EchoProtocolEngineSingletonFactory echoFactory = new EchoProtocolEngineSingletonFactory();
+        _server.bind(TEST_PORT, null, echoFactory, null, null);
+        _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null);
+
+        InetSocketAddress requested = new InetSocketAddress(InetAddress.getLocalHost(), TEST_PORT);
+        assertEquals(requested, _client.getRemoteAddress());
+    }
+
+    /**
+     * Factory that always hands out the same {@link EchoProtocolEngine}, creating it the
+     * first time an engine is requested.
+     * <p>
+     * The test thread polls {@link #getEngine()} while waiting for the engine to appear
+     * (see testClosed), so the field is volatile and creation is synchronized: the
+     * polling thread is then guaranteed to see the engine once it exists, and two
+     * concurrent requests cannot create two engines.
+     * NOTE(review): presumably newProtocolEngine() is invoked from a network thread - confirm.
+     */
+    private class EchoProtocolEngineSingletonFactory implements ProtocolEngineFactory
+    {
+        // volatile so that a thread polling getEngine() sees the engine once it has been created
+        volatile EchoProtocolEngine _engine = null;
+
+        /**
+         * @return the shared echo engine, created on the first call
+         */
+        public synchronized ProtocolEngine newProtocolEngine()
+        {
+            if (_engine == null)
+            {
+                _engine = new EchoProtocolEngine();
+            }
+            return getEngine();
+        }
+
+        /**
+         * @return the shared echo engine, or null if none has been requested yet
+         */
+        public EchoProtocolEngine getEngine()
+        {
+            return _engine;
+        }
+    }
+    
+    public class CountingProtocolEngine implements ProtocolEngine
+    {
+
+        protected NetworkDriver _driver;
+        public ArrayList<ByteBuffer> _receivedBytes = new ArrayList<ByteBuffer>();
+        private int _readBytes;
+        private CountDownLatch _latch = new CountDownLatch(0);
+        private boolean _readerHasBeenIdle;
+        private boolean _writerHasBeenIdle;
+        private boolean _closed = false;
+        private boolean _nextReadErrors = false;
+        private CountDownLatch _exceptionLatch = new CountDownLatch(1);
+        
+        public void closed()
+        {
+            setClosed(true);
+            _latch.countDown();
+        }
+
+        public void setErrorOnNextRead(boolean b)
+        {
+            _nextReadErrors = b;
+        }
+
+        public void setNewLatch(int length)
+        {
+            _latch = new CountDownLatch(length);
+        }
+
+        public long getReadBytes()
+        {
+            return _readBytes;
+        }
+
+        public SocketAddress getRemoteAddress()
+        {
+            if (_driver != null)
+            {
+                return _driver.getRemoteAddress();
+            } 
+            else
+            {
+                return null;
+            }
+        }
+
+        public long getWrittenBytes()
+        {
+            return 0;
+        }
+
+        public void readerIdle()
+        {
+            _readerHasBeenIdle = true;
+        }
+
+        public void setNetworkDriver(NetworkDriver driver)
+        {
+            _driver = driver;
+        }
+
+        public void writeFrame(AMQDataBlock frame)
+        {
+            
+        }
+
+        public void writerIdle()
+        {
+           _writerHasBeenIdle = true;
+        }
+
+        public void exception(Throwable t)
+        {
+            _exceptionLatch.countDown();
+        }
+
+        public CountDownLatch getExceptionLatch()
+        {
+            return _exceptionLatch;
+        }
+        
+        public void received(ByteBuffer msg)
+        {
+            // increment read bytes and count down the latch for that many
+            int bytes = msg.remaining();
+            _readBytes += bytes;
+            for (int i = 0; i < bytes; i++)
+            {
+                _latch.countDown();
+            }
+            
+            // Throw an error if we've been asked too, but we can still count
+            if (_nextReadErrors)
+            {
+                throw new RuntimeException("Was asked to error");
+            }
+        }
+        
+        public CountDownLatch getLatch()
+        {
+            return _latch;
+        }
+
+        public boolean getWriterHasBeenIdle()
+        {
+            return _writerHasBeenIdle;
+        }
+
+        public boolean getReaderHasBeenIdle()
+        {
+            return _readerHasBeenIdle;
+        }
+
+        public void setClosed(boolean _closed)
+        {
+            this._closed = _closed;
+        }
+
+        public boolean getClosed()
+        {
+            return _closed;
+        }
+    }
+
+    /**
+     * Counting engine that also sends every buffer it receives back through its driver.
+     */
+    private class EchoProtocolEngine extends CountingProtocolEngine
+    {
+
+        @Override
+        public void received(ByteBuffer msg)
+        {
+            // Let the parent do the counting first; if it throws, nothing is echoed
+            super.received(msg);
+
+            // Reset the buffer's position to zero before handing it back to the driver
+            msg.rewind();
+            this._driver.send(msg);
+        }
+    }
+}
\ No newline at end of file



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org