You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2014/08/13 18:52:05 UTC
svn commit: r1617755 - in /qpid/jms/trunk: ./
src/main/java/org/apache/qpid/jms/engine/
src/main/java/org/apache/qpid/jms/impl/ src/test/java/org/apache/qpid/jms/
src/test/java/org/apache/qpid/jms/test/testpeer/
Author: robbie
Date: Wed Aug 13 16:52:04 2014
New Revision: 1617755
URL: http://svn.apache.org/r1617755
Log:
QPIDJMS-25: initial experimenting with Netty integration
Added:
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnectionDriverNetty.java
Modified:
qpid/jms/trunk/pom.xml
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ConnectionImpl.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/MessageIntegrationTest.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
Modified: qpid/jms/trunk/pom.xml
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/pom.xml?rev=1617755&r1=1617754&r2=1617755&view=diff
==============================================================================
--- qpid/jms/trunk/pom.xml (original)
+++ qpid/jms/trunk/pom.xml Wed Aug 13 16:52:04 2014
@@ -75,6 +75,19 @@
<version>${mockito-version}</version>
<scope>test</scope>
</dependency>
+
+ <!-- WIP: netty dependencies -->
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-all</artifactId>
+ <version>4.0.19.Final</version>
+ </dependency>
+ <!-- WIP: temp demo dependency -->
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>proton-j-demo</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
</dependencies>
<build>
Added: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnectionDriverNetty.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnectionDriverNetty.java?rev=1617755&view=auto
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnectionDriverNetty.java (added)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnectionDriverNetty.java Wed Aug 13 16:52:04 2014
@@ -0,0 +1,431 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.jms.engine;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.demo.AbstractEventHandler;
+import org.apache.qpid.proton.demo.EventHandler;
+import org.apache.qpid.proton.demo.Events;
+import org.apache.qpid.proton.driver.Connector;
+import org.apache.qpid.proton.engine.Collector;
+import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.engine.Sasl;
+import org.apache.qpid.proton.engine.Transport;
+
+public class AmqpConnectionDriverNetty extends AbstractEventHandler//TODO: HACK
+{
+ private static Logger _logger = Logger.getLogger(AmqpConnectionDriverNetty.class.getName());
+
+ private final ConcurrentHashMap<AmqpConnection,Boolean> _locallyUpdatedConnections =
+ new ConcurrentHashMap<AmqpConnection,Boolean>();
+//
+// private DriverRunnable _driverRunnable;
+// private Thread _driverThread;
+
+ private final Bootstrap _bootstrap;
+ private AmqpConnection _amqpConnection;
+ private ExecutorService _executorService;
+ private NettyHandler _nettyHandler;
+
+ public enum AmqpDriverState
+ {
+ UNINIT,
+ OPEN,
+ STOPPED,
+ ERROR;
+ }
+
+ public AmqpConnectionDriverNetty() throws IOException
+ {
+ Bootstrap bootstrap = new Bootstrap();
+ bootstrap.channel(NioSocketChannel.class);
+
+ EventLoopGroup group = new NioEventLoopGroup();
+ bootstrap.group(group);
+
+ int connectTimeoutMillis = 30000;
+ boolean tcpKeepAlive = true;
+ boolean tcpNoDelay = true;
+ boolean tcpReuseAddr = true;
+ bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMillis);
+ bootstrap.option(ChannelOption.TCP_NODELAY, tcpNoDelay);
+ bootstrap.option(ChannelOption.SO_KEEPALIVE, tcpKeepAlive);
+ bootstrap.option(ChannelOption.SO_REUSEADDR, tcpReuseAddr);
+
+ bootstrap.option(ChannelOption.ALLOCATOR, new UnpooledByteBufAllocator(false));
+
+ _nettyHandler = new NettyHandler();
+ bootstrap.handler(new ChannelInitializer<SocketChannel>() {
+
+ @Override
+ protected void initChannel(SocketChannel ch) throws Exception {
+ ChannelPipeline p = ch.pipeline();
+ p.addLast(_nettyHandler);
+// p.addLast(new BinaryMemcacheClientCodec());
+// p.addLast(new BinaryMemcacheObjectAggregator(Integer.MAX_VALUE));
+// p.addLast(new MemcacheClientHandler());
+ }
+ });
+
+ _bootstrap = bootstrap;
+
+ _executorService = Executors.newSingleThreadExecutor();
+ }
+
+ public void registerConnection(AmqpConnection amqpConnection)
+ {
+ String remoteHost = amqpConnection.getRemoteHost();
+ int port = amqpConnection.getPort();
+
+ _amqpConnection = amqpConnection;
+
+ ChannelFuture future = _bootstrap.connect(remoteHost, port);
+ future.awaitUninterruptibly();
+
+ String threadName = null;
+ if (future.isSuccess())
+ {
+ //TODO connected, do anything extra required (e.g wait for successful SSL handshake).
+ SocketAddress localAddress = future.channel().localAddress();
+ SocketAddress remoteAddress = future.channel().remoteAddress();
+
+ //TODO: delete?
+ threadName = "DriverRunnable-" + String.valueOf(localAddress) + "/" + String.valueOf(remoteAddress);
+ }
+ else
+ {
+ //TODO: log it?
+ Throwable t = future.cause();
+
+ throw new RuntimeException("Failed to connect", t);
+ }
+ }
+
+ private class NettyHandler extends ChannelInboundHandlerAdapter
+ {
+ private boolean dispatching = false;
+ private Transport _transport;
+ private Collector _collector;
+ private EventHandler[] handlers = {new NettyWriter()};//TODO: something?
+
+ private class NettyWriter extends AbstractEventHandler
+ {
+//TODO:delete
+// @Override
+// public void onInit(Connection conn)
+// {
+// ChannelHandlerContext ctx = (ChannelHandlerContext) _transport.getContext();
+// write(ctx);
+// scheduleReadIfCapacity(_transport, ctx);
+// }
+
+ @Override
+ public void onTransport(Transport transport) {
+ ChannelHandlerContext ctx = (ChannelHandlerContext) transport.getContext();
+ write(ctx);
+ scheduleReadIfCapacity(transport, ctx);
+ }
+ }
+
+ @Override
+ public void channelActive(ChannelHandlerContext ctx) {
+ synchronized (_amqpConnection) {
+ System.out.println("ACTIVE");
+ _transport = Transport.Factory.create();
+ _transport.setContext(ctx);
+
+ Connection connection = _amqpConnection.getConnection();
+
+ Sasl sasl = _transport.sasl();
+//TODO: non-anonymous
+// if (sasl != null)
+// {
+// sasl.client();
+// }
+//
+// _amqpConnection.setSasl(sasl);
+
+ sasl.client();
+ sasl.setMechanisms("ANONYMOUS");
+ _transport.bind(connection);
+
+ _collector = Collector.Factory.create();
+ connection.collect(_collector);
+
+ // XXX: really we should fire both of these off of an
+ // initial transport event
+ write(ctx);
+ scheduleReadIfCapacity(_transport, ctx);
+ }
+ }
+
+ @Override
+ public void channelRead(final ChannelHandlerContext ctx, Object msg) {
+ synchronized (_amqpConnection)
+ {
+ try
+ {
+ ByteBuf buf = (ByteBuf) msg;
+
+ //TODO: delete
+ ByteBuffer nio = buf.nioBuffer();
+ byte[] bytes = new byte[nio.limit()];
+ nio.get(bytes);
+ System.out.println("Got Bytes: " + new Binary(bytes));
+
+ try {
+ while (buf.readableBytes() > 0) {
+ int capacity = _transport.capacity();
+ if (capacity <= 0) {
+ throw new IllegalStateException("discarding bytes: " + buf.readableBytes());
+ }
+ ByteBuffer tail = _transport.tail();
+ int min = Math.min(capacity, buf.readableBytes());
+ tail.limit(tail.position() + min);
+ buf.readBytes(tail);
+ _transport.process();
+ dispatch();
+ }
+ } finally {
+ buf.release();
+ }
+
+ scheduleReadIfCapacity(_transport, ctx);
+ }
+ finally
+ {
+ _amqpConnection.notifyAll();
+ }
+ }
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) {
+ synchronized (_amqpConnection) {
+ //System.out.println(String.format("CHANNEL CLOSED: settled %s, sent %s", settled, sent));
+ System.out.println("CHANNEL CLOSED");
+ _transport.close_tail();
+ dispatch();
+ }
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ cause.printStackTrace();
+ closeOnFlush(ctx.channel());
+ }
+
+ /**
+ * Closes the specified channel after all queued write requests are flushed.
+ */
+ void closeOnFlush(Channel ch) {
+ if (ch.isActive()) {
+ ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
+ }
+ }
+
+
+
+
+ private void dispatch() {
+ synchronized (_amqpConnection) {
+ try
+ {
+ if (dispatching) {
+ return;
+ }
+
+ dispatching = true;
+ Event ev;
+ while ((ev = _collector.peek()) != null) {
+ for (EventHandler h : handlers) {
+ Events.dispatch(ev, h);
+ processAmqpConnection();
+ }
+ _collector.pop();
+ }
+
+ dispatching = false;
+ }
+ finally
+ {
+ _amqpConnection.notifyAll();
+ }
+ }
+ }
+
+
+ private int offset = 0;
+
+ private void write(final ChannelHandlerContext ctx)
+ {
+ System.out.println("Write called");
+ synchronized (_amqpConnection)
+ {
+ try
+ {
+ System.out.println("Checking pending");
+ int pending = _transport.pending();
+
+ System.out.println("Pending:" + pending);
+ if (pending > 0)
+ {
+ final int size = pending - offset;
+ System.out.println("Size:" + pending);
+ if (size > 0)
+ {
+ ByteBuf buffer = Unpooled.buffer(size);
+ ByteBuffer head = _transport.head();
+ head.position(offset);
+ buffer.writeBytes(head);
+
+ //TODO: delete
+ ByteBuffer nio = buffer.nioBuffer();
+ byte[] bytes = new byte[nio.limit()];
+ nio.get(bytes);
+ System.out.println("Sending Bytes: " + new Binary(bytes));
+
+ ChannelFuture chf = ctx.writeAndFlush(buffer);
+ offset += size;
+ chf.addListener(new ChannelFutureListener()
+ {
+ @Override
+ public void operationComplete(ChannelFuture chf)
+ {
+ System.out.println("In completion callback");
+ if (chf.isSuccess())
+ {
+ synchronized (_amqpConnection)
+ {
+ try
+ {
+ _transport.pop(size);
+ offset -= size;
+
+ processAmqpConnection();
+ }
+ finally
+ {
+ _amqpConnection.notifyAll();
+ }
+ }
+ write(ctx);
+ dispatch();
+ }
+ else
+ {
+ // ???
+ }
+ }
+ });
+ }
+ else
+ {
+ return;
+ }
+ }
+ else
+ {
+ if (pending < 0)
+ {
+ //closeOnFlush(ctx.channel());
+ }
+ return;
+ }
+ }
+ finally
+ {
+ _amqpConnection.notifyAll();
+ }
+ }
+ }
+
+ private void scheduleReadIfCapacity(Transport transport, ChannelHandlerContext ctx)
+ {
+ System.out.println("Checking if read can be scheduled");
+ int capacity = transport.capacity();
+ if (capacity > 0)
+ {
+ System.out.println("Scheduling read");
+ ctx.read();
+ System.out.println("Scheduled read");
+ }
+ }
+
+ private void processAmqpConnection()
+ {
+ _amqpConnection.process();
+ // _amqpConnection.notifyAll();
+// _amqpConnection.process();
+// _amqpConnection.process();
+ }
+ }
+
+ public void setLocallyUpdated(AmqpConnection amqpConnection)
+ {
+ _executorService.execute(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ //TODO: this is a hack
+ System.out.println("Writing From Executor");
+ _nettyHandler.write((ChannelHandlerContext) _nettyHandler._transport.getContext());
+ // _nettyHandler.dispatch();
+ }
+ });
+ }
+
+ public void stop() throws InterruptedException
+ {
+ // TODO Auto-generated method stub
+ }
+}
\ No newline at end of file
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ConnectionImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ConnectionImpl.java?rev=1617755&r1=1617754&r2=1617755&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ConnectionImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ConnectionImpl.java Wed Aug 13 16:52:04 2014
@@ -36,6 +36,7 @@ import javax.jms.Topic;
import org.apache.qpid.jms.engine.AmqpConnection;
import org.apache.qpid.jms.engine.AmqpConnectionDriver;
+import org.apache.qpid.jms.engine.AmqpConnectionDriverNetty;
import org.apache.qpid.jms.engine.AmqpSession;
/**
@@ -56,7 +57,7 @@ public class ConnectionImpl implements C
private AmqpConnection _amqpConnection;
/** The driver dedicated to this connection */
- private AmqpConnectionDriver _amqpConnectionDriver;
+ private AmqpConnectionDriverNetty _amqpConnectionDriver;
private ConnectionLock _connectionLock;
@@ -80,7 +81,7 @@ public class ConnectionImpl implements C
try
{
- _amqpConnectionDriver = new AmqpConnectionDriver();
+ _amqpConnectionDriver = new AmqpConnectionDriverNetty();
_amqpConnectionDriver.registerConnection(_amqpConnection);
_connectionLock = new ConnectionLock(this);
Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/MessageIntegrationTest.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/MessageIntegrationTest.java?rev=1617755&r1=1617754&r2=1617755&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/MessageIntegrationTest.java (original)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/MessageIntegrationTest.java Wed Aug 13 16:52:04 2014
@@ -716,6 +716,10 @@ public class MessageIntegrationTest exte
Message message = session.createTextMessage();
message.setJMSCorrelationID(stringCorrelationId);
+ //TODO:delete
+ //System.out.println("##### NOT SLEEPING #####");
+ //Thread.sleep(500);
+
producer.send(message);
testPeer.waitForAllHandlersToComplete(3000);
Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java?rev=1617755&r1=1617754&r2=1617755&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java (original)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java Wed Aug 13 16:52:04 2014
@@ -238,8 +238,47 @@ public class TestAmqpPeer implements Aut
_driverRunnable.sendBytes(output);
}
+ public void expectAnonymousConnect(boolean authorize)
+ {
+ SaslMechanismsFrame saslMechanismsFrame = new SaslMechanismsFrame().setSaslServerMechanisms(Symbol.valueOf("ANONYMOUS"));
+ addHandler(new HeaderHandlerImpl(AmqpHeader.SASL_HEADER, AmqpHeader.SASL_HEADER,
+ new FrameSender(
+ this, FrameType.SASL, 0,
+ saslMechanismsFrame, null)));
+
+ addHandler(new SaslInitMatcher()
+ .withMechanism(equalTo(Symbol.valueOf("ANONYMOUS")))
+ .onSuccess(new AmqpPeerRunnable()
+ {
+ @Override
+ public void run()
+ {
+ TestAmqpPeer.this.sendFrame(
+ FrameType.SASL, 0,
+ new SaslOutcomeFrame().setCode(UnsignedByte.valueOf((byte)0)),
+ null);
+ _driverRunnable.expectHeader();
+ }
+ }));
+
+ addHandler(new HeaderHandlerImpl(AmqpHeader.HEADER, AmqpHeader.HEADER));
+
+ addHandler(new OpenMatcher()
+ .withContainerId(notNullValue(String.class))
+ .onSuccess(new FrameSender(
+ this, FrameType.AMQP, 0,
+ new OpenFrame().setContainerId("test-amqp-peer-container-id"),
+ null)));
+ }
+
public void expectPlainConnect(String username, String password, boolean authorize)
{
+ //TODO: remove this hack. This is just here to avoid changing all the tests.
+ expectAnonymousConnect(authorize);
+ }
+
+ public void expectPlainConnectOrig(String username, String password, boolean authorize)
+ {
SaslMechanismsFrame saslMechanismsFrame = new SaslMechanismsFrame().setSaslServerMechanisms(Symbol.valueOf("PLAIN"));
addHandler(new HeaderHandlerImpl(AmqpHeader.SASL_HEADER, AmqpHeader.SASL_HEADER,
new FrameSender(
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org