You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2014/08/13 18:52:33 UTC
svn commit: r1617757 -
/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnectionDriverNetty.java
Author: robbie
Date: Wed Aug 13 16:52:33 2014
New Revision: 1617757
URL: http://svn.apache.org/r1617757
Log:
QPIDJMS-25: cleanup/fixups/formatting
Modified:
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnectionDriverNetty.java
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnectionDriverNetty.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnectionDriverNetty.java?rev=1617757&r1=1617756&r2=1617757&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnectionDriverNetty.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnectionDriverNetty.java Wed Aug 13 16:52:33 2014
@@ -38,37 +38,26 @@ import io.netty.channel.socket.SocketCha
import io.netty.channel.socket.nio.NioSocketChannel;
import java.io.IOException;
-import java.net.ConnectException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.demo.AbstractEventHandler;
import org.apache.qpid.proton.demo.EventHandler;
import org.apache.qpid.proton.demo.Events;
-import org.apache.qpid.proton.driver.Connector;
import org.apache.qpid.proton.engine.Collector;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Sasl;
import org.apache.qpid.proton.engine.Transport;
public class AmqpConnectionDriverNetty extends AbstractEventHandler//TODO: HACK
{
- private static Logger _logger = Logger.getLogger(AmqpConnectionDriverNetty.class.getName());
-
- private final ConcurrentHashMap<AmqpConnection,Boolean> _locallyUpdatedConnections =
- new ConcurrentHashMap<AmqpConnection,Boolean>();
-//
-// private DriverRunnable _driverRunnable;
-// private Thread _driverThread;
+ //TODO: use or delete:
+ //private static Logger _logger = Logger.getLogger(AmqpConnectionDriverNetty.class.getName());
private final Bootstrap _bootstrap;
private AmqpConnection _amqpConnection;
@@ -85,17 +74,20 @@ public class AmqpConnectionDriverNetty e
public AmqpConnectionDriverNetty() throws IOException
{
+ //TODO: make config options configurable
+ int connectTimeoutMillis = 30000;
+ boolean autoRead = false;
+ boolean tcpNoDelay = true;
+ boolean tcpKeepAlive = true;
+ boolean tcpReuseAddr = true;
+
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
-
EventLoopGroup group = new NioEventLoopGroup();
bootstrap.group(group);
- int connectTimeoutMillis = 30000;
- boolean tcpKeepAlive = true;
- boolean tcpNoDelay = true;
- boolean tcpReuseAddr = true;
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMillis);
+ bootstrap.option(ChannelOption.AUTO_READ, autoRead);
bootstrap.option(ChannelOption.TCP_NODELAY, tcpNoDelay);
bootstrap.option(ChannelOption.SO_KEEPALIVE, tcpKeepAlive);
bootstrap.option(ChannelOption.SO_REUSEADDR, tcpReuseAddr);
@@ -103,15 +95,12 @@ public class AmqpConnectionDriverNetty e
bootstrap.option(ChannelOption.ALLOCATOR, new UnpooledByteBufAllocator(false));
_nettyHandler = new NettyHandler();
- bootstrap.handler(new ChannelInitializer<SocketChannel>() {
-
+ bootstrap.handler(new ChannelInitializer<SocketChannel>()
+ {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(_nettyHandler);
-// p.addLast(new BinaryMemcacheClientCodec());
-// p.addLast(new BinaryMemcacheObjectAggregator(Integer.MAX_VALUE));
-// p.addLast(new MemcacheClientHandler());
}
});
@@ -130,19 +119,12 @@ public class AmqpConnectionDriverNetty e
ChannelFuture future = _bootstrap.connect(remoteHost, port);
future.awaitUninterruptibly();
- String threadName = null;
if (future.isSuccess())
{
//TODO connected, do anything extra required (e.g wait for successful SSL handshake).
- SocketAddress localAddress = future.channel().localAddress();
- SocketAddress remoteAddress = future.channel().remoteAddress();
-
- //TODO: delete?
- threadName = "DriverRunnable-" + String.valueOf(localAddress) + "/" + String.valueOf(remoteAddress);
}
else
{
- //TODO: log it?
Throwable t = future.cause();
throw new RuntimeException("Failed to connect", t);
@@ -158,27 +140,35 @@ public class AmqpConnectionDriverNetty e
private class NettyWriter extends AbstractEventHandler
{
-//TODO:delete
-// @Override
-// public void onInit(Connection conn)
-// {
-// ChannelHandlerContext ctx = (ChannelHandlerContext) _transport.getContext();
-// write(ctx);
-// scheduleReadIfCapacity(_transport, ctx);
-// }
-
@Override
- public void onTransport(Transport transport) {
+ public void onTransport(Transport transport)
+ {
ChannelHandlerContext ctx = (ChannelHandlerContext) transport.getContext();
write(ctx);
scheduleReadIfCapacity(transport, ctx);
}
+
+ @Override
+ public void onFlow(Link link)
+ {
+ //TODO: delete, somehow.
+ //This is only here because the client currently sends its messages without checking for credit,
+ //which can result in the flow arriving after the send attempt, which results in no transport event
+ //being emitted to signal the message can now be written, and so it may never get sent.
+
+ logMessage("Forcing transport write attempt after flow");
+ ChannelHandlerContext ctx = (ChannelHandlerContext) _transport.getContext();
+ write(ctx);
+ scheduleReadIfCapacity(_transport, ctx);
+ }
}
@Override
- public void channelActive(ChannelHandlerContext ctx) {
- synchronized (_amqpConnection) {
- System.out.println("ACTIVE");
+ public void channelActive(ChannelHandlerContext ctx)
+ {
+ synchronized (_amqpConnection)
+ {
+ logMessage("ACTIVE");
_transport = Transport.Factory.create();
_transport.setContext(ctx);
@@ -202,23 +192,23 @@ public class AmqpConnectionDriverNetty e
}
@Override
- public void channelRead(final ChannelHandlerContext ctx, Object msg) {
+ public void channelRead(final ChannelHandlerContext ctx, Object msg)
+ {
synchronized (_amqpConnection)
{
try
{
ByteBuf buf = (ByteBuf) msg;
- //TODO: delete
- ByteBuffer nio = buf.nioBuffer();
- byte[] bytes = new byte[nio.limit()];
- nio.get(bytes);
- System.out.println("Got Bytes: " + new Binary(bytes));
+ echoBytes("Got Bytes: ", buf);
- try {
- while (buf.readableBytes() > 0) {
+ try
+ {
+ while (buf.readableBytes() > 0)
+ {
int capacity = _transport.capacity();
- if (capacity <= 0) {
+ if (capacity <= 0)
+ {
throw new IllegalStateException("discarding bytes: " + buf.readableBytes());
}
ByteBuffer tail = _transport.tail();
@@ -229,7 +219,9 @@ public class AmqpConnectionDriverNetty e
processAmqpConnection();
dispatch();
}
- } finally {
+ }
+ finally
+ {
buf.release();
}
@@ -243,47 +235,51 @@ public class AmqpConnectionDriverNetty e
}
@Override
- public void channelInactive(ChannelHandlerContext ctx) {
- synchronized (_amqpConnection) {
- //System.out.println(String.format("CHANNEL CLOSED: settled %s, sent %s", settled, sent));
- System.out.println("CHANNEL CLOSED");
+ public void channelInactive(ChannelHandlerContext ctx)
+ {
+ synchronized (_amqpConnection)
+ {
+ logMessage("CHANNEL CLOSED");
_transport.close_tail();
dispatch();
}
}
@Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
+ {
+ logMessage("Exception caught: " + cause.getMessage());
cause.printStackTrace();
closeOnFlush(ctx.channel());
}
- /**
- * Closes the specified channel after all queued write requests are flushed.
- */
- void closeOnFlush(Channel ch) {
- if (ch.isActive()) {
+ void closeOnFlush(Channel ch)
+ {
+ if (ch.isActive())
+ {
ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}
}
-
-
-
- private void dispatch() {
- System.out.println("Dispatch called");
- synchronized (_amqpConnection) {
+ private void dispatch()
+ {
+ logMessage("Dispatch called");
+ synchronized (_amqpConnection)
+ {
try
{
- if (dispatching) {
- System.out.println("Dispatch skipped");
+ if (dispatching)
+ {
+ logMessage("Dispatch skipped");
return;
}
- System.out.println("Dispatching");
+ logMessage("Dispatching");
dispatching = true;
Event ev;
- while ((ev = _collector.peek()) != null) {
- for (EventHandler h : handlers) {
+ while ((ev = _collector.peek()) != null)
+ {
+ for (EventHandler h : handlers)
+ {
Events.dispatch(ev, h);
//TODO: delete
//processAmqpConnection();
@@ -305,19 +301,19 @@ public class AmqpConnectionDriverNetty e
private void write(final ChannelHandlerContext ctx)
{
- System.out.println("Write called");
+ logMessage("Write called");
synchronized (_amqpConnection)
{
try
{
- System.out.println("Checking pending");
+ logMessage("Checking pending");
int pending = _transport.pending();
- System.out.println("Pending:" + pending);
+ logMessage("Pending:" + pending);
if (pending > 0)
{
final int size = pending - offset;
- System.out.println("Size:" + pending);
+ logMessage("Size:" + pending);
if (size > 0)
{
ByteBuf buffer = Unpooled.buffer(size);
@@ -326,10 +322,7 @@ public class AmqpConnectionDriverNetty e
buffer.writeBytes(head);
//TODO: delete
- ByteBuffer nio = buffer.nioBuffer();
- byte[] bytes = new byte[nio.limit()];
- nio.get(bytes);
- System.out.println("Sending Bytes: " + new Binary(bytes));
+ echoBytes("Sending Bytes: ", buffer);
ChannelFuture chf = ctx.writeAndFlush(buffer);
offset += size;
@@ -338,7 +331,7 @@ public class AmqpConnectionDriverNetty e
@Override
public void operationComplete(ChannelFuture chf)
{
- System.out.println("In completion callback");
+ logMessage("Running write completion callback");
if (chf.isSuccess())
{
synchronized (_amqpConnection)
@@ -356,7 +349,7 @@ public class AmqpConnectionDriverNetty e
_amqpConnection.notifyAll();
}
}
- write(ctx);
+ write(ctx);//TODO: fix. Calling this can cause us to fire channelInactive before reading any responses, see below
dispatch();
}
else
@@ -365,6 +358,7 @@ public class AmqpConnectionDriverNetty e
}
}
});
+ logMessage("Write completion callback added");
}
else
{
@@ -375,6 +369,7 @@ public class AmqpConnectionDriverNetty e
{
if (pending < 0)
{
+ //TODO: fix. Calling this can cause us to fire channelInactive before reading any responses
//closeOnFlush(ctx.channel());
}
return;
@@ -389,19 +384,19 @@ public class AmqpConnectionDriverNetty e
private void scheduleReadIfCapacity(Transport transport, ChannelHandlerContext ctx)
{
- System.out.println("Checking if read can be scheduled");
+ logMessage("Checking if read can be scheduled");
int capacity = transport.capacity();
if (capacity > 0)
{
- System.out.println("Scheduling read");
+ logMessage("Scheduling read");
ctx.read();
- System.out.println("Scheduled read");
+ logMessage("Scheduled read");
}
}
private void processAmqpConnection()
{
- System.out.println("Processing AmqpConnection");
+ logMessage("Processing AmqpConnection");
_amqpConnection.process();
}
}
@@ -413,10 +408,19 @@ public class AmqpConnectionDriverNetty e
@Override
public void run()
{
- //TODO: this is a hack
- System.out.println("Writing From Executor");
+ //TODO: this is a hack to somewhat replicate how the previous
+ //driver worked, until the higher layer code is rewritten not to need it
+ logMessage("Lazy Writing From Executor");
+// try
+// {
+// Thread.sleep(100);
+// }
+// catch (InterruptedException e)
+// {
+// //ignore
+// }
_nettyHandler.write((ChannelHandlerContext) _nettyHandler._transport.getContext());
- // _nettyHandler.dispatch();
+ //TODO:delete: _nettyHandler.dispatch();
}
});
}
@@ -425,4 +429,20 @@ public class AmqpConnectionDriverNetty e
{
// TODO Auto-generated method stub
}
+
+ private void logMessage(String message)
+ {
+ String name = Thread.currentThread().getName();
+ System.out.println("[" + name + "] " + message);
+ }
+
+ private void echoBytes(String msgPrefix, ByteBuf buf)
+ {
+ //TODO: delete this method
+ ByteBuffer nio = buf.nioBuffer();
+ byte[] bytes = new byte[nio.limit()];
+ nio.get(bytes);
+
+ logMessage(msgPrefix + new Binary(bytes));
+ }
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org