You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2016/09/27 13:54:31 UTC
[04/15] activemq-artemis git commit: ARTEMIS-751 Simplification of
the AMQP implementation
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/SimpleAMQPConnector.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/SimpleAMQPConnector.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/SimpleAMQPConnector.java
deleted file mode 100644
index 59772f0..0000000
--- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/SimpleAMQPConnector.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.test.minimalclient;
-
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.concurrent.Executors;
-
-import io.netty.bootstrap.Bootstrap;
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelDuplexHandler;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.nio.NioSocketChannel;
-import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
-import org.proton.plug.AMQPClientConnectionContext;
-import org.proton.plug.context.client.ProtonClientConnectionContextFactory;
-
-public class SimpleAMQPConnector implements Connector {
-
- private Bootstrap bootstrap;
-
- @Override
- public void start() {
-
- bootstrap = new Bootstrap();
- bootstrap.channel(NioSocketChannel.class);
- bootstrap.group(new NioEventLoopGroup(10));
-
- bootstrap.handler(new ChannelInitializer<Channel>() {
- @Override
- public void initChannel(Channel channel) throws Exception {
- }
- });
- }
-
- @Override
- public AMQPClientConnectionContext connect(String host, int port) throws Exception {
- SocketAddress remoteDestination = new InetSocketAddress(host, port);
-
- ChannelFuture future = bootstrap.connect(remoteDestination);
-
- future.awaitUninterruptibly();
-
- AMQPClientSPI clientConnectionSPI = new AMQPClientSPI(future.channel());
-
- final AMQPClientConnectionContext connection = (AMQPClientConnectionContext) ProtonClientConnectionContextFactory.getFactory().createConnection(clientConnectionSPI, Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()), null);
-
- future.channel().pipeline().addLast(new ChannelDuplexHandler() {
- @Override
- public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
- ByteBuf buffer = (ByteBuf) msg;
- connection.inputBuffer(buffer);
- }
- });
-
- return connection;
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/DumbServer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/DumbServer.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/DumbServer.java
deleted file mode 100644
index 10499ef..0000000
--- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/DumbServer.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.test.minimalserver;
-
-import java.util.concurrent.BlockingDeque;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.LinkedBlockingDeque;
-
-public class DumbServer {
-
- static ConcurrentMap<String, BlockingDeque<Object>> maps = new ConcurrentHashMap<>();
-
- public static BlockingDeque<Object> getQueue(String name) {
- BlockingDeque<Object> q = maps.get(name);
- if (q == null) {
- q = new LinkedBlockingDeque<>();
- BlockingDeque<Object> oldValue = maps.putIfAbsent(name, q);
- if (oldValue != null) {
- q = oldValue;
- }
- }
- return q;
- }
-
- public static void clear() {
- for (BlockingDeque<Object> queue : maps.values()) {
- // We clear the queues just in case there is a component holding it
- queue.clear();
- }
- maps.clear();
- }
-
- public static void put(String queue, Object message) {
- getQueue(queue).add(message);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalConnectionSPI.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalConnectionSPI.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalConnectionSPI.java
deleted file mode 100644
index 6325ad7..0000000
--- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalConnectionSPI.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.test.minimalserver;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import org.apache.activemq.artemis.core.transaction.Transaction;
-import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
-import org.apache.qpid.proton.amqp.Binary;
-import org.apache.qpid.proton.engine.Connection;
-import org.jboss.logging.Logger;
-import org.proton.plug.AMQPConnectionContext;
-import org.proton.plug.AMQPConnectionCallback;
-import org.proton.plug.AMQPSessionCallback;
-import org.proton.plug.SASLResult;
-import org.proton.plug.ServerSASL;
-import org.proton.plug.exceptions.ActiveMQAMQPException;
-import org.proton.plug.sasl.AnonymousServerSASL;
-import org.proton.plug.sasl.ServerSASLPlain;
-import org.proton.plug.util.ByteUtil;
-import org.proton.plug.util.ReusableLatch;
-
-public class MinimalConnectionSPI implements AMQPConnectionCallback {
-
- private static final Logger logger = Logger.getLogger(MinimalConnectionSPI.class);
- Channel channel;
-
- private AMQPConnectionContext connection;
-
- public MinimalConnectionSPI(Channel channel) {
- this.channel = channel;
- }
-
- ExecutorService executorService = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
-
- @Override
- public void close() {
- executorService.shutdown();
- }
-
- @Override
- public void setConnection(AMQPConnectionContext connection) {
- this.connection = connection;
- }
-
- @Override
- public AMQPConnectionContext getConnection() {
- return connection;
- }
-
- final ReusableLatch latch = new ReusableLatch(0);
-
- @Override
- public ServerSASL[] getSASLMechnisms() {
- return new ServerSASL[]{new AnonymousServerSASL(), new ServerSASLPlain()};
- }
-
- @Override
- public boolean isSupportsAnonymous() {
- return true;
- }
-
- @Override
- public void sendSASLSupported() {
-
- }
-
- @Override
- public boolean validateConnection(Connection connection, SASLResult saslResult) {
- return true;
- }
-
- @Override
- public Binary newTransaction() {
- return null;
- }
-
- @Override
- public Transaction getTransaction(Binary txid) throws ActiveMQAMQPException {
- return null;
- }
-
- @Override
- public void removeTransaction(Binary txid) {
-
- }
-
- @Override
- public void onTransport(final ByteBuf bytes, final AMQPConnectionContext connection) {
- final int bufferSize = bytes.writerIndex();
-
- if (logger.isTraceEnabled()) {
- // some debug
- byte[] frame = new byte[bytes.writerIndex()];
- int readerOriginalPos = bytes.readerIndex();
-
- bytes.getBytes(0, frame);
-
- try {
- System.err.println("Buffer Outgoing: " + "\n" + ByteUtil.formatGroup(ByteUtil.bytesToHex(frame), 4, 16));
- }
- catch (Exception e) {
- e.printStackTrace();
- }
-
- bytes.readerIndex(readerOriginalPos);
- }
-
- latch.countUp();
- // ^^ debug
-
- channel.writeAndFlush(bytes).addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- latch.countDown();
-
- // https://issues.apache.org/jira/browse/PROTON-645
- // connection.outputDone(bufferSize);
- // if (connection.capacity() > 0)
- // {
- // channel.read();
- // }
- }
- });
-
- channel.flush();
-
- if (connection.isSyncOnFlush()) {
- try {
- if (!latch.await(5, TimeUnit.SECONDS)) {
- // TODO logs
- System.err.println("Flush took longer than 5 seconds!!!");
- }
- }
- catch (Throwable e) {
- e.printStackTrace();
- }
- }
- connection.outputDone(bufferSize);
-
- // if (connection.capacity() > 0)
- // {
- // channel.read();
- // }
- }
-
- @Override
- public AMQPSessionCallback createSessionCallback(AMQPConnectionContext connection) {
- return new MinimalSessionSPI();
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalServer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalServer.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalServer.java
deleted file mode 100644
index 8729cb4..0000000
--- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalServer.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.test.minimalserver;
-
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.List;
-import java.util.concurrent.Executors;
-
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.ChannelPipeline;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.ServerChannel;
-import io.netty.channel.group.ChannelGroup;
-import io.netty.channel.group.ChannelGroupFuture;
-import io.netty.channel.group.DefaultChannelGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.handler.codec.ByteToMessageDecoder;
-import io.netty.util.ResourceLeakDetector;
-import io.netty.util.concurrent.GlobalEventExecutor;
-
-import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
-import org.proton.plug.AMQPServerConnectionContext;
-import org.proton.plug.context.server.ProtonServerConnectionContextFactory;
-import org.proton.plug.test.Constants;
-
-/**
- * A Netty TCP Acceptor that supports SSL
- */
-public class MinimalServer {
-
- static {
- // Disable resource leak detection for performance reasons by default
- ResourceLeakDetector.setEnabled(false);
- }
-
- private Class<? extends ServerChannel> channelClazz;
-
- private EventLoopGroup eventLoopGroup;
-
- private volatile ChannelGroup serverChannelGroup;
-
- private volatile ChannelGroup channelGroup;
-
- private ServerBootstrap bootstrap;
-
- private String host;
-
- private boolean sasl;
-
- // Constants.PORT is the default here
- private int port;
-
- public synchronized void start(String host, int port, final boolean sasl) throws Exception {
- this.host = host;
- this.port = port;
- this.sasl = sasl;
-
- if (channelClazz != null) {
- // Already started
- return;
- }
-
- int threadsToUse = Runtime.getRuntime().availableProcessors() * 3;
- channelClazz = NioServerSocketChannel.class;
- eventLoopGroup = new NioEventLoopGroup(threadsToUse, new SimpleServerThreadFactory("simple-server", true, Thread.currentThread().getContextClassLoader()));
-
- bootstrap = new ServerBootstrap();
- bootstrap.group(eventLoopGroup);
- bootstrap.channel(channelClazz);
-
- ChannelInitializer<Channel> factory = new ChannelInitializer<Channel>() {
- @Override
- public void initChannel(Channel channel) throws Exception {
- ChannelPipeline pipeline = channel.pipeline();
- pipeline.addLast("amqp-handler", new ProtocolDecoder());
- }
- };
- bootstrap.childHandler(factory);
-
- bootstrap.option(ChannelOption.SO_REUSEADDR, true).
- childOption(ChannelOption.SO_REUSEADDR, true).
- childOption(ChannelOption.SO_KEEPALIVE, true).
- // childOption(ChannelOption.AUTO_READ, false).
- childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
-
- channelGroup = new DefaultChannelGroup("activemq-accepted-channels", GlobalEventExecutor.INSTANCE);
-
- serverChannelGroup = new DefaultChannelGroup("activemq-acceptor-channels", GlobalEventExecutor.INSTANCE);
-
- SocketAddress address;
- address = new InetSocketAddress(host, port);
- Channel serverChannel = bootstrap.bind(address).syncUninterruptibly().channel();
- serverChannelGroup.add(serverChannel);
-
- }
-
- class ProtocolDecoder extends ByteToMessageDecoder {
-
- AMQPServerConnectionContext connection;
-
- ProtocolDecoder() {
- }
-
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- super.channelActive(ctx);
- connection = ProtonServerConnectionContextFactory.getFactory().createConnection(new MinimalConnectionSPI(ctx.channel()), Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()), null);
- //ctx.read();
- }
-
- @Override
- protected void decode(final ChannelHandlerContext ctx, ByteBuf byteIn, List<Object> out) throws Exception {
- connection.inputBuffer(byteIn);
- ctx.flush();
- // if (connection.capacity() > 0)
- // {
- // ctx.read();
- // }
- }
- }
-
- public synchronized void stop() {
- if (serverChannelGroup != null) {
- serverChannelGroup.close().awaitUninterruptibly();
- }
-
- if (channelGroup != null) {
- ChannelGroupFuture future = channelGroup.close().awaitUninterruptibly();
- }
- }
-
- public static void main(String[] arg) {
- MinimalServer server = new MinimalServer();
- try {
- server.start("127.0.0.1", Constants.PORT, true);
-
- while (true) {
- Thread.sleep(360000000);
- }
- }
- catch (Throwable e) {
- e.printStackTrace();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java
deleted file mode 100644
index d366c5b..0000000
--- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.test.minimalserver;
-
-import java.util.concurrent.BlockingDeque;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import io.netty.buffer.ByteBuf;
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
-import org.apache.activemq.artemis.core.server.QueueQueryResult;
-import org.apache.activemq.artemis.core.transaction.Transaction;
-import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
-import org.apache.qpid.proton.amqp.Binary;
-import org.apache.qpid.proton.engine.Delivery;
-import org.apache.qpid.proton.engine.Receiver;
-import org.apache.qpid.proton.message.ProtonJMessage;
-import org.proton.plug.AMQPSessionCallback;
-import org.proton.plug.AMQPSessionContext;
-import org.proton.plug.SASLResult;
-import org.proton.plug.context.ProtonPlugSender;
-import org.proton.plug.context.server.ProtonServerSessionContext;
-import org.proton.plug.util.ProtonServerMessage;
-
-public class MinimalSessionSPI implements AMQPSessionCallback {
-
- private SASLResult result;
- ProtonServerSessionContext session;
-
- @Override
- public void init(AMQPSessionContext session, SASLResult result) {
- this.session = (ProtonServerSessionContext) session;
- this.result = result;
- }
-
- @Override
- public void start() {
- }
-
- static AtomicInteger tempQueueGenerator = new AtomicInteger(0);
-
- @Override
- public String tempQueueName() {
- return "TempQueueName" + tempQueueGenerator.incrementAndGet();
- }
-
- @Override
- public Object createSender(ProtonPlugSender plugSender, String queue, String filer, boolean browserOnly) {
- Consumer consumer = new Consumer(DumbServer.getQueue(queue));
- return consumer;
- }
-
- @Override
- public void startSender(Object brokerConsumer) {
- ((Consumer) brokerConsumer).start();
- }
-
- @Override
- public void createTemporaryQueue(String queueName) {
-
- }
-
- @Override
- public void createDurableQueue(String address, String queueName, String filter) throws Exception {
-
- }
-
- @Override
- public void offerProducerCredit(String address, int credits, int threshold, Receiver receiver) {
-
- }
-
- @Override
- public void createTemporaryQueue(String address, String queueName, String filter) throws Exception {
-
- }
-
- @Override
- public void deleteQueue(String address) throws Exception {
-
- }
-
- @Override
- public String getPubSubPrefix() {
- return null;
- }
-
- @Override
- public void onFlowConsumer(Object consumer, int credits, boolean drain) {
- }
-
- @Override
- public QueueQueryResult queueQuery(String queueName, boolean autoCreate) {
- return new QueueQueryResult(SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(queueName), false, false, null, 0, 0, false);
- }
-
- @Override
- public boolean bindingQuery(String address) throws Exception {
- return true;
- }
-
- @Override
- public void closeSender(Object brokerConsumer) {
- ((Consumer) brokerConsumer).close();
- }
-
- @Override
- public ProtonJMessage encodeMessage(Object message, int deliveryCount) {
- // We are storing internally as EncodedMessage on this minimalserver server
- return (ProtonServerMessage) message;
- }
-
- @Override
- public Transaction getTransaction(Binary txid) {
- return new TransactionImpl(new NullStorageManager());
- }
-
- @Override
- public Binary newTransaction() {
- return null;
- }
-
- @Override
- public void commitTX(Binary txid) throws Exception {
-
- }
-
- @Override
- public void rollbackTX(Binary txid, boolean lastMessageReceived) throws Exception {
-
- }
-
- @Override
- public void close() {
-
- }
-
- @Override
- public void ack(Transaction tx, Object brokerConsumer, Object message) {
-
- }
-
- @Override
- public void cancel(Object brokerConsumer, Object message, boolean updateCounts) {
-
- }
-
- @Override
- public void resumeDelivery(Object consumer) {
- System.out.println("Resume delivery!!!");
- ((Consumer) consumer).start();
- }
-
- @Override
- public void serverSend(Transaction tx, Receiver receiver, Delivery delivery, String address, int messageFormat, ByteBuf buffer) {
- ProtonServerMessage serverMessage = new ProtonServerMessage();
- serverMessage.decode(buffer.nioBuffer());
-
- BlockingDeque<Object> queue = DumbServer.getQueue(address);
- queue.add(serverMessage);
- }
-
- class Consumer {
-
- final BlockingDeque<Object> queue;
-
- Consumer(BlockingDeque<Object> queue) {
- this.queue = queue;
- }
-
- boolean running = false;
- volatile Thread thread;
-
- public void close() {
- System.out.println("Closing!!!");
- running = false;
- if (thread != null && Thread.currentThread() != thread) {
- try {
- thread.join(1000);
- }
- catch (Throwable ignored) {
- }
- }
-
- thread = null;
- }
-
- public synchronized void start() {
- running = true;
- if (thread == null) {
- System.out.println("Start!!!");
- thread = new Thread() {
- @Override
- public void run() {
- try {
- while (running) {
- Object msg = queue.poll(1, TimeUnit.SECONDS);
-
- if (msg != null) {
- session.serverDelivery(msg, Consumer.this, 1);
- }
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- }
- }
- };
- thread.start();
- }
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/SimpleServerThreadFactory.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/SimpleServerThreadFactory.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/SimpleServerThreadFactory.java
deleted file mode 100644
index ac1d28f..0000000
--- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/SimpleServerThreadFactory.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.test.minimalserver;
-
-import java.security.AccessController;
-import java.security.PrivilegedAction;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public final class SimpleServerThreadFactory implements ThreadFactory {
-
- private final ThreadGroup group;
-
- private final AtomicInteger threadCount = new AtomicInteger(0);
-
- private final int threadPriority;
-
- private final boolean daemon;
-
- private final ClassLoader tccl;
-
- public SimpleServerThreadFactory(final String groupName, final boolean daemon, final ClassLoader tccl) {
- group = new ThreadGroup(groupName + "-" + System.identityHashCode(this));
-
- this.threadPriority = Thread.NORM_PRIORITY;
-
- this.tccl = tccl;
-
- this.daemon = daemon;
- }
-
- @Override
- public Thread newThread(final Runnable command) {
- final Thread t;
- // attach the thread to a group only if there is no security manager:
- // when sandboxed, the code does not have the RuntimePermission modifyThreadGroup
- if (System.getSecurityManager() == null) {
- t = new Thread(group, command, "Thread-" + threadCount.getAndIncrement() + " (" + group.getName() + ")");
- }
- else {
- t = new Thread(command, "Thread-" + threadCount.getAndIncrement());
- }
-
- AccessController.doPrivileged(new PrivilegedAction<Object>() {
- @Override
- public Object run() {
- t.setDaemon(daemon);
- t.setPriority(threadPriority);
- return null;
- }
- });
-
- try {
- AccessController.doPrivileged(new PrivilegedAction<Object>() {
- @Override
- public Object run() {
- t.setContextClassLoader(tccl);
- return null;
- }
- });
- }
- catch (java.security.AccessControlException e) {
- e.printStackTrace();
- }
-
- return t;
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/sasl/PlainSASLTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/sasl/PlainSASLTest.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/sasl/PlainSASLTest.java
deleted file mode 100644
index a085eab..0000000
--- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/sasl/PlainSASLTest.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.test.sasl;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.proton.plug.sasl.ClientSASLPlain;
-import org.proton.plug.sasl.PlainSASLResult;
-import org.proton.plug.sasl.ServerSASLPlain;
-
-public class PlainSASLTest {
-
- @Test
- public void testPlain() {
- ClientSASLPlain plainSASL = new ClientSASLPlain("user-me", "password-secret");
- byte[] bytesResult = plainSASL.getBytes();
-
- ServerSASLPlain serverSASLPlain = new ServerSASLPlain();
- PlainSASLResult result = (PlainSASLResult) serverSASLPlain.processSASL(bytesResult);
- Assert.assertEquals("user-me", result.getUser());
- Assert.assertEquals("password-secret", result.getPassword());
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/util/CreditsSemaphoreTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/util/CreditsSemaphoreTest.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/util/CreditsSemaphoreTest.java
deleted file mode 100644
index b7ae38b..0000000
--- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/util/CreditsSemaphoreTest.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.test.util;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.proton.plug.util.CreditsSemaphore;
-
-public class CreditsSemaphoreTest {
-
- final CreditsSemaphore semaphore = new CreditsSemaphore(10);
-
- final AtomicInteger errors = new AtomicInteger(0);
-
- final AtomicInteger acquired = new AtomicInteger(0);
-
- final CountDownLatch waiting = new CountDownLatch(1);
-
- Thread thread = new Thread() {
- @Override
- public void run() {
- try {
- for (int i = 0; i < 12; i++) {
- if (!semaphore.tryAcquire()) {
- waiting.countDown();
- semaphore.acquire();
- }
- acquired.incrementAndGet();
- }
- }
- catch (Throwable e) {
- e.printStackTrace();
- errors.incrementAndGet();
- }
- }
- };
-
- @Test
- public void testSetAndRelease() throws Exception {
- thread.start();
-
- // 5 seconds would be an eternity here
- Assert.assertTrue(waiting.await(5, TimeUnit.SECONDS));
-
- Assert.assertEquals(0, semaphore.getCredits());
-
- long timeout = System.currentTimeMillis() + 1000;
- while (!semaphore.hasQueuedThreads() && System.currentTimeMillis() < timeout) {
- Thread.sleep(10);
- }
-
- Assert.assertTrue(semaphore.hasQueuedThreads());
-
- semaphore.setCredits(2);
-
- thread.join();
-
- Assert.assertEquals(12, acquired.get());
-
- Assert.assertFalse(semaphore.hasQueuedThreads());
- }
-
- @Test
- public void testDownAndUp() throws Exception {
- thread.start();
-
- // 5 seconds would be an eternity here
- Assert.assertTrue(waiting.await(5, TimeUnit.SECONDS));
-
- Assert.assertEquals(0, semaphore.getCredits());
-
- long timeout = System.currentTimeMillis() + 1000;
- while (!semaphore.hasQueuedThreads() && System.currentTimeMillis() < timeout) {
- Thread.sleep(10);
- }
-
- Assert.assertTrue(semaphore.hasQueuedThreads());
-
- semaphore.release(2);
-
- thread.join();
-
- Assert.assertEquals(12, acquired.get());
-
- Assert.assertFalse(semaphore.hasQueuedThreads());
- }
-
- @Test
- public void testStartedZeroedSetLater() throws Exception {
- semaphore.setCredits(0);
-
- thread.start();
-
- // 5 seconds would be an eternity here
- Assert.assertTrue(waiting.await(5, TimeUnit.SECONDS));
-
- Assert.assertEquals(0, semaphore.getCredits());
-
- long timeout = System.currentTimeMillis() + 1000;
- while (!semaphore.hasQueuedThreads() && System.currentTimeMillis() < timeout) {
- Thread.sleep(10);
- }
-
- Assert.assertTrue(semaphore.hasQueuedThreads());
-
- Assert.assertEquals(0, acquired.get());
-
- semaphore.setCredits(12);
-
- thread.join();
-
- Assert.assertEquals(12, acquired.get());
-
- Assert.assertFalse(semaphore.hasQueuedThreads());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/util/ReusableLatchTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/util/ReusableLatchTest.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/util/ReusableLatchTest.java
deleted file mode 100644
index 8909b5e..0000000
--- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/util/ReusableLatchTest.java
+++ /dev/null
@@ -1,300 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.test.util;
-
-import java.util.concurrent.CountDownLatch;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.proton.plug.util.ReusableLatch;
-
-public class ReusableLatchTest {
-
- @Test
- public void testLatchWithParameterizedDown() throws Exception {
- ReusableLatch latch = new ReusableLatch(1000);
-
- latch.countDown(5000);
-
- Assert.assertTrue(latch.await(1000));
-
- Assert.assertEquals(0, latch.getCount());
- }
-
- @Test
- public void testLatchOnSingleThread() throws Exception {
- ReusableLatch latch = new ReusableLatch();
-
- for (int i = 1; i <= 100; i++) {
- latch.countUp();
- Assert.assertEquals(i, latch.getCount());
- }
-
- for (int i = 100; i > 0; i--) {
- Assert.assertEquals(i, latch.getCount());
- latch.countDown();
- Assert.assertEquals(i - 1, latch.getCount());
- }
-
- latch.await();
- }
-
- /**
- * This test will open numberOfThreads threads, and add numberOfAdds on the
- * VariableLatch After those addthreads are finished, the latch count should
- * be numberOfThreads * numberOfAdds Then it will open numberOfThreads
- * threads again releasing numberOfAdds on the VariableLatch After those
- * releaseThreads are finished, the latch count should be 0 And all the
- * waiting threads should be finished also
- *
- * @throws Exception
- */
- @Test
- public void testLatchOnMultiThread() throws Exception {
- final ReusableLatch latch = new ReusableLatch();
-
- latch.countUp(); // We hold at least one, so ThreadWaits won't go away
-
- final int numberOfThreads = 100;
- final int numberOfAdds = 100;
-
- class ThreadWait extends Thread {
-
- private volatile boolean waiting = true;
-
- @Override
- public void run() {
- try {
- if (!latch.await(5000)) {
- System.err.println("Latch timed out");
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- }
- waiting = false;
- }
- }
-
- class ThreadAdd extends Thread {
-
- private final CountDownLatch latchReady;
-
- private final CountDownLatch latchStart;
-
- ThreadAdd(final CountDownLatch latchReady, final CountDownLatch latchStart) {
- this.latchReady = latchReady;
- this.latchStart = latchStart;
- }
-
- @Override
- public void run() {
- try {
- latchReady.countDown();
- // Everybody should start at the same time, to worse concurrency
- // effects
- latchStart.await();
- for (int i = 0; i < numberOfAdds; i++) {
- latch.countUp();
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
-
- CountDownLatch latchReady = new CountDownLatch(numberOfThreads);
- CountDownLatch latchStart = new CountDownLatch(1);
-
- ThreadAdd[] threadAdds = new ThreadAdd[numberOfThreads];
- ThreadWait[] waits = new ThreadWait[numberOfThreads];
-
- for (int i = 0; i < numberOfThreads; i++) {
- threadAdds[i] = new ThreadAdd(latchReady, latchStart);
- threadAdds[i].start();
- waits[i] = new ThreadWait();
- waits[i].start();
- }
-
- latchReady.await();
- latchStart.countDown();
-
- for (int i = 0; i < numberOfThreads; i++) {
- threadAdds[i].join();
- }
-
- for (int i = 0; i < numberOfThreads; i++) {
- Assert.assertTrue(waits[i].waiting);
- }
-
- Assert.assertEquals(numberOfThreads * numberOfAdds + 1, latch.getCount());
-
- class ThreadDown extends Thread {
-
- private final CountDownLatch latchReady;
-
- private final CountDownLatch latchStart;
-
- ThreadDown(final CountDownLatch latchReady, final CountDownLatch latchStart) {
- this.latchReady = latchReady;
- this.latchStart = latchStart;
- }
-
- @Override
- public void run() {
- try {
- latchReady.countDown();
- // Everybody should start at the same time, to worse concurrency
- // effects
- latchStart.await();
- for (int i = 0; i < numberOfAdds; i++) {
- latch.countDown();
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
-
- latchReady = new CountDownLatch(numberOfThreads);
- latchStart = new CountDownLatch(1);
-
- ThreadDown[] down = new ThreadDown[numberOfThreads];
-
- for (int i = 0; i < numberOfThreads; i++) {
- down[i] = new ThreadDown(latchReady, latchStart);
- down[i].start();
- }
-
- latchReady.await();
- latchStart.countDown();
-
- for (int i = 0; i < numberOfThreads; i++) {
- down[i].join();
- }
-
- Assert.assertEquals(1, latch.getCount());
-
- for (int i = 0; i < numberOfThreads; i++) {
- Assert.assertTrue(waits[i].waiting);
- }
-
- latch.countDown();
-
- for (int i = 0; i < numberOfThreads; i++) {
- waits[i].join();
- }
-
- Assert.assertEquals(0, latch.getCount());
-
- for (int i = 0; i < numberOfThreads; i++) {
- Assert.assertFalse(waits[i].waiting);
- }
- }
-
- @Test
- public void testReuseLatch() throws Exception {
- final ReusableLatch latch = new ReusableLatch(5);
- for (int i = 0; i < 5; i++) {
- latch.countDown();
- }
-
- latch.countUp();
-
- class ThreadWait extends Thread {
-
- private volatile boolean waiting = false;
-
- private volatile Exception e;
-
- private final CountDownLatch readyLatch = new CountDownLatch(1);
-
- @Override
- public void run() {
- waiting = true;
- readyLatch.countDown();
- try {
- if (!latch.await(1000)) {
- System.err.println("Latch timed out!");
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- this.e = e;
- }
- waiting = false;
- }
- }
-
- ThreadWait t = new ThreadWait();
- t.start();
-
- t.readyLatch.await();
-
- Assert.assertEquals(true, t.waiting);
-
- latch.countDown();
-
- t.join();
-
- Assert.assertEquals(false, t.waiting);
-
- Assert.assertNull(t.e);
-
- latch.countUp();
-
- t = new ThreadWait();
- t.start();
-
- t.readyLatch.await();
-
- Assert.assertEquals(true, t.waiting);
-
- latch.countDown();
-
- t.join();
-
- Assert.assertEquals(false, t.waiting);
-
- Assert.assertNull(t.e);
-
- Assert.assertTrue(latch.await(1000));
-
- Assert.assertEquals(0, latch.getCount());
-
- latch.countDown();
-
- Assert.assertEquals(0, latch.getCount());
-
- }
-
- @Test
- public void testTimeout() throws Exception {
- ReusableLatch latch = new ReusableLatch();
-
- latch.countUp();
-
- long start = System.currentTimeMillis();
- Assert.assertFalse(latch.await(1000));
- long end = System.currentTimeMillis();
-
- Assert.assertTrue("Timeout didn't work correctly", end - start >= 1000 && end - start < 2000);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/util/SimpleServerAbstractTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/util/SimpleServerAbstractTest.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/util/SimpleServerAbstractTest.java
deleted file mode 100644
index 1bfa0b4..0000000
--- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/util/SimpleServerAbstractTest.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.test.util;
-
-import org.proton.plug.test.AbstractJMSTest;
-import org.proton.plug.test.Constants;
-import org.proton.plug.test.invm.InVMTestConnector;
-import org.proton.plug.test.minimalclient.Connector;
-import org.proton.plug.test.minimalclient.SimpleAMQPConnector;
-import org.proton.plug.test.minimalserver.DumbServer;
-import org.proton.plug.test.minimalserver.MinimalServer;
-import org.junit.After;
-import org.junit.Before;
-
-public class SimpleServerAbstractTest {
-
- protected final boolean useSASL;
- protected final boolean useInVM;
- protected MinimalServer server = new MinimalServer();
-
- public SimpleServerAbstractTest(boolean useSASL, boolean useInVM) {
- this.useSASL = useSASL;
- this.useInVM = useInVM;
- }
-
- @Before
- public void setUp() throws Exception {
- DumbServer.clear();
- AbstractJMSTest.forceGC();
- if (!useInVM) {
- server.start("127.0.0.1", Constants.PORT, useSASL);
- }
-
- }
-
- @After
- public void tearDown() throws Exception {
- if (!useInVM) {
- server.stop();
- }
- DumbServer.clear();
- }
-
- protected Connector newConnector() {
- if (useInVM) {
- return new InVMTestConnector();
- }
- else {
- return new SimpleAMQPConnector();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/pom.xml
----------------------------------------------------------------------
diff --git a/artemis-protocols/pom.xml b/artemis-protocols/pom.xml
index 3eb9ade..29289ac 100644
--- a/artemis-protocols/pom.xml
+++ b/artemis-protocols/pom.xml
@@ -36,7 +36,6 @@
<module>artemis-amqp-protocol</module>
<module>artemis-stomp-protocol</module>
<module>artemis-openwire-protocol</module>
- <module>artemis-proton-plug</module>
<module>artemis-hornetq-protocol</module>
<module>artemis-hqclient-protocol</module>
<module>artemis-mqtt-protocol</module>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/tests/integration-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tests/integration-tests/pom.xml b/tests/integration-tests/pom.xml
index 4f56ca8..9ac3a72 100644
--- a/tests/integration-tests/pom.xml
+++ b/tests/integration-tests/pom.xml
@@ -153,18 +153,6 @@
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
- <artifactId>artemis-proton-plug</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.activemq</groupId>
- <artifactId>artemis-proton-plug</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- <type>test-jar</type>
- </dependency>
- <dependency>
- <groupId>org.apache.activemq</groupId>
<artifactId>artemis-hornetq-protocol</artifactId>
<version>${project.version}</version>
</dependency>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonMaxFrameSizeTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonMaxFrameSizeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonMaxFrameSizeTest.java
new file mode 100644
index 0000000..9f22362
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonMaxFrameSizeTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.proton.amqp.messaging.Data;
+import org.apache.qpid.proton.message.impl.MessageImpl;
+import org.junit.Test;
+
+import java.net.URI;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class ProtonMaxFrameSizeTest extends ProtonTestBase {
+
+ private static final int FRAME_SIZE = 512;
+
+ @Override
+ protected void configureAmqp(Map<String, Object> params) {
+ params.put("maxFrameSize", FRAME_SIZE);
+ }
+
+ @Test
+ public void testMultipleTransfers() throws Exception {
+
+ String testQueueName = "ConnectionFrameSize";
+ int nMsgs = 200;
+
+ AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
+
+
+ AmqpConnection amqpConnection = client.createConnection();
+
+ try {
+ amqpConnection.connect();
+
+ AmqpSession session = amqpConnection.createSession();
+ AmqpSender sender = session.createSender("jms.queue." + testQueueName);
+
+ final int payload = FRAME_SIZE * 16;
+
+ for (int i = 0; i < nMsgs; ++i) {
+ AmqpMessage message = createAmqpMessage((byte) 'A', payload);
+ sender.send(message);
+ }
+
+ int count = getMessageCount(server.getPostOffice(), "jms.queue." + testQueueName);
+ assertEquals(nMsgs, count);
+
+ AmqpReceiver receiver = session.createReceiver("jms.queue." + testQueueName);
+ receiver.flow(nMsgs);
+
+ for (int i = 0; i < nMsgs; ++i) {
+ AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull("failed at " + i, message);
+ MessageImpl wrapped = (MessageImpl) message.getWrappedMessage();
+ Data data = (Data) wrapped.getBody();
+ System.out.println("received : message: " + data.getValue().getLength());
+ assertEquals(payload, data.getValue().getLength());
+ message.accept();
+ }
+
+ }
+ finally {
+ amqpConnection.close();
+ }
+ }
+
+ private AmqpMessage createAmqpMessage(byte value, int payloadSize) {
+ AmqpMessage message = new AmqpMessage();
+ byte[] payload = new byte[payloadSize];
+ for (int i = 0; i < payload.length; i++) {
+ payload[i] = value;
+ }
+ message.setBytes(payload);
+ return message;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonPubSubTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonPubSubTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonPubSubTest.java
new file mode 100644
index 0000000..c8fc454
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonPubSubTest.java
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+import java.util.Map;
+
+
+public class ProtonPubSubTest extends ProtonTestBase {
+ private final String prefix = "foo.bar.";
+ private final String pubAddress = "pubAddress";
+ private final String prefixedPubAddress = prefix + "pubAddress";
+ private final SimpleString ssPubAddress = new SimpleString(pubAddress);
+ private final SimpleString ssprefixedPubAddress = new SimpleString(prefixedPubAddress);
+ private Connection connection;
+ private JmsConnectionFactory factory;
+
+ @Override
+ protected void configureAmqp(Map<String, Object> params) {
+ params.put("pubSubPrefix", prefix);
+ }
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ server.createQueue(ssPubAddress, ssPubAddress, new SimpleString("foo=bar"), false, true);
+ server.createQueue(ssprefixedPubAddress, ssprefixedPubAddress, new SimpleString("foo=bar"), false, true);
+ factory = new JmsConnectionFactory("amqp://localhost:5672");
+ factory.setClientID("myClientID");
+ connection = factory.createConnection();
+ connection.setExceptionListener(new ExceptionListener() {
+ @Override
+ public void onException(JMSException exception) {
+ exception.printStackTrace();
+ }
+ });
+
+ }
+
+ @Override
+ @After
+ public void tearDown() throws Exception {
+ try {
+ Thread.sleep(250);
+ if (connection != null) {
+ connection.close();
+ }
+ }
+ finally {
+ super.tearDown();
+ }
+ }
+
+ @Test
+ public void testNonDurablePubSub() throws Exception {
+ int numMessages = 100;
+ Topic topic = createTopic(pubAddress);
+ TopicSession session = ((TopicConnection) connection).createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer sub = session.createSubscriber(topic);
+
+ Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = sendSession.createProducer(topic);
+ connection.start();
+ for (int i = 0; i < numMessages; i++) {
+ producer.send(sendSession.createTextMessage("message:" + i));
+ }
+ for (int i = 0; i < numMessages; i++) {
+ TextMessage receive = (TextMessage) sub.receive(5000);
+ Assert.assertNotNull(receive);
+ Assert.assertEquals(receive.getText(), "message:" + i);
+ }
+ }
+
+ @Test
+ public void testNonDurableMultiplePubSub() throws Exception {
+ int numMessages = 100;
+ Topic topic = createTopic(pubAddress);
+ TopicSession session = ((TopicConnection) connection).createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer sub = session.createSubscriber(topic);
+ MessageConsumer sub2 = session.createSubscriber(topic);
+ MessageConsumer sub3 = session.createSubscriber(topic);
+
+ Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = sendSession.createProducer(topic);
+ connection.start();
+ for (int i = 0; i < numMessages; i++) {
+ producer.send(sendSession.createTextMessage("message:" + i));
+ }
+ for (int i = 0; i < numMessages; i++) {
+ TextMessage receive = (TextMessage) sub.receive(5000);
+ Assert.assertNotNull(receive);
+ Assert.assertEquals(receive.getText(), "message:" + i);
+ receive = (TextMessage) sub2.receive(5000);
+ Assert.assertNotNull(receive);
+ Assert.assertEquals(receive.getText(), "message:" + i);
+ receive = (TextMessage) sub3.receive(5000);
+ Assert.assertNotNull(receive);
+ Assert.assertEquals(receive.getText(), "message:" + i);
+ }
+ }
+
+
+ @Test
+ public void testDurablePubSub() throws Exception {
+ int numMessages = 100;
+ Topic topic = createTopic(pubAddress);
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ TopicSubscriber sub = session.createDurableSubscriber(topic, "myPubId");
+
+ Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = sendSession.createProducer(topic);
+ connection.start();
+ for (int i = 0; i < numMessages; i++) {
+ producer.send(sendSession.createTextMessage("message:" + i));
+ }
+ for (int i = 0; i < numMessages; i++) {
+ TextMessage receive = (TextMessage) sub.receive(5000);
+ Assert.assertNotNull(receive);
+ Assert.assertEquals(receive.getText(), "message:" + i);
+ }
+ }
+
+ @Test
+ public void testDurableMultiplePubSub() throws Exception {
+ int numMessages = 100;
+ Topic topic = createTopic(pubAddress);
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ TopicSubscriber sub = session.createDurableSubscriber(topic, "myPubId");
+ TopicSubscriber sub2 = session.createDurableSubscriber(topic, "myPubId2");
+ TopicSubscriber sub3 = session.createDurableSubscriber(topic, "myPubId3");
+
+ Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = sendSession.createProducer(topic);
+ connection.start();
+ for (int i = 0; i < numMessages; i++) {
+ producer.send(sendSession.createTextMessage("message:" + i));
+ }
+ for (int i = 0; i < numMessages; i++) {
+ TextMessage receive = (TextMessage) sub.receive(5000);
+ Assert.assertNotNull(receive);
+ Assert.assertEquals(receive.getText(), "message:" + i);
+ receive = (TextMessage) sub2.receive(5000);
+ Assert.assertNotNull(receive);
+ Assert.assertEquals(receive.getText(), "message:" + i);
+ receive = (TextMessage) sub3.receive(5000);
+ Assert.assertNotNull(receive);
+ Assert.assertEquals(receive.getText(), "message:" + i);
+ }
+ }
+
+ @Test
+ public void testDurablePubSubReconnect() throws Exception {
+ int numMessages = 100;
+ Topic topic = createTopic(pubAddress);
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ TopicSubscriber sub = session.createDurableSubscriber(topic, "myPubId");
+
+ Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = sendSession.createProducer(topic);
+ connection.start();
+ for (int i = 0; i < numMessages; i++) {
+ producer.send(sendSession.createTextMessage("message:" + i));
+ }
+ for (int i = 0; i < numMessages; i++) {
+ TextMessage receive = (TextMessage) sub.receive(5000);
+ Assert.assertNotNull(receive);
+ Assert.assertEquals(receive.getText(), "message:" + i);
+ }
+ connection.close();
+ connection = factory.createConnection();
+ connection.setExceptionListener(new ExceptionListener() {
+ @Override
+ public void onException(JMSException exception) {
+ exception.printStackTrace();
+ }
+ });
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ sub = session.createDurableSubscriber(topic, "myPubId");
+
+ sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ producer = sendSession.createProducer(topic);
+ connection.start();
+ for (int i = 0; i < numMessages; i++) {
+ producer.send(sendSession.createTextMessage("message:" + i));
+ }
+ for (int i = 0; i < numMessages; i++) {
+ TextMessage receive = (TextMessage) sub.receive(5000);
+ Assert.assertNotNull(receive);
+ Assert.assertEquals(receive.getText(), "message:" + i);
+ }
+ }
+
+ @Test
+ public void testDurablePubSubUnsubscribe() throws Exception {
+ int numMessages = 100;
+ Topic topic = createTopic(pubAddress);
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ TopicSubscriber sub = session.createDurableSubscriber(topic, "myPubId");
+
+ Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = sendSession.createProducer(topic);
+ connection.start();
+ for (int i = 0; i < numMessages; i++) {
+ producer.send(sendSession.createTextMessage("message:" + i));
+ }
+ for (int i = 0; i < numMessages; i++) {
+ TextMessage receive = (TextMessage) sub.receive(5000);
+ Assert.assertNotNull(receive);
+ Assert.assertEquals(receive.getText(), "message:" + i);
+ }
+ sub.close();
+ session.unsubscribe("myPubId");
+ }
+
+
+ private javax.jms.Topic createTopic(String address) throws Exception {
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ try {
+ return session.createTopic(address);
+ }
+ finally {
+ session.close();
+ }
+ }
+}