You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2016/09/27 13:54:31 UTC

[04/15] activemq-artemis git commit: ARTEMIS-751 Simplification of the AMQP implementation

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/SimpleAMQPConnector.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/SimpleAMQPConnector.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/SimpleAMQPConnector.java
deleted file mode 100644
index 59772f0..0000000
--- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/SimpleAMQPConnector.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.test.minimalclient;
-
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.concurrent.Executors;
-
-import io.netty.bootstrap.Bootstrap;
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelDuplexHandler;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.nio.NioSocketChannel;
-import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
-import org.proton.plug.AMQPClientConnectionContext;
-import org.proton.plug.context.client.ProtonClientConnectionContextFactory;
-
-public class SimpleAMQPConnector implements Connector {
-
-   private Bootstrap bootstrap;
-
-   @Override
-   public void start() {
-
-      bootstrap = new Bootstrap();
-      bootstrap.channel(NioSocketChannel.class);
-      bootstrap.group(new NioEventLoopGroup(10));
-
-      bootstrap.handler(new ChannelInitializer<Channel>() {
-            @Override
-            public void initChannel(Channel channel) throws Exception {
-            }
-         });
-   }
-
-   @Override
-   public AMQPClientConnectionContext connect(String host, int port) throws Exception {
-      SocketAddress remoteDestination = new InetSocketAddress(host, port);
-
-      ChannelFuture future = bootstrap.connect(remoteDestination);
-
-      future.awaitUninterruptibly();
-
-      AMQPClientSPI clientConnectionSPI = new AMQPClientSPI(future.channel());
-
-      final AMQPClientConnectionContext connection = (AMQPClientConnectionContext) ProtonClientConnectionContextFactory.getFactory().createConnection(clientConnectionSPI, Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()), null);
-
-      future.channel().pipeline().addLast(new ChannelDuplexHandler() {
-            @Override
-            public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
-               ByteBuf buffer = (ByteBuf) msg;
-               connection.inputBuffer(buffer);
-            }
-         });
-
-      return connection;
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/DumbServer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/DumbServer.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/DumbServer.java
deleted file mode 100644
index 10499ef..0000000
--- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/DumbServer.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.test.minimalserver;
-
-import java.util.concurrent.BlockingDeque;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.LinkedBlockingDeque;
-
-public class DumbServer {
-
-   static ConcurrentMap<String, BlockingDeque<Object>> maps = new ConcurrentHashMap<>();
-
-   public static BlockingDeque<Object> getQueue(String name) {
-      BlockingDeque<Object> q = maps.get(name);
-      if (q == null) {
-         q = new LinkedBlockingDeque<>();
-         BlockingDeque<Object> oldValue = maps.putIfAbsent(name, q);
-         if (oldValue != null) {
-            q = oldValue;
-         }
-      }
-      return q;
-   }
-
-   public static void clear() {
-      for (BlockingDeque<Object> queue : maps.values()) {
-         // We clear the queues just in case there is a component holding it
-         queue.clear();
-      }
-      maps.clear();
-   }
-
-   public static void put(String queue, Object message) {
-      getQueue(queue).add(message);
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalConnectionSPI.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalConnectionSPI.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalConnectionSPI.java
deleted file mode 100644
index 6325ad7..0000000
--- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalConnectionSPI.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.test.minimalserver;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import org.apache.activemq.artemis.core.transaction.Transaction;
-import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
-import org.apache.qpid.proton.amqp.Binary;
-import org.apache.qpid.proton.engine.Connection;
-import org.jboss.logging.Logger;
-import org.proton.plug.AMQPConnectionContext;
-import org.proton.plug.AMQPConnectionCallback;
-import org.proton.plug.AMQPSessionCallback;
-import org.proton.plug.SASLResult;
-import org.proton.plug.ServerSASL;
-import org.proton.plug.exceptions.ActiveMQAMQPException;
-import org.proton.plug.sasl.AnonymousServerSASL;
-import org.proton.plug.sasl.ServerSASLPlain;
-import org.proton.plug.util.ByteUtil;
-import org.proton.plug.util.ReusableLatch;
-
-public class MinimalConnectionSPI implements AMQPConnectionCallback {
-
-   private static final Logger logger = Logger.getLogger(MinimalConnectionSPI.class);
-   Channel channel;
-
-   private AMQPConnectionContext connection;
-
-   public MinimalConnectionSPI(Channel channel) {
-      this.channel = channel;
-   }
-
-   ExecutorService executorService = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
-
-   @Override
-   public void close() {
-      executorService.shutdown();
-   }
-
-   @Override
-   public void setConnection(AMQPConnectionContext connection) {
-      this.connection = connection;
-   }
-
-   @Override
-   public AMQPConnectionContext getConnection() {
-      return connection;
-   }
-
-   final ReusableLatch latch = new ReusableLatch(0);
-
-   @Override
-   public ServerSASL[] getSASLMechnisms() {
-      return new ServerSASL[]{new AnonymousServerSASL(), new ServerSASLPlain()};
-   }
-
-   @Override
-   public boolean isSupportsAnonymous() {
-      return true;
-   }
-
-   @Override
-   public void sendSASLSupported() {
-
-   }
-
-   @Override
-   public boolean validateConnection(Connection connection, SASLResult saslResult) {
-      return true;
-   }
-
-   @Override
-   public Binary newTransaction() {
-      return null;
-   }
-
-   @Override
-   public Transaction getTransaction(Binary txid) throws ActiveMQAMQPException {
-      return null;
-   }
-
-   @Override
-   public void removeTransaction(Binary txid) {
-
-   }
-
-   @Override
-   public void onTransport(final ByteBuf bytes, final AMQPConnectionContext connection) {
-      final int bufferSize = bytes.writerIndex();
-
-      if (logger.isTraceEnabled()) {
-         // some debug
-         byte[] frame = new byte[bytes.writerIndex()];
-         int readerOriginalPos = bytes.readerIndex();
-
-         bytes.getBytes(0, frame);
-
-         try {
-            System.err.println("Buffer Outgoing: " + "\n" + ByteUtil.formatGroup(ByteUtil.bytesToHex(frame), 4, 16));
-         }
-         catch (Exception e) {
-            e.printStackTrace();
-         }
-
-         bytes.readerIndex(readerOriginalPos);
-      }
-
-      latch.countUp();
-      // ^^ debug
-
-      channel.writeAndFlush(bytes).addListener(new ChannelFutureListener() {
-         @Override
-         public void operationComplete(ChannelFuture future) throws Exception {
-            latch.countDown();
-
-            //   https://issues.apache.org/jira/browse/PROTON-645
-            //            connection.outputDone(bufferSize);
-            //            if (connection.capacity() > 0)
-            //            {
-            //               channel.read();
-            //            }
-         }
-      });
-
-      channel.flush();
-
-      if (connection.isSyncOnFlush()) {
-         try {
-            if (!latch.await(5, TimeUnit.SECONDS)) {
-               // TODO logs
-               System.err.println("Flush took longer than 5 seconds!!!");
-            }
-         }
-         catch (Throwable e) {
-            e.printStackTrace();
-         }
-      }
-      connection.outputDone(bufferSize);
-
-      //      if (connection.capacity() > 0)
-      //      {
-      //         channel.read();
-      //      }
-   }
-
-   @Override
-   public AMQPSessionCallback createSessionCallback(AMQPConnectionContext connection) {
-      return new MinimalSessionSPI();
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalServer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalServer.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalServer.java
deleted file mode 100644
index 8729cb4..0000000
--- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalServer.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.test.minimalserver;
-
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.List;
-import java.util.concurrent.Executors;
-
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.ChannelPipeline;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.ServerChannel;
-import io.netty.channel.group.ChannelGroup;
-import io.netty.channel.group.ChannelGroupFuture;
-import io.netty.channel.group.DefaultChannelGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.handler.codec.ByteToMessageDecoder;
-import io.netty.util.ResourceLeakDetector;
-import io.netty.util.concurrent.GlobalEventExecutor;
-
-import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
-import org.proton.plug.AMQPServerConnectionContext;
-import org.proton.plug.context.server.ProtonServerConnectionContextFactory;
-import org.proton.plug.test.Constants;
-
-/**
- * A Netty TCP Acceptor that supports SSL
- */
-public class MinimalServer {
-
-   static {
-      // Disable resource leak detection for performance reasons by default
-      ResourceLeakDetector.setEnabled(false);
-   }
-
-   private Class<? extends ServerChannel> channelClazz;
-
-   private EventLoopGroup eventLoopGroup;
-
-   private volatile ChannelGroup serverChannelGroup;
-
-   private volatile ChannelGroup channelGroup;
-
-   private ServerBootstrap bootstrap;
-
-   private String host;
-
-   private boolean sasl;
-
-   // Constants.PORT is the default here
-   private int port;
-
-   public synchronized void start(String host, int port, final boolean sasl) throws Exception {
-      this.host = host;
-      this.port = port;
-      this.sasl = sasl;
-
-      if (channelClazz != null) {
-         // Already started
-         return;
-      }
-
-      int threadsToUse = Runtime.getRuntime().availableProcessors() * 3;
-      channelClazz = NioServerSocketChannel.class;
-      eventLoopGroup = new NioEventLoopGroup(threadsToUse, new SimpleServerThreadFactory("simple-server", true, Thread.currentThread().getContextClassLoader()));
-
-      bootstrap = new ServerBootstrap();
-      bootstrap.group(eventLoopGroup);
-      bootstrap.channel(channelClazz);
-
-      ChannelInitializer<Channel> factory = new ChannelInitializer<Channel>() {
-         @Override
-         public void initChannel(Channel channel) throws Exception {
-            ChannelPipeline pipeline = channel.pipeline();
-            pipeline.addLast("amqp-handler", new ProtocolDecoder());
-         }
-      };
-      bootstrap.childHandler(factory);
-
-      bootstrap.option(ChannelOption.SO_REUSEADDR, true).
-         childOption(ChannelOption.SO_REUSEADDR, true).
-         childOption(ChannelOption.SO_KEEPALIVE, true).
-         //       childOption(ChannelOption.AUTO_READ, false).
-            childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
-
-      channelGroup = new DefaultChannelGroup("activemq-accepted-channels", GlobalEventExecutor.INSTANCE);
-
-      serverChannelGroup = new DefaultChannelGroup("activemq-acceptor-channels", GlobalEventExecutor.INSTANCE);
-
-      SocketAddress address;
-      address = new InetSocketAddress(host, port);
-      Channel serverChannel = bootstrap.bind(address).syncUninterruptibly().channel();
-      serverChannelGroup.add(serverChannel);
-
-   }
-
-   class ProtocolDecoder extends ByteToMessageDecoder {
-
-      AMQPServerConnectionContext connection;
-
-      ProtocolDecoder() {
-      }
-
-      @Override
-      public void channelActive(ChannelHandlerContext ctx) throws Exception {
-         super.channelActive(ctx);
-         connection = ProtonServerConnectionContextFactory.getFactory().createConnection(new MinimalConnectionSPI(ctx.channel()), Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()), null);
-         //ctx.read();
-      }
-
-      @Override
-      protected void decode(final ChannelHandlerContext ctx, ByteBuf byteIn, List<Object> out) throws Exception {
-         connection.inputBuffer(byteIn);
-         ctx.flush();
-         //         if (connection.capacity() > 0)
-         //         {
-         //            ctx.read();
-         //         }
-      }
-   }
-
-   public synchronized void stop() {
-      if (serverChannelGroup != null) {
-         serverChannelGroup.close().awaitUninterruptibly();
-      }
-
-      if (channelGroup != null) {
-         ChannelGroupFuture future = channelGroup.close().awaitUninterruptibly();
-      }
-   }
-
-   public static void main(String[] arg) {
-      MinimalServer server = new MinimalServer();
-      try {
-         server.start("127.0.0.1", Constants.PORT, true);
-
-         while (true) {
-            Thread.sleep(360000000);
-         }
-      }
-      catch (Throwable e) {
-         e.printStackTrace();
-      }
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java
deleted file mode 100644
index d366c5b..0000000
--- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.test.minimalserver;
-
-import java.util.concurrent.BlockingDeque;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import io.netty.buffer.ByteBuf;
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
-import org.apache.activemq.artemis.core.server.QueueQueryResult;
-import org.apache.activemq.artemis.core.transaction.Transaction;
-import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
-import org.apache.qpid.proton.amqp.Binary;
-import org.apache.qpid.proton.engine.Delivery;
-import org.apache.qpid.proton.engine.Receiver;
-import org.apache.qpid.proton.message.ProtonJMessage;
-import org.proton.plug.AMQPSessionCallback;
-import org.proton.plug.AMQPSessionContext;
-import org.proton.plug.SASLResult;
-import org.proton.plug.context.ProtonPlugSender;
-import org.proton.plug.context.server.ProtonServerSessionContext;
-import org.proton.plug.util.ProtonServerMessage;
-
-public class MinimalSessionSPI implements AMQPSessionCallback {
-
-   private SASLResult result;
-   ProtonServerSessionContext session;
-
-   @Override
-   public void init(AMQPSessionContext session, SASLResult result) {
-      this.session = (ProtonServerSessionContext) session;
-      this.result = result;
-   }
-
-   @Override
-   public void start() {
-   }
-
-   static AtomicInteger tempQueueGenerator = new AtomicInteger(0);
-
-   @Override
-   public String tempQueueName() {
-      return "TempQueueName" + tempQueueGenerator.incrementAndGet();
-   }
-
-   @Override
-   public Object createSender(ProtonPlugSender plugSender, String queue, String filer, boolean browserOnly) {
-      Consumer consumer = new Consumer(DumbServer.getQueue(queue));
-      return consumer;
-   }
-
-   @Override
-   public void startSender(Object brokerConsumer) {
-      ((Consumer) brokerConsumer).start();
-   }
-
-   @Override
-   public void createTemporaryQueue(String queueName) {
-
-   }
-
-   @Override
-   public void createDurableQueue(String address, String queueName, String filter) throws Exception {
-
-   }
-
-   @Override
-   public void offerProducerCredit(String address, int credits, int threshold, Receiver receiver) {
-
-   }
-
-   @Override
-   public void createTemporaryQueue(String address, String queueName, String filter) throws Exception {
-
-   }
-
-   @Override
-   public void deleteQueue(String address) throws Exception {
-
-   }
-
-   @Override
-   public String getPubSubPrefix() {
-      return null;
-   }
-
-   @Override
-   public void onFlowConsumer(Object consumer, int credits, boolean drain) {
-   }
-
-   @Override
-   public QueueQueryResult queueQuery(String queueName, boolean autoCreate) {
-      return new QueueQueryResult(SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(queueName), false, false, null, 0, 0, false);
-   }
-
-   @Override
-   public boolean bindingQuery(String address) throws Exception {
-      return true;
-   }
-
-   @Override
-   public void closeSender(Object brokerConsumer) {
-      ((Consumer) brokerConsumer).close();
-   }
-
-   @Override
-   public ProtonJMessage encodeMessage(Object message, int deliveryCount) {
-      // We are storing internally as EncodedMessage on this minimalserver server
-      return (ProtonServerMessage) message;
-   }
-
-   @Override
-   public Transaction getTransaction(Binary txid) {
-      return new TransactionImpl(new NullStorageManager());
-   }
-
-   @Override
-   public Binary newTransaction() {
-      return null;
-   }
-
-   @Override
-   public void commitTX(Binary txid) throws Exception {
-
-   }
-
-   @Override
-   public void rollbackTX(Binary txid, boolean lastMessageReceived) throws Exception {
-
-   }
-
-   @Override
-   public void close() {
-
-   }
-
-   @Override
-   public void ack(Transaction tx, Object brokerConsumer, Object message) {
-
-   }
-
-   @Override
-   public void cancel(Object brokerConsumer, Object message, boolean updateCounts) {
-
-   }
-
-   @Override
-   public void resumeDelivery(Object consumer) {
-      System.out.println("Resume delivery!!!");
-      ((Consumer) consumer).start();
-   }
-
-   @Override
-   public void serverSend(Transaction tx, Receiver receiver, Delivery delivery, String address, int messageFormat, ByteBuf buffer) {
-      ProtonServerMessage serverMessage = new ProtonServerMessage();
-      serverMessage.decode(buffer.nioBuffer());
-
-      BlockingDeque<Object> queue = DumbServer.getQueue(address);
-      queue.add(serverMessage);
-   }
-
-   class Consumer {
-
-      final BlockingDeque<Object> queue;
-
-      Consumer(BlockingDeque<Object> queue) {
-         this.queue = queue;
-      }
-
-      boolean running = false;
-      volatile Thread thread;
-
-      public void close() {
-         System.out.println("Closing!!!");
-         running = false;
-         if (thread != null && Thread.currentThread() != thread) {
-            try {
-               thread.join(1000);
-            }
-            catch (Throwable ignored) {
-            }
-         }
-
-         thread = null;
-      }
-
-      public synchronized void start() {
-         running = true;
-         if (thread == null) {
-            System.out.println("Start!!!");
-            thread = new Thread() {
-               @Override
-               public void run() {
-                  try {
-                     while (running) {
-                        Object msg = queue.poll(1, TimeUnit.SECONDS);
-
-                        if (msg != null) {
-                           session.serverDelivery(msg, Consumer.this, 1);
-                        }
-                     }
-                  }
-                  catch (Exception e) {
-                     e.printStackTrace();
-                  }
-               }
-            };
-            thread.start();
-         }
-      }
-
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/SimpleServerThreadFactory.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/SimpleServerThreadFactory.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/SimpleServerThreadFactory.java
deleted file mode 100644
index ac1d28f..0000000
--- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/SimpleServerThreadFactory.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.test.minimalserver;
-
-import java.security.AccessController;
-import java.security.PrivilegedAction;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public final class SimpleServerThreadFactory implements ThreadFactory {
-
-   private final ThreadGroup group;
-
-   private final AtomicInteger threadCount = new AtomicInteger(0);
-
-   private final int threadPriority;
-
-   private final boolean daemon;
-
-   private final ClassLoader tccl;
-
-   public SimpleServerThreadFactory(final String groupName, final boolean daemon, final ClassLoader tccl) {
-      group = new ThreadGroup(groupName + "-" + System.identityHashCode(this));
-
-      this.threadPriority = Thread.NORM_PRIORITY;
-
-      this.tccl = tccl;
-
-      this.daemon = daemon;
-   }
-
-   @Override
-   public Thread newThread(final Runnable command) {
-      final Thread t;
-      // attach the thread to a group only if there is no security manager:
-      // when sandboxed, the code does not have the RuntimePermission modifyThreadGroup
-      if (System.getSecurityManager() == null) {
-         t = new Thread(group, command, "Thread-" + threadCount.getAndIncrement() + " (" + group.getName() + ")");
-      }
-      else {
-         t = new Thread(command, "Thread-" + threadCount.getAndIncrement());
-      }
-
-      AccessController.doPrivileged(new PrivilegedAction<Object>() {
-         @Override
-         public Object run() {
-            t.setDaemon(daemon);
-            t.setPriority(threadPriority);
-            return null;
-         }
-      });
-
-      try {
-         AccessController.doPrivileged(new PrivilegedAction<Object>() {
-            @Override
-            public Object run() {
-               t.setContextClassLoader(tccl);
-               return null;
-            }
-         });
-      }
-      catch (java.security.AccessControlException e) {
-         e.printStackTrace();
-      }
-
-      return t;
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/sasl/PlainSASLTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/sasl/PlainSASLTest.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/sasl/PlainSASLTest.java
deleted file mode 100644
index a085eab..0000000
--- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/sasl/PlainSASLTest.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.test.sasl;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.proton.plug.sasl.ClientSASLPlain;
-import org.proton.plug.sasl.PlainSASLResult;
-import org.proton.plug.sasl.ServerSASLPlain;
-
-public class PlainSASLTest {
-
-   @Test
-   public void testPlain() {
-      ClientSASLPlain plainSASL = new ClientSASLPlain("user-me", "password-secret");
-      byte[] bytesResult = plainSASL.getBytes();
-
-      ServerSASLPlain serverSASLPlain = new ServerSASLPlain();
-      PlainSASLResult result = (PlainSASLResult) serverSASLPlain.processSASL(bytesResult);
-      Assert.assertEquals("user-me", result.getUser());
-      Assert.assertEquals("password-secret", result.getPassword());
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/util/CreditsSemaphoreTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/util/CreditsSemaphoreTest.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/util/CreditsSemaphoreTest.java
deleted file mode 100644
index b7ae38b..0000000
--- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/util/CreditsSemaphoreTest.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.test.util;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.proton.plug.util.CreditsSemaphore;
-
-public class CreditsSemaphoreTest {
-
-   final CreditsSemaphore semaphore = new CreditsSemaphore(10);
-
-   final AtomicInteger errors = new AtomicInteger(0);
-
-   final AtomicInteger acquired = new AtomicInteger(0);
-
-   final CountDownLatch waiting = new CountDownLatch(1);
-
-   Thread thread = new Thread() {
-      @Override
-      public void run() {
-         try {
-            for (int i = 0; i < 12; i++) {
-               if (!semaphore.tryAcquire()) {
-                  waiting.countDown();
-                  semaphore.acquire();
-               }
-               acquired.incrementAndGet();
-            }
-         }
-         catch (Throwable e) {
-            e.printStackTrace();
-            errors.incrementAndGet();
-         }
-      }
-   };
-
-   @Test
-   public void testSetAndRelease() throws Exception {
-      thread.start();
-
-      // 5 seconds would be an eternity here
-      Assert.assertTrue(waiting.await(5, TimeUnit.SECONDS));
-
-      Assert.assertEquals(0, semaphore.getCredits());
-
-      long timeout = System.currentTimeMillis() + 1000;
-      while (!semaphore.hasQueuedThreads() && System.currentTimeMillis() < timeout) {
-         Thread.sleep(10);
-      }
-
-      Assert.assertTrue(semaphore.hasQueuedThreads());
-
-      semaphore.setCredits(2);
-
-      thread.join();
-
-      Assert.assertEquals(12, acquired.get());
-
-      Assert.assertFalse(semaphore.hasQueuedThreads());
-   }
-
-   @Test
-   public void testDownAndUp() throws Exception {
-      thread.start();
-
-      // 5 seconds would be an eternity here
-      Assert.assertTrue(waiting.await(5, TimeUnit.SECONDS));
-
-      Assert.assertEquals(0, semaphore.getCredits());
-
-      long timeout = System.currentTimeMillis() + 1000;
-      while (!semaphore.hasQueuedThreads() && System.currentTimeMillis() < timeout) {
-         Thread.sleep(10);
-      }
-
-      Assert.assertTrue(semaphore.hasQueuedThreads());
-
-      semaphore.release(2);
-
-      thread.join();
-
-      Assert.assertEquals(12, acquired.get());
-
-      Assert.assertFalse(semaphore.hasQueuedThreads());
-   }
-
-   @Test
-   public void testStartedZeroedSetLater() throws Exception {
-      semaphore.setCredits(0);
-
-      thread.start();
-
-      // 5 seconds would be an eternity here
-      Assert.assertTrue(waiting.await(5, TimeUnit.SECONDS));
-
-      Assert.assertEquals(0, semaphore.getCredits());
-
-      long timeout = System.currentTimeMillis() + 1000;
-      while (!semaphore.hasQueuedThreads() && System.currentTimeMillis() < timeout) {
-         Thread.sleep(10);
-      }
-
-      Assert.assertTrue(semaphore.hasQueuedThreads());
-
-      Assert.assertEquals(0, acquired.get());
-
-      semaphore.setCredits(12);
-
-      thread.join();
-
-      Assert.assertEquals(12, acquired.get());
-
-      Assert.assertFalse(semaphore.hasQueuedThreads());
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/util/ReusableLatchTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/util/ReusableLatchTest.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/util/ReusableLatchTest.java
deleted file mode 100644
index 8909b5e..0000000
--- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/util/ReusableLatchTest.java
+++ /dev/null
@@ -1,300 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.test.util;
-
-import java.util.concurrent.CountDownLatch;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.proton.plug.util.ReusableLatch;
-
-public class ReusableLatchTest {
-
-   @Test
-   public void testLatchWithParameterizedDown() throws Exception {
-      ReusableLatch latch = new ReusableLatch(1000);
-
-      latch.countDown(5000);
-
-      Assert.assertTrue(latch.await(1000));
-
-      Assert.assertEquals(0, latch.getCount());
-   }
-
-   @Test
-   public void testLatchOnSingleThread() throws Exception {
-      ReusableLatch latch = new ReusableLatch();
-
-      for (int i = 1; i <= 100; i++) {
-         latch.countUp();
-         Assert.assertEquals(i, latch.getCount());
-      }
-
-      for (int i = 100; i > 0; i--) {
-         Assert.assertEquals(i, latch.getCount());
-         latch.countDown();
-         Assert.assertEquals(i - 1, latch.getCount());
-      }
-
-      latch.await();
-   }
-
-   /**
-    * This test will open numberOfThreads threads, and add numberOfAdds on the
-    * VariableLatch After those addthreads are finished, the latch count should
-    * be numberOfThreads * numberOfAdds Then it will open numberOfThreads
-    * threads again releasing numberOfAdds on the VariableLatch After those
-    * releaseThreads are finished, the latch count should be 0 And all the
-    * waiting threads should be finished also
-    *
-    * @throws Exception
-    */
-   @Test
-   public void testLatchOnMultiThread() throws Exception {
-      final ReusableLatch latch = new ReusableLatch();
-
-      latch.countUp(); // We hold at least one, so ThreadWaits won't go away
-
-      final int numberOfThreads = 100;
-      final int numberOfAdds = 100;
-
-      class ThreadWait extends Thread {
-
-         private volatile boolean waiting = true;
-
-         @Override
-         public void run() {
-            try {
-               if (!latch.await(5000)) {
-                  System.err.println("Latch timed out");
-               }
-            }
-            catch (Exception e) {
-               e.printStackTrace();
-            }
-            waiting = false;
-         }
-      }
-
-      class ThreadAdd extends Thread {
-
-         private final CountDownLatch latchReady;
-
-         private final CountDownLatch latchStart;
-
-         ThreadAdd(final CountDownLatch latchReady, final CountDownLatch latchStart) {
-            this.latchReady = latchReady;
-            this.latchStart = latchStart;
-         }
-
-         @Override
-         public void run() {
-            try {
-               latchReady.countDown();
-               // Everybody should start at the same time, to worse concurrency
-               // effects
-               latchStart.await();
-               for (int i = 0; i < numberOfAdds; i++) {
-                  latch.countUp();
-               }
-            }
-            catch (Exception e) {
-               e.printStackTrace();
-            }
-         }
-      }
-
-      CountDownLatch latchReady = new CountDownLatch(numberOfThreads);
-      CountDownLatch latchStart = new CountDownLatch(1);
-
-      ThreadAdd[] threadAdds = new ThreadAdd[numberOfThreads];
-      ThreadWait[] waits = new ThreadWait[numberOfThreads];
-
-      for (int i = 0; i < numberOfThreads; i++) {
-         threadAdds[i] = new ThreadAdd(latchReady, latchStart);
-         threadAdds[i].start();
-         waits[i] = new ThreadWait();
-         waits[i].start();
-      }
-
-      latchReady.await();
-      latchStart.countDown();
-
-      for (int i = 0; i < numberOfThreads; i++) {
-         threadAdds[i].join();
-      }
-
-      for (int i = 0; i < numberOfThreads; i++) {
-         Assert.assertTrue(waits[i].waiting);
-      }
-
-      Assert.assertEquals(numberOfThreads * numberOfAdds + 1, latch.getCount());
-
-      class ThreadDown extends Thread {
-
-         private final CountDownLatch latchReady;
-
-         private final CountDownLatch latchStart;
-
-         ThreadDown(final CountDownLatch latchReady, final CountDownLatch latchStart) {
-            this.latchReady = latchReady;
-            this.latchStart = latchStart;
-         }
-
-         @Override
-         public void run() {
-            try {
-               latchReady.countDown();
-               // Everybody should start at the same time, to worse concurrency
-               // effects
-               latchStart.await();
-               for (int i = 0; i < numberOfAdds; i++) {
-                  latch.countDown();
-               }
-            }
-            catch (Exception e) {
-               e.printStackTrace();
-            }
-         }
-      }
-
-      latchReady = new CountDownLatch(numberOfThreads);
-      latchStart = new CountDownLatch(1);
-
-      ThreadDown[] down = new ThreadDown[numberOfThreads];
-
-      for (int i = 0; i < numberOfThreads; i++) {
-         down[i] = new ThreadDown(latchReady, latchStart);
-         down[i].start();
-      }
-
-      latchReady.await();
-      latchStart.countDown();
-
-      for (int i = 0; i < numberOfThreads; i++) {
-         down[i].join();
-      }
-
-      Assert.assertEquals(1, latch.getCount());
-
-      for (int i = 0; i < numberOfThreads; i++) {
-         Assert.assertTrue(waits[i].waiting);
-      }
-
-      latch.countDown();
-
-      for (int i = 0; i < numberOfThreads; i++) {
-         waits[i].join();
-      }
-
-      Assert.assertEquals(0, latch.getCount());
-
-      for (int i = 0; i < numberOfThreads; i++) {
-         Assert.assertFalse(waits[i].waiting);
-      }
-   }
-
-   @Test
-   public void testReuseLatch() throws Exception {
-      final ReusableLatch latch = new ReusableLatch(5);
-      for (int i = 0; i < 5; i++) {
-         latch.countDown();
-      }
-
-      latch.countUp();
-
-      class ThreadWait extends Thread {
-
-         private volatile boolean waiting = false;
-
-         private volatile Exception e;
-
-         private final CountDownLatch readyLatch = new CountDownLatch(1);
-
-         @Override
-         public void run() {
-            waiting = true;
-            readyLatch.countDown();
-            try {
-               if (!latch.await(1000)) {
-                  System.err.println("Latch timed out!");
-               }
-            }
-            catch (Exception e) {
-               e.printStackTrace();
-               this.e = e;
-            }
-            waiting = false;
-         }
-      }
-
-      ThreadWait t = new ThreadWait();
-      t.start();
-
-      t.readyLatch.await();
-
-      Assert.assertEquals(true, t.waiting);
-
-      latch.countDown();
-
-      t.join();
-
-      Assert.assertEquals(false, t.waiting);
-
-      Assert.assertNull(t.e);
-
-      latch.countUp();
-
-      t = new ThreadWait();
-      t.start();
-
-      t.readyLatch.await();
-
-      Assert.assertEquals(true, t.waiting);
-
-      latch.countDown();
-
-      t.join();
-
-      Assert.assertEquals(false, t.waiting);
-
-      Assert.assertNull(t.e);
-
-      Assert.assertTrue(latch.await(1000));
-
-      Assert.assertEquals(0, latch.getCount());
-
-      latch.countDown();
-
-      Assert.assertEquals(0, latch.getCount());
-
-   }
-
-   @Test
-   public void testTimeout() throws Exception {
-      ReusableLatch latch = new ReusableLatch();
-
-      latch.countUp();
-
-      long start = System.currentTimeMillis();
-      Assert.assertFalse(latch.await(1000));
-      long end = System.currentTimeMillis();
-
-      Assert.assertTrue("Timeout didn't work correctly", end - start >= 1000 && end - start < 2000);
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/util/SimpleServerAbstractTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/util/SimpleServerAbstractTest.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/util/SimpleServerAbstractTest.java
deleted file mode 100644
index 1bfa0b4..0000000
--- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/util/SimpleServerAbstractTest.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.test.util;
-
-import org.proton.plug.test.AbstractJMSTest;
-import org.proton.plug.test.Constants;
-import org.proton.plug.test.invm.InVMTestConnector;
-import org.proton.plug.test.minimalclient.Connector;
-import org.proton.plug.test.minimalclient.SimpleAMQPConnector;
-import org.proton.plug.test.minimalserver.DumbServer;
-import org.proton.plug.test.minimalserver.MinimalServer;
-import org.junit.After;
-import org.junit.Before;
-
-public class SimpleServerAbstractTest {
-
-   protected final boolean useSASL;
-   protected final boolean useInVM;
-   protected MinimalServer server = new MinimalServer();
-
-   public SimpleServerAbstractTest(boolean useSASL, boolean useInVM) {
-      this.useSASL = useSASL;
-      this.useInVM = useInVM;
-   }
-
-   @Before
-   public void setUp() throws Exception {
-      DumbServer.clear();
-      AbstractJMSTest.forceGC();
-      if (!useInVM) {
-         server.start("127.0.0.1", Constants.PORT, useSASL);
-      }
-
-   }
-
-   @After
-   public void tearDown() throws Exception {
-      if (!useInVM) {
-         server.stop();
-      }
-      DumbServer.clear();
-   }
-
-   protected Connector newConnector() {
-      if (useInVM) {
-         return new InVMTestConnector();
-      }
-      else {
-         return new SimpleAMQPConnector();
-      }
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/pom.xml
----------------------------------------------------------------------
diff --git a/artemis-protocols/pom.xml b/artemis-protocols/pom.xml
index 3eb9ade..29289ac 100644
--- a/artemis-protocols/pom.xml
+++ b/artemis-protocols/pom.xml
@@ -36,7 +36,6 @@
       <module>artemis-amqp-protocol</module>
       <module>artemis-stomp-protocol</module>
       <module>artemis-openwire-protocol</module>
-      <module>artemis-proton-plug</module>
       <module>artemis-hornetq-protocol</module>
       <module>artemis-hqclient-protocol</module>
       <module>artemis-mqtt-protocol</module>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/tests/integration-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tests/integration-tests/pom.xml b/tests/integration-tests/pom.xml
index 4f56ca8..9ac3a72 100644
--- a/tests/integration-tests/pom.xml
+++ b/tests/integration-tests/pom.xml
@@ -153,18 +153,6 @@
       </dependency>
       <dependency>
          <groupId>org.apache.activemq</groupId>
-         <artifactId>artemis-proton-plug</artifactId>
-         <version>${project.version}</version>
-      </dependency>
-      <dependency>
-         <groupId>org.apache.activemq</groupId>
-         <artifactId>artemis-proton-plug</artifactId>
-         <version>${project.version}</version>
-         <scope>test</scope>
-         <type>test-jar</type>
-      </dependency>
-      <dependency>
-         <groupId>org.apache.activemq</groupId>
          <artifactId>artemis-hornetq-protocol</artifactId>
          <version>${project.version}</version>
       </dependency>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonMaxFrameSizeTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonMaxFrameSizeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonMaxFrameSizeTest.java
new file mode 100644
index 0000000..9f22362
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonMaxFrameSizeTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.proton.amqp.messaging.Data;
+import org.apache.qpid.proton.message.impl.MessageImpl;
+import org.junit.Test;
+
+/**
+ * Sends and receives messages whose payload is sixteen times the configured
+ * {@code maxFrameSize}, so that each message presumably has to be carried by
+ * more than one AMQP transfer frame.
+ */
+public class ProtonMaxFrameSizeTest extends ProtonTestBase {
+
+   /** Value passed as the {@code maxFrameSize} AMQP parameter. */
+   private static final int FRAME_SIZE = 512;
+
+   /** Payload size of every test message: 16 times {@link #FRAME_SIZE}. */
+   private static final int PAYLOAD_SIZE = FRAME_SIZE * 16;
+
+   @Override
+   protected void configureAmqp(Map<String, Object> params) {
+      params.put("maxFrameSize", FRAME_SIZE);
+   }
+
+   /**
+    * Sends {@code nMsgs} messages of {@link #PAYLOAD_SIZE} bytes, checks that the queue
+    * holds all of them, then receives each one and checks the length of its body.
+    */
+   @Test
+   public void testMultipleTransfers() throws Exception {
+      final String queueAddress = "jms.queue.ConnectionFrameSize";
+      final int nMsgs = 200;
+
+      AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
+      AmqpConnection amqpConnection = client.createConnection();
+
+      try {
+         amqpConnection.connect();
+
+         AmqpSession session = amqpConnection.createSession();
+         AmqpSender sender = session.createSender(queueAddress);
+
+         for (int i = 0; i < nMsgs; ++i) {
+            sender.send(createAmqpMessage((byte) 'A', PAYLOAD_SIZE));
+         }
+
+         int count = getMessageCount(server.getPostOffice(), queueAddress);
+         assertEquals(nMsgs, count);
+
+         AmqpReceiver receiver = session.createReceiver(queueAddress);
+         receiver.flow(nMsgs);
+
+         for (int i = 0; i < nMsgs; ++i) {
+            AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+            assertNotNull("failed at " + i, message);
+            MessageImpl wrapped = (MessageImpl) message.getWrappedMessage();
+            Data data = (Data) wrapped.getBody();
+            assertEquals("wrong payload length at " + i, PAYLOAD_SIZE, data.getValue().getLength());
+            message.accept();
+         }
+      }
+      finally {
+         amqpConnection.close();
+      }
+   }
+
+   /**
+    * Builds a message whose body is {@code payloadSize} bytes, all set to {@code value}.
+    */
+   private AmqpMessage createAmqpMessage(byte value, int payloadSize) {
+      byte[] payload = new byte[payloadSize];
+      Arrays.fill(payload, value);
+
+      AmqpMessage message = new AmqpMessage();
+      message.setBytes(payload);
+      return message;
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonPubSubTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonPubSubTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonPubSubTest.java
new file mode 100644
index 0000000..c8fc454
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonPubSubTest.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+import java.util.Map;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Publish/subscribe tests run through the Qpid JMS client against a broker whose
+ * {@code pubSubPrefix} AMQP parameter is set to {@code foo.bar.}.
+ */
+public class ProtonPubSubTest extends ProtonTestBase {
+
+   /** Number of messages published and consumed by every test. */
+   private static final int NUM_MESSAGES = 100;
+
+   /** Maximum time, in milliseconds, to wait for each expected message. */
+   private static final long RECEIVE_TIMEOUT = 5000;
+
+   private final String prefix = "foo.bar.";
+   private final String pubAddress = "pubAddress";
+   private final String prefixedPubAddress = prefix + pubAddress;
+   private final SimpleString ssPubAddress = new SimpleString(pubAddress);
+   private final SimpleString ssPrefixedPubAddress = new SimpleString(prefixedPubAddress);
+   private Connection connection;
+   private JmsConnectionFactory factory;
+
+   @Override
+   protected void configureAmqp(Map<String, Object> params) {
+      params.put("pubSubPrefix", prefix);
+   }
+
+   /** Creates the queues used by the tests and opens the JMS connection with client id {@code myClientID}. */
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+      server.createQueue(ssPubAddress, ssPubAddress, new SimpleString("foo=bar"), false, true);
+      server.createQueue(ssPrefixedPubAddress, ssPrefixedPubAddress, new SimpleString("foo=bar"), false, true);
+      factory = new JmsConnectionFactory("amqp://localhost:5672");
+      factory.setClientID("myClientID");
+      openJmsConnection();
+   }
+
+   /** Closes the JMS connection (after a short pause) before the inherited tear-down runs. */
+   @Override
+   @After
+   public void tearDown() throws Exception {
+      try {
+         Thread.sleep(250);
+         if (connection != null) {
+            connection.close();
+         }
+      }
+      finally {
+         super.tearDown();
+      }
+   }
+
+   @Test
+   public void testNonDurablePubSub() throws Exception {
+      Topic topic = createTopic(pubAddress);
+      TopicSession session = ((TopicConnection) connection).createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+      MessageConsumer sub = session.createSubscriber(topic);
+
+      startAndPublish(topic);
+      for (int i = 0; i < NUM_MESSAGES; i++) {
+         assertReceivesMessage(sub, i);
+      }
+   }
+
+   @Test
+   public void testNonDurableMultiplePubSub() throws Exception {
+      Topic topic = createTopic(pubAddress);
+      TopicSession session = ((TopicConnection) connection).createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+      MessageConsumer sub = session.createSubscriber(topic);
+      MessageConsumer sub2 = session.createSubscriber(topic);
+      MessageConsumer sub3 = session.createSubscriber(topic);
+
+      startAndPublish(topic);
+      for (int i = 0; i < NUM_MESSAGES; i++) {
+         assertReceivesMessage(sub, i);
+         assertReceivesMessage(sub2, i);
+         assertReceivesMessage(sub3, i);
+      }
+   }
+
+   @Test
+   public void testDurablePubSub() throws Exception {
+      Topic topic = createTopic(pubAddress);
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      TopicSubscriber sub = session.createDurableSubscriber(topic, "myPubId");
+
+      startAndPublish(topic);
+      for (int i = 0; i < NUM_MESSAGES; i++) {
+         assertReceivesMessage(sub, i);
+      }
+   }
+
+   @Test
+   public void testDurableMultiplePubSub() throws Exception {
+      Topic topic = createTopic(pubAddress);
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      TopicSubscriber sub = session.createDurableSubscriber(topic, "myPubId");
+      TopicSubscriber sub2 = session.createDurableSubscriber(topic, "myPubId2");
+      TopicSubscriber sub3 = session.createDurableSubscriber(topic, "myPubId3");
+
+      startAndPublish(topic);
+      for (int i = 0; i < NUM_MESSAGES; i++) {
+         assertReceivesMessage(sub, i);
+         assertReceivesMessage(sub2, i);
+         assertReceivesMessage(sub3, i);
+      }
+   }
+
+   @Test
+   public void testDurablePubSubReconnect() throws Exception {
+      Topic topic = createTopic(pubAddress);
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      TopicSubscriber sub = session.createDurableSubscriber(topic, "myPubId");
+
+      startAndPublish(topic);
+      for (int i = 0; i < NUM_MESSAGES; i++) {
+         assertReceivesMessage(sub, i);
+      }
+
+      // Reconnect, re-create the durable subscriber under the same name and repeat the exchange.
+      connection.close();
+      openJmsConnection();
+      session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      sub = session.createDurableSubscriber(topic, "myPubId");
+
+      startAndPublish(topic);
+      for (int i = 0; i < NUM_MESSAGES; i++) {
+         assertReceivesMessage(sub, i);
+      }
+   }
+
+   @Test
+   public void testDurablePubSubUnsubscribe() throws Exception {
+      Topic topic = createTopic(pubAddress);
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      TopicSubscriber sub = session.createDurableSubscriber(topic, "myPubId");
+
+      startAndPublish(topic);
+      for (int i = 0; i < NUM_MESSAGES; i++) {
+         assertReceivesMessage(sub, i);
+      }
+
+      sub.close();
+      session.unsubscribe("myPubId");
+   }
+
+   /**
+    * Replaces {@link #connection} with a new connection from {@link #factory} whose
+    * exception listener prints the stack trace of every reported exception.
+    */
+   private void openJmsConnection() throws JMSException {
+      connection = factory.createConnection();
+      connection.setExceptionListener(new ExceptionListener() {
+         @Override
+         public void onException(JMSException exception) {
+            exception.printStackTrace();
+         }
+      });
+   }
+
+   /**
+    * Starts {@link #connection} and publishes {@link #NUM_MESSAGES} text messages
+    * {@code "message:0"}, {@code "message:1"}, ... to {@code topic} from a new session.
+    */
+   private void startAndPublish(Topic topic) throws JMSException {
+      Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      MessageProducer producer = sendSession.createProducer(topic);
+      connection.start();
+      for (int i = 0; i < NUM_MESSAGES; i++) {
+         producer.send(sendSession.createTextMessage("message:" + i));
+      }
+   }
+
+   /**
+    * Asserts that the next message from {@code consumer} arrives within
+    * {@link #RECEIVE_TIMEOUT} milliseconds and carries the text {@code "message:" + index}.
+    */
+   private void assertReceivesMessage(MessageConsumer consumer, int index) throws JMSException {
+      TextMessage received = (TextMessage) consumer.receive(RECEIVE_TIMEOUT);
+      Assert.assertNotNull("no message received for index " + index, received);
+      Assert.assertEquals("message:" + index, received.getText());
+   }
+
+   /** Creates a {@link Topic} for {@code address} using a session that is closed before returning. */
+   private Topic createTopic(String address) throws Exception {
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      try {
+         return session.createTopic(address);
+      }
+      finally {
+         session.close();
+      }
+   }
+}