You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2014/12/08 18:01:58 UTC

svn commit: r1643866 - in /qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java: broker-core/src/main/java/org/apache/qpid/server/protocol/ broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ broker-plugins/amqp-0-8-protoc...

Author: kwall
Date: Mon Dec  8 17:01:57 2014
New Revision: 1643866

URL: http://svn.apache.org/r1643866
Log:
NonBlockingSenderReceiver for the Java Broker for AMQP-0-10 and 0-9. Heartbeating working.
AMQ 1.0 not tested.

System tests failing at about ~PriorityQueueTest with seeming resource leak.

Added:
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
Modified:
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/test-profiles/JavaExcludes

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java?rev=1643866&r1=1643865&r2=1643866&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java Mon Dec  8 17:01:57 2014
@@ -278,7 +278,7 @@ public class MultiVersionProtocolEngine
     private class SelfDelegateProtocolEngine implements ServerProtocolEngine
     {
         private final ByteBuffer _header = ByteBuffer.allocate(MINIMUM_REQUIRED_HEADER_BYTES);
-        private long _lastReadTime;
+        private long _lastReadTime = System.currentTimeMillis();
 
         public SocketAddress getRemoteAddress()
         {

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java?rev=1643866&r1=1643865&r2=1643866&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java Mon Dec  8 17:01:57 2014
@@ -52,8 +52,8 @@ public class ProtocolEngine_0_10  extend
     private ServerConnection _connection;
 
     private long _createTime = System.currentTimeMillis();
-    private long _lastReadTime;
-    private long _lastWriteTime;
+    private long _lastReadTime = _createTime;
+    private long _lastWriteTime = _createTime;
 
     public ProtocolEngine_0_10(ServerConnection conn,
                                NetworkConnection network)

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?rev=1643866&r1=1643865&r2=1643866&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java Mon Dec  8 17:01:57 2014
@@ -170,7 +170,7 @@ public class AMQProtocolEngine implement
     private Sender<ByteBuffer> _sender;
 
     private volatile boolean _deferFlush;
-    private long _lastReceivedTime;
+    private long _lastReceivedTime = System.currentTimeMillis();  // TODO consider if this is what we want?
     private boolean _blocking;
 
     private final ReentrantLock _receivedLock;

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java?rev=1643866&r1=1643865&r2=1643866&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java Mon Dec  8 17:01:57 2014
@@ -41,8 +41,7 @@ public class NonBlockingConnection imple
     private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingConnection.class);
     private final SocketChannel _socket;
     private final long _timeout;
-    private final NonBlockingSender _ioSender;
-    private final NonBlockingReceiver _ioReceiver;
+    private final NonBlockingSenderReceiver _nonBlockingSenderReceiver;
     private int _maxReadIdle;
     private int _maxWriteIdle;
     private Principal _principal;
@@ -55,36 +54,30 @@ public class NonBlockingConnection imple
         _socket = socket;
         _timeout = timeout;
 
-        _ioReceiver = new NonBlockingReceiver(_socket, delegate, receiveBufferSize,_timeout);
-        _ioReceiver.setTicker(ticker);
+//        _ioReceiver = new NonBlockingReceiver(_socket, delegate, receiveBufferSize,_timeout);
+//        _nonBlockingSenderReceiver.setTicker(ticker);
 
-        _ioSender = new NonBlockingSender(_socket, 2 * sendBufferSize, _timeout);
+//        _ioSender = new NonBlockingSender(_socket, 2 * sendBufferSize, _timeout);
 
-        _ioSender.setReceiver(_ioReceiver);
+//        _ioSender.setReceiver(_nonBlockingSenderReceiver);
+
+        _nonBlockingSenderReceiver = new NonBlockingSenderReceiver(_socket, delegate, receiveBufferSize, ticker);
 
     }
 
     public void start()
     {
-        _ioSender.initiate();
-        _ioReceiver.initiate();
+        _nonBlockingSenderReceiver.initiate();
     }
 
     public Sender<ByteBuffer> getSender()
     {
-        return _ioSender;
+        return _nonBlockingSenderReceiver;
     }
 
     public void close()
     {
-        try
-        {
-            _ioSender.close();
-        }
-        finally
-        {
-            _ioReceiver.close(false);
-        }
+        _nonBlockingSenderReceiver.close();
     }
 
     public SocketAddress getRemoteAddress()

