You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2014/12/08 18:01:58 UTC
svn commit: r1643866 - in /qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java:
broker-core/src/main/java/org/apache/qpid/server/protocol/
broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/
broker-plugins/amqp-0-8-protoc...
Author: kwall
Date: Mon Dec 8 17:01:57 2014
New Revision: 1643866
URL: http://svn.apache.org/r1643866
Log:
NonBlockingSenderReceiver for the Java Broker for AMQP-0-10 and 0-9. Heartbeating working.
AMQP 1.0 not tested.
System tests start failing at around PriorityQueueTest, apparently because of a resource leak.
Added:
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
Modified:
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/test-profiles/JavaExcludes
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java?rev=1643866&r1=1643865&r2=1643866&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java Mon Dec 8 17:01:57 2014
@@ -278,7 +278,7 @@ public class MultiVersionProtocolEngine
private class SelfDelegateProtocolEngine implements ServerProtocolEngine
{
private final ByteBuffer _header = ByteBuffer.allocate(MINIMUM_REQUIRED_HEADER_BYTES);
- private long _lastReadTime;
+ private long _lastReadTime = System.currentTimeMillis();
public SocketAddress getRemoteAddress()
{
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java?rev=1643866&r1=1643865&r2=1643866&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java Mon Dec 8 17:01:57 2014
@@ -52,8 +52,8 @@ public class ProtocolEngine_0_10 extend
private ServerConnection _connection;
private long _createTime = System.currentTimeMillis();
- private long _lastReadTime;
- private long _lastWriteTime;
+ private long _lastReadTime = _createTime;
+ private long _lastWriteTime = _createTime;
public ProtocolEngine_0_10(ServerConnection conn,
NetworkConnection network)
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?rev=1643866&r1=1643865&r2=1643866&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java Mon Dec 8 17:01:57 2014
@@ -170,7 +170,7 @@ public class AMQProtocolEngine implement
private Sender<ByteBuffer> _sender;
private volatile boolean _deferFlush;
- private long _lastReceivedTime;
+ private long _lastReceivedTime = System.currentTimeMillis(); // TODO consider if this is what we want?
private boolean _blocking;
private final ReentrantLock _receivedLock;
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java?rev=1643866&r1=1643865&r2=1643866&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java Mon Dec 8 17:01:57 2014
@@ -41,8 +41,7 @@ public class NonBlockingConnection imple
private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingConnection.class);
private final SocketChannel _socket;
private final long _timeout;
- private final NonBlockingSender _ioSender;
- private final NonBlockingReceiver _ioReceiver;
+ private final NonBlockingSenderReceiver _nonBlockingSenderReceiver;
private int _maxReadIdle;
private int _maxWriteIdle;
private Principal _principal;
@@ -55,36 +54,30 @@ public class NonBlockingConnection imple
_socket = socket;
_timeout = timeout;
- _ioReceiver = new NonBlockingReceiver(_socket, delegate, receiveBufferSize,_timeout);
- _ioReceiver.setTicker(ticker);
+// _ioReceiver = new NonBlockingReceiver(_socket, delegate, receiveBufferSize,_timeout);
+// _nonBlockingSenderReceiver.setTicker(ticker);
- _ioSender = new NonBlockingSender(_socket, 2 * sendBufferSize, _timeout);
+// _ioSender = new NonBlockingSender(_socket, 2 * sendBufferSize, _timeout);
- _ioSender.setReceiver(_ioReceiver);
+// _ioSender.setReceiver(_nonBlockingSenderReceiver);
+
+ _nonBlockingSenderReceiver = new NonBlockingSenderReceiver(_socket, delegate, receiveBufferSize, ticker);
}
public void start()
{
- _ioSender.initiate();
- _ioReceiver.initiate();
+ _nonBlockingSenderReceiver.initiate();
}
public Sender<ByteBuffer> getSender()
{
- return _ioSender;
+ return _nonBlockingSenderReceiver;
}
public void close()
{
- try
- {
- _ioSender.close();
- }
- finally
- {
- _ioReceiver.close(false);
- }
+ _nonBlockingSenderReceiver.close();
}
public SocketAddress getRemoteAddress()
Added: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java?rev=1643866&view=auto
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java (added)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java Mon Dec 8 17:01:57 2014
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.transport.network.io;
+
+import java.io.IOException;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.thread.Threading;
+import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.SenderException;
+import org.apache.qpid.transport.network.Ticker;
+
+public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer>
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingSenderReceiver.class);
+
+ private final SocketChannel _socketChannel;
+ private final Selector _selector;
+
+ private final ConcurrentLinkedQueue<ByteBuffer> _buffers = new ConcurrentLinkedQueue<>();
+
+ private final Thread _ioThread;
+ private final String _remoteSocketAddress;
+ private final AtomicBoolean _closed = new AtomicBoolean(false);
+ private final Receiver<ByteBuffer> _receiver;
+ private final int _receiveBufSize;
+ private final Ticker _ticker;
+
+
+
+ public NonBlockingSenderReceiver(final SocketChannel socketChannel, Receiver<ByteBuffer> receiver, int receiveBufSize, Ticker ticker)
+ {
+ _socketChannel = socketChannel;
+ _receiver = receiver;
+ _receiveBufSize = receiveBufSize;
+ _ticker = ticker;
+
+ try
+ {
+ _remoteSocketAddress = socketChannel.getRemoteAddress().toString();
+ _socketChannel.configureBlocking(false);
+ _selector = Selector.open();
+ _socketChannel.register(_selector, SelectionKey.OP_READ);
+ }
+ catch (IOException e)
+ {
+ throw new SenderException("Unable to prepare the channel for non-blocking IO", e);
+ }
+ try
+ {
+ //Create but deliberately don't start the thread.
+ _ioThread = Threading.getThreadFactory().createThread(this);
+ }
+ catch(Exception e)
+ {
+ throw new SenderException("Error creating SenderReceiver thread for " + _remoteSocketAddress, e);
+ }
+
+ _ioThread.setDaemon(true);
+ _ioThread.setName(String.format("IoSenderReceiver - %s", _remoteSocketAddress));
+
+ }
+
+ public void initiate()
+ {
+ _ioThread.start();
+ }
+
+ @Override
+ public void setIdleTimeout(final int i)
+ {
+ // Probably unused - dead code to be removed??
+ }
+
+ @Override
+ public void send(final ByteBuffer msg)
+ {
+ // append to list and do selector wakeup
+ _buffers.add(msg);
+ _selector.wakeup();
+ }
+
+ @Override
+ public void run()
+ {
+ // Loop until close() is requested; on each pass:
+ // 1. try to write all pending byte buffers (zero bytes, or only part of a buffer, may actually be written),
+ // 2. read as much as is available,
+ // 3. try again to write all pending byte buffers.
+
+ while (!_closed.get())
+ {
+
+ try
+ {
+ long currentTime = System.currentTimeMillis();
+ int tick = _ticker.getTimeToNextTick(currentTime);
+ if(tick <= 0)
+ {
+ tick = _ticker.tick(currentTime);
+ }
+
+ LOGGER.debug("Tick " + tick);
+
+ int numberReady = _selector.select(tick <= 0 ? 1 : tick);
+ Set<SelectionKey> selectionKeys = _selector.selectedKeys();
+ selectionKeys.clear();
+
+ LOGGER.debug("Number Ready " + numberReady);
+
+ doWrite();
+ doRead();
+ boolean fullyWritten = doWrite();
+
+ _socketChannel.register(_selector, fullyWritten ? SelectionKey.OP_READ : (SelectionKey.OP_WRITE | SelectionKey.OP_READ));
+ }
+ catch (IOException e)
+ {
+ LOGGER.info("Exception performing I/O for thread '" + _remoteSocketAddress + "': " + e);
+ close();
+ }
+ }
+
+ try
+ {
+ while(!doWrite())
+ {
+ }
+
+ try
+ {
+ _receiver.closed();
+ }
+ finally
+ {
+ _socketChannel.close();
+ }
+ }
+ catch (IOException e)
+ {
+ LOGGER.info("Exception performing final output for thread '" + _remoteSocketAddress + "': " + e);
+ }
+ }
+
+
+
+ @Override
+ public void flush()
+ {
+ // maybe just wakeup?
+
+ }
+
+ @Override
+ public void close()
+ {
+ LOGGER.debug("Closing " + _remoteSocketAddress);
+
+ _closed.set(true);
+ _selector.wakeup();
+
+ }
+
+ private boolean doWrite() throws IOException
+ {
+ int byteBuffersWritten = 0;
+
+ ByteBuffer[] bufArray = new ByteBuffer[_buffers.size()];
+ Iterator<ByteBuffer> bufferIterator = _buffers.iterator();
+ for (int i = 0; i < bufArray.length; i++)
+ {
+ bufArray[i] = bufferIterator.next();
+ }
+
+ _socketChannel.write(bufArray);
+
+ for (ByteBuffer buf : bufArray)
+ {
+ if (buf.remaining() == 0)
+ {
+ byteBuffersWritten++;
+ _buffers.poll();
+ }
+ }
+
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Written " + byteBuffersWritten + " byte buffer(s) completely");
+ }
+
+ return bufArray.length == byteBuffersWritten;
+ }
+
+ private void doRead() throws IOException
+ {
+
+ ByteBuffer buffer;
+ int remaining;
+ do
+ {
+ buffer = ByteBuffer.allocate(_receiveBufSize);
+ _socketChannel.read(buffer);
+ remaining = buffer.remaining();
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Read " + buffer.position() + " byte(s)");
+ }
+ buffer.flip();
+ _receiver.received(buffer);
+ }
+ while (remaining == 0);
+
+ }
+}
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/test-profiles/JavaExcludes
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/test-profiles/JavaExcludes?rev=1643866&r1=1643865&r2=1643866&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/test-profiles/JavaExcludes (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/test-profiles/JavaExcludes Mon Dec 8 17:01:57 2014
@@ -34,3 +34,4 @@ org.apache.qpid.test.client.message.Sele
// QPID-6262: Temporary exclusion whilst NIO refactoring is in flight
org.apache.qpid.client.ssl.SSLTest#*
+org.apache.qpid.server.transport.TCPandSSLTransportTest#*
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org