Added: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java?rev=1643866&view=auto
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java (added)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java Mon Dec  8 17:01:57 2014
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.transport.network.io;
+
+import java.io.IOException;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.thread.Threading;
+import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.SenderException;
+import org.apache.qpid.transport.network.Ticker;
+
+public class NonBlockingSenderReceiver  implements Runnable, Sender<ByteBuffer>
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingSenderReceiver.class);
+
+    private final SocketChannel _socketChannel;
+    private final Selector _selector;
+
+    private final ConcurrentLinkedQueue<ByteBuffer> _buffers = new ConcurrentLinkedQueue<>();
+
+    private final Thread _ioThread;
+    private final String _remoteSocketAddress;
+    private final AtomicBoolean _closed = new AtomicBoolean(false);
+    private final Receiver<ByteBuffer> _receiver;
+    private final int _receiveBufSize;
+    private final Ticker _ticker;
+
+
+
+    public NonBlockingSenderReceiver(final SocketChannel socketChannel, Receiver<ByteBuffer> receiver, int receiveBufSize, Ticker ticker)
+    {
+        _socketChannel = socketChannel;
+        _receiver = receiver;
+        _receiveBufSize = receiveBufSize;
+        _ticker = ticker;
+
+        try
+        {
+            _remoteSocketAddress = socketChannel.getRemoteAddress().toString();
+            _socketChannel.configureBlocking(false);
+            _selector = Selector.open();
+            _socketChannel.register(_selector, SelectionKey.OP_READ);
+        }
+        catch (IOException e)
+        {
+            throw new SenderException("Unable to prepare the channel for non-blocking IO", e);
+        }
+        try
+        {
+            //Create but deliberately don't start the thread.
+            _ioThread = Threading.getThreadFactory().createThread(this);
+        }
+        catch(Exception e)
+        {
+            throw new SenderException("Error creating SenderReceiver thread for " + _remoteSocketAddress, e);
+        }
+
+        _ioThread.setDaemon(true);
+        _ioThread.setName(String.format("IoSenderReceiver - %s", _remoteSocketAddress));
+
+    }
+
+    public void initiate()
+    {
+        _ioThread.start();
+    }
+
+    @Override
+    public void setIdleTimeout(final int i)
+    {
+        // Probably unused - dead code to be removed??
+    }
+
+    @Override
+    public void send(final ByteBuffer msg)
+    {
+        // append to list and do selector wakeup
+        _buffers.add(msg);
+        _selector.wakeup();
+    }
+
+    @Override
+    public void run()
+    {
+        // never ending loop doing
+        //  try to write all pending byte buffers, handle situation where zero bytes or part of a byte buffer is written
+        //  read as much as you can
+        //  try to write all pending byte buffers
+
+        while (!_closed.get())
+        {
+
+            try
+            {
+                long currentTime = System.currentTimeMillis();
+                int tick = _ticker.getTimeToNextTick(currentTime);
+                if(tick <= 0)
+                {
+                    tick = _ticker.tick(currentTime);
+                }
+
+                LOGGER.debug("Tick " + tick);
+
+                int numberReady = _selector.select(tick <= 0 ? 1 : tick);
+                Set<SelectionKey> selectionKeys = _selector.selectedKeys();
+                selectionKeys.clear();
+
+                LOGGER.debug("Number Ready " +  numberReady);
+
+                doWrite();
+                doRead();
+                boolean fullyWritten = doWrite();
+
+                _socketChannel.register(_selector, fullyWritten ? SelectionKey.OP_READ : (SelectionKey.OP_WRITE | SelectionKey.OP_READ));
+            }
+            catch (IOException e)
+            {
+                LOGGER.info("Exception performing I/O for thread '" + _remoteSocketAddress + "': " + e);
+                close();
+            }
+        }
+
+        try
+        {
+            while(!doWrite())
+            {
+            }
+
+            try
+            {
+                _receiver.closed();
+            }
+            finally
+            {
+                _socketChannel.close();
+            }
+        }
+        catch (IOException e)
+        {
+            LOGGER.info("Exception performing final output for thread '" + _remoteSocketAddress + "': " + e);
+        }
+    }
+
+
+
+    @Override
+    public void flush()
+    {
+        // maybe just wakeup?
+
+    }
+
+    @Override
+    public void close()
+    {
+        LOGGER.debug("Closing " +  _remoteSocketAddress);
+
+        _closed.set(true);
+        _selector.wakeup();
+
+    }
+
+    private boolean doWrite() throws IOException
+    {
+        int byteBuffersWritten = 0;
+
+        ByteBuffer[] bufArray = new ByteBuffer[_buffers.size()];
+        Iterator<ByteBuffer> bufferIterator = _buffers.iterator();
+        for (int i = 0; i < bufArray.length; i++)
+        {
+            bufArray[i] = bufferIterator.next();
+        }
+
+        _socketChannel.write(bufArray);
+
+        for (ByteBuffer buf : bufArray)
+        {
+            if (buf.remaining() == 0)
+            {
+                byteBuffersWritten++;
+                _buffers.poll();
+            }
+        }
+
+        if (LOGGER.isDebugEnabled())
+        {
+            LOGGER.debug("Written " + byteBuffersWritten + " byte buffer(s) completely");
+        }
+
+        return bufArray.length == byteBuffersWritten;
+    }
+
+    private void doRead() throws IOException
+    {
+
+        ByteBuffer buffer;
+        int remaining;
+        do
+        {
+            buffer = ByteBuffer.allocate(_receiveBufSize);
+            _socketChannel.read(buffer);
+            remaining = buffer.remaining();
+            if (LOGGER.isDebugEnabled())
+            {
+                LOGGER.debug("Read " + buffer.position() + " byte(s)");
+            }
+            buffer.flip();
+            _receiver.received(buffer);
+        }
+        while (remaining == 0);
+
+    }
+}

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/test-profiles/JavaExcludes
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/test-profiles/JavaExcludes?rev=1643866&r1=1643865&r2=1643866&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/test-profiles/JavaExcludes (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/test-profiles/JavaExcludes Mon Dec  8 17:01:57 2014
@@ -34,3 +34,4 @@ org.apache.qpid.test.client.message.Sele
 
 // QPID-6262: Temporary exclusion whilst NIO refactoring is in flight
 org.apache.qpid.client.ssl.SSLTest#*
+org.apache.qpid.server.transport.TCPandSSLTransportTest#*



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